chuni: initial verse support

This commit is contained in:
2025-03-23 18:53:38 +01:00
parent 9f916a6302
commit 91f06ccfd2
15 changed files with 807 additions and 137 deletions

View File

@ -1,6 +1,9 @@
# Changelog # Changelog
Documenting updates to ARTEMiS, to be updated every time the master branch is pushed to. Documenting updates to ARTEMiS, to be updated every time the master branch is pushed to.
## 20250803
+ CHUNITHM VERSE support added
## 20250327 ## 20250327
+ O.N.G.E.K.I. bright MEMORY Act.3 support added + O.N.G.E.K.I. bright MEMORY Act.3 support added
+ CardMaker support updated + CardMaker support updated

View File

@ -0,0 +1,84 @@
"""CHUNITHM VERSE support
Revision ID: 49c295e89cd4
Revises: f6007bbf057d
Create Date: 2025-03-09 14:10:03.067328
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import mysql
from sqlalchemy.sql import func
# revision identifiers, used by Alembic.
revision = "49c295e89cd4"
down_revision = "f6007bbf057d"
branch_labels = None
depends_on = None
def upgrade():
### commands auto generated by Alembic - please adjust! ###
op.add_column("chuni_profile_data", sa.Column("trophyIdSub1", sa.Integer()))
op.add_column("chuni_profile_data", sa.Column("trophyIdSub2", sa.Integer()))
op.add_column("chuni_score_playlog", sa.Column("monthPoint", sa.Integer()))
op.add_column("chuni_score_playlog", sa.Column("eventPoint", sa.Integer()))
op.create_table(
"chuni_static_unlock_challenge",
sa.Column("id", sa.Integer(), primary_key=True, nullable=False),
sa.Column("version", sa.Integer(), nullable=False),
sa.Column("unlockChallengeId", sa.Integer(), nullable=False),
sa.Column("name", sa.String(length=255)),
sa.Column("isEnabled", sa.Boolean(), server_default="1"),
sa.Column("startDate", sa.TIMESTAMP(), server_default=func.now()),
sa.Column("courseId1", sa.Integer()),
sa.Column("courseId2", sa.Integer()),
sa.Column("courseId3", sa.Integer()),
sa.Column("courseId4", sa.Integer()),
sa.Column("courseId5", sa.Integer()),
sa.UniqueConstraint(
"version", "unlockChallengeId", name="chuni_static_unlock_challenge_uk"
),
mysql_charset="utf8mb4",
)
op.create_tablee(
"chuni_item_unlock_challenge",
sa.Column("id", sa.Integer(), primary_key=True, nullable=False),
sa.Column("version", sa.Integer(), nullable=False),
sa.Column(
"user",
sa.ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"),
nullable=False,
),
sa.Column("unlockChallengeId", sa.Integer(), nullable=False),
sa.Column("status", sa.Integer()),
sa.Column("clearCourseId", sa.Integer()),
sa.Column("conditionType", sa.Integer()),
sa.Column("score", sa.Integer()),
sa.Column("life", sa.Integer()),
sa.Column("clearDate", sa.Integer()),
sa.UniqueConstraint(
"version",
"user",
"unlockChallengeId",
name="chuni_item_unlock_challenge_uk",
),
mysql_charset="utf8mb4",
)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column("chuni_score_playlog", "eventPoint")
op.drop_column("chuni_score_playlog", "monthPoint")
op.drop_column("chuni_profile_data", "trophyIdSub2")
op.drop_column("chuni_profile_data", "trophyIdSub1")
op.drop_table("chuni_static_unlock_challenge")
op.drop_table("chuni_item_unlock_challenge")
# ### end Alembic commands ###

View File

@ -68,6 +68,7 @@ Games listed below have been tested and confirmed working.
| 14 | CHUNITHM SUN PLUS | | 14 | CHUNITHM SUN PLUS |
| 15 | CHUNITHM LUMINOUS | | 15 | CHUNITHM LUMINOUS |
| 16 | CHUNITHM LUMINOUS PLUS | | 16 | CHUNITHM LUMINOUS PLUS |
| 17 | CHUNITHM VERSE |
### Importer ### Importer

View File

@ -44,6 +44,9 @@ version:
16: 16:
rom: 2.25.00 rom: 2.25.00
data: 2.25.00 data: 2.25.00
17:
rom: 2.30.00
data: 2.30.00
crypto: crypto:
encrypted_only: False encrypted_only: False

View File

@ -38,6 +38,7 @@ Games listed below have been tested and confirmed working. Only game versions ol
+ SUN PLUS + SUN PLUS
+ LUMINOUS + LUMINOUS
+ LUMINOUS PLUS + LUMINOUS PLUS
+ VERSE
+ crossbeats REV. + crossbeats REV.
+ Crossbeats REV. + Crossbeats REV.

View File

@ -53,7 +53,9 @@ class ChuniBase:
if not self.game_cfg.mods.use_login_bonus: if not self.game_cfg.mods.use_login_bonus:
return {"returnCode": 1} return {"returnCode": 1}
login_bonus_presets = await self.data.static.get_login_bonus_presets(self.version) login_bonus_presets = await self.data.static.get_login_bonus_presets(
self.version
)
for preset in login_bonus_presets: for preset in login_bonus_presets:
# check if a user already has some pogress and if not add the # check if a user already has some pogress and if not add the
@ -197,15 +199,21 @@ class ChuniBase:
async def handle_get_game_message_api_request(self, data: Dict) -> Dict: async def handle_get_game_message_api_request(self, data: Dict) -> Dict:
return { return {
"type": data["type"], "type": data["type"],
"length": 1, "length": 1,
"gameMessageList": [{ "gameMessageList": [
"id": 1, {
"type": 1, "id": 1,
"message": f"Welcome to {self.core_cfg.server.name} network!" if not self.game_cfg.server.news_msg else self.game_cfg.server.news_msg, "type": 1,
"startDate": "2017-12-05 07:00:00.0", "message": (
"endDate": "2099-12-31 00:00:00.0" f"Welcome to {self.core_cfg.server.name} network!"
}] if not self.game_cfg.server.news_msg
else self.game_cfg.server.news_msg
),
"startDate": "2017-12-05 07:00:00.0",
"endDate": "2099-12-31 00:00:00.0",
}
],
} }
async def handle_get_game_ranking_api_request(self, data: Dict) -> Dict: async def handle_get_game_ranking_api_request(self, data: Dict) -> Dict:
@ -217,7 +225,10 @@ class ChuniBase:
async def handle_get_game_setting_api_request(self, data: Dict) -> Dict: async def handle_get_game_setting_api_request(self, data: Dict) -> Dict:
# if reboot start/end time is not defined use the default behavior of being a few hours ago # if reboot start/end time is not defined use the default behavior of being a few hours ago
if self.core_cfg.title.reboot_start_time == "" or self.core_cfg.title.reboot_end_time == "": if (
self.core_cfg.title.reboot_start_time == ""
or self.core_cfg.title.reboot_end_time == ""
):
reboot_start = datetime.strftime( reboot_start = datetime.strftime(
datetime.utcnow() + timedelta(hours=6), self.date_time_format datetime.utcnow() + timedelta(hours=6), self.date_time_format
) )
@ -226,15 +237,29 @@ class ChuniBase:
) )
else: else:
# get current datetime in JST # get current datetime in JST
current_jst = datetime.now(pytz.timezone('Asia/Tokyo')).date() current_jst = datetime.now(pytz.timezone("Asia/Tokyo")).date()
# parse config start/end times into datetime # parse config start/end times into datetime
reboot_start_time = datetime.strptime(self.core_cfg.title.reboot_start_time, "%H:%M") reboot_start_time = datetime.strptime(
reboot_end_time = datetime.strptime(self.core_cfg.title.reboot_end_time, "%H:%M") self.core_cfg.title.reboot_start_time, "%H:%M"
)
reboot_end_time = datetime.strptime(
self.core_cfg.title.reboot_end_time, "%H:%M"
)
# offset datetimes with current date/time # offset datetimes with current date/time
reboot_start_time = reboot_start_time.replace(year=current_jst.year, month=current_jst.month, day=current_jst.day, tzinfo=pytz.timezone('Asia/Tokyo')) reboot_start_time = reboot_start_time.replace(
reboot_end_time = reboot_end_time.replace(year=current_jst.year, month=current_jst.month, day=current_jst.day, tzinfo=pytz.timezone('Asia/Tokyo')) year=current_jst.year,
month=current_jst.month,
day=current_jst.day,
tzinfo=pytz.timezone("Asia/Tokyo"),
)
reboot_end_time = reboot_end_time.replace(
year=current_jst.year,
month=current_jst.month,
day=current_jst.day,
tzinfo=pytz.timezone("Asia/Tokyo"),
)
# create strings for use in gameSetting # create strings for use in gameSetting
reboot_start = reboot_start_time.strftime(self.date_time_format) reboot_start = reboot_start_time.strftime(self.date_time_format)
@ -255,6 +280,7 @@ class ChuniBase:
"isDumpUpload": "false", "isDumpUpload": "false",
"isAou": "false", "isAou": "false",
} }
async def handle_get_user_activity_api_request(self, data: Dict) -> Dict: async def handle_get_user_activity_api_request(self, data: Dict) -> Dict:
user_activity_list = await self.data.profile.get_profile_activity( user_activity_list = await self.data.profile.get_profile_activity(
data["userId"], data["kind"] data["userId"], data["kind"]
@ -285,7 +311,7 @@ class ChuniBase:
rows = await self.data.item.get_characters( rows = await self.data.item.get_characters(
user_id, limit=max_ct + 1, offset=next_idx user_id, limit=max_ct + 1, offset=next_idx
) )
if rows is None or len(rows) == 0: if rows is None or len(rows) == 0:
return { return {
"userId": user_id, "userId": user_id,
@ -335,7 +361,7 @@ class ChuniBase:
return { return {
"userId": data["userId"], "userId": data["userId"],
"length": 0, "length": 0,
"userRecentPlayerList": [], # playUserId, playUserName, playDate, friendPoint "userRecentPlayerList": [], # playUserId, playUserName, playDate, friendPoint
} }
async def handle_get_user_course_api_request(self, data: Dict) -> Dict: async def handle_get_user_course_api_request(self, data: Dict) -> Dict:
@ -421,15 +447,9 @@ class ChuniBase:
p = await self.data.profile.get_rival(data["rivalId"]) p = await self.data.profile.get_rival(data["rivalId"])
if p is None: if p is None:
return {} return {}
userRivalData = { userRivalData = {"rivalId": p.user, "rivalName": p.userName}
"rivalId": p.user, return {"userId": data["userId"], "userRivalData": userRivalData}
"rivalName": p.userName
}
return {
"userId": data["userId"],
"userRivalData": userRivalData
}
async def handle_get_user_rival_music_api_request(self, data: Dict) -> Dict: async def handle_get_user_rival_music_api_request(self, data: Dict) -> Dict:
user_id = int(data["userId"]) user_id = int(data["userId"])
rival_id = int(data["rivalId"]) rival_id = int(data["rivalId"])
@ -459,18 +479,25 @@ class ChuniBase:
# note that itertools.groupby will only work on sorted keys, which is already sorted by # note that itertools.groupby will only work on sorted keys, which is already sorted by
# the query in get_scores # the query in get_scores
for music_id, details_iter in itertools.groupby(music_details, key=lambda x: x["musicId"]): for music_id, details_iter in itertools.groupby(
music_details, key=lambda x: x["musicId"]
):
details: list[dict[Any, Any]] = [ details: list[dict[Any, Any]] = [
{"level": d["level"], "scoreMax": d["scoreMax"]} {"level": d["level"], "scoreMax": d["scoreMax"]} for d in details_iter
for d in details_iter
] ]
music_list.append({"musicId": music_id, "length": len(details), "userRivalMusicDetailList": details}) music_list.append(
{
"musicId": music_id,
"length": len(details),
"userRivalMusicDetailList": details,
}
)
returned_music_details_count += len(details) returned_music_details_count += len(details)
if len(music_list) >= max_ct: if len(music_list) >= max_ct:
break break
# if we returned fewer PBs than we originally asked for from the database, that means # if we returned fewer PBs than we originally asked for from the database, that means
# we queried for the PBs of max_ct + 1 songs. # we queried for the PBs of max_ct + 1 songs.
if returned_music_details_count < len(rows): if returned_music_details_count < len(rows):
@ -485,7 +512,7 @@ class ChuniBase:
"nextIndex": next_idx, "nextIndex": next_idx,
"userRivalMusicList": music_list, "userRivalMusicList": music_list,
} }
async def handle_get_user_favorite_item_api_request(self, data: Dict) -> Dict: async def handle_get_user_favorite_item_api_request(self, data: Dict) -> Dict:
user_id = int(data["userId"]) user_id = int(data["userId"])
next_idx = int(data["nextIndex"]) next_idx = int(data["nextIndex"])
@ -571,7 +598,9 @@ class ChuniBase:
async def handle_get_user_login_bonus_api_request(self, data: Dict) -> Dict: async def handle_get_user_login_bonus_api_request(self, data: Dict) -> Dict:
user_id = data["userId"] user_id = data["userId"]
user_login_bonus = await self.data.item.get_all_login_bonus(user_id, self.version) user_login_bonus = await self.data.item.get_all_login_bonus(
user_id, self.version
)
# ignore the loginBonus request if its disabled in config # ignore the loginBonus request if its disabled in config
if user_login_bonus is None or not self.game_cfg.mods.use_login_bonus: if user_login_bonus is None or not self.game_cfg.mods.use_login_bonus:
return {"userId": user_id, "length": 0, "userLoginBonusList": []} return {"userId": user_id, "length": 0, "userLoginBonusList": []}
@ -621,7 +650,7 @@ class ChuniBase:
rows = await self.data.score.get_scores( rows = await self.data.score.get_scores(
user_id, limit=max_ct + 1, offset=next_idx user_id, limit=max_ct + 1, offset=next_idx
) )
if rows is None or len(rows) == 0: if rows is None or len(rows) == 0:
return { return {
"userId": user_id, "userId": user_id,
@ -636,7 +665,9 @@ class ChuniBase:
# note that itertools.groupby will only work on sorted keys, which is already sorted by # note that itertools.groupby will only work on sorted keys, which is already sorted by
# the query in get_scores # the query in get_scores
for _music_id, details_iter in itertools.groupby(music_details, key=lambda x: x["musicId"]): for _music_id, details_iter in itertools.groupby(
music_details, key=lambda x: x["musicId"]
):
details: list[dict[Any, Any]] = [] details: list[dict[Any, Any]] = []
for d in details_iter: for d in details_iter:
@ -650,14 +681,14 @@ class ChuniBase:
if len(music_list) >= max_ct: if len(music_list) >= max_ct:
break break
# if we returned fewer PBs than we originally asked for from the database, that means # if we returned fewer PBs than we originally asked for from the database, that means
# we queried for the PBs of max_ct + 1 songs. # we queried for the PBs of max_ct + 1 songs.
if returned_music_details_count < len(rows): if returned_music_details_count < len(rows):
next_idx += max_ct next_idx += max_ct
else: else:
next_idx = -1 next_idx = -1
return { return {
"userId": user_id, "userId": user_id,
"length": len(music_list), "length": len(music_list),
@ -687,7 +718,9 @@ class ChuniBase:
return bytes([ord(c) for c in src]).decode("utf-8") return bytes([ord(c) for c in src]).decode("utf-8")
async def handle_get_user_preview_api_request(self, data: Dict) -> Dict: async def handle_get_user_preview_api_request(self, data: Dict) -> Dict:
profile = await self.data.profile.get_profile_preview(data["userId"], self.version) profile = await self.data.profile.get_profile_preview(
data["userId"], self.version
)
if profile is None: if profile is None:
return None return None
profile_character = await self.data.item.get_character( profile_character = await self.data.item.get_character(
@ -729,7 +762,9 @@ class ChuniBase:
} }
async def handle_get_user_recent_rating_api_request(self, data: Dict) -> Dict: async def handle_get_user_recent_rating_api_request(self, data: Dict) -> Dict:
recent_rating_list = await self.data.profile.get_profile_recent_rating(data["userId"]) recent_rating_list = await self.data.profile.get_profile_recent_rating(
data["userId"]
)
if recent_rating_list is None: if recent_rating_list is None:
return { return {
"userId": data["userId"], "userId": data["userId"],
@ -762,7 +797,7 @@ class ChuniBase:
profile = await self.data.profile.get_profile_data(data["userId"], self.version) profile = await self.data.profile.get_profile_data(data["userId"], self.version)
if profile is None: if profile is None:
return {"userId": data["userId"], "teamId": 0} return {"userId": data["userId"], "teamId": 0}
if profile and profile["teamId"]: if profile and profile["teamId"]:
# Get team by id # Get team by id
@ -787,7 +822,7 @@ class ChuniBase:
"teamId": team_id, "teamId": team_id,
"teamRank": team_rank, "teamRank": team_rank,
"teamName": team_name, "teamName": team_name,
"assaultTimeRate": 1, # TODO: Figure out assaultTime, which might be team point boost? "assaultTimeRate": 1, # TODO: Figure out assaultTime, which might be team point boost?
"userTeamPoint": { "userTeamPoint": {
"userId": data["userId"], "userId": data["userId"],
"teamId": team_id, "teamId": team_id,
@ -796,7 +831,7 @@ class ChuniBase:
"aggrDate": data["playDate"], "aggrDate": data["playDate"],
}, },
} }
async def handle_get_team_course_setting_api_request(self, data: Dict) -> Dict: async def handle_get_team_course_setting_api_request(self, data: Dict) -> Dict:
return { return {
"userId": data["userId"], "userId": data["userId"],
@ -805,7 +840,9 @@ class ChuniBase:
"teamCourseSettingList": [], "teamCourseSettingList": [],
} }
async def handle_get_team_course_setting_api_request_proto(self, data: Dict) -> Dict: async def handle_get_team_course_setting_api_request_proto(
self, data: Dict
) -> Dict:
return { return {
"userId": data["userId"], "userId": data["userId"],
"length": 1, "length": 1,
@ -820,11 +857,11 @@ class ChuniBase:
"teamCourseMusicList": [ "teamCourseMusicList": [
{"track": 184, "type": 1, "level": 3, "selectLevel": -1}, {"track": 184, "type": 1, "level": 3, "selectLevel": -1},
{"track": 184, "type": 1, "level": 3, "selectLevel": -1}, {"track": 184, "type": 1, "level": 3, "selectLevel": -1},
{"track": 184, "type": 1, "level": 3, "selectLevel": -1} {"track": 184, "type": 1, "level": 3, "selectLevel": -1},
], ],
"teamCourseRankingInfoList": [], "teamCourseRankingInfoList": [],
"recodeDate": "2099-12-31 11:59:99.0", "recodeDate": "2099-12-31 11:59:99.0",
"isPlayed": False "isPlayed": False,
} }
], ],
} }
@ -834,7 +871,7 @@ class ChuniBase:
"userId": data["userId"], "userId": data["userId"],
"length": 0, "length": 0,
"nextIndex": -1, "nextIndex": -1,
"teamCourseRuleList": [] "teamCourseRuleList": [],
} }
async def handle_get_team_course_rule_api_request_proto(self, data: Dict) -> Dict: async def handle_get_team_course_rule_api_request_proto(self, data: Dict) -> Dict:
@ -849,7 +886,7 @@ class ChuniBase:
"damageMiss": 1, "damageMiss": 1,
"damageAttack": 1, "damageAttack": 1,
"damageJustice": 1, "damageJustice": 1,
"damageJusticeC": 1 "damageJusticeC": 1,
} }
], ],
} }
@ -860,7 +897,7 @@ class ChuniBase:
if int(user_id) & 0x1000000000001 == 0x1000000000001: if int(user_id) & 0x1000000000001 == 0x1000000000001:
place_id = int(user_id) & 0xFFFC00000000 place_id = int(user_id) & 0xFFFC00000000
self.logger.info("Guest play from place ID %d, ignoring.", place_id) self.logger.info("Guest play from place ID %d, ignoring.", place_id)
return {"returnCode": "1"} return {"returnCode": "1"}
@ -882,7 +919,9 @@ class ChuniBase:
) )
if "userGameOption" in upsert: if "userGameOption" in upsert:
await self.data.profile.put_profile_option(user_id, upsert["userGameOption"][0]) await self.data.profile.put_profile_option(
user_id, upsert["userGameOption"][0]
)
if "userGameOptionEx" in upsert: if "userGameOptionEx" in upsert:
await self.data.profile.put_profile_option_ex( await self.data.profile.put_profile_option_ex(
@ -929,33 +968,41 @@ class ChuniBase:
for playlog in upsert["userPlaylogList"]: for playlog in upsert["userPlaylogList"]:
# convert the player names to utf-8 # convert the player names to utf-8
if playlog["playedUserName1"] is not None: if playlog["playedUserName1"] is not None:
playlog["playedUserName1"] = self.read_wtf8(playlog["playedUserName1"]) playlog["playedUserName1"] = self.read_wtf8(
playlog["playedUserName1"]
)
if playlog["playedUserName2"] is not None: if playlog["playedUserName2"] is not None:
playlog["playedUserName2"] = self.read_wtf8(playlog["playedUserName2"]) playlog["playedUserName2"] = self.read_wtf8(
playlog["playedUserName2"]
)
if playlog["playedUserName3"] is not None: if playlog["playedUserName3"] is not None:
playlog["playedUserName3"] = self.read_wtf8(playlog["playedUserName3"]) playlog["playedUserName3"] = self.read_wtf8(
playlog["playedUserName3"]
)
await self.data.score.put_playlog(user_id, playlog, self.version) await self.data.score.put_playlog(user_id, playlog, self.version)
if "userTeamPoint" in upsert: if "userTeamPoint" in upsert:
team_points = upsert["userTeamPoint"] team_points = upsert["userTeamPoint"]
try: try:
for tp in team_points: for tp in team_points:
if tp["teamId"] != '65535': if tp["teamId"] != "65535":
# Fetch the current team data # Fetch the current team data
current_team = await self.data.profile.get_team_by_id(tp["teamId"]) current_team = await self.data.profile.get_team_by_id(
tp["teamId"]
)
# Calculate the new teamPoint # Calculate the new teamPoint
new_team_point = int(tp["teamPoint"]) + current_team["teamPoint"] new_team_point = (
int(tp["teamPoint"]) + current_team["teamPoint"]
)
# Prepare the data to update # Prepare the data to update
team_data = { team_data = {"teamPoint": new_team_point}
"teamPoint": new_team_point
}
# Update the team data # Update the team data
await self.data.profile.update_team(tp["teamId"], team_data) await self.data.profile.update_team(tp["teamId"], team_data)
except: except:
pass # Probably a better way to catch if the team is not set yet (new profiles), but let's just pass pass # Probably a better way to catch if the team is not set yet (new profiles), but let's just pass
if "userMapAreaList" in upsert: if "userMapAreaList" in upsert:
for map_area in upsert["userMapAreaList"]: for map_area in upsert["userMapAreaList"]:
await self.data.item.put_map_area(user_id, map_area) await self.data.item.put_map_area(user_id, map_area)
@ -973,22 +1020,28 @@ class ChuniBase:
await self.data.item.put_login_bonus( await self.data.item.put_login_bonus(
user_id, self.version, login["presetId"], isWatched=True user_id, self.version, login["presetId"], isWatched=True
) )
if "userRecentPlayerList" in upsert: # TODO: Seen in Air, maybe implement sometime if (
"userRecentPlayerList" in upsert
): # TODO: Seen in Air, maybe implement sometime
for rp in upsert["userRecentPlayerList"]: for rp in upsert["userRecentPlayerList"]:
pass pass
for rating_type in {"userRatingBaseList", "userRatingBaseHotList", "userRatingBaseNextList"}: for rating_type in {
"userRatingBaseList",
"userRatingBaseHotList",
"userRatingBaseNextList",
}:
if rating_type not in upsert: if rating_type not in upsert:
continue continue
await self.data.profile.put_profile_rating( await self.data.profile.put_profile_rating(
user_id, user_id,
self.version, self.version,
rating_type, rating_type,
upsert[rating_type], upsert[rating_type],
) )
# added in LUMINOUS # added in LUMINOUS
if "userCMissionList" in upsert: if "userCMissionList" in upsert:
for cmission in upsert["userCMissionList"]: for cmission in upsert["userCMissionList"]:
@ -1003,7 +1056,9 @@ class ChuniBase:
) )
for progress in cmission["userCMissionProgressList"]: for progress in cmission["userCMissionProgressList"]:
await self.data.item.put_cmission_progress(user_id, mission_id, progress) await self.data.item.put_cmission_progress(
user_id, mission_id, progress
)
if "userNetBattleData" in upsert: if "userNetBattleData" in upsert:
net_battle = upsert["userNetBattleData"][0] net_battle = upsert["userNetBattleData"][0]
@ -1035,10 +1090,20 @@ class ChuniBase:
added_ids = music_ids - keep_ids added_ids = music_ids - keep_ids
for fav_id in deleted_ids: for fav_id in deleted_ids:
await self.data.item.delete_favorite_music(user_id, self.version, fav_id) await self.data.item.delete_favorite_music(
user_id, self.version, fav_id
)
for fav_id in added_ids: for fav_id in added_ids:
await self.data.item.put_favorite_music(user_id, self.version, fav_id) await self.data.item.put_favorite_music(user_id, self.version, fav_id)
# added in CHUNITHM VERSE
if "userUnlockChallengeList" in upsert:
for unlock_challenge in upsert["userUnlockChallengeList"]:
await self.data.item.put_unlock_challenge(
user_id, self.version, unlock_challenge
)
return {"returnCode": "1"} return {"returnCode": "1"}

View File

@ -28,6 +28,7 @@ class ChuniConstants:
VER_CHUNITHM_SUN_PLUS = 14 VER_CHUNITHM_SUN_PLUS = 14
VER_CHUNITHM_LUMINOUS = 15 VER_CHUNITHM_LUMINOUS = 15
VER_CHUNITHM_LUMINOUS_PLUS = 16 VER_CHUNITHM_LUMINOUS_PLUS = 16
VER_CHUNITHM_VERSE = 17
VERSION_NAMES = [ VERSION_NAMES = [
"CHUNITHM", "CHUNITHM",
@ -47,6 +48,7 @@ class ChuniConstants:
"CHUNITHM SUN PLUS", "CHUNITHM SUN PLUS",
"CHUNITHM LUMINOUS", "CHUNITHM LUMINOUS",
"CHUNITHM LUMINOUS PLUS", "CHUNITHM LUMINOUS PLUS",
"CHUNITHM VERSE"
] ]
SCORE_RANK_INTERVALS_OLD = [ SCORE_RANK_INTERVALS_OLD = [

View File

@ -37,6 +37,7 @@ from .sun import ChuniSun
from .sunplus import ChuniSunPlus from .sunplus import ChuniSunPlus
from .luminous import ChuniLuminous from .luminous import ChuniLuminous
from .luminousplus import ChuniLuminousPlus from .luminousplus import ChuniLuminousPlus
from .verse import ChuniVerse
class ChuniServlet(BaseServlet): class ChuniServlet(BaseServlet):
def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None: def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None:
@ -66,6 +67,7 @@ class ChuniServlet(BaseServlet):
ChuniSunPlus, ChuniSunPlus,
ChuniLuminous, ChuniLuminous,
ChuniLuminousPlus, ChuniLuminousPlus,
ChuniVerse
] ]
self.logger = logging.getLogger("chuni") self.logger = logging.getLogger("chuni")
@ -113,6 +115,7 @@ class ChuniServlet(BaseServlet):
f"{ChuniConstants.VER_CHUNITHM_LUMINOUS}_int": 8, f"{ChuniConstants.VER_CHUNITHM_LUMINOUS}_int": 8,
f"{ChuniConstants.VER_CHUNITHM_LUMINOUS}_chn": 8, f"{ChuniConstants.VER_CHUNITHM_LUMINOUS}_chn": 8,
ChuniConstants.VER_CHUNITHM_LUMINOUS_PLUS: 56, ChuniConstants.VER_CHUNITHM_LUMINOUS_PLUS: 56,
ChuniConstants.VER_CHUNITHM_VERSE: 42,
} }
for version, keys in self.game_cfg.crypto.keys.items(): for version, keys in self.game_cfg.crypto.keys.items():
@ -218,9 +221,9 @@ class ChuniServlet(BaseServlet):
] ]
async def render_POST(self, request: Request) -> bytes: async def render_POST(self, request: Request) -> bytes:
endpoint: str = request.path_params.get('endpoint') endpoint: str = request.path_params.get("endpoint")
version: int = request.path_params.get('version') version: int = request.path_params.get("version")
game_code: str = request.path_params.get('game') game_code: str = request.path_params.get("game")
if endpoint.lower() == "ping": if endpoint.lower() == "ping":
return Response(zlib.compress(b'{"returnCode": "1"}')) return Response(zlib.compress(b'{"returnCode": "1"}'))
@ -264,8 +267,10 @@ class ChuniServlet(BaseServlet):
internal_ver = ChuniConstants.VER_CHUNITHM_SUN_PLUS internal_ver = ChuniConstants.VER_CHUNITHM_SUN_PLUS
elif version >= 220 and version < 225: # LUMINOUS elif version >= 220 and version < 225: # LUMINOUS
internal_ver = ChuniConstants.VER_CHUNITHM_LUMINOUS internal_ver = ChuniConstants.VER_CHUNITHM_LUMINOUS
elif version >= 225: # LUMINOUS PLUS elif version >= 225 and version < 230: # LUMINOUS PLUS
internal_ver = ChuniConstants.VER_CHUNITHM_LUMINOUS_PLUS internal_ver = ChuniConstants.VER_CHUNITHM_LUMINOUS_PLUS
elif version >= 230: # VERSE
internal_ver = ChuniConstants.VER_CHUNITHM_VERSE
elif game_code == "SDGS": # Int elif game_code == "SDGS": # Int
if version < 105: # SUPERSTAR if version < 105: # SUPERSTAR
internal_ver = ChuniConstants.VER_CHUNITHM_CRYSTAL_PLUS internal_ver = ChuniConstants.VER_CHUNITHM_CRYSTAL_PLUS
@ -380,7 +385,7 @@ class ChuniServlet(BaseServlet):
handler_cls = self.versions[internal_ver](self.core_cfg, self.game_cfg) handler_cls = self.versions[internal_ver](self.core_cfg, self.game_cfg)
if not hasattr(handler_cls, func_to_find): if not hasattr(handler_cls, func_to_find):
self.logger.warning(f"Unhandled v{version} request {endpoint}") self.logger.warning(f"Unhandled v{version} request {func_to_find}")
resp = {"returnCode": 1} resp = {"returnCode": 1}
else: else:

View File

@ -28,16 +28,18 @@ class ChuniNew(ChuniBase):
def _interal_ver_to_intver(self) -> str: def _interal_ver_to_intver(self) -> str:
if self.version == ChuniConstants.VER_CHUNITHM_NEW: if self.version == ChuniConstants.VER_CHUNITHM_NEW:
return "200" return "200"
if self.version == ChuniConstants.VER_CHUNITHM_NEW_PLUS: elif self.version == ChuniConstants.VER_CHUNITHM_NEW_PLUS:
return "205" return "205"
if self.version == ChuniConstants.VER_CHUNITHM_SUN: elif self.version == ChuniConstants.VER_CHUNITHM_SUN:
return "210" return "210"
if self.version == ChuniConstants.VER_CHUNITHM_SUN_PLUS: elif self.version == ChuniConstants.VER_CHUNITHM_SUN_PLUS:
return "215" return "215"
if self.version == ChuniConstants.VER_CHUNITHM_LUMINOUS: elif self.version == ChuniConstants.VER_CHUNITHM_LUMINOUS:
return "220" return "220"
if self.version == ChuniConstants.VER_CHUNITHM_LUMINOUS_PLUS: elif self.version == ChuniConstants.VER_CHUNITHM_LUMINOUS_PLUS:
return "225" return "225"
elif self.version == ChuniConstants.VER_CHUNITHM_VERSE:
return "230"
async def handle_get_game_setting_api_request(self, data: Dict) -> Dict: async def handle_get_game_setting_api_request(self, data: Dict) -> Dict:
# use UTC time and convert it to JST time by adding +9 # use UTC time and convert it to JST time by adding +9

View File

@ -62,6 +62,7 @@ class ChuniReader(BaseReader):
await self.read_character(f"{dir}/chara", dds_images, this_opt_id) await self.read_character(f"{dir}/chara", dds_images, this_opt_id)
await self.read_map_icon(f"{dir}/mapIcon", this_opt_id) await self.read_map_icon(f"{dir}/mapIcon", this_opt_id)
await self.read_system_voice(f"{dir}/systemVoice", this_opt_id) await self.read_system_voice(f"{dir}/systemVoice", this_opt_id)
await self.read_unlock_challenge(f"{dir}/unlockChallenge")
async def read_login_bonus(self, root_dir: str, opt_id: Optional[int] = None) -> None: async def read_login_bonus(self, root_dir: str, opt_id: Optional[int] = None) -> None:
for root, dirs, files in walk(f"{root_dir}loginBonusPreset"): for root, dirs, files in walk(f"{root_dir}loginBonusPreset"):
@ -499,6 +500,38 @@ class ChuniReader(BaseReader):
self.logger.info(f"Opt folder {opt_folder} (Database ID {opt_id}) contains {data_config['Version']['Name']} v{data_config['Version']['VerMajor']}.{data_config['Version']['VerMinor']}.{opt_seq}") self.logger.info(f"Opt folder {opt_folder} (Database ID {opt_id}) contains {data_config['Version']['Name']} v{data_config['Version']['VerMajor']}.{data_config['Version']['VerMinor']}.{opt_seq}")
return opt_id return opt_id
async def read_unlock_challenge(self, uc_dir: str) -> None:
for root, dirs, files in walk(uc_dir):
for dir in dirs:
if path.exists(f"{root}/{dir}/UnlockChallenge.xml"):
with open(f"{root}/{dir}/UnlockChallenge.xml", "r", encoding="utf-8") as fp:
strdata = fp.read()
xml_root = ET.fromstring(strdata)
for name in xml_root.findall("name"):
id = name.find("id").text
name = name.find("str").text
course_ids = []
for course in xml_root.find("musicList/list/UnlockChallengeMusicListSubData/unlockChallengeMusicData/courseList/list").findall("UnlockChallengeCourseListSubData"):
course_id = course.find("unlockChallengeCourseData/courseName").find("id").text
course_ids.append(course_id)
# Build keyword arguments dynamically for up to 5 course IDs
course_kwargs = {
f"course_id{i+1}": course_ids[i]
for i in range(min(5, len(course_ids)))
}
result = await self.data.static.put_unlock_challenge(
self.version, id, name,
**course_kwargs
)
if result is not None:
self.logger.info(f"Inserted unlock challenge {id}")
else:
self.logger.warning(f"Failed to unlock challenge {id}")
def copy_image(self, filename: str, src_dir: str, dst_dir: str) -> None: def copy_image(self, filename: str, src_dir: str, dst_dir: str) -> None:
# Convert the image to png so we can easily display it in the frontend # Convert the image to png so we can easily display it in the frontend
file_src = path.join(src_dir, filename) file_src = path.join(src_dir, filename)

View File

@ -262,7 +262,11 @@ cmission_progress = Table(
"chuni_item_cmission_progress", "chuni_item_cmission_progress",
metadata, metadata,
Column("id", Integer, primary_key=True, nullable=False), Column("id", Integer, primary_key=True, nullable=False),
Column("user", ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"), nullable=False), Column(
"user",
ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"),
nullable=False,
),
Column("missionId", Integer, nullable=False), Column("missionId", Integer, nullable=False),
Column("order", Integer), Column("order", Integer),
Column("stage", Integer), Column("stage", Integer),
@ -273,14 +277,34 @@ cmission_progress = Table(
mysql_charset="utf8mb4", mysql_charset="utf8mb4",
) )
unlock_challenge = Table(
"chuni_item_unlock_challenge",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("version", Integer, nullable=False),
Column(
"user",
ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"),
nullable=False,
),
Column("unlockChallengeId", Integer, nullable=False),
Column("status", Integer),
Column("clearCourseId", Integer),
Column("conditionType", Integer),
Column("score", Integer),
Column("life", Integer),
Column("clearDate", TIMESTAMP),
UniqueConstraint(
"version", "user", "unlockChallengeId", name="chuni_item_unlock_challenge_uk"
),
mysql_charset="utf8mb4",
)
class ChuniItemData(BaseData): class ChuniItemData(BaseData):
async def get_oldest_free_matching(self, version: int) -> Optional[Row]: async def get_oldest_free_matching(self, version: int) -> Optional[Row]:
sql = matching.select( sql = matching.select(
and_( and_(matching.c.version == version, matching.c.isFull == False)
matching.c.version == version,
matching.c.isFull == False
)
).order_by(matching.c.roomId.asc()) ).order_by(matching.c.roomId.asc())
result = await self.execute(sql) result = await self.execute(sql)
@ -289,11 +313,9 @@ class ChuniItemData(BaseData):
return result.fetchone() return result.fetchone()
async def get_newest_matching(self, version: int) -> Optional[Row]: async def get_newest_matching(self, version: int) -> Optional[Row]:
sql = matching.select( sql = matching.select(and_(matching.c.version == version)).order_by(
and_( matching.c.roomId.desc()
matching.c.version == version )
)
).order_by(matching.c.roomId.desc())
result = await self.execute(sql) result = await self.execute(sql)
if result is None: if result is None:
@ -301,11 +323,7 @@ class ChuniItemData(BaseData):
return result.fetchone() return result.fetchone()
async def get_all_matchings(self, version: int) -> Optional[List[Row]]: async def get_all_matchings(self, version: int) -> Optional[List[Row]]:
sql = matching.select( sql = matching.select(and_(matching.c.version == version))
and_(
matching.c.version == version
)
)
result = await self.execute(sql) result = await self.execute(sql)
if result is None: if result is None:
@ -329,7 +347,7 @@ class ChuniItemData(BaseData):
matching_member_info_list: List, matching_member_info_list: List,
user_id: int = None, user_id: int = None,
rest_sec: int = 60, rest_sec: int = 60,
is_full: bool = False is_full: bool = False,
) -> Optional[int]: ) -> Optional[int]:
sql = insert(matching).values( sql = insert(matching).values(
roomId=room_id, roomId=room_id,
@ -452,23 +470,31 @@ class ChuniItemData(BaseData):
return None return None
return result.fetchone() return result.fetchone()
async def put_favorite_music(self, user_id: int, version: int, music_id: int) -> Optional[int]: async def put_favorite_music(
sql = insert(favorite).values(user=user_id, version=version, favId=music_id, favKind=1) self, user_id: int, version: int, music_id: int
) -> Optional[int]:
sql = insert(favorite).values(
user=user_id, version=version, favId=music_id, favKind=1
)
conflict = sql.on_duplicate_key_update(user=user_id, version=version, favId=music_id, favKind=1) conflict = sql.on_duplicate_key_update(
user=user_id, version=version, favId=music_id, favKind=1
)
result = await self.execute(conflict) result = await self.execute(conflict)
if result is None: if result is None:
return None return None
return result.lastrowid return result.lastrowid
async def delete_favorite_music(self, user_id: int, version: int, music_id: int) -> Optional[int]: async def delete_favorite_music(
self, user_id: int, version: int, music_id: int
) -> Optional[int]:
sql = delete(favorite).where( sql = delete(favorite).where(
and_( and_(
favorite.c.user==user_id, favorite.c.user == user_id,
favorite.c.version==version, favorite.c.version == version,
favorite.c.favId==music_id, favorite.c.favId == music_id,
favorite.c.favKind==1 favorite.c.favKind == 1,
) )
) )
@ -611,8 +637,12 @@ class ChuniItemData(BaseData):
return None return None
return result.lastrowid return result.lastrowid
async def get_map_areas(self, user_id: int, map_area_ids: List[int]) -> Optional[List[Row]]: async def get_map_areas(
sql = select(map_area).where(map_area.c.user == user_id, map_area.c.mapAreaId.in_(map_area_ids)) self, user_id: int, map_area_ids: List[int]
) -> Optional[List[Row]]:
sql = select(map_area).where(
map_area.c.user == user_id, map_area.c.mapAreaId.in_(map_area_ids)
)
result = await self.execute(sql) result = await self.execute(sql)
if result is None: if result is None:
@ -713,7 +743,7 @@ class ChuniItemData(BaseData):
) )
return None return None
return result.lastrowid return result.lastrowid
async def put_cmission_progress( async def put_cmission_progress(
self, user_id: int, mission_id: int, progress_data: Dict self, user_id: int, mission_id: int, progress_data: Dict
) -> Optional[int]: ) -> Optional[int]:
@ -723,10 +753,10 @@ class ChuniItemData(BaseData):
sql = insert(cmission_progress).values(**progress_data) sql = insert(cmission_progress).values(**progress_data)
conflict = sql.on_duplicate_key_update(**progress_data) conflict = sql.on_duplicate_key_update(**progress_data)
result = await self.execute(conflict) result = await self.execute(conflict)
if result is None: if result is None:
return None return None
return result.lastrowid return result.lastrowid
async def get_cmission_progress( async def get_cmission_progress(
@ -739,21 +769,21 @@ class ChuniItemData(BaseData):
) )
).order_by(cmission_progress.c.order.asc()) ).order_by(cmission_progress.c.order.asc())
result = await self.execute(sql) result = await self.execute(sql)
if result is None: if result is None:
return None return None
return result.fetchall() return result.fetchall()
async def get_cmission(self, user_id: int, mission_id: int) -> Optional[Row]: async def get_cmission(self, user_id: int, mission_id: int) -> Optional[Row]:
sql = cmission.select( sql = cmission.select(
and_(cmission.c.user == user_id, cmission.c.missionId == mission_id) and_(cmission.c.user == user_id, cmission.c.missionId == mission_id)
) )
result = await self.execute(sql) result = await self.execute(sql)
if result is None: if result is None:
return None return None
return result.fetchone() return result.fetchone()
async def put_cmission(self, user_id: int, mission_data: Dict) -> Optional[int]: async def put_cmission(self, user_id: int, mission_data: Dict) -> Optional[int]:
@ -762,17 +792,46 @@ class ChuniItemData(BaseData):
sql = insert(cmission).values(**mission_data) sql = insert(cmission).values(**mission_data)
conflict = sql.on_duplicate_key_update(**mission_data) conflict = sql.on_duplicate_key_update(**mission_data)
result = await self.execute(conflict) result = await self.execute(conflict)
if result is None: if result is None:
return None return None
return result.lastrowid return result.lastrowid
async def get_cmissions(self, user_id: int) -> Optional[List[Row]]: async def get_cmissions(self, user_id: int) -> Optional[List[Row]]:
sql = cmission.select(cmission.c.user == user_id) sql = cmission.select(cmission.c.user == user_id)
result = await self.execute(sql) result = await self.execute(sql)
if result is None:
return None
return result.fetchall()
async def put_unlock_challenge(
self, user_id: int, version: int, unlock_challenge_data: Dict
) -> Optional[int]:
unlock_challenge_data["user"] = user_id
unlock_challenge_data["version"] = version
sql = insert(unlock_challenge).values(**unlock_challenge_data)
conflict = sql.on_duplicate_key_update(**unlock_challenge_data)
result = await self.execute(conflict)
if result is None:
return None
return result.lastrowid
async def get_unlock_challenges(
self, user_id: int, version: int
) -> Optional[List[Row]]:
sql = unlock_challenge.select(
and_(
unlock_challenge.c.user == user_id,
unlock_challenge.c.version == version,
)
)
result = await self.execute(sql)
if result is None: if result is None:
return None return None
return result.fetchall() return result.fetchall()

View File

@ -25,6 +25,8 @@ profile = Table(
Column("frameId", Integer), Column("frameId", Integer),
Column("isMaimai", Boolean), Column("isMaimai", Boolean),
Column("trophyId", Integer), Column("trophyId", Integer),
Column("trophyIdSub1", Integer),
Column("trophyIdSub2", Integer),
Column("userName", String(25)), Column("userName", String(25)),
Column("isWebJoin", Boolean), Column("isWebJoin", Boolean),
Column("playCount", Integer), Column("playCount", Integer),
@ -465,6 +467,8 @@ class ChuniProfileData(BaseData):
sql = profile.update((profile.c.user == user_id) & (profile.c.version == version)).values( sql = profile.update((profile.c.user == user_id) & (profile.c.version == version)).values(
nameplateId=new_nameplate, nameplateId=new_nameplate,
trophyId=new_trophy, trophyId=new_trophy,
trophyIdSub1=new_trophySub1,
trophyIdSub2=new_trophySub2,
charaIllustId=new_character charaIllustId=new_character
) )
result = await self.execute(sql) result = await self.execute(sql)
@ -899,4 +903,4 @@ class ChuniProfileData(BaseData):
async def get_net_battle(self, aime_id: int) -> Optional[Row]: async def get_net_battle(self, aime_id: int) -> Optional[Row]:
result = await self.execute(net_battle.select(net_battle.c.user == aime_id)) result = await self.execute(net_battle.select(net_battle.c.user == aime_id))
if result: if result:
return result.fetchone() return result.fetchone()

View File

@ -139,6 +139,8 @@ playlog = Table(
Column("regionId", Integer), Column("regionId", Integer),
Column("machineType", Integer), Column("machineType", Integer),
Column("ticketId", Integer), Column("ticketId", Integer),
Column("monthPoint", Integer),
Column("eventPoint", Integer),
mysql_charset="utf8mb4" mysql_charset="utf8mb4"
) )
@ -420,4 +422,4 @@ class ChuniScoreData(BaseData):
return None return None
rows = result.fetchall() rows = result.fetchall()
return [dict(row) for row in rows] return [dict(row) for row in rows]

View File

@ -289,6 +289,27 @@ login_bonus = Table(
mysql_charset="utf8mb4", mysql_charset="utf8mb4",
) )
unlock_challenge = Table(
"chuni_static_unlock_challenge",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("version", Integer, nullable=False),
Column("unlockChallengeId", Integer, nullable=False),
Column("name", String(255)),
Column("isEnabled", Boolean, server_default="1"),
Column("startDate", TIMESTAMP, server_default=func.now()),
Column("courseId1", Integer),
Column("courseId2", Integer),
Column("courseId3", Integer),
Column("courseId4", Integer),
Column("courseId5", Integer),
UniqueConstraint(
"version", "unlockChallengeId", name="chuni_static_unlock_challenge_pk"
),
mysql_charset="utf8mb4",
)
class ChuniStaticData(BaseData): class ChuniStaticData(BaseData):
async def put_login_bonus( async def put_login_bonus(
self, self,
@ -556,7 +577,7 @@ class ChuniStaticData(BaseData):
return result.fetchall() return result.fetchall()
async def get_music(self, version: int) -> Optional[List[Row]]: async def get_music(self, version: int) -> Optional[List[Row]]:
sql = music.select(music.c.version <= version) sql = music.select(music.c.version == version)
result = await self.execute(sql) result = await self.execute(sql)
if result is None: if result is None:
@ -586,6 +607,28 @@ class ChuniStaticData(BaseData):
if result is None: if result is None:
return None return None
return result.fetchone() return result.fetchone()
async def get_music_by_metadata(
self, title: Optional[str] = None, artist: Optional[str] = None, genre: Optional[str] = None
) -> Optional[List[Row]]:
# all conditions should use like for partial matches
conditions = []
if title:
conditions.append(music.c.title.like(f"%{title}%"))
if artist:
conditions.append(music.c.artist.like(f"%{artist}%"))
if genre:
conditions.append(music.c.genre.like(f"%{genre}%"))
if not conditions:
return None
sql = select(music).where(and_(*conditions))
result = await self.execute(sql)
if result is None:
return None
return result.fetchall()
async def put_avatar( async def put_avatar(
self, self,
@ -629,11 +672,25 @@ class ChuniStaticData(BaseData):
return None return None
return result.lastrowid return result.lastrowid
async def get_avatar_items(self, version: int, category: int, enabled_only: bool = True) -> Optional[List[Dict]]: async def get_avatar_items(
self, version: int, category: int, enabled_only: bool = True
) -> Optional[List[Dict]]:
if enabled_only: if enabled_only:
sql = select(avatar).where((avatar.c.version == version) & (avatar.c.category == category) & (avatar.c.isEnabled)).order_by(avatar.c.sortName) sql = (
select(avatar)
.where(
(avatar.c.version == version)
& (avatar.c.category == category)
& (avatar.c.isEnabled)
)
.order_by(avatar.c.sortName)
)
else: else:
sql = select(avatar).where((avatar.c.version == version) & (avatar.c.category == category)).order_by(avatar.c.sortName) sql = (
select(avatar)
.where((avatar.c.version == version) & (avatar.c.category == category))
.order_by(avatar.c.sortName)
)
result = await self.execute(sql) result = await self.execute(sql)
if result is None: if result is None:
@ -676,11 +733,21 @@ class ChuniStaticData(BaseData):
return None return None
return result.lastrowid return result.lastrowid
async def get_nameplates(self, version: int, enabled_only: bool = True) -> Optional[List[Dict]]: async def get_nameplates(
self, version: int, enabled_only: bool = True
) -> Optional[List[Dict]]:
if enabled_only: if enabled_only:
sql = select(nameplate).where((nameplate.c.version == version) & (nameplate.c.isEnabled)).order_by(nameplate.c.sortName) sql = (
select(nameplate)
.where((nameplate.c.version == version) & (nameplate.c.isEnabled))
.order_by(nameplate.c.sortName)
)
else: else:
sql = select(nameplate).where(nameplate.c.version == version).order_by(nameplate.c.sortName) sql = (
select(nameplate)
.where(nameplate.c.version == version)
.order_by(nameplate.c.sortName)
)
result = await self.execute(sql) result = await self.execute(sql)
if result is None: if result is None:
@ -720,11 +787,21 @@ class ChuniStaticData(BaseData):
return None return None
return result.lastrowid return result.lastrowid
async def get_trophies(self, version: int, enabled_only: bool = True) -> Optional[List[Dict]]: async def get_trophies(
self, version: int, enabled_only: bool = True
) -> Optional[List[Dict]]:
if enabled_only: if enabled_only:
sql = select(trophy).where((trophy.c.version == version) & (trophy.c.isEnabled)).order_by(trophy.c.name) sql = (
select(trophy)
.where((trophy.c.version == version) & (trophy.c.isEnabled))
.order_by(trophy.c.name)
)
else: else:
sql = select(trophy).where(trophy.c.version == version).order_by(trophy.c.name) sql = (
select(trophy)
.where(trophy.c.version == version)
.order_by(trophy.c.name)
)
result = await self.execute(sql) result = await self.execute(sql)
if result is None: if result is None:
@ -767,11 +844,21 @@ class ChuniStaticData(BaseData):
return None return None
return result.lastrowid return result.lastrowid
async def get_map_icons(self, version: int, enabled_only: bool = True) -> Optional[List[Dict]]: async def get_map_icons(
self, version: int, enabled_only: bool = True
) -> Optional[List[Dict]]:
if enabled_only: if enabled_only:
sql = select(map_icon).where((map_icon.c.version == version) & (map_icon.c.isEnabled)).order_by(map_icon.c.sortName) sql = (
select(map_icon)
.where((map_icon.c.version == version) & (map_icon.c.isEnabled))
.order_by(map_icon.c.sortName)
)
else: else:
sql = select(map_icon).where(map_icon.c.version == version).order_by(map_icon.c.sortName) sql = (
select(map_icon)
.where(map_icon.c.version == version)
.order_by(map_icon.c.sortName)
)
result = await self.execute(sql) result = await self.execute(sql)
if result is None: if result is None:
@ -814,11 +901,21 @@ class ChuniStaticData(BaseData):
return None return None
return result.lastrowid return result.lastrowid
async def get_system_voices(self, version: int, enabled_only: bool = True) -> Optional[List[Dict]]: async def get_system_voices(
self, version: int, enabled_only: bool = True
) -> Optional[List[Dict]]:
if enabled_only: if enabled_only:
sql = select(system_voice).where((system_voice.c.version == version) & (system_voice.c.isEnabled)).order_by(system_voice.c.sortName) sql = (
select(system_voice)
.where((system_voice.c.version == version) & (system_voice.c.isEnabled))
.order_by(system_voice.c.sortName)
)
else: else:
sql = select(system_voice).where(system_voice.c.version == version).order_by(system_voice.c.sortName) sql = (
select(system_voice)
.where(system_voice.c.version == version)
.order_by(system_voice.c.sortName)
)
result = await self.execute(sql) result = await self.execute(sql)
if result is None: if result is None:
@ -873,11 +970,21 @@ class ChuniStaticData(BaseData):
return None return None
return result.lastrowid return result.lastrowid
async def get_characters(self, version: int, enabled_only: bool = True) -> Optional[List[Dict]]: async def get_characters(
self, version: int, enabled_only: bool = True
) -> Optional[List[Dict]]:
if enabled_only: if enabled_only:
sql = select(character).where((character.c.version == version) & (character.c.isEnabled)).order_by(character.c.sortName) sql = (
select(character)
.where((character.c.version == version) & (character.c.isEnabled))
.order_by(character.c.sortName)
)
else: else:
sql = select(character).where(character.c.version == version).order_by(character.c.sortName) sql = (
select(character)
.where(character.c.version == version)
.order_by(character.c.sortName)
)
result = await self.execute(sql) result = await self.execute(sql)
if result is None: if result is None:
@ -1074,3 +1181,54 @@ class ChuniStaticData(BaseData):
self.logger.error(f"Failed to set opt enabled status to {enabled} for opt {opt_id}") self.logger.error(f"Failed to set opt enabled status to {enabled} for opt {opt_id}")
return False return False
return True return True
async def put_unlock_challenge(
self,
version: int,
unlock_challenge_id: int,
name: str,
course_id1: Optional[int] = None,
course_id2: Optional[int] = None,
course_id3: Optional[int] = None,
course_id4: Optional[int] = None,
course_id5: Optional[int] = None,
) -> Optional[int]:
sql = insert(unlock_challenge).values(
version=version,
unlockChallengeId=unlock_challenge_id,
name=name,
courseId1=course_id1,
courseId2=course_id2,
courseId3=course_id3,
courseId4=course_id4,
courseId5=course_id5,
)
conflict = sql.on_duplicate_key_update(
name=name,
courseId1=course_id1,
courseId2=course_id2,
courseId3=course_id3,
courseId4=course_id4,
courseId5=course_id5,
)
result = await self.execute(conflict)
if result is None:
return None
return result.lastrowid
async def get_unlock_challenges(self, version: int) -> Optional[List[Dict]]:
sql = unlock_challenge.select(
and_(
unlock_challenge.c.version == version,
unlock_challenge.c.isEnabled == True,
)
).order_by(unlock_challenge.c.startDate.asc())
result = await self.execute(sql)
if result is None:
return None
return result.fetchall()

248
titles/chuni/verse.py Normal file
View File

@ -0,0 +1,248 @@
from datetime import datetime, timedelta
from typing import Dict, List, Set
from core.config import CoreConfig
from titles.chuni.config import ChuniConfig
from titles.chuni.const import (
ChuniConstants,
MapAreaConditionLogicalOperator,
MapAreaConditionType,
)
from titles.chuni.luminousplus import ChuniLuminousPlus
class ChuniVerse(ChuniLuminousPlus):
def __init__(self, core_cfg: CoreConfig, game_cfg: ChuniConfig) -> None:
super().__init__(core_cfg, game_cfg)
self.version = ChuniConstants.VER_CHUNITHM_VERSE
async def handle_cm_get_user_preview_api_request(self, data: Dict) -> Dict:
user_data = await super().handle_cm_get_user_preview_api_request(data)
# Does CARD MAKER 1.35 work this far up?
user_data["lastDataVersion"] = "2.30.00"
return user_data
async def handle_get_game_course_level_api_request(self, data: Dict) -> Dict:
unlock_challenges = await self.data.static.get_unlock_challenges(self.version)
game_course_level_list = []
for unlock_challenge in unlock_challenges:
course_ids = [
unlock_challenge["courseId1"],
unlock_challenge["courseId2"],
unlock_challenge["courseId3"],
unlock_challenge["courseId4"],
unlock_challenge["courseId5"],
]
start_date = unlock_challenge["startDate"].replace(
hour=0, minute=0, second=0
)
for i, course_id in enumerate(course_ids):
start = start_date + timedelta(days=7 * i)
end = start_date + timedelta(days=7 * (i + 1)) - timedelta(seconds=1)
if i == len(course_ids) - 1:
# If this is the last course, set end date to a far future date
end = datetime(2099, 1, 1)
game_course_level_list.append(
{
"courseId": course_id,
"startDate": start.strftime(self.date_time_format),
"endDate": end.strftime(self.date_time_format),
}
)
return {
"length": len(game_course_level_list),
"gameCourseLevelList": game_course_level_list,
}
async def handle_get_game_uc_condition_api_request(self, data: Dict) -> Dict:
unlock_challenges = await self.data.static.get_unlock_challenges(self.version)
game_unlock_challenge_condition_list = []
conditions = {
# unlock Theatore Creatore (ULTIMA) after clearing map VERSE ep. I
10001: {
"type": MapAreaConditionType.MAP_CLEARED.value,
"conditionId": 3020798,
},
# unlock Crossmythos Rhapsodia after clearing map VERSE ep. IV
10006: {
"type": MapAreaConditionType.MAP_CLEARED.value,
"conditionId": 3020802,
},
}
for unlock_challenge in unlock_challenges:
unlock_challenge_id = unlock_challenge["unlockChallengeId"]
unlock_condition = conditions.get(
unlock_challenge_id,
{
"type": 3, # always unlocked
"conditionId": 0,
},
)
game_unlock_challenge_condition_list.append(
{
"unlockChallengeId": unlock_challenge_id,
"length": 1,
"conditionList": [
{
"type": unlock_condition["type"],
"conditionId": unlock_condition["conditionId"],
"logicalOpe": MapAreaConditionLogicalOperator.AND.value,
"startDate": unlock_challenge["startDate"].strftime(
self.date_time_format
),
"endDate": datetime(2099, 1, 1).strftime(
self.date_time_format
),
}
],
}
)
return {
"length": len(game_unlock_challenge_condition_list),
"gameUnlockChallengeConditionList": game_unlock_challenge_condition_list,
}
async def handle_get_user_uc_api_request(self, data: Dict) -> Dict:
user_id = data["userId"]
user_unlock_challenges = await self.data.item.get_unlock_challenges(
user_id, self.version
)
user_unlock_challenge_list = [
{
"unlockChallengeId": user_uc["unlockChallengeId"],
"status": user_uc["status"],
"clearCourseId": user_uc["clearCourseId"],
"conditionType": user_uc["conditionType"],
"score": user_uc["score"],
"life": user_uc["life"],
"clearDate": user_uc["clearDate"].strftime(self.date_time_format),
}
for user_uc in user_unlock_challenges
]
return {
"userId": user_id,
"userUnlockChallengeList": user_unlock_challenge_list,
}
async def handle_get_user_rec_music_api_request(self, data: Dict) -> Dict:
rec_limit = 25 # limit for recommendations
user_id = data["userId"]
user_rec_music_set = set()
recent_rating = await self.data.profile.get_profile_recent_rating(user_id)
if not recent_rating:
# If no recent ratings, return an empty list
return {
"length": 0,
"userRecMusicList": [],
}
recent_ratings = recent_rating["recentRating"]
# cache music info
music_info_list = []
for recent_rating in recent_ratings:
music_id = recent_rating["musicId"]
music_info = await self.data.static.get_song(music_id)
if music_info:
music_info_list.append(music_info)
# use a set to avoid duplicates
user_rec_music_set = set()
# try adding recommendations in order of: title → artist → genre
for field in ("title", "artist", "genre"):
await self._add_recommendations(field, user_rec_music_set, music_info_list, rec_limit)
if len(user_rec_music_set) >= rec_limit:
break
user_rec_music_list = [
{
"musicId": 1, # no idea
# recMusicList is a semi colon-separated list of music IDs and their order comma separated
# for some reason, not all music ids are shown in game?!
"recMusicList": ";".join(
f"{music_id},{index + 1}"
for index, music_id in enumerate(user_rec_music_set)
),
},
]
return {
"length": len(user_rec_music_list),
"userRecMusicList": user_rec_music_list,
}
async def handle_get_user_rec_rating_api_request(self, data: Dict) -> Dict:
class GetUserRecRatingApi:
class UserRecRating:
ratingMin: int
ratingMax: int
# same as recMusicList in get_user_rec_music_api_request
recMusicList: str
length: int
userRecRatingList: list[UserRecRating]
user_id = data["userId"]
user_rec_rating_list = []
return {
"length": len(user_rec_rating_list),
"userRecRatingList": user_rec_rating_list,
}
async def _add_recommendations(
self,
field: str,
user_rec_music_set: Set[int],
music_info_list: List[Dict],
limit: int = 25,
) -> None:
"""
Adds music recommendations based on a specific metadata field (title/artist/genre),
excluding music IDs already in the user's recent ratings and recommendations.
"""
# Collect all existing songId to exclude from recommendations
existing_music_ids = {
info["songId"] for info in music_info_list
}
for music_info in music_info_list:
if len(user_rec_music_set) >= limit:
break
metadata_value = music_info[field]
if not metadata_value:
continue
recs = await self.data.static.get_music_by_metadata(
**{field: metadata_value}
)
for rec in recs or []:
song_id = rec["songId"]
# skip if the song is already in the user's recent ratings
# or if the song is already in the user's recommendations
if (
len(user_rec_music_set) >= limit
or song_id in existing_music_ids
or song_id in user_rec_music_set
):
continue
user_rec_music_set.add(song_id)