From 91f06ccfd2824e6040872b0bb0d1c830a007aae3 Mon Sep 17 00:00:00 2001 From: Dniel97 Date: Sun, 23 Mar 2025 18:53:38 +0100 Subject: [PATCH] chuni: initial verse support --- changelog.md | 3 + .../versions/49c295e89cd4_chunithm_verse.py | 84 ++++++ docs/game_specific_info.md | 1 + example_config/chuni.yaml | 3 + readme.md | 1 + titles/chuni/base.py | 201 +++++++++----- titles/chuni/const.py | 2 + titles/chuni/index.py | 15 +- titles/chuni/new.py | 12 +- titles/chuni/read.py | 33 +++ titles/chuni/schema/item.py | 135 +++++++--- titles/chuni/schema/profile.py | 6 +- titles/chuni/schema/score.py | 4 +- titles/chuni/schema/static.py | 196 ++++++++++++-- titles/chuni/verse.py | 248 ++++++++++++++++++ 15 files changed, 807 insertions(+), 137 deletions(-) create mode 100644 core/data/alembic/versions/49c295e89cd4_chunithm_verse.py create mode 100644 titles/chuni/verse.py diff --git a/changelog.md b/changelog.md index 4620084..b274cc9 100644 --- a/changelog.md +++ b/changelog.md @@ -1,6 +1,9 @@ # Changelog Documenting updates to ARTEMiS, to be updated every time the master branch is pushed to. +## 20250803 ++ CHUNITHM VERSE support added + ## 20250327 + O.N.G.E.K.I. bright MEMORY Act.3 support added + CardMaker support updated diff --git a/core/data/alembic/versions/49c295e89cd4_chunithm_verse.py b/core/data/alembic/versions/49c295e89cd4_chunithm_verse.py new file mode 100644 index 0000000..61b813f --- /dev/null +++ b/core/data/alembic/versions/49c295e89cd4_chunithm_verse.py @@ -0,0 +1,84 @@ +"""CHUNITHM VERSE support + +Revision ID: 49c295e89cd4 +Revises: f6007bbf057d +Create Date: 2025-03-09 14:10:03.067328 + +""" + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import mysql +from sqlalchemy.sql import func + +# revision identifiers, used by Alembic. +revision = "49c295e89cd4" +down_revision = "f6007bbf057d" +branch_labels = None +depends_on = None + + +def upgrade(): + ### commands auto generated by Alembic - please adjust! ### + op.add_column("chuni_profile_data", sa.Column("trophyIdSub1", sa.Integer())) + op.add_column("chuni_profile_data", sa.Column("trophyIdSub2", sa.Integer())) + op.add_column("chuni_score_playlog", sa.Column("monthPoint", sa.Integer())) + op.add_column("chuni_score_playlog", sa.Column("eventPoint", sa.Integer())) + + op.create_table( + "chuni_static_unlock_challenge", + sa.Column("id", sa.Integer(), primary_key=True, nullable=False), + sa.Column("version", sa.Integer(), nullable=False), + sa.Column("unlockChallengeId", sa.Integer(), nullable=False), + sa.Column("name", sa.String(length=255)), + sa.Column("isEnabled", sa.Boolean(), server_default="1"), + sa.Column("startDate", sa.TIMESTAMP(), server_default=func.now()), + sa.Column("courseId1", sa.Integer()), + sa.Column("courseId2", sa.Integer()), + sa.Column("courseId3", sa.Integer()), + sa.Column("courseId4", sa.Integer()), + sa.Column("courseId5", sa.Integer()), + sa.UniqueConstraint( + "version", "unlockChallengeId", name="chuni_static_unlock_challenge_uk" + ), + mysql_charset="utf8mb4", + ) + + op.create_tablee( + "chuni_item_unlock_challenge", + sa.Column("id", sa.Integer(), primary_key=True, nullable=False), + sa.Column("version", sa.Integer(), nullable=False), + sa.Column( + "user", + sa.ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"), + nullable=False, + ), + sa.Column("unlockChallengeId", sa.Integer(), nullable=False), + sa.Column("status", sa.Integer()), + sa.Column("clearCourseId", sa.Integer()), + sa.Column("conditionType", sa.Integer()), + sa.Column("score", sa.Integer()), + sa.Column("life", sa.Integer()), + sa.Column("clearDate", sa.Integer()), + sa.UniqueConstraint( + "version", + "user", + "unlockChallengeId", + name="chuni_item_unlock_challenge_uk", + ), + mysql_charset="utf8mb4", + ) + + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_column("chuni_score_playlog", "eventPoint") + op.drop_column("chuni_score_playlog", "monthPoint") + op.drop_column("chuni_profile_data", "trophyIdSub2") + op.drop_column("chuni_profile_data", "trophyIdSub1") + + op.drop_table("chuni_static_unlock_challenge") + op.drop_table("chuni_item_unlock_challenge") + # ### end Alembic commands ### diff --git a/docs/game_specific_info.md b/docs/game_specific_info.md index e05a771..21faf07 100644 --- a/docs/game_specific_info.md +++ b/docs/game_specific_info.md @@ -68,6 +68,7 @@ Games listed below have been tested and confirmed working. | 14 | CHUNITHM SUN PLUS | | 15 | CHUNITHM LUMINOUS | | 16 | CHUNITHM LUMINOUS PLUS | +| 17 | CHUNITHM VERSE | ### Importer diff --git a/example_config/chuni.yaml b/example_config/chuni.yaml index e2ae746..f9c2f20 100644 --- a/example_config/chuni.yaml +++ b/example_config/chuni.yaml @@ -44,6 +44,9 @@ version: 16: rom: 2.25.00 data: 2.25.00 + 17: + rom: 2.30.00 + data: 2.30.00 crypto: encrypted_only: False diff --git a/readme.md b/readme.md index 8591c74..527414a 100644 --- a/readme.md +++ b/readme.md @@ -38,6 +38,7 @@ Games listed below have been tested and confirmed working. Only game versions ol + SUN PLUS + LUMINOUS + LUMINOUS PLUS + + VERSE + crossbeats REV. + Crossbeats REV. diff --git a/titles/chuni/base.py b/titles/chuni/base.py index 9333aab..d28a799 100644 --- a/titles/chuni/base.py +++ b/titles/chuni/base.py @@ -53,7 +53,9 @@ class ChuniBase: if not self.game_cfg.mods.use_login_bonus: return {"returnCode": 1} - login_bonus_presets = await self.data.static.get_login_bonus_presets(self.version) + login_bonus_presets = await self.data.static.get_login_bonus_presets( + self.version + ) for preset in login_bonus_presets: # check if a user already has some pogress and if not add the @@ -197,15 +199,21 @@ class ChuniBase: async def handle_get_game_message_api_request(self, data: Dict) -> Dict: return { - "type": data["type"], - "length": 1, - "gameMessageList": [{ - "id": 1, - "type": 1, - "message": f"Welcome to {self.core_cfg.server.name} network!" if not self.game_cfg.server.news_msg else self.game_cfg.server.news_msg, - "startDate": "2017-12-05 07:00:00.0", - "endDate": "2099-12-31 00:00:00.0" - }] + "type": data["type"], + "length": 1, + "gameMessageList": [ + { + "id": 1, + "type": 1, + "message": ( + f"Welcome to {self.core_cfg.server.name} network!" + if not self.game_cfg.server.news_msg + else self.game_cfg.server.news_msg + ), + "startDate": "2017-12-05 07:00:00.0", + "endDate": "2099-12-31 00:00:00.0", + } + ], } async def handle_get_game_ranking_api_request(self, data: Dict) -> Dict: @@ -217,7 +225,10 @@ class ChuniBase: async def handle_get_game_setting_api_request(self, data: Dict) -> Dict: # if reboot start/end time is not defined use the default behavior of being a few hours ago - if self.core_cfg.title.reboot_start_time == "" or self.core_cfg.title.reboot_end_time == "": + if ( + self.core_cfg.title.reboot_start_time == "" + or self.core_cfg.title.reboot_end_time == "" + ): reboot_start = datetime.strftime( datetime.utcnow() + timedelta(hours=6), self.date_time_format ) @@ -226,15 +237,29 @@ class ChuniBase: ) else: # get current datetime in JST - current_jst = datetime.now(pytz.timezone('Asia/Tokyo')).date() + current_jst = datetime.now(pytz.timezone("Asia/Tokyo")).date() # parse config start/end times into datetime - reboot_start_time = datetime.strptime(self.core_cfg.title.reboot_start_time, "%H:%M") - reboot_end_time = datetime.strptime(self.core_cfg.title.reboot_end_time, "%H:%M") + reboot_start_time = datetime.strptime( + self.core_cfg.title.reboot_start_time, "%H:%M" + ) + reboot_end_time = datetime.strptime( + self.core_cfg.title.reboot_end_time, "%H:%M" + ) # offset datetimes with current date/time - reboot_start_time = reboot_start_time.replace(year=current_jst.year, month=current_jst.month, day=current_jst.day, tzinfo=pytz.timezone('Asia/Tokyo')) - reboot_end_time = reboot_end_time.replace(year=current_jst.year, month=current_jst.month, day=current_jst.day, tzinfo=pytz.timezone('Asia/Tokyo')) + reboot_start_time = reboot_start_time.replace( + year=current_jst.year, + month=current_jst.month, + day=current_jst.day, + tzinfo=pytz.timezone("Asia/Tokyo"), + ) + reboot_end_time = reboot_end_time.replace( + year=current_jst.year, + month=current_jst.month, + day=current_jst.day, + tzinfo=pytz.timezone("Asia/Tokyo"), + ) # create strings for use in gameSetting reboot_start = reboot_start_time.strftime(self.date_time_format) @@ -255,6 +280,7 @@ class ChuniBase: "isDumpUpload": "false", "isAou": "false", } + async def handle_get_user_activity_api_request(self, data: Dict) -> Dict: user_activity_list = await self.data.profile.get_profile_activity( data["userId"], data["kind"] @@ -285,7 +311,7 @@ class ChuniBase: rows = await self.data.item.get_characters( user_id, limit=max_ct + 1, offset=next_idx ) - + if rows is None or len(rows) == 0: return { "userId": user_id, @@ -335,7 +361,7 @@ class ChuniBase: return { "userId": data["userId"], "length": 0, - "userRecentPlayerList": [], # playUserId, playUserName, playDate, friendPoint + "userRecentPlayerList": [], # playUserId, playUserName, playDate, friendPoint } async def handle_get_user_course_api_request(self, data: Dict) -> Dict: @@ -421,15 +447,9 @@ class ChuniBase: p = await self.data.profile.get_rival(data["rivalId"]) if p is None: return {} - userRivalData = { - "rivalId": p.user, - "rivalName": p.userName - } - return { - "userId": data["userId"], - "userRivalData": userRivalData - } - + userRivalData = {"rivalId": p.user, "rivalName": p.userName} + return {"userId": data["userId"], "userRivalData": userRivalData} + async def handle_get_user_rival_music_api_request(self, data: Dict) -> Dict: user_id = int(data["userId"]) rival_id = int(data["rivalId"]) @@ -459,18 +479,25 @@ class ChuniBase: # note that itertools.groupby will only work on sorted keys, which is already sorted by # the query in get_scores - for music_id, details_iter in itertools.groupby(music_details, key=lambda x: x["musicId"]): + for music_id, details_iter in itertools.groupby( + music_details, key=lambda x: x["musicId"] + ): details: list[dict[Any, Any]] = [ - {"level": d["level"], "scoreMax": d["scoreMax"]} - for d in details_iter + {"level": d["level"], "scoreMax": d["scoreMax"]} for d in details_iter ] - music_list.append({"musicId": music_id, "length": len(details), "userRivalMusicDetailList": details}) + music_list.append( + { + "musicId": music_id, + "length": len(details), + "userRivalMusicDetailList": details, + } + ) returned_music_details_count += len(details) if len(music_list) >= max_ct: break - + # if we returned fewer PBs than we originally asked for from the database, that means # we queried for the PBs of max_ct + 1 songs. if returned_music_details_count < len(rows): @@ -485,7 +512,7 @@ class ChuniBase: "nextIndex": next_idx, "userRivalMusicList": music_list, } - + async def handle_get_user_favorite_item_api_request(self, data: Dict) -> Dict: user_id = int(data["userId"]) next_idx = int(data["nextIndex"]) @@ -571,7 +598,9 @@ class ChuniBase: async def handle_get_user_login_bonus_api_request(self, data: Dict) -> Dict: user_id = data["userId"] - user_login_bonus = await self.data.item.get_all_login_bonus(user_id, self.version) + user_login_bonus = await self.data.item.get_all_login_bonus( + user_id, self.version + ) # ignore the loginBonus request if its disabled in config if user_login_bonus is None or not self.game_cfg.mods.use_login_bonus: return {"userId": user_id, "length": 0, "userLoginBonusList": []} @@ -621,7 +650,7 @@ class ChuniBase: rows = await self.data.score.get_scores( user_id, limit=max_ct + 1, offset=next_idx ) - + if rows is None or len(rows) == 0: return { "userId": user_id, @@ -636,7 +665,9 @@ class ChuniBase: # note that itertools.groupby will only work on sorted keys, which is already sorted by # the query in get_scores - for _music_id, details_iter in itertools.groupby(music_details, key=lambda x: x["musicId"]): + for _music_id, details_iter in itertools.groupby( + music_details, key=lambda x: x["musicId"] + ): details: list[dict[Any, Any]] = [] for d in details_iter: @@ -650,14 +681,14 @@ class ChuniBase: if len(music_list) >= max_ct: break - + # if we returned fewer PBs than we originally asked for from the database, that means # we queried for the PBs of max_ct + 1 songs. if returned_music_details_count < len(rows): next_idx += max_ct else: next_idx = -1 - + return { "userId": user_id, "length": len(music_list), @@ -687,7 +718,9 @@ class ChuniBase: return bytes([ord(c) for c in src]).decode("utf-8") async def handle_get_user_preview_api_request(self, data: Dict) -> Dict: - profile = await self.data.profile.get_profile_preview(data["userId"], self.version) + profile = await self.data.profile.get_profile_preview( + data["userId"], self.version + ) if profile is None: return None profile_character = await self.data.item.get_character( @@ -729,7 +762,9 @@ class ChuniBase: } async def handle_get_user_recent_rating_api_request(self, data: Dict) -> Dict: - recent_rating_list = await self.data.profile.get_profile_recent_rating(data["userId"]) + recent_rating_list = await self.data.profile.get_profile_recent_rating( + data["userId"] + ) if recent_rating_list is None: return { "userId": data["userId"], @@ -762,7 +797,7 @@ class ChuniBase: profile = await self.data.profile.get_profile_data(data["userId"], self.version) if profile is None: - return {"userId": data["userId"], "teamId": 0} + return {"userId": data["userId"], "teamId": 0} if profile and profile["teamId"]: # Get team by id @@ -787,7 +822,7 @@ class ChuniBase: "teamId": team_id, "teamRank": team_rank, "teamName": team_name, - "assaultTimeRate": 1, # TODO: Figure out assaultTime, which might be team point boost? + "assaultTimeRate": 1, # TODO: Figure out assaultTime, which might be team point boost? "userTeamPoint": { "userId": data["userId"], "teamId": team_id, @@ -796,7 +831,7 @@ class ChuniBase: "aggrDate": data["playDate"], }, } - + async def handle_get_team_course_setting_api_request(self, data: Dict) -> Dict: return { "userId": data["userId"], @@ -805,7 +840,9 @@ class ChuniBase: "teamCourseSettingList": [], } - async def handle_get_team_course_setting_api_request_proto(self, data: Dict) -> Dict: + async def handle_get_team_course_setting_api_request_proto( + self, data: Dict + ) -> Dict: return { "userId": data["userId"], "length": 1, @@ -820,11 +857,11 @@ class ChuniBase: "teamCourseMusicList": [ {"track": 184, "type": 1, "level": 3, "selectLevel": -1}, {"track": 184, "type": 1, "level": 3, "selectLevel": -1}, - {"track": 184, "type": 1, "level": 3, "selectLevel": -1} + {"track": 184, "type": 1, "level": 3, "selectLevel": -1}, ], "teamCourseRankingInfoList": [], "recodeDate": "2099-12-31 11:59:99.0", - "isPlayed": False + "isPlayed": False, } ], } @@ -834,7 +871,7 @@ class ChuniBase: "userId": data["userId"], "length": 0, "nextIndex": -1, - "teamCourseRuleList": [] + "teamCourseRuleList": [], } async def handle_get_team_course_rule_api_request_proto(self, data: Dict) -> Dict: @@ -849,7 +886,7 @@ class ChuniBase: "damageMiss": 1, "damageAttack": 1, "damageJustice": 1, - "damageJusticeC": 1 + "damageJusticeC": 1, } ], } @@ -860,7 +897,7 @@ class ChuniBase: if int(user_id) & 0x1000000000001 == 0x1000000000001: place_id = int(user_id) & 0xFFFC00000000 - + self.logger.info("Guest play from place ID %d, ignoring.", place_id) return {"returnCode": "1"} @@ -882,7 +919,9 @@ class ChuniBase: ) if "userGameOption" in upsert: - await self.data.profile.put_profile_option(user_id, upsert["userGameOption"][0]) + await self.data.profile.put_profile_option( + user_id, upsert["userGameOption"][0] + ) if "userGameOptionEx" in upsert: await self.data.profile.put_profile_option_ex( @@ -929,33 +968,41 @@ class ChuniBase: for playlog in upsert["userPlaylogList"]: # convert the player names to utf-8 if playlog["playedUserName1"] is not None: - playlog["playedUserName1"] = self.read_wtf8(playlog["playedUserName1"]) + playlog["playedUserName1"] = self.read_wtf8( + playlog["playedUserName1"] + ) if playlog["playedUserName2"] is not None: - playlog["playedUserName2"] = self.read_wtf8(playlog["playedUserName2"]) + playlog["playedUserName2"] = self.read_wtf8( + playlog["playedUserName2"] + ) if playlog["playedUserName3"] is not None: - playlog["playedUserName3"] = self.read_wtf8(playlog["playedUserName3"]) + playlog["playedUserName3"] = self.read_wtf8( + playlog["playedUserName3"] + ) await self.data.score.put_playlog(user_id, playlog, self.version) if "userTeamPoint" in upsert: team_points = upsert["userTeamPoint"] try: for tp in team_points: - if tp["teamId"] != '65535': + if tp["teamId"] != "65535": # Fetch the current team data - current_team = await self.data.profile.get_team_by_id(tp["teamId"]) + current_team = await self.data.profile.get_team_by_id( + tp["teamId"] + ) # Calculate the new teamPoint - new_team_point = int(tp["teamPoint"]) + current_team["teamPoint"] + new_team_point = ( + int(tp["teamPoint"]) + current_team["teamPoint"] + ) # Prepare the data to update - team_data = { - "teamPoint": new_team_point - } + team_data = {"teamPoint": new_team_point} # Update the team data await self.data.profile.update_team(tp["teamId"], team_data) except: - pass # Probably a better way to catch if the team is not set yet (new profiles), but let's just pass + pass # Probably a better way to catch if the team is not set yet (new profiles), but let's just pass if "userMapAreaList" in upsert: for map_area in upsert["userMapAreaList"]: await self.data.item.put_map_area(user_id, map_area) @@ -973,22 +1020,28 @@ class ChuniBase: await self.data.item.put_login_bonus( user_id, self.version, login["presetId"], isWatched=True ) - - if "userRecentPlayerList" in upsert: # TODO: Seen in Air, maybe implement sometime + + if ( + "userRecentPlayerList" in upsert + ): # TODO: Seen in Air, maybe implement sometime for rp in upsert["userRecentPlayerList"]: pass - for rating_type in {"userRatingBaseList", "userRatingBaseHotList", "userRatingBaseNextList"}: + for rating_type in { + "userRatingBaseList", + "userRatingBaseHotList", + "userRatingBaseNextList", + }: if rating_type not in upsert: continue - + await self.data.profile.put_profile_rating( user_id, self.version, rating_type, upsert[rating_type], ) - + # added in LUMINOUS if "userCMissionList" in upsert: for cmission in upsert["userCMissionList"]: @@ -1003,7 +1056,9 @@ class ChuniBase: ) for progress in cmission["userCMissionProgressList"]: - await self.data.item.put_cmission_progress(user_id, mission_id, progress) + await self.data.item.put_cmission_progress( + user_id, mission_id, progress + ) if "userNetBattleData" in upsert: net_battle = upsert["userNetBattleData"][0] @@ -1035,10 +1090,20 @@ class ChuniBase: added_ids = music_ids - keep_ids for fav_id in deleted_ids: - await self.data.item.delete_favorite_music(user_id, self.version, fav_id) - + await self.data.item.delete_favorite_music( + user_id, self.version, fav_id + ) + for fav_id in added_ids: await self.data.item.put_favorite_music(user_id, self.version, fav_id) + + # added in CHUNITHM VERSE + if "userUnlockChallengeList" in upsert: + for unlock_challenge in upsert["userUnlockChallengeList"]: + await self.data.item.put_unlock_challenge( + user_id, self.version, unlock_challenge + ) + return {"returnCode": "1"} diff --git a/titles/chuni/const.py b/titles/chuni/const.py index 7c534d3..f5642e3 100644 --- a/titles/chuni/const.py +++ b/titles/chuni/const.py @@ -28,6 +28,7 @@ class ChuniConstants: VER_CHUNITHM_SUN_PLUS = 14 VER_CHUNITHM_LUMINOUS = 15 VER_CHUNITHM_LUMINOUS_PLUS = 16 + VER_CHUNITHM_VERSE = 17 VERSION_NAMES = [ "CHUNITHM", @@ -47,6 +48,7 @@ class ChuniConstants: "CHUNITHM SUN PLUS", "CHUNITHM LUMINOUS", "CHUNITHM LUMINOUS PLUS", + "CHUNITHM VERSE" ] SCORE_RANK_INTERVALS_OLD = [ diff --git a/titles/chuni/index.py b/titles/chuni/index.py index cc25289..f792e6b 100644 --- a/titles/chuni/index.py +++ b/titles/chuni/index.py @@ -37,6 +37,7 @@ from .sun import ChuniSun from .sunplus import ChuniSunPlus from .luminous import ChuniLuminous from .luminousplus import ChuniLuminousPlus +from .verse import ChuniVerse class ChuniServlet(BaseServlet): def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None: @@ -66,6 +67,7 @@ class ChuniServlet(BaseServlet): ChuniSunPlus, ChuniLuminous, ChuniLuminousPlus, + ChuniVerse ] self.logger = logging.getLogger("chuni") @@ -113,6 +115,7 @@ class ChuniServlet(BaseServlet): f"{ChuniConstants.VER_CHUNITHM_LUMINOUS}_int": 8, f"{ChuniConstants.VER_CHUNITHM_LUMINOUS}_chn": 8, ChuniConstants.VER_CHUNITHM_LUMINOUS_PLUS: 56, + ChuniConstants.VER_CHUNITHM_VERSE: 42, } for version, keys in self.game_cfg.crypto.keys.items(): @@ -218,9 +221,9 @@ class ChuniServlet(BaseServlet): ] async def render_POST(self, request: Request) -> bytes: - endpoint: str = request.path_params.get('endpoint') - version: int = request.path_params.get('version') - game_code: str = request.path_params.get('game') + endpoint: str = request.path_params.get("endpoint") + version: int = request.path_params.get("version") + game_code: str = request.path_params.get("game") if endpoint.lower() == "ping": return Response(zlib.compress(b'{"returnCode": "1"}')) @@ -264,8 +267,10 @@ class ChuniServlet(BaseServlet): internal_ver = ChuniConstants.VER_CHUNITHM_SUN_PLUS elif version >= 220 and version < 225: # LUMINOUS internal_ver = ChuniConstants.VER_CHUNITHM_LUMINOUS - elif version >= 225: # LUMINOUS PLUS + elif version >= 225 and version < 230: # LUMINOUS PLUS internal_ver = ChuniConstants.VER_CHUNITHM_LUMINOUS_PLUS + elif version >= 230: # VERSE + internal_ver = ChuniConstants.VER_CHUNITHM_VERSE elif game_code == "SDGS": # Int if version < 105: # SUPERSTAR internal_ver = ChuniConstants.VER_CHUNITHM_CRYSTAL_PLUS @@ -380,7 +385,7 @@ class ChuniServlet(BaseServlet): handler_cls = self.versions[internal_ver](self.core_cfg, self.game_cfg) if not hasattr(handler_cls, func_to_find): - self.logger.warning(f"Unhandled v{version} request {endpoint}") + self.logger.warning(f"Unhandled v{version} request {func_to_find}") resp = {"returnCode": 1} else: diff --git a/titles/chuni/new.py b/titles/chuni/new.py index a3aa1a3..b39efc8 100644 --- a/titles/chuni/new.py +++ b/titles/chuni/new.py @@ -28,16 +28,18 @@ class ChuniNew(ChuniBase): def _interal_ver_to_intver(self) -> str: if self.version == ChuniConstants.VER_CHUNITHM_NEW: return "200" - if self.version == ChuniConstants.VER_CHUNITHM_NEW_PLUS: + elif self.version == ChuniConstants.VER_CHUNITHM_NEW_PLUS: return "205" - if self.version == ChuniConstants.VER_CHUNITHM_SUN: + elif self.version == ChuniConstants.VER_CHUNITHM_SUN: return "210" - if self.version == ChuniConstants.VER_CHUNITHM_SUN_PLUS: + elif self.version == ChuniConstants.VER_CHUNITHM_SUN_PLUS: return "215" - if self.version == ChuniConstants.VER_CHUNITHM_LUMINOUS: + elif self.version == ChuniConstants.VER_CHUNITHM_LUMINOUS: return "220" - if self.version == ChuniConstants.VER_CHUNITHM_LUMINOUS_PLUS: + elif self.version == ChuniConstants.VER_CHUNITHM_LUMINOUS_PLUS: return "225" + elif self.version == ChuniConstants.VER_CHUNITHM_VERSE: + return "230" async def handle_get_game_setting_api_request(self, data: Dict) -> Dict: # use UTC time and convert it to JST time by adding +9 diff --git a/titles/chuni/read.py b/titles/chuni/read.py index bd6ff07..47c1939 100644 --- a/titles/chuni/read.py +++ b/titles/chuni/read.py @@ -62,6 +62,7 @@ class ChuniReader(BaseReader): await self.read_character(f"{dir}/chara", dds_images, this_opt_id) await self.read_map_icon(f"{dir}/mapIcon", this_opt_id) await self.read_system_voice(f"{dir}/systemVoice", this_opt_id) + await self.read_unlock_challenge(f"{dir}/unlockChallenge") async def read_login_bonus(self, root_dir: str, opt_id: Optional[int] = None) -> None: for root, dirs, files in walk(f"{root_dir}loginBonusPreset"): @@ -499,6 +500,38 @@ class ChuniReader(BaseReader): self.logger.info(f"Opt folder {opt_folder} (Database ID {opt_id}) contains {data_config['Version']['Name']} v{data_config['Version']['VerMajor']}.{data_config['Version']['VerMinor']}.{opt_seq}") return opt_id + async def read_unlock_challenge(self, uc_dir: str) -> None: + for root, dirs, files in walk(uc_dir): + for dir in dirs: + if path.exists(f"{root}/{dir}/UnlockChallenge.xml"): + with open(f"{root}/{dir}/UnlockChallenge.xml", "r", encoding="utf-8") as fp: + strdata = fp.read() + + xml_root = ET.fromstring(strdata) + for name in xml_root.findall("name"): + id = name.find("id").text + name = name.find("str").text + + course_ids = [] + for course in xml_root.find("musicList/list/UnlockChallengeMusicListSubData/unlockChallengeMusicData/courseList/list").findall("UnlockChallengeCourseListSubData"): + course_id = course.find("unlockChallengeCourseData/courseName").find("id").text + course_ids.append(course_id) + + # Build keyword arguments dynamically for up to 5 course IDs + course_kwargs = { + f"course_id{i+1}": course_ids[i] + for i in range(min(5, len(course_ids))) + } + + result = await self.data.static.put_unlock_challenge( + self.version, id, name, + **course_kwargs + ) + if result is not None: + self.logger.info(f"Inserted unlock challenge {id}") + else: + self.logger.warning(f"Failed to unlock challenge {id}") + def copy_image(self, filename: str, src_dir: str, dst_dir: str) -> None: # Convert the image to png so we can easily display it in the frontend file_src = path.join(src_dir, filename) diff --git a/titles/chuni/schema/item.py b/titles/chuni/schema/item.py index 93dcf86..84c6364 100644 --- a/titles/chuni/schema/item.py +++ b/titles/chuni/schema/item.py @@ -262,7 +262,11 @@ cmission_progress = Table( "chuni_item_cmission_progress", metadata, Column("id", Integer, primary_key=True, nullable=False), - Column("user", ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"), nullable=False), + Column( + "user", + ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"), + nullable=False, + ), Column("missionId", Integer, nullable=False), Column("order", Integer), Column("stage", Integer), @@ -273,14 +277,34 @@ cmission_progress = Table( mysql_charset="utf8mb4", ) +unlock_challenge = Table( + "chuni_item_unlock_challenge", + metadata, + Column("id", Integer, primary_key=True, nullable=False), + Column("version", Integer, nullable=False), + Column( + "user", + ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"), + nullable=False, + ), + Column("unlockChallengeId", Integer, nullable=False), + Column("status", Integer), + Column("clearCourseId", Integer), + Column("conditionType", Integer), + Column("score", Integer), + Column("life", Integer), + Column("clearDate", TIMESTAMP), + UniqueConstraint( + "version", "user", "unlockChallengeId", name="chuni_item_unlock_challenge_uk" + ), + mysql_charset="utf8mb4", +) + class ChuniItemData(BaseData): async def get_oldest_free_matching(self, version: int) -> Optional[Row]: sql = matching.select( - and_( - matching.c.version == version, - matching.c.isFull == False - ) + and_(matching.c.version == version, matching.c.isFull == False) ).order_by(matching.c.roomId.asc()) result = await self.execute(sql) @@ -289,11 +313,9 @@ class ChuniItemData(BaseData): return result.fetchone() async def get_newest_matching(self, version: int) -> Optional[Row]: - sql = matching.select( - and_( - matching.c.version == version - ) - ).order_by(matching.c.roomId.desc()) + sql = matching.select(and_(matching.c.version == version)).order_by( + matching.c.roomId.desc() + ) result = await self.execute(sql) if result is None: @@ -301,11 +323,7 @@ class ChuniItemData(BaseData): return result.fetchone() async def get_all_matchings(self, version: int) -> Optional[List[Row]]: - sql = matching.select( - and_( - matching.c.version == version - ) - ) + sql = matching.select(and_(matching.c.version == version)) result = await self.execute(sql) if result is None: @@ -329,7 +347,7 @@ class ChuniItemData(BaseData): matching_member_info_list: List, user_id: int = None, rest_sec: int = 60, - is_full: bool = False + is_full: bool = False, ) -> Optional[int]: sql = insert(matching).values( roomId=room_id, @@ -452,23 +470,31 @@ class ChuniItemData(BaseData): return None return result.fetchone() - async def put_favorite_music(self, user_id: int, version: int, music_id: int) -> Optional[int]: - sql = insert(favorite).values(user=user_id, version=version, favId=music_id, favKind=1) + async def put_favorite_music( + self, user_id: int, version: int, music_id: int + ) -> Optional[int]: + sql = insert(favorite).values( + user=user_id, version=version, favId=music_id, favKind=1 + ) - conflict = sql.on_duplicate_key_update(user=user_id, version=version, favId=music_id, favKind=1) + conflict = sql.on_duplicate_key_update( + user=user_id, version=version, favId=music_id, favKind=1 + ) result = await self.execute(conflict) if result is None: return None return result.lastrowid - async def delete_favorite_music(self, user_id: int, version: int, music_id: int) -> Optional[int]: + async def delete_favorite_music( + self, user_id: int, version: int, music_id: int + ) -> Optional[int]: sql = delete(favorite).where( and_( - favorite.c.user==user_id, - favorite.c.version==version, - favorite.c.favId==music_id, - favorite.c.favKind==1 + favorite.c.user == user_id, + favorite.c.version == version, + favorite.c.favId == music_id, + favorite.c.favKind == 1, ) ) @@ -611,8 +637,12 @@ class ChuniItemData(BaseData): return None return result.lastrowid - async def get_map_areas(self, user_id: int, map_area_ids: List[int]) -> Optional[List[Row]]: - sql = select(map_area).where(map_area.c.user == user_id, map_area.c.mapAreaId.in_(map_area_ids)) + async def get_map_areas( + self, user_id: int, map_area_ids: List[int] + ) -> Optional[List[Row]]: + sql = select(map_area).where( + map_area.c.user == user_id, map_area.c.mapAreaId.in_(map_area_ids) + ) result = await self.execute(sql) if result is None: @@ -713,7 +743,7 @@ class ChuniItemData(BaseData): ) return None return result.lastrowid - + async def put_cmission_progress( self, user_id: int, mission_id: int, progress_data: Dict ) -> Optional[int]: @@ -723,10 +753,10 @@ class ChuniItemData(BaseData): sql = insert(cmission_progress).values(**progress_data) conflict = sql.on_duplicate_key_update(**progress_data) result = await self.execute(conflict) - + if result is None: return None - + return result.lastrowid async def get_cmission_progress( @@ -739,21 +769,21 @@ class ChuniItemData(BaseData): ) ).order_by(cmission_progress.c.order.asc()) result = await self.execute(sql) - + if result is None: return None - + return result.fetchall() - + async def get_cmission(self, user_id: int, mission_id: int) -> Optional[Row]: sql = cmission.select( and_(cmission.c.user == user_id, cmission.c.missionId == mission_id) ) result = await self.execute(sql) - + if result is None: return None - + return result.fetchone() async def put_cmission(self, user_id: int, mission_data: Dict) -> Optional[int]: @@ -762,17 +792,46 @@ class ChuniItemData(BaseData): sql = insert(cmission).values(**mission_data) conflict = sql.on_duplicate_key_update(**mission_data) result = await self.execute(conflict) - + if result is None: return None - + return result.lastrowid async def get_cmissions(self, user_id: int) -> Optional[List[Row]]: sql = cmission.select(cmission.c.user == user_id) result = await self.execute(sql) - + + if result is None: + return None + + return result.fetchall() + + async def put_unlock_challenge( + self, user_id: int, version: int, unlock_challenge_data: Dict + ) -> Optional[int]: + unlock_challenge_data["user"] = user_id + unlock_challenge_data["version"] = version + + sql = insert(unlock_challenge).values(**unlock_challenge_data) + conflict = sql.on_duplicate_key_update(**unlock_challenge_data) + + result = await self.execute(conflict) + if result is None: + return None + return result.lastrowid + + async def get_unlock_challenges( + self, user_id: int, version: int + ) -> Optional[List[Row]]: + sql = unlock_challenge.select( + and_( + unlock_challenge.c.user == user_id, + unlock_challenge.c.version == version, + ) + ) + + result = await self.execute(sql) if result is None: return None - return result.fetchall() diff --git a/titles/chuni/schema/profile.py b/titles/chuni/schema/profile.py index c7fb750..5f54f5e 100644 --- a/titles/chuni/schema/profile.py +++ b/titles/chuni/schema/profile.py @@ -25,6 +25,8 @@ profile = Table( Column("frameId", Integer), Column("isMaimai", Boolean), Column("trophyId", Integer), + Column("trophyIdSub1", Integer), + Column("trophyIdSub2", Integer), Column("userName", String(25)), Column("isWebJoin", Boolean), Column("playCount", Integer), @@ -465,6 +467,8 @@ class ChuniProfileData(BaseData): sql = profile.update((profile.c.user == user_id) & (profile.c.version == version)).values( nameplateId=new_nameplate, trophyId=new_trophy, + trophyIdSub1=new_trophySub1, + trophyIdSub2=new_trophySub2, charaIllustId=new_character ) result = await self.execute(sql) @@ -899,4 +903,4 @@ class ChuniProfileData(BaseData): async def get_net_battle(self, aime_id: int) -> Optional[Row]: result = await self.execute(net_battle.select(net_battle.c.user == aime_id)) if result: - return result.fetchone() + return result.fetchone() \ No newline at end of file diff --git a/titles/chuni/schema/score.py b/titles/chuni/schema/score.py index 50a8f7f..036e41b 100644 --- a/titles/chuni/schema/score.py +++ b/titles/chuni/schema/score.py @@ -139,6 +139,8 @@ playlog = Table( Column("regionId", Integer), Column("machineType", Integer), Column("ticketId", Integer), + Column("monthPoint", Integer), + Column("eventPoint", Integer), mysql_charset="utf8mb4" ) @@ -420,4 +422,4 @@ class ChuniScoreData(BaseData): return None rows = result.fetchall() - return [dict(row) for row in rows] + return [dict(row) for row in rows] \ No newline at end of file diff --git a/titles/chuni/schema/static.py b/titles/chuni/schema/static.py index f4f0f9f..0e0f2ca 100644 --- a/titles/chuni/schema/static.py +++ b/titles/chuni/schema/static.py @@ -289,6 +289,27 @@ login_bonus = Table( mysql_charset="utf8mb4", ) +unlock_challenge = Table( + "chuni_static_unlock_challenge", + metadata, + Column("id", Integer, primary_key=True, nullable=False), + Column("version", Integer, nullable=False), + Column("unlockChallengeId", Integer, nullable=False), + Column("name", String(255)), + Column("isEnabled", Boolean, server_default="1"), + Column("startDate", TIMESTAMP, server_default=func.now()), + Column("courseId1", Integer), + Column("courseId2", Integer), + Column("courseId3", Integer), + Column("courseId4", Integer), + Column("courseId5", Integer), + UniqueConstraint( + "version", "unlockChallengeId", name="chuni_static_unlock_challenge_pk" + ), + mysql_charset="utf8mb4", +) + + class ChuniStaticData(BaseData): async def put_login_bonus( self, @@ -556,7 +577,7 @@ class ChuniStaticData(BaseData): return result.fetchall() async def get_music(self, version: int) -> Optional[List[Row]]: - sql = music.select(music.c.version <= version) + sql = music.select(music.c.version == version) result = await self.execute(sql) if result is None: @@ -586,6 +607,28 @@ class ChuniStaticData(BaseData): if result is None: return None return result.fetchone() + + async def get_music_by_metadata( + self, title: Optional[str] = None, artist: Optional[str] = None, genre: Optional[str] = None + ) -> Optional[List[Row]]: + # all conditions should use like for partial matches + conditions = [] + if title: + conditions.append(music.c.title.like(f"%{title}%")) + if artist: + conditions.append(music.c.artist.like(f"%{artist}%")) + if genre: + conditions.append(music.c.genre.like(f"%{genre}%")) + + if not conditions: + return None + + sql = select(music).where(and_(*conditions)) + + result = await self.execute(sql) + if result is None: + return None + return result.fetchall() async def put_avatar( self, @@ -629,11 +672,25 @@ class ChuniStaticData(BaseData): return None return result.lastrowid - async def get_avatar_items(self, version: int, category: int, enabled_only: bool = True) -> Optional[List[Dict]]: + async def get_avatar_items( + self, version: int, category: int, enabled_only: bool = True + ) -> Optional[List[Dict]]: if enabled_only: - sql = select(avatar).where((avatar.c.version == version) & (avatar.c.category == category) & (avatar.c.isEnabled)).order_by(avatar.c.sortName) + sql = ( + select(avatar) + .where( + (avatar.c.version == version) + & (avatar.c.category == category) + & (avatar.c.isEnabled) + ) + .order_by(avatar.c.sortName) + ) else: - sql = select(avatar).where((avatar.c.version == version) & (avatar.c.category == category)).order_by(avatar.c.sortName) + sql = ( + select(avatar) + .where((avatar.c.version == version) & (avatar.c.category == category)) + .order_by(avatar.c.sortName) + ) result = await self.execute(sql) if result is None: @@ -676,11 +733,21 @@ class ChuniStaticData(BaseData): return None return result.lastrowid - async def get_nameplates(self, version: int, enabled_only: bool = True) -> Optional[List[Dict]]: + async def get_nameplates( + self, version: int, enabled_only: bool = True + ) -> Optional[List[Dict]]: if enabled_only: - sql = select(nameplate).where((nameplate.c.version == version) & (nameplate.c.isEnabled)).order_by(nameplate.c.sortName) + sql = ( + select(nameplate) + .where((nameplate.c.version == version) & (nameplate.c.isEnabled)) + .order_by(nameplate.c.sortName) + ) else: - sql = select(nameplate).where(nameplate.c.version == version).order_by(nameplate.c.sortName) + sql = ( + select(nameplate) + .where(nameplate.c.version == version) + .order_by(nameplate.c.sortName) + ) result = await self.execute(sql) if result is None: @@ -720,11 +787,21 @@ class ChuniStaticData(BaseData): return None return result.lastrowid - async def get_trophies(self, version: int, enabled_only: bool = True) -> Optional[List[Dict]]: + async def get_trophies( + self, version: int, enabled_only: bool = True + ) -> Optional[List[Dict]]: if enabled_only: - sql = select(trophy).where((trophy.c.version == version) & (trophy.c.isEnabled)).order_by(trophy.c.name) + sql = ( + select(trophy) + .where((trophy.c.version == version) & (trophy.c.isEnabled)) + .order_by(trophy.c.name) + ) else: - sql = select(trophy).where(trophy.c.version == version).order_by(trophy.c.name) + sql = ( + select(trophy) + .where(trophy.c.version == version) + .order_by(trophy.c.name) + ) result = await self.execute(sql) if result is None: @@ -767,11 +844,21 @@ class ChuniStaticData(BaseData): return None return result.lastrowid - async def get_map_icons(self, version: int, enabled_only: bool = True) -> Optional[List[Dict]]: + async def get_map_icons( + self, version: int, enabled_only: bool = True + ) -> Optional[List[Dict]]: if enabled_only: - sql = select(map_icon).where((map_icon.c.version == version) & (map_icon.c.isEnabled)).order_by(map_icon.c.sortName) + sql = ( + select(map_icon) + .where((map_icon.c.version == version) & (map_icon.c.isEnabled)) + .order_by(map_icon.c.sortName) + ) else: - sql = select(map_icon).where(map_icon.c.version == version).order_by(map_icon.c.sortName) + sql = ( + select(map_icon) + .where(map_icon.c.version == version) + .order_by(map_icon.c.sortName) + ) result = await self.execute(sql) if result is None: @@ -814,11 +901,21 @@ class ChuniStaticData(BaseData): return None return result.lastrowid - async def get_system_voices(self, version: int, enabled_only: bool = True) -> Optional[List[Dict]]: + async def get_system_voices( + self, version: int, enabled_only: bool = True + ) -> Optional[List[Dict]]: if enabled_only: - sql = select(system_voice).where((system_voice.c.version == version) & (system_voice.c.isEnabled)).order_by(system_voice.c.sortName) + sql = ( + select(system_voice) + .where((system_voice.c.version == version) & (system_voice.c.isEnabled)) + .order_by(system_voice.c.sortName) + ) else: - sql = select(system_voice).where(system_voice.c.version == version).order_by(system_voice.c.sortName) + sql = ( + select(system_voice) + .where(system_voice.c.version == version) + .order_by(system_voice.c.sortName) + ) result = await self.execute(sql) if result is None: @@ -873,11 +970,21 @@ class ChuniStaticData(BaseData): return None return result.lastrowid - async def get_characters(self, version: int, enabled_only: bool = True) -> Optional[List[Dict]]: + async def get_characters( + self, version: int, enabled_only: bool = True + ) -> Optional[List[Dict]]: if enabled_only: - sql = select(character).where((character.c.version == version) & (character.c.isEnabled)).order_by(character.c.sortName) + sql = ( + select(character) + .where((character.c.version == version) & (character.c.isEnabled)) + .order_by(character.c.sortName) + ) else: - sql = select(character).where(character.c.version == version).order_by(character.c.sortName) + sql = ( + select(character) + .where(character.c.version == version) + .order_by(character.c.sortName) + ) result = await self.execute(sql) if result is None: @@ -1074,3 +1181,54 @@ class ChuniStaticData(BaseData): self.logger.error(f"Failed to set opt enabled status to {enabled} for opt {opt_id}") return False return True + + + async def put_unlock_challenge( + self, + version: int, + unlock_challenge_id: int, + name: str, + course_id1: Optional[int] = None, + course_id2: Optional[int] = None, + course_id3: Optional[int] = None, + course_id4: Optional[int] = None, + course_id5: Optional[int] = None, + ) -> Optional[int]: + + sql = insert(unlock_challenge).values( + version=version, + unlockChallengeId=unlock_challenge_id, + name=name, + courseId1=course_id1, + courseId2=course_id2, + courseId3=course_id3, + courseId4=course_id4, + courseId5=course_id5, + ) + + conflict = sql.on_duplicate_key_update( + name=name, + courseId1=course_id1, + courseId2=course_id2, + courseId3=course_id3, + courseId4=course_id4, + courseId5=course_id5, + ) + + result = await self.execute(conflict) + if result is None: + return None + return result.lastrowid + + async def get_unlock_challenges(self, version: int) -> Optional[List[Dict]]: + sql = unlock_challenge.select( + and_( + unlock_challenge.c.version == version, + unlock_challenge.c.isEnabled == True, + ) + ).order_by(unlock_challenge.c.startDate.asc()) + + result = await self.execute(sql) + if result is None: + return None + return result.fetchall() diff --git a/titles/chuni/verse.py b/titles/chuni/verse.py new file mode 100644 index 0000000..31138b6 --- /dev/null +++ b/titles/chuni/verse.py @@ -0,0 +1,248 @@ +from datetime import datetime, timedelta +from typing import Dict, List, Set + +from core.config import CoreConfig +from titles.chuni.config import ChuniConfig +from titles.chuni.const import ( + ChuniConstants, + MapAreaConditionLogicalOperator, + MapAreaConditionType, +) +from titles.chuni.luminousplus import ChuniLuminousPlus + + +class ChuniVerse(ChuniLuminousPlus): + def __init__(self, core_cfg: CoreConfig, game_cfg: ChuniConfig) -> None: + super().__init__(core_cfg, game_cfg) + self.version = ChuniConstants.VER_CHUNITHM_VERSE + + async def handle_cm_get_user_preview_api_request(self, data: Dict) -> Dict: + user_data = await super().handle_cm_get_user_preview_api_request(data) + + # Does CARD MAKER 1.35 work this far up? + user_data["lastDataVersion"] = "2.30.00" + return user_data + + async def handle_get_game_course_level_api_request(self, data: Dict) -> Dict: + unlock_challenges = await self.data.static.get_unlock_challenges(self.version) + game_course_level_list = [] + + for unlock_challenge in unlock_challenges: + course_ids = [ + unlock_challenge["courseId1"], + unlock_challenge["courseId2"], + unlock_challenge["courseId3"], + unlock_challenge["courseId4"], + unlock_challenge["courseId5"], + ] + + start_date = unlock_challenge["startDate"].replace( + hour=0, minute=0, second=0 + ) + + for i, course_id in enumerate(course_ids): + start = start_date + timedelta(days=7 * i) + end = start_date + timedelta(days=7 * (i + 1)) - timedelta(seconds=1) + + if i == len(course_ids) - 1: + # If this is the last course, set end date to a far future date + end = datetime(2099, 1, 1) + + game_course_level_list.append( + { + "courseId": course_id, + "startDate": start.strftime(self.date_time_format), + "endDate": end.strftime(self.date_time_format), + } + ) + + return { + "length": len(game_course_level_list), + "gameCourseLevelList": game_course_level_list, + } + + async def handle_get_game_uc_condition_api_request(self, data: Dict) -> Dict: + unlock_challenges = await self.data.static.get_unlock_challenges(self.version) + game_unlock_challenge_condition_list = [] + + conditions = { + # unlock Theatore Creatore (ULTIMA) after clearing map VERSE ep. I + 10001: { + "type": MapAreaConditionType.MAP_CLEARED.value, + "conditionId": 3020798, + }, + # unlock Crossmythos Rhapsodia after clearing map VERSE ep. IV + 10006: { + "type": MapAreaConditionType.MAP_CLEARED.value, + "conditionId": 3020802, + }, + } + + for unlock_challenge in unlock_challenges: + unlock_challenge_id = unlock_challenge["unlockChallengeId"] + + unlock_condition = conditions.get( + unlock_challenge_id, + { + "type": 3, # always unlocked + "conditionId": 0, + }, + ) + + game_unlock_challenge_condition_list.append( + { + "unlockChallengeId": unlock_challenge_id, + "length": 1, + "conditionList": [ + { + "type": unlock_condition["type"], + "conditionId": unlock_condition["conditionId"], + "logicalOpe": MapAreaConditionLogicalOperator.AND.value, + "startDate": unlock_challenge["startDate"].strftime( + self.date_time_format + ), + "endDate": datetime(2099, 1, 1).strftime( + self.date_time_format + ), + } + ], + } + ) + + return { + "length": len(game_unlock_challenge_condition_list), + "gameUnlockChallengeConditionList": game_unlock_challenge_condition_list, + } + + async def handle_get_user_uc_api_request(self, data: Dict) -> Dict: + user_id = data["userId"] + + user_unlock_challenges = await self.data.item.get_unlock_challenges( + user_id, self.version + ) + + user_unlock_challenge_list = [ + { + "unlockChallengeId": user_uc["unlockChallengeId"], + "status": user_uc["status"], + "clearCourseId": user_uc["clearCourseId"], + "conditionType": user_uc["conditionType"], + "score": user_uc["score"], + "life": user_uc["life"], + "clearDate": user_uc["clearDate"].strftime(self.date_time_format), + } + for user_uc in user_unlock_challenges + ] + + return { + "userId": user_id, + "userUnlockChallengeList": user_unlock_challenge_list, + } + + async def handle_get_user_rec_music_api_request(self, data: Dict) -> Dict: + rec_limit = 25 # limit for recommendations + user_id = data["userId"] + user_rec_music_set = set() + + recent_rating = await self.data.profile.get_profile_recent_rating(user_id) + if not recent_rating: + # If no recent ratings, return an empty list + return { + "length": 0, + "userRecMusicList": [], + } + + recent_ratings = recent_rating["recentRating"] + # cache music info + music_info_list = [] + + for recent_rating in recent_ratings: + music_id = recent_rating["musicId"] + music_info = await self.data.static.get_song(music_id) + if music_info: + music_info_list.append(music_info) + + # use a set to avoid duplicates + user_rec_music_set = set() + + # try adding recommendations in order of: title → artist → genre + for field in ("title", "artist", "genre"): + await self._add_recommendations(field, user_rec_music_set, music_info_list, rec_limit) + if len(user_rec_music_set) >= rec_limit: + break + + user_rec_music_list = [ + { + "musicId": 1, # no idea + # recMusicList is a semi colon-separated list of music IDs and their order comma separated + # for some reason, not all music ids are shown in game?! + "recMusicList": ";".join( + f"{music_id},{index + 1}" + for index, music_id in enumerate(user_rec_music_set) + ), + }, + ] + + return { + "length": len(user_rec_music_list), + "userRecMusicList": user_rec_music_list, + } + + async def handle_get_user_rec_rating_api_request(self, data: Dict) -> Dict: + class GetUserRecRatingApi: + class UserRecRating: + ratingMin: int + ratingMax: int + # same as recMusicList in get_user_rec_music_api_request + recMusicList: str + + length: int + userRecRatingList: list[UserRecRating] + + user_id = data["userId"] + + user_rec_rating_list = [] + + return { + "length": len(user_rec_rating_list), + "userRecRatingList": user_rec_rating_list, + } + + async def _add_recommendations( + self, + field: str, + user_rec_music_set: Set[int], + music_info_list: List[Dict], + limit: int = 25, + ) -> None: + """ + Adds music recommendations based on a specific metadata field (title/artist/genre), + excluding music IDs already in the user's recent ratings and recommendations. + """ + # Collect all existing songId to exclude from recommendations + existing_music_ids = { + info["songId"] for info in music_info_list + } + + for music_info in music_info_list: + if len(user_rec_music_set) >= limit: + break + + metadata_value = music_info[field] + if not metadata_value: + continue + + recs = await self.data.static.get_music_by_metadata( + **{field: metadata_value} + ) + for rec in recs or []: + song_id = rec["songId"] + # skip if the song is already in the user's recent ratings + # or if the song is already in the user's recommendations + if ( + len(user_rec_music_set) >= limit + or song_id in existing_music_ids + or song_id in user_rec_music_set + ): + continue + user_rec_music_set.add(song_id)