"""Import player data from an AQUA SQLite database into ARTEMiS."""

import argparse
import logging
import os
import sys
from datetime import datetime
from logging.handlers import TimedRotatingFileHandler
from typing import Any, Callable, Dict, Optional, Sequence

import coloredlogs
import inflection
import yaml
from sqlalchemy import create_engine, inspect
from sqlalchemy.engine import Row
from sqlalchemy.engine.cursor import CursorResult
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.sql import text

from core.config import CoreConfig
from core.data.database import Data
from titles.chuni.config import ChuniConfig
from titles.chuni.const import ChuniConstants
from titles.chuni.newplus import ChuniNewPlus
from titles.ongeki.brightmemory import OngekiBrightMemory
from titles.ongeki.config import OngekiConfig
from titles.ongeki.const import OngekiConstants


def _load_yaml(path: str) -> Any:
    """Parse the YAML file at *path*, closing the file handle afterwards."""
    with open(path) as f:
        return yaml.safe_load(f)


class AquaData:
    """Connection to the AQUA SQLite database plus a logging query helper."""

    def __init__(self, aqua_db_path: str) -> None:
        """Open the SQLite database at *aqua_db_path* and set up the "aqua" logger."""
        self.__url = f"sqlite:///{aqua_db_path}"
        self.__engine = create_engine(self.__url, pool_recycle=3600)

        session = sessionmaker(bind=self.__engine)
        self.inspect = inspect(self.__engine)
        self.conn = scoped_session(session)

        log_fmt_str = "[%(asctime)s] %(levelname)s | AQUA | %(message)s"
        log_fmt = logging.Formatter(log_fmt_str)
        self.logger = logging.getLogger("aqua")

        # Prevent the logger from adding handlers multiple times
        if not getattr(self.logger, "handler_set", None):
            consoleHandler = logging.StreamHandler()
            consoleHandler.setFormatter(log_fmt)

            self.logger.addHandler(consoleHandler)

            self.logger.setLevel("WARN")
            coloredlogs.install("WARN", logger=self.logger, fmt=log_fmt_str)
            self.logger.handler_set = True  # type: ignore

    def execute(
        self, sql: str, opts: Optional[Dict[str, Any]] = None
    ) -> Optional[CursorResult]:
        """Run *sql* with the bind parameters in *opts*.

        Returns the cursor result, or ``None`` when SQLAlchemy or a
        ``UnicodeEncodeError`` reports a failure (the error is logged).
        Any other exception triggers one retry with the raw statement;
        if that retry fails unexpectedly too, the exception is re-raised.
        """
        # A fresh dict per call; a shared mutable default would leak state.
        if opts is None:
            opts = {}

        self.logger.info(f"SQL Execute: {''.join(str(sql).splitlines())} || {opts}")

        try:
            return self.conn.execute(text(sql), opts)

        except SQLAlchemyError as e:
            self.logger.error(f"SQLAlchemy error {e}")
            return None

        except UnicodeEncodeError as e:
            self.logger.error(f"UnicodeEncodeError error {e}")
            return None

        except Exception:
            # Only ordinary exceptions reach the retry; KeyboardInterrupt and
            # SystemExit propagate untouched.
            try:
                return self.conn.execute(sql, opts)

            except SQLAlchemyError as e:
                self.logger.error(f"SQLAlchemy error {e}")
                return None

            except UnicodeEncodeError as e:
                self.logger.error(f"UnicodeEncodeError error {e}")
                return None

            except Exception:
                self.logger.error("Unknown error")
                raise


class Importer:
    """Copies AQUA user tables into ARTEMiS through the titles' upsert handlers."""

    def __init__(self, core_cfg: CoreConfig, cfg_folder: str, aqua_folder: str):
        """Set up logging and open ``db.sqlite`` inside *aqua_folder*.

        Exits the process with status 1 when the database file is missing.
        """
        self.config = core_cfg
        self.config_folder = cfg_folder
        self.data = Data(core_cfg)
        self.title_registry: Dict[str, Any] = {}

        self.logger = logging.getLogger("importer")
        # Configure the handlers only once per process.
        if not hasattr(self.logger, "initialized"):
            log_fmt_str = "[%(asctime)s] Importer | %(levelname)s | %(message)s"
            log_fmt = logging.Formatter(log_fmt_str)

            fileHandler = TimedRotatingFileHandler(
                f"{self.config.server.log_dir}/importer.log",
                when="d",
                backupCount=10,
            )
            fileHandler.setFormatter(log_fmt)

            consoleHandler = logging.StreamHandler()
            consoleHandler.setFormatter(log_fmt)

            self.logger.addHandler(fileHandler)
            self.logger.addHandler(consoleHandler)

            self.logger.setLevel("INFO")
            coloredlogs.install(level="INFO", logger=self.logger, fmt=log_fmt_str)
            self.logger.initialized = True

        # isfile() is already False when the folder itself does not exist.
        aqua_db_path = os.path.join(aqua_folder, "db.sqlite")
        if not os.path.isfile(aqua_db_path):
            self.logger.error("Could not locate AQUA db.sqlite file!")
            sys.exit(1)

        self.aqua = AquaData(aqua_db_path)

    def get_user_id(self, luid: str):
        """Return the ARTEMiS user id for access code *luid*.

        When no card with that access code exists, a new user and card are
        created. Returns ``-1`` (after logging) if either creation fails.
        """
        user_id = self.data.card.get_user_id_from_card(access_code=luid)
        if user_id is not None:
            return user_id

        user_id = self.data.user.create_user()
        if user_id is None:
            self.logger.error("Failed to register user!")
            return -1

        card_id = self.data.card.create_card(user_id, luid)
        if card_id is None:
            self.logger.error("Failed to register card!")
            return -1

        return user_id

    def parse_aqua_db(self, table_name: str) -> tuple:
        """Read every row of *table_name* from the AQUA database.

        Returns ``(rows, datetime_columns)`` where ``datetime_columns`` are the
        inspector's column descriptions whose type is ``DATETIME``. When the
        query fails, an error is logged and ``([], [])`` is returned so callers
        simply iterate over nothing.
        """
        result = self.aqua.execute(f"SELECT * FROM {table_name}")
        if result is None:
            self.logger.error(f"Could not read AQUA table {table_name}, skipping")
            return [], []

        datetime_columns = [
            c
            for c in self.aqua.inspect.get_columns(table_name)
            if str(c["type"]) == "DATETIME"
        ]

        return result, datetime_columns

    def parse_aqua_row(
        self,
        row: Row,
        datetime_columns: list[Dict],
        unused_columns: list[str],
        user_id_column: str = "userId",
    ) -> Dict:
        """Convert one AQUA row into a dict for the ARTEMiS upsert handlers.

        Args:
            row: The AQUA database row.
            datetime_columns: Column descriptions (with a ``"name"`` key) whose
                values are converted to ``datetime`` objects.
            unused_columns: camelCase keys to drop from the result.
            user_id_column: camelCase key holding the AQUA ``sega_card`` id.

        Returns:
            The row with camelCase keys, ``"null"`` strings replaced by
            ``None`` and the ARTEMiS user id stored under ``"user"``.

        Raises:
            ValueError: If no ``sega_card`` row matches the AQUA card id.
        """
        row_data = row._asdict()
        for column in datetime_columns:
            ts = row_data[column["name"]]
            if ts is None:
                continue

            # presumably the value is epoch milliseconds (the original code
            # stripped the last three digits); drop them to get seconds
            row_data[column["name"]] = datetime.fromtimestamp(int(ts) // 1000)

        # convert the keys (column names) to camelCase for ARTEMiS
        tmp = {
            inflection.camelize(k, uppercase_first_letter=False): (
                v if v != "null" else None
            )
            for k, v in row_data.items()
        }

        # get the aqua internal user id and also drop it from the tmp dict
        aqua_user_id = tmp.pop(user_id_column)

        # drop the unused columns; a column that is already absent is fine
        for unused in unused_columns:
            tmp.pop(unused, None)

        # look up the luid that belongs to the aqua internal user id
        card_result = self.aqua.execute(
            "SELECT * FROM sega_card WHERE id = :card_id",
            {"card_id": aqua_user_id},
        )

        card_data = None
        if card_result is not None:
            for card in card_result:
                card_data = card._asdict()

        if card_data is None:
            raise ValueError(
                f"No AQUA sega_card entry found for card id {aqua_user_id}"
            )

        # get the ARTEMiS internal user id, creating the user if necessary
        tmp["user"] = self.get_user_id(card_data["luid"])

        return tmp

    def _confirm_import(self, version_str: str) -> bool:
        """Ask on stdin whether *version_str* data should be imported."""
        answer = input(
            f"Do you really want to import ALL {version_str} data into ARTEMiS? [N/y]: "
        )
        if answer.lower() != "y":
            self.logger.info("User aborted operation")
            return False
        return True

    def _import_table(
        self,
        base: Any,
        version_str: str,
        table_name: str,
        upsert_key: str,
        label: str,
        log_key: str,
        *,
        unused_columns: Sequence[str] = ("id",),
        user_id_column: str = "userId",
        transform: Optional[Callable[[Dict], None]] = None,
    ) -> None:
        """Upsert every row of *table_name* as ``{upsert_key: [row]}``.

        *transform*, when given, mutates the parsed row before the upsert.
        *label* and *log_key* only shape the per-row log message.
        """
        result, datetime_columns = self.parse_aqua_db(table_name)
        for row in result:
            tmp = self.parse_aqua_row(
                row,
                datetime_columns,
                unused_columns=list(unused_columns),
                user_id_column=user_id_column,
            )
            if transform is not None:
                transform(tmp)

            base.handle_upsert_user_all_api_request(
                {"userId": tmp["user"], "upsertUserAll": {upsert_key: [tmp]}}
            )

            self.logger.info(f"Imported {version_str} {label}: {tmp[log_key]}")

    def _import_recent_rating(
        self, base: Any, version_str: str, table_name: str, rom_version_code: str
    ) -> None:
        """Upsert the ``recent_rating_list`` entries found in *table_name*.

        The property value is parsed as comma-separated
        ``musicId:difficultId:score`` triples; blank fragments are skipped.
        """
        result, datetime_columns = self.parse_aqua_db(table_name)
        for row in result:
            tmp = self.parse_aqua_row(row, datetime_columns, unused_columns=["id"])
            if tmp["propertyKey"] != "recent_rating_list":
                continue

            rating_list = []
            for rating in (tmp["propertyValue"] or "").split(","):
                if not rating:
                    continue
                music_id, difficult_id, score = rating.split(":")
                rating_list.append(
                    {
                        "score": score,
                        "musicId": music_id,
                        "difficultId": difficult_id,
                        "romVersionCode": rom_version_code,
                    }
                )

            base.handle_upsert_user_all_api_request(
                {
                    "userId": tmp["user"],
                    "upsertUserAll": {"userRecentRatingList": rating_list},
                }
            )

            self.logger.info(
                f"Imported {version_str} userRecentRating: {tmp['user']}"
            )

    @staticmethod
    def _restore_chuni_option_keys(tmp: Dict) -> None:
        """Rename the ``*120`` game option keys to their ``*_120`` form."""
        for key in (
            "speed",
            "fieldWallPosition",
            "playTimingOffset",
            "judgeTimingOffset",
        ):
            tmp[f"{key}_120"] = tmp.pop(f"{key}120")

    @staticmethod
    def _activity_id_to_id(tmp: Dict) -> None:
        """Move the value stored under ``activityId`` to the ``id`` key."""
        tmp["id"] = tmp.pop("activityId")

    @staticmethod
    def _add_empty_access_code(tmp: Dict) -> None:
        """Add the (useless but required) empty ``accessCode`` entry."""
        tmp["accessCode"] = ""

    @staticmethod
    def _drop_dispbp(tmp: Dict) -> None:
        """Remove the ``dispbp`` key from the row."""
        tmp.pop("dispbp")

    def import_chuni(self):
        """Import the ``chusan_user_*`` AQUA tables after user confirmation."""
        game_cfg = ChuniConfig()
        game_cfg.update(_load_yaml(f"{self.config_folder}/chuni.yaml"))

        base = ChuniNewPlus(self.config, game_cfg)
        version_str = ChuniConstants.game_ver_to_string(base.version)

        if not self._confirm_import(version_str):
            return

        self._import_table(
            base,
            version_str,
            "chusan_user_data",
            "userData",
            "userData",
            "user",
            unused_columns=["id", "lastLoginDate"],
            user_id_column="cardId",
        )
        self._import_table(
            base,
            version_str,
            "chusan_user_game_option",
            "userGameOption",
            "userGameOption",
            "user",
            transform=self._restore_chuni_option_keys,
        )
        self._import_recent_rating(
            base, version_str, "chusan_user_general_data", "2000001"
        )
        # The activity id lives under "id" after the transform, so that is
        # the key to log.
        self._import_table(
            base,
            version_str,
            "chusan_user_activity",
            "userActivityList",
            "userActivity",
            "id",
            transform=self._activity_id_to_id,
        )

        simple_tables = (
            ("chusan_user_character", "userCharacterList", "userCharacter", "characterId"),
            ("chusan_user_course", "userCourseList", "userCourse", "courseId"),
            ("chusan_user_duel", "userDuelList", "userDuel", "duelId"),
            ("chusan_user_item", "userItemList", "userItem", "itemId"),
            ("chusan_user_map_area", "userMapAreaList", "userMapArea", "mapAreaId"),
            ("chusan_user_music_detail", "userMusicDetailList", "userMusicDetail", "musicId"),
            ("chusan_user_playlog", "userPlaylogList", "userPlaylog", "musicId"),
        )
        for table_name, upsert_key, label, log_key in simple_tables:
            self._import_table(
                base, version_str, table_name, upsert_key, label, log_key
            )

    def import_ongeki(self):
        """Import the ``ongeki_user_*`` AQUA tables after user confirmation."""
        game_cfg = OngekiConfig()
        game_cfg.update(_load_yaml(f"{self.config_folder}/ongeki.yaml"))

        base = OngekiBrightMemory(self.config, game_cfg)
        version_str = OngekiConstants.game_ver_to_string(base.version)

        if not self._confirm_import(version_str):
            return

        self._import_table(
            base,
            version_str,
            "ongeki_user_data",
            "userData",
            "userData",
            "user",
            user_id_column="aimeCardId",
            transform=self._add_empty_access_code,
        )
        self._import_table(
            base,
            version_str,
            "ongeki_user_option",
            "userOption",
            "userOption",
            "user",
            transform=self._drop_dispbp,
        )
        self._import_recent_rating(
            base, version_str, "ongeki_user_general_data", "1000000"
        )
        self._import_table(
            base, version_str, "ongeki_user_deck", "userDeckList", "userDeck", "deckId"
        )
        self._import_table(
            base,
            version_str,
            "ongeki_user_activity",
            "userActivityList",
            "userActivity",
            "id",
            transform=self._activity_id_to_id,
        )

        simple_tables = (
            ("ongeki_user_card", "userCardList", "userCard", "cardId"),
            ("ongeki_user_chapter", "userChapterList", "userChapter", "chapterId"),
            ("ongeki_user_character", "userCharacterList", "userCharacter", "characterId"),
            ("ongeki_user_item", "userItemList", "userItem", "itemId"),
            ("ongeki_user_memory_chapter", "userMemoryChapterList", "userMemoryChapter", "chapterId"),
            ("ongeki_user_mission_point", "userMissionPointList", "userMissionPoint", "eventId"),
            ("ongeki_user_music_detail", "userMusicDetailList", "userMusicDetail", "musicId"),
            ("ongeki_user_playlog", "userPlaylogList", "userPlaylog", "musicId"),
            ("ongeki_user_story", "userStoryList", "userStory", "storyId"),
            ("ongeki_user_tech_count", "userTechCountList", "userTechCount", "levelId"),
        )
        for table_name, upsert_key, label, log_key in simple_tables:
            self._import_table(
                base, version_str, table_name, upsert_key, label, log_key
            )


def main():
    """Parse the command line and run the CHUNITHM and O.N.G.E.K.I. imports."""
    parser = argparse.ArgumentParser(description="AQUA to ARTEMiS")
    parser.add_argument(
        "--config", "-c", type=str, help="Config directory to use", default="config"
    )
    parser.add_argument(
        "aqua_folder_path",
        type=str,
        help="Absolute folder path to AQUA /data folder, where db.sqlite is located in",
    )
    args = parser.parse_args()

    core_cfg = CoreConfig()
    core_cfg.update(_load_yaml(f"{args.config}/core.yaml"))

    importer = Importer(core_cfg, args.config, args.aqua_folder_path)

    importer.import_chuni()
    importer.import_ongeki()


if __name__ == "__main__":
    main()