diff --git a/README.md b/README.md index 4badf1f..43aec60 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,29 @@ # AQUA-to-ARTEMiS -Imports AQUAs db.sqlite database into ARTEMiS database, currently WIP \ No newline at end of file +Imports AQUAs db.sqlite database into ARTEMiS database, currently WIP + +## Requirements + +Already have [ARTEMiS](https://gitea.tendokyu.moe/Dniel97/artemis) installed and created the database with + +```sh +python dbutils.py create +``` + +**The `aqua_importer.py` script will read the settings from your specified ARTEMiS `config/` folder!** + +## Instructions + +Place `aqua_importer.py` in your ARTEMiS directory and execute it: + +```sh +python aqua_importer.py "C:\path\to\aqua\data" +``` + +## TODO + +- [x] Chunithm New!! Plus +- [x] O.N.G.E.K.I. Bright Memory +- [ ] maimai DX UNiVERSE/FESTiVAL +- [ ] Change the game version to import using arguments +- [ ] Code clean up/optimizations \ No newline at end of file diff --git a/aqua_importer.py b/aqua_importer.py new file mode 100644 index 0000000..dc312b1 --- /dev/null +++ b/aqua_importer.py @@ -0,0 +1,655 @@ +from datetime import datetime +import os +import inflection +from sqlalchemy.orm import scoped_session, sessionmaker +from sqlalchemy import create_engine, inspect +from sqlalchemy.engine import Row +from sqlalchemy.engine.cursor import CursorResult +from sqlalchemy.sql import text +from sqlalchemy.exc import SQLAlchemyError +from logging.handlers import TimedRotatingFileHandler +from typing import Any, Dict, Optional +import yaml +import yaml +import argparse +import logging +import coloredlogs + +from core.config import CoreConfig +from core.data.database import Data +from titles.chuni.config import ChuniConfig +from titles.chuni.const import ChuniConstants +from titles.chuni.newplus import ChuniNewPlus +from titles.ongeki.brightmemory import OngekiBrightMemory +from titles.ongeki.config import OngekiConfig +from titles.ongeki.const import OngekiConstants + + +class AquaData: + def __init__(self, aqua_db_path: str) -> None: + self.__url = f"sqlite:///{aqua_db_path}" + + self.__engine = create_engine(self.__url, pool_recycle=3600) + # self.inspector = reflection.Inspector.from_engine(self.__engine) + + session = sessionmaker(bind=self.__engine) + self.inspect = inspect(self.__engine) + self.conn = scoped_session(session) + + log_fmt_str = "[%(asctime)s] %(levelname)s | AQUA | %(message)s" + log_fmt = logging.Formatter(log_fmt_str) + self.logger = logging.getLogger("aqua") + + # Prevent the logger from adding handlers multiple times + if not getattr(self.logger, "handler_set", None): + consoleHandler = logging.StreamHandler() + consoleHandler.setFormatter(log_fmt) + + self.logger.addHandler(consoleHandler) + + self.logger.setLevel("WARN") + coloredlogs.install("WARN", logger=self.logger, fmt=log_fmt_str) + self.logger.handler_set = True # type: ignore + + def execute(self, sql: str, opts: Dict[str, Any] = {}) -> Optional[CursorResult]: + res = None + + try: + self.logger.info(f"SQL Execute: {''.join(str(sql).splitlines())} || {opts}") + res = self.conn.execute(text(sql), opts) + + except SQLAlchemyError as e: + self.logger.error(f"SQLAlchemy error {e}") + return None + + except UnicodeEncodeError as e: + self.logger.error(f"UnicodeEncodeError error {e}") + return None + + except: + try: + res = self.conn.execute(sql, opts) + + except SQLAlchemyError as e: + self.logger.error(f"SQLAlchemy error {e}") + return None + + except UnicodeEncodeError as e: + self.logger.error(f"UnicodeEncodeError error {e}") + return None + + except: + self.logger.error(f"Unknown error") + raise + + return res + + +class Importer: + def __init__(self, core_cfg: CoreConfig, cfg_folder: str, aqua_folder: str): + self.config = core_cfg + self.config_folder = cfg_folder + self.data = Data(core_cfg) + self.title_registry: Dict[str, Any] = {} + + self.logger = logging.getLogger("importer") + if not hasattr(self.logger, "initialized"): + log_fmt_str = "[%(asctime)s] Importer | %(levelname)s | %(message)s" + log_fmt = logging.Formatter(log_fmt_str) + + fileHandler = TimedRotatingFileHandler( + "{0}/{1}.log".format(self.config.server.log_dir, "importer"), + when="d", + backupCount=10, + ) + fileHandler.setFormatter(log_fmt) + + consoleHandler = logging.StreamHandler() + consoleHandler.setFormatter(log_fmt) + + self.logger.addHandler(fileHandler) + self.logger.addHandler(consoleHandler) + + self.logger.setLevel("INFO") + coloredlogs.install(level="INFO", logger=self.logger, fmt=log_fmt_str) + self.logger.initialized = True + + aqua_db_path = None + if os.path.exists(aqua_folder): + temp = os.path.join(aqua_folder, "db.sqlite") + if os.path.isfile(temp): + aqua_db_path = temp + + if not aqua_db_path: + self.logger.error("Could not locate AQUA db.sqlite file!") + exit(1) + + self.aqua = AquaData(aqua_db_path) + + def get_user_id(self, luid: str): + user_id = self.data.card.get_user_id_from_card(access_code=luid) + if user_id is not None: + return user_id + + user_id = self.data.user.create_user() + + if user_id is None: + user_id = -1 + self.logger.error("Failed to register user!") + + else: + card_id = self.data.card.create_card(user_id, luid) + + if card_id is None: + user_id = -1 + self.logger.error("Failed to register card!") + + return user_id + + def parse_aqua_db(self, table_name: str) -> tuple: + result = self.aqua.execute(f"SELECT * FROM {table_name}") + datetime_columns = [ + c + for c in self.aqua.inspect.get_columns(table_name) + if str(c["type"]) == "DATETIME" + ] + + return result, datetime_columns + + def parse_aqua_row( + self, + row: Row, + datetime_columns: list[Dict], + unused_columns: list[str], + user_id_column: str = "userId", + ) -> Dict: + row = row._asdict() + for column in datetime_columns: + ts = row[column["name"]] + if ts is None: + continue + # actuall remove the last 3 zeros for the correct timestamp + fixed_ts = int(str(ts)[:-3]) + # save the datetim object in the dict + row[column["name"]] = datetime.fromtimestamp(fixed_ts) + + tmp = {} + for k, v in row.items(): + # convert the key (column name) to camelCase for ARTEMiS + k = inflection.camelize(k, uppercase_first_letter=False) + # add the new camelCase key, value pair to tmp + tmp[k] = v if v != "null" else None + + # get the aqua internal user id and also drop it from the tmp dict + aqua_user_id = tmp[user_id_column] + tmp.pop(user_id_column) + + # removes unused columns + for unused in unused_columns: + tmp.pop(unused) + + # get from the internal user id the actual luid + card_data = None + card_result = self.aqua.execute( + f"SELECT * FROM sega_card WHERE id = {aqua_user_id}" + ) + for card in card_result: + card_data = card._asdict() + + # TODO: Add card_data is None check + card_id = card_data["luid"] + + # get the ARTEMiS internal user id, if not create an user + user_id = self.get_user_id(card_id) + + # add the ARTEMiS user id to the dict + tmp["user"] = user_id + + return tmp + + def import_chuni(self): + game_cfg = ChuniConfig() + game_cfg.update(yaml.safe_load(open(f"{self.config_folder}/chuni.yaml"))) + + base = ChuniNewPlus(self.config, game_cfg) + version_str = ChuniConstants.game_ver_to_string(base.version) + + answer = input(f"Do you really want to import ALL {version_str} data into ARTEMiS? [N/y]: ") + if answer.lower() != "y": + self.logger.info("User aborted operation") + return + + result, datetime_columns = self.parse_aqua_db("chusan_user_data") + for row in result: + tmp = self.parse_aqua_row( + row, + datetime_columns, + unused_columns=["id", "lastLoginDate"], + user_id_column="cardId", + ) + + base.handle_upsert_user_all_api_request( + {"userId": tmp["user"], "upsertUserAll": {"userData": [tmp]}} + ) + + self.logger.info(f"Imported {version_str} userData: {tmp['user']}") + + result, datetime_columns = self.parse_aqua_db("chusan_user_game_option") + for row in result: + tmp = self.parse_aqua_row( + row, + datetime_columns, + unused_columns=["id"], + ) + + tmp["speed_120"] = tmp.pop("speed120") + tmp["fieldWallPosition_120"] = tmp.pop("fieldWallPosition120") + tmp["playTimingOffset_120"] = tmp.pop("playTimingOffset120") + tmp["judgeTimingOffset_120"] = tmp.pop("judgeTimingOffset120") + + base.handle_upsert_user_all_api_request( + {"userId": tmp["user"], "upsertUserAll": {"userGameOption": [tmp]}} + ) + + self.logger.info(f"Imported {version_str} userGameOption: {tmp['user']}") + + result, datetime_columns = self.parse_aqua_db("chusan_user_general_data") + for row in result: + tmp = self.parse_aqua_row(row, datetime_columns, unused_columns=["id"]) + + if tmp["propertyKey"] == "recent_rating_list": + rating_list = [] + for rating in tmp["propertyValue"].split(","): + music_id, difficult_id, score = rating.split(":") + rating_list.append({ + "score": score, + "musicId": music_id, + "difficultId": difficult_id, + "romVersionCode": "2000001" + + }) + base.handle_upsert_user_all_api_request( + {"userId": tmp["user"], "upsertUserAll": {"userRecentRatingList": rating_list}} + ) + + self.logger.info( + f"Imported {version_str} userRecentRating: {tmp['user']}" + ) + + result, datetime_columns = self.parse_aqua_db("chusan_user_activity") + for row in result: + tmp = self.parse_aqua_row(row, datetime_columns, unused_columns=["id"]) + tmp["id"] = tmp["activityId"] + tmp.pop("activityId") + + base.handle_upsert_user_all_api_request( + {"userId": tmp["user"], "upsertUserAll": {"userActivityList": [tmp]}} + ) + + self.logger.info( + f"Imported {version_str} userActivity: {tmp['activityId']}" + ) + + result, datetime_columns = self.parse_aqua_db("chusan_user_character") + for row in result: + tmp = self.parse_aqua_row(row, datetime_columns, unused_columns=["id"]) + + base.handle_upsert_user_all_api_request( + {"userId": tmp["user"], "upsertUserAll": {"userCharacterList": [tmp]}} + ) + + self.logger.info( + f"Imported {version_str} userCharacter: {tmp['characterId']}" + ) + + result, datetime_columns = self.parse_aqua_db("chusan_user_course") + for row in result: + tmp = self.parse_aqua_row(row, datetime_columns, unused_columns=["id"]) + + base.handle_upsert_user_all_api_request( + {"userId": tmp["user"], "upsertUserAll": {"userCourseList": [tmp]}} + ) + + self.logger.info(f"Imported {version_str} userCourse: {tmp['courseId']}") + + result, datetime_columns = self.parse_aqua_db("chusan_user_duel") + for row in result: + tmp = self.parse_aqua_row(row, datetime_columns, unused_columns=["id"]) + + base.handle_upsert_user_all_api_request( + {"userId": tmp["user"], "upsertUserAll": {"userDuelList": [tmp]}} + ) + + self.logger.info(f"Imported {version_str} userDuel: {tmp['duelId']}") + + result, datetime_columns = self.parse_aqua_db("chusan_user_item") + for row in result: + tmp = self.parse_aqua_row(row, datetime_columns, unused_columns=["id"]) + + base.handle_upsert_user_all_api_request( + {"userId": tmp["user"], "upsertUserAll": {"userItemList": [tmp]}} + ) + + self.logger.info(f"Imported {version_str} userItem: {tmp['itemId']}") + + result, datetime_columns = self.parse_aqua_db("chusan_user_map_area") + for row in result: + tmp = self.parse_aqua_row(row, datetime_columns, unused_columns=["id"]) + + base.handle_upsert_user_all_api_request( + {"userId": tmp["user"], "upsertUserAll": {"userMapAreaList": [tmp]}} + ) + + self.logger.info(f"Imported {version_str} userMapArea: {tmp['mapAreaId']}") + + result, datetime_columns = self.parse_aqua_db("chusan_user_music_detail") + for row in result: + tmp = self.parse_aqua_row(row, datetime_columns, unused_columns=["id"]) + + base.handle_upsert_user_all_api_request( + {"userId": tmp["user"], "upsertUserAll": {"userMusicDetailList": [tmp]}} + ) + + self.logger.info( + f"Imported {version_str} userMusicDetail: {tmp['musicId']}" + ) + + result, datetime_columns = self.parse_aqua_db("chusan_user_playlog") + for row in result: + tmp = self.parse_aqua_row(row, datetime_columns, unused_columns=["id"]) + + base.handle_upsert_user_all_api_request( + {"userId": tmp["user"], "upsertUserAll": {"userPlaylogList": [tmp]}} + ) + + self.logger.info(f"Imported {version_str} userPlaylog: {tmp['musicId']}") + + def import_ongeki(self): + game_cfg = OngekiConfig() + game_cfg.update(yaml.safe_load(open(f"{self.config_folder}/ongeki.yaml"))) + + base = OngekiBrightMemory(self.config, game_cfg) + version_str = OngekiConstants.game_ver_to_string(base.version) + + answer = input(f"Do you really want to import ALL {version_str} data into ARTEMiS? [N/y]: ") + if answer.lower() != "y": + self.logger.info("User aborted operation") + return + + result, datetime_columns = self.parse_aqua_db("ongeki_user_data") + for row in result: + tmp = self.parse_aqua_row( + row, + datetime_columns, + unused_columns=["id"], + user_id_column="aimeCardId", + ) + # useless but required + tmp["accessCode"] = "" + + base.handle_upsert_user_all_api_request( + {"userId": tmp["user"], "upsertUserAll": {"userData": [tmp]}} + ) + + self.logger.info(f"Imported {version_str} userData: {tmp['user']}") + + result, datetime_columns = self.parse_aqua_db("ongeki_user_option") + for row in result: + tmp = self.parse_aqua_row( + row, + datetime_columns, + unused_columns=["id"], + ) + tmp.pop("dispbp") + + base.handle_upsert_user_all_api_request( + {"userId": tmp["user"], "upsertUserAll": {"userOption": [tmp]}} + ) + + self.logger.info(f"Imported {version_str} userOption: {tmp['user']}") + + result, datetime_columns = self.parse_aqua_db("ongeki_user_general_data") + for row in result: + tmp = self.parse_aqua_row(row, datetime_columns, unused_columns=["id"]) + + if tmp["propertyKey"] == "recent_rating_list": + rating_list = [] + for rating in tmp["propertyValue"].split(","): + music_id, difficult_id, score = rating.split(":") + rating_list.append({ + "score": score, + "musicId": music_id, + "difficultId": difficult_id, + "romVersionCode": "1000000" + + }) + base.handle_upsert_user_all_api_request( + {"userId": tmp["user"], "upsertUserAll": {"userRecentRatingList": rating_list}} + ) + + self.logger.info( + f"Imported {version_str} userRecentRating: {tmp['user']}" + ) + + result, datetime_columns = self.parse_aqua_db("ongeki_user_deck") + for row in result: + tmp = self.parse_aqua_row( + row, + datetime_columns, + unused_columns=["id"], + ) + + base.handle_upsert_user_all_api_request( + {"userId": tmp["user"], "upsertUserAll": {"userDeckList": [tmp]}} + ) + + self.logger.info(f"Imported {version_str} userDeck: {tmp['deckId']}") + + result, datetime_columns = self.parse_aqua_db("ongeki_user_activity") + for row in result: + tmp = self.parse_aqua_row( + row, + datetime_columns, + unused_columns=["id"], + ) + tmp["id"] = tmp["activityId"] + tmp.pop("activityId") + + base.handle_upsert_user_all_api_request( + {"userId": tmp["user"], "upsertUserAll": {"userActivityList": [tmp]}} + ) + + self.logger.info(f"Imported {version_str} userActivity: {tmp['id']}") + + result, datetime_columns = self.parse_aqua_db("ongeki_user_card") + for row in result: + tmp = self.parse_aqua_row( + row, + datetime_columns, + unused_columns=["id"], + ) + + base.handle_upsert_user_all_api_request( + {"userId": tmp["user"], "upsertUserAll": {"userCardList": [tmp]}} + ) + + self.logger.info(f"Imported {version_str} userCard: {tmp['cardId']}") + + result, datetime_columns = self.parse_aqua_db("ongeki_user_chapter") + for row in result: + tmp = self.parse_aqua_row( + row, + datetime_columns, + unused_columns=["id"], + ) + + base.handle_upsert_user_all_api_request( + {"userId": tmp["user"], "upsertUserAll": {"userChapterList": [tmp]}} + ) + + self.logger.info(f"Imported {version_str} userChapter: {tmp['chapterId']}") + + result, datetime_columns = self.parse_aqua_db("ongeki_user_character") + for row in result: + tmp = self.parse_aqua_row( + row, + datetime_columns, + unused_columns=["id"], + ) + + base.handle_upsert_user_all_api_request( + {"userId": tmp["user"], "upsertUserAll": {"userCharacterList": [tmp]}} + ) + + self.logger.info(f"Imported {version_str} userCharacter: {tmp['characterId']}") + + result, datetime_columns = self.parse_aqua_db("ongeki_user_deck") + for row in result: + tmp = self.parse_aqua_row( + row, + datetime_columns, + unused_columns=["id"], + ) + + base.handle_upsert_user_all_api_request( + {"userId": tmp["user"], "upsertUserAll": {"userDeckList": [tmp]}} + ) + + self.logger.info(f"Imported {version_str} userDeck: {tmp['deckId']}") + + result, datetime_columns = self.parse_aqua_db("ongeki_user_item") + for row in result: + tmp = self.parse_aqua_row( + row, + datetime_columns, + unused_columns=["id"], + ) + + base.handle_upsert_user_all_api_request( + {"userId": tmp["user"], "upsertUserAll": {"userItemList": [tmp]}} + ) + + self.logger.info(f"Imported {version_str} userItem: {tmp['itemId']}") + + result, datetime_columns = self.parse_aqua_db("ongeki_user_item") + for row in result: + tmp = self.parse_aqua_row( + row, + datetime_columns, + unused_columns=["id"], + ) + + base.handle_upsert_user_all_api_request( + {"userId": tmp["user"], "upsertUserAll": {"userItemList": [tmp]}} + ) + + self.logger.info(f"Imported {version_str} userItem: {tmp['itemId']}") + + result, datetime_columns = self.parse_aqua_db("ongeki_user_memory_chapter") + for row in result: + tmp = self.parse_aqua_row( + row, + datetime_columns, + unused_columns=["id"], + ) + + base.handle_upsert_user_all_api_request( + {"userId": tmp["user"], "upsertUserAll": {"userMemoryChapterList": [tmp]}} + ) + + self.logger.info(f"Imported {version_str} userMemoryChapter: {tmp['chapterId']}") + + result, datetime_columns = self.parse_aqua_db("ongeki_user_mission_point") + for row in result: + tmp = self.parse_aqua_row( + row, + datetime_columns, + unused_columns=["id"], + ) + + base.handle_upsert_user_all_api_request( + {"userId": tmp["user"], "upsertUserAll": {"userMissionPointList": [tmp]}} + ) + + self.logger.info(f"Imported {version_str} userMissionPoint: {tmp['eventId']}") + + result, datetime_columns = self.parse_aqua_db("ongeki_user_music_detail") + for row in result: + tmp = self.parse_aqua_row( + row, + datetime_columns, + unused_columns=["id"], + ) + + base.handle_upsert_user_all_api_request( + {"userId": tmp["user"], "upsertUserAll": {"userMusicDetailList": [tmp]}} + ) + + self.logger.info(f"Imported {version_str} userMusicDetail: {tmp['musicId']}") + + result, datetime_columns = self.parse_aqua_db("ongeki_user_playlog") + for row in result: + tmp = self.parse_aqua_row( + row, + datetime_columns, + unused_columns=["id"], + ) + + base.handle_upsert_user_all_api_request( + {"userId": tmp["user"], "upsertUserAll": {"userPlaylogList": [tmp]}} + ) + + self.logger.info(f"Imported {version_str} userPlaylog: {tmp['musicId']}") + + result, datetime_columns = self.parse_aqua_db("ongeki_user_story") + for row in result: + tmp = self.parse_aqua_row( + row, + datetime_columns, + unused_columns=["id"], + ) + + base.handle_upsert_user_all_api_request( + {"userId": tmp["user"], "upsertUserAll": {"userStoryList": [tmp]}} + ) + + self.logger.info(f"Imported {version_str} userStory: {tmp['storyId']}") + + result, datetime_columns = self.parse_aqua_db("ongeki_user_tech_count") + for row in result: + tmp = self.parse_aqua_row( + row, + datetime_columns, + unused_columns=["id"], + ) + + base.handle_upsert_user_all_api_request( + {"userId": tmp["user"], "upsertUserAll": {"userTechCountList": [tmp]}} + ) + + self.logger.info(f"Imported {version_str} userTechCount: {tmp['levelId']}") + + +def main(): + parser = argparse.ArgumentParser(description="AQUA to ARTEMiS") + parser.add_argument( + "--config", "-c", type=str, help="Config directory to use", default="config" + ) + parser.add_argument( + "aqua_folder_path", + type=str, + help="Absolute folder path to AQUA /data folder, where db.sqlite is located in", + ) + args = parser.parse_args() + + core_cfg = CoreConfig() + core_cfg.update(yaml.safe_load(open(f"{args.config}/core.yaml"))) + + importer = Importer(core_cfg, args.config, args.aqua_folder_path) + + importer.import_chuni() + importer.import_ongeki() + + +if __name__ == "__main__": + main()