add back games, conform them to new title dispatch

This commit is contained in:
Hay1tsme
2023-02-17 01:02:21 -05:00
parent f18e939dd0
commit 7e3396a7ff
214 changed files with 19412 additions and 23 deletions

18
titles/chuni/__init__.py Normal file
View File

@ -0,0 +1,18 @@
from titles.chuni.index import ChuniServlet
from titles.chuni.const import ChuniConstants
from titles.chuni.database import ChuniData
from titles.chuni.read import ChuniReader
index = ChuniServlet
database = ChuniData
reader = ChuniReader
use_default_title = True
include_protocol = True
title_secure = False
game_codes = [ChuniConstants.GAME_CODE, ChuniConstants.GAME_CODE_NEW]
trailing_slash = True
use_default_host = False
host = ""
current_schema_version = 1

16
titles/chuni/air.py Normal file
View File

@ -0,0 +1,16 @@
from typing import Dict
from core.config import CoreConfig
from titles.chuni.base import ChuniBase
from titles.chuni.const import ChuniConstants
from titles.chuni.config import ChuniConfig
class ChuniAir(ChuniBase):
def __init__(self, core_cfg: CoreConfig, game_cfg: ChuniConfig) -> None:
super().__init__(core_cfg, game_cfg)
self.version = ChuniConstants.VER_CHUNITHM_AIR
def handle_get_game_setting_api_request(self, data: Dict) -> Dict:
ret = super().handle_get_game_setting_api_request(data)
ret["gameSetting"]["dataVersion"] = "1.10.00"
return ret

16
titles/chuni/airplus.py Normal file
View File

@ -0,0 +1,16 @@
from typing import Dict
from core.config import CoreConfig
from titles.chuni.base import ChuniBase
from titles.chuni.const import ChuniConstants
from titles.chuni.config import ChuniConfig
class ChuniAirPlus(ChuniBase):
def __init__(self, core_cfg: CoreConfig, game_cfg: ChuniConfig) -> None:
super().__init__(core_cfg, game_cfg)
self.version = ChuniConstants.VER_CHUNITHM_AIR_PLUS
def handle_get_game_setting_api_request(self, data: Dict) -> Dict:
ret = super().handle_get_game_setting_api_request(data)
ret["gameSetting"]["dataVersion"] = "1.15.00"
return ret

18
titles/chuni/amazon.py Normal file
View File

@ -0,0 +1,18 @@
from datetime import datetime, timedelta
from typing import Dict, Any
import pytz
from core.config import CoreConfig
from titles.chuni.base import ChuniBase
from titles.chuni.const import ChuniConstants
from titles.chuni.config import ChuniConfig
class ChuniAmazon(ChuniBase):
def __init__(self, core_cfg: CoreConfig, game_cfg: ChuniConfig) -> None:
super().__init__(core_cfg, game_cfg)
self.version = ChuniConstants.VER_CHUNITHM_AMAZON
def handle_get_game_setting_api_request(self, data: Dict) -> Dict:
ret = super().handle_get_game_setting_api_request(data)
ret["gameSetting"]["dataVersion"] = "1.30.00"
return ret

View File

@ -0,0 +1,18 @@
from datetime import datetime, timedelta
from typing import Dict, Any
import pytz
from core.config import CoreConfig
from titles.chuni.base import ChuniBase
from titles.chuni.const import ChuniConstants
from titles.chuni.config import ChuniConfig
class ChuniAmazonPlus(ChuniBase):
def __init__(self, core_cfg: CoreConfig, game_cfg: ChuniConfig) -> None:
super().__init__(core_cfg, game_cfg)
self.version = ChuniConstants.VER_CHUNITHM_AMAZON_PLUS
def handle_get_game_setting_api_request(self, data: Dict) -> Dict:
ret = super().handle_get_game_setting_api_request(data)
ret["gameSetting"]["dataVersion"] = "1.35.00"
return ret

572
titles/chuni/base.py Normal file
View File

@ -0,0 +1,572 @@
import logging
import json
from datetime import datetime, timedelta
from time import strftime
import pytz
from typing import Dict, Any
from core.config import CoreConfig
from titles.chuni.const import ChuniConstants
from titles.chuni.database import ChuniData
from titles.chuni.config import ChuniConfig
class ChuniBase():
def __init__(self, core_cfg: CoreConfig, game_cfg: ChuniConfig) -> None:
self.core_cfg = core_cfg
self.game_cfg = game_cfg
self.data = ChuniData(core_cfg)
self.date_time_format = "%Y-%m-%d %H:%M:%S"
self.logger = logging.getLogger("chuni")
self.game = ChuniConstants.GAME_CODE
self.version = ChuniConstants.VER_CHUNITHM
def handle_game_login_api_request(self, data: Dict) -> Dict:
#self.data.base.log_event("chuni", "login", logging.INFO, {"version": self.version, "user": data["userId"]})
return { "returnCode": 1 }
def handle_game_logout_api_request(self, data: Dict) -> Dict:
#self.data.base.log_event("chuni", "logout", logging.INFO, {"version": self.version, "user": data["userId"]})
return { "returnCode": 1 }
def handle_get_game_charge_api_request(self, data: Dict) -> Dict:
game_charge_list = self.data.static.get_enabled_charges(self.version)
charges = []
for x in range(len(game_charge_list)):
charges.append({
"orderId": x,
"chargeId": game_charge_list[x]["chargeId"],
"price": 1,
"startDate": "2017-12-05 07:00:00.0",
"endDate": "2099-12-31 00:00:00.0",
"salePrice": 1,
"saleStartDate": "2017-12-05 07:00:00.0",
"saleEndDate": "2099-12-31 00:00:00.0"
})
return {
"length": len(charges),
"gameChargeList": charges
}
def handle_get_game_event_api_request(self, data: Dict) -> Dict:
game_events = self.data.static.get_enabled_events(self.version)
event_list = []
for evt_row in game_events:
tmp = {}
tmp["id"] = evt_row["eventId"]
tmp["type"] = evt_row["type"]
tmp["startDate"] = "2017-12-05 07:00:00.0"
tmp["endDate"] = "2099-12-31 00:00:00.0"
event_list.append(tmp)
return {
"type": data["type"],
"length": len(event_list),
"gameEventList": event_list
}
def handle_get_game_idlist_api_request(self, data: Dict) -> Dict:
return { "type": data["type"], "length": 0, "gameIdlistList": [] }
def handle_get_game_message_api_request(self, data: Dict) -> Dict:
return { "type": data["type"], "length": "0", "gameMessageList": [] }
def handle_get_game_ranking_api_request(self, data: Dict) -> Dict:
return { "type": data["type"], "gameRankingList": [] }
def handle_get_game_sale_api_request(self, data: Dict) -> Dict:
return { "type": data["type"], "length": 0, "gameSaleList": [] }
def handle_get_game_setting_api_request(self, data: Dict) -> Dict:
reboot_start = datetime.strftime(datetime.now() - timedelta(hours=4), self.date_time_format)
reboot_end = datetime.strftime(datetime.now() - timedelta(hours=3), self.date_time_format)
return {
"gameSetting": {
"dataVersion": "1.00.00",
"isMaintenance": "false",
"requestInterval": 10,
"rebootStartTime": reboot_start,
"rebootEndTime": reboot_end,
"isBackgroundDistribute": "false",
"maxCountCharacter": 300,
"maxCountItem": 300,
"maxCountMusic": 300,
},
"isDumpUpload": "false",
"isAou": "false",
}
def handle_get_user_activity_api_request(self, data: Dict) -> Dict:
user_activity_list = self.data.profile.get_profile_activity(data["userId"], data["kind"])
activity_list = []
for activity in user_activity_list:
tmp = activity._asdict()
tmp.pop("user")
tmp["id"] = tmp["activityId"]
tmp.pop("activityId")
activity_list.append(tmp)
return {
"userId": data["userId"],
"length": len(activity_list),
"kind": data["kind"],
"userActivityList": activity_list
}
def handle_get_user_character_api_request(self, data: Dict) -> Dict:
characters = self.data.item.get_characters(data["userId"])
if characters is None: return {}
next_idx = -1
characterList = []
for x in range(int(data["nextIndex"]), len(characters)):
tmp = characters[x]._asdict()
tmp.pop("user")
tmp.pop("id")
characterList.append(tmp)
if len(characterList) >= int(data["maxCount"]):
break
if len(characterList) >= int(data["maxCount"]) and len(characters) > int(data["maxCount"]) + int(data["nextIndex"]):
next_idx = int(data["maxCount"]) + int(data["nextIndex"]) + 1
return {
"userId": data["userId"],
"length": len(characterList),
"nextIndex": next_idx,
"userCharacterList": characterList
}
def handle_get_user_charge_api_request(self, data: Dict) -> Dict:
user_charge_list = self.data.profile.get_profile_charge(data["userId"])
charge_list = []
for charge in user_charge_list:
tmp = charge._asdict()
tmp.pop("id")
tmp.pop("user")
charge_list.append(tmp)
return {
"userId": data["userId"],
"length": len(charge_list),
"userChargeList": charge_list
}
def handle_get_user_course_api_request(self, data: Dict) -> Dict:
user_course_list = self.data.score.get_courses(data["userId"])
if user_course_list is None:
return {
"userId": data["userId"],
"length": 0,
"nextIndex": -1,
"userCourseList": []
}
course_list = []
next_idx = int(data["nextIndex"])
max_ct = int(data["maxCount"])
for x in range(next_idx, len(user_course_list)):
tmp = user_course_list[x]._asdict()
tmp.pop("user")
tmp.pop("id")
course_list.append(tmp)
if len(user_course_list) >= max_ct:
break
if len(user_course_list) >= max_ct:
next_idx = next_idx + max_ct
else:
next_idx = -1
return {
"userId": data["userId"],
"length": len(course_list),
"nextIndex": next_idx,
"userCourseList": course_list
}
def handle_get_user_data_api_request(self, data: Dict) -> Dict:
p = self.data.profile.get_profile_data(data["userId"], self.version)
if p is None: return {}
profile = p._asdict()
profile.pop("id")
profile.pop("user")
profile.pop("version")
return {
"userId": data["userId"],
"userData": profile
}
def handle_get_user_data_ex_api_request(self, data: Dict) -> Dict:
p = self.data.profile.get_profile_data_ex(data["userId"], self.version)
if p is None: return {}
profile = p._asdict()
profile.pop("id")
profile.pop("user")
profile.pop("version")
return {
"userId": data["userId"],
"userDataEx": profile
}
def handle_get_user_duel_api_request(self, data: Dict) -> Dict:
user_duel_list = self.data.item.get_duels(data["userId"])
if user_duel_list is None: return {}
duel_list = []
for duel in user_duel_list:
tmp = duel._asdict()
tmp.pop("id")
tmp.pop("user")
duel_list.append(tmp)
return {
"userId": data["userId"],
"length": len(duel_list),
"userDuelList": duel_list
}
def handle_get_user_favorite_item_api_request(self, data: Dict) -> Dict:
return {
"userId": data["userId"],
"length": 0,
"kind": data["kind"],
"nextIndex": -1,
"userFavoriteItemList": []
}
def handle_get_user_favorite_music_api_request(self, data: Dict) -> Dict:
"""
This is handled via the webui, which we don't have right now
"""
return {
"userId": data["userId"],
"length": 0,
"userFavoriteMusicList": []
}
def handle_get_user_item_api_request(self, data: Dict) -> Dict:
kind = int(int(data["nextIndex"]) / 10000000000)
next_idx = int(int(data["nextIndex"]) % 10000000000)
user_item_list = self.data.item.get_items(data["userId"], kind)
if user_item_list is None or len(user_item_list) == 0:
return {"userId": data["userId"], "nextIndex": -1, "itemKind": kind, "userItemList": []}
items: list[Dict[str, Any]] = []
for i in range(next_idx, len(user_item_list)):
tmp = user_item_list[i]._asdict()
tmp.pop("user")
tmp.pop("id")
items.append(tmp)
if len(items) >= int(data["maxCount"]):
break
xout = kind * 10000000000 + next_idx + len(items)
if len(items) < int(data["maxCount"]): nextIndex = 0
else: nextIndex = xout
return {"userId": data["userId"], "nextIndex": nextIndex, "itemKind": kind, "length": len(items), "userItemList": items}
def handle_get_user_login_bonus_api_request(self, data: Dict) -> Dict:
"""
Unsure how to get this to trigger...
"""
return {
"userId": data["userId"],
"length": 2,
"userLoginBonusList": [
{
"presetId": '10',
"bonusCount": '0',
"lastUpdateDate": "1970-01-01 09:00:00",
"isWatched": "true"
},
{
"presetId": '20',
"bonusCount": '0',
"lastUpdateDate": "1970-01-01 09:00:00",
"isWatched": "true"
},
]
}
def handle_get_user_map_api_request(self, data: Dict) -> Dict:
user_map_list = self.data.item.get_maps(data["userId"])
if user_map_list is None: return {}
map_list = []
for map in user_map_list:
tmp = map._asdict()
tmp.pop("id")
tmp.pop("user")
map_list.append(tmp)
return {
"userId": data["userId"],
"length": len(map_list),
"userMapList": map_list
}
def handle_get_user_music_api_request(self, data: Dict) -> Dict:
music_detail = self.data.score.get_scores(data["userId"])
if music_detail is None:
return {
"userId": data["userId"],
"length": 0,
"nextIndex": -1,
"userMusicList": [] #240
}
song_list = []
next_idx = int(data["nextIndex"])
max_ct = int(data["maxCount"])
for x in range(next_idx, len(music_detail)):
found = False
tmp = music_detail[x]._asdict()
tmp.pop("user")
tmp.pop("id")
for song in song_list:
if song["userMusicDetailList"][0]["musicId"] == tmp["musicId"]:
found = True
song["userMusicDetailList"].append(tmp)
song["length"] = len(song["userMusicDetailList"])
if not found:
song_list.append({
"length": 1,
"userMusicDetailList": [tmp]
})
if len(song_list) >= max_ct:
break
if len(song_list) >= max_ct:
next_idx += max_ct
else:
next_idx = 0
return {
"userId": data["userId"],
"length": len(song_list),
"nextIndex": next_idx,
"userMusicList": song_list #240
}
def handle_get_user_option_api_request(self, data: Dict) -> Dict:
p = self.data.profile.get_profile_option(data["userId"])
option = p._asdict()
option.pop("id")
option.pop("user")
return {
"userId": data["userId"],
"userGameOption": option
}
def handle_get_user_option_ex_api_request(self, data: Dict) -> Dict:
p = self.data.profile.get_profile_option_ex(data["userId"])
option = p._asdict()
option.pop("id")
option.pop("user")
return {
"userId": data["userId"],
"userGameOptionEx": option
}
def read_wtf8(self, src):
return bytes([ord(c) for c in src]).decode("utf-8")
def handle_get_user_preview_api_request(self, data: Dict) -> Dict:
profile = self.data.profile.get_profile_preview(data["userId"], self.version)
if profile is None: return None
profile_character = self.data.item.get_character(data["userId"], profile["characterId"])
if profile_character is None:
chara = {}
else:
chara = profile_character._asdict()
chara.pop("id")
chara.pop("user")
return {
"userId": data["userId"],
# Current Login State
"isLogin": False,
"lastLoginDate": profile["lastPlayDate"],
# User Profile
"userName": profile["userName"],
"reincarnationNum": profile["reincarnationNum"],
"level": profile["level"],
"exp": profile["exp"],
"playerRating": profile["playerRating"],
"lastGameId": profile["lastGameId"],
"lastRomVersion": profile["lastRomVersion"],
"lastDataVersion": profile["lastDataVersion"],
"lastPlayDate": profile["lastPlayDate"],
"trophyId": profile["trophyId"],
"nameplateId": profile["nameplateId"],
# Current Selected Character
"userCharacter": chara,
# User Game Options
"playerLevel": profile["playerLevel"],
"rating": profile["rating"],
"headphone": profile["headphone"],
"chargeState": "1",
"userNameEx": profile["userName"],
}
def handle_get_user_recent_rating_api_request(self, data: Dict) -> Dict:
recet_rating_list = self.data.profile.get_profile_recent_rating(data["userId"])
if recet_rating_list is None:
return {
"userId": data["userId"],
"length": 0,
"userRecentRatingList": [],
}
return {
"userId": data["userId"],
"length": len(recet_rating_list["recentRating"]),
"userRecentRatingList": recet_rating_list["recentRating"],
}
def handle_get_user_region_api_request(self, data: Dict) -> Dict:
# TODO: Region
return {
"userId": data["userId"],
"length": 0,
"userRegionList": [],
}
def handle_get_user_team_api_request(self, data: Dict) -> Dict:
# TODO: Team
return {
"userId": data["userId"],
"teamId": 0
}
def handle_get_team_course_setting_api_request(self, data: Dict) -> Dict:
return {
"userId": data["userId"],
"length": 0,
"nextIndex": 0,
"teamCourseSettingList": [],
}
def handle_get_team_course_rule_api_request(self, data: Dict) -> Dict:
return {
"userId": data["userId"],
"length": 0,
"nextIndex": 0,
"teamCourseRuleList": [],
}
def handle_upsert_user_all_api_request(self, data: Dict) -> Dict:
upsert = data["upsertUserAll"]
user_id = data["userId"]
if "userData" in upsert:
try:
upsert["userData"][0]["userName"] = self.read_wtf8(upsert["userData"][0]["userName"])
except: pass
self.data.profile.put_profile_data(user_id, self.version, upsert["userData"][0])
if "userDataEx" in upsert:
self.data.profile.put_profile_data_ex(user_id, self.version, upsert["userDataEx"][0])
if "userGameOption" in upsert:
self.data.profile.put_profile_option(user_id, upsert["userGameOption"][0])
if "userGameOptionEx" in upsert:
self.data.profile.put_profile_option_ex(user_id, upsert["userGameOptionEx"][0])
if "userRecentRatingList" in upsert:
self.data.profile.put_profile_recent_rating(user_id, upsert["userRecentRatingList"])
if "userCharacterList" in upsert:
for character in upsert["userCharacterList"]:
self.data.item.put_character(user_id, character)
if "userMapList" in upsert:
for map in upsert["userMapList"]:
self.data.item.put_map(user_id, map)
if "userCourseList" in upsert:
for course in upsert["userCourseList"]:
self.data.score.put_course(user_id, course)
if "userDuelList" in upsert:
for duel in upsert["userDuelList"]:
self.data.item.put_duel(user_id, duel)
if "userItemList" in upsert:
for item in upsert["userItemList"]:
self.data.item.put_item(user_id, item)
if "userActivityList" in upsert:
for activity in upsert["userActivityList"]:
self.data.profile.put_profile_activity(user_id, activity)
if "userChargeList" in upsert:
for charge in upsert["userChargeList"]:
self.data.profile.put_profile_charge(user_id, charge)
if "userMusicDetailList" in upsert:
for song in upsert["userMusicDetailList"]:
self.data.score.put_score(user_id, song)
if "userPlaylogList" in upsert:
for playlog in upsert["userPlaylogList"]:
self.data.score.put_playlog(user_id, playlog)
if "userTeamPoint" in upsert:
# TODO: team stuff
pass
if "userMapAreaList" in upsert:
for map_area in upsert["userMapAreaList"]:
self.data.item.put_map_area(user_id, map_area)
if "userOverPowerList" in upsert:
for overpower in upsert["userOverPowerList"]:
self.data.profile.put_profile_overpower(user_id, overpower)
if "userEmoneyList" in upsert:
for emoney in upsert["userEmoneyList"]:
self.data.profile.put_profile_emoney(user_id, emoney)
return { "returnCode": "1" }
def handle_upsert_user_chargelog_api_request(self, data: Dict) -> Dict:
return { "returnCode": "1" }
def handle_upsert_client_bookkeeping_api_request(self, data: Dict) -> Dict:
return { "returnCode": "1" }
def handle_upsert_client_develop_api_request(self, data: Dict) -> Dict:
return { "returnCode": "1" }
def handle_upsert_client_error_api_request(self, data: Dict) -> Dict:
return { "returnCode": "1" }
def handle_upsert_client_setting_api_request(self, data: Dict) -> Dict:
return { "returnCode": "1" }
def handle_upsert_client_testmode_api_request(self, data: Dict) -> Dict:
return { "returnCode": "1" }

36
titles/chuni/config.py Normal file
View File

@ -0,0 +1,36 @@
from core.config import CoreConfig
from typing import Dict
class ChuniServerConfig():
def __init__(self, parent_config: "ChuniConfig") -> None:
self.__config = parent_config
@property
def enable(self) -> bool:
return CoreConfig.get_config_field(self.__config, 'chuni', 'server', 'enable', default=True)
@property
def loglevel(self) -> int:
return CoreConfig.str_to_loglevel(CoreConfig.get_config_field(self.__config, 'chuni', 'server', 'loglevel', default="info"))
class ChuniCryptoConfig():
def __init__(self, parent_config: "ChuniConfig") -> None:
self.__config = parent_config
@property
def keys(self) -> Dict:
"""
in the form of:
internal_version: [key, iv]
all values are hex strings
"""
return CoreConfig.get_config_field(self.__config, 'chuni', 'crypto', 'keys', default={})
@property
def encrypted_only(self) -> bool:
return CoreConfig.get_config_field(self.__config, 'chuni', 'crypto', 'encrypted_only', default=False)
class ChuniConfig(dict):
def __init__(self) -> None:
self.server = ChuniServerConfig(self)
self.crypto = ChuniCryptoConfig(self)

24
titles/chuni/const.py Normal file
View File

@ -0,0 +1,24 @@
class ChuniConstants():
GAME_CODE = "SDBT"
GAME_CODE_NEW = "SDHD"
VER_CHUNITHM = 0
VER_CHUNITHM_PLUS = 1
VER_CHUNITHM_AIR = 2
VER_CHUNITHM_AIR_PLUS = 3
VER_CHUNITHM_STAR = 4
VER_CHUNITHM_STAR_PLUS = 5
VER_CHUNITHM_AMAZON = 6
VER_CHUNITHM_AMAZON_PLUS = 7
VER_CHUNITHM_CRYSTAL = 8
VER_CHUNITHM_CRYSTAL_PLUS = 9
VER_CHUNITHM_PARADISE = 10
VER_CHUNITHM_NEW = 11
VER_CHUNITHM_NEW_PLUS = 12
VERSION_NAMES = ["Chunithm", "Chunithm+", "Chunithm Air", "Chunithm Air+", "Chunithm Star", "Chunithm Star+", "Chunithm Amazon",
"Chunithm Amazon+", "Chunithm Crystal", "Chunithm Crystal+", "Chunithm Paradise", "Chunithm New!!", "Chunithm New!!+"]
@classmethod
def game_ver_to_string(cls, ver: int):
return cls.VERSION_NAMES[ver]

18
titles/chuni/crystal.py Normal file
View File

@ -0,0 +1,18 @@
from datetime import datetime, timedelta
from typing import Dict, Any
import pytz
from core.config import CoreConfig
from titles.chuni.base import ChuniBase
from titles.chuni.const import ChuniConstants
from titles.chuni.config import ChuniConfig
class ChuniCrystal(ChuniBase):
def __init__(self, core_cfg: CoreConfig, game_cfg: ChuniConfig) -> None:
super().__init__(core_cfg, game_cfg)
self.version = ChuniConstants.VER_CHUNITHM_CRYSTAL
def handle_get_game_setting_api_request(self, data: Dict) -> Dict:
ret = super().handle_get_game_setting_api_request(data)
ret["gameSetting"]["dataVersion"] = "1.40.00"
return ret

View File

@ -0,0 +1,18 @@
from datetime import datetime, timedelta
from typing import Dict, Any
import pytz
from core.config import CoreConfig
from titles.chuni.base import ChuniBase
from titles.chuni.const import ChuniConstants
from titles.chuni.config import ChuniConfig
class ChuniCrystalPlus(ChuniBase):
def __init__(self, core_cfg: CoreConfig, game_cfg: ChuniConfig) -> None:
super().__init__(core_cfg, game_cfg)
self.version = ChuniConstants.VER_CHUNITHM_CRYSTAL_PLUS
def handle_get_game_setting_api_request(self, data: Dict) -> Dict:
ret = super().handle_get_game_setting_api_request(data)
ret["gameSetting"]["dataVersion"] = "1.45.00"
return ret

12
titles/chuni/database.py Normal file
View File

@ -0,0 +1,12 @@
from core.data import Data
from core.config import CoreConfig
from titles.chuni.schema import *
class ChuniData(Data):
def __init__(self, cfg: CoreConfig) -> None:
super().__init__(cfg)
self.item = ChuniItemData(cfg, self.session)
self.profile = ChuniProfileData(cfg, self.session)
self.score = ChuniScoreData(cfg, self.session)
self.static = ChuniStaticData(cfg, self.session)

172
titles/chuni/index.py Normal file
View File

@ -0,0 +1,172 @@
from twisted.web.http import Request
import logging, coloredlogs
from logging.handlers import TimedRotatingFileHandler
import zlib
import yaml
import json
import inflection
import string
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
from core import CoreConfig
from titles.chuni.config import ChuniConfig
from titles.chuni.const import ChuniConstants
from titles.chuni.base import ChuniBase
from titles.chuni.plus import ChuniPlus
from titles.chuni.air import ChuniAir
from titles.chuni.airplus import ChuniAirPlus
from titles.chuni.star import ChuniStar
from titles.chuni.starplus import ChuniStarPlus
from titles.chuni.amazon import ChuniAmazon
from titles.chuni.amazonplus import ChuniAmazonPlus
from titles.chuni.crystal import ChuniCrystal
from titles.chuni.crystalplus import ChuniCrystalPlus
from titles.chuni.paradise import ChuniParadise
from titles.chuni.new import ChuniNew
from titles.chuni.newplus import ChuniNewPlus
class ChuniServlet():
def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None:
self.core_cfg = core_cfg
self.game_cfg = ChuniConfig()
self.game_cfg.update(yaml.safe_load(open(f"{cfg_dir}/chuni.yaml")))
self.versions = [
ChuniBase(core_cfg, self.game_cfg),
ChuniPlus(core_cfg, self.game_cfg),
ChuniAir(core_cfg, self.game_cfg),
ChuniAirPlus(core_cfg, self.game_cfg),
ChuniStar(core_cfg, self.game_cfg),
ChuniStarPlus(core_cfg, self.game_cfg),
ChuniAmazon(core_cfg, self.game_cfg),
ChuniAmazonPlus(core_cfg, self.game_cfg),
ChuniCrystal(core_cfg, self.game_cfg),
ChuniCrystalPlus(core_cfg, self.game_cfg),
ChuniParadise(core_cfg, self.game_cfg),
ChuniNew(core_cfg, self.game_cfg),
ChuniNewPlus(core_cfg, self.game_cfg),
]
self.logger = logging.getLogger("chuni")
if not hasattr(self.logger, "inited"):
log_fmt_str = "[%(asctime)s] Chunithm | %(levelname)s | %(message)s"
log_fmt = logging.Formatter(log_fmt_str)
fileHandler = TimedRotatingFileHandler("{0}/{1}.log".format(self.core_cfg.server.log_dir, "chuni"), encoding='utf8',
when="d", backupCount=10)
fileHandler.setFormatter(log_fmt)
consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(log_fmt)
self.logger.addHandler(fileHandler)
self.logger.addHandler(consoleHandler)
self.logger.setLevel(self.game_cfg.server.loglevel)
coloredlogs.install(level=self.game_cfg.server.loglevel, logger=self.logger, fmt=log_fmt_str)
self.logger.inited = True
def render_POST(self, request: Request, version: int, url_path: str) -> bytes:
req_raw = request.content.getvalue()
url_split = url_path.split("/")
encrtped = False
internal_ver = 0
endpoint = url_split[len(url_split) - 1]
if version < 105: # 1.0
internal_ver = ChuniConstants.VER_CHUNITHM
elif version >= 105 and version < 110: # Plus
internal_ver = ChuniConstants.VER_CHUNITHM_PLUS
elif version >= 110 and version < 115: # Air
internal_ver = ChuniConstants.VER_CHUNITHM_AIR
elif version >= 115 and version < 120: # Air Plus
internal_ver = ChuniConstants.VER_CHUNITHM_AIR_PLUS
elif version >= 120 and version < 125: # Star
internal_ver = ChuniConstants.VER_CHUNITHM_STAR
elif version >= 125 and version < 130: # Star Plus
internal_ver = ChuniConstants.VER_CHUNITHM_STAR_PLUS
elif version >= 130 and version < 135: # Amazon
internal_ver = ChuniConstants.VER_CHUNITHM_AMAZON
elif version >= 135 and version < 140: # Amazon Plus
internal_ver = ChuniConstants.VER_CHUNITHM_AMAZON_PLUS
elif version >= 140 and version < 145: # Crystal
internal_ver = ChuniConstants.VER_CHUNITHM_CRYSTAL
elif version >= 145 and version < 150: # Crystal Plus
internal_ver = ChuniConstants.VER_CHUNITHM_CRYSTAL_PLUS
elif version >= 150 and version < 200: # Paradise
internal_ver = ChuniConstants.VER_CHUNITHM_PARADISE
elif version >= 200 and version < 205: # New
internal_ver = ChuniConstants.VER_CHUNITHM_NEW
elif version >= 205 and version < 210: # New Plus
internal_ver = ChuniConstants.VER_CHUNITHM_NEW_PLUS
if all(c in string.hexdigits for c in endpoint) and len(endpoint) == 32:
# If we get a 32 character long hex string, it's a hash and we're
# doing encrypted. The likelyhood of false positives is low but
# technically not 0
endpoint = request.getHeader("User-Agent").split("#")[0]
try:
crypt = AES.new(
bytes.fromhex(self.game_cfg.crypto.keys[str(internal_ver)][0]),
AES.MODE_CBC,
bytes.fromhex(self.game_cfg.crypto.keys[str(internal_ver)][1])
)
req_raw = crypt.decrypt(req_raw)
except:
self.logger.error(f"Failed to decrypt v{version} request to {endpoint} -> {req_raw}")
return zlib.compress("{\"stat\": \"0\"}".encode("utf-8"))
encrtped = True
if not encrtped and self.game_cfg.crypto.encrypted_only:
self.logger.error(f"Unencrypted v{version} {endpoint} request, but config is set to encrypted only: {req_raw}")
return zlib.compress("{\"stat\": \"0\"}".encode("utf-8"))
try:
unzip = zlib.decompress(req_raw)
except zlib.error as e:
self.logger.error(f"Failed to decompress v{version} {endpoint} request -> {e}")
return b""
req_data = json.loads(unzip)
self.logger.info(f"v{version} {endpoint} request - {req_data}")
func_to_find = "handle_" + inflection.underscore(endpoint) + "_request"
try:
handler = getattr(self.versions[internal_ver], func_to_find)
resp = handler(req_data)
except AttributeError as e:
self.logger.warning(f"Unhandled v{version} request {endpoint} - {e}")
return zlib.compress("{\"stat\": \"0\"}".encode("utf-8"))
except Exception as e:
self.logger.error(f"Error handling v{version} method {endpoint} - {e}")
return zlib.compress("{\"stat\": \"0\"}".encode("utf-8"))
if resp == None:
resp = {'returnCode': 1}
self.logger.info(f"Response {resp}")
zipped = zlib.compress(json.dumps(resp, ensure_ascii=False).encode("utf-8"))
if not encrtped:
return zipped
padded = pad(zipped, 16)
crypt = AES.new(
bytes.fromhex(self.game_cfg.crypto.keys[str(internal_ver)][0]),
AES.MODE_CBC,
bytes.fromhex(self.game_cfg.crypto.keys[str(internal_ver)][1])
)
return crypt.encrypt(padded)

128
titles/chuni/new.py Normal file
View File

@ -0,0 +1,128 @@
import logging
from datetime import datetime, timedelta
from typing import Dict
from core.config import CoreConfig
from titles.chuni.const import ChuniConstants
from titles.chuni.database import ChuniData
from titles.chuni.base import ChuniBase
from titles.chuni.config import ChuniConfig
class ChuniNew(ChuniBase):
ITEM_TYPE = {
"character": 20,
"story": 21,
"card": 22
}
def __init__(self, core_cfg: CoreConfig, game_cfg: ChuniConfig) -> None:
self.core_cfg = core_cfg
self.game_cfg = game_cfg
self.data = ChuniData(core_cfg)
self.date_time_format = "%Y-%m-%d %H:%M:%S"
self.logger = logging.getLogger("chuni")
self.game = ChuniConstants.GAME_CODE
self.version = ChuniConstants.VER_CHUNITHM_NEW
def handle_get_game_setting_api_request(self, data: Dict) -> Dict:
match_start = datetime.strftime(datetime.now() - timedelta(hours=10), self.date_time_format)
match_end = datetime.strftime(datetime.now() + timedelta(hours=10), self.date_time_format)
reboot_start = datetime.strftime(datetime.now() - timedelta(hours=11), self.date_time_format)
reboot_end = datetime.strftime(datetime.now() - timedelta(hours=10), self.date_time_format)
return {
"gameSetting": {
"isMaintenance": "false",
"requestInterval": 10,
"rebootStartTime": reboot_start,
"rebootEndTime": reboot_end,
"isBackgroundDistribute": "false",
"maxCountCharacter": 300,
"maxCountItem": 300,
"maxCountMusic": 300,
"matchStartTime": match_start,
"matchEndTime": match_end,
"matchTimeLimit": 99,
"matchErrorLimit": 9999,
"romVersion": "2.00.00",
"dataVersion": "2.00.00",
"matchingUri": f"http://{self.core_cfg.server.hostname}:{self.core_cfg.title.port}/SDHD/200/ChuniServlet/",
"matchingUriX": f"http://{self.core_cfg.server.hostname}:{self.core_cfg.title.port}/SDHD/200/ChuniServlet/",
"udpHolePunchUri": f"http://{self.core_cfg.server.hostname}:{self.core_cfg.title.port}/SDHD/200/ChuniServlet/",
"reflectorUri": f"http://{self.core_cfg.server.hostname}:{self.core_cfg.title.port}/SDHD/200/ChuniServlet/",
},
"isDumpUpload": "false",
"isAou": "false",
}
def handle_delete_token_api_request(self, data: Dict) -> Dict:
return { "returnCode": "1" }
def handle_create_token_api_request(self, data: Dict) -> Dict:
return { "returnCode": "1" }
def handle_get_user_map_area_api_request(self, data: Dict) -> Dict:
user_map_areas = self.data.item.get_map_areas(data["userId"])
map_areas = []
for map_area in user_map_areas:
tmp = map_area._asdict()
tmp.pop("id")
tmp.pop("user")
map_areas.append(tmp)
return {
"userId": data["userId"],
"userMapAreaList": map_areas
}
def handle_get_user_symbol_chat_setting_api_request(self, data: Dict) -> Dict:
return {
"userId": data["userId"],
"symbolCharInfoList": []
}
def handle_get_user_preview_api_request(self, data: Dict) -> Dict:
profile = self.data.profile.get_profile_preview(data["userId"], self.version)
if profile is None: return None
profile_character = self.data.item.get_character(data["userId"], profile["characterId"])
if profile_character is None:
chara = {}
else:
chara = profile_character._asdict()
chara.pop("id")
chara.pop("user")
data1 = {
"userId": data["userId"],
# Current Login State
"isLogin": False,
"lastLoginDate": profile["lastPlayDate"],
# User Profile
"userName": profile["userName"],
"reincarnationNum": profile["reincarnationNum"],
"level": profile["level"],
"exp": profile["exp"],
"playerRating": profile["playerRating"],
"lastGameId": profile["lastGameId"],
"lastRomVersion": profile["lastRomVersion"],
"lastDataVersion": profile["lastDataVersion"],
"lastPlayDate": profile["lastPlayDate"],
"emoneyBrandId": 0,
"trophyId": profile["trophyId"],
# Current Selected Character
"userCharacter": chara,
# User Game Options
"playerLevel": profile["playerLevel"],
"rating": profile["rating"],
"headphone": profile["headphone"],
"chargeState": 0,
"userNameEx": "0",
"banState": 0,
"classEmblemMedal": profile["classEmblemMedal"],
"classEmblemBase": profile["classEmblemBase"],
"battleRankId": profile["battleRankId"],
}
return data1

23
titles/chuni/newplus.py Normal file
View File

@ -0,0 +1,23 @@
from datetime import datetime, timedelta
from typing import Dict, Any
import pytz
from core.config import CoreConfig
from titles.chuni.new import ChuniNew
from titles.chuni.const import ChuniConstants
from titles.chuni.config import ChuniConfig
class ChuniNewPlus(ChuniNew):
def __init__(self, core_cfg: CoreConfig, game_cfg: ChuniConfig) -> None:
super().__init__(core_cfg, game_cfg)
self.version = ChuniConstants.VER_CHUNITHM_NEW_PLUS
def handle_get_game_setting_api_request(self, data: Dict) -> Dict:
ret = super().handle_get_game_setting_api_request(data)
ret["gameSetting"]["romVersion"] = "2.05.00"
ret["gameSetting"]["dataVersion"] = "2.05.00"
ret["gameSetting"]["matchingUri"] = f"http://{self.core_cfg.server.hostname}:{self.core_cfg.title.port}/SDHD/205/ChuniServlet/"
ret["gameSetting"]["matchingUriX"] = f"http://{self.core_cfg.server.hostname}:{self.core_cfg.title.port}/SDHD/205/ChuniServlet/"
ret["gameSetting"]["udpHolePunchUri"] = f"http://{self.core_cfg.server.hostname}:{self.core_cfg.title.port}/SDHD/205/ChuniServlet/"
ret["gameSetting"]["reflectorUri"] = f"http://{self.core_cfg.server.hostname}:{self.core_cfg.title.port}/SDHD/205/ChuniServlet/"
return ret

18
titles/chuni/paradise.py Normal file
View File

@ -0,0 +1,18 @@
from datetime import datetime, timedelta
from typing import Dict, Any
import pytz
from core.config import CoreConfig
from titles.chuni.base import ChuniBase
from titles.chuni.const import ChuniConstants
from titles.chuni.config import ChuniConfig
class ChuniParadise(ChuniBase):
def __init__(self, core_cfg: CoreConfig, game_cfg: ChuniConfig) -> None:
super().__init__(core_cfg, game_cfg)
self.version = ChuniConstants.VER_CHUNITHM_PARADISE
def handle_get_game_setting_api_request(self, data: Dict) -> Dict:
ret = super().handle_get_game_setting_api_request(data)
ret["gameSetting"]["dataVersion"] = "1.50.00"
return ret

16
titles/chuni/plus.py Normal file
View File

@ -0,0 +1,16 @@
from typing import Dict
from core.config import CoreConfig
from titles.chuni.base import ChuniBase
from titles.chuni.const import ChuniConstants
from titles.chuni.config import ChuniConfig
class ChuniPlus(ChuniBase):
def __init__(self, core_cfg: CoreConfig, game_cfg: ChuniConfig) -> None:
super().__init__(core_cfg, game_cfg)
self.version = ChuniConstants.VER_CHUNITHM_PLUS
def handle_get_game_setting_api_request(self, data: Dict) -> Dict:
ret = super().handle_get_game_setting_api_request(data)
ret["gameSetting"]["dataVersion"] = "1.05.00"
return ret

157
titles/chuni/read.py Normal file
View File

@ -0,0 +1,157 @@
from typing import Optional
from os import walk, path
import xml.etree.ElementTree as ET
from read import BaseReader
from core.config import CoreConfig
from titles.chuni.database import ChuniData
from titles.chuni.const import ChuniConstants
class ChuniReader(BaseReader):
def __init__(self, config: CoreConfig, version: int, bin_dir: Optional[str], opt_dir: Optional[str], extra: Optional[str]) -> None:
super().__init__(config, version, bin_dir, opt_dir, extra)
self.data = ChuniData(config)
try:
self.logger.info(f"Start importer for {ChuniConstants.game_ver_to_string(version)}")
except IndexError:
self.logger.error(f"Invalid chunithm version {version}")
exit(1)
def read(self) -> None:
data_dirs = []
if self.bin_dir is not None:
data_dirs += self.get_data_directories(self.bin_dir)
if self.opt_dir is not None:
data_dirs += self.get_data_directories(self.opt_dir)
for dir in data_dirs:
self.logger.info(f"Read from {dir}")
self.read_events(f"{dir}/event")
self.read_music(f"{dir}/music")
self.read_charges(f"{dir}/chargeItem")
self.read_avatar(f"{dir}/avatarAccessory")
def read_events(self, evt_dir: str) -> None:
for root, dirs, files in walk(evt_dir):
for dir in dirs:
if path.exists(f"{root}/{dir}/Event.xml"):
with open(f"{root}/{dir}/Event.xml", 'rb') as fp:
bytedata = fp.read()
strdata = bytedata.decode('UTF-8')
xml_root = ET.fromstring(strdata)
for name in xml_root.findall('name'):
id = name.find('id').text
name = name.find('str').text
for substances in xml_root.findall('substances'):
event_type = substances.find('type').text
result = self.data.static.put_event(self.version, id, event_type, name)
if result is not None:
self.logger.info(f"Inserted event {id}")
else:
self.logger.warn(f"Failed to insert event {id}")
def read_music(self, music_dir: str) -> None:
for root, dirs, files in walk(music_dir):
for dir in dirs:
if path.exists(f"{root}/{dir}/Music.xml"):
with open(f"{root}/{dir}/Music.xml", 'rb') as fp:
bytedata = fp.read()
strdata = bytedata.decode('UTF-8')
xml_root = ET.fromstring(strdata)
for name in xml_root.findall('name'):
song_id = name.find('id').text
title = name.find('str').text
for artistName in xml_root.findall('artistName'):
artist = artistName.find('str').text
for genreNames in xml_root.findall('genreNames'):
for list_ in genreNames.findall('list'):
for StringID in list_.findall('StringID'):
genre = StringID.find('str').text
for jaketFile in xml_root.findall('jaketFile'): #nice typo, SEGA
jacket_path = jaketFile.find('path').text
for fumens in xml_root.findall('fumens'):
for MusicFumenData in fumens.findall('MusicFumenData'):
fumen_path = MusicFumenData.find('file').find("path")
if fumen_path.text is not None:
chart_id = MusicFumenData.find('type').find('id').text
if chart_id == "4":
level = float(xml_root.find("starDifType").text)
we_chara = xml_root.find("worldsEndTagName").find("str").text
else:
level = float(f"{MusicFumenData.find('level').text}.{MusicFumenData.find('levelDecimal').text}")
we_chara = None
result = self.data.static.put_music(
self.version,
song_id,
chart_id,
title,
artist,
level,
genre,
jacket_path,
we_chara
)
if result is not None:
self.logger.info(f"Inserted music {song_id} chart {chart_id}")
else:
self.logger.warn(f"Failed to insert music {song_id} chart {chart_id}")
def read_charges(self, charge_dir: str) -> None:
for root, dirs, files in walk(charge_dir):
for dir in dirs:
if path.exists(f"{root}/{dir}/ChargeItem.xml"):
with open(f"{root}/{dir}/ChargeItem.xml", 'rb') as fp:
bytedata = fp.read()
strdata = bytedata.decode('UTF-8')
xml_root = ET.fromstring(strdata)
for name in xml_root.findall('name'):
id = name.find('id').text
name = name.find('str').text
expirationDays = xml_root.find('expirationDays').text
consumeType = xml_root.find('consumeType').text
sellingAppeal = bool(xml_root.find('sellingAppeal').text)
result = self.data.static.put_charge(self.version, id, name, expirationDays, consumeType, sellingAppeal)
if result is not None:
self.logger.info(f"Inserted charge {id}")
else:
self.logger.warn(f"Failed to insert charge {id}")
def read_avatar(self, avatar_dir: str) -> None:
for root, dirs, files in walk(avatar_dir):
for dir in dirs:
if path.exists(f"{root}/{dir}/AvatarAccessory.xml"):
with open(f"{root}/{dir}/AvatarAccessory.xml", 'rb') as fp:
bytedata = fp.read()
strdata = bytedata.decode('UTF-8')
xml_root = ET.fromstring(strdata)
for name in xml_root.findall('name'):
id = name.find('id').text
name = name.find('str').text
category = xml_root.find('category').text
for image in xml_root.findall('image'):
iconPath = image.find('path').text
for texture in xml_root.findall('texture'):
texturePath = texture.find('path').text
result = self.data.static.put_avatar(self.version, id, name, category, iconPath, texturePath)
if result is not None:
self.logger.info(f"Inserted avatarAccessory {id}")
else:
self.logger.warn(f"Failed to insert avatarAccessory {id}")

View File

@ -0,0 +1,6 @@
from titles.chuni.schema.profile import ChuniProfileData
from titles.chuni.schema.score import ChuniScoreData
from titles.chuni.schema.item import ChuniItemData
from titles.chuni.schema.static import ChuniStaticData
__all__ = ["ChuniProfileData", "ChuniScoreData", "ChuniItemData", "ChuniStaticData"]

207
titles/chuni/schema/item.py Normal file
View File

@ -0,0 +1,207 @@
from typing import Dict, List, Optional
from sqlalchemy import Table, Column, UniqueConstraint, PrimaryKeyConstraint, and_
from sqlalchemy.types import Integer, String, TIMESTAMP, Boolean, JSON
from sqlalchemy.engine.base import Connection
from sqlalchemy.schema import ForeignKey
from sqlalchemy.sql import func, select
from sqlalchemy.dialects.mysql import insert
from sqlalchemy.engine import Row
from core.data.schema import BaseData, metadata
character = Table(
"chuni_item_character",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("user", ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"), nullable=False),
Column("characterId", Integer),
Column("level", Integer),
Column("param1", Integer),
Column("param2", Integer),
Column("isValid", Boolean),
Column("skillId", Integer),
Column("isNewMark", Boolean),
Column("playCount", Integer),
Column("friendshipExp", Integer),
Column("assignIllust", Integer),
Column("exMaxLv", Integer),
UniqueConstraint("user", "characterId", name="chuni_item_character_uk"),
mysql_charset='utf8mb4'
)
item = Table(
"chuni_item_item",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("user", ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"), nullable=False),
Column("itemId", Integer),
Column("itemKind", Integer),
Column("stock", Integer),
Column("isValid", Boolean),
UniqueConstraint("user", "itemId", "itemKind", name="chuni_item_item_uk"),
mysql_charset='utf8mb4'
)
duel = Table(
"chuni_item_duel",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("user", ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"), nullable=False),
Column("duelId", Integer),
Column("progress", Integer),
Column("point", Integer),
Column("isClear", Boolean),
Column("lastPlayDate", String(25)),
Column("param1", Integer),
Column("param2", Integer),
Column("param3", Integer),
Column("param4", Integer),
UniqueConstraint("user", "duelId", name="chuni_item_duel_uk"),
mysql_charset='utf8mb4'
)
map = Table(
"chuni_item_map",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("user", ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"), nullable=False),
Column("mapId", Integer),
Column("position", Integer),
Column("isClear", Boolean),
Column("areaId", Integer),
Column("routeNumber", Integer),
Column("eventId", Integer),
Column("rate", Integer),
Column("statusCount", Integer),
Column("isValid", Boolean),
UniqueConstraint("user", "mapId", name="chuni_item_map_uk"),
mysql_charset='utf8mb4'
)
map_area = Table(
"chuni_item_map_area",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("user", ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"), nullable=False),
Column("mapAreaId", Integer),
Column("rate", Integer),
Column("isClear", Boolean),
Column("isLocked", Boolean),
Column("position", Integer),
Column("statusCount", Integer),
Column("remainGridCount", Integer),
UniqueConstraint("user", "mapAreaId", name="chuni_item_map_area_uk"),
mysql_charset='utf8mb4'
)
class ChuniItemData(BaseData):
def put_character(self, user_id: int, character_data: Dict) -> Optional[int]:
character_data["user"] = user_id
character_data = self.fix_bools(character_data)
sql = insert(character).values(**character_data)
conflict = sql.on_duplicate_key_update(**character_data)
result = self.execute(conflict)
if result is None: return None
return result.lastrowid
def get_character(self, user_id: int, character_id: int) -> Optional[Dict]:
sql = select(character).where(and_(
character.c.user == user_id,
character.c.characterId == character_id
))
result = self.execute(sql)
if result is None: return None
return result.fetchone()
def get_characters(self, user_id: int) -> Optional[List[Row]]:
sql = select(character).where(character.c.user == user_id)
result = self.execute(sql)
if result is None: return None
return result.fetchall()
def put_item(self, user_id: int, item_data: Dict) -> Optional[int]:
item_data["user"] = user_id
item_data = self.fix_bools(item_data)
sql = insert(item).values(**item_data)
conflict = sql.on_duplicate_key_update(**item_data)
result = self.execute(conflict)
if result is None: return None
return result.lastrowid
def get_items(self, user_id: int, kind: int = None) -> Optional[List[Row]]:
if kind is None:
sql = select(item).where(item.c.user == user_id)
else:
sql = select(item).where(and_(
item.c.user == user_id,
item.c.itemKind == kind
))
result = self.execute(sql)
if result is None: return None
return result.fetchall()
def put_duel(self, user_id: int, duel_data: Dict) -> Optional[int]:
duel_data["user"] = user_id
duel_data = self.fix_bools(duel_data)
sql = insert(duel).values(**duel_data)
conflict = sql.on_duplicate_key_update(**duel_data)
result = self.execute(conflict)
if result is None: return None
return result.lastrowid
def get_duels(self, user_id: int) -> Optional[List[Row]]:
sql = select(duel).where(duel.c.user == user_id)
result = self.execute(sql)
if result is None: return None
return result.fetchall()
def put_map(self, user_id: int, map_data: Dict) -> Optional[int]:
map_data["user"] = user_id
map_data = self.fix_bools(map_data)
sql = insert(map).values(**map_data)
conflict = sql.on_duplicate_key_update(**map_data)
result = self.execute(conflict)
if result is None: return None
return result.lastrowid
def get_maps(self, user_id: int) -> Optional[List[Row]]:
sql = select(map).where(map.c.user == user_id)
result = self.execute(sql)
if result is None: return None
return result.fetchall()
def put_map_area(self, user_id: int, map_area_data: Dict) -> Optional[int]:
map_area_data["user"] = user_id
map_area_data = self.fix_bools(map_area_data)
sql = insert(map_area).values(**map_area_data)
conflict = sql.on_duplicate_key_update(**map_area_data)
result = self.execute(conflict)
if result is None: return None
return result.lastrowid
def get_map_areas(self, user_id: int) -> Optional[List[Row]]:
sql = select(map_area).where(map_area.c.user == user_id)
result = self.execute(sql)
if result is None: return None
return result.fetchall()

View File

@ -0,0 +1,551 @@
from typing import Dict, List, Optional
from sqlalchemy import Table, Column, UniqueConstraint, PrimaryKeyConstraint, and_
from sqlalchemy.types import Integer, String, TIMESTAMP, Boolean, JSON, BigInteger
from sqlalchemy.engine.base import Connection
from sqlalchemy.schema import ForeignKey
from sqlalchemy.engine import Row
from sqlalchemy.sql import func, select
from sqlalchemy.dialects.mysql import insert
from core.data.schema import BaseData, metadata
profile = Table(
"chuni_profile_data",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("user", ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"), nullable=False),
Column("version", Integer, nullable=False),
Column("exp", Integer),
Column("level", Integer),
Column("point", Integer),
Column("frameId", Integer),
Column("isMaimai", Boolean),
Column("trophyId", Integer),
Column("userName", String(25)),
Column("isWebJoin", Boolean),
Column("playCount", Integer),
Column("lastGameId", String(25)),
Column("totalPoint", BigInteger),
Column("characterId", Integer),
Column("firstGameId", String(25)),
Column("friendCount", Integer),
Column("lastPlaceId", Integer),
Column("nameplateId", Integer),
Column("totalMapNum", Integer),
Column("lastAllNetId", Integer),
Column("lastClientId", String(25)),
Column("lastPlayDate", String(25)),
Column("lastRegionId", Integer),
Column("playerRating", Integer),
Column("totalHiScore", Integer),
Column("webLimitDate", String(25)),
Column("firstPlayDate", String(25)),
Column("highestRating", Integer),
Column("lastPlaceName", String(25)),
Column("multiWinCount", Integer),
Column("acceptResCount", Integer),
Column("lastRegionName", String(25)),
Column("lastRomVersion", String(25)),
Column("multiPlayCount", Integer),
Column("firstRomVersion", String(25)),
Column("lastDataVersion", String(25)),
Column("requestResCount", Integer),
Column("successResCount", Integer),
Column("eventWatchedDate", String(25)),
Column("firstDataVersion", String(25)),
Column("reincarnationNum", Integer),
Column("playedTutorialBit", Integer),
Column("totalBasicHighScore", Integer),
Column("totalExpertHighScore", Integer),
Column("totalMasterHighScore", Integer),
Column("totalRepertoireCount", Integer),
Column("firstTutorialCancelNum", Integer),
Column("totalAdvancedHighScore", Integer),
Column("masterTutorialCancelNum", Integer),
Column("ext1", Integer), # Added in chunew
Column("ext2", Integer),
Column("ext3", Integer),
Column("ext4", Integer),
Column("ext5", Integer),
Column("ext6", Integer),
Column("ext7", Integer),
Column("ext8", Integer),
Column("ext9", Integer),
Column("ext10", Integer),
Column("extStr1", String(255)),
Column("extStr2", String(255)),
Column("extLong1", Integer),
Column("extLong2", Integer),
Column("mapIconId", Integer),
Column("compatibleCmVersion", String(25)),
Column("medal", Integer),
Column("voiceId", Integer),
Column("teamId", Integer, ForeignKey("chuni_profile_team.id", ondelete="SET NULL", onupdate="SET NULL")),
Column("avatarBack", Integer, server_default="0"),
Column("avatarFace", Integer, server_default="0"),
Column("eliteRankPoint", Integer, server_default="0"),
Column("stockedGridCount", Integer, server_default="0"),
Column("netBattleLoseCount", Integer, server_default="0"),
Column("netBattleHostErrCnt", Integer, server_default="0"),
Column("netBattle4thCount", Integer, server_default="0"),
Column("overPowerRate", Integer, server_default="0"),
Column("battleRewardStatus", Integer, server_default="0"),
Column("avatarPoint", Integer, server_default="0"),
Column("netBattle1stCount", Integer, server_default="0"),
Column("charaIllustId", Integer, server_default="0"),
Column("avatarItem", Integer, server_default="0"),
Column("userNameEx", String(8), server_default=""),
Column("netBattleWinCount", Integer, server_default="0"),
Column("netBattleCorrection", Integer, server_default="0"),
Column("classEmblemMedal", Integer, server_default="0"),
Column("overPowerPoint", Integer, server_default="0"),
Column("netBattleErrCnt", Integer, server_default="0"),
Column("battleRankId", Integer, server_default="0"),
Column("netBattle3rdCount", Integer, server_default="0"),
Column("netBattleConsecutiveWinCount", Integer, server_default="0"),
Column("overPowerLowerRank", Integer, server_default="0"),
Column("avatarWear", Integer, server_default="0"),
Column("classEmblemBase", Integer, server_default="0"),
Column("battleRankPoint", Integer, server_default="0"),
Column("netBattle2ndCount", Integer, server_default="0"),
Column("totalUltimaHighScore", Integer, server_default="0"),
Column("skillId", Integer, server_default="0"),
Column("lastCountryCode", String(5), server_default="JPN"),
Column("isNetBattleHost", Boolean, server_default="0"),
Column("avatarFront", Integer, server_default="0"),
Column("avatarSkin", Integer, server_default="0"),
Column("battleRewardCount", Integer, server_default="0"),
Column("battleRewardIndex", Integer, server_default="0"),
Column("netBattlePlayCount", Integer, server_default="0"),
Column("exMapLoopCount", Integer, server_default="0"),
Column("netBattleEndState", Integer, server_default="0"),
Column("avatarHead", Integer, server_default="0"),
UniqueConstraint("user", "version", name="chuni_profile_profile_uk"),
mysql_charset='utf8mb4'
)
profile_ex = Table(
"chuni_profile_data_ex",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("user", ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"), nullable=False),
Column("version", Integer, nullable=False),
Column("ext1", Integer),
Column("ext2", Integer),
Column("ext3", Integer),
Column("ext4", Integer),
Column("ext5", Integer),
Column("ext6", Integer),
Column("ext7", Integer),
Column("ext8", Integer),
Column("ext9", Integer),
Column("ext10", Integer),
Column("ext11", Integer),
Column("ext12", Integer),
Column("ext13", Integer),
Column("ext14", Integer),
Column("ext15", Integer),
Column("ext16", Integer),
Column("ext17", Integer),
Column("ext18", Integer),
Column("ext19", Integer),
Column("ext20", Integer),
Column("medal", Integer),
Column("extStr1", String(255)),
Column("extStr2", String(255)),
Column("extStr3", String(255)),
Column("extStr4", String(255)),
Column("extStr5", String(255)),
Column("voiceId", Integer),
Column("extLong1", Integer),
Column("extLong2", Integer),
Column("extLong3", Integer),
Column("extLong4", Integer),
Column("extLong5", Integer),
Column("mapIconId", Integer),
Column("compatibleCmVersion", String(25)),
UniqueConstraint("user", "version", name="chuni_profile_data_ex_uk"),
mysql_charset='utf8mb4'
)
option = Table(
"chuni_profile_option",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("user", ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"), nullable=False),
Column("speed", Integer),
Column("bgInfo", Integer),
Column("rating", Integer),
Column("privacy", Integer),
Column("judgePos", Integer),
Column("matching", Integer),
Column("guideLine", Integer),
Column("headphone", Integer),
Column("optionSet", Integer),
Column("fieldColor", Integer),
Column("guideSound", Integer),
Column("successAir", Integer),
Column("successTap", Integer),
Column("judgeAttack", Integer),
Column("playerLevel", Integer),
Column("soundEffect", Integer),
Column("judgeJustice", Integer),
Column("successExTap", Integer),
Column("successFlick", Integer),
Column("successSkill", Integer),
Column("successSlideHold", Integer),
Column("successTapTimbre", Integer),
Column("ext1", Integer), # Added in chunew
Column("ext2", Integer),
Column("ext3", Integer),
Column("ext4", Integer),
Column("ext5", Integer),
Column("ext6", Integer),
Column("ext7", Integer),
Column("ext8", Integer),
Column("ext9", Integer),
Column("ext10", Integer),
Column("categoryDetail", Integer, server_default="0"),
Column("judgeTimingOffset_120", Integer, server_default="0"),
Column("resultVoiceShort", Integer, server_default="0"),
Column("judgeAppendSe", Integer, server_default="0"),
Column("judgeCritical", Integer, server_default="0"),
Column("trackSkip", Integer, server_default="0"),
Column("selectMusicFilterLv", Integer, server_default="0"),
Column("sortMusicFilterLv", Integer, server_default="0"),
Column("sortMusicGenre", Integer, server_default="0"),
Column("speed_120", Integer, server_default="0"),
Column("judgeTimingOffset", Integer, server_default="0"),
Column("mirrorFumen", Integer, server_default="0"),
Column("playTimingOffset_120", Integer, server_default="0"),
Column("hardJudge", Integer, server_default="0"),
Column("notesThickness", Integer, server_default="0"),
Column("fieldWallPosition", Integer, server_default="0"),
Column("playTimingOffset", Integer, server_default="0"),
Column("fieldWallPosition_120", Integer, server_default="0"),
UniqueConstraint("user", name="chuni_profile_option_uk"),
mysql_charset='utf8mb4'
)
option_ex = Table(
"chuni_profile_option_ex",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("user", ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"), nullable=False),
Column("ext1", Integer),
Column("ext2", Integer),
Column("ext3", Integer),
Column("ext4", Integer),
Column("ext5", Integer),
Column("ext6", Integer),
Column("ext7", Integer),
Column("ext8", Integer),
Column("ext9", Integer),
Column("ext10", Integer),
Column("ext11", Integer),
Column("ext12", Integer),
Column("ext13", Integer),
Column("ext14", Integer),
Column("ext15", Integer),
Column("ext16", Integer),
Column("ext17", Integer),
Column("ext18", Integer),
Column("ext19", Integer),
Column("ext20", Integer),
UniqueConstraint("user", name="chuni_profile_option_ex_uk"),
mysql_charset='utf8mb4'
)
recent_rating = Table(
"chuni_profile_recent_rating",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("user", ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"), nullable=False),
Column("recentRating", JSON),
UniqueConstraint("user", name="chuni_profile_recent_rating_uk"),
mysql_charset='utf8mb4'
)
region = Table(
"chuni_profile_region",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("user", ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"), nullable=False),
Column("regionId", Integer),
Column("playCount", Integer),
UniqueConstraint("user", "regionId", name="chuni_profile_region_uk"),
mysql_charset='utf8mb4'
)
activity = Table(
"chuni_profile_activity",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("user", ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"), nullable=False),
Column("kind", Integer),
Column("activityId", Integer), # Reminder: Change this to ID in base.py or the game will be sad
Column("sortNumber", Integer),
Column("param1", Integer),
Column("param2", Integer),
Column("param3", Integer),
Column("param4", Integer),
UniqueConstraint("user", "kind", "activityId", name="chuni_profile_activity_uk"),
mysql_charset='utf8mb4'
)
charge = Table(
"chuni_profile_charge",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("user", ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"), nullable=False),
Column("chargeId", Integer),
Column("stock", Integer),
Column("purchaseDate", String(25)),
Column("validDate", String(25)),
Column("param1", Integer),
Column("param2", Integer),
Column("paramDate", String(25)),
UniqueConstraint("user", "chargeId", name="chuni_profile_charge_uk"),
mysql_charset='utf8mb4'
)
emoney = Table(
"chuni_profile_emoney",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("user", ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"), nullable=False),
Column("ext1", Integer),
Column("ext2", Integer),
Column("ext3", Integer),
Column("type", Integer),
Column("emoneyBrand", Integer),
Column("emoneyCredit", Integer),
UniqueConstraint("user", "emoneyBrand", name="chuni_profile_emoney_uk"),
mysql_charset='utf8mb4'
)
overpower = Table(
"chuni_profile_overpower",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("user", ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"), nullable=False),
Column("genreId", Integer),
Column("difficulty", Integer),
Column("rate", Integer),
Column("point", Integer),
UniqueConstraint("user", "genreId", "difficulty", name="chuni_profile_emoney_uk"),
mysql_charset='utf8mb4'
)
team = Table(
"chuni_profile_team",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("teamName", String(255)),
Column("teamPoint", Integer),
mysql_charset='utf8mb4'
)
class ChuniProfileData(BaseData):
def put_profile_data(self, aime_id: int, version: int, profile_data: Dict) -> Optional[int]:
profile_data["user"] = aime_id
profile_data["version"] = version
if "accessCode" in profile_data:
profile_data.pop("accessCode")
profile_data = self.fix_bools(profile_data)
sql = insert(profile).values(**profile_data)
conflict = sql.on_duplicate_key_update(**profile_data)
result = self.execute(conflict)
if result is None:
self.logger.warn(f"put_profile_data: Failed to update! aime_id: {aime_id}")
return None
return result.lastrowid
def get_profile_preview(self, aime_id: int, version: int) -> Optional[Row]:
sql = select([profile, option]).join(option, profile.c.user == option.c.user).filter(
and_(profile.c.user == aime_id, profile.c.version == version)
)
result = self.execute(sql)
if result is None: return None
return result.fetchone()
def get_profile_data(self, aime_id: int, version: int) -> Optional[Row]:
sql = select(profile).where(and_(
profile.c.user == aime_id,
profile.c.version == version,
))
result = self.execute(sql)
if result is None: return None
return result.fetchone()
def put_profile_data_ex(self, aime_id: int, version: int, profile_ex_data: Dict) -> Optional[int]:
profile_ex_data["user"] = aime_id
profile_ex_data["version"] = version
if "accessCode" in profile_ex_data:
profile_ex_data.pop("accessCode")
sql = insert(profile_ex).values(**profile_ex_data)
conflict = sql.on_duplicate_key_update(**profile_ex_data)
result = self.execute(conflict)
if result is None:
self.logger.warn(f"put_profile_data_ex: Failed to update! aime_id: {aime_id}")
return None
return result.lastrowid
def get_profile_data_ex(self, aime_id: int, version: int) -> Optional[Row]:
sql = select(profile_ex).where(and_(
profile_ex.c.user == aime_id,
profile_ex.c.version == version,
))
result = self.execute(sql)
if result is None: return None
return result.fetchone()
def put_profile_option(self, aime_id: int, option_data: Dict) -> Optional[int]:
option_data["user"] = aime_id
sql = insert(option).values(**option_data)
conflict = sql.on_duplicate_key_update(**option_data)
result = self.execute(conflict)
if result is None:
self.logger.warn(f"put_profile_option: Failed to update! aime_id: {aime_id}")
return None
return result.lastrowid
def get_profile_option(self, aime_id: int) -> Optional[Row]:
sql = select(option).where(option.c.user == aime_id)
result = self.execute(sql)
if result is None: return None
return result.fetchone()
def put_profile_option_ex(self, aime_id: int, option_ex_data: Dict) -> Optional[int]:
option_ex_data["user"] = aime_id
sql = insert(option_ex).values(**option_ex_data)
conflict = sql.on_duplicate_key_update(**option_ex_data)
result = self.execute(conflict)
if result is None:
self.logger.warn(f"put_profile_option_ex: Failed to update! aime_id: {aime_id}")
return None
return result.lastrowid
def get_profile_option_ex(self, aime_id: int) -> Optional[Row]:
sql = select(option_ex).where(option_ex.c.user == aime_id)
result = self.execute(sql)
if result is None: return None
return result.fetchone()
def put_profile_recent_rating(self, aime_id: int, recent_rating_data: List[Dict]) -> Optional[int]:
sql = insert(recent_rating).values(
user = aime_id,
recentRating = recent_rating_data
)
conflict = sql.on_duplicate_key_update(recentRating = recent_rating_data)
result = self.execute(conflict)
if result is None:
self.logger.warn(f"put_profile_recent_rating: Failed to update! aime_id: {aime_id}")
return None
return result.lastrowid
def get_profile_recent_rating(self, aime_id: int) -> Optional[Row]:
sql = select(recent_rating).where(recent_rating.c.user == aime_id)
result = self.execute(sql)
if result is None: return None
return result.fetchone()
def put_profile_activity(self, aime_id: int, activity_data: Dict) -> Optional[int]:
# The game just uses "id" but we need to distinguish that from the db column "id"
activity_data["user"] = aime_id
activity_data["activityId"] = activity_data["id"]
activity_data.pop("id")
sql = insert(activity).values(**activity_data)
conflict = sql.on_duplicate_key_update(**activity_data)
result = self.execute(conflict)
if result is None:
self.logger.warn(f"put_profile_activity: Failed to update! aime_id: {aime_id}")
return None
return result.lastrowid
def get_profile_activity(self, aime_id: int, kind: int) -> Optional[List[Row]]:
sql = select(activity).where(and_(
activity.c.user == aime_id,
activity.c.kind == kind
))
result = self.execute(sql)
if result is None: return None
return result.fetchall()
def put_profile_charge(self, aime_id: int, charge_data: Dict) -> Optional[int]:
charge_data["user"] = aime_id
sql = insert(charge).values(**charge_data)
conflict = sql.on_duplicate_key_update(**charge_data)
result = self.execute(conflict)
if result is None:
self.logger.warn(f"put_profile_charge: Failed to update! aime_id: {aime_id}")
return None
return result.lastrowid
def get_profile_charge(self, aime_id: int) -> Optional[List[Row]]:
sql = select(charge).where(charge.c.user == aime_id)
result = self.execute(sql)
if result is None: return None
return result.fetchall()
def add_profile_region(self, aime_id: int, region_id: int) -> Optional[int]:
pass
def get_profile_regions(self, aime_id: int) -> Optional[List[Row]]:
pass
def put_profile_emoney(self, aime_id: int, emoney_data: Dict) -> Optional[int]:
emoney_data["user"] = aime_id
sql = insert(emoney).values(**emoney_data)
conflict = sql.on_duplicate_key_update(**emoney_data)
result = self.execute(conflict)
if result is None: return None
return result.lastrowid
def get_profile_emoney(self, aime_id: int) -> Optional[List[Row]]:
sql = select(emoney).where(emoney.c.user == aime_id)
result = self.execute(sql)
if result is None: return None
return result.fetchall()
def put_profile_overpower(self, aime_id: int, overpower_data: Dict) -> Optional[int]:
overpower_data["user"] = aime_id
sql = insert(overpower).values(**overpower_data)
conflict = sql.on_duplicate_key_update(**overpower_data)
result = self.execute(conflict)
if result is None: return None
return result.lastrowid
def get_profile_overpower(self, aime_id: int) -> Optional[List[Row]]:
sql = select(overpower).where(overpower.c.user == aime_id)
result = self.execute(sql)
if result is None: return None
return result.fetchall()

View File

@ -0,0 +1,178 @@
from typing import Dict, List, Optional
from sqlalchemy import Table, Column, UniqueConstraint, PrimaryKeyConstraint, and_
from sqlalchemy.types import Integer, String, TIMESTAMP, Boolean, JSON, BigInteger
from sqlalchemy.engine.base import Connection
from sqlalchemy.schema import ForeignKey
from sqlalchemy.engine import Row
from sqlalchemy.sql import func, select
from sqlalchemy.dialects.mysql import insert
from core.data.schema import BaseData, metadata
course = Table(
"chuni_score_course",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("user", ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"), nullable=False),
Column("courseId", Integer),
Column("classId", Integer),
Column("playCount", Integer),
Column("scoreMax", Integer),
Column("isFullCombo", Boolean),
Column("isAllJustice", Boolean),
Column("isSuccess", Boolean),
Column("scoreRank", Integer),
Column("eventId", Integer),
Column("lastPlayDate", String(25)),
Column("param1", Integer),
Column("param2", Integer),
Column("param3", Integer),
Column("param4", Integer),
Column("isClear", Boolean),
UniqueConstraint("user", "courseId", name="chuni_score_course_uk"),
mysql_charset='utf8mb4'
)
best_score = Table(
"chuni_score_best",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("user", ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"), nullable=False),
Column("musicId", Integer),
Column("level", Integer),
Column("playCount", Integer),
Column("scoreMax", Integer),
Column("resRequestCount", Integer),
Column("resAcceptCount", Integer),
Column("resSuccessCount", Integer),
Column("missCount", Integer),
Column("maxComboCount", Integer),
Column("isFullCombo", Boolean),
Column("isAllJustice", Boolean),
Column("isSuccess", Boolean),
Column("fullChain", Integer),
Column("maxChain", Integer),
Column("scoreRank", Integer),
Column("isLock", Boolean),
Column("ext1", Integer),
Column("theoryCount", Integer),
UniqueConstraint("user", "musicId", "level", name="chuni_score_best_uk"),
mysql_charset='utf8mb4'
)
playlog = Table(
"chuni_score_playlog",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("user", ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"), nullable=False),
Column("orderId", Integer),
Column("sortNumber", Integer),
Column("placeId", Integer),
Column("playDate", String(20)),
Column("userPlayDate", String(20)),
Column("musicId", Integer),
Column("level", Integer),
Column("customId", Integer),
Column("playedUserId1", Integer),
Column("playedUserId2", Integer),
Column("playedUserId3", Integer),
Column("playedUserName1", String(20)),
Column("playedUserName2", String(20)),
Column("playedUserName3", String(20)),
Column("playedMusicLevel1", Integer),
Column("playedMusicLevel2", Integer),
Column("playedMusicLevel3", Integer),
Column("playedCustom1", Integer),
Column("playedCustom2", Integer),
Column("playedCustom3", Integer),
Column("track", Integer),
Column("score", Integer),
Column("rank", Integer),
Column("maxCombo", Integer),
Column("maxChain", Integer),
Column("rateTap", Integer),
Column("rateHold", Integer),
Column("rateSlide", Integer),
Column("rateAir", Integer),
Column("rateFlick", Integer),
Column("judgeGuilty", Integer),
Column("judgeAttack", Integer),
Column("judgeJustice", Integer),
Column("judgeCritical", Integer),
Column("eventId", Integer),
Column("playerRating", Integer),
Column("isNewRecord", Boolean),
Column("isFullCombo", Boolean),
Column("fullChainKind", Integer),
Column("isAllJustice", Boolean),
Column("isContinue", Boolean),
Column("isFreeToPlay", Boolean),
Column("characterId", Integer),
Column("skillId", Integer),
Column("playKind", Integer),
Column("isClear", Boolean),
Column("skillLevel", Integer),
Column("skillEffect", Integer),
Column("placeName", String(255)),
Column("isMaimai", Boolean),
Column("commonId", Integer),
Column("charaIllustId", Integer),
Column("romVersion", String(255)),
Column("judgeHeaven", Integer),
mysql_charset='utf8mb4'
)
class ChuniScoreData(BaseData):
def get_courses(self, aime_id: int) -> Optional[Row]:
sql = select(course).where(course.c.user == aime_id)
result = self.execute(sql)
if result is None: return None
return result.fetchall()
def put_course(self, aime_id: int, course_data: Dict) -> Optional[int]:
course_data["user"] = aime_id
course_data = self.fix_bools(course_data)
sql = insert(course).values(**course_data)
conflict = sql.on_duplicate_key_update(**course_data)
result = self.execute(conflict)
if result is None: return None
return result.lastrowid
def get_scores(self, aime_id: int) -> Optional[Row]:
sql = select(best_score).where(best_score.c.user == aime_id)
result = self.execute(sql)
if result is None: return None
return result.fetchall()
def put_score(self, aime_id: int, score_data: Dict) -> Optional[int]:
score_data["user"] = aime_id
score_data = self.fix_bools(score_data)
sql = insert(best_score).values(**score_data)
conflict = sql.on_duplicate_key_update(**score_data)
result = self.execute(conflict)
if result is None: return None
return result.lastrowid
def get_playlogs(self, aime_id: int) -> Optional[Row]:
sql = select(playlog).where(playlog.c.user == aime_id)
result = self.execute(sql)
if result is None: return None
return result.fetchall()
def put_playlog(self, aime_id: int, playlog_data: Dict) -> Optional[int]:
playlog_data["user"] = aime_id
playlog_data = self.fix_bools(playlog_data)
sql = insert(playlog).values(**playlog_data)
conflict = sql.on_duplicate_key_update(**playlog_data)
result = self.execute(conflict)
if result is None: return None
return result.lastrowid

View File

@ -0,0 +1,223 @@
from typing import Dict, List, Optional
from sqlalchemy import Table, Column, UniqueConstraint, PrimaryKeyConstraint, and_
from sqlalchemy.types import Integer, String, TIMESTAMP, Boolean, JSON, Float
from sqlalchemy.engine.base import Connection
from sqlalchemy.engine import Row
from sqlalchemy.schema import ForeignKey
from sqlalchemy.sql import func, select
from sqlalchemy.dialects.mysql import insert
from core.data.schema import BaseData, metadata
events = Table(
"chuni_static_events",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("version", Integer, nullable=False),
Column("eventId", Integer),
Column("type", Integer),
Column("name", String(255)),
Column("enabled", Boolean, server_default="1"),
UniqueConstraint("version", "eventId", name="chuni_static_events_uk"),
mysql_charset='utf8mb4'
)
music = Table(
"chuni_static_music",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("version", Integer, nullable=False),
Column("songId", Integer),
Column("chartId", Integer),
Column("title", String(255)),
Column("artist", String(255)),
Column("level", Float),
Column("genre", String(255)),
Column("jacketPath", String(255)),
Column("worldsEndTag", String(20)),
UniqueConstraint("version", "songId", "chartId", name="chuni_static_music_uk"),
mysql_charset='utf8mb4'
)
charge = Table(
"chuni_static_charge",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("version", Integer, nullable=False),
Column("chargeId", Integer),
Column("name", String(255)),
Column("expirationDays", Integer),
Column("consumeType", Integer),
Column("sellingAppeal", Boolean),
Column("enabled", Boolean, server_default="1"),
UniqueConstraint("version", "chargeId", name="chuni_static_charge_uk"),
mysql_charset='utf8mb4'
)
avatar = Table(
"chuni_static_avatar",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("version", Integer, nullable=False),
Column("avatarAccessoryId", Integer),
Column("name", String(255)),
Column("category", Integer),
Column("iconPath", String(255)),
Column("texturePath", String(255)),
UniqueConstraint("version", "avatarAccessoryId", name="chuni_static_avatar_uk"),
mysql_charset='utf8mb4'
)
class ChuniStaticData(BaseData):
def put_event(self, version: int, event_id: int, type: int, name: str) -> Optional[int]:
sql = insert(events).values(
version = version,
eventId = event_id,
type = type,
name = name
)
conflict = sql.on_duplicate_key_update(
name = name
)
result = self.execute(conflict)
if result is None: return None
return result.lastrowid
def update_event(self, version: int, event_id: int, enabled: bool) -> Optional[bool]:
sql = events.update(and_(events.c.version == version, events.c.eventId == event_id)).values(
enabled = enabled
)
result = self.execute(sql)
if result is None:
self.logger.warn(f"update_event: failed to update event! version: {version}, event_id: {event_id}, enabled: {enabled}")
return None
event = self.get_event(version, event_id)
if event is None:
self.logger.warn(f"update_event: failed to fetch event {event_id} after updating")
return None
return event["enabled"]
def get_event(self, version: int, event_id: int) -> Optional[Row]:
sql = select(events).where(and_(events.c.version == version, events.c.eventId == event_id))
result = self.execute(sql)
if result is None: return None
return result.fetchone()
def get_enabled_events(self, version: int) -> Optional[List[Row]]:
sql = select(events).where(and_(events.c.version == version, events.c.enabled == True))
result = self.execute(sql)
if result is None: return None
return result.fetchall()
def get_events(self, version: int) -> Optional[List[Row]]:
sql = select(events).where(events.c.version == version)
result = self.execute(sql)
if result is None: return None
return result.fetchall()
def put_music(self, version: int, song_id: int, chart_id: int, title: int, artist: str,
level: float, genre: str, jacketPath: str, we_tag: str) -> Optional[int]:
sql = insert(music).values(
version = version,
songId = song_id,
chartId = chart_id,
title = title,
artist = artist,
level = level,
genre = genre,
jacketPath = jacketPath,
worldsEndTag = we_tag,
)
conflict = sql.on_duplicate_key_update(
title = title,
artist = artist,
level = level,
genre = genre,
jacketPath = jacketPath,
worldsEndTag = we_tag,
)
result = self.execute(conflict)
if result is None: return None
return result.lastrowid
def put_charge(self, version: int, charge_id: int, name: str, expiration_days: int,
consume_type: int, selling_appeal: bool) -> Optional[int]:
sql = insert(charge).values(
version = version,
chargeId = charge_id,
name = name,
expirationDays = expiration_days,
consumeType = consume_type,
sellingAppeal = selling_appeal,
)
conflict = sql.on_duplicate_key_update(
name = name,
expirationDays = expiration_days,
consumeType = consume_type,
sellingAppeal = selling_appeal,
)
result = self.execute(conflict)
if result is None: return None
return result.lastrowid
def get_enabled_charges(self, version: int) -> Optional[List[Row]]:
sql = select(charge).where(and_(
charge.c.version == version,
charge.c.enabled == True
))
result = self.execute(sql)
if result is None: return None
return result.fetchall()
def get_charges(self, version: int) -> Optional[List[Row]]:
sql = select(charge).where(charge.c.version == version)
result = self.execute(sql)
if result is None: return None
return result.fetchall()
def get_music_chart(self, version: int, song_id: int, chart_id: int) -> Optional[List[Row]]:
sql = select(music).where(and_(
music.c.version == version,
music.c.songId == song_id,
music.c.chartId == chart_id
))
result = self.execute(sql)
if result is None: return None
return result.fetchone()
def put_avatar(self, version: int, avatarAccessoryId: int, name: str, category: int, iconPath: str, texturePath: str) -> Optional[int]:
sql = insert(avatar).values(
version = version,
avatarAccessoryId = avatarAccessoryId,
name = name,
category = category,
iconPath = iconPath,
texturePath = texturePath,
)
conflict = sql.on_duplicate_key_update(
name = name,
category = category,
iconPath = iconPath,
texturePath = texturePath,
)
result = self.execute(conflict)
if result is None: return None
return result.lastrowid

16
titles/chuni/star.py Normal file
View File

@ -0,0 +1,16 @@
from typing import Dict
from core.config import CoreConfig
from titles.chuni.base import ChuniBase
from titles.chuni.const import ChuniConstants
from titles.chuni.config import ChuniConfig
class ChuniStar(ChuniBase):
def __init__(self, core_cfg: CoreConfig, game_cfg: ChuniConfig) -> None:
super().__init__(core_cfg, game_cfg)
self.version = ChuniConstants.VER_CHUNITHM_STAR
def handle_get_game_setting_api_request(self, data: Dict) -> Dict:
ret = super().handle_get_game_setting_api_request(data)
ret["gameSetting"]["dataVersion"] = "1.20.00"
return ret

16
titles/chuni/starplus.py Normal file
View File

@ -0,0 +1,16 @@
from typing import Dict
from core.config import CoreConfig
from titles.chuni.base import ChuniBase
from titles.chuni.const import ChuniConstants
from titles.chuni.config import ChuniConfig
class ChuniStarPlus(ChuniBase):
def __init__(self, core_cfg: CoreConfig, game_cfg: ChuniConfig) -> None:
super().__init__(core_cfg, game_cfg)
self.version = ChuniConstants.VER_CHUNITHM_STAR_PLUS
def handle_get_game_setting_api_request(self, data: Dict) -> Dict:
ret = super().handle_get_game_setting_api_request(data)
ret["gameSetting"]["dataVersion"] = "1.25.00"
return ret