chuni: added SUN support, matchmaking, fixed bugs, added docs

- Added CHUNITHM SUN support
- Added first matchmaking support with CPU spawning and messages
- Fixed wrong `next_idx` calculations
- Added `startDate` to events to spawn the correct items
- Fixed login bonus per version
- Added information to docs
This commit is contained in:
2023-05-10 21:32:35 +02:00
parent 0ab539173a
commit b85a65204f
17 changed files with 626 additions and 139 deletions

View File

@ -7,4 +7,4 @@ index = ChuniServlet
database = ChuniData
reader = ChuniReader
game_codes = [ChuniConstants.GAME_CODE, ChuniConstants.GAME_CODE_NEW]
current_schema_version = 3
current_schema_version = 4

View File

@ -44,13 +44,15 @@ class ChuniBase:
# check if a user already has some pogress and if not add the
# login bonus entry
user_login_bonus = self.data.item.get_login_bonus(
user_id, self.version, preset["id"]
user_id, self.version, preset["presetId"]
)
if user_login_bonus is None:
self.data.item.put_login_bonus(user_id, self.version, preset["id"])
self.data.item.put_login_bonus(
user_id, self.version, preset["presetId"]
)
# yeah i'm lazy
user_login_bonus = self.data.item.get_login_bonus(
user_id, self.version, preset["id"]
user_id, self.version, preset["presetId"]
)
# skip the login bonus entirely if its already finished
@ -66,13 +68,13 @@ class ChuniBase:
last_update_date = datetime.now()
all_login_boni = self.data.static.get_login_bonus(
self.version, preset["id"]
self.version, preset["presetId"]
)
# skip the current bonus preset if no boni were found
if all_login_boni is None or len(all_login_boni) < 1:
self.logger.warn(
f"No bonus entries found for bonus preset {preset['id']}"
f"No bonus entries found for bonus preset {preset['presetId']}"
)
continue
@ -83,14 +85,14 @@ class ChuniBase:
if bonus_count > max_needed_days:
# assume that all login preset ids under 3000 needs to be
# looped, like 30 and 40 are looped, 40 does not work?
if preset["id"] < 3000:
if preset["presetId"] < 3000:
bonus_count = 1
else:
is_finished = True
# grab the item for the corresponding day
login_item = self.data.static.get_login_bonus_by_required_days(
self.version, preset["id"], bonus_count
self.version, preset["presetId"], bonus_count
)
if login_item is not None:
# now add the present to the database so the
@ -108,7 +110,7 @@ class ChuniBase:
self.data.item.put_login_bonus(
user_id,
self.version,
preset["id"],
preset["presetId"],
bonusCount=bonus_count,
lastUpdateDate=last_update_date,
isWatched=False,
@ -156,12 +158,18 @@ class ChuniBase:
event_list = []
for evt_row in game_events:
tmp = {}
tmp["id"] = evt_row["eventId"]
tmp["type"] = evt_row["type"]
tmp["startDate"] = "2017-12-05 07:00:00.0"
tmp["endDate"] = "2099-12-31 00:00:00.0"
event_list.append(tmp)
event_list.append(
{
"id": evt_row["eventId"],
"type": evt_row["type"],
# actually use the startDate from the import so it
# properly shows all the events when new ones are imported
"startDate": datetime.strftime(
evt_row["startDate"], "%Y-%m-%d %H:%M:%S"
),
"endDate": "2099-12-31 00:00:00",
}
)
return {
"type": data["type"],
@ -228,29 +236,36 @@ class ChuniBase:
def handle_get_user_character_api_request(self, data: Dict) -> Dict:
characters = self.data.item.get_characters(data["userId"])
if characters is None:
return {}
next_idx = -1
return {
"userId": data["userId"],
"length": 0,
"nextIndex": -1,
"userCharacterList": [],
}
characterList = []
for x in range(int(data["nextIndex"]), len(characters)):
character_list = []
next_idx = int(data["nextIndex"])
max_ct = int(data["maxCount"])
for x in range(next_idx, len(characters)):
tmp = characters[x]._asdict()
tmp.pop("user")
tmp.pop("id")
characterList.append(tmp)
character_list.append(tmp)
if len(characterList) >= int(data["maxCount"]):
if len(character_list) >= max_ct:
break
if len(characterList) >= int(data["maxCount"]) and len(characters) > int(
data["maxCount"]
) + int(data["nextIndex"]):
next_idx = int(data["maxCount"]) + int(data["nextIndex"]) + 1
if len(characters) >= next_idx + max_ct:
next_idx += max_ct
else:
next_idx = -1
return {
"userId": data["userId"],
"length": len(characterList),
"length": len(character_list),
"nextIndex": next_idx,
"userCharacterList": characterList,
"userCharacterList": character_list,
}
def handle_get_user_charge_api_request(self, data: Dict) -> Dict:
@ -292,8 +307,8 @@ class ChuniBase:
if len(user_course_list) >= max_ct:
break
if len(user_course_list) >= max_ct:
next_idx = next_idx + max_ct
if len(user_course_list) >= next_idx + max_ct:
next_idx += max_ct
else:
next_idx = -1
@ -347,12 +362,23 @@ class ChuniBase:
}
def handle_get_user_favorite_item_api_request(self, data: Dict) -> Dict:
user_fav_item_list = []
# still needs to be implemented on WebUI
# 1: Music, 3: Character
fav_list = self.data.item.get_all_favorites(
data["userId"], self.version, fav_kind=int(data["kind"])
)
if fav_list is not None:
for fav in fav_list:
user_fav_item_list.append({"id": fav["favId"]})
return {
"userId": data["userId"],
"length": 0,
"length": len(user_fav_item_list),
"kind": data["kind"],
"nextIndex": -1,
"userFavoriteItemList": [],
"userFavoriteItemList": user_fav_item_list,
}
def handle_get_user_favorite_music_api_request(self, data: Dict) -> Dict:
@ -387,13 +413,13 @@ class ChuniBase:
xout = kind * 10000000000 + next_idx + len(items)
if len(items) < int(data["maxCount"]):
nextIndex = 0
next_idx = 0
else:
nextIndex = xout
next_idx = xout
return {
"userId": data["userId"],
"nextIndex": nextIndex,
"nextIndex": next_idx,
"itemKind": kind,
"length": len(items),
"userItemList": items,
@ -452,6 +478,7 @@ class ChuniBase:
"nextIndex": -1,
"userMusicList": [], # 240
}
song_list = []
next_idx = int(data["nextIndex"])
max_ct = int(data["maxCount"])
@ -474,10 +501,10 @@ class ChuniBase:
if len(song_list) >= max_ct:
break
if len(song_list) >= max_ct:
if len(song_list) >= next_idx + max_ct:
next_idx += max_ct
else:
next_idx = 0
next_idx = -1
return {
"userId": data["userId"],
@ -623,12 +650,15 @@ class ChuniBase:
self.data.profile.put_profile_data(
user_id, self.version, upsert["userData"][0]
)
if "userDataEx" in upsert:
self.data.profile.put_profile_data_ex(
user_id, self.version, upsert["userDataEx"][0]
)
if "userGameOption" in upsert:
self.data.profile.put_profile_option(user_id, upsert["userGameOption"][0])
if "userGameOptionEx" in upsert:
self.data.profile.put_profile_option_ex(
user_id, upsert["userGameOptionEx"][0]
@ -672,6 +702,10 @@ class ChuniBase:
if "userPlaylogList" in upsert:
for playlog in upsert["userPlaylogList"]:
# convert the player names to utf-8
playlog["playedUserName1"] = self.read_wtf8(playlog["playedUserName1"])
playlog["playedUserName2"] = self.read_wtf8(playlog["playedUserName2"])
playlog["playedUserName3"] = self.read_wtf8(playlog["playedUserName3"])
self.data.score.put_playlog(user_id, playlog)
if "userTeamPoint" in upsert:

View File

@ -17,21 +17,23 @@ class ChuniConstants:
VER_CHUNITHM_PARADISE = 10
VER_CHUNITHM_NEW = 11
VER_CHUNITHM_NEW_PLUS = 12
VER_CHUNITHM_SUN = 13
VERSION_NAMES = [
"Chunithm",
"Chunithm+",
"Chunithm Air",
"Chunithm Air+",
"Chunithm Star",
"Chunithm Star+",
"Chunithm Amazon",
"Chunithm Amazon+",
"Chunithm Crystal",
"Chunithm Crystal+",
"Chunithm Paradise",
"Chunithm New!!",
"Chunithm New!!+",
"CHUNITHM",
"CHUNITHM PLUS",
"CHUNITHM AIR",
"CHUNITHM AIR PLUS",
"CHUNITHM STAR",
"CHUNITHM STAR PLUS",
"CHUNITHM AMAZON",
"CHUNITHM AMAZON PLUS",
"CHUNITHM CRYSTAL",
"CHUNITHM CRYSTAL PLUS",
"CHUNITHM PARADISE",
"CHUNITHM NEW!!",
"CHUNITHM NEW PLUS!!",
"CHUNITHM SUN"
]
@classmethod

View File

@ -29,6 +29,7 @@ from titles.chuni.crystalplus import ChuniCrystalPlus
from titles.chuni.paradise import ChuniParadise
from titles.chuni.new import ChuniNew
from titles.chuni.newplus import ChuniNewPlus
from titles.chuni.sun import ChuniSun
class ChuniServlet:
@ -55,6 +56,7 @@ class ChuniServlet:
ChuniParadise,
ChuniNew,
ChuniNewPlus,
ChuniSun,
]
self.logger = logging.getLogger("chuni")
@ -145,30 +147,32 @@ class ChuniServlet:
if version < 105: # 1.0
internal_ver = ChuniConstants.VER_CHUNITHM
elif version >= 105 and version < 110: # Plus
elif version >= 105 and version < 110: # PLUS
internal_ver = ChuniConstants.VER_CHUNITHM_PLUS
elif version >= 110 and version < 115: # Air
elif version >= 110 and version < 115: # AIR
internal_ver = ChuniConstants.VER_CHUNITHM_AIR
elif version >= 115 and version < 120: # Air Plus
elif version >= 115 and version < 120: # AIR PLUS
internal_ver = ChuniConstants.VER_CHUNITHM_AIR_PLUS
elif version >= 120 and version < 125: # Star
elif version >= 120 and version < 125: # STAR
internal_ver = ChuniConstants.VER_CHUNITHM_STAR
elif version >= 125 and version < 130: # Star Plus
elif version >= 125 and version < 130: # STAR PLUS
internal_ver = ChuniConstants.VER_CHUNITHM_STAR_PLUS
elif version >= 130 and version < 135: # Amazon
elif version >= 130 and version < 135: # AMAZON
internal_ver = ChuniConstants.VER_CHUNITHM_AMAZON
elif version >= 135 and version < 140: # Amazon Plus
elif version >= 135 and version < 140: # AMAZON PLUS
internal_ver = ChuniConstants.VER_CHUNITHM_AMAZON_PLUS
elif version >= 140 and version < 145: # Crystal
elif version >= 140 and version < 145: # CRYSTAL
internal_ver = ChuniConstants.VER_CHUNITHM_CRYSTAL
elif version >= 145 and version < 150: # Crystal Plus
elif version >= 145 and version < 150: # CRYSTAL PLUS
internal_ver = ChuniConstants.VER_CHUNITHM_CRYSTAL_PLUS
elif version >= 150 and version < 200: # Paradise
elif version >= 150 and version < 200: # PARADISE
internal_ver = ChuniConstants.VER_CHUNITHM_PARADISE
elif version >= 200 and version < 205: # New
elif version >= 200 and version < 205: # NEW!!
internal_ver = ChuniConstants.VER_CHUNITHM_NEW
elif version >= 205 and version < 210: # New Plus
elif version >= 205 and version < 210: # NEW PLUS!!
internal_ver = ChuniConstants.VER_CHUNITHM_NEW_PLUS
elif version >= 210: # SUN
internal_ver = ChuniConstants.VER_CHUNITHM_SUN
if all(c in string.hexdigits for c in endpoint) and len(endpoint) == 32:
# If we get a 32 character long hex string, it's a hash and we're

View File

@ -23,41 +23,44 @@ class ChuniNew(ChuniBase):
self.version = ChuniConstants.VER_CHUNITHM_NEW
def handle_get_game_setting_api_request(self, data: Dict) -> Dict:
# use UTC time and convert it to JST time by adding +9
# matching therefore starts one hour before and lasts for 8 hours
match_start = datetime.strftime(
datetime.now() - timedelta(hours=10), self.date_time_format
datetime.utcnow() + timedelta(hours=8), self.date_time_format
)
match_end = datetime.strftime(
datetime.now() + timedelta(hours=10), self.date_time_format
datetime.utcnow() + timedelta(hours=16), self.date_time_format
)
reboot_start = datetime.strftime(
datetime.now() - timedelta(hours=11), self.date_time_format
datetime.utcnow() + timedelta(hours=6), self.date_time_format
)
reboot_end = datetime.strftime(
datetime.now() - timedelta(hours=10), self.date_time_format
datetime.utcnow() + timedelta(hours=7), self.date_time_format
)
return {
"gameSetting": {
"isMaintenance": "false",
"isMaintenance": False,
"requestInterval": 10,
"rebootStartTime": reboot_start,
"rebootEndTime": reboot_end,
"isBackgroundDistribute": "false",
"isBackgroundDistribute": False,
"maxCountCharacter": 300,
"maxCountItem": 300,
"maxCountMusic": 300,
"matchStartTime": match_start,
"matchEndTime": match_end,
"matchTimeLimit": 99,
"matchTimeLimit": 60,
"matchErrorLimit": 9999,
"romVersion": self.game_cfg.version.version(self.version)["rom"],
"dataVersion": self.game_cfg.version.version(self.version)["data"],
"matchingUri": f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/200/ChuniServlet/",
"matchingUriX": f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/200/ChuniServlet/",
# might be really important for online battle to connect the cabs via UDP port 50201
"udpHolePunchUri": f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/200/ChuniServlet/",
"reflectorUri": f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/200/ChuniServlet/",
},
"isDumpUpload": "false",
"isAou": "false",
"isDumpUpload": False,
"isAou": False,
}
def handle_remove_token_api_request(self, data: Dict) -> Dict:
@ -468,3 +471,162 @@ class ChuniNew(ChuniBase):
self.data.item.put_user_print_state(user_id, id=order_id, hasCompleted=True)
return {"returnCode": "1", "apiName": "CMUpsertUserPrintCancelApi"}
def handle_ping_request(self, data: Dict) -> Dict:
# matchmaking ping request
return {"returnCode": "1"}
def handle_begin_matching_api_request(self, data: Dict) -> Dict:
room_id = 1
# check if there is a free matching room
matching_room = self.data.item.get_oldest_free_matching(self.version)
if matching_room is None:
# grab the latest roomId and add 1 for the new room
newest_matching = self.data.item.get_newest_matching(self.version)
if newest_matching is not None:
room_id = newest_matching["roomId"] + 1
# fix userName WTF8
new_member = data["matchingMemberInfo"]
new_member["userName"] = self.read_wtf8(new_member["userName"])
# create the new room with room_id and the current user id (host)
# user id is required for the countdown later on
self.data.item.put_matching(
self.version, room_id, [new_member], user_id=new_member["userId"]
)
# get the newly created matching room
matching_room = self.data.item.get_matching(self.version, room_id)
else:
# a room already exists, so just add the new member to it
matching_member_list = matching_room["matchingMemberInfoList"]
# fix userName WTF8
new_member = data["matchingMemberInfo"]
new_member["userName"] = self.read_wtf8(new_member["userName"])
matching_member_list.append(new_member)
# add the updated room to the database, make sure to set isFull correctly!
self.data.item.put_matching(
self.version,
matching_room["roomId"],
matching_member_list,
user_id=matching_room["user"],
is_full=True if len(matching_member_list) >= 4 else False,
)
matching_wait = {
"isFinish": False,
"restMSec": matching_room["restMSec"], # in sec
"pollingInterval": 1, # in sec
"matchingMemberInfoList": matching_room["matchingMemberInfoList"],
}
return {"roomId": 1, "matchingWaitState": matching_wait}
def handle_end_matching_api_request(self, data: Dict) -> Dict:
matching_room = self.data.item.get_matching(self.version, data["roomId"])
members = matching_room["matchingMemberInfoList"]
# only set the host user to role 1 every other to 0?
role_list = [
{"role": 1} if m["userId"] == matching_room["user"] else {"role": 0}
for m in members
]
self.data.item.put_matching(
self.version,
matching_room["roomId"],
members,
user_id=matching_room["user"],
rest_sec=0, # make sure to always set 0
is_full=True, # and full, so no one can join
)
return {
"matchingResult": 1, # needs to be 1 for successful matching
"matchingMemberInfoList": members,
# no idea, maybe to differentiate between CPUs and real players?
"matchingMemberRoleList": role_list,
# TCP/UDP connection?
"reflectorUri": f"{self.core_cfg.title.hostname}",
}
def handle_remove_matching_member_api_request(self, data: Dict) -> Dict:
# get all matching rooms, because Chuni only returns the userId
# not the actual roomId
matching_rooms = self.data.item.get_all_matchings(self.version)
if matching_rooms is None:
return {"returnCode": "1"}
for room in matching_rooms:
old_members = room["matchingMemberInfoList"]
new_members = [m for m in old_members if m["userId"] != data["userId"]]
# if nothing changed go to the next room
if len(old_members) == len(new_members):
continue
# if the last user got removed, delete the matching room
if len(new_members) <= 0:
self.data.item.delete_matching(self.version, room["roomId"])
else:
# remove the user from the room
self.data.item.put_matching(
self.version,
room["roomId"],
new_members,
user_id=room["user"],
rest_sec=room["restMSec"],
)
return {"returnCode": "1"}
def handle_get_matching_state_api_request(self, data: Dict) -> Dict:
polling_interval = 1
# get the current active room
matching_room = self.data.item.get_matching(self.version, data["roomId"])
members = matching_room["matchingMemberInfoList"]
rest_sec = matching_room["restMSec"]
# grab the current member
current_member = data["matchingMemberInfo"]
# only the host user can decrease the countdown
if matching_room["user"] == int(current_member["userId"]):
# cap the restMSec to 0
if rest_sec > 0:
rest_sec -= polling_interval
else:
rest_sec = 0
# update the members in order to recieve messages
for i, member in enumerate(members):
if member["userId"] == current_member["userId"]:
# replace the old user data with the current user data,
# also parse WTF-8 everytime
current_member["userName"] = self.read_wtf8(current_member["userName"])
members[i] = current_member
self.data.item.put_matching(
self.version,
data["roomId"],
members,
rest_sec=rest_sec,
user_id=matching_room["user"],
)
# only add the other members to the list
diff_members = [m for m in members if m["userId"] != current_member["userId"]]
matching_wait = {
# makes no difference? Always use False?
"isFinish": True if rest_sec == 0 else False,
"restMSec": rest_sec,
"pollingInterval": polling_interval,
# the current user needs to be the first one?
"matchingMemberInfoList": [current_member] + diff_members,
}
return {"matchingWaitState": matching_wait}

View File

@ -36,6 +36,6 @@ class ChuniNewPlus(ChuniNew):
def handle_cm_get_user_preview_api_request(self, data: Dict) -> Dict:
user_data = super().handle_cm_get_user_preview_api_request(data)
# hardcode lastDataVersion for CardMaker 1.35
# hardcode lastDataVersion for CardMaker 1.35 A028
user_data["lastDataVersion"] = "2.05.00"
return user_data

View File

@ -1,5 +1,12 @@
from typing import Dict, List, Optional
from sqlalchemy import Table, Column, UniqueConstraint, PrimaryKeyConstraint, and_
from sqlalchemy import (
Table,
Column,
UniqueConstraint,
PrimaryKeyConstraint,
and_,
delete,
)
from sqlalchemy.types import Integer, String, TIMESTAMP, Boolean, JSON
from sqlalchemy.engine.base import Connection
from sqlalchemy.schema import ForeignKey
@ -203,8 +210,141 @@ login_bonus = Table(
mysql_charset="utf8mb4",
)
favorite = Table(
"chuni_item_favorite",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column(
"user",
ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"),
nullable=False,
),
Column("version", Integer, nullable=False),
Column("favId", Integer, nullable=False),
Column("favKind", Integer, nullable=False, server_default="1"),
UniqueConstraint("version", "user", "favId", name="chuni_item_favorite_uk"),
mysql_charset="utf8mb4",
)
matching = Table(
"chuni_item_matching",
metadata,
Column("roomId", Integer, nullable=False),
Column(
"user",
ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"),
nullable=False,
),
Column("version", Integer, nullable=False),
Column("restMSec", Integer, nullable=False, server_default="60"),
Column("isFull", Boolean, nullable=False, server_default="0"),
PrimaryKeyConstraint("roomId", "version", name="chuni_item_matching_pk"),
Column("matchingMemberInfoList", JSON, nullable=False),
mysql_charset="utf8mb4",
)
class ChuniItemData(BaseData):
def get_oldest_free_matching(self, version: int) -> Optional[Row]:
sql = matching.select(
and_(
matching.c.version == version,
matching.c.isFull == False
)
).order_by(matching.c.roomId.asc())
result = self.execute(sql)
if result is None:
return None
return result.fetchone()
def get_newest_matching(self, version: int) -> Optional[Row]:
sql = matching.select(
and_(
matching.c.version == version
)
).order_by(matching.c.roomId.desc())
result = self.execute(sql)
if result is None:
return None
return result.fetchone()
def get_all_matchings(self, version: int) -> Optional[List[Row]]:
sql = matching.select(
and_(
matching.c.version == version
)
)
result = self.execute(sql)
if result is None:
return None
return result.fetchall()
def get_matching(self, version: int, room_id: int) -> Optional[Row]:
sql = matching.select(
and_(matching.c.version == version, matching.c.roomId == room_id)
)
result = self.execute(sql)
if result is None:
return None
return result.fetchone()
def put_matching(
self,
version: int,
room_id: int,
matching_member_info_list: list,
user_id: int = None,
rest_sec: int = 60,
is_full: bool = False
) -> Optional[int]:
sql = insert(matching).values(
roomId=room_id,
version=version,
restMSec=rest_sec,
user=user_id,
isFull=is_full,
matchingMemberInfoList=matching_member_info_list,
)
conflict = sql.on_duplicate_key_update(
restMSec=rest_sec, matchingMemberInfoList=matching_member_info_list
)
result = self.execute(conflict)
if result is None:
return None
return result.lastrowid
def delete_matching(self, version: int, room_id: int):
sql = delete(matching).where(
and_(matching.c.roomId == room_id, matching.c.version == version)
)
result = self.execute(sql)
if result is None:
return None
return result.lastrowid
def get_all_favorites(
self, user_id: int, version: int, fav_kind: int = 1
) -> Optional[List[Row]]:
sql = favorite.select(
and_(
favorite.c.version == version,
favorite.c.user == user_id,
favorite.c.favKind == fav_kind,
)
)
result = self.execute(sql)
if result is None:
return None
return result.fetchall()
def put_login_bonus(
self, user_id: int, version: int, preset_id: int, **login_bonus_data
) -> Optional[int]:

View File

@ -89,8 +89,6 @@ profile = Table(
Integer,
ForeignKey("chuni_profile_team.id", ondelete="SET NULL", onupdate="SET NULL"),
),
Column("avatarBack", Integer, server_default="0"),
Column("avatarFace", Integer, server_default="0"),
Column("eliteRankPoint", Integer, server_default="0"),
Column("stockedGridCount", Integer, server_default="0"),
Column("netBattleLoseCount", Integer, server_default="0"),
@ -98,10 +96,8 @@ profile = Table(
Column("netBattle4thCount", Integer, server_default="0"),
Column("overPowerRate", Integer, server_default="0"),
Column("battleRewardStatus", Integer, server_default="0"),
Column("avatarPoint", Integer, server_default="0"),
Column("netBattle1stCount", Integer, server_default="0"),
Column("charaIllustId", Integer, server_default="0"),
Column("avatarItem", Integer, server_default="0"),
Column("userNameEx", String(8), server_default=""),
Column("netBattleWinCount", Integer, server_default="0"),
Column("netBattleCorrection", Integer, server_default="0"),
@ -112,7 +108,6 @@ profile = Table(
Column("netBattle3rdCount", Integer, server_default="0"),
Column("netBattleConsecutiveWinCount", Integer, server_default="0"),
Column("overPowerLowerRank", Integer, server_default="0"),
Column("avatarWear", Integer, server_default="0"),
Column("classEmblemBase", Integer, server_default="0"),
Column("battleRankPoint", Integer, server_default="0"),
Column("netBattle2ndCount", Integer, server_default="0"),
@ -120,13 +115,19 @@ profile = Table(
Column("skillId", Integer, server_default="0"),
Column("lastCountryCode", String(5), server_default="JPN"),
Column("isNetBattleHost", Boolean, server_default="0"),
Column("avatarFront", Integer, server_default="0"),
Column("avatarSkin", Integer, server_default="0"),
Column("battleRewardCount", Integer, server_default="0"),
Column("battleRewardIndex", Integer, server_default="0"),
Column("netBattlePlayCount", Integer, server_default="0"),
Column("exMapLoopCount", Integer, server_default="0"),
Column("netBattleEndState", Integer, server_default="0"),
Column("rankUpChallengeResults", JSON),
Column("avatarBack", Integer, server_default="0"),
Column("avatarFace", Integer, server_default="0"),
Column("avatarPoint", Integer, server_default="0"),
Column("avatarItem", Integer, server_default="0"),
Column("avatarWear", Integer, server_default="0"),
Column("avatarFront", Integer, server_default="0"),
Column("avatarSkin", Integer, server_default="0"),
Column("avatarHead", Integer, server_default="0"),
UniqueConstraint("user", "version", name="chuni_profile_profile_uk"),
mysql_charset="utf8mb4",
@ -417,8 +418,8 @@ class ChuniProfileData(BaseData):
sql = (
select([profile, option])
.join(option, profile.c.user == option.c.user)
.filter(and_(profile.c.user == aime_id, profile.c.version == version))
)
.filter(and_(profile.c.user == aime_id, profile.c.version <= version))
).order_by(profile.c.version.desc())
result = self.execute(sql)
if result is None:
@ -429,9 +430,9 @@ class ChuniProfileData(BaseData):
sql = select(profile).where(
and_(
profile.c.user == aime_id,
profile.c.version == version,
profile.c.version <= version,
)
)
).order_by(profile.c.version.desc())
result = self.execute(sql)
if result is None:
@ -461,9 +462,9 @@ class ChuniProfileData(BaseData):
sql = select(profile_ex).where(
and_(
profile_ex.c.user == aime_id,
profile_ex.c.version == version,
profile_ex.c.version <= version,
)
)
).order_by(profile_ex.c.version.desc())
result = self.execute(sql)
if result is None:

View File

@ -134,7 +134,9 @@ playlog = Table(
Column("charaIllustId", Integer),
Column("romVersion", String(255)),
Column("judgeHeaven", Integer),
mysql_charset="utf8mb4",
Column("regionId", Integer),
Column("machineType", Integer),
mysql_charset="utf8mb4"
)

View File

@ -1,11 +1,19 @@
from typing import Dict, List, Optional
from sqlalchemy import Table, Column, UniqueConstraint, PrimaryKeyConstraint, and_
from sqlalchemy import (
ForeignKeyConstraint,
Table,
Column,
UniqueConstraint,
PrimaryKeyConstraint,
and_,
)
from sqlalchemy.types import Integer, String, TIMESTAMP, Boolean, JSON, Float
from sqlalchemy.engine.base import Connection
from sqlalchemy.engine import Row
from sqlalchemy.schema import ForeignKey
from sqlalchemy.sql import func, select
from sqlalchemy.dialects.mysql import insert
from datetime import datetime
from core.data.schema import BaseData, metadata
@ -17,6 +25,7 @@ events = Table(
Column("eventId", Integer),
Column("type", Integer),
Column("name", String(255)),
Column("startDate", TIMESTAMP, server_default=func.now()),
Column("enabled", Boolean, server_default="1"),
UniqueConstraint("version", "eventId", name="chuni_static_events_uk"),
mysql_charset="utf8mb4",
@ -125,11 +134,13 @@ gacha_cards = Table(
login_bonus_preset = Table(
"chuni_static_login_bonus_preset",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("presetId", Integer, nullable=False),
Column("version", Integer, nullable=False),
Column("presetName", String(255), nullable=False),
Column("isEnabled", Boolean, server_default="1"),
UniqueConstraint("version", "id", name="chuni_static_login_bonus_preset_uk"),
PrimaryKeyConstraint(
"presetId", "version", name="chuni_static_login_bonus_preset_pk"
),
mysql_charset="utf8mb4",
)
@ -138,15 +149,7 @@ login_bonus = Table(
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("version", Integer, nullable=False),
Column(
"presetId",
ForeignKey(
"chuni_static_login_bonus_preset.id",
ondelete="cascade",
onupdate="cascade",
),
nullable=False,
),
Column("presetId", Integer, nullable=False),
Column("loginBonusId", Integer, nullable=False),
Column("loginBonusName", String(255), nullable=False),
Column("presentId", Integer, nullable=False),
@ -157,6 +160,16 @@ login_bonus = Table(
UniqueConstraint(
"version", "presetId", "loginBonusId", name="chuni_static_login_bonus_uk"
),
ForeignKeyConstraint(
["presetId", "version"],
[
"chuni_static_login_bonus_preset.presetId",
"chuni_static_login_bonus_preset.version",
],
onupdate="CASCADE",
ondelete="CASCADE",
name="chuni_static_login_bonus_ibfk_1",
),
mysql_charset="utf8mb4",
)
@ -236,7 +249,7 @@ class ChuniStaticData(BaseData):
self, version: int, preset_id: int, preset_name: str, is_enabled: bool
) -> Optional[int]:
sql = insert(login_bonus_preset).values(
id=preset_id,
presetId=preset_id,
version=version,
presetName=preset_name,
isEnabled=is_enabled,
@ -416,6 +429,14 @@ class ChuniStaticData(BaseData):
return None
return result.fetchall()
def get_music(self, version: int) -> Optional[List[Row]]:
sql = music.select(music.c.version <= version)
result = self.execute(sql)
if result is None:
return None
return result.fetchall()
def get_music_chart(
self, version: int, song_id: int, chart_id: int
) -> Optional[List[Row]]:

37
titles/chuni/sun.py Normal file
View File

@ -0,0 +1,37 @@
from typing import Dict, Any
from core.config import CoreConfig
from titles.chuni.newplus import ChuniNewPlus
from titles.chuni.const import ChuniConstants
from titles.chuni.config import ChuniConfig
class ChuniSun(ChuniNewPlus):
def __init__(self, core_cfg: CoreConfig, game_cfg: ChuniConfig) -> None:
super().__init__(core_cfg, game_cfg)
self.version = ChuniConstants.VER_CHUNITHM_SUN
def handle_get_game_setting_api_request(self, data: Dict) -> Dict:
ret = super().handle_get_game_setting_api_request(data)
ret["gameSetting"]["romVersion"] = self.game_cfg.version.version(self.version)["rom"]
ret["gameSetting"]["dataVersion"] = self.game_cfg.version.version(self.version)["data"]
ret["gameSetting"][
"matchingUri"
] = f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/210/ChuniServlet/"
ret["gameSetting"][
"matchingUriX"
] = f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/210/ChuniServlet/"
ret["gameSetting"][
"udpHolePunchUri"
] = f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/210/ChuniServlet/"
ret["gameSetting"][
"reflectorUri"
] = f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/210/ChuniServlet/"
return ret
def handle_cm_get_user_preview_api_request(self, data: Dict) -> Dict:
user_data = super().handle_cm_get_user_preview_api_request(data)
# hardcode lastDataVersion for CardMaker 1.35 A032
user_data["lastDataVersion"] = "2.10.00"
return user_data