From c2d4abcc26a8f1002a50aec915f53e3161aa1af8 Mon Sep 17 00:00:00 2001 From: daydensteve Date: Sun, 3 Nov 2024 16:37:05 -0500 Subject: [PATCH] db and import updates for userbox, avatar, voice, and map icon --- .../41f77ef50588_chuni_ui_overhaul.py | 122 +++++++ titles/chuni/read.py | 225 +++++++++++- titles/chuni/schema/profile.py | 52 +++ titles/chuni/schema/static.py | 330 ++++++++++++++++++ 4 files changed, 715 insertions(+), 14 deletions(-) create mode 100644 core/data/alembic/versions/41f77ef50588_chuni_ui_overhaul.py diff --git a/core/data/alembic/versions/41f77ef50588_chuni_ui_overhaul.py b/core/data/alembic/versions/41f77ef50588_chuni_ui_overhaul.py new file mode 100644 index 0000000..9e88a5f --- /dev/null +++ b/core/data/alembic/versions/41f77ef50588_chuni_ui_overhaul.py @@ -0,0 +1,122 @@ +"""chuni_ui_overhaul + +Revision ID: 41f77ef50588 +Revises: d8cd1fa04c2a +Create Date: 2024-11-02 13:27:45.839787 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import mysql + +# revision identifiers, used by Alembic. +revision = '41f77ef50588' +down_revision = 'd8cd1fa04c2a' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.add_column('chuni_static_avatar', sa.Column('sortName', mysql.VARCHAR(length=255), nullable=True)) + op.add_column('chuni_static_avatar', sa.Column('isEnabled', mysql.TINYINT(display_width=1), server_default=sa.text('1'), autoincrement=False, nullable=True)) + op.add_column('chuni_static_avatar', sa.Column('defaultHave', mysql.TINYINT(display_width=1), server_default=sa.text('0'), autoincrement=False, nullable=True)) + + op.create_table('chuni_static_character', + sa.Column('id', mysql.INTEGER(display_width=11), autoincrement=True, nullable=False), + sa.Column('version', mysql.INTEGER(display_width=11), autoincrement=False, nullable=False), + sa.Column('characterId', mysql.INTEGER(display_width=11), autoincrement=False, nullable=True), + sa.Column('name', mysql.VARCHAR(length=255), nullable=True), + sa.Column('sortName', mysql.VARCHAR(length=255), nullable=True), + sa.Column('worksName', mysql.VARCHAR(length=255), nullable=True), + sa.Column('rareType', mysql.INTEGER(display_width=11), server_default=sa.text('0'), autoincrement=False, nullable=True), + sa.Column('imagePath1', mysql.VARCHAR(length=255), nullable=True), + sa.Column('imagePath2', mysql.VARCHAR(length=255), nullable=True), + sa.Column('imagePath3', mysql.VARCHAR(length=255), nullable=True), + sa.Column('isEnabled', mysql.TINYINT(display_width=1), server_default=sa.text('1'), autoincrement=False, nullable=True), + sa.Column('defaultHave', mysql.TINYINT(display_width=1), server_default=sa.text('0'), autoincrement=False, nullable=True), + sa.PrimaryKeyConstraint('id'), + mysql_collate='utf8mb4_general_ci', + mysql_default_charset='utf8mb4', + mysql_engine='InnoDB' + ) + op.create_index('chuni_static_character_uk', 'chuni_static_character', ['version', 'characterId'], unique=True) + op.create_table('chuni_static_map_icon', + sa.Column('id', mysql.INTEGER(display_width=11), autoincrement=True, nullable=False), + sa.Column('version', mysql.INTEGER(display_width=11), autoincrement=False, nullable=False), + sa.Column('mapIconId', mysql.INTEGER(display_width=11), autoincrement=False, nullable=True), + sa.Column('name', mysql.VARCHAR(length=255), nullable=True), + sa.Column('sortName', mysql.VARCHAR(length=255), nullable=True), + sa.Column('iconPath', mysql.VARCHAR(length=255), nullable=True), + sa.Column('isEnabled', mysql.TINYINT(display_width=1), server_default=sa.text('1'), autoincrement=False, nullable=True), + sa.Column('defaultHave', mysql.TINYINT(display_width=1), server_default=sa.text('0'), autoincrement=False, nullable=True), + sa.PrimaryKeyConstraint('id'), + mysql_collate='utf8mb4_general_ci', + mysql_default_charset='utf8mb4', + mysql_engine='InnoDB' + ) + op.create_index('chuni_static_mapicon_uk', 'chuni_static_map_icon', ['version', 'mapIconId'], unique=True) + op.create_table('chuni_static_nameplate', + sa.Column('id', mysql.INTEGER(display_width=11), autoincrement=True, nullable=False), + sa.Column('version', mysql.INTEGER(display_width=11), autoincrement=False, nullable=False), + sa.Column('nameplateId', mysql.INTEGER(display_width=11), autoincrement=False, nullable=True), + sa.Column('name', mysql.VARCHAR(length=255), nullable=True), + sa.Column('texturePath', mysql.VARCHAR(length=255), nullable=True), + sa.Column('isEnabled', mysql.TINYINT(display_width=1), server_default=sa.text('1'), autoincrement=False, nullable=True), + sa.Column('defaultHave', mysql.TINYINT(display_width=1), server_default=sa.text('0'), autoincrement=False, nullable=True), + sa.Column('sortName', mysql.VARCHAR(length=255), nullable=True), + sa.PrimaryKeyConstraint('id'), + mysql_collate='utf8mb4_general_ci', + mysql_default_charset='utf8mb4', + mysql_engine='InnoDB' + ) + op.create_index('chuni_static_nameplate_uk', 'chuni_static_nameplate', ['version', 'nameplateId'], unique=True) + op.create_table('chuni_static_trophy', + sa.Column('id', mysql.INTEGER(display_width=11), autoincrement=True, nullable=False), + sa.Column('version', mysql.INTEGER(display_width=11), autoincrement=False, nullable=False), + sa.Column('trophyId', mysql.INTEGER(display_width=11), autoincrement=False, nullable=True), + sa.Column('name', mysql.VARCHAR(length=255), nullable=True), + sa.Column('rareType', mysql.TINYINT(display_width=11), server_default=sa.text('0'), autoincrement=False, nullable=True), + sa.Column('isEnabled', mysql.TINYINT(display_width=1), server_default=sa.text('1'), autoincrement=False, nullable=True), + sa.Column('defaultHave', mysql.TINYINT(display_width=1), server_default=sa.text('0'), autoincrement=False, nullable=True), + sa.PrimaryKeyConstraint('id'), + mysql_collate='utf8mb4_general_ci', + mysql_default_charset='utf8mb4', + mysql_engine='InnoDB' + ) + op.create_index('chuni_static_trophy_uk', 'chuni_static_trophy', ['version', 'trophyId'], unique=True) + op.create_table('chuni_static_system_voice', + sa.Column('id', mysql.INTEGER(display_width=11), autoincrement=True, nullable=False), + sa.Column('version', mysql.INTEGER(display_width=11), autoincrement=False, nullable=False), + sa.Column('voiceId', mysql.INTEGER(display_width=11), autoincrement=False, nullable=True), + sa.Column('name', mysql.VARCHAR(length=255), nullable=True), + sa.Column('sortName', mysql.VARCHAR(length=255), nullable=True), + sa.Column('imagePath', mysql.VARCHAR(length=255), nullable=True), + sa.Column('isEnabled', mysql.TINYINT(display_width=1), server_default=sa.text('1'), autoincrement=False, nullable=True), + sa.Column('defaultHave', mysql.TINYINT(display_width=1), server_default=sa.text('0'), autoincrement=False, nullable=True), + sa.PrimaryKeyConstraint('id'), + mysql_collate='utf8mb4_general_ci', + mysql_default_charset='utf8mb4', + mysql_engine='InnoDB' + ) + op.create_index('chuni_static_systemvoice_uk', 'chuni_static_system_voice', ['version', 'voiceId'], unique=True) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index('chuni_static_systemvoice_uk', table_name='chuni_static_system_voice') + op.drop_table('chuni_static_system_voice') + op.drop_index('chuni_static_trophy_uk', table_name='chuni_static_trophy') + op.drop_table('chuni_static_trophy') + op.drop_index('chuni_static_nameplate_uk', table_name='chuni_static_nameplate') + op.drop_table('chuni_static_nameplate') + op.drop_index('chuni_static_mapicon_uk', table_name='chuni_static_map_icon') + op.drop_table('chuni_static_map_icon') + op.drop_index('chuni_static_character_uk', table_name='chuni_static_character') + op.drop_table('chuni_static_character') + + op.drop_column('chuni_static_avatar', 'defaultHave') + op.drop_column('chuni_static_avatar', 'isEnabled') + op.drop_column('chuni_static_avatar', 'sortName') + # ### end Alembic commands ### \ No newline at end of file diff --git a/titles/chuni/read.py b/titles/chuni/read.py index eebdf8b..b25d97f 100644 --- a/titles/chuni/read.py +++ b/titles/chuni/read.py @@ -42,6 +42,12 @@ class ChuniReader(BaseReader): if self.version >= ChuniConstants.VER_CHUNITHM_NEW: we_diff = "5" + # character images could be stored anywhere across all the data dirs. Map them first + self.logger.info(f"Mapping DDS image files...") + dds_images = dict() + for dir in data_dirs: + self.map_dds_images(dds_images, f"{dir}/ddsImage") + for dir in data_dirs: self.logger.info(f"Read from {dir}") await self.read_events(f"{dir}/event") @@ -49,6 +55,11 @@ class ChuniReader(BaseReader): await self.read_charges(f"{dir}/chargeItem") await self.read_avatar(f"{dir}/avatarAccessory") await self.read_login_bonus(f"{dir}/") + await self.read_nameplate(f"{dir}/namePlate") + await self.read_trophy(f"{dir}/trophy") + await self.read_character(f"{dir}/chara", dds_images) + await self.read_map_icon(f"{dir}/mapIcon") + await self.read_system_voice(f"{dir}/systemVoice") async def read_login_bonus(self, root_dir: str) -> None: for root, dirs, files in walk(f"{root_dir}loginBonusPreset"): @@ -61,9 +72,8 @@ class ChuniReader(BaseReader): for name in xml_root.findall("name"): id = name.find("id").text name = name.find("str").text - is_enabled = ( - True if xml_root.find("disableFlag").text == "false" else False - ) + disableFlag = xml_root.find("disableFlag") # may not exist in older data + is_enabled = True if (disableFlag is None or disableFlag.text == "false") else False result = await self.data.static.put_login_bonus_preset( self.version, id, name, is_enabled @@ -175,16 +185,8 @@ class ChuniReader(BaseReader): for jaketFile in xml_root.findall("jaketFile"): # nice typo, SEGA jacket_path = jaketFile.find("path").text - # Convert the image to png and save it for use in the frontend - jacket_filename_src = f"{root}/{dir}/{jacket_path}" - (pre, ext) = path.splitext(jacket_path) - jacket_filename_dst = f"titles/chuni/img/jacket/{pre}.png" - if path.exists(jacket_filename_src) and not path.exists(jacket_filename_dst): - try: - im = Image.open(jacket_filename_src) - im.save(jacket_filename_dst) - except Exception: - self.logger.warning(f"Failed to convert {jacket_path} to png") + # Save off image for use in frontend + self.copy_image(jacket_path, f"{root}/{dir}", "titles/chuni/img/jacket/") for fumens in xml_root.findall("fumens"): for MusicFumenData in fumens.findall("MusicFumenData"): @@ -268,17 +270,212 @@ class ChuniReader(BaseReader): for name in xml_root.findall("name"): id = name.find("id").text name = name.find("str").text + sortName = xml_root.find("sortName").text category = xml_root.find("category").text + defaultHave = xml_root.find("defaultHave").text == 'true' + disableFlag = xml_root.find("disableFlag") # may not exist in older data + is_enabled = True if (disableFlag is None or disableFlag.text == "false") else False + for image in xml_root.findall("image"): iconPath = image.find("path").text + self.copy_image(iconPath, f"{root}/{dir}", "titles/chuni/img/avatar/") for texture in xml_root.findall("texture"): texturePath = texture.find("path").text + self.copy_image(texturePath, f"{root}/{dir}", "titles/chuni/img/avatar/") result = await self.data.static.put_avatar( - self.version, id, name, category, iconPath, texturePath + self.version, id, name, category, iconPath, texturePath, is_enabled, defaultHave, sortName ) if result is not None: self.logger.info(f"Inserted avatarAccessory {id}") else: self.logger.warning(f"Failed to insert avatarAccessory {id}") + + async def read_nameplate(self, nameplate_dir: str) -> None: + for root, dirs, files in walk(nameplate_dir): + for dir in dirs: + if path.exists(f"{root}/{dir}/NamePlate.xml"): + with open(f"{root}/{dir}/NamePlate.xml", "r", encoding='utf-8') as fp: + strdata = fp.read() + + xml_root = ET.fromstring(strdata) + for name in xml_root.findall("name"): + id = name.find("id").text + name = name.find("str").text + sortName = xml_root.find("sortName").text + defaultHave = xml_root.find("defaultHave").text == 'true' + disableFlag = xml_root.find("disableFlag") # may not exist in older data + is_enabled = True if (disableFlag is None or disableFlag.text == "false") else False + + for image in xml_root.findall("image"): + texturePath = image.find("path").text + self.copy_image(texturePath, f"{root}/{dir}", "titles/chuni/img/nameplate/") + + result = await self.data.static.put_nameplate( + self.version, id, name, texturePath, is_enabled, defaultHave, sortName + ) + + if result is not None: + self.logger.info(f"Inserted nameplate {id}") + else: + self.logger.warning(f"Failed to insert nameplate {id}") + + async def read_trophy(self, trophy_dir: str) -> None: + for root, dirs, files in walk(trophy_dir): + for dir in dirs: + if path.exists(f"{root}/{dir}/Trophy.xml"): + with open(f"{root}/{dir}/Trophy.xml", "r", encoding='utf-8') as fp: + strdata = fp.read() + + xml_root = ET.fromstring(strdata) + for name in xml_root.findall("name"): + id = name.find("id").text + name = name.find("str").text + rareType = xml_root.find("rareType").text + disableFlag = xml_root.find("disableFlag") # may not exist in older data + is_enabled = True if (disableFlag is None or disableFlag.text == "false") else False + defaultHave = xml_root.find("defaultHave").text == 'true' + + result = await self.data.static.put_trophy( + self.version, id, name, rareType, is_enabled, defaultHave + ) + + if result is not None: + self.logger.info(f"Inserted trophy {id}") + else: + self.logger.warning(f"Failed to insert trophy {id}") + + async def read_character(self, chara_dir: str, dds_images: dict) -> None: + for root, dirs, files in walk(chara_dir): + for dir in dirs: + if path.exists(f"{root}/{dir}/Chara.xml"): + with open(f"{root}/{dir}/Chara.xml", "r", encoding='utf-8') as fp: + strdata = fp.read() + + xml_root = ET.fromstring(strdata) + for name in xml_root.findall("name"): + id = name.find("id").text + name = name.find("str").text + sortName = xml_root.find("sortName").text + for work in xml_root.findall("works"): + worksName = work.find("str").text + rareType = xml_root.find("rareType").text + defaultHave = xml_root.find("defaultHave").text == 'true' + disableFlag = xml_root.find("disableFlag") # may not exist in older data + is_enabled = True if (disableFlag is None or disableFlag.text == "false") else False + + # character images are not stored alongside + for image in xml_root.findall("defaultImages"): + imageKey = image.find("str").text + if imageKey in dds_images.keys(): + (imageDir, imagePaths) = dds_images[imageKey] + imagePath1 = imagePaths[0] if len(imagePaths) > 0 else "" + imagePath2 = imagePaths[1] if len(imagePaths) > 1 else "" + imagePath3 = imagePaths[2] if len(imagePaths) > 2 else "" + # @note the third image is the image needed for the user box ui + if imagePath3: + self.copy_image(imagePath3, imageDir, "titles/chuni/img/character/") + else: + self.logger.warning(f"Character {id} only has {len(imagePaths)} images. Expected 3") + else: + self.logger.warning(f"Unable to location character {id} images") + + result = await self.data.static.put_character( + self.version, id, name, sortName, worksName, rareType, imagePath1, imagePath2, imagePath3, is_enabled, defaultHave + ) + + if result is not None: + self.logger.info(f"Inserted character {id}") + else: + self.logger.warning(f"Failed to insert character {id}") + + async def read_map_icon(self, mapicon_dir: str) -> None: + for root, dirs, files in walk(mapicon_dir): + for dir in dirs: + if path.exists(f"{root}/{dir}/MapIcon.xml"): + with open(f"{root}/{dir}/MapIcon.xml", "r", encoding='utf-8') as fp: + strdata = fp.read() + + xml_root = ET.fromstring(strdata) + for name in xml_root.findall("name"): + id = name.find("id").text + name = name.find("str").text + sortName = xml_root.find("sortName").text + for image in xml_root.findall("image"): + iconPath = image.find("path").text + self.copy_image(iconPath, f"{root}/{dir}", "titles/chuni/img/mapIcon/") + defaultHave = xml_root.find("defaultHave").text == 'true' + disableFlag = xml_root.find("disableFlag") # may not exist in older data + is_enabled = True if (disableFlag is None or disableFlag.text == "false") else False + + result = await self.data.static.put_map_icon( + self.version, id, name, sortName, iconPath, is_enabled, defaultHave + ) + + if result is not None: + self.logger.info(f"Inserted map icon {id}") + else: + self.logger.warning(f"Failed to map icon {id}") + + async def read_system_voice(self, voice_dir: str) -> None: + for root, dirs, files in walk(voice_dir): + for dir in dirs: + if path.exists(f"{root}/{dir}/SystemVoice.xml"): + with open(f"{root}/{dir}/SystemVoice.xml", "r", encoding='utf-8') as fp: + strdata = fp.read() + + xml_root = ET.fromstring(strdata) + for name in xml_root.findall("name"): + id = name.find("id").text + name = name.find("str").text + sortName = xml_root.find("sortName").text + for image in xml_root.findall("image"): + imagePath = image.find("path").text + self.copy_image(imagePath, f"{root}/{dir}", "titles/chuni/img/systemVoice/") + defaultHave = xml_root.find("defaultHave").text == 'true' + disableFlag = xml_root.find("disableFlag") # may not exist in older data + is_enabled = True if (disableFlag is None or disableFlag.text == "false") else False + + result = await self.data.static.put_system_voice( + self.version, id, name, sortName, imagePath, is_enabled, defaultHave + ) + + if result is not None: + self.logger.info(f"Inserted system voice {id}") + else: + self.logger.warning(f"Failed to system voice {id}") + + def copy_image(self, filename: str, src_dir: str, dst_dir: str) -> None: + # Convert the image to png so we can easily display it in the frontend + file_src = path.join(src_dir, filename) + (basename, ext) = path.splitext(filename) + file_dst = path.join(dst_dir, basename) + ".png" + + if path.exists(file_src) and not path.exists(file_dst): + try: + im = Image.open(file_src) + im.save(file_dst) + except Exception: + self.logger.warning(f"Failed to convert {filename} to png") + + def map_dds_images(self, image_dict: dict, dds_dir: str) -> None: + for root, dirs, files in walk(dds_dir): + for dir in dirs: + directory = f"{root}/{dir}" + if path.exists(f"{directory}/DDSImage.xml"): + with open(f"{directory}/DDSImage.xml", "r", encoding='utf-8') as fp: + strdata = fp.read() + + xml_root = ET.fromstring(strdata) + for name in xml_root.findall("name"): + name = name.find("str").text + + images = [] + i = 0 + while xml_root.findall(f"ddsFile{i}"): + for ddsFile in xml_root.findall(f"ddsFile{i}"): + images += [ddsFile.find("path").text] + i += 1 + + image_dict[name] = (directory, images) \ No newline at end of file diff --git a/titles/chuni/schema/profile.py b/titles/chuni/schema/profile.py index f0b8c0f..8d71ba6 100644 --- a/titles/chuni/schema/profile.py +++ b/titles/chuni/schema/profile.py @@ -439,6 +439,58 @@ class ChuniProfileData(BaseData): return False return True + async def update_map_icon(self, user_id: int, version: int, new_map_icon: int) -> bool: + sql = profile.update((profile.c.user == user_id) & (profile.c.version == version)).values( + mapIconId=new_map_icon + ) + result = await self.execute(sql) + + if result is None: + self.logger.warning(f"Failed to set user {user_id} map icon") + return False + return True + + async def update_system_voice(self, user_id: int, version: int, new_system_voice: int) -> bool: + sql = profile.update((profile.c.user == user_id) & (profile.c.version == version)).values( + voiceId=new_system_voice + ) + result = await self.execute(sql) + + if result is None: + self.logger.warning(f"Failed to set user {user_id} system voice") + return False + return True + + async def update_userbox(self, user_id: int, version: int, new_nameplate: int, new_trophy: int, new_character: int) -> bool: + sql = profile.update((profile.c.user == user_id) & (profile.c.version == version)).values( + nameplateId=new_nameplate, + trophyId=new_trophy, + charaIllustId=new_character + ) + result = await self.execute(sql) + + if result is None: + self.logger.warning(f"Failed to set user {user_id} userbox") + return False + return True + + async def update_avatar(self, user_id: int, version: int, new_wear: int, new_face: int, new_head: int, new_skin: int, new_item: int, new_front: int, new_back: int) -> bool: + sql = profile.update((profile.c.user == user_id) & (profile.c.version == version)).values( + avatarWear=new_wear, + avatarFace=new_face, + avatarHead=new_head, + avatarSkin=new_skin, + avatarItem=new_item, + avatarFront=new_front, + avatarBack=new_back + ) + result = await self.execute(sql) + + if result is None: + self.logger.warning(f"Failed to set user {user_id} avatar") + return False + return True + async def put_profile_data( self, aime_id: int, version: int, profile_data: Dict ) -> Optional[int]: diff --git a/titles/chuni/schema/static.py b/titles/chuni/schema/static.py index 5c96812..e3070ec 100644 --- a/titles/chuni/schema/static.py +++ b/titles/chuni/schema/static.py @@ -73,10 +73,91 @@ avatar = Table( Column("category", Integer), Column("iconPath", String(255)), Column("texturePath", String(255)), + Column("isEnabled", Boolean, server_default="1"), + Column("defaultHave", Boolean, server_default="0"), + Column("sortName", String(255)), UniqueConstraint("version", "avatarAccessoryId", name="chuni_static_avatar_uk"), mysql_charset="utf8mb4", ) +nameplate = Table( + "chuni_static_nameplate", + metadata, + Column("id", Integer, primary_key=True, nullable=False), + Column("version", Integer, nullable=False), + Column("nameplateId", Integer), + Column("name", String(255)), + Column("texturePath", String(255)), + Column("isEnabled", Boolean, server_default="1"), + Column("defaultHave", Boolean, server_default="0"), + Column("sortName", String(255)), + UniqueConstraint("version", "nameplateId", name="chuni_static_nameplate_uk"), + mysql_charset="utf8mb4", +) + +character = Table( + "chuni_static_character", + metadata, + Column("id", Integer, primary_key=True, nullable=False), + Column("version", Integer, nullable=False), + Column("characterId", Integer), + Column("name", String(255)), + Column("sortName", String(255)), + Column("worksName", String(255)), + Column("rareType", Integer), + Column("imagePath1", String(255)), + Column("imagePath2", String(255)), + Column("imagePath3", String(255)), + Column("isEnabled", Boolean, server_default="1"), + Column("defaultHave", Boolean, server_default="0"), + UniqueConstraint("version", "characterId", name="chuni_static_character_uk"), + mysql_charset="utf8mb4", +) + +trophy = Table( + "chuni_static_trophy", + metadata, + Column("id", Integer, primary_key=True, nullable=False), + Column("version", Integer, nullable=False), + Column("trophyId", Integer), + Column("name", String(255)), + Column("rareType", Integer), + Column("isEnabled", Boolean, server_default="1"), + Column("defaultHave", Boolean, server_default="0"), + UniqueConstraint("version", "trophyId", name="chuni_static_trophy_uk"), + mysql_charset="utf8mb4", +) + +map_icon = Table( + "chuni_static_map_icon", + metadata, + Column("id", Integer, primary_key=True, nullable=False), + Column("version", Integer, nullable=False), + Column("mapIconId", Integer), + Column("name", String(255)), + Column("sortName", String(255)), + Column("iconPath", String(255)), + Column("isEnabled", Boolean, server_default="1"), + Column("defaultHave", Boolean, server_default="0"), + UniqueConstraint("version", "mapIconId", name="chuni_static_mapicon_uk"), + mysql_charset="utf8mb4", +) + +system_voice = Table( + "chuni_static_system_voice", + metadata, + Column("id", Integer, primary_key=True, nullable=False), + Column("version", Integer, nullable=False), + Column("voiceId", Integer), + Column("name", String(255)), + Column("sortName", String(255)), + Column("imagePath", String(255)), + Column("isEnabled", Boolean, server_default="1"), + Column("defaultHave", Boolean, server_default="0"), + UniqueConstraint("version", "voiceId", name="chuni_static_systemvoice_uk"), + mysql_charset="utf8mb4", +) + gachas = Table( "chuni_static_gachas", metadata, @@ -470,6 +551,9 @@ class ChuniStaticData(BaseData): category: int, iconPath: str, texturePath: str, + isEnabled: int, + defaultHave: int, + sortName: str ) -> Optional[int]: sql = insert(avatar).values( version=version, @@ -478,6 +562,9 @@ class ChuniStaticData(BaseData): category=category, iconPath=iconPath, texturePath=texturePath, + isEnabled=isEnabled, + defaultHave=defaultHave, + sortName=sortName ) conflict = sql.on_duplicate_key_update( @@ -485,6 +572,9 @@ class ChuniStaticData(BaseData): category=category, iconPath=iconPath, texturePath=texturePath, + isEnabled=isEnabled, + defaultHave=defaultHave, + sortName=sortName ) result = await self.execute(conflict) @@ -492,6 +582,246 @@ class ChuniStaticData(BaseData): return None return result.lastrowid + async def get_avatar_items(self, version: int, category: int, enabled_only: bool = True) -> Optional[List[Dict]]: + if enabled_only: + sql = select(avatar).where((avatar.c.version == version) & (avatar.c.category == category) & (avatar.c.isEnabled)).order_by(avatar.c.sortName) + else: + sql = select(avatar).where((avatar.c.version == version) & (avatar.c.category == category)).order_by(avatar.c.sortName) + result = await self.execute(sql) + + if result is None: + return None + return result.fetchall() + + async def put_nameplate( + self, + version: int, + nameplateId: int, + name: str, + texturePath: str, + isEnabled: int, + defaultHave: int, + sortName: str + ) -> Optional[int]: + sql = insert(nameplate).values( + version=version, + nameplateId=nameplateId, + name=name, + texturePath=texturePath, + isEnabled=isEnabled, + defaultHave=defaultHave, + sortName=sortName + ) + + conflict = sql.on_duplicate_key_update( + name=name, + texturePath=texturePath, + isEnabled=isEnabled, + defaultHave=defaultHave, + sortName=sortName + ) + + result = await self.execute(conflict) + if result is None: + return None + return result.lastrowid + + async def get_nameplates(self, version: int, enabled_only: bool = True) -> Optional[List[Dict]]: + if enabled_only: + sql = select(nameplate).where((nameplate.c.version == version) & (nameplate.c.isEnabled)).order_by(nameplate.c.sortName) + else: + sql = select(nameplate).where(nameplate.c.version == version).order_by(nameplate.c.sortName) + result = await self.execute(sql) + + if result is None: + return None + return result.fetchall() + + async def put_trophy( + self, + version: int, + trophyId: int, + name: str, + rareType: int, + isEnabled: int, + defaultHave: int, + ) -> Optional[int]: + sql = insert(trophy).values( + version=version, + trophyId=trophyId, + name=name, + rareType=rareType, + isEnabled=isEnabled, + defaultHave=defaultHave + ) + + conflict = sql.on_duplicate_key_update( + name=name, + rareType=rareType, + isEnabled=isEnabled, + defaultHave=defaultHave + ) + + result = await self.execute(conflict) + if result is None: + return None + return result.lastrowid + + async def get_trophies(self, version: int, enabled_only: bool = True) -> Optional[List[Dict]]: + if enabled_only: + sql = select(trophy).where((trophy.c.version == version) & (trophy.c.isEnabled)).order_by(trophy.c.name) + else: + sql = select(trophy).where(trophy.c.version == version).order_by(trophy.c.name) + result = await self.execute(sql) + + if result is None: + return None + return result.fetchall() + + async def put_map_icon( + self, + version: int, + mapIconId: int, + name: str, + sortName: str, + iconPath: str, + isEnabled: int, + defaultHave: int, + ) -> Optional[int]: + sql = insert(map_icon).values( + version=version, + mapIconId=mapIconId, + name=name, + sortName=sortName, + iconPath=iconPath, + isEnabled=isEnabled, + defaultHave=defaultHave + ) + + conflict = sql.on_duplicate_key_update( + name=name, + sortName=sortName, + iconPath=iconPath, + isEnabled=isEnabled, + defaultHave=defaultHave + ) + + result = await self.execute(conflict) + if result is None: + return None + return result.lastrowid + + async def get_map_icons(self, version: int, enabled_only: bool = True) -> Optional[List[Dict]]: + if enabled_only: + sql = select(map_icon).where((map_icon.c.version == version) & (map_icon.c.isEnabled)).order_by(map_icon.c.sortName) + else: + sql = select(map_icon).where(map_icon.c.version == version).order_by(map_icon.c.sortName) + result = await self.execute(sql) + + if result is None: + return None + return result.fetchall() + + async def put_system_voice( + self, + version: int, + voiceId: int, + name: str, + sortName: str, + imagePath: str, + isEnabled: int, + defaultHave: int, + ) -> Optional[int]: + sql = insert(system_voice).values( + version=version, + voiceId=voiceId, + name=name, + sortName=sortName, + imagePath=imagePath, + isEnabled=isEnabled, + defaultHave=defaultHave + ) + + conflict = sql.on_duplicate_key_update( + name=name, + sortName=sortName, + imagePath=imagePath, + isEnabled=isEnabled, + defaultHave=defaultHave + ) + + result = await self.execute(conflict) + if result is None: + return None + return result.lastrowid + + async def get_system_voices(self, version: int, enabled_only: bool = True) -> Optional[List[Dict]]: + if enabled_only: + sql = select(system_voice).where((system_voice.c.version == version) & (system_voice.c.isEnabled)).order_by(system_voice.c.sortName) + else: + sql = select(system_voice).where(system_voice.c.version == version).order_by(system_voice.c.sortName) + result = await self.execute(sql) + + if result is None: + return None + return result.fetchall() + + async def put_character( + self, + version: int, + characterId: int, + name: str, + sortName: str, + worksName: str, + rareType: int, + imagePath1: str, + imagePath2: str, + imagePath3: str, + isEnabled: int, + defaultHave: int + ) -> Optional[int]: + sql = insert(character).values( + version=version, + characterId=characterId, + name=name, + sortName=sortName, + worksName=worksName, + rareType=rareType, + imagePath1=imagePath1, + imagePath2=imagePath2, + imagePath3=imagePath3, + isEnabled=isEnabled, + defaultHave=defaultHave + ) + + conflict = sql.on_duplicate_key_update( + name=name, + sortName=sortName, + worksName=worksName, + rareType=rareType, + imagePath1=imagePath1, + imagePath2=imagePath2, + imagePath3=imagePath3, + isEnabled=isEnabled, + defaultHave=defaultHave + ) + + result = await self.execute(conflict) + if result is None: + return None + return result.lastrowid + + async def get_characters(self, version: int, enabled_only: bool = True) -> Optional[List[Dict]]: + if enabled_only: + sql = select(character).where((character.c.version == version) & (character.c.isEnabled)).order_by(character.c.sortName) + else: + sql = select(character).where(character.c.version == version).order_by(character.c.sortName) + result = await self.execute(sql) + + if result is None: + return None + return result.fetchall() + async def put_gacha( self, version: int,