3
2
forked from Dniel97/artemis
artemis/core/data/database.py
2023-02-19 15:40:25 -05:00

1755 lines
76 KiB
Python

import logging, coloredlogs
from typing import Any, Dict, List
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy import create_engine
from logging.handlers import TimedRotatingFileHandler
from datetime import datetime
import importlib, os, json
from hashlib import sha256
from core.config import CoreConfig
from core.data.schema import *
from core.utils import Utils
class Data:
def __init__(self, cfg: CoreConfig) -> None:
self.config = cfg
if self.config.database.sha2_password:
passwd = sha256(self.config.database.password.encode()).digest()
self.__url = f"{self.config.database.protocol}://{self.config.database.username}:{passwd.hex()}@{self.config.database.host}/{self.config.database.name}?charset=utf8mb4"
else:
self.__url = f"{self.config.database.protocol}://{self.config.database.username}:{self.config.database.password}@{self.config.database.host}/{self.config.database.name}?charset=utf8mb4"
self.__engine = create_engine(self.__url, pool_recycle=3600)
session = sessionmaker(bind=self.__engine, autoflush=True, autocommit=True)
self.session = scoped_session(session)
self.user = UserData(self.config, self.session)
self.arcade = ArcadeData(self.config, self.session)
self.card = CardData(self.config, self.session)
self.base = BaseData(self.config, self.session)
self.schema_ver_latest = 2
log_fmt_str = "[%(asctime)s] %(levelname)s | Database | %(message)s"
log_fmt = logging.Formatter(log_fmt_str)
self.logger = logging.getLogger("database")
# Prevent the logger from adding handlers multiple times
if not getattr(self.logger, 'handler_set', None):
fileHandler = TimedRotatingFileHandler("{0}/{1}.log".format(self.config.server.log_dir, "db"), encoding="utf-8",
when="d", backupCount=10)
fileHandler.setFormatter(log_fmt)
consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(log_fmt)
self.logger.addHandler(fileHandler)
self.logger.addHandler(consoleHandler)
self.logger.setLevel(self.config.database.loglevel)
coloredlogs.install(cfg.database.loglevel, logger=self.logger, fmt=log_fmt_str)
self.logger.handler_set = True # type: ignore
def create_database(self):
self.logger.info("Creating databases...")
try:
metadata.create_all(self.__engine.connect())
except SQLAlchemyError as e:
self.logger.error(f"Failed to create databases! {e}")
return
games = Utils.get_all_titles()
for game_dir, game_mod in games.items():
try:
title_db = game_mod.database(self.config)
metadata.create_all(self.__engine.connect())
self.base.set_schema_ver(game_mod.current_schema_version, game_mod.game_codes[0])
except Exception as e:
self.logger.warning(f"Could not load database schema from {game_dir} - {e}")
self.logger.info(f"Setting base_schema_ver to {self.schema_ver_latest}")
self.base.set_schema_ver(self.schema_ver_latest)
self.logger.info(f"Setting user auto_incrememnt to {self.config.database.user_table_autoincrement_start}")
self.user.reset_autoincrement(self.config.database.user_table_autoincrement_start)
def recreate_database(self):
self.logger.info("Dropping all databases...")
self.base.execute("SET FOREIGN_KEY_CHECKS=0")
try:
metadata.drop_all(self.__engine.connect())
except SQLAlchemyError as e:
self.logger.error(f"Failed to drop databases! {e}")
return
for root, dirs, files in os.walk("./titles"):
for dir in dirs:
if not dir.startswith("__"):
try:
mod = importlib.import_module(f"titles.{dir}")
try:
title_db = mod.database(self.config)
metadata.drop_all(self.__engine.connect())
except Exception as e:
self.logger.warning(f"Could not load database schema from {dir} - {e}")
except ImportError as e:
self.logger.warning(f"Failed to load database schema dir {dir} - {e}")
break
self.base.execute("SET FOREIGN_KEY_CHECKS=1")
self.create_database()
def migrate_database(self, game: str, version: int, action: str) -> None:
old_ver = self.base.get_schema_ver(game)
sql = ""
if old_ver is None:
self.logger.error(f"Schema for game {game} does not exist, did you run the creation script?")
return
if old_ver == version:
self.logger.info(f"Schema for game {game} is already version {old_ver}, nothing to do")
return
if not os.path.exists(f"core/data/schema/versions/{game.upper()}_{version}_{action}.sql"):
self.logger.error(f"Could not find {action} script {game.upper()}_{version}_{action}.sql in core/data/schema/versions folder")
return
with open(f"core/data/schema/versions/{game.upper()}_{version}_{action}.sql", "r", encoding="utf-8") as f:
sql = f.read()
result = self.base.execute(sql)
if result is None:
self.logger.error("Error execuing sql script!")
return None
result = self.base.set_schema_ver(version, game)
if result is None:
self.logger.error("Error setting version in schema_version table!")
return None
self.logger.info(f"Successfully migrated {game} to schema version {version}")
def dump_db(self):
dbname = self.config.database.name
self.logger.info("Database dumper for use with the reworked schema")
self.logger.info("Dumping users...")
sql = f"SELECT * FROM `{dbname}`.`user`"
result = self.base.execute(sql)
if result is None:
self.logger.error("Failed")
return None
users = result.fetchall()
user_list: List[Dict[str, Any]] = []
for usr in users:
user_list.append({
"id": usr["id"],
"username": usr["username"],
"email": usr["email"],
"password": usr["password"],
"permissions": usr["permissions"],
"created_date": datetime.strftime(usr["created_date"], "%Y-%m-%d %H:%M:%S"),
"last_login_date": datetime.strftime(usr["accessed_date"], "%Y-%m-%d %H:%M:%S"),
})
self.logger.info(f"Done, found {len(user_list)} users")
with open("dbdump-user.json", "w", encoding="utf-8") as f:
f.write(json.dumps(user_list))
self.logger.info(f"Saved as dbdump-user.json")
self.logger.info("Dumping cards...")
sql = f"SELECT * FROM `{dbname}`.`card`"
result = self.base.execute(sql)
if result is None:
self.logger.error("Failed")
return None
cards = result.fetchall()
card_list: List[Dict[str, Any]] = []
for crd in cards:
card_list.append({
"id": crd["id"],
"user": crd["user"],
"access_code": crd["access_code"],
"is_locked": crd["is_locked"],
"is_banned": crd["is_banned"],
"created_date": datetime.strftime(crd["created_date"], "%Y-%m-%d %H:%M:%S"),
"last_login_date": datetime.strftime(crd["accessed_date"], "%Y-%m-%d %H:%M:%S"),
})
self.logger.info(f"Done, found {len(card_list)} cards")
with open("dbdump-card.json", "w", encoding="utf-8") as f:
f.write(json.dumps(card_list))
self.logger.info(f"Saved as dbdump-card.json")
self.logger.info("Dumping arcades...")
sql = f"SELECT * FROM `{dbname}`.`arcade`"
result = self.base.execute(sql)
if result is None:
self.logger.error("Failed")
return None
arcades = result.fetchall()
arcade_list: List[Dict[str, Any]] = []
for arc in arcades:
arcade_list.append({
"id": arc["id"],
"name": arc["name"],
"nickname": arc["name"],
"country": None,
"country_id": None,
"state": None,
"city": None,
"region_id": None,
"timezone": None,
})
self.logger.info(f"Done, found {len(arcade_list)} arcades")
with open("dbdump-arcade.json", "w", encoding="utf-8") as f:
f.write(json.dumps(arcade_list))
self.logger.info(f"Saved as dbdump-arcade.json")
self.logger.info("Dumping machines...")
sql = f"SELECT * FROM `{dbname}`.`machine`"
result = self.base.execute(sql)
if result is None:
self.logger.error("Failed")
return None
machines = result.fetchall()
machine_list: List[Dict[str, Any]] = []
for mech in machines:
if "country" in mech["data"]:
country = mech["data"]["country"]
else:
country = None
if "ota_enable" in mech["data"]:
ota_enable = mech["data"]["ota_enable"]
else:
ota_enable = None
machine_list.append({
"id": mech["id"],
"arcade": mech["arcade"],
"serial": mech["keychip"],
"game": mech["game"],
"board": None,
"country": country,
"timezone": None,
"ota_enable": ota_enable,
"is_cab": False,
})
self.logger.info(f"Done, found {len(machine_list)} machines")
with open("dbdump-machine.json", "w", encoding="utf-8") as f:
f.write(json.dumps(machine_list))
self.logger.info(f"Saved as dbdump-machine.json")
self.logger.info("Dumping arcade owners...")
sql = f"SELECT * FROM `{dbname}`.`arcade_owner`"
result = self.base.execute(sql)
if result is None:
self.logger.error("Failed")
return None
arcade_owners = result.fetchall()
owner_list: List[Dict[str, Any]] = []
for owner in owner_list:
owner_list.append(owner._asdict())
self.logger.info(f"Done, found {len(owner_list)} arcade owners")
with open("dbdump-arcade_owner.json", "w", encoding="utf-8") as f:
f.write(json.dumps(owner_list))
self.logger.info(f"Saved as dbdump-arcade_owner.json")
self.logger.info("Dumping profiles...")
sql = f"SELECT * FROM `{dbname}`.`profile`"
result = self.base.execute(sql)
if result is None:
self.logger.error("Failed")
return None
profiles = result.fetchall()
profile_list: Dict[List[Dict[str, Any]]] = {}
for pf in profiles:
game = pf["game"]
if game not in profile_list:
profile_list[game] = []
profile_list[game].append({
"id": pf["id"],
"user": pf["user"],
"version": pf["version"],
"use_count": pf["use_count"],
"name": pf["name"],
"game_id": pf["game_id"],
"mods": pf["mods"],
"data": pf["data"],
})
self.logger.info(f"Done, found profiles for {len(profile_list)} games")
with open("dbdump-profile.json", "w", encoding="utf-8") as f:
f.write(json.dumps(profile_list))
self.logger.info(f"Saved as dbdump-profile.json")
self.logger.info("Dumping scores...")
sql = f"SELECT * FROM `{dbname}`.`score`"
result = self.base.execute(sql)
if result is None:
self.logger.error("Failed")
return None
scores = result.fetchall()
score_list: Dict[List[Dict[str, Any]]] = {}
for sc in scores:
game = sc["game"]
if game not in score_list:
score_list[game] = []
score_list[game].append({
"id": sc["id"],
"user": sc["user"],
"version": sc["version"],
"song_id": sc["song_id"],
"chart_id": sc["chart_id"],
"score1": sc["score1"],
"score2": sc["score2"],
"fc1": sc["fc1"],
"fc2": sc["fc2"],
"cleared": sc["cleared"],
"grade": sc["grade"],
"data": sc["data"],
})
self.logger.info(f"Done, found scores for {len(score_list)} games")
with open("dbdump-score.json", "w", encoding="utf-8") as f:
f.write(json.dumps(score_list))
self.logger.info(f"Saved as dbdump-score.json")
self.logger.info("Dumping achievements...")
sql = f"SELECT * FROM `{dbname}`.`achievement`"
result = self.base.execute(sql)
if result is None:
self.logger.error("Failed")
return None
achievements = result.fetchall()
achievement_list: Dict[List[Dict[str, Any]]] = {}
for ach in achievements:
game = ach["game"]
if game not in achievement_list:
achievement_list[game] = []
achievement_list[game].append({
"id": ach["id"],
"user": ach["user"],
"version": ach["version"],
"type": ach["type"],
"achievement_id": ach["achievement_id"],
"data": ach["data"],
})
self.logger.info(f"Done, found achievements for {len(achievement_list)} games")
with open("dbdump-achievement.json", "w", encoding="utf-8") as f:
f.write(json.dumps(achievement_list))
self.logger.info(f"Saved as dbdump-achievement.json")
self.logger.info("Dumping items...")
sql = f"SELECT * FROM `{dbname}`.`item`"
result = self.base.execute(sql)
if result is None:
self.logger.error("Failed")
return None
items = result.fetchall()
item_list: Dict[List[Dict[str, Any]]] = {}
for itm in items:
game = itm["game"]
if game not in item_list:
item_list[game] = []
item_list[game].append({
"id": itm["id"],
"user": itm["user"],
"version": itm["version"],
"type": itm["type"],
"item_id": itm["item_id"],
"data": ach["data"],
})
self.logger.info(f"Done, found items for {len(item_list)} games")
with open("dbdump-item.json", "w", encoding="utf-8") as f:
f.write(json.dumps(item_list))
self.logger.info(f"Saved as dbdump-item.json")
def restore_from_old_schema(self):
# Import the tables we expect to be there
from core.data.schema.user import aime_user
from core.data.schema.card import aime_card
from core.data.schema.arcade import arcade, machine, arcade_owner
from sqlalchemy.dialects.mysql import Insert
# Make sure that all the tables we're trying to access exist
self.create_database()
# Import the data, making sure that dependencies are accounted for
if os.path.exists("dbdump-user.json"):
users = []
with open("dbdump-user.json", "r", encoding="utf-8") as f:
users = json.load(f)
self.logger.info(f"Load {len(users)} users")
for user in users:
sql = Insert(aime_user).values(**user)
conflict = sql.on_duplicate_key_update(**user)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert user {user['id']}")
continue
self.logger.info(f"Inserted user {user['id']} -> {result.lastrowid}")
if os.path.exists("dbdump-card.json"):
cards = []
with open("dbdump-card.json", "r", encoding="utf-8") as f:
cards = json.load(f)
self.logger.info(f"Load {len(cards)} cards")
for card in cards:
sql = Insert(aime_card).values(**card)
conflict = sql.on_duplicate_key_update(**card)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert card {card['id']}")
continue
self.logger.info(f"Inserted card {card['id']} -> {result.lastrowid}")
if os.path.exists("dbdump-arcade.json"):
arcades = []
with open("dbdump-arcade.json", "r", encoding="utf-8") as f:
arcades = json.load(f)
self.logger.info(f"Load {len(arcades)} arcades")
for ac in arcades:
sql = Insert(arcade).values(**ac)
conflict = sql.on_duplicate_key_update(**ac)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert arcade {ac['id']}")
continue
self.logger.info(f"Inserted arcade {ac['id']} -> {result.lastrowid}")
if os.path.exists("dbdump-arcade_owner.json"):
ac_owners = []
with open("dbdump-arcade_owner.json", "r", encoding="utf-8") as f:
ac_owners = json.load(f)
self.logger.info(f"Load {len(ac_owners)} arcade owners")
for owner in ac_owners:
sql = Insert(arcade_owner).values(**owner)
conflict = sql.on_duplicate_key_update(**owner)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert arcade_owner {owner['user']}")
continue
self.logger.info(f"Inserted arcade_owner {owner['user']} -> {result.lastrowid}")
if os.path.exists("dbdump-machine.json"):
mechs = []
with open("dbdump-machine.json", "r", encoding="utf-8") as f:
mechs = json.load(f)
self.logger.info(f"Load {len(mechs)} machines")
for mech in mechs:
sql = Insert(machine).values(**mech)
conflict = sql.on_duplicate_key_update(**mech)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert machine {mech['id']}")
continue
self.logger.info(f"Inserted machine {mech['id']} -> {result.lastrowid}")
# Now the fun part, grabbing all our scores, profiles, items, and achievements and trying
# to conform them to our current, freeform schema. This will be painful...
profiles = {}
items = {}
scores = {}
achievements = {}
if os.path.exists("dbdump-profile.json"):
with open("dbdump-profile.json", "r", encoding="utf-8") as f:
profiles = json.load(f)
self.logger.info(f"Load {len(profiles)} profiles")
if os.path.exists("dbdump-item.json"):
with open("dbdump-item.json", "r", encoding="utf-8") as f:
items = json.load(f)
self.logger.info(f"Load {len(items)} items")
if os.path.exists("dbdump-score.json"):
with open("dbdump-score.json", "r", encoding="utf-8") as f:
scores = json.load(f)
self.logger.info(f"Load {len(scores)} scores")
if os.path.exists("dbdump-achievement.json"):
with open("dbdump-achievement.json", "r", encoding="utf-8") as f:
achievements = json.load(f)
self.logger.info(f"Load {len(achievements)} achievements")
# Chuni / Chusan
if os.path.exists("titles/chuni/schema"):
from titles.chuni.schema.item import character, item, duel, map, map_area
from titles.chuni.schema.profile import profile, profile_ex, option, option_ex
from titles.chuni.schema.profile import recent_rating, activity, charge, emoney
from titles.chuni.schema.profile import overpower
from titles.chuni.schema.score import best_score, course
chuni_profiles = []
chuni_items = []
chuni_scores = []
if "SDBT" in profiles:
chuni_profiles = profiles["SDBT"]
if "SDBT" in items:
chuni_items = items["SDBT"]
if "SDBT" in scores:
chuni_scores = scores["SDBT"]
if "SDHD" in profiles:
chuni_profiles += profiles["SDHD"]
if "SDHD" in items:
chuni_items += items["SDHD"]
if "SDHD" in scores:
chuni_scores += scores["SDHD"]
self.logger.info(f"Importing {len(chuni_profiles)} chunithm/chunithm new profiles")
for pf in chuni_profiles:
if type(pf["data"]) is not dict:
pf["data"] = json.loads(pf["data"])
pf_data = pf["data"]
# data
if "userData" in pf_data:
pf_data["userData"]["userName"] = bytes([ord(c) for c in pf_data["userData"]["userName"]]).decode("utf-8")
pf_data["userData"]["user"] = pf["user"]
pf_data["userData"]["version"] = pf["version"]
pf_data["userData"].pop("accessCode")
if pf_data["userData"]["lastRomVersion"].startswith("2."):
pf_data["userData"]["version"] += 10
pf_data["userData"] = self.base.fix_bools(pf_data["userData"])
sql = Insert(profile).values(**pf_data["userData"])
conflict = sql.on_duplicate_key_update(**pf_data["userData"])
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert chuni profile data for {pf['user']}")
continue
self.logger.info(f"Inserted chuni profile for {pf['user']} ->{result.lastrowid}")
# data_ex
if "userDataEx" in pf_data and len(pf_data["userDataEx"]) > 0:
pf_data["userDataEx"][0]["user"] = pf["user"]
pf_data["userDataEx"][0]["version"] = pf["version"]
pf_data["userDataEx"] = self.base.fix_bools(pf_data["userDataEx"][0])
sql = Insert(profile_ex).values(**pf_data["userDataEx"])
conflict = sql.on_duplicate_key_update(**pf_data["userDataEx"])
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert chuni profile data_ex for {pf['user']}")
continue
self.logger.info(f"Inserted chuni profile data_ex for {pf['user']} ->{result.lastrowid}")
# option
if "userGameOption" in pf_data:
pf_data["userGameOption"]["user"] = pf["user"]
pf_data["userGameOption"] = self.base.fix_bools(pf_data["userGameOption"])
sql = Insert(option).values(**pf_data["userGameOption"])
conflict = sql.on_duplicate_key_update(**pf_data["userGameOption"])
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert chuni profile options for {pf['user']}")
continue
self.logger.info(f"Inserted chuni profile options for {pf['user']} ->{result.lastrowid}")
# option_ex
if "userGameOptionEx" in pf_data and len(pf_data["userGameOptionEx"]) > 0:
pf_data["userGameOptionEx"][0]["user"] = pf["user"]
pf_data["userGameOptionEx"] = self.base.fix_bools(pf_data["userGameOptionEx"][0])
sql = Insert(option_ex).values(**pf_data["userGameOptionEx"])
conflict = sql.on_duplicate_key_update(**pf_data["userGameOptionEx"])
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert chuni profile option_ex for {pf['user']}")
continue
self.logger.info(f"Inserted chuni profile option_ex for {pf['user']} ->{result.lastrowid}")
# recent_rating
if "userRecentRatingList" in pf_data:
rr = {
"user": pf["user"],
"recentRating": pf_data["userRecentRatingList"]
}
sql = Insert(recent_rating).values(**rr)
conflict = sql.on_duplicate_key_update(**rr)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert chuni profile recent_rating for {pf['user']}")
continue
self.logger.info(f"Inserted chuni profile recent_rating for {pf['user']} ->{result.lastrowid}")
# activity
if "userActivityList" in pf_data:
for act in pf_data["userActivityList"]:
act["user"] = pf["user"]
sql = Insert(activity).values(**act)
conflict = sql.on_duplicate_key_update(**act)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert chuni profile activity for {pf['user']}")
else:
self.logger.info(f"Inserted chuni profile activity for {pf['user']} ->{result.lastrowid}")
# charge
if "userChargeList" in pf_data:
for cg in pf_data["userChargeList"]:
cg["user"] = pf["user"]
cg = self.base.fix_bools(cg)
sql = Insert(charge).values(**cg)
conflict = sql.on_duplicate_key_update(**cg)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert chuni profile charge for {pf['user']}")
else:
self.logger.info(f"Inserted chuni profile charge for {pf['user']} ->{result.lastrowid}")
# emoney
if "userEmoneyList" in pf_data:
for emon in pf_data["userEmoneyList"]:
emon["user"] = pf["user"]
sql = Insert(emoney).values(**emon)
conflict = sql.on_duplicate_key_update(**emon)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert chuni profile emoney for {pf['user']}")
else:
self.logger.info(f"Inserted chuni profile emoney for {pf['user']} ->{result.lastrowid}")
# overpower
if "userOverPowerList" in pf_data:
for op in pf_data["userOverPowerList"]:
op["user"] = pf["user"]
sql = Insert(overpower).values(**op)
conflict = sql.on_duplicate_key_update(**op)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert chuni profile overpower for {pf['user']}")
else:
self.logger.info(f"Inserted chuni profile overpower for {pf['user']} ->{result.lastrowid}")
# map_area
if "userMapAreaList" in pf_data:
for ma in pf_data["userMapAreaList"]:
ma["user"] = pf["user"]
ma = self.base.fix_bools(ma)
sql = Insert(map_area).values(**ma)
conflict = sql.on_duplicate_key_update(**ma)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert chuni map_area for {pf['user']}")
else:
self.logger.info(f"Inserted chuni map_area for {pf['user']} ->{result.lastrowid}")
#duel
if "userDuelList" in pf_data:
for ma in pf_data["userDuelList"]:
ma["user"] = pf["user"]
ma = self.base.fix_bools(ma)
sql = Insert(duel).values(**ma)
conflict = sql.on_duplicate_key_update(**ma)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert chuni duel for {pf['user']}")
else:
self.logger.info(f"Inserted chuni duel for {pf['user']} ->{result.lastrowid}")
# map
if "userMapList" in pf_data:
for ma in pf_data["userMapList"]:
ma["user"] = pf["user"]
ma = self.base.fix_bools(ma)
sql = Insert(map).values(**ma)
conflict = sql.on_duplicate_key_update(**ma)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert chuni map for {pf['user']}")
else:
self.logger.info(f"Inserted chuni map for {pf['user']} ->{result.lastrowid}")
self.logger.info(f"Importing {len(chuni_items)} chunithm/chunithm new items")
for i in chuni_items:
if type(i["data"]) is not dict:
i["data"] = json.loads(i["data"])
i_data = i["data"]
i_data["user"] = i["user"]
i_data = self.base.fix_bools(i_data)
try: i_data.pop("assignIllust")
except: pass
try: i_data.pop("exMaxLv")
except: pass
if i["type"] == 20: #character
sql = Insert(character).values(**i_data)
else:
sql = Insert(item).values(**i_data)
conflict = sql.on_duplicate_key_update(**i_data)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert chuni item for user {i['user']}")
else:
self.logger.info(f"Inserted chuni item for user {i['user']} {i['item_id']} -> {result.lastrowid}")
self.logger.info(f"Importing {len(chuni_scores)} chunithm/chunithm new scores")
for sc in chuni_scores:
if type(sc["data"]) is not dict:
sc["data"] = json.loads(sc["data"])
score_data = self.base.fix_bools(sc["data"])
try: score_data.pop("theoryCount")
except: pass
try: score_data.pop("ext1")
except: pass
score_data["user"] = sc["user"]
sql = Insert(best_score).values(**score_data)
conflict = sql.on_duplicate_key_update(**score_data)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to put chuni score for user {sc['user']}")
else:
self.logger.info(f"Inserted chuni score for user {sc['user']} {sc['song_id']}/{sc['chart_id']} -> {result.lastrowid}")
else:
self.logger.info(f"Chuni/Chusan not found, skipping...")
# CXB
if os.path.exists("titles/cxb/schema"):
from titles.cxb.schema.item import energy
from titles.cxb.schema.profile import profile
from titles.cxb.schema.score import score, ranking
cxb_profiles = []
cxb_items = []
cxb_scores = []
if "SDCA" in profiles:
cxb_profiles = profiles["SDCA"]
if "SDCA" in items:
cxb_items = items["SDCA"]
if "SDCA" in scores:
cxb_scores = scores["SDCA"]
self.logger.info(f"Importing {len(cxb_profiles)} CXB profiles")
for pf in cxb_profiles:
user = pf["user"]
version = pf["version"]
pf_data = pf["data"]["data"]
pf_idx = pf["data"]["index"]
for x in range(len(pf_data)):
sql = Insert(profile).values(
user = user,
version = version,
index = int(pf_idx[x]),
data = json.loads(pf_data[x]) if type(pf_data[x]) is not dict else pf_data[x]
)
conflict = sql.on_duplicate_key_update(
user = user,
version = version,
index = int(pf_idx[x]),
data = json.loads(pf_data[x]) if type(pf_data[x]) is not dict else pf_data[x]
)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert CXB profile for user {user} Index {pf_idx[x]}")
self.logger.info(f"Importing {len(cxb_scores)} CXB scores")
for sc in cxb_scores:
user = sc["user"]
version = sc["version"]
mcode = sc["data"]["mcode"]
index = sc["data"]["index"]
sql = Insert(score).values(
user = user,
game_version = version,
song_mcode = mcode,
song_index = index,
data = sc["data"]
)
conflict = sql.on_duplicate_key_update(
user = user,
game_version = version,
song_mcode = mcode,
song_index = index,
data = sc["data"]
)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert CXB score for user {user} mcode {mcode}")
self.logger.info(f"Importing {len(cxb_items)} CXB items")
for it in cxb_items:
user = it["user"]
if it["type"] == 3: # energy
sql = Insert(energy).values(
user = user,
energy = it["data"]["total"]
)
conflict = sql.on_duplicate_key_update(
user = user,
energy = it["data"]["total"]
)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert CXB energy for user {user}")
elif it["type"] == 2:
sql = Insert(ranking).values(
user = user,
rev_id = it["data"]["rid"],
song_id = it["data"]["sc"][1] if len(it["data"]["sc"]) > 1 else None,
score = it["data"]["sc"][0],
clear = it["data"]["clear"],
)
conflict = sql.on_duplicate_key_update(
user = user,
rev_id = it["data"]["rid"],
song_id = it["data"]["sc"][1] if len(it["data"]["sc"]) > 1 else None,
score = it["data"]["sc"][0],
clear = it["data"]["clear"],
)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert CXB ranking for user {user}")
else:
self.logger.error(f"Unknown CXB item type {it['type']} for user {user}")
else:
self.logger.info(f"CXB not found, skipping...")
# Diva
if os.path.exists("titles/diva/schema"):
from titles.diva.schema.profile import profile
from titles.diva.schema.score import score
from titles.diva.schema.item import shop
diva_profiles = []
diva_scores = []
if "SBZV" in profiles:
diva_profiles = profiles["SBZV"]
if "SBZV" in scores:
diva_scores = scores["SBZV"]
self.logger.info(f"Importing {len(diva_profiles)} Diva profiles")
for pf in diva_profiles:
pf["data"]["user"] = pf["user"]
pf["data"]["version"] = pf["version"]
pf_data = pf["data"]
if "mdl_eqp_ary" in pf["data"]:
sql = Insert(shop).values(
user = user,
version = version,
mdl_eqp_ary = pf["data"]["mdl_eqp_ary"],
)
conflict = sql.on_duplicate_key_update(
user = user,
version = version,
mdl_eqp_ary = pf["data"]["mdl_eqp_ary"]
)
self.base.execute(conflict)
pf["data"].pop("mdl_eqp_ary")
sql = Insert(profile).values(**pf_data)
conflict = sql.on_duplicate_key_update(**pf_data)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert diva profile for {pf['user']}")
self.logger.info(f"Importing {len(diva_scores)} Diva scores")
for sc in diva_scores:
user = sc["user"]
clr_kind = -1
for x in sc["data"]["stg_clr_kind"].split(","):
if x != "-1":
clr_kind = x
cool_ct = 0
for x in sc["data"]["stg_cool_cnt"].split(","):
if x != "0":
cool_ct = x
fine_ct = 0
for x in sc["data"]["stg_fine_cnt"].split(","):
if x != "0":
fine_ct = x
safe_ct = 0
for x in sc["data"]["stg_safe_cnt"].split(","):
if x != "0":
safe_ct = x
sad_ct = 0
for x in sc["data"]["stg_sad_cnt"].split(","):
if x != "0":
sad_ct = x
worst_ct = 0
for x in sc["data"]["stg_wt_wg_cnt"].split(","):
if x != "0":
worst_ct = x
max_cmb = 0
for x in sc["data"]["stg_max_cmb"].split(","):
if x != "0":
max_cmb = x
sql = Insert(score).values(
user = user,
version = sc["version"],
pv_id = sc["song_id"],
difficulty = sc["chart_id"],
score = sc["score1"],
atn_pnt = sc["score2"],
clr_kind = clr_kind,
sort_kind = sc["data"]["sort_kind"],
cool = cool_ct,
fine = fine_ct,
safe = safe_ct,
sad = sad_ct,
worst = worst_ct,
max_combo = max_cmb,
)
conflict = sql.on_duplicate_key_update(user = user,
version = sc["version"],
pv_id = sc["song_id"],
difficulty = sc["chart_id"],
score = sc["score1"],
atn_pnt = sc["score2"],
clr_kind = clr_kind,
sort_kind = sc["data"]["sort_kind"],
cool = cool_ct,
fine = fine_ct,
safe = safe_ct,
sad = sad_ct,
worst = worst_ct,
max_combo = max_cmb
)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert diva score for {pf['user']}")
else:
self.logger.info(f"Diva not found, skipping...")
# Ongeki
if os.path.exists("titles/ongeki/schema"):
from titles.ongeki.schema.item import card, deck, character, boss, story
from titles.ongeki.schema.item import chapter, item, music_item, login_bonus
from titles.ongeki.schema.item import event_point, mission_point, scenerio
from titles.ongeki.schema.item import trade_item, event_music, tech_event
from titles.ongeki.schema.profile import profile, option, activity, recent_rating
from titles.ongeki.schema.profile import rating_log, training_room, kop
from titles.ongeki.schema.score import score_best, tech_count, playlog
from titles.ongeki.schema.log import session_log
item_types = {
"character": 20,
"story": 21,
"card": 22,
"deck": 23,
"login": 24,
"chapter": 25
}
ongeki_profiles = []
ongeki_items = []
ongeki_scores = []
if "SDDT" in profiles:
ongeki_profiles = profiles["SDDT"]
if "SDDT" in items:
ongeki_items = items["SDDT"]
if "SDDT" in scores:
ongeki_scores = scores["SDDT"]
self.logger.info(f"Importing {len(ongeki_profiles)} ongeki profiles")
for pf in ongeki_profiles:
user = pf["user"]
version = pf["version"]
pf_data = pf["data"]
pf_data["userData"]["user"] = user
pf_data["userData"]["version"] = version
pf_data["userData"].pop("accessCode")
sql = Insert(profile).values(**pf_data["userData"])
conflict = sql.on_duplicate_key_update(**pf_data["userData"])
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert ongeki profile data for user {pf['user']}")
continue
pf_data["userOption"]["user"] = user
sql = Insert(option).values(**pf_data["userOption"])
conflict = sql.on_duplicate_key_update(**pf_data["userOption"])
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert ongeki profile options for user {pf['user']}")
continue
for pf_list in pf_data["userActivityList"]:
pf_list["user"] = user
sql = Insert(activity).values(**pf_list)
conflict = sql.on_duplicate_key_update(**pf_list)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert ongeki profile activity for user {pf['user']}")
continue
sql = Insert(recent_rating).values(
user = user,
recentRating = pf_data["userRecentRatingList"]
)
conflict = sql.on_duplicate_key_update(
user = user,
recentRating = pf_data["userRecentRatingList"]
)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert ongeki profile recent rating for user {pf['user']}")
continue
for pf_list in pf_data["userRatinglogList"]:
pf_list["user"] = user
sql = Insert(rating_log).values(**pf_list)
conflict = sql.on_duplicate_key_update(**pf_list)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert ongeki profile rating log for user {pf['user']}")
continue
for pf_list in pf_data["userTrainingRoomList"]:
pf_list["user"] = user
sql = Insert(training_room).values(**pf_list)
conflict = sql.on_duplicate_key_update(**pf_list)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert ongeki profile training room for user {pf['user']}")
continue
if "userKopList" in pf_data:
for pf_list in pf_data["userKopList"]:
pf_list["user"] = user
sql = Insert(kop).values(**pf_list)
conflict = sql.on_duplicate_key_update(**pf_list)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert ongeki profile training room for user {pf['user']}")
continue
for pf_list in pf_data["userBossList"]:
pf_list["user"] = user
sql = Insert(boss).values(**pf_list)
conflict = sql.on_duplicate_key_update(**pf_list)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert ongeki item boss for user {pf['user']}")
continue
for pf_list in pf_data["userDeckList"]:
pf_list["user"] = user
sql = Insert(deck).values(**pf_list)
conflict = sql.on_duplicate_key_update(**pf_list)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert ongeki item deck for user {pf['user']}")
continue
for pf_list in pf_data["userStoryList"]:
pf_list["user"] = user
sql = Insert(story).values(**pf_list)
conflict = sql.on_duplicate_key_update(**pf_list)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert ongeki item story for user {pf['user']}")
continue
for pf_list in pf_data["userChapterList"]:
pf_list["user"] = user
sql = Insert(chapter).values(**pf_list)
conflict = sql.on_duplicate_key_update(**pf_list)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert ongeki item chapter for user {pf['user']}")
continue
for pf_list in pf_data["userPlaylogList"]:
pf_list["user"] = user
sql = Insert(playlog).values(**pf_list)
conflict = sql.on_duplicate_key_update(**pf_list)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert ongeki score playlog for user {pf['user']}")
continue
for pf_list in pf_data["userMusicItemList"]:
pf_list["user"] = user
sql = Insert(music_item).values(**pf_list)
conflict = sql.on_duplicate_key_update(**pf_list)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert ongeki item music item for user {pf['user']}")
continue
for pf_list in pf_data["userTechCountList"]:
pf_list["user"] = user
sql = Insert(tech_count).values(**pf_list)
conflict = sql.on_duplicate_key_update(**pf_list)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert ongeki item tech count for user {pf['user']}")
continue
if "userTechEventList" in pf_data:
for pf_list in pf_data["userTechEventList"]:
pf_list["user"] = user
sql = Insert(tech_event).values(**pf_list)
conflict = sql.on_duplicate_key_update(**pf_list)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert ongeki item tech event for user {pf['user']}")
continue
for pf_list in pf_data["userTradeItemList"]:
pf_list["user"] = user
sql = Insert(trade_item).values(**pf_list)
conflict = sql.on_duplicate_key_update(**pf_list)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert ongeki item trade item for user {pf['user']}")
continue
if "userEventMusicList" in pf_data:
for pf_list in pf_data["userEventMusicList"]:
pf_list["user"] = user
sql = Insert(event_music).values(**pf_list)
conflict = sql.on_duplicate_key_update(**pf_list)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert ongeki item event music for user {pf['user']}")
continue
if "userEventPointList" in pf_data:
for pf_list in pf_data["userEventPointList"]:
pf_list["user"] = user
sql = Insert(event_point).values(**pf_list)
conflict = sql.on_duplicate_key_update(**pf_list)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert ongeki item event point for user {pf['user']}")
continue
for pf_list in pf_data["userLoginBonusList"]:
pf_list["user"] = user
sql = Insert(login_bonus).values(**pf_list)
conflict = sql.on_duplicate_key_update(**pf_list)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert ongeki item login bonus for user {pf['user']}")
continue
for pf_list in pf_data["userMissionPointList"]:
pf_list["user"] = user
sql = Insert(mission_point).values(**pf_list)
conflict = sql.on_duplicate_key_update(**pf_list)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert ongeki item mission point for user {pf['user']}")
continue
for pf_list in pf_data["userScenarioList"]:
pf_list["user"] = user
sql = Insert(scenerio).values(**pf_list)
conflict = sql.on_duplicate_key_update(**pf_list)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert ongeki item scenerio for user {pf['user']}")
continue
if "userSessionlogList" in pf_data:
for pf_list in pf_data["userSessionlogList"]:
pf_list["user"] = user
sql = Insert(session_log).values(**pf_list)
conflict = sql.on_duplicate_key_update(**pf_list)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert ongeki log session for user {pf['user']}")
continue
self.logger.info(f"Importing {len(ongeki_items)} ongeki items")
for it in ongeki_items:
user = it["user"]
it_type = it["type"]
it_id = it["item_id"]
it_data = it["data"]
it_data["user"] = user
if it_type == item_types["character"] and "characterId" in it_data:
sql = Insert(character).values(**it_data)
elif it_type == item_types["story"]:
sql = Insert(story).values(**it_data)
elif it_type == item_types["card"]:
sql = Insert(card).values(**it_data)
elif it_type == item_types["deck"]:
sql = Insert(deck).values(**it_data)
elif it_type == item_types["login"]: # login bonus
sql = Insert(login_bonus).values(**it_data)
elif it_type == item_types["chapter"]:
sql = Insert(chapter).values(**it_data)
else:
sql = Insert(item).values(**it_data)
conflict = sql.on_duplicate_key_update(**it_data)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert ongeki item {it_id} kind {it_type} for user {user}")
self.logger.info(f"Importing {len(ongeki_scores)} ongeki scores")
for sc in ongeki_scores:
user = sc["user"]
sc_data = sc["data"]
sc_data["user"] = user
sql = Insert(score_best).values(**sc_data)
conflict = sql.on_duplicate_key_update(**sc_data)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert ongeki score for user {user}: {sc['song_id']}/{sc['chart_id']}")
else:
self.logger.info(f"Ongeki not found, skipping...")
# Wacca
if os.path.exists("titles/wacca/schema"):
from titles.wacca.schema.profile import profile, option, bingo, gate, favorite
from titles.wacca.schema.item import item, ticket, song_unlock, trophy
from titles.wacca.schema.score import best_score, stageup
from titles.wacca.reverse import WaccaReverse
from titles.wacca.const import WaccaConstants
default_opts = WaccaReverse.OPTIONS_DEFAULTS
opts = WaccaConstants.OPTIONS
item_types = WaccaConstants.ITEM_TYPES
wacca_profiles = []
wacca_items = []
wacca_scores = []
wacca_achievements = []
if "SDFE" in profiles:
wacca_profiles = profiles["SDFE"]
if "SDFE" in items:
wacca_items = items["SDFE"]
if "SDFE" in scores:
wacca_scores = scores["SDFE"]
if "SDFE" in achievements:
wacca_achievements = achievements["SDFE"]
self.logger.info(f"Importing {len(wacca_profiles)} wacca profiles")
for pf in wacca_profiles:
if pf["version"] == 0 or pf["version"] == 1:
season = 1
elif pf["version"] == 2 or pf["version"] == 3:
season = 2
elif pf["version"] >= 4:
season = 3
if type(pf["data"]) is not dict:
pf["data"] = json.loads(pf["data"])
try:
sql = Insert(profile).values(
id = pf["id"],
user = pf["user"],
version = pf["version"],
season = season,
username = pf["data"]["profile"]["username"] if "username" in pf["data"]["profile"] else pf["name"],
xp = pf["data"]["profile"]["xp"],
xp_season = pf["data"]["profile"]["xp"],
wp = pf["data"]["profile"]["wp"],
wp_season = pf["data"]["profile"]["wp"],
wp_total = pf["data"]["profile"]["total_wp_gained"],
dan_type = pf["data"]["profile"]["dan_type"],
dan_level = pf["data"]["profile"]["dan_level"],
title_0 = pf["data"]["profile"]["title_part_ids"][0],
title_1 = pf["data"]["profile"]["title_part_ids"][1],
title_2 = pf["data"]["profile"]["title_part_ids"][2],
rating = pf["data"]["profile"]["rating"],
vip_expire_time = datetime.fromtimestamp(pf["data"]["profile"]["vip_expire_time"]) if "vip_expire_time" in pf["data"]["profile"] else None,
login_count = pf["use_count"],
playcount_single = pf["use_count"],
playcount_single_season = pf["use_count"],
last_game_ver = pf["data"]["profile"]["last_game_ver"],
last_song_id = pf["data"]["profile"]["last_song_info"][0] if "last_song_info" in pf["data"]["profile"] else 0,
last_song_difficulty = pf["data"]["profile"]["last_song_info"][1] if "last_song_info" in pf["data"]["profile"] else 0,
last_folder_order = pf["data"]["profile"]["last_song_info"][2] if "last_song_info" in pf["data"]["profile"] else 0,
last_folder_id = pf["data"]["profile"]["last_song_info"][3] if "last_song_info" in pf["data"]["profile"] else 0,
last_song_order = pf["data"]["profile"]["last_song_info"][4] if "last_song_info" in pf["data"]["profile"] else 0,
last_login_date = datetime.fromtimestamp(pf["data"]["profile"]["last_login_timestamp"]),
)
conflict = sql.on_duplicate_key_update(
id = pf["id"],
user = pf["user"],
version = pf["version"],
season = season,
username = pf["data"]["profile"]["username"] if "username" in pf["data"]["profile"] else pf["name"],
xp = pf["data"]["profile"]["xp"],
xp_season = pf["data"]["profile"]["xp"],
wp = pf["data"]["profile"]["wp"],
wp_season = pf["data"]["profile"]["wp"],
wp_total = pf["data"]["profile"]["total_wp_gained"],
dan_type = pf["data"]["profile"]["dan_type"],
dan_level = pf["data"]["profile"]["dan_level"],
title_0 = pf["data"]["profile"]["title_part_ids"][0],
title_1 = pf["data"]["profile"]["title_part_ids"][1],
title_2 = pf["data"]["profile"]["title_part_ids"][2],
rating = pf["data"]["profile"]["rating"],
vip_expire_time = datetime.fromtimestamp(pf["data"]["profile"]["vip_expire_time"]) if "vip_expire_time" in pf["data"]["profile"] else None,
login_count = pf["use_count"],
playcount_single = pf["use_count"],
playcount_single_season = pf["use_count"],
last_game_ver = pf["data"]["profile"]["last_game_ver"],
last_song_id = pf["data"]["profile"]["last_song_info"][0] if "last_song_info" in pf["data"]["profile"] else 0,
last_song_difficulty = pf["data"]["profile"]["last_song_info"][1] if "last_song_info" in pf["data"]["profile"] else 0,
last_folder_order = pf["data"]["profile"]["last_song_info"][2] if "last_song_info" in pf["data"]["profile"] else 0,
last_folder_id = pf["data"]["profile"]["last_song_info"][3] if "last_song_info" in pf["data"]["profile"] else 0,
last_song_order = pf["data"]["profile"]["last_song_info"][4] if "last_song_info" in pf["data"]["profile"] else 0,
last_login_date = datetime.fromtimestamp(pf["data"]["profile"]["last_login_timestamp"]),
)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert wacca profile for user {pf['user']}")
continue
for opt, val in pf["data"]["option"].items():
if val != default_opts[opt]:
opt_id = opts[opt]
sql = Insert(option).values(
user = pf["user"],
opt_id = opt_id,
value = val,
)
conflict = sql.on_duplicate_key_update(
user = pf["user"],
opt_id = opt_id,
value = val,
)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert wacca option for user {pf['user']} {opt} -> {val}")
except KeyError as e:
self.logger.warn(f"Outdated wacca profile, skipping: {e}")
if "gate" in pf["data"]:
for profile_gate in pf["data"]["gate"]:
sql = Insert(gate).values(
user = pf["user"],
gate_id = profile_gate["id"],
page = profile_gate["page"],
loops = profile_gate["loops"],
progress = profile_gate["progress"],
last_used = datetime.fromtimestamp(profile_gate["last_used"]),
mission_flag = profile_gate["mission_flag"],
total_points = profile_gate["total_points"],
)
conflict = sql.on_duplicate_key_update(
user = pf["user"],
gate_id = profile_gate["id"],
page = profile_gate["page"],
loops = profile_gate["loops"],
progress = profile_gate["progress"],
last_used = datetime.fromtimestamp(profile_gate["last_used"]),
mission_flag = profile_gate["mission_flag"],
total_points = profile_gate["total_points"],
)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert wacca gate for user {pf['user']} -> {profile_gate['id']}")
continue
if "favorite" in pf["data"]:
for profile_favorite in pf["data"]["favorite"]:
sql = Insert(favorite).values(
user = pf["user"],
song_id = profile_favorite
)
conflict = sql.on_duplicate_key_update(
user = pf["user"],
song_id = profile_favorite
)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert wacca favorite songs for user {pf['user']} -> {profile_favorite}")
continue
for it in wacca_items:
user = it["user"]
item_type = it["type"]
item_id = it["item_id"]
if type(it["data"]) is not dict:
it["data"] = json.loads(it["data"])
if item_type == item_types["ticket"]:
if "quantity" in it["data"]:
for x in range(it["data"]["quantity"]):
sql = Insert(ticket).values(
user = user,
ticket_id = item_id,
)
conflict = sql.on_duplicate_key_update(
user = user,
ticket_id = item_id,
)
result = self.base.execute(conflict)
if result is None:
self.logger.warn(f"Wacca: Failed to insert ticket {item_id} for user {user}")
elif item_type == item_types["music_unlock"] or item_type == item_types["music_difficulty_unlock"]:
diff = 0
if "difficulty" in it["data"]:
for x in it["data"]["difficulty"]:
if x == 1:
diff += 1
else:
break
sql = Insert(song_unlock).values(
user = user,
song_id = item_id,
highest_difficulty = diff,
)
conflict = sql.on_duplicate_key_update(
user = user,
song_id = item_id,
highest_difficulty = diff,
)
result = self.base.execute(conflict)
if result is None:
self.logger.warn(f"Wacca: Failed to insert song unlock {item_id} {diff} for user {user}")
elif item_type == item_types["trophy"]:
season = int(item_id / 100000)
sql = Insert(trophy).values(
user = user,
trophy_id = item_id,
season = season,
progress = 0 if "progress" not in it["data"] else it["data"]["progress"],
badge_type = 0 # ???
)
conflict = sql.on_duplicate_key_update(
user = user,
trophy_id = item_id,
season = season,
progress = 0 if "progress" not in it["data"] else it["data"]["progress"],
badge_type = 0 # ???
)
result = self.base.execute(conflict)
if result is None:
self.logger.warn(f"Wacca: Failed to insert trophy {item_id} for user {user}")
else:
sql = Insert(item).values(
user = user,
item_id = item_id,
type = item_type,
acquire_date = datetime.fromtimestamp(it["data"]["obtainedDate"]) if "obtainedDate" in it["data"] else datetime.now(),
use_count = it["data"]["uses"] if "uses" in it["data"] else 0,
use_count_season = it["data"]["season_uses"] if "season_uses" in it["data"] else 0
)
conflict = sql.on_duplicate_key_update(
user = user,
item_id = item_id,
type = item_type,
acquire_date = datetime.fromtimestamp(it["data"]["obtainedDate"]) if "obtainedDate" in it["data"] else datetime.now(),
use_count = it["data"]["uses"] if "uses" in it["data"] else 0,
use_count_season = it["data"]["season_uses"] if "season_uses" in it["data"] else 0
)
result = self.base.execute(conflict)
if result is None:
self.logger.warn(f"Wacca: Failed to insert trophy {item_id} for user {user}")
for sc in wacca_scores:
if type(sc["data"]) is not dict:
sc["data"] = json.loads(sc["data"])
sql = Insert(best_score).values(
user = sc["user"],
song_id = int(sc["song_id"]),
chart_id = sc["chart_id"],
score = sc["score1"],
play_ct = 1 if "play_count" not in sc["data"] else sc["data"]["play_count"],
clear_ct = 1 if sc["cleared"] & 0x01 else 0,
missless_ct = 1 if sc["cleared"] & 0x02 else 0,
fullcombo_ct = 1 if sc["cleared"] & 0x04 else 0,
allmarv_ct = 1 if sc["cleared"] & 0x08 else 0,
grade_d_ct = 1 if sc["grade"] & 0x01 else 0,
grade_c_ct = 1 if sc["grade"] & 0x02 else 0,
grade_b_ct = 1 if sc["grade"] & 0x04 else 0,
grade_a_ct = 1 if sc["grade"] & 0x08 else 0,
grade_aa_ct = 1 if sc["grade"] & 0x10 else 0,
grade_aaa_ct = 1 if sc["grade"] & 0x20 else 0,
grade_s_ct = 1 if sc["grade"] & 0x40 else 0,
grade_ss_ct = 1 if sc["grade"] & 0x80 else 0,
grade_sss_ct = 1 if sc["grade"] & 0x100 else 0,
grade_master_ct = 1 if sc["grade"] & 0x200 else 0,
grade_sp_ct = 1 if sc["grade"] & 0x400 else 0,
grade_ssp_ct = 1 if sc["grade"] & 0x800 else 0,
grade_sssp_ct = 1 if sc["grade"] & 0x1000 else 0,
best_combo = 0 if "max_combo" not in sc["data"] else sc["data"]["max_combo"],
lowest_miss_ct = 0 if "lowest_miss_count" not in sc["data"] else sc["data"]["lowest_miss_count"],
rating = 0 if "rating" not in sc["data"] else sc["data"]["rating"],
)
conflict = sql.on_duplicate_key_update(
user = sc["user"],
song_id = int(sc["song_id"]),
chart_id = sc["chart_id"],
score = sc["score1"],
play_ct = 1 if "play_count" not in sc["data"] else sc["data"]["play_count"],
clear_ct = 1 if sc["cleared"] & 0x01 else 0,
missless_ct = 1 if sc["cleared"] & 0x02 else 0,
fullcombo_ct = 1 if sc["cleared"] & 0x04 else 0,
allmarv_ct = 1 if sc["cleared"] & 0x08 else 0,
grade_d_ct = 1 if sc["grade"] & 0x01 else 0,
grade_c_ct = 1 if sc["grade"] & 0x02 else 0,
grade_b_ct = 1 if sc["grade"] & 0x04 else 0,
grade_a_ct = 1 if sc["grade"] & 0x08 else 0,
grade_aa_ct = 1 if sc["grade"] & 0x10 else 0,
grade_aaa_ct = 1 if sc["grade"] & 0x20 else 0,
grade_s_ct = 1 if sc["grade"] & 0x40 else 0,
grade_ss_ct = 1 if sc["grade"] & 0x80 else 0,
grade_sss_ct = 1 if sc["grade"] & 0x100 else 0,
grade_master_ct = 1 if sc["grade"] & 0x200 else 0,
grade_sp_ct = 1 if sc["grade"] & 0x400 else 0,
grade_ssp_ct = 1 if sc["grade"] & 0x800 else 0,
grade_sssp_ct = 1 if sc["grade"] & 0x1000 else 0,
best_combo = 0 if "max_combo" not in sc["data"] else sc["data"]["max_combo"],
lowest_miss_ct = 0 if "lowest_miss_count" not in sc["data"] else sc["data"]["lowest_miss_count"],
rating = 0 if "rating" not in sc["data"] else sc["data"]["rating"],
)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert wacca score for user {sc['user']} {int(sc['song_id'])} {sc['chart_id']}")
for ach in wacca_achievements:
if ach["version"] == 0 or ach["version"] == 1:
season = 1
elif ach["version"] == 2 or ach["version"] == 3:
season = 2
elif ach["version"] >= 4:
season = 3
if type(ach["data"]) is not dict:
ach["data"] = json.loads(ach["data"])
sql = Insert(stageup).values(
user = ach["user"],
season = season,
stage_id = ach["achievement_id"],
clear_status = 0 if "clear" not in ach["data"] else ach["data"]["clear"],
clear_song_ct = 0 if "clear_song_ct" not in ach["data"] else ach["data"]["clear_song_ct"],
song1_score = 0 if "score1" not in ach["data"] else ach["data"]["score1"],
song2_score = 0 if "score2" not in ach["data"] else ach["data"]["score2"],
song3_score = 0 if "score3" not in ach["data"] else ach["data"]["score3"],
play_ct = 1 if "attemps" not in ach["data"] else ach["data"]["attemps"],
)
conflict = sql.on_duplicate_key_update(
user = ach["user"],
season = season,
stage_id = ach["achievement_id"],
clear_status = 0 if "clear" not in ach["data"] else ach["data"]["clear"],
clear_song_ct = 0 if "clear_song_ct" not in ach["data"] else ach["data"]["clear_song_ct"],
song1_score = 0 if "score1" not in ach["data"] else ach["data"]["score1"],
song2_score = 0 if "score2" not in ach["data"] else ach["data"]["score2"],
song3_score = 0 if "score3" not in ach["data"] else ach["data"]["score3"],
play_ct = 1 if "attemps" not in ach["data"] else ach["data"]["attemps"],
)
result = self.base.execute(conflict)
if result is None:
self.logger.error(f"Failed to insert wacca achievement for user {ach['user']}")
else:
self.logger.info(f"Wacca not found, skipping...")