From 4c64554383c5de1b9b87583f7a4896c1794df828 Mon Sep 17 00:00:00 2001 From: Midorica Date: Fri, 3 Mar 2023 13:27:22 -0500 Subject: [PATCH 01/12] pushing small typo for title port on both guides --- docs/INSTALL_UBUNTU.md | 2 +- docs/INSTALL_WINDOWS.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/INSTALL_UBUNTU.md b/docs/INSTALL_UBUNTU.md index eaabb0c..710c757 100644 --- a/docs/INSTALL_UBUNTU.md +++ b/docs/INSTALL_UBUNTU.md @@ -96,7 +96,7 @@ sudo ufw allow 8443 sudo ufw allow 22345 sudo ufw allow 8090 sudo ufw allow 8444 -sudo ufw allow 9000 +sudo ufw allow 8080 ``` ## Running the ARTEMiS instance diff --git a/docs/INSTALL_WINDOWS.md b/docs/INSTALL_WINDOWS.md index abd1e43..e88def3 100644 --- a/docs/INSTALL_WINDOWS.md +++ b/docs/INSTALL_WINDOWS.md @@ -57,7 +57,7 @@ title: ## Firewall Adjustements Make sure the following ports are open both on your router and local Windows firewall in case you want to use this for public use (NOT recommended): -> Port 80 (TCP), 443 (TCP), 8443 (TCP), 22345 (TCP), 8090 (TCP) **webui, 8444 (TCP) **mucha, 9000 (TCP) +> Port 80 (TCP), 443 (TCP), 8443 (TCP), 22345 (TCP), 8080 (TCP), 8090 (TCP) **webui, 8444 (TCP) **mucha ## Running the ARTEMiS instance > python index.py From fae6b77403926ed0b7e83e60022579effc6a4597 Mon Sep 17 00:00:00 2001 From: Hay1tsme Date: Fri, 3 Mar 2023 15:03:57 -0500 Subject: [PATCH 02/12] core: TESTING fix for get_machine --- core/data/schema/arcade.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/core/data/schema/arcade.py b/core/data/schema/arcade.py index 117c3fe..921e5b5 100644 --- a/core/data/schema/arcade.py +++ b/core/data/schema/arcade.py @@ -50,9 +50,20 @@ arcade_owner = Table( class ArcadeData(BaseData): def get_machine(self, serial: str = None, id: int = None) -> Optional[Dict]: if serial is not None: - sql = machine.select(machine.c.serial == serial) + serial = serial.replace("-", "") + if len(serial) == 11: + sql = machine.select(machine.c.serial.like(f"{serial}%")) + + elif len(serial) == 15: + sql = machine.select(machine.c.serial == serial) + + else: + self.logger.error(f"{__name__ }: Malformed serial {serial}") + return None + elif id is not None: sql = machine.select(machine.c.id == id) + else: self.logger.error(f"{__name__ }: Need either serial or ID to look up!") return None @@ -110,4 +121,4 @@ class ArcadeData(BaseData): return result.lastrowid def generate_keychip_serial(self, platform_id: int) -> str: - pass \ No newline at end of file + pass From 101b966e3a4f2f80f8acaab331b1297a22e51e02 Mon Sep 17 00:00:00 2001 From: Hay1tsme Date: Fri, 3 Mar 2023 15:39:14 -0500 Subject: [PATCH 03/12] add allnet request debug logging --- core/allnet.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/allnet.py b/core/allnet.py index 587aeab..01a3610 100644 --- a/core/allnet.py +++ b/core/allnet.py @@ -115,6 +115,7 @@ class AllnetServlet: else: resp = AllnetPowerOnResponse2() + self.logger.debug(f"Allnet request: {vars(req)}") if req.game_id not in self.uri_registry: msg = f"Unrecognised game {req.game_id} attempted allnet auth from {request_ip}." self.data.base.log_event("allnet", "ALLNET_AUTH_UNKNOWN_GAME", logging.WARN, msg) @@ -154,6 +155,7 @@ class AllnetServlet: msg = f"{req.serial} authenticated from {request_ip}: {req.game_id} v{req.ver}" self.data.base.log_event("allnet", "ALLNET_AUTH_SUCCESS", logging.INFO, msg) self.logger.info(msg) + self.logger.debug(f"Allnet response: {vars(resp)}") return self.dict_to_http_form_string([vars(resp)]).encode("utf-8") From f6cfb9e36dbdf3f56896a978bf57326067291019 Mon Sep 17 00:00:00 2001 From: Hay1tsme Date: Fri, 3 Mar 2023 15:45:21 -0500 Subject: [PATCH 04/12] allnet: fix "none" in response --- core/allnet.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/core/allnet.py b/core/allnet.py index 01a3610..4f886f9 100644 --- a/core/allnet.py +++ b/core/allnet.py @@ -14,6 +14,7 @@ from time import strptime from core.config import CoreConfig from core.data import Data from core.utils import Utils +from core.const import * class AllnetServlet: def __init__(self, core_cfg: CoreConfig, cfg_folder: str): @@ -137,15 +138,15 @@ class AllnetServlet: if machine is not None: arcade = self.data.arcade.get_arcade(machine["arcade"]) - resp.country = arcade["country"] if machine["country"] is None else machine["country"] + resp.country = arcade["country"] if machine["country"] is None else machine["country"] if machine["country"] is None else AllnetCountryCode.JAPAN.value resp.place_id = arcade["id"] resp.allnet_id = machine["id"] - resp.name = arcade["name"] - resp.nickname = arcade["nickname"] - resp.region0 = arcade["region_id"] - resp.region_name0 = arcade["country"] - resp.region_name1 = arcade["state"] - resp.region_name2 = arcade["city"] + resp.name = arcade["name"] if arcade["name"] is not None else "" + resp.nickname = arcade["nickname"] if arcade["nickname"] is not None else "" + resp.region0 = arcade["region_id"] if arcade["region_id"] is not None else AllnetJapanRegionId.AICHI.value + resp.region_name0 = arcade["country"] if arcade["country"] is not None else AllnetCountryCode.JAPAN.value + resp.region_name1 = arcade["state"] if arcade["state"] is not None else AllnetJapanRegionId.AICHI.name + resp.region_name2 = arcade["city"] if arcade["city"] is not None else "" resp.client_timezone = arcade["timezone"] if arcade["timezone"] is not None else "+0900" int_ver = req.ver.replace(".", "") From b35e7d69831ee6ae13c3080a6bd16343e05ce4ed Mon Sep 17 00:00:00 2001 From: Hay1tsme Date: Fri, 3 Mar 2023 15:49:33 -0500 Subject: [PATCH 05/12] allnet: hotfix for country --- core/allnet.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/allnet.py b/core/allnet.py index 4f886f9..233469a 100644 --- a/core/allnet.py +++ b/core/allnet.py @@ -138,7 +138,7 @@ class AllnetServlet: if machine is not None: arcade = self.data.arcade.get_arcade(machine["arcade"]) - resp.country = arcade["country"] if machine["country"] is None else machine["country"] if machine["country"] is None else AllnetCountryCode.JAPAN.value + resp.country = arcade["country"] if machine["country"] is None else machine["country"] if machine["country"] is not None else AllnetCountryCode.JAPAN.value resp.place_id = arcade["id"] resp.allnet_id = machine["id"] resp.name = arcade["name"] if arcade["name"] is not None else "" From 34e2c50fb5b1fad897c6f69ba60f5687fe3f3029 Mon Sep 17 00:00:00 2001 From: Hay1tsme Date: Fri, 3 Mar 2023 15:52:58 -0500 Subject: [PATCH 06/12] allnet: see previous --- core/allnet.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core/allnet.py b/core/allnet.py index 233469a..71eeb33 100644 --- a/core/allnet.py +++ b/core/allnet.py @@ -138,7 +138,11 @@ class AllnetServlet: if machine is not None: arcade = self.data.arcade.get_arcade(machine["arcade"]) - resp.country = arcade["country"] if machine["country"] is None else machine["country"] if machine["country"] is not None else AllnetCountryCode.JAPAN.value + country = arcade["country"] if machine["country"] is None else machine["country"] + if country is None: + country = AllnetCountryCode.JAPAN.value + + resp.country = country resp.place_id = arcade["id"] resp.allnet_id = machine["id"] resp.name = arcade["name"] if arcade["name"] is not None else "" From f24d554a448673d67dfdc52286e5301a7832f278 Mon Sep 17 00:00:00 2001 From: Hay1tsme Date: Fri, 3 Mar 2023 16:26:07 -0500 Subject: [PATCH 07/12] wacca: pull region_id from allnet if available --- titles/wacca/base.py | 21 +++++++++++++++++++-- titles/wacca/const.py | 23 ++++++++++++++++++++++- 2 files changed, 41 insertions(+), 3 deletions(-) diff --git a/titles/wacca/base.py b/titles/wacca/base.py index 25ec33b..64b4248 100644 --- a/titles/wacca/base.py +++ b/titles/wacca/base.py @@ -91,10 +91,27 @@ class WaccaBase(): def handle_housing_start_request(self, data: Dict) -> Dict: req = HousingStartRequestV1(data) - if req.appVersion.country != "JPN" and req.appVersion.country in [region.name for region in WaccaConstants.Region]: + machine = self.data.arcade.get_machine(req.chipId) + if machine is not None: + arcade = self.data.arcade.get_arcade(machine["arcade"]) + + allnet_region_id = arcade["region_id"] + + if req.appVersion.country == "JPN": + if allnet_region_id is not None: + region = WaccaConstants.allnet_region_id_to_wacca_region(allnet_region_id) + + if region is None: + region_id = self.region_id + + else: + region_id = self.region_id + + elif req.appVersion.country in WaccaConstants.VALID_COUNTRIES: region_id = WaccaConstants.Region[req.appVersion.country] + else: - region_id = self.region_id + region_id = 0 resp = HousingStartResponseV1(region_id) return resp.make() diff --git a/titles/wacca/const.py b/titles/wacca/const.py index a984682..f072143 100644 --- a/titles/wacca/const.py +++ b/titles/wacca/const.py @@ -1,4 +1,7 @@ from enum import Enum +from typing import Optional + +from core.const import AllnetJapanRegionId class WaccaConstants(): CONFIG_NAME = "wacca.yaml" @@ -165,4 +168,22 @@ class WaccaConstants(): @classmethod def game_ver_to_string(cls, ver: int): - return cls.VERSION_NAMES[ver] \ No newline at end of file + return cls.VERSION_NAMES[ver] + + @classmethod + def allnet_region_id_to_wacca_region(cls, region: int) -> Optional[Region]: + try: + return [ + cls.Region.NONE, cls.Region.AICHI, cls.Region.AOMORI, cls.Region.AKITA, cls.Region.ISHIKAWA, + cls.Region.IBARAKI, cls.Region.IWATE, cls.Region.EHIME, cls.Region.OITA, cls.Region.OSAKA, + cls.Region.OKAYAMA, cls.Region.OKINAWA, cls.Region.KAGAWA, cls.Region.KAGOSHIMA, cls.Region.KANAGAWA, + cls.Region.GIFU, cls.Region.KYOTO, cls.Region.KUMAMOTO, cls.Region.GUNMA, cls.Region.KOCHI, + cls.Region.SAITAMA, cls.Region.SAGA, cls.Region.SHIGA, cls.Region.SHIZUOKA, cls.Region.SHIMANE, + cls.Region.CHIBA, cls.Region.TOKYO, cls.Region.TOKUSHIMA, cls.Region.TOCHIGI, cls.Region.TOTTORI, + cls.Region.TOYAMA, cls.Region.NAGASAKI, cls.Region.NAGANO, cls.Region.NARA, cls.Region.NIIGATA, + cls.Region.HYOGO, cls.Region.HIROSHIMA, cls.Region.FUKUI, cls.Region.FUKUOKA, cls.Region.FUKUSHIMA, + cls.Region.HOKKAIDO, cls.Region.MIE, cls.Region.MIYAGI, cls.Region.MIYAZAKI, cls.Region.YAMAGATA, + cls.Region.YAMAGUCHI, cls.Region.YAMANASHI, cls.Region.WAKAYAMA, + ][region] + except: return None + From 9ad724d64beee2f0c27891acfe1d8452b614dc7c Mon Sep 17 00:00:00 2001 From: Hay1tsme Date: Fri, 3 Mar 2023 16:28:42 -0500 Subject: [PATCH 08/12] wacca: fix edge case in handle_housing_start_request --- titles/wacca/base.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/titles/wacca/base.py b/titles/wacca/base.py index 64b4248..9867d5a 100644 --- a/titles/wacca/base.py +++ b/titles/wacca/base.py @@ -103,6 +103,8 @@ class WaccaBase(): if region is None: region_id = self.region_id + else: + region_id = region else: region_id = self.region_id @@ -111,7 +113,7 @@ class WaccaBase(): region_id = WaccaConstants.Region[req.appVersion.country] else: - region_id = 0 + region_id = WaccaConstants.Region.NONE resp = HousingStartResponseV1(region_id) return resp.make() From c26f6b7b1d500b052140079dcad99ca2630e3c70 Mon Sep 17 00:00:00 2001 From: Hay1tsme Date: Fri, 3 Mar 2023 16:31:42 -0500 Subject: [PATCH 09/12] wacca: fix typing --- titles/wacca/base.py | 5 +++-- titles/wacca/handlers/helpers.py | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/titles/wacca/base.py b/titles/wacca/base.py index 9867d5a..2e5001f 100644 --- a/titles/wacca/base.py +++ b/titles/wacca/base.py @@ -9,6 +9,7 @@ from titles.wacca.const import WaccaConstants from titles.wacca.database import WaccaData from titles.wacca.handlers import * +from core.const import AllnetCountryCode class WaccaBase(): def __init__(self, cfg: CoreConfig, game_cfg: WaccaConfig) -> None: @@ -74,6 +75,7 @@ class WaccaBase(): if prefecture_name not in [region.name for region in WaccaConstants.Region]: self.logger.warning(f"Invalid prefecture name {game_cfg.server.prefecture_name} in config file") self.region_id = WaccaConstants.Region.HOKKAIDO + else: self.region_id = WaccaConstants.Region[prefecture_name] @@ -94,10 +96,9 @@ class WaccaBase(): machine = self.data.arcade.get_machine(req.chipId) if machine is not None: arcade = self.data.arcade.get_arcade(machine["arcade"]) - allnet_region_id = arcade["region_id"] - if req.appVersion.country == "JPN": + if req.appVersion.country == AllnetCountryCode.JAPAN.value: if allnet_region_id is not None: region = WaccaConstants.allnet_region_id_to_wacca_region(allnet_region_id) diff --git a/titles/wacca/handlers/helpers.py b/titles/wacca/handlers/helpers.py index 1192237..b96b3dd 100644 --- a/titles/wacca/handlers/helpers.py +++ b/titles/wacca/handlers/helpers.py @@ -78,9 +78,9 @@ class Version(ShortVersion): super().__init__(version, major, minor, patch) split = version.split(".") if len(split) >= 6: - self.country = split[3] + self.country: str = split[3] self.build = int(split[4]) - self.role = split[5] + self.role: str = split[5] else: self.country = country From 2a6842db245745d7b2167235466e52c03802bf4e Mon Sep 17 00:00:00 2001 From: Hay1tsme Date: Fri, 3 Mar 2023 17:03:19 -0500 Subject: [PATCH 10/12] remove db old-to-new migration --- core/data/database.py | 1614 ----------------------------------------- 1 file changed, 1614 deletions(-) diff --git a/core/data/database.py b/core/data/database.py index 70fc3e0..65b01aa 100644 --- a/core/data/database.py +++ b/core/data/database.py @@ -138,1617 +138,3 @@ class Data: return None self.logger.info(f"Successfully migrated {game} to schema version {version}") - - def dump_db(self): - dbname = self.config.database.name - - self.logger.info("Database dumper for use with the reworked schema") - self.logger.info("Dumping users...") - - sql = f"SELECT * FROM `{dbname}`.`user`" - - result = self.base.execute(sql) - if result is None: - self.logger.error("Failed") - return None - users = result.fetchall() - - user_list: List[Dict[str, Any]] = [] - for usr in users: - user_list.append({ - "id": usr["id"], - "username": usr["username"], - "email": usr["email"], - "password": usr["password"], - "permissions": usr["permissions"], - "created_date": datetime.strftime(usr["created_date"], "%Y-%m-%d %H:%M:%S"), - "last_login_date": datetime.strftime(usr["accessed_date"], "%Y-%m-%d %H:%M:%S"), - }) - - self.logger.info(f"Done, found {len(user_list)} users") - with open("dbdump-user.json", "w", encoding="utf-8") as f: - f.write(json.dumps(user_list)) - self.logger.info(f"Saved as dbdump-user.json") - - self.logger.info("Dumping cards...") - - sql = f"SELECT * FROM `{dbname}`.`card`" - - result = self.base.execute(sql) - if result is None: - self.logger.error("Failed") - return None - cards = result.fetchall() - - card_list: List[Dict[str, Any]] = [] - for crd in cards: - card_list.append({ - "id": crd["id"], - "user": crd["user"], - "access_code": crd["access_code"], - "is_locked": crd["is_locked"], - "is_banned": crd["is_banned"], - "created_date": datetime.strftime(crd["created_date"], "%Y-%m-%d %H:%M:%S"), - "last_login_date": datetime.strftime(crd["accessed_date"], "%Y-%m-%d %H:%M:%S"), - }) - - self.logger.info(f"Done, found {len(card_list)} cards") - with open("dbdump-card.json", "w", encoding="utf-8") as f: - f.write(json.dumps(card_list)) - self.logger.info(f"Saved as dbdump-card.json") - - self.logger.info("Dumping arcades...") - - sql = f"SELECT * FROM `{dbname}`.`arcade`" - - result = self.base.execute(sql) - if result is None: - self.logger.error("Failed") - return None - arcades = result.fetchall() - - arcade_list: List[Dict[str, Any]] = [] - for arc in arcades: - arcade_list.append({ - "id": arc["id"], - "name": arc["name"], - "nickname": arc["name"], - "country": None, - "country_id": None, - "state": None, - "city": None, - "region_id": None, - "timezone": None, - }) - - self.logger.info(f"Done, found {len(arcade_list)} arcades") - with open("dbdump-arcade.json", "w", encoding="utf-8") as f: - f.write(json.dumps(arcade_list)) - self.logger.info(f"Saved as dbdump-arcade.json") - - self.logger.info("Dumping machines...") - - sql = f"SELECT * FROM `{dbname}`.`machine`" - - result = self.base.execute(sql) - if result is None: - self.logger.error("Failed") - return None - machines = result.fetchall() - - machine_list: List[Dict[str, Any]] = [] - for mech in machines: - if "country" in mech["data"]: - country = mech["data"]["country"] - else: - country = None - - if "ota_enable" in mech["data"]: - ota_enable = mech["data"]["ota_enable"] - else: - ota_enable = None - - machine_list.append({ - "id": mech["id"], - "arcade": mech["arcade"], - "serial": mech["keychip"], - "game": mech["game"], - "board": None, - "country": country, - "timezone": None, - "ota_enable": ota_enable, - "is_cab": False, - }) - - self.logger.info(f"Done, found {len(machine_list)} machines") - with open("dbdump-machine.json", "w", encoding="utf-8") as f: - f.write(json.dumps(machine_list)) - self.logger.info(f"Saved as dbdump-machine.json") - - self.logger.info("Dumping arcade owners...") - - sql = f"SELECT * FROM `{dbname}`.`arcade_owner`" - - result = self.base.execute(sql) - if result is None: - self.logger.error("Failed") - return None - arcade_owners = result.fetchall() - - owner_list: List[Dict[str, Any]] = [] - for owner in owner_list: - owner_list.append(owner._asdict()) - - self.logger.info(f"Done, found {len(owner_list)} arcade owners") - with open("dbdump-arcade_owner.json", "w", encoding="utf-8") as f: - f.write(json.dumps(owner_list)) - self.logger.info(f"Saved as dbdump-arcade_owner.json") - - self.logger.info("Dumping profiles...") - - sql = f"SELECT * FROM `{dbname}`.`profile`" - - result = self.base.execute(sql) - if result is None: - self.logger.error("Failed") - return None - profiles = result.fetchall() - - profile_list: Dict[List[Dict[str, Any]]] = {} - for pf in profiles: - game = pf["game"] - - if game not in profile_list: - profile_list[game] = [] - - profile_list[game].append({ - "id": pf["id"], - "user": pf["user"], - "version": pf["version"], - "use_count": pf["use_count"], - "name": pf["name"], - "game_id": pf["game_id"], - "mods": pf["mods"], - "data": pf["data"], - }) - - self.logger.info(f"Done, found profiles for {len(profile_list)} games") - with open("dbdump-profile.json", "w", encoding="utf-8") as f: - f.write(json.dumps(profile_list)) - self.logger.info(f"Saved as dbdump-profile.json") - - self.logger.info("Dumping scores...") - - sql = f"SELECT * FROM `{dbname}`.`score`" - - result = self.base.execute(sql) - if result is None: - self.logger.error("Failed") - return None - scores = result.fetchall() - - score_list: Dict[List[Dict[str, Any]]] = {} - for sc in scores: - game = sc["game"] - - if game not in score_list: - score_list[game] = [] - - score_list[game].append({ - "id": sc["id"], - "user": sc["user"], - "version": sc["version"], - "song_id": sc["song_id"], - "chart_id": sc["chart_id"], - "score1": sc["score1"], - "score2": sc["score2"], - "fc1": sc["fc1"], - "fc2": sc["fc2"], - "cleared": sc["cleared"], - "grade": sc["grade"], - "data": sc["data"], - }) - - self.logger.info(f"Done, found scores for {len(score_list)} games") - with open("dbdump-score.json", "w", encoding="utf-8") as f: - f.write(json.dumps(score_list)) - self.logger.info(f"Saved as dbdump-score.json") - - self.logger.info("Dumping achievements...") - - sql = f"SELECT * FROM `{dbname}`.`achievement`" - - result = self.base.execute(sql) - if result is None: - self.logger.error("Failed") - return None - achievements = result.fetchall() - - achievement_list: Dict[List[Dict[str, Any]]] = {} - for ach in achievements: - game = ach["game"] - - if game not in achievement_list: - achievement_list[game] = [] - - achievement_list[game].append({ - "id": ach["id"], - "user": ach["user"], - "version": ach["version"], - "type": ach["type"], - "achievement_id": ach["achievement_id"], - "data": ach["data"], - }) - - self.logger.info(f"Done, found achievements for {len(achievement_list)} games") - with open("dbdump-achievement.json", "w", encoding="utf-8") as f: - f.write(json.dumps(achievement_list)) - self.logger.info(f"Saved as dbdump-achievement.json") - - self.logger.info("Dumping items...") - - sql = f"SELECT * FROM `{dbname}`.`item`" - - result = self.base.execute(sql) - if result is None: - self.logger.error("Failed") - return None - items = result.fetchall() - - item_list: Dict[List[Dict[str, Any]]] = {} - for itm in items: - game = itm["game"] - - if game not in item_list: - item_list[game] = [] - - item_list[game].append({ - "id": itm["id"], - "user": itm["user"], - "version": itm["version"], - "type": itm["type"], - "item_id": itm["item_id"], - "data": ach["data"], - }) - - self.logger.info(f"Done, found items for {len(item_list)} games") - with open("dbdump-item.json", "w", encoding="utf-8") as f: - f.write(json.dumps(item_list)) - self.logger.info(f"Saved as dbdump-item.json") - - def restore_from_old_schema(self): - # Import the tables we expect to be there - from core.data.schema.user import aime_user - from core.data.schema.card import aime_card - from core.data.schema.arcade import arcade, machine, arcade_owner - from sqlalchemy.dialects.mysql import Insert - - # Make sure that all the tables we're trying to access exist - self.create_database() - - # Import the data, making sure that dependencies are accounted for - if os.path.exists("dbdump-user.json"): - users = [] - with open("dbdump-user.json", "r", encoding="utf-8") as f: - users = json.load(f) - - self.logger.info(f"Load {len(users)} users") - - for user in users: - sql = Insert(aime_user).values(**user) - - conflict = sql.on_duplicate_key_update(**user) - - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert user {user['id']}") - continue - self.logger.info(f"Inserted user {user['id']} -> {result.lastrowid}") - - if os.path.exists("dbdump-card.json"): - cards = [] - with open("dbdump-card.json", "r", encoding="utf-8") as f: - cards = json.load(f) - - self.logger.info(f"Load {len(cards)} cards") - - for card in cards: - sql = Insert(aime_card).values(**card) - - conflict = sql.on_duplicate_key_update(**card) - - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert card {card['id']}") - continue - self.logger.info(f"Inserted card {card['id']} -> {result.lastrowid}") - - if os.path.exists("dbdump-arcade.json"): - arcades = [] - with open("dbdump-arcade.json", "r", encoding="utf-8") as f: - arcades = json.load(f) - - self.logger.info(f"Load {len(arcades)} arcades") - - for ac in arcades: - sql = Insert(arcade).values(**ac) - - conflict = sql.on_duplicate_key_update(**ac) - - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert arcade {ac['id']}") - continue - self.logger.info(f"Inserted arcade {ac['id']} -> {result.lastrowid}") - - if os.path.exists("dbdump-arcade_owner.json"): - ac_owners = [] - with open("dbdump-arcade_owner.json", "r", encoding="utf-8") as f: - ac_owners = json.load(f) - - self.logger.info(f"Load {len(ac_owners)} arcade owners") - - for owner in ac_owners: - sql = Insert(arcade_owner).values(**owner) - - conflict = sql.on_duplicate_key_update(**owner) - - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert arcade_owner {owner['user']}") - continue - self.logger.info(f"Inserted arcade_owner {owner['user']} -> {result.lastrowid}") - - if os.path.exists("dbdump-machine.json"): - mechs = [] - with open("dbdump-machine.json", "r", encoding="utf-8") as f: - mechs = json.load(f) - - self.logger.info(f"Load {len(mechs)} machines") - - for mech in mechs: - sql = Insert(machine).values(**mech) - - conflict = sql.on_duplicate_key_update(**mech) - - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert machine {mech['id']}") - continue - self.logger.info(f"Inserted machine {mech['id']} -> {result.lastrowid}") - - # Now the fun part, grabbing all our scores, profiles, items, and achievements and trying - # to conform them to our current, freeform schema. This will be painful... - profiles = {} - items = {} - scores = {} - achievements = {} - - if os.path.exists("dbdump-profile.json"): - with open("dbdump-profile.json", "r", encoding="utf-8") as f: - profiles = json.load(f) - - self.logger.info(f"Load {len(profiles)} profiles") - - if os.path.exists("dbdump-item.json"): - with open("dbdump-item.json", "r", encoding="utf-8") as f: - items = json.load(f) - - self.logger.info(f"Load {len(items)} items") - - if os.path.exists("dbdump-score.json"): - with open("dbdump-score.json", "r", encoding="utf-8") as f: - scores = json.load(f) - - self.logger.info(f"Load {len(scores)} scores") - - if os.path.exists("dbdump-achievement.json"): - with open("dbdump-achievement.json", "r", encoding="utf-8") as f: - achievements = json.load(f) - - self.logger.info(f"Load {len(achievements)} achievements") - - # Chuni / Chusan - if os.path.exists("titles/chuni/schema"): - from titles.chuni.schema.item import character, item, duel, map, map_area - from titles.chuni.schema.profile import profile, profile_ex, option, option_ex - from titles.chuni.schema.profile import recent_rating, activity, charge, emoney - from titles.chuni.schema.profile import overpower - from titles.chuni.schema.score import best_score, course - - chuni_profiles = [] - chuni_items = [] - chuni_scores = [] - - if "SDBT" in profiles: - chuni_profiles = profiles["SDBT"] - if "SDBT" in items: - chuni_items = items["SDBT"] - if "SDBT" in scores: - chuni_scores = scores["SDBT"] - if "SDHD" in profiles: - chuni_profiles += profiles["SDHD"] - if "SDHD" in items: - chuni_items += items["SDHD"] - if "SDHD" in scores: - chuni_scores += scores["SDHD"] - - self.logger.info(f"Importing {len(chuni_profiles)} chunithm/chunithm new profiles") - - for pf in chuni_profiles: - if type(pf["data"]) is not dict: - pf["data"] = json.loads(pf["data"]) - pf_data = pf["data"] - - # data - if "userData" in pf_data: - pf_data["userData"]["userName"] = bytes([ord(c) for c in pf_data["userData"]["userName"]]).decode("utf-8") - pf_data["userData"]["user"] = pf["user"] - pf_data["userData"]["version"] = pf["version"] - pf_data["userData"].pop("accessCode") - - if pf_data["userData"]["lastRomVersion"].startswith("2."): - pf_data["userData"]["version"] += 10 - - pf_data["userData"] = self.base.fix_bools(pf_data["userData"]) - - sql = Insert(profile).values(**pf_data["userData"]) - conflict = sql.on_duplicate_key_update(**pf_data["userData"]) - - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert chuni profile data for {pf['user']}") - continue - self.logger.info(f"Inserted chuni profile for {pf['user']} ->{result.lastrowid}") - - # data_ex - if "userDataEx" in pf_data and len(pf_data["userDataEx"]) > 0: - pf_data["userDataEx"][0]["user"] = pf["user"] - pf_data["userDataEx"][0]["version"] = pf["version"] - - pf_data["userDataEx"] = self.base.fix_bools(pf_data["userDataEx"][0]) - - sql = Insert(profile_ex).values(**pf_data["userDataEx"]) - conflict = sql.on_duplicate_key_update(**pf_data["userDataEx"]) - - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert chuni profile data_ex for {pf['user']}") - continue - self.logger.info(f"Inserted chuni profile data_ex for {pf['user']} ->{result.lastrowid}") - - # option - if "userGameOption" in pf_data: - pf_data["userGameOption"]["user"] = pf["user"] - - pf_data["userGameOption"] = self.base.fix_bools(pf_data["userGameOption"]) - - sql = Insert(option).values(**pf_data["userGameOption"]) - conflict = sql.on_duplicate_key_update(**pf_data["userGameOption"]) - - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert chuni profile options for {pf['user']}") - continue - self.logger.info(f"Inserted chuni profile options for {pf['user']} ->{result.lastrowid}") - - # option_ex - if "userGameOptionEx" in pf_data and len(pf_data["userGameOptionEx"]) > 0: - pf_data["userGameOptionEx"][0]["user"] = pf["user"] - - pf_data["userGameOptionEx"] = self.base.fix_bools(pf_data["userGameOptionEx"][0]) - - sql = Insert(option_ex).values(**pf_data["userGameOptionEx"]) - conflict = sql.on_duplicate_key_update(**pf_data["userGameOptionEx"]) - - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert chuni profile option_ex for {pf['user']}") - continue - self.logger.info(f"Inserted chuni profile option_ex for {pf['user']} ->{result.lastrowid}") - - # recent_rating - if "userRecentRatingList" in pf_data: - rr = { - "user": pf["user"], - "recentRating": pf_data["userRecentRatingList"] - } - - sql = Insert(recent_rating).values(**rr) - conflict = sql.on_duplicate_key_update(**rr) - - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert chuni profile recent_rating for {pf['user']}") - continue - self.logger.info(f"Inserted chuni profile recent_rating for {pf['user']} ->{result.lastrowid}") - - # activity - if "userActivityList" in pf_data: - for act in pf_data["userActivityList"]: - act["user"] = pf["user"] - - sql = Insert(activity).values(**act) - conflict = sql.on_duplicate_key_update(**act) - - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert chuni profile activity for {pf['user']}") - else: - self.logger.info(f"Inserted chuni profile activity for {pf['user']} ->{result.lastrowid}") - - # charge - if "userChargeList" in pf_data: - for cg in pf_data["userChargeList"]: - cg["user"] = pf["user"] - - cg = self.base.fix_bools(cg) - - sql = Insert(charge).values(**cg) - conflict = sql.on_duplicate_key_update(**cg) - - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert chuni profile charge for {pf['user']}") - else: - self.logger.info(f"Inserted chuni profile charge for {pf['user']} ->{result.lastrowid}") - - # emoney - if "userEmoneyList" in pf_data: - for emon in pf_data["userEmoneyList"]: - emon["user"] = pf["user"] - - sql = Insert(emoney).values(**emon) - conflict = sql.on_duplicate_key_update(**emon) - - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert chuni profile emoney for {pf['user']}") - else: - self.logger.info(f"Inserted chuni profile emoney for {pf['user']} ->{result.lastrowid}") - - # overpower - if "userOverPowerList" in pf_data: - for op in pf_data["userOverPowerList"]: - op["user"] = pf["user"] - - sql = Insert(overpower).values(**op) - conflict = sql.on_duplicate_key_update(**op) - - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert chuni profile overpower for {pf['user']}") - else: - self.logger.info(f"Inserted chuni profile overpower for {pf['user']} ->{result.lastrowid}") - - # map_area - if "userMapAreaList" in pf_data: - for ma in pf_data["userMapAreaList"]: - ma["user"] = pf["user"] - - ma = self.base.fix_bools(ma) - - sql = Insert(map_area).values(**ma) - conflict = sql.on_duplicate_key_update(**ma) - - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert chuni map_area for {pf['user']}") - else: - self.logger.info(f"Inserted chuni map_area for {pf['user']} ->{result.lastrowid}") - - #duel - if "userDuelList" in pf_data: - for ma in pf_data["userDuelList"]: - ma["user"] = pf["user"] - - ma = self.base.fix_bools(ma) - - sql = Insert(duel).values(**ma) - conflict = sql.on_duplicate_key_update(**ma) - - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert chuni duel for {pf['user']}") - else: - self.logger.info(f"Inserted chuni duel for {pf['user']} ->{result.lastrowid}") - - # map - if "userMapList" in pf_data: - for ma in pf_data["userMapList"]: - ma["user"] = pf["user"] - - ma = self.base.fix_bools(ma) - - sql = Insert(map).values(**ma) - conflict = sql.on_duplicate_key_update(**ma) - - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert chuni map for {pf['user']}") - else: - self.logger.info(f"Inserted chuni map for {pf['user']} ->{result.lastrowid}") - - self.logger.info(f"Importing {len(chuni_items)} chunithm/chunithm new items") - - for i in chuni_items: - if type(i["data"]) is not dict: - i["data"] = json.loads(i["data"]) - i_data = i["data"] - - i_data["user"] = i["user"] - - i_data = self.base.fix_bools(i_data) - - try: i_data.pop("assignIllust") - except: pass - - try: i_data.pop("exMaxLv") - except: pass - - if i["type"] == 20: #character - sql = Insert(character).values(**i_data) - else: - sql = Insert(item).values(**i_data) - - conflict = sql.on_duplicate_key_update(**i_data) - - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert chuni item for user {i['user']}") - - else: - self.logger.info(f"Inserted chuni item for user {i['user']} {i['item_id']} -> {result.lastrowid}") - - self.logger.info(f"Importing {len(chuni_scores)} chunithm/chunithm new scores") - - for sc in chuni_scores: - if type(sc["data"]) is not dict: - sc["data"] = json.loads(sc["data"]) - - score_data = self.base.fix_bools(sc["data"]) - - try: score_data.pop("theoryCount") - except: pass - - try: score_data.pop("ext1") - except: pass - - score_data["user"] = sc["user"] - - sql = Insert(best_score).values(**score_data) - conflict = sql.on_duplicate_key_update(**score_data) - - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to put chuni score for user {sc['user']}") - else: - self.logger.info(f"Inserted chuni score for user {sc['user']} {sc['song_id']}/{sc['chart_id']} -> {result.lastrowid}") - - else: - self.logger.info(f"Chuni/Chusan not found, skipping...") - - # CXB - if os.path.exists("titles/cxb/schema"): - from titles.cxb.schema.item import energy - from titles.cxb.schema.profile import profile - from titles.cxb.schema.score import score, ranking - - cxb_profiles = [] - cxb_items = [] - cxb_scores = [] - - if "SDCA" in profiles: - cxb_profiles = profiles["SDCA"] - if "SDCA" in items: - cxb_items = items["SDCA"] - if "SDCA" in scores: - cxb_scores = scores["SDCA"] - - self.logger.info(f"Importing {len(cxb_profiles)} CXB profiles") - - for pf in cxb_profiles: - user = pf["user"] - version = pf["version"] - pf_data = pf["data"]["data"] - pf_idx = pf["data"]["index"] - - for x in range(len(pf_data)): - sql = Insert(profile).values( - user = user, - version = version, - index = int(pf_idx[x]), - data = json.loads(pf_data[x]) if type(pf_data[x]) is not dict else pf_data[x] - ) - - conflict = sql.on_duplicate_key_update( - user = user, - version = version, - index = int(pf_idx[x]), - data = json.loads(pf_data[x]) if type(pf_data[x]) is not dict else pf_data[x] - ) - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert CXB profile for user {user} Index {pf_idx[x]}") - - self.logger.info(f"Importing {len(cxb_scores)} CXB scores") - - for sc in cxb_scores: - user = sc["user"] - version = sc["version"] - mcode = sc["data"]["mcode"] - index = sc["data"]["index"] - - sql = Insert(score).values( - user = user, - game_version = version, - song_mcode = mcode, - song_index = index, - data = sc["data"] - ) - - conflict = sql.on_duplicate_key_update( - user = user, - game_version = version, - song_mcode = mcode, - song_index = index, - data = sc["data"] - ) - - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert CXB score for user {user} mcode {mcode}") - - self.logger.info(f"Importing {len(cxb_items)} CXB items") - - for it in cxb_items: - user = it["user"] - - if it["type"] == 3: # energy - sql = Insert(energy).values( - user = user, - energy = it["data"]["total"] - ) - - conflict = sql.on_duplicate_key_update( - user = user, - energy = it["data"]["total"] - ) - - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert CXB energy for user {user}") - - elif it["type"] == 2: - sql = Insert(ranking).values( - user = user, - rev_id = it["data"]["rid"], - song_id = it["data"]["sc"][1] if len(it["data"]["sc"]) > 1 else None, - score = it["data"]["sc"][0], - clear = it["data"]["clear"], - ) - - conflict = sql.on_duplicate_key_update( - user = user, - rev_id = it["data"]["rid"], - song_id = it["data"]["sc"][1] if len(it["data"]["sc"]) > 1 else None, - score = it["data"]["sc"][0], - clear = it["data"]["clear"], - ) - - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert CXB ranking for user {user}") - - else: - self.logger.error(f"Unknown CXB item type {it['type']} for user {user}") - - else: - self.logger.info(f"CXB not found, skipping...") - - # Diva - if os.path.exists("titles/diva/schema"): - from titles.diva.schema.profile import profile - from titles.diva.schema.score import score - from titles.diva.schema.item import shop - - diva_profiles = [] - diva_scores = [] - - if "SBZV" in profiles: - diva_profiles = profiles["SBZV"] - if "SBZV" in scores: - diva_scores = scores["SBZV"] - - self.logger.info(f"Importing {len(diva_profiles)} Diva profiles") - - for pf in diva_profiles: - pf["data"]["user"] = pf["user"] - pf["data"]["version"] = pf["version"] - pf_data = pf["data"] - - if "mdl_eqp_ary" in pf["data"]: - sql = Insert(shop).values( - user = user, - version = version, - mdl_eqp_ary = pf["data"]["mdl_eqp_ary"], - ) - conflict = sql.on_duplicate_key_update( - user = user, - version = version, - mdl_eqp_ary = pf["data"]["mdl_eqp_ary"] - ) - self.base.execute(conflict) - pf["data"].pop("mdl_eqp_ary") - - sql = Insert(profile).values(**pf_data) - conflict = sql.on_duplicate_key_update(**pf_data) - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert diva profile for {pf['user']}") - - self.logger.info(f"Importing {len(diva_scores)} Diva scores") - - for sc in diva_scores: - user = sc["user"] - - clr_kind = -1 - for x in sc["data"]["stg_clr_kind"].split(","): - if x != "-1": - clr_kind = x - - cool_ct = 0 - for x in sc["data"]["stg_cool_cnt"].split(","): - if x != "0": - cool_ct = x - - fine_ct = 0 - for x in sc["data"]["stg_fine_cnt"].split(","): - if x != "0": - fine_ct = x - - safe_ct = 0 - for x in sc["data"]["stg_safe_cnt"].split(","): - if x != "0": - safe_ct = x - - sad_ct = 0 - for x in sc["data"]["stg_sad_cnt"].split(","): - if x != "0": - sad_ct = x - - worst_ct = 0 - for x in sc["data"]["stg_wt_wg_cnt"].split(","): - if x != "0": - worst_ct = x - - max_cmb = 0 - for x in sc["data"]["stg_max_cmb"].split(","): - if x != "0": - max_cmb = x - - sql = Insert(score).values( - user = user, - version = sc["version"], - pv_id = sc["song_id"], - difficulty = sc["chart_id"], - score = sc["score1"], - atn_pnt = sc["score2"], - clr_kind = clr_kind, - sort_kind = sc["data"]["sort_kind"], - cool = cool_ct, - fine = fine_ct, - safe = safe_ct, - sad = sad_ct, - worst = worst_ct, - max_combo = max_cmb, - ) - - conflict = sql.on_duplicate_key_update(user = user, - version = sc["version"], - pv_id = sc["song_id"], - difficulty = sc["chart_id"], - score = sc["score1"], - atn_pnt = sc["score2"], - clr_kind = clr_kind, - sort_kind = sc["data"]["sort_kind"], - cool = cool_ct, - fine = fine_ct, - safe = safe_ct, - sad = sad_ct, - worst = worst_ct, - max_combo = max_cmb - ) - - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert diva score for {pf['user']}") - - else: - self.logger.info(f"Diva not found, skipping...") - - # Ongeki - if os.path.exists("titles/ongeki/schema"): - from titles.ongeki.schema.item import card, deck, character, boss, story - from titles.ongeki.schema.item import chapter, item, music_item, login_bonus - from titles.ongeki.schema.item import event_point, mission_point, scenerio - from titles.ongeki.schema.item import trade_item, event_music, tech_event - from titles.ongeki.schema.profile import profile, option, activity, recent_rating - from titles.ongeki.schema.profile import rating_log, training_room, kop - from titles.ongeki.schema.score import score_best, tech_count, playlog - from titles.ongeki.schema.log import session_log - - item_types = { - "character": 20, - "story": 21, - "card": 22, - "deck": 23, - "login": 24, - "chapter": 25 - } - - ongeki_profiles = [] - ongeki_items = [] - ongeki_scores = [] - - if "SDDT" in profiles: - ongeki_profiles = profiles["SDDT"] - if "SDDT" in items: - ongeki_items = items["SDDT"] - if "SDDT" in scores: - ongeki_scores = scores["SDDT"] - - self.logger.info(f"Importing {len(ongeki_profiles)} ongeki profiles") - - for pf in ongeki_profiles: - user = pf["user"] - version = pf["version"] - pf_data = pf["data"] - - pf_data["userData"]["user"] = user - pf_data["userData"]["version"] = version - pf_data["userData"].pop("accessCode") - - sql = Insert(profile).values(**pf_data["userData"]) - conflict = sql.on_duplicate_key_update(**pf_data["userData"]) - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert ongeki profile data for user {pf['user']}") - continue - - pf_data["userOption"]["user"] = user - - sql = Insert(option).values(**pf_data["userOption"]) - conflict = sql.on_duplicate_key_update(**pf_data["userOption"]) - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert ongeki profile options for user {pf['user']}") - continue - - for pf_list in pf_data["userActivityList"]: - pf_list["user"] = user - - sql = Insert(activity).values(**pf_list) - conflict = sql.on_duplicate_key_update(**pf_list) - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert ongeki profile activity for user {pf['user']}") - continue - - sql = Insert(recent_rating).values( - user = user, - recentRating = pf_data["userRecentRatingList"] - ) - - conflict = sql.on_duplicate_key_update( - user = user, - recentRating = pf_data["userRecentRatingList"] - ) - result = self.base.execute(conflict) - - if result is None: - self.logger.error(f"Failed to insert ongeki profile recent rating for user {pf['user']}") - continue - - for pf_list in pf_data["userRatinglogList"]: - pf_list["user"] = user - - sql = Insert(rating_log).values(**pf_list) - - conflict = sql.on_duplicate_key_update(**pf_list) - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert ongeki profile rating log for user {pf['user']}") - continue - - for pf_list in pf_data["userTrainingRoomList"]: - pf_list["user"] = user - - sql = Insert(training_room).values(**pf_list) - conflict = sql.on_duplicate_key_update(**pf_list) - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert ongeki profile training room for user {pf['user']}") - continue - - if "userKopList" in pf_data: - for pf_list in pf_data["userKopList"]: - pf_list["user"] = user - - sql = Insert(kop).values(**pf_list) - conflict = sql.on_duplicate_key_update(**pf_list) - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert ongeki profile training room for user {pf['user']}") - continue - - for pf_list in pf_data["userBossList"]: - pf_list["user"] = user - - sql = Insert(boss).values(**pf_list) - conflict = sql.on_duplicate_key_update(**pf_list) - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert ongeki item boss for user {pf['user']}") - continue - - for pf_list in pf_data["userDeckList"]: - pf_list["user"] = user - - sql = Insert(deck).values(**pf_list) - conflict = sql.on_duplicate_key_update(**pf_list) - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert ongeki item deck for user {pf['user']}") - continue - - for pf_list in pf_data["userStoryList"]: - pf_list["user"] = user - - sql = Insert(story).values(**pf_list) - conflict = sql.on_duplicate_key_update(**pf_list) - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert ongeki item story for user {pf['user']}") - continue - - for pf_list in pf_data["userChapterList"]: - pf_list["user"] = user - - sql = Insert(chapter).values(**pf_list) - conflict = sql.on_duplicate_key_update(**pf_list) - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert ongeki item chapter for user {pf['user']}") - continue - - for pf_list in pf_data["userPlaylogList"]: - pf_list["user"] = user - - sql = Insert(playlog).values(**pf_list) - conflict = sql.on_duplicate_key_update(**pf_list) - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert ongeki score playlog for user {pf['user']}") - continue - - for pf_list in pf_data["userMusicItemList"]: - pf_list["user"] = user - - sql = Insert(music_item).values(**pf_list) - conflict = sql.on_duplicate_key_update(**pf_list) - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert ongeki item music item for user {pf['user']}") - continue - - for pf_list in pf_data["userTechCountList"]: - pf_list["user"] = user - - sql = Insert(tech_count).values(**pf_list) - conflict = sql.on_duplicate_key_update(**pf_list) - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert ongeki item tech count for user {pf['user']}") - continue - - if "userTechEventList" in pf_data: - for pf_list in pf_data["userTechEventList"]: - pf_list["user"] = user - - sql = Insert(tech_event).values(**pf_list) - conflict = sql.on_duplicate_key_update(**pf_list) - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert ongeki item tech event for user {pf['user']}") - continue - - for pf_list in pf_data["userTradeItemList"]: - pf_list["user"] = user - - sql = Insert(trade_item).values(**pf_list) - conflict = sql.on_duplicate_key_update(**pf_list) - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert ongeki item trade item for user {pf['user']}") - continue - - if "userEventMusicList" in pf_data: - for pf_list in pf_data["userEventMusicList"]: - pf_list["user"] = user - - sql = Insert(event_music).values(**pf_list) - conflict = sql.on_duplicate_key_update(**pf_list) - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert ongeki item event music for user {pf['user']}") - continue - - if "userEventPointList" in pf_data: - for pf_list in pf_data["userEventPointList"]: - pf_list["user"] = user - - sql = Insert(event_point).values(**pf_list) - conflict = sql.on_duplicate_key_update(**pf_list) - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert ongeki item event point for user {pf['user']}") - continue - - for pf_list in pf_data["userLoginBonusList"]: - pf_list["user"] = user - - sql = Insert(login_bonus).values(**pf_list) - conflict = sql.on_duplicate_key_update(**pf_list) - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert ongeki item login bonus for user {pf['user']}") - continue - - for pf_list in pf_data["userMissionPointList"]: - pf_list["user"] = user - - sql = Insert(mission_point).values(**pf_list) - conflict = sql.on_duplicate_key_update(**pf_list) - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert ongeki item mission point for user {pf['user']}") - continue - - for pf_list in pf_data["userScenarioList"]: - pf_list["user"] = user - - sql = Insert(scenerio).values(**pf_list) - conflict = sql.on_duplicate_key_update(**pf_list) - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert ongeki item scenerio for user {pf['user']}") - continue - - if "userSessionlogList" in pf_data: - for pf_list in pf_data["userSessionlogList"]: - pf_list["user"] = user - - sql = Insert(session_log).values(**pf_list) - conflict = sql.on_duplicate_key_update(**pf_list) - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert ongeki log session for user {pf['user']}") - continue - - self.logger.info(f"Importing {len(ongeki_items)} ongeki items") - - for it in ongeki_items: - user = it["user"] - it_type = it["type"] - it_id = it["item_id"] - it_data = it["data"] - it_data["user"] = user - - if it_type == item_types["character"] and "characterId" in it_data: - sql = Insert(character).values(**it_data) - - elif it_type == item_types["story"]: - sql = Insert(story).values(**it_data) - - elif it_type == item_types["card"]: - sql = Insert(card).values(**it_data) - - elif it_type == item_types["deck"]: - sql = Insert(deck).values(**it_data) - - elif it_type == item_types["login"]: # login bonus - sql = Insert(login_bonus).values(**it_data) - - elif it_type == item_types["chapter"]: - sql = Insert(chapter).values(**it_data) - - else: - sql = Insert(item).values(**it_data) - - conflict = sql.on_duplicate_key_update(**it_data) - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert ongeki item {it_id} kind {it_type} for user {user}") - - self.logger.info(f"Importing {len(ongeki_scores)} ongeki scores") - - for sc in ongeki_scores: - user = sc["user"] - sc_data = sc["data"] - sc_data["user"] = user - - sql = Insert(score_best).values(**sc_data) - conflict = sql.on_duplicate_key_update(**sc_data) - - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert ongeki score for user {user}: {sc['song_id']}/{sc['chart_id']}") - - else: - self.logger.info(f"Ongeki not found, skipping...") - - # Wacca - if os.path.exists("titles/wacca/schema"): - from titles.wacca.schema.profile import profile, option, bingo, gate, favorite - from titles.wacca.schema.item import item, ticket, song_unlock, trophy - from titles.wacca.schema.score import best_score, stageup - from titles.wacca.reverse import WaccaReverse - from titles.wacca.const import WaccaConstants - - default_opts = WaccaReverse.OPTIONS_DEFAULTS - opts = WaccaConstants.OPTIONS - item_types = WaccaConstants.ITEM_TYPES - - wacca_profiles = [] - wacca_items = [] - wacca_scores = [] - wacca_achievements = [] - - if "SDFE" in profiles: - wacca_profiles = profiles["SDFE"] - if "SDFE" in items: - wacca_items = items["SDFE"] - if "SDFE" in scores: - wacca_scores = scores["SDFE"] - if "SDFE" in achievements: - wacca_achievements = achievements["SDFE"] - - self.logger.info(f"Importing {len(wacca_profiles)} wacca profiles") - - for pf in wacca_profiles: - if pf["version"] == 0 or pf["version"] == 1: - season = 1 - elif pf["version"] == 2 or pf["version"] == 3: - season = 2 - elif pf["version"] >= 4: - season = 3 - - if type(pf["data"]) is not dict: - pf["data"] = json.loads(pf["data"]) - - try: - sql = Insert(profile).values( - id = pf["id"], - user = pf["user"], - version = pf["version"], - season = season, - username = pf["data"]["profile"]["username"] if "username" in pf["data"]["profile"] else pf["name"], - xp = pf["data"]["profile"]["xp"], - xp_season = pf["data"]["profile"]["xp"], - wp = pf["data"]["profile"]["wp"], - wp_season = pf["data"]["profile"]["wp"], - wp_total = pf["data"]["profile"]["total_wp_gained"], - dan_type = pf["data"]["profile"]["dan_type"], - dan_level = pf["data"]["profile"]["dan_level"], - title_0 = pf["data"]["profile"]["title_part_ids"][0], - title_1 = pf["data"]["profile"]["title_part_ids"][1], - title_2 = pf["data"]["profile"]["title_part_ids"][2], - rating = pf["data"]["profile"]["rating"], - vip_expire_time = datetime.fromtimestamp(pf["data"]["profile"]["vip_expire_time"]) if "vip_expire_time" in pf["data"]["profile"] else None, - login_count = pf["use_count"], - playcount_single = pf["use_count"], - playcount_single_season = pf["use_count"], - last_game_ver = pf["data"]["profile"]["last_game_ver"], - last_song_id = pf["data"]["profile"]["last_song_info"][0] if "last_song_info" in pf["data"]["profile"] else 0, - last_song_difficulty = pf["data"]["profile"]["last_song_info"][1] if "last_song_info" in pf["data"]["profile"] else 0, - last_folder_order = pf["data"]["profile"]["last_song_info"][2] if "last_song_info" in pf["data"]["profile"] else 0, - last_folder_id = pf["data"]["profile"]["last_song_info"][3] if "last_song_info" in pf["data"]["profile"] else 0, - last_song_order = pf["data"]["profile"]["last_song_info"][4] if "last_song_info" in pf["data"]["profile"] else 0, - last_login_date = datetime.fromtimestamp(pf["data"]["profile"]["last_login_timestamp"]), - ) - - conflict = sql.on_duplicate_key_update( - id = pf["id"], - user = pf["user"], - version = pf["version"], - season = season, - username = pf["data"]["profile"]["username"] if "username" in pf["data"]["profile"] else pf["name"], - xp = pf["data"]["profile"]["xp"], - xp_season = pf["data"]["profile"]["xp"], - wp = pf["data"]["profile"]["wp"], - wp_season = pf["data"]["profile"]["wp"], - wp_total = pf["data"]["profile"]["total_wp_gained"], - dan_type = pf["data"]["profile"]["dan_type"], - dan_level = pf["data"]["profile"]["dan_level"], - title_0 = pf["data"]["profile"]["title_part_ids"][0], - title_1 = pf["data"]["profile"]["title_part_ids"][1], - title_2 = pf["data"]["profile"]["title_part_ids"][2], - rating = pf["data"]["profile"]["rating"], - vip_expire_time = datetime.fromtimestamp(pf["data"]["profile"]["vip_expire_time"]) if "vip_expire_time" in pf["data"]["profile"] else None, - login_count = pf["use_count"], - playcount_single = pf["use_count"], - playcount_single_season = pf["use_count"], - last_game_ver = pf["data"]["profile"]["last_game_ver"], - last_song_id = pf["data"]["profile"]["last_song_info"][0] if "last_song_info" in pf["data"]["profile"] else 0, - last_song_difficulty = pf["data"]["profile"]["last_song_info"][1] if "last_song_info" in pf["data"]["profile"] else 0, - last_folder_order = pf["data"]["profile"]["last_song_info"][2] if "last_song_info" in pf["data"]["profile"] else 0, - last_folder_id = pf["data"]["profile"]["last_song_info"][3] if "last_song_info" in pf["data"]["profile"] else 0, - last_song_order = pf["data"]["profile"]["last_song_info"][4] if "last_song_info" in pf["data"]["profile"] else 0, - last_login_date = datetime.fromtimestamp(pf["data"]["profile"]["last_login_timestamp"]), - ) - - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert wacca profile for user {pf['user']}") - continue - - for opt, val in pf["data"]["option"].items(): - if val != default_opts[opt]: - opt_id = opts[opt] - sql = Insert(option).values( - user = pf["user"], - opt_id = opt_id, - value = val, - ) - - conflict = sql.on_duplicate_key_update( - user = pf["user"], - opt_id = opt_id, - value = val, - ) - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert wacca option for user {pf['user']} {opt} -> {val}") - - except KeyError as e: - self.logger.warn(f"Outdated wacca profile, skipping: {e}") - - if "gate" in pf["data"]: - for profile_gate in pf["data"]["gate"]: - sql = Insert(gate).values( - user = pf["user"], - gate_id = profile_gate["id"], - page = profile_gate["page"], - loops = profile_gate["loops"], - progress = profile_gate["progress"], - last_used = datetime.fromtimestamp(profile_gate["last_used"]), - mission_flag = profile_gate["mission_flag"], - total_points = profile_gate["total_points"], - ) - - conflict = sql.on_duplicate_key_update( - user = pf["user"], - gate_id = profile_gate["id"], - page = profile_gate["page"], - loops = profile_gate["loops"], - progress = profile_gate["progress"], - last_used = datetime.fromtimestamp(profile_gate["last_used"]), - mission_flag = profile_gate["mission_flag"], - total_points = profile_gate["total_points"], - ) - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert wacca gate for user {pf['user']} -> {profile_gate['id']}") - continue - - if "favorite" in pf["data"]: - for profile_favorite in pf["data"]["favorite"]: - sql = Insert(favorite).values( - user = pf["user"], - song_id = profile_favorite - ) - - conflict = sql.on_duplicate_key_update( - user = pf["user"], - song_id = profile_favorite - ) - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert wacca favorite songs for user {pf['user']} -> {profile_favorite}") - continue - - for it in wacca_items: - user = it["user"] - item_type = it["type"] - item_id = it["item_id"] - - if type(it["data"]) is not dict: - it["data"] = json.loads(it["data"]) - - if item_type == item_types["ticket"]: - if "quantity" in it["data"]: - for x in range(it["data"]["quantity"]): - sql = Insert(ticket).values( - user = user, - ticket_id = item_id, - ) - - conflict = sql.on_duplicate_key_update( - user = user, - ticket_id = item_id, - ) - result = self.base.execute(conflict) - if result is None: - self.logger.warn(f"Wacca: Failed to insert ticket {item_id} for user {user}") - - elif item_type == item_types["music_unlock"] or item_type == item_types["music_difficulty_unlock"]: - diff = 0 - if "difficulty" in it["data"]: - for x in it["data"]["difficulty"]: - if x == 1: - diff += 1 - else: - break - - sql = Insert(song_unlock).values( - user = user, - song_id = item_id, - highest_difficulty = diff, - ) - - conflict = sql.on_duplicate_key_update( - user = user, - song_id = item_id, - highest_difficulty = diff, - ) - result = self.base.execute(conflict) - if result is None: - self.logger.warn(f"Wacca: Failed to insert song unlock {item_id} {diff} for user {user}") - - elif item_type == item_types["trophy"]: - season = int(item_id / 100000) - sql = Insert(trophy).values( - user = user, - trophy_id = item_id, - season = season, - progress = 0 if "progress" not in it["data"] else it["data"]["progress"], - badge_type = 0 # ??? - ) - - conflict = sql.on_duplicate_key_update( - user = user, - trophy_id = item_id, - season = season, - progress = 0 if "progress" not in it["data"] else it["data"]["progress"], - badge_type = 0 # ??? - ) - result = self.base.execute(conflict) - if result is None: - self.logger.warn(f"Wacca: Failed to insert trophy {item_id} for user {user}") - - else: - sql = Insert(item).values( - user = user, - item_id = item_id, - type = item_type, - acquire_date = datetime.fromtimestamp(it["data"]["obtainedDate"]) if "obtainedDate" in it["data"] else datetime.now(), - use_count = it["data"]["uses"] if "uses" in it["data"] else 0, - use_count_season = it["data"]["season_uses"] if "season_uses" in it["data"] else 0 - ) - - conflict = sql.on_duplicate_key_update( - user = user, - item_id = item_id, - type = item_type, - acquire_date = datetime.fromtimestamp(it["data"]["obtainedDate"]) if "obtainedDate" in it["data"] else datetime.now(), - use_count = it["data"]["uses"] if "uses" in it["data"] else 0, - use_count_season = it["data"]["season_uses"] if "season_uses" in it["data"] else 0 - ) - result = self.base.execute(conflict) - if result is None: - self.logger.warn(f"Wacca: Failed to insert trophy {item_id} for user {user}") - - for sc in wacca_scores: - if type(sc["data"]) is not dict: - sc["data"] = json.loads(sc["data"]) - - sql = Insert(best_score).values( - user = sc["user"], - song_id = int(sc["song_id"]), - chart_id = sc["chart_id"], - score = sc["score1"], - play_ct = 1 if "play_count" not in sc["data"] else sc["data"]["play_count"], - clear_ct = 1 if sc["cleared"] & 0x01 else 0, - missless_ct = 1 if sc["cleared"] & 0x02 else 0, - fullcombo_ct = 1 if sc["cleared"] & 0x04 else 0, - allmarv_ct = 1 if sc["cleared"] & 0x08 else 0, - grade_d_ct = 1 if sc["grade"] & 0x01 else 0, - grade_c_ct = 1 if sc["grade"] & 0x02 else 0, - grade_b_ct = 1 if sc["grade"] & 0x04 else 0, - grade_a_ct = 1 if sc["grade"] & 0x08 else 0, - grade_aa_ct = 1 if sc["grade"] & 0x10 else 0, - grade_aaa_ct = 1 if sc["grade"] & 0x20 else 0, - grade_s_ct = 1 if sc["grade"] & 0x40 else 0, - grade_ss_ct = 1 if sc["grade"] & 0x80 else 0, - grade_sss_ct = 1 if sc["grade"] & 0x100 else 0, - grade_master_ct = 1 if sc["grade"] & 0x200 else 0, - grade_sp_ct = 1 if sc["grade"] & 0x400 else 0, - grade_ssp_ct = 1 if sc["grade"] & 0x800 else 0, - grade_sssp_ct = 1 if sc["grade"] & 0x1000 else 0, - best_combo = 0 if "max_combo" not in sc["data"] else sc["data"]["max_combo"], - lowest_miss_ct = 0 if "lowest_miss_count" not in sc["data"] else sc["data"]["lowest_miss_count"], - rating = 0 if "rating" not in sc["data"] else sc["data"]["rating"], - ) - - conflict = sql.on_duplicate_key_update( - user = sc["user"], - song_id = int(sc["song_id"]), - chart_id = sc["chart_id"], - score = sc["score1"], - play_ct = 1 if "play_count" not in sc["data"] else sc["data"]["play_count"], - clear_ct = 1 if sc["cleared"] & 0x01 else 0, - missless_ct = 1 if sc["cleared"] & 0x02 else 0, - fullcombo_ct = 1 if sc["cleared"] & 0x04 else 0, - allmarv_ct = 1 if sc["cleared"] & 0x08 else 0, - grade_d_ct = 1 if sc["grade"] & 0x01 else 0, - grade_c_ct = 1 if sc["grade"] & 0x02 else 0, - grade_b_ct = 1 if sc["grade"] & 0x04 else 0, - grade_a_ct = 1 if sc["grade"] & 0x08 else 0, - grade_aa_ct = 1 if sc["grade"] & 0x10 else 0, - grade_aaa_ct = 1 if sc["grade"] & 0x20 else 0, - grade_s_ct = 1 if sc["grade"] & 0x40 else 0, - grade_ss_ct = 1 if sc["grade"] & 0x80 else 0, - grade_sss_ct = 1 if sc["grade"] & 0x100 else 0, - grade_master_ct = 1 if sc["grade"] & 0x200 else 0, - grade_sp_ct = 1 if sc["grade"] & 0x400 else 0, - grade_ssp_ct = 1 if sc["grade"] & 0x800 else 0, - grade_sssp_ct = 1 if sc["grade"] & 0x1000 else 0, - best_combo = 0 if "max_combo" not in sc["data"] else sc["data"]["max_combo"], - lowest_miss_ct = 0 if "lowest_miss_count" not in sc["data"] else sc["data"]["lowest_miss_count"], - rating = 0 if "rating" not in sc["data"] else sc["data"]["rating"], - ) - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert wacca score for user {sc['user']} {int(sc['song_id'])} {sc['chart_id']}") - - for ach in wacca_achievements: - if ach["version"] == 0 or ach["version"] == 1: - season = 1 - elif ach["version"] == 2 or ach["version"] == 3: - season = 2 - elif ach["version"] >= 4: - season = 3 - - if type(ach["data"]) is not dict: - ach["data"] = json.loads(ach["data"]) - - sql = Insert(stageup).values( - user = ach["user"], - season = season, - stage_id = ach["achievement_id"], - clear_status = 0 if "clear" not in ach["data"] else ach["data"]["clear"], - clear_song_ct = 0 if "clear_song_ct" not in ach["data"] else ach["data"]["clear_song_ct"], - song1_score = 0 if "score1" not in ach["data"] else ach["data"]["score1"], - song2_score = 0 if "score2" not in ach["data"] else ach["data"]["score2"], - song3_score = 0 if "score3" not in ach["data"] else ach["data"]["score3"], - play_ct = 1 if "attemps" not in ach["data"] else ach["data"]["attemps"], - ) - - conflict = sql.on_duplicate_key_update( - user = ach["user"], - season = season, - stage_id = ach["achievement_id"], - clear_status = 0 if "clear" not in ach["data"] else ach["data"]["clear"], - clear_song_ct = 0 if "clear_song_ct" not in ach["data"] else ach["data"]["clear_song_ct"], - song1_score = 0 if "score1" not in ach["data"] else ach["data"]["score1"], - song2_score = 0 if "score2" not in ach["data"] else ach["data"]["score2"], - song3_score = 0 if "score3" not in ach["data"] else ach["data"]["score3"], - play_ct = 1 if "attemps" not in ach["data"] else ach["data"]["attemps"], - ) - result = self.base.execute(conflict) - if result is None: - self.logger.error(f"Failed to insert wacca achievement for user {ach['user']}") - - else: - self.logger.info(f"Wacca not found, skipping...") From 102bf3b5a49d0e8539f7c7cc07bb2667a3d5fd02 Mon Sep 17 00:00:00 2001 From: Hay1tsme Date: Fri, 3 Mar 2023 17:04:26 -0500 Subject: [PATCH 11/12] database: remove functions that no longer exist --- dbutils.py | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/dbutils.py b/dbutils.py index d52128e..4500a13 100644 --- a/dbutils.py +++ b/dbutils.py @@ -32,16 +32,5 @@ if __name__=='__main__': else: data.migrate_database(args.game, int(args.version), args.action) - - elif args.action == "migrate": - data.logger.info("Migrating from old schema to new schema") - data.restore_from_old_schema() - - elif args.action == "dump": - data.logger.info("Dumping old schema to migrate to new schema") - data.dump_db() - - elif args.action == "generate": - pass data.logger.info("Done") From 02e1838d95c2498f438835aa106ad64dcc167232 Mon Sep 17 00:00:00 2001 From: Hay1tsme Date: Fri, 3 Mar 2023 17:05:16 -0500 Subject: [PATCH 12/12] database: add format_serial, validate_keychip_format, set_machine_boardid, set_machine_serial --- core/data/schema/arcade.py | 48 +++++++++++++++++++++++++++++++++----- 1 file changed, 42 insertions(+), 6 deletions(-) diff --git a/core/data/schema/arcade.py b/core/data/schema/arcade.py index 921e5b5..af4069d 100644 --- a/core/data/schema/arcade.py +++ b/core/data/schema/arcade.py @@ -4,8 +4,10 @@ from sqlalchemy.sql.schema import ForeignKey, PrimaryKeyConstraint from sqlalchemy.types import Integer, String, Boolean from sqlalchemy.sql import func, select from sqlalchemy.dialects.mysql import insert +import re from core.data.schema.base import BaseData, metadata +from core.const import * arcade = Table( "arcade", @@ -72,20 +74,28 @@ class ArcadeData(BaseData): if result is None: return None return result.fetchone() - def put_machine(self, arcade_id: int, serial: str = None, board: str = None, game: str = None, is_cab: bool = False) -> Optional[int]: + def put_machine(self, arcade_id: int, serial: str = "", board: str = None, game: str = None, is_cab: bool = False) -> Optional[int]: if arcade_id: self.logger.error(f"{__name__ }: Need arcade id!") return None - if serial is None: - pass - sql = machine.insert().values(arcade = arcade_id, keychip = serial, board = board, game = game, is_cab = is_cab) result = self.execute(sql) if result is None: return None return result.lastrowid + def set_machine_serial(self, machine_id: int, serial: str) -> None: + result = self.execute(machine.update(machine.c.id == machine_id).values(keychip = serial)) + if result is None: + self.logger.error(f"Failed to update serial for machine {machine_id} -> {serial}") + return result.lastrowid + + def set_machine_boardid(self, machine_id: int, boardid: str) -> None: + result = self.execute(machine.update(machine.c.id == machine_id).values(board = boardid)) + if result is None: + self.logger.error(f"Failed to update board id for machine {machine_id} -> {boardid}") + def get_arcade(self, id: int) -> Optional[Dict]: sql = arcade.select(arcade.c.id == id) result = self.execute(sql) @@ -120,5 +130,31 @@ class ArcadeData(BaseData): if result is None: return None return result.lastrowid - def generate_keychip_serial(self, platform_id: int) -> str: - pass + def format_serial(self, platform_code: str, platform_rev: int, serial_num: int, append: int = 4152) -> str: + return f"{platform_code}{platform_rev:02d}A{serial_num:04d}{append:04d}" # 0x41 = A, 0x52 = R + + def validate_keychip_format(self, serial: str) -> bool: + serial = serial.replace("-", "") + if len(serial) != 11 or len(serial) != 15: + self.logger.error(f"Serial validate failed: Incorrect length for {serial} (len {len(serial)})") + return False + + platform_code = serial[:4] + platform_rev = serial[4:6] + const_a = serial[6] + num = serial[7:11] + append = serial[11:15] + + if re.match("A[7|6]\d[E|X][0|1][0|1|2]A\d{4,8}", serial) is None: + self.logger.error(f"Serial validate failed: {serial} failed regex") + return False + + if len(append) != 0 or len(append) != 4: + self.logger.error(f"Serial validate failed: {serial} had malformed append {append}") + return False + + if len(num) != 4: + self.logger.error(f"Serial validate failed: {serial} had malformed number {num}") + return False + + return True