# AQUA-to-ARTEMiS/aqua_importer.py
#
# 724 lines
# 26 KiB
# Python

from datetime import datetime
import os
import inflection
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy import create_engine, inspect
from sqlalchemy.engine import Row
from sqlalchemy.engine.cursor import CursorResult
from sqlalchemy.sql import text
from sqlalchemy.exc import SQLAlchemyError
from logging.handlers import TimedRotatingFileHandler
from typing import Any, Dict, Optional
import yaml
import yaml
import argparse
import logging
import coloredlogs
from core.config import CoreConfig
from core.data.database import Data
from titles.chuni.config import ChuniConfig
from titles.chuni.const import ChuniConstants
from titles.chuni.sun import ChuniSun
from titles.ongeki.brightmemory import OngekiBrightMemory
from titles.ongeki.config import OngekiConfig
from titles.ongeki.const import OngekiConstants
class AquaData:
    """Access layer for the AQUA SQLite database.

    Creates a SQLAlchemy engine for ``sqlite:///<aqua_db_path>`` and exposes:

    * ``conn``    -- a scoped session bound to that engine,
    * ``inspect`` -- a SQLAlchemy inspector for the engine,
    * ``logger``  -- the ``aqua`` logger, configured at WARN level.
    """

    def __init__(self, aqua_db_path: str) -> None:
        """Open the database file at *aqua_db_path* and set up logging."""
        self.__url = f"sqlite:///{aqua_db_path}"
        self.__engine = create_engine(self.__url, pool_recycle=3600)

        session = sessionmaker(bind=self.__engine)
        self.inspect = inspect(self.__engine)
        self.conn = scoped_session(session)

        log_fmt_str = "[%(asctime)s] %(levelname)s | AQUA | %(message)s"
        log_fmt = logging.Formatter(log_fmt_str)
        self.logger = logging.getLogger("aqua")

        # The "aqua" logger is shared process-wide; only attach handlers the
        # first time an AquaData instance is created.
        if not getattr(self.logger, "handler_set", None):
            console_handler = logging.StreamHandler()
            console_handler.setFormatter(log_fmt)
            self.logger.addHandler(console_handler)
            self.logger.setLevel("WARN")
            coloredlogs.install("WARN", logger=self.logger, fmt=log_fmt_str)
            self.logger.handler_set = True  # type: ignore

    def execute(
        self, sql: str, opts: Optional[Dict[str, Any]] = None
    ) -> Optional[CursorResult]:
        """Run *sql* with the bind parameters in *opts*.

        The statement is first executed wrapped in ``text()``. If that fails
        with something other than a SQLAlchemy or Unicode-encoding error, it
        is retried once as a raw string.

        Returns the result object, or ``None`` when a ``SQLAlchemyError`` or
        ``UnicodeEncodeError`` was logged. Any other exception raised by the
        retry is logged and re-raised.
        """
        # A fresh dict per call: a ``{}`` default would be shared between calls.
        params: Dict[str, Any] = {} if opts is None else opts

        try:
            self.logger.info(
                f"SQL Execute: {''.join(str(sql).splitlines())} || {params}"
            )
            return self.conn.execute(text(sql), params)
        except SQLAlchemyError as e:
            self.logger.error(f"SQLAlchemy error {e}")
            return None
        except UnicodeEncodeError as e:
            self.logger.error(f"UnicodeEncodeError error {e}")
            return None
        except Exception:
            # ``Exception`` rather than a bare ``except`` so Ctrl-C and
            # SystemExit propagate instead of triggering the retry below.
            pass

        try:
            return self.conn.execute(sql, params)
        except SQLAlchemyError as e:
            self.logger.error(f"SQLAlchemy error {e}")
            return None
        except UnicodeEncodeError as e:
            self.logger.error(f"UnicodeEncodeError error {e}")
            return None
        except Exception:
            self.logger.error("Unknown error")
            raise
class Importer:
    """Import CHUNITHM and O.N.G.E.K.I. user data from an AQUA database.

    Rows are read from the AQUA ``db.sqlite`` file, their column names are
    converted to camelCase, the AQUA user is mapped to an ARTEMiS user through
    the card's ``luid``, and each row is passed to the title's
    ``handle_upsert_user_all_api_request`` handler.
    """

    # (AQUA table, "upsertUserAll" key, row key shown in the progress log)
    _CHUNI_LIST_TABLES = (
        ("chusan_user_character", "userCharacterList", "characterId"),
        ("chusan_user_course", "userCourseList", "courseId"),
        ("chusan_user_duel", "userDuelList", "duelId"),
        ("chusan_user_item", "userItemList", "itemId"),
        ("chusan_user_map_area", "userMapAreaList", "mapAreaId"),
        ("chusan_user_music_detail", "userMusicDetailList", "musicId"),
        ("chusan_user_playlog", "userPlaylogList", "musicId"),
    )

    # Tables imported after ongeki_user_activity. Each table appears once;
    # ongeki_user_deck is imported separately, before the activity table.
    _ONGEKI_LIST_TABLES = (
        ("ongeki_user_card", "userCardList", "cardId"),
        ("ongeki_user_chapter", "userChapterList", "chapterId"),
        ("ongeki_user_character", "userCharacterList", "characterId"),
        ("ongeki_user_item", "userItemList", "itemId"),
        ("ongeki_user_memory_chapter", "userMemoryChapterList", "chapterId"),
        ("ongeki_user_mission_point", "userMissionPointList", "eventId"),
        ("ongeki_user_music_detail", "userMusicDetailList", "musicId"),
        ("ongeki_user_playlog", "userPlaylogList", "musicId"),
        ("ongeki_user_story", "userStoryList", "storyId"),
        ("ongeki_user_tech_count", "userTechCountList", "levelId"),
    )

    def __init__(self, core_cfg: CoreConfig, cfg_folder: str, aqua_folder: str):
        """Set up logging and open ``<aqua_folder>/db.sqlite``.

        Exits the process with status 1 when the database file is missing.
        """
        self.config = core_cfg
        self.config_folder = cfg_folder
        self.data = Data(core_cfg)
        self.title_registry: Dict[str, Any] = {}

        self.logger = logging.getLogger("importer")
        # The "importer" logger is shared; attach handlers only once.
        if not hasattr(self.logger, "initialized"):
            log_fmt_str = "[%(asctime)s] Importer | %(levelname)s | %(message)s"
            log_fmt = logging.Formatter(log_fmt_str)

            file_handler = TimedRotatingFileHandler(
                "{0}/{1}.log".format(self.config.server.log_dir, "importer"),
                when="d",
                backupCount=10,
            )
            file_handler.setFormatter(log_fmt)

            console_handler = logging.StreamHandler()
            console_handler.setFormatter(log_fmt)

            self.logger.addHandler(file_handler)
            self.logger.addHandler(console_handler)
            self.logger.setLevel("INFO")
            coloredlogs.install(level="INFO", logger=self.logger, fmt=log_fmt_str)
            self.logger.initialized = True

        # isfile() is False when the folder itself does not exist.
        aqua_db_path = os.path.join(aqua_folder, "db.sqlite")
        if not os.path.isfile(aqua_db_path):
            self.logger.error("Could not locate AQUA db.sqlite file!")
            # SystemExit instead of the site-provided ``exit`` helper, which
            # is not guaranteed to exist in every interpreter setup.
            raise SystemExit(1)

        self.aqua = AquaData(aqua_db_path)

    def get_user_id(self, luid: str):
        """Return the ARTEMiS user id for access code *luid*.

        When no card with that access code exists, a new user and card are
        created. Returns ``-1`` (and logs an error) if either creation fails.
        """
        user_id = self.data.card.get_user_id_from_card(access_code=luid)
        if user_id is not None:
            return user_id

        user_id = self.data.user.create_user()
        if user_id is None:
            self.logger.error("Failed to register user!")
            return -1

        if self.data.card.create_card(user_id, luid) is None:
            self.logger.error("Failed to register card!")
            return -1

        return user_id

    def parse_aqua_db(self, table_name: str) -> tuple:
        """Select every row of *table_name*.

        Returns ``(result, datetime_columns)`` where *datetime_columns* are
        the inspector's column descriptions whose type is ``DATETIME``.
        """
        result = self.aqua.execute(f"SELECT * FROM {table_name}")
        datetime_columns = [
            c
            for c in self.aqua.inspect.get_columns(table_name)
            if str(c["type"]) == "DATETIME"
        ]
        return result, datetime_columns

    def parse_aqua_row(
        self,
        row: Row,
        datetime_columns: list[Dict],
        unused_columns: list[str],
        card_id: int,
    ) -> Dict:
        """Convert one AQUA row into the dict sent to the upsert handler.

        * DATETIME columns are converted to ``datetime`` objects.
        * Keys are converted to lower camelCase and the string ``"null"``
          becomes ``None``.
        * ``userId`` and every key in *unused_columns* are dropped.
        * ``user`` is set to the ARTEMiS user id of the ``sega_card`` row
          whose id is *card_id* (the user is created when necessary).

        Raises:
            LookupError: no ``sega_card`` row with id *card_id* exists.
        """
        values = row._asdict()

        for column in datetime_columns:
            ts = values[column["name"]]
            if ts is None:
                continue
            # The last three digits are dropped to get a seconds timestamp
            # (presumably the value is in milliseconds -- confirm against the
            # AQUA schema). Integer division also copes with values shorter
            # than four digits, which string slicing turned into int("").
            values[column["name"]] = datetime.fromtimestamp(int(ts) // 1000)

        tmp = {
            inflection.camelize(key, uppercase_first_letter=False): (
                None if value == "null" else value
            )
            for key, value in values.items()
        }

        # drop the AQUA internal user id
        tmp.pop("userId", None)

        # A column that is already absent needs no removal, so do not raise.
        for unused in unused_columns:
            tmp.pop(unused, None)

        # resolve the card id to the card's luid
        card_result = self.aqua.execute(
            "SELECT * FROM sega_card WHERE id = :id", {"id": card_id}
        )
        card_data = None
        if card_result is not None:
            for card in card_result:
                card_data = card._asdict()
        if card_data is None:
            raise LookupError(
                f"No sega_card row with id {card_id!r} in the AQUA database"
            )

        # look up (or create) the ARTEMiS user that owns this luid
        tmp["user"] = self.get_user_id(card_data["luid"])
        return tmp

    def _lookup_card_id(
        self, row: Row, user_id_column: str, user_table: str, card_column: str
    ):
        """Return *card_column* of the *user_table* row referenced by *row*."""
        aqua_user_id = row._asdict()[user_id_column]
        user_result = self.aqua.execute(
            f"SELECT * FROM {user_table} WHERE id = :id", {"id": aqua_user_id}
        )

        user_data = None
        if user_result is not None:
            for user in user_result:
                user_data = user._asdict()
        if user_data is None:
            raise LookupError(
                f"No {user_table} row with id {aqua_user_id!r} in the AQUA database"
            )
        return user_data[card_column]

    def _confirm_import(self, version_str: str) -> bool:
        """Ask on stdin whether to import; only the answer ``y`` confirms."""
        answer = input(
            f"Do you really want to import ALL {version_str} data into ARTEMiS? [N/y]: "
        )
        if answer.lower() != "y":
            self.logger.info("User aborted operation")
            return False
        return True

    def _read_table(self, table_name: str) -> tuple:
        """Like ``parse_aqua_db`` but yields no rows when the query failed."""
        result, datetime_columns = self.parse_aqua_db(table_name)
        if result is None:
            self.logger.error(f"Could not read AQUA table {table_name}, skipping")
            return [], datetime_columns
        return result, datetime_columns

    def _import_table(
        self,
        base: Any,
        version_str: str,
        table_name: str,
        payload_key: str,
        log_key: str,
        get_card_id: Any,
        unused_columns: Optional[list] = None,
        prepare: Any = None,
    ) -> None:
        """Upsert every row of *table_name* under ``upsertUserAll[payload_key]``.

        *get_card_id* maps a row to its ``sega_card`` id, *prepare* (optional)
        adjusts the converted dict in place before it is sent, and *log_key*
        selects the value written to the progress log.
        """
        columns_to_drop = ["id"] if unused_columns is None else unused_columns
        label = payload_key.removesuffix("List")

        result, datetime_columns = self._read_table(table_name)
        for row in result:
            tmp = self.parse_aqua_row(
                row,
                datetime_columns,
                unused_columns=columns_to_drop,
                card_id=get_card_id(row),
            )
            if prepare is not None:
                prepare(tmp)

            base.handle_upsert_user_all_api_request(
                {"userId": tmp["user"], "upsertUserAll": {payload_key: [tmp]}}
            )
            self.logger.info(f"Imported {version_str} {label}: {tmp[log_key]}")

    def _import_recent_rating(
        self,
        base: Any,
        version_str: str,
        table_name: str,
        get_card_id: Any,
        rom_version_code: str,
    ) -> None:
        """Import the ``recent_rating_list`` entries of a general-data table.

        The property value is a comma separated list of
        ``musicId:difficultId:score`` triples.
        """
        result, datetime_columns = self._read_table(table_name)
        for row in result:
            tmp = self.parse_aqua_row(
                row, datetime_columns, unused_columns=["id"], card_id=get_card_id(row)
            )
            if tmp["propertyKey"] != "recent_rating_list":
                continue

            rating_list = []
            for rating in (tmp["propertyValue"] or "").split(","):
                if not rating:
                    # an empty list is stored as "", which splits into [""]
                    continue
                music_id, difficult_id, score = rating.split(":")
                rating_list.append(
                    {
                        "score": score,
                        "musicId": music_id,
                        "difficultId": difficult_id,
                        "romVersionCode": rom_version_code,
                    }
                )

            base.handle_upsert_user_all_api_request(
                {
                    "userId": tmp["user"],
                    "upsertUserAll": {"userRecentRatingList": rating_list},
                }
            )
            self.logger.info(f"Imported {version_str} userRecentRating: {tmp['user']}")

    @staticmethod
    def _rename_activity_id(tmp: Dict) -> None:
        """Move ``activityId`` to ``id``, the key used in the upsert payload."""
        tmp["id"] = tmp.pop("activityId")

    @staticmethod
    def _rename_chuni_120_options(tmp: Dict) -> None:
        """Rename the camelized ``...120`` option keys to ``..._120``."""
        for name in (
            "speed",
            "fieldWallPosition",
            "playTimingOffset",
            "judgeTimingOffset",
        ):
            tmp[f"{name}_120"] = tmp.pop(f"{name}120")

    def get_chuni_card_id_by_aqua_row(self, row: Row, user_id_column: str = "user_id"):
        """Return the ``card_id`` of the ``chusan_user_data`` row that *row*
        references through *user_id_column*.

        Raises:
            LookupError: the referenced user row does not exist.
        """
        return self._lookup_card_id(row, user_id_column, "chusan_user_data", "card_id")

    def import_chuni(self):
        """Import all ``chusan_*`` user tables after asking for confirmation."""
        game_cfg = ChuniConfig()
        with open(f"{self.config_folder}/chuni.yaml") as cfg_file:
            game_cfg.update(yaml.safe_load(cfg_file))

        base = ChuniSun(self.config, game_cfg)
        version_str = ChuniConstants.game_ver_to_string(base.version)

        if not self._confirm_import(version_str):
            return

        card_id_of = self.get_chuni_card_id_by_aqua_row

        # The user table carries the card id itself, no indirection needed.
        self._import_table(
            base,
            version_str,
            "chusan_user_data",
            "userData",
            "user",
            lambda row: row._asdict()["card_id"],
            unused_columns=["id", "lastLoginDate", "cardId"],
        )
        self._import_table(
            base,
            version_str,
            "chusan_user_game_option",
            "userGameOption",
            "user",
            card_id_of,
            prepare=self._rename_chuni_120_options,
        )
        self._import_recent_rating(
            base, version_str, "chusan_user_general_data", card_id_of, "2000001"
        )
        # Logged by "id": "activityId" no longer exists after the rename.
        self._import_table(
            base,
            version_str,
            "chusan_user_activity",
            "userActivityList",
            "id",
            card_id_of,
            prepare=self._rename_activity_id,
        )
        for table_name, payload_key, log_key in self._CHUNI_LIST_TABLES:
            self._import_table(
                base, version_str, table_name, payload_key, log_key, card_id_of
            )

    def get_ongeki_card_id_by_aqua_row(self, row: Row, user_id_column: str = "user_id"):
        """Return the ``aime_card_id`` of the ``ongeki_user_data`` row that
        *row* references through *user_id_column*.

        Raises:
            LookupError: the referenced user row does not exist.
        """
        return self._lookup_card_id(
            row, user_id_column, "ongeki_user_data", "aime_card_id"
        )

    def import_ongeki(self):
        """Import all ``ongeki_*`` user tables after asking for confirmation."""
        game_cfg = OngekiConfig()
        with open(f"{self.config_folder}/ongeki.yaml") as cfg_file:
            game_cfg.update(yaml.safe_load(cfg_file))

        base = OngekiBrightMemory(self.config, game_cfg)
        version_str = OngekiConstants.game_ver_to_string(base.version)

        if not self._confirm_import(version_str):
            return

        card_id_of = self.get_ongeki_card_id_by_aqua_row

        # "accessCode" is set to an empty string; the original code marks it
        # as "useless but required".
        self._import_table(
            base,
            version_str,
            "ongeki_user_data",
            "userData",
            "user",
            lambda row: row._asdict()["aime_card_id"],
            unused_columns=["id", "aimeCardId"],
            prepare=lambda tmp: tmp.update(accessCode=""),
        )
        self._import_table(
            base,
            version_str,
            "ongeki_user_option",
            "userOption",
            "user",
            card_id_of,
            prepare=lambda tmp: tmp.pop("dispbp", None),
        )
        self._import_recent_rating(
            base, version_str, "ongeki_user_general_data", card_id_of, "1000000"
        )
        self._import_table(
            base, version_str, "ongeki_user_deck", "userDeckList", "deckId", card_id_of
        )
        self._import_table(
            base,
            version_str,
            "ongeki_user_activity",
            "userActivityList",
            "id",
            card_id_of,
            prepare=self._rename_activity_id,
        )
        for table_name, payload_key, log_key in self._ONGEKI_LIST_TABLES:
            self._import_table(
                base, version_str, table_name, payload_key, log_key, card_id_of
            )
def main():
    """Parse the command line, load ``core.yaml`` and run both importers.

    Command line:
        ``--config/-c``      config directory (default ``config``)
        ``aqua_folder_path`` folder that contains AQUA's ``db.sqlite``
    """
    parser = argparse.ArgumentParser(description="AQUA to ARTEMiS")
    parser.add_argument(
        "--config", "-c", type=str, help="Config directory to use", default="config"
    )
    parser.add_argument(
        "aqua_folder_path",
        type=str,
        help="Absolute folder path to AQUA /data folder, where db.sqlite is located in",
    )
    args = parser.parse_args()

    core_cfg = CoreConfig()
    # Read inside ``with`` so the file handle is closed once parsing is done.
    with open(f"{args.config}/core.yaml") as cfg_file:
        core_cfg.update(yaml.safe_load(cfg_file))

    importer = Importer(core_cfg, args.config, args.aqua_folder_path)
    importer.import_chuni()
    importer.import_ongeki()


if __name__ == "__main__":
    main()