Merge branch 'develop' into idac

This commit is contained in:
2023-11-13 16:18:04 +01:00
161 changed files with 3867 additions and 1650 deletions

View File

@ -10,7 +10,7 @@ from core.config import CoreConfig
from titles.chuni.const import ChuniConstants
from titles.chuni.database import ChuniData
from titles.chuni.config import ChuniConfig
SCORE_BUFFER = {}
class ChuniBase:
def __init__(self, core_cfg: CoreConfig, game_cfg: ChuniConfig) -> None:
@ -181,21 +181,50 @@ class ChuniBase:
return {"type": data["type"], "length": 0, "gameIdlistList": []}
def handle_get_game_message_api_request(self, data: Dict) -> Dict:
return {"type": data["type"], "length": "0", "gameMessageList": []}
return {
"type": data["type"],
"length": 1,
"gameMessageList": [{
"id": 1,
"type": 1,
"message": f"Welcome to {self.core_cfg.server.name} network!" if not self.game_cfg.server.news_msg else self.game_cfg.server.news_msg,
"startDate": "2017-12-05 07:00:00.0",
"endDate": "2099-12-31 00:00:00.0"
}]
}
def handle_get_game_ranking_api_request(self, data: Dict) -> Dict:
return {"type": data["type"], "gameRankingList": []}
rankings = self.data.score.get_rankings(self.version)
return {"type": data["type"], "gameRankingList": rankings}
def handle_get_game_sale_api_request(self, data: Dict) -> Dict:
return {"type": data["type"], "length": 0, "gameSaleList": []}
def handle_get_game_setting_api_request(self, data: Dict) -> Dict:
reboot_start = datetime.strftime(
datetime.now() - timedelta(hours=4), self.date_time_format
)
reboot_end = datetime.strftime(
datetime.now() - timedelta(hours=3), self.date_time_format
)
# if reboot start/end time is not defined use the default behavior of being a few hours ago
if self.core_cfg.title.reboot_start_time == "" or self.core_cfg.title.reboot_end_time == "":
reboot_start = datetime.strftime(
datetime.utcnow() + timedelta(hours=6), self.date_time_format
)
reboot_end = datetime.strftime(
datetime.utcnow() + timedelta(hours=7), self.date_time_format
)
else:
# get current datetime in JST
current_jst = datetime.now(pytz.timezone('Asia/Tokyo')).date()
# parse config start/end times into datetime
reboot_start_time = datetime.strptime(self.core_cfg.title.reboot_start_time, "%H:%M")
reboot_end_time = datetime.strptime(self.core_cfg.title.reboot_end_time, "%H:%M")
# offset datetimes with current date/time
reboot_start_time = reboot_start_time.replace(year=current_jst.year, month=current_jst.month, day=current_jst.day, tzinfo=pytz.timezone('Asia/Tokyo'))
reboot_end_time = reboot_end_time.replace(year=current_jst.year, month=current_jst.month, day=current_jst.day, tzinfo=pytz.timezone('Asia/Tokyo'))
# create strings for use in gameSetting
reboot_start = reboot_start_time.strftime(self.date_time_format)
reboot_end = reboot_end_time.strftime(self.date_time_format)
return {
"gameSetting": {
"dataVersion": "1.00.00",
@ -361,11 +390,70 @@ class ChuniBase:
"userDuelList": duel_list,
}
def handle_get_user_rival_data_api_request(self, data: Dict) -> Dict:
p = self.data.profile.get_rival(data["rivalId"])
if p is None:
return {}
userRivalData = {
"rivalId": p.user,
"rivalName": p.userName
}
return {
"userId": data["userId"],
"userRivalData": userRivalData
}
def handle_get_user_rival_music_api_request(self, data: Dict) -> Dict:
rival_id = data["rivalId"]
next_index = int(data["nextIndex"])
max_count = int(data["maxCount"])
user_rival_music_list = []
# Fetch all the rival music entries for the user
all_entries = self.data.score.get_rival_music(rival_id)
# Process the entries based on max_count and nextIndex
for music in all_entries[next_index:]:
music_id = music["musicId"]
level = music["level"]
score = music["scoreMax"]
rank = music["scoreRank"]
# Create a music entry for the current music_id
music_entry = next((entry for entry in user_rival_music_list if entry["musicId"] == music_id), None)
if music_entry is None:
music_entry = {
"musicId": music_id,
"length": 0,
"userRivalMusicDetailList": []
}
user_rival_music_list.append(music_entry)
# Create a level entry for the current level
level_entry = {
"level": level,
"scoreMax": score,
"scoreRank": rank
}
music_entry["userRivalMusicDetailList"].append(level_entry)
# Calculate the length for each "musicId" by counting the levels
for music_entry in user_rival_music_list:
music_entry["length"] = len(music_entry["userRivalMusicDetailList"])
# Prepare the result dictionary with user rival music data
result = {
"userId": data["userId"],
"rivalId": data["rivalId"],
"nextIndex": str(next_index + len(all_entries) if len(all_entries) <= len(user_rival_music_list) else -1),
"userRivalMusicList": user_rival_music_list[:max_count]
}
return result
def handle_get_user_favorite_item_api_request(self, data: Dict) -> Dict:
user_fav_item_list = []
# still needs to be implemented on WebUI
# 1: Music, 3: Character
# 1: Music, 2: User, 3: Character
fav_list = self.data.item.get_all_favorites(
data["userId"], self.version, fav_kind=int(data["kind"])
)
@ -490,22 +578,39 @@ class ChuniBase:
tmp.pop("id")
for song in song_list:
score_buf = SCORE_BUFFER.get(str(data["userId"])) or []
if song["userMusicDetailList"][0]["musicId"] == tmp["musicId"]:
found = True
song["userMusicDetailList"].append(tmp)
song["length"] = len(song["userMusicDetailList"])
score_buf.append(tmp["musicId"])
SCORE_BUFFER[str(data["userId"])] = score_buf
if not found:
score_buf = SCORE_BUFFER.get(str(data["userId"])) or []
if not found and tmp["musicId"] not in score_buf:
song_list.append({"length": 1, "userMusicDetailList": [tmp]})
score_buf.append(tmp["musicId"])
SCORE_BUFFER[str(data["userId"])] = score_buf
if len(song_list) >= max_ct:
break
if len(song_list) >= next_idx + max_ct:
next_idx += max_ct
try:
while song_list[-1]["userMusicDetailList"][0]["musicId"] == music_detail[x + 1]["musicId"]:
music = music_detail[x + 1]._asdict()
music.pop("user")
music.pop("id")
song_list[-1]["userMusicDetailList"].append(music)
song_list[-1]["length"] += 1
x += 1
except IndexError:
pass
if len(song_list) >= max_ct:
next_idx += len(song_list)
else:
next_idx = -1
SCORE_BUFFER[str(data["userId"])] = []
return {
"userId": data["userId"],
"length": len(song_list),
@ -600,39 +705,94 @@ class ChuniBase:
}
def handle_get_user_team_api_request(self, data: Dict) -> Dict:
# TODO: use the database "chuni_profile_team" with a GUI
# Default values
team_id = 65535
team_name = self.game_cfg.team.team_name
if team_name == "":
team_rank = 0
# Get user profile
profile = self.data.profile.get_profile_data(data["userId"], self.version)
if profile and profile["teamId"]:
# Get team by id
team = self.data.profile.get_team_by_id(profile["teamId"])
if team:
team_id = team["id"]
team_name = team["teamName"]
team_rank = self.data.profile.get_team_rank(team["id"])
# Don't return anything if no team name has been defined for defaults and there is no team set for the player
if not profile["teamId"] and team_name == "":
return {"userId": data["userId"], "teamId": 0}
return {
"userId": data["userId"],
"teamId": 1,
"teamRank": 1,
"teamId": team_id,
"teamRank": team_rank,
"teamName": team_name,
"userTeamPoint": {
"userId": data["userId"],
"teamId": 1,
"teamId": team_id,
"orderId": 1,
"teamPoint": 1,
"aggrDate": data["playDate"],
},
}
def handle_get_team_course_setting_api_request(self, data: Dict) -> Dict:
return {
"userId": data["userId"],
"length": 0,
"nextIndex": 0,
"nextIndex": -1,
"teamCourseSettingList": [],
}
def handle_get_team_course_setting_api_request_proto(self, data: Dict) -> Dict:
return {
"userId": data["userId"],
"length": 1,
"nextIndex": -1,
"teamCourseSettingList": [
{
"orderId": 1,
"courseId": 1,
"classId": 1,
"ruleId": 1,
"courseName": "Test",
"teamCourseMusicList": [
{"track": 184, "type": 1, "level": 3, "selectLevel": -1},
{"track": 184, "type": 1, "level": 3, "selectLevel": -1},
{"track": 184, "type": 1, "level": 3, "selectLevel": -1}
],
"teamCourseRankingInfoList": [],
"recodeDate": "2099-12-31 11:59:99.0",
"isPlayed": False
}
],
}
def handle_get_team_course_rule_api_request(self, data: Dict) -> Dict:
return {
"userId": data["userId"],
"length": 0,
"nextIndex": 0,
"teamCourseRuleList": [],
"nextIndex": -1,
"teamCourseRuleList": []
}
def handle_get_team_course_rule_api_request_proto(self, data: Dict) -> Dict:
return {
"userId": data["userId"],
"length": 1,
"nextIndex": -1,
"teamCourseRuleList": [
{
"recoveryLife": 0,
"clearLife": 100,
"damageMiss": 1,
"damageAttack": 1,
"damageJustice": 1,
"damageJusticeC": 1
}
],
}
def handle_upsert_user_all_api_request(self, data: Dict) -> Dict:
@ -706,12 +866,28 @@ class ChuniBase:
playlog["playedUserName1"] = self.read_wtf8(playlog["playedUserName1"])
playlog["playedUserName2"] = self.read_wtf8(playlog["playedUserName2"])
playlog["playedUserName3"] = self.read_wtf8(playlog["playedUserName3"])
self.data.score.put_playlog(user_id, playlog)
self.data.score.put_playlog(user_id, playlog, self.version)
if "userTeamPoint" in upsert:
# TODO: team stuff
pass
team_points = upsert["userTeamPoint"]
try:
for tp in team_points:
if tp["teamId"] != '65535':
# Fetch the current team data
current_team = self.data.profile.get_team_by_id(tp["teamId"])
# Calculate the new teamPoint
new_team_point = int(tp["teamPoint"]) + current_team["teamPoint"]
# Prepare the data to update
team_data = {
"teamPoint": new_team_point
}
# Update the team data
self.data.profile.update_team(tp["teamId"], team_data)
except:
pass # Probably a better way to catch if the team is not set yet (new profiles), but let's just pass
if "userMapAreaList" in upsert:
for map_area in upsert["userMapAreaList"]:
self.data.item.put_map_area(user_id, map_area)

View File

@ -19,6 +19,12 @@ class ChuniServerConfig:
self.__config, "chuni", "server", "loglevel", default="info"
)
)
@property
def news_msg(self) -> str:
return CoreConfig.get_config_field(
self.__config, "chuni", "server", "news_msg", default=""
)
class ChuniTeamConfig:
@ -30,6 +36,11 @@ class ChuniTeamConfig:
return CoreConfig.get_config_field(
self.__config, "chuni", "team", "name", default=""
)
@property
def rank_scale(self) -> str:
return CoreConfig.get_config_field(
self.__config, "chuni", "team", "rank_scale", default="False"
)
class ChuniModsConfig:

View File

@ -11,30 +11,31 @@ from Crypto.Util.Padding import pad
from Crypto.Protocol.KDF import PBKDF2
from Crypto.Hash import SHA1
from os import path
from typing import Tuple, Dict
from typing import Tuple, Dict, List
from core import CoreConfig, Utils
from titles.chuni.config import ChuniConfig
from titles.chuni.const import ChuniConstants
from titles.chuni.base import ChuniBase
from titles.chuni.plus import ChuniPlus
from titles.chuni.air import ChuniAir
from titles.chuni.airplus import ChuniAirPlus
from titles.chuni.star import ChuniStar
from titles.chuni.starplus import ChuniStarPlus
from titles.chuni.amazon import ChuniAmazon
from titles.chuni.amazonplus import ChuniAmazonPlus
from titles.chuni.crystal import ChuniCrystal
from titles.chuni.crystalplus import ChuniCrystalPlus
from titles.chuni.paradise import ChuniParadise
from titles.chuni.new import ChuniNew
from titles.chuni.newplus import ChuniNewPlus
from titles.chuni.sun import ChuniSun
from core.title import BaseServlet
from .config import ChuniConfig
from .const import ChuniConstants
from .base import ChuniBase
from .plus import ChuniPlus
from .air import ChuniAir
from .airplus import ChuniAirPlus
from .star import ChuniStar
from .starplus import ChuniStarPlus
from .amazon import ChuniAmazon
from .amazonplus import ChuniAmazonPlus
from .crystal import ChuniCrystal
from .crystalplus import ChuniCrystalPlus
from .paradise import ChuniParadise
from .new import ChuniNew
from .newplus import ChuniNewPlus
from .sun import ChuniSun
class ChuniServlet:
class ChuniServlet(BaseServlet):
def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None:
self.core_cfg = core_cfg
super().__init__(core_cfg, cfg_dir)
self.game_cfg = ChuniConfig()
self.hash_table: Dict[Dict[str, str]] = {}
if path.exists(f"{cfg_dir}/{ChuniConstants.CONFIG_NAME}"):
@ -115,10 +116,19 @@ class ChuniServlet:
f"Hashed v{version} method {method_fixed} with {bytes.fromhex(keys[2])} to get {hash.hex()}"
)
def get_endpoint_matchers(self) -> Tuple[List[Tuple[str, str, Dict]], List[Tuple[str, str, Dict]]]:
return (
[],
[
("render_POST", "/{version}/ChuniServlet/{endpoint}", {}),
("render_POST", "/{version}/ChuniServlet/MatchingServer/{endpoint}", {})
]
)
@classmethod
def get_allnet_info(
def is_game_enabled(
cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str
) -> Tuple[bool, str, str]:
) -> bool:
game_cfg = ChuniConfig()
if path.exists(f"{cfg_dir}/{ChuniConstants.CONFIG_NAME}"):
game_cfg.update(
@ -126,26 +136,26 @@ class ChuniServlet:
)
if not game_cfg.server.enable:
return (False, "", "")
return False
if core_cfg.server.is_develop:
return (
True,
f"http://{core_cfg.title.hostname}:{core_cfg.title.port}/{game_code}/$v/",
"",
)
return True
def get_allnet_info(self, game_code: str, game_ver: int, keychip: str) -> Tuple[str, str]:
if not self.core_cfg.server.is_using_proxy and Utils.get_title_port(self.core_cfg) != 80:
return (f"http://{self.core_cfg.title.hostname}:{Utils.get_title_port(self.core_cfg)}/{game_ver}/", "")
return (True, f"http://{core_cfg.title.hostname}/{game_code}/$v/", "")
return (f"http://{self.core_cfg.title.hostname}/{game_ver}/", "")
def render_POST(self, request: Request, version: int, url_path: str) -> bytes:
if url_path.lower() == "ping":
def render_POST(self, request: Request, game_code: str, matchers: Dict) -> bytes:
endpoint = matchers['endpoint']
version = int(matchers['version'])
if endpoint.lower() == "ping":
return zlib.compress(b'{"returnCode": "1"}')
req_raw = request.content.getvalue()
url_split = url_path.split("/")
encrtped = False
internal_ver = 0
endpoint = url_split[len(url_split) - 1]
client_ip = Utils.get_ip_addr(request)
if version < 105: # 1.0

View File

@ -3,6 +3,7 @@ from datetime import datetime, timedelta
from random import randint
from typing import Dict
import pytz
from core.config import CoreConfig
from titles.chuni.const import ChuniConstants
from titles.chuni.database import ChuniData
@ -31,12 +32,29 @@ class ChuniNew(ChuniBase):
match_end = datetime.strftime(
datetime.utcnow() + timedelta(hours=16), self.date_time_format
)
reboot_start = datetime.strftime(
datetime.utcnow() + timedelta(hours=6), self.date_time_format
)
reboot_end = datetime.strftime(
datetime.utcnow() + timedelta(hours=7), self.date_time_format
)
# if reboot start/end time is not defined use the default behavior of being a few hours ago
if self.core_cfg.title.reboot_start_time == "" or self.core_cfg.title.reboot_end_time == "":
reboot_start = datetime.strftime(
datetime.utcnow() + timedelta(hours=6), self.date_time_format
)
reboot_end = datetime.strftime(
datetime.utcnow() + timedelta(hours=7), self.date_time_format
)
else:
# get current datetime in JST
current_jst = datetime.now(pytz.timezone('Asia/Tokyo')).date()
# parse config start/end times into datetime
reboot_start_time = datetime.strptime(self.core_cfg.title.reboot_start_time, "%H:%M")
reboot_end_time = datetime.strptime(self.core_cfg.title.reboot_end_time, "%H:%M")
# offset datetimes with current date/time
reboot_start_time = reboot_start_time.replace(year=current_jst.year, month=current_jst.month, day=current_jst.day, tzinfo=pytz.timezone('Asia/Tokyo'))
reboot_end_time = reboot_end_time.replace(year=current_jst.year, month=current_jst.month, day=current_jst.day, tzinfo=pytz.timezone('Asia/Tokyo'))
# create strings for use in gameSetting
reboot_start = reboot_start_time.strftime(self.date_time_format)
reboot_end = reboot_end_time.strftime(self.date_time_format)
return {
"gameSetting": {
"isMaintenance": False,
@ -53,11 +71,11 @@ class ChuniNew(ChuniBase):
"matchErrorLimit": 9999,
"romVersion": self.game_cfg.version.version(self.version)["rom"],
"dataVersion": self.game_cfg.version.version(self.version)["data"],
"matchingUri": f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/200/ChuniServlet/",
"matchingUriX": f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/200/ChuniServlet/",
"matchingUri": f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/200/ChuniServlet/",
"matchingUriX": f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/200/ChuniServlet/",
# might be really important for online battle to connect the cabs via UDP port 50201
"udpHolePunchUri": f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/200/ChuniServlet/",
"reflectorUri": f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/200/ChuniServlet/",
"udpHolePunchUri": f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/200/ChuniServlet/",
"reflectorUri": f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/200/ChuniServlet/",
},
"isDumpUpload": False,
"isAou": False,

View File

@ -21,16 +21,16 @@ class ChuniNewPlus(ChuniNew):
]
ret["gameSetting"][
"matchingUri"
] = f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/205/ChuniServlet/"
] = f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/205/ChuniServlet/"
ret["gameSetting"][
"matchingUriX"
] = f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/205/ChuniServlet/"
] = f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/205/ChuniServlet/"
ret["gameSetting"][
"udpHolePunchUri"
] = f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/205/ChuniServlet/"
] = f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/205/ChuniServlet/"
ret["gameSetting"][
"reflectorUri"
] = f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/205/ChuniServlet/"
] = f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/205/ChuniServlet/"
return ret
def handle_cm_get_user_preview_api_request(self, data: Dict) -> Dict:

View File

@ -637,3 +637,69 @@ class ChuniProfileData(BaseData):
if result is None:
return None
return result.fetchall()
def get_team_by_id(self, team_id: int) -> Optional[Row]:
sql = select(team).where(team.c.id == team_id)
result = self.execute(sql)
if result is None:
return None
return result.fetchone()
def get_team_rank(self, team_id: int) -> int:
# Normal ranking system, likely the one used in the real servers
# Query all teams sorted by 'teamPoint'
result = self.execute(
select(team.c.id).order_by(team.c.teamPoint.desc())
)
# Get the rank of the team with the given team_id
rank = None
for i, row in enumerate(result, start=1):
if row.id == team_id:
rank = i
break
# Return the rank if found, or a default rank otherwise
return rank if rank is not None else 0
# RIP scaled team ranking. Gone, but forgotten
# def get_team_rank_scaled(self, team_id: int) -> int:
def update_team(self, team_id: int, team_data: Dict) -> bool:
team_data["id"] = team_id
sql = insert(team).values(**team_data)
conflict = sql.on_duplicate_key_update(**team_data)
result = self.execute(conflict)
if result is None:
self.logger.warn(
f"update_team: Failed to update team! team id: {team_id}"
)
return False
return True
def get_rival(self, rival_id: int) -> Optional[Row]:
sql = select(profile).where(profile.c.user == rival_id)
result = self.execute(sql)
if result is None:
return None
return result.fetchone()
def get_overview(self) -> Dict:
# Fetch and add up all the playcounts
playcount_sql = self.execute(select(profile.c.playCount))
if playcount_sql is None:
self.logger.warn(
f"get_overview: Couldn't pull playcounts"
)
return 0
total_play_count = 0;
for row in playcount_sql:
total_play_count += row[0]
return {
"total_play_count": total_play_count
}

View File

@ -6,7 +6,7 @@ from sqlalchemy.schema import ForeignKey
from sqlalchemy.engine import Row
from sqlalchemy.sql import func, select
from sqlalchemy.dialects.mysql import insert
from sqlalchemy.sql.expression import exists
from core.data.schema import BaseData, metadata
course = Table(
@ -189,9 +189,28 @@ class ChuniScoreData(BaseData):
return None
return result.fetchall()
def put_playlog(self, aime_id: int, playlog_data: Dict) -> Optional[int]:
def put_playlog(self, aime_id: int, playlog_data: Dict, version: int) -> Optional[int]:
# Calculate the ROM version that should be inserted into the DB, based on the version of the ggame being inserted
# We only need from Version 10 (Plost) and back, as newer versions include romVersion in their upsert
# This matters both for gameRankings, as well as a future DB update to keep version data separate
romVer = {
10: "1.50.0",
9: "1.45.0",
8: "1.40.0",
7: "1.35.0",
6: "1.30.0",
5: "1.25.0",
4: "1.20.0",
3: "1.15.0",
2: "1.10.0",
1: "1.05.0",
0: "1.00.0"
}
playlog_data["user"] = aime_id
playlog_data = self.fix_bools(playlog_data)
if "romVersion" not in playlog_data:
playlog_data["romVersion"] = romVer.get(version, "1.00.0")
sql = insert(playlog).values(**playlog_data)
conflict = sql.on_duplicate_key_update(**playlog_data)
@ -200,3 +219,40 @@ class ChuniScoreData(BaseData):
if result is None:
return None
return result.lastrowid
def get_rankings(self, version: int) -> Optional[List[Dict]]:
# Calculates the ROM version that should be fetched for rankings, based on the game version being retrieved
# This prevents tracks that are not accessible in your version from counting towards the 10 results
romVer = {
13: "2.10%",
12: "2.05%",
11: "2.00%",
10: "1.50%",
9: "1.45%",
8: "1.40%",
7: "1.35%",
6: "1.30%",
5: "1.25%",
4: "1.20%",
3: "1.15%",
2: "1.10%",
1: "1.05%",
0: "1.00%"
}
sql = select([playlog.c.musicId.label('id'), func.count(playlog.c.musicId).label('point')]).where((playlog.c.level != 4) & (playlog.c.romVersion.like(romVer.get(version, "%")))).group_by(playlog.c.musicId).order_by(func.count(playlog.c.musicId).desc()).limit(10)
result = self.execute(sql)
if result is None:
return None
rows = result.fetchall()
return [dict(row) for row in rows]
def get_rival_music(self, rival_id: int) -> Optional[List[Dict]]:
sql = select(best_score).where(best_score.c.user == rival_id)
result = self.execute(sql)
if result is None:
return None
return result.fetchall()

View File

@ -453,6 +453,15 @@ class ChuniStaticData(BaseData):
return None
return result.fetchone()
def get_song(self, music_id: int) -> Optional[Row]:
sql = music.select(music.c.id == music_id)
result = self.execute(sql)
if result is None:
return None
return result.fetchone()
def put_avatar(
self,
version: int,
@ -587,4 +596,4 @@ class ChuniStaticData(BaseData):
result = self.execute(sql)
if result is None:
return None
return result.fetchone()
return result.fetchone()

View File

@ -17,16 +17,16 @@ class ChuniSun(ChuniNewPlus):
ret["gameSetting"]["dataVersion"] = self.game_cfg.version.version(self.version)["data"]
ret["gameSetting"][
"matchingUri"
] = f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/210/ChuniServlet/"
] = f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/210/ChuniServlet/"
ret["gameSetting"][
"matchingUriX"
] = f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/210/ChuniServlet/"
] = f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/210/ChuniServlet/"
ret["gameSetting"][
"udpHolePunchUri"
] = f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/210/ChuniServlet/"
] = f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/210/ChuniServlet/"
ret["gameSetting"][
"reflectorUri"
] = f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/210/ChuniServlet/"
] = f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/210/ChuniServlet/"
return ret
def handle_cm_get_user_preview_api_request(self, data: Dict) -> Dict:

View File

@ -4,7 +4,9 @@ import json
import logging
from enum import Enum
import pytz
from core.config import CoreConfig
from core.utils import Utils
from core.data.cache import cached
from titles.cm.const import CardMakerConstants
from titles.cm.config import CardMakerConfig
@ -28,8 +30,8 @@ class CardMakerBase:
return version.replace(".", "")[:3]
def handle_get_game_connect_api_request(self, data: Dict) -> Dict:
if self.core_cfg.server.is_develop:
uri = f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}"
if not self.core_cfg.server.is_using_proxy and Utils.get_title_port(self.core_cfg) != 80:
uri = f"http://{self.core_cfg.title.hostname}:{Utils.get_title_port(self.core_cfg)}"
else:
uri = f"http://{self.core_cfg.title.hostname}"
@ -43,13 +45,13 @@ class CardMakerBase:
{
"modelKind": 0,
"type": 1,
"titleUri": f"{uri}/SDHD/{self._parse_int_ver(games_ver['chuni'])}/",
"titleUri": f"{uri}/{self._parse_int_ver(games_ver['chuni'])}/ChuniServlet/",
},
# maimai DX
{
"modelKind": 1,
"type": 1,
"titleUri": f"{uri}/SDEZ/{self._parse_int_ver(games_ver['maimai'])}/",
"titleUri": f"{uri}/{self._parse_int_ver(games_ver['maimai'])}/Maimai2Servlet/",
},
# ONGEKI
{
@ -61,12 +63,29 @@ class CardMakerBase:
}
def handle_get_game_setting_api_request(self, data: Dict) -> Dict:
reboot_start = date.strftime(
datetime.now() + timedelta(hours=3), self.date_time_format
)
reboot_end = date.strftime(
datetime.now() + timedelta(hours=4), self.date_time_format
)
# if reboot start/end time is not defined use the default behavior of being a few hours ago
if self.core_cfg.title.reboot_start_time == "" or self.core_cfg.title.reboot_end_time == "":
reboot_start = datetime.strftime(
datetime.utcnow() + timedelta(hours=6), self.date_time_format
)
reboot_end = datetime.strftime(
datetime.utcnow() + timedelta(hours=7), self.date_time_format
)
else:
# get current datetime in JST
current_jst = datetime.now(pytz.timezone('Asia/Tokyo')).date()
# parse config start/end times into datetime
reboot_start_time = datetime.strptime(self.core_cfg.title.reboot_start_time, "%H:%M")
reboot_end_time = datetime.strptime(self.core_cfg.title.reboot_end_time, "%H:%M")
# offset datetimes with current date/time
reboot_start_time = reboot_start_time.replace(year=current_jst.year, month=current_jst.month, day=current_jst.day, tzinfo=pytz.timezone('Asia/Tokyo'))
reboot_end_time = reboot_end_time.replace(year=current_jst.year, month=current_jst.month, day=current_jst.day, tzinfo=pytz.timezone('Asia/Tokyo'))
# create strings for use in gameSetting
reboot_start = reboot_start_time.strftime(self.date_time_format)
reboot_end = reboot_end_time.strftime(self.date_time_format)
# grab the dict with all games version numbers from user config
games_ver = self.game_cfg.version.version(self.version)

View File

@ -7,21 +7,22 @@ import coloredlogs
import zlib
from os import path
from typing import Tuple
from typing import Tuple, List, Dict
from twisted.web.http import Request
from logging.handlers import TimedRotatingFileHandler
from core.config import CoreConfig
from core.utils import Utils
from titles.cm.config import CardMakerConfig
from titles.cm.const import CardMakerConstants
from titles.cm.base import CardMakerBase
from titles.cm.cm135 import CardMaker135
from core.title import BaseServlet
from .config import CardMakerConfig
from .const import CardMakerConstants
from .base import CardMakerBase
from .cm135 import CardMaker135
class CardMakerServlet:
class CardMakerServlet(BaseServlet):
def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None:
self.core_cfg = core_cfg
super().__init__(core_cfg, cfg_dir)
self.game_cfg = CardMakerConfig()
if path.exists(f"{cfg_dir}/{CardMakerConstants.CONFIG_NAME}"):
self.game_cfg.update(
@ -55,11 +56,11 @@ class CardMakerServlet:
coloredlogs.install(
level=self.game_cfg.server.loglevel, logger=self.logger, fmt=log_fmt_str
)
@classmethod
def get_allnet_info(
def is_game_enabled(
cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str
) -> Tuple[bool, str, str]:
) -> bool:
game_cfg = CardMakerConfig()
if path.exists(f"{cfg_dir}/{CardMakerConstants.CONFIG_NAME}"):
game_cfg.update(
@ -67,22 +68,21 @@ class CardMakerServlet:
)
if not game_cfg.server.enable:
return (False, "", "")
return False
if core_cfg.server.is_develop:
return (
True,
f"http://{core_cfg.title.hostname}:{core_cfg.title.port}/{game_code}/$v/",
"",
)
return True
def get_endpoint_matchers(self) -> Tuple[List[Tuple[str, str, Dict]], List[Tuple[str, str, Dict]]]:
return (
[],
[("render_POST", "/SDED/{version}/{endpoint}", {})]
)
return (True, f"http://{core_cfg.title.hostname}/{game_code}/$v/", "")
def render_POST(self, request: Request, version: int, url_path: str) -> bytes:
def render_POST(self, request: Request, game_code: str, matchers: Dict) -> bytes:
version = int(matchers['version'])
endpoint = matchers['endpoint']
req_raw = request.content.getvalue()
url_split = url_path.split("/")
internal_ver = 0
endpoint = url_split[len(url_split) - 1]
client_ip = Utils.get_ip_addr(request)
if version >= 130 and version < 135: # Card Maker

View File

@ -205,6 +205,7 @@ class CardMakerReader(BaseReader):
"1.20": Mai2Constants.VER_MAIMAI_DX_UNIVERSE,
"1.25": Mai2Constants.VER_MAIMAI_DX_UNIVERSE_PLUS,
"1.30": Mai2Constants.VER_MAIMAI_DX_FESTIVAL,
"1.35": Mai2Constants.VER_MAIMAI_DX_FESTIVAL_PLUS,
}
for root, dirs, files in os.walk(base_dir):

View File

@ -3,13 +3,12 @@ import json
from decimal import Decimal
from base64 import b64encode
from typing import Any, Dict, List
from hashlib import md5
from datetime import datetime
from os import path
from core.config import CoreConfig
from titles.cxb.config import CxbConfig
from titles.cxb.const import CxbConstants
from titles.cxb.database import CxbData
from .config import CxbConfig
from .const import CxbConstants
from .database import CxbData
from threading import Thread
@ -22,6 +21,13 @@ class CxbBase:
self.logger = logging.getLogger("cxb")
self.version = CxbConstants.VER_CROSSBEATS_REV
def _get_data_contents(self, folder: str, filetype: str, encoding: str = None, subfolder: str = "") -> List[str]:
if path.exists(f"titles/cxb/data/{folder}/{subfolder}{filetype}.csv"):
with open(f"titles/cxb/data/{folder}/{subfolder}{filetype}.csv", encoding=encoding) as f:
return f.readlines()
return []
def handle_action_rpreq_request(self, data: Dict) -> Dict:
return {}
@ -192,14 +198,6 @@ class CxbBase:
).decode("utf-8")
)
def task_generateIndexData(versionindex):
try:
v_profile = self.data.profile.get_profile_index(0, uid, self.version)
v_profile_data = v_profile["data"]
versionindex.append(int(v_profile_data["appVersion"]))
except Exception:
versionindex.append("10400")
def handle_action_loadrange_request(self, data: Dict) -> Dict:
range_start = data["loadrange"]["range"][0]
range_end = data["loadrange"]["range"][1]
@ -273,9 +271,14 @@ class CxbBase:
thread_ScoreData = Thread(target=CxbBase.task_generateScoreData(song, index, data1))
thread_ScoreData.start()
for v in index:
thread_IndexData = Thread(target=CxbBase.task_generateIndexData(versionindex))
thread_IndexData.start()
v_profile = self.data.profile.get_profile_index(0, uid, self.version)
v_profile_data = v_profile["data"]
for _, data in enumerate(profile):
if v_profile_data:
versionindex.append(int(v_profile_data["appVersion"]))
else:
versionindex.append("10400")
return {"index": index, "data": data1, "version": versionindex}
@ -530,7 +533,6 @@ class CxbBase:
profile = self.data.profile.get_profile_index(0, uid, self.version)
data1 = profile["data"]
p = self.data.item.get_energy(uid)
energy = p["energy"]
if not p:
self.data.item.put_energy(uid, 5)
@ -543,6 +545,7 @@ class CxbBase:
}
array = []
energy = p["energy"]
newenergy = int(energy) + 5
self.data.item.put_energy(uid, newenergy)

View File

@ -19,42 +19,6 @@ class CxbServerConfig:
)
)
@property
def hostname(self) -> str:
return CoreConfig.get_config_field(
self.__config, "cxb", "server", "hostname", default="localhost"
)
@property
def ssl_enable(self) -> bool:
return CoreConfig.get_config_field(
self.__config, "cxb", "server", "ssl_enable", default=False
)
@property
def port(self) -> int:
return CoreConfig.get_config_field(
self.__config, "cxb", "server", "port", default=8082
)
@property
def port_secure(self) -> int:
return CoreConfig.get_config_field(
self.__config, "cxb", "server", "port_secure", default=443
)
@property
def ssl_cert(self) -> str:
return CoreConfig.get_config_field(
self.__config, "cxb", "server", "ssl_cert", default="cert/title.crt"
)
@property
def ssl_key(self) -> str:
return CoreConfig.get_config_field(
self.__config, "cxb", "server", "ssl_key", default="cert/title.key"
)
class CxbConfig(dict):
def __init__(self) -> None:

View File

@ -1,4 +1,5 @@
from twisted.web.http import Request
import traceback
from twisted.web import resource, server
from twisted.internet import reactor, endpoints
import yaml
@ -7,18 +8,20 @@ import re
import inflection
import logging, coloredlogs
from logging.handlers import TimedRotatingFileHandler
from typing import Dict, Tuple
from typing import Dict, Tuple, List
from os import path
from core.config import CoreConfig
from titles.cxb.config import CxbConfig
from titles.cxb.const import CxbConstants
from titles.cxb.rev import CxbRev
from titles.cxb.rss1 import CxbRevSunriseS1
from titles.cxb.rss2 import CxbRevSunriseS2
from core.title import BaseServlet
from core.utils import Utils
from .config import CxbConfig
from .const import CxbConstants
from .rev import CxbRev
from .rss1 import CxbRevSunriseS1
from .rss2 import CxbRevSunriseS2
class CxbServlet(resource.Resource):
class CxbServlet(BaseServlet):
def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None:
self.isLeaf = True
self.cfg_dir = cfg_dir
@ -61,9 +64,7 @@ class CxbServlet(resource.Resource):
]
@classmethod
def get_allnet_info(
cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str
) -> Tuple[bool, str, str]:
def is_game_enabled(cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str) -> bool:
game_cfg = CxbConfig()
if path.exists(f"{cfg_dir}/{CxbConstants.CONFIG_NAME}"):
game_cfg.update(
@ -71,48 +72,36 @@ class CxbServlet(resource.Resource):
)
if not game_cfg.server.enable:
return (False, "", "")
if core_cfg.server.is_develop:
return False
return True
def get_allnet_info(self, game_code: str, game_ver: int, keychip: str) -> Tuple[str, str]:
if not self.core_cfg.server.is_using_proxy and Utils.get_title_port_ssl(self.core_cfg) != 443:
return (
True,
f"http://{core_cfg.title.hostname}:{core_cfg.title.port}/{game_code}/$v/",
f"https://{self.core_cfg.title.hostname}:{self.core_cfg.title.port_ssl}",
"",
)
return (True, f"http://{core_cfg.title.hostname}/{game_code}/$v/", "")
def setup(self):
if self.game_cfg.server.enable:
endpoints.serverFromString(
reactor,
f"tcp:{self.game_cfg.server.port}:interface={self.core_cfg.server.listen_address}",
).listen(server.Site(CxbServlet(self.core_cfg, self.cfg_dir)))
if self.core_cfg.server.is_develop and self.game_cfg.server.ssl_enable:
endpoints.serverFromString(
reactor,
f"ssl:{self.game_cfg.server.port_secure}"
f":interface={self.core_cfg.server.listen_address}:privateKey={self.game_cfg.server.ssl_key}:"
f"certKey={self.game_cfg.server.ssl_cert}",
).listen(server.Site(CxbServlet(self.core_cfg, self.cfg_dir)))
self.logger.info(
f"Ready on ports {self.game_cfg.server.port} & {self.game_cfg.server.port_secure}"
)
else:
self.logger.info(f"Ready on port {self.game_cfg.server.port}")
def render_POST(self, request: Request, version: int, endpoint: str):
version = 0
internal_ver = 0
func_to_find = ""
cmd = ""
subcmd = ""
req_url = request.uri.decode()
url_split = req_url.split("/")
req_bytes = request.content.getvalue()
return (f"https://{self.core_cfg.title.hostname}", "")
def get_endpoint_matchers(self) -> Tuple[List[Tuple[str, str, Dict]], List[Tuple[str, str, Dict]]]:
return (
[],
[
("handle_data", "/data", {}),
("handle_action", "/action", {}),
("handle_action", "/v2/action", {}),
("handle_auth", "/auth", {}),
]
)
def preprocess(self, req: Request) -> Dict:
try:
req_bytes = req.content.getvalue()
except:
req_bytes = req.content.read() # Can we just use this one?
try:
req_json: Dict = json.loads(req_bytes)
@ -124,42 +113,56 @@ class CxbServlet(resource.Resource):
except Exception as f:
self.logger.warning(
f"Error decoding json: {e} / {f} - {req_url} - {req_bytes}"
f"Error decoding json to /data endpoint: {e} / {f} - {req_bytes}"
)
return b""
return req_json
def handle_data(self, request: Request, game_code: str, matchers: Dict) -> bytes:
req_json = self.preprocess(request)
func_to_find = "handle_data_"
version_string = "Base"
internal_ver = 0
version = 0
if req_json == {}:
self.logger.warning(f"Empty json request to {req_url}")
self.logger.warning(f"Empty json request to /data")
return b""
cmd = url_split[len(url_split) - 1]
subcmd = list(req_json.keys())[0]
if subcmd == "dldate":
if (
not type(req_json["dldate"]) is dict
or "filetype" not in req_json["dldate"]
):
self.logger.warning(f"Malformed dldate request: {req_url} {req_json}")
self.logger.warning(f"Malformed dldate request: {req_json}")
return b""
filetype = req_json["dldate"]["filetype"]
filetype_split = filetype.split("/")
if len(filetype_split) < 2 or not filetype_split[0].isnumeric():
self.logger.warning(f"Malformed dldate request: {req_json}")
return b""
version = int(filetype_split[0])
filetype_inflect_split = inflection.underscore(filetype).split("/")
filename = filetype_split[len(filetype_split) - 1]
match = re.match(
"^([A-Za-z]*)(\d\d\d\d)$", filetype_split[len(filetype_split) - 1]
)
if match:
subcmd = f"{inflection.underscore(match.group(1))}xxxx"
func_to_find += f"{inflection.underscore(match.group(1))}xxxx"
else:
subcmd = f"{filetype_inflect_split[len(filetype_inflect_split) - 1]}"
func_to_find += f"{inflection.underscore(filename)}"
else:
filetype = subcmd
func_to_find = f"handle_{cmd}_{subcmd}_request"
func_to_find += filetype
func_to_find += "_request"
if version <= 10102:
version_string = "Rev"
internal_ver = CxbConstants.VER_CROSSBEATS_REV
@ -172,23 +175,81 @@ class CxbServlet(resource.Resource):
version_string = "Rev SunriseS2"
internal_ver = CxbConstants.VER_CROSSBEATS_REV_SUNRISE_S2
else:
version_string = "Base"
self.logger.info(f"{version_string} Request {req_url} -> {filetype}")
if not hasattr(self.versions[internal_ver], func_to_find):
self.logger.warn(f"{version_string} has no handler for filetype {filetype}")
return ""
self.logger.info(f"{version_string} request for filetype {filetype}")
self.logger.debug(req_json)
handler = getattr(self.versions[internal_ver], func_to_find)
try:
handler = getattr(self.versions[internal_ver], func_to_find)
resp = handler(req_json)
except AttributeError as e:
self.logger.warning(f"Unhandled {version_string} request {req_url} - {e}")
resp = {}
except Exception as e:
self.logger.error(f"Error handling {version_string} method {req_url} - {e}")
raise
self.logger.error(f"Error handling request for file {filetype} - {e}")
if self.logger.level == logging.DEBUG:
traceback.print_exception(e, limit=1)
with open("{0}/{1}.log".format(self.core_cfg.server.log_dir, "cxb"), "a") as f:
traceback.print_exception(e, limit=1, file=f)
return ""
self.logger.debug(f"{version_string} Response {resp}")
return json.dumps(resp, ensure_ascii=False).encode("utf-8")
def handle_action(self, request: Request, game_code: str, matchers: Dict) -> bytes:
req_json = self.preprocess(request)
subcmd = list(req_json.keys())[0]
func_to_find = f"handle_action_{subcmd}_request"
if not hasattr(self.versions[0], func_to_find):
self.logger.warn(f"No handler for action {subcmd} request")
return ""
self.logger.info(f"Action {subcmd} Request")
self.logger.debug(req_json)
handler = getattr(self.versions[0], func_to_find)
try:
resp = handler(req_json)
except Exception as e:
self.logger.error(f"Error handling action {subcmd} request - {e}")
if self.logger.level == logging.DEBUG:
traceback.print_exception(e, limit=1)
with open("{0}/{1}.log".format(self.core_cfg.server.log_dir, "cxb"), "a") as f:
traceback.print_exception(e, limit=1, file=f)
return ""
self.logger.debug(f"Response {resp}")
return json.dumps(resp, ensure_ascii=False).encode("utf-8")
def handle_auth(self, request: Request, game_code: str, matchers: Dict) -> bytes:
req_json = self.preprocess(request)
subcmd = list(req_json.keys())[0]
func_to_find = f"handle_auth_{subcmd}_request"
if not hasattr(self.versions[0], func_to_find):
self.logger.warn(f"No handler for auth {subcmd} request")
return ""
self.logger.info(f"Action {subcmd} Request")
self.logger.debug(req_json)
handler = getattr(self.versions[0], func_to_find)
try:
resp = handler(req_json)
except Exception as e:
self.logger.error(f"Error handling auth {subcmd} request - {e}")
if self.logger.level == logging.DEBUG:
traceback.print_exception(e, limit=1)
with open("{0}/{1}.log".format(self.core_cfg.server.log_dir, "cxb"), "a") as f:
traceback.print_exception(e, limit=1, file=f)
return ""
self.logger.debug(f"Response {resp}")
return json.dumps(resp, ensure_ascii=False).encode("utf-8")

View File

@ -7,9 +7,9 @@ from datetime import datetime
from core.config import CoreConfig
from core.data import Data, cached
from titles.cxb.config import CxbConfig
from titles.cxb.base import CxbBase
from titles.cxb.const import CxbConstants
from .config import CxbConfig
from .base import CxbBase
from .const import CxbConstants
class CxbRev(CxbBase):
@ -47,7 +47,7 @@ class CxbRev(CxbBase):
@cached(lifetime=86400)
def handle_data_music_list_request(self, data: Dict) -> Dict:
ret_str = ""
with open(r"titles/cxb/rev_data/MusicArchiveList.csv") as music:
with open(r"titles/cxb/data/rss/MusicArchiveList.csv") as music:
lines = music.readlines()
for line in lines:
line_split = line.split(",")
@ -59,7 +59,7 @@ class CxbRev(CxbBase):
def handle_data_item_list_icon_request(self, data: Dict) -> Dict:
ret_str = "\r\n#ItemListIcon\r\n"
with open(
r"titles/cxb/rev_data/Item/ItemArchiveList_Icon.csv", encoding="utf-8"
r"titles/cxb/data/rss/Item/ItemArchiveList_Icon.csv", encoding="utf-8"
) as item:
lines = item.readlines()
for line in lines:
@ -70,7 +70,7 @@ class CxbRev(CxbBase):
def handle_data_item_list_skin_notes_request(self, data: Dict) -> Dict:
ret_str = "\r\n#ItemListSkinNotes\r\n"
with open(
r"titles/cxb/rev_data/Item/ItemArchiveList_SkinNotes.csv", encoding="utf-8"
r"titles/cxb/data/rss/Item/ItemArchiveList_SkinNotes.csv", encoding="utf-8"
) as item:
lines = item.readlines()
for line in lines:
@ -81,7 +81,7 @@ class CxbRev(CxbBase):
def handle_data_item_list_skin_effect_request(self, data: Dict) -> Dict:
ret_str = "\r\n#ItemListSkinEffect\r\n"
with open(
r"titles/cxb/rev_data/Item/ItemArchiveList_SkinEffect.csv", encoding="utf-8"
r"titles/cxb/data/rss/Item/ItemArchiveList_SkinEffect.csv", encoding="utf-8"
) as item:
lines = item.readlines()
for line in lines:
@ -92,7 +92,7 @@ class CxbRev(CxbBase):
def handle_data_item_list_skin_bg_request(self, data: Dict) -> Dict:
ret_str = "\r\n#ItemListSkinBg\r\n"
with open(
r"titles/cxb/rev_data/Item/ItemArchiveList_SkinBg.csv", encoding="utf-8"
r"titles/cxb/data/rss/Item/ItemArchiveList_SkinBg.csv", encoding="utf-8"
) as item:
lines = item.readlines()
for line in lines:
@ -103,7 +103,7 @@ class CxbRev(CxbBase):
def handle_data_item_list_title_request(self, data: Dict) -> Dict:
ret_str = "\r\n#ItemListTitle\r\n"
with open(
r"titles/cxb/rev_data/Item/ItemList_Title.csv", encoding="shift-jis"
r"titles/cxb/data/rss/Item/ItemList_Title.csv", encoding="shift-jis"
) as item:
lines = item.readlines()
for line in lines:
@ -114,7 +114,7 @@ class CxbRev(CxbBase):
def handle_data_shop_list_music_request(self, data: Dict) -> Dict:
ret_str = "\r\n#ShopListMusic\r\n"
with open(
r"titles/cxb/rev_data/Shop/ShopList_Music.csv", encoding="shift-jis"
r"titles/cxb/data/rss/Shop/ShopList_Music.csv", encoding="shift-jis"
) as shop:
lines = shop.readlines()
for line in lines:
@ -125,7 +125,7 @@ class CxbRev(CxbBase):
def handle_data_shop_list_icon_request(self, data: Dict) -> Dict:
ret_str = "\r\n#ShopListIcon\r\n"
with open(
r"titles/cxb/rev_data/Shop/ShopList_Icon.csv", encoding="shift-jis"
r"titles/cxb/data/rss/Shop/ShopList_Icon.csv", encoding="shift-jis"
) as shop:
lines = shop.readlines()
for line in lines:
@ -136,7 +136,7 @@ class CxbRev(CxbBase):
def handle_data_shop_list_title_request(self, data: Dict) -> Dict:
ret_str = "\r\n#ShopListTitle\r\n"
with open(
r"titles/cxb/rev_data/Shop/ShopList_Title.csv", encoding="shift-jis"
r"titles/cxb/data/rss/Shop/ShopList_Title.csv", encoding="shift-jis"
) as shop:
lines = shop.readlines()
for line in lines:
@ -156,7 +156,7 @@ class CxbRev(CxbBase):
def handle_data_shop_list_sale_request(self, data: Dict) -> Dict:
ret_str = "\r\n#ShopListSale\r\n"
with open(
r"titles/cxb/rev_data/Shop/ShopList_Sale.csv", encoding="shift-jis"
r"titles/cxb/data/rss/Shop/ShopList_Sale.csv", encoding="shift-jis"
) as shop:
lines = shop.readlines()
for line in lines:
@ -171,7 +171,7 @@ class CxbRev(CxbBase):
extra_num = int(data["dldate"]["filetype"][-4:])
ret_str = ""
with open(
rf"titles/cxb/rev_data/Ex000{extra_num}.csv", encoding="shift-jis"
rf"titles/cxb/data/rss/Ex000{extra_num}.csv", encoding="shift-jis"
) as stage:
lines = stage.readlines()
for line in lines:
@ -187,7 +187,7 @@ class CxbRev(CxbBase):
@cached(lifetime=86400)
def handle_data_news_list_request(self, data: Dict) -> Dict:
ret_str = ""
with open(r"titles/cxb/rev_data/NewsList.csv", encoding="UTF-8") as news:
with open(r"titles/cxb/data/rss/NewsList.csv", encoding="UTF-8") as news:
lines = news.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
@ -199,7 +199,7 @@ class CxbRev(CxbBase):
@cached(lifetime=86400)
def handle_data_license_request(self, data: Dict) -> Dict:
ret_str = ""
with open(r"titles/cxb/rev_data/License_Offline.csv", encoding="UTF-8") as lic:
with open(r"titles/cxb/data/rss/License_Offline.csv", encoding="UTF-8") as lic:
lines = lic.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
@ -209,7 +209,7 @@ class CxbRev(CxbBase):
def handle_data_course_list_request(self, data: Dict) -> Dict:
ret_str = ""
with open(
r"titles/cxb/rev_data/Course/CourseList.csv", encoding="UTF-8"
r"titles/cxb/data/rss/Course/CourseList.csv", encoding="UTF-8"
) as course:
lines = course.readlines()
for line in lines:
@ -222,7 +222,7 @@ class CxbRev(CxbBase):
extra_num = int(data["dldate"]["filetype"][-4:])
ret_str = ""
with open(
rf"titles/cxb/rev_data/Course/Cs000{extra_num}.csv", encoding="shift-jis"
rf"titles/cxb/data/rss/Course/Cs000{extra_num}.csv", encoding="shift-jis"
) as course:
lines = course.readlines()
for line in lines:
@ -233,7 +233,7 @@ class CxbRev(CxbBase):
def handle_data_mission_list_request(self, data: Dict) -> Dict:
ret_str = ""
with open(
r"titles/cxb/rev_data/MissionList.csv", encoding="shift-jis"
r"titles/cxb/data/rss/MissionList.csv", encoding="shift-jis"
) as mission:
lines = mission.readlines()
for line in lines:
@ -250,7 +250,7 @@ class CxbRev(CxbBase):
def handle_data_event_list_request(self, data: Dict) -> Dict:
ret_str = ""
with open(
r"titles/cxb/rev_data/Event/EventArchiveList.csv", encoding="shift-jis"
r"titles/cxb/data/rss/Event/EventArchiveList.csv", encoding="shift-jis"
) as mission:
lines = mission.readlines()
for line in lines:
@ -292,7 +292,7 @@ class CxbRev(CxbBase):
def handle_data_event_stamp_list_request(self, data: Dict) -> Dict:
ret_str = ""
with open(
r"titles/cxb/rev_data/Event/EventStampList.csv", encoding="shift-jis"
r"titles/cxb/data/rss/Event/EventStampList.csv", encoding="shift-jis"
) as event:
lines = event.readlines()
for line in lines:

View File

@ -7,9 +7,9 @@ from datetime import datetime
from core.config import CoreConfig
from core.data import Data, cached
from titles.cxb.config import CxbConfig
from titles.cxb.base import CxbBase
from titles.cxb.const import CxbConstants
from .config import CxbConfig
from .base import CxbBase
from .const import CxbConstants
class CxbRevSunriseS1(CxbBase):
@ -23,7 +23,7 @@ class CxbRevSunriseS1(CxbBase):
@cached(lifetime=86400)
def handle_data_music_list_request(self, data: Dict) -> Dict:
ret_str = ""
with open(r"titles/cxb/rss1_data/MusicArchiveList.csv") as music:
with open(r"titles/cxb/data/rss1/MusicArchiveList.csv") as music:
lines = music.readlines()
for line in lines:
line_split = line.split(",")
@ -36,7 +36,7 @@ class CxbRevSunriseS1(CxbBase):
# ItemListIcon load
ret_str = "#ItemListIcon\r\n"
with open(
r"titles/cxb/rss1_data/Item/ItemList_Icon.csv", encoding="shift-jis"
r"titles/cxb/data/rss1/Item/ItemList_Icon.csv", encoding="shift-jis"
) as item:
lines = item.readlines()
for line in lines:
@ -45,7 +45,7 @@ class CxbRevSunriseS1(CxbBase):
# ItemListTitle load
ret_str += "\r\n#ItemListTitle\r\n"
with open(
r"titles/cxb/rss1_data/Item/ItemList_Title.csv", encoding="shift-jis"
r"titles/cxb/data/rss1/Item/ItemList_Title.csv", encoding="shift-jis"
) as item:
lines = item.readlines()
for line in lines:
@ -58,7 +58,7 @@ class CxbRevSunriseS1(CxbBase):
# ShopListIcon load
ret_str = "#ShopListIcon\r\n"
with open(
r"titles/cxb/rss1_data/Shop/ShopList_Icon.csv", encoding="utf-8"
r"titles/cxb/data/rss1/Shop/ShopList_Icon.csv", encoding="utf-8"
) as shop:
lines = shop.readlines()
for line in lines:
@ -67,7 +67,7 @@ class CxbRevSunriseS1(CxbBase):
# ShopListMusic load
ret_str += "\r\n#ShopListMusic\r\n"
with open(
r"titles/cxb/rss1_data/Shop/ShopList_Music.csv", encoding="utf-8"
r"titles/cxb/data/rss1/Shop/ShopList_Music.csv", encoding="utf-8"
) as shop:
lines = shop.readlines()
for line in lines:
@ -76,7 +76,7 @@ class CxbRevSunriseS1(CxbBase):
# ShopListSale load
ret_str += "\r\n#ShopListSale\r\n"
with open(
r"titles/cxb/rss1_data/Shop/ShopList_Sale.csv", encoding="shift-jis"
r"titles/cxb/data/rss1/Shop/ShopList_Sale.csv", encoding="shift-jis"
) as shop:
lines = shop.readlines()
for line in lines:
@ -85,7 +85,7 @@ class CxbRevSunriseS1(CxbBase):
# ShopListSkinBg load
ret_str += "\r\n#ShopListSkinBg\r\n"
with open(
r"titles/cxb/rss1_data/Shop/ShopList_SkinBg.csv", encoding="shift-jis"
r"titles/cxb/data/rss1/Shop/ShopList_SkinBg.csv", encoding="shift-jis"
) as shop:
lines = shop.readlines()
for line in lines:
@ -94,7 +94,7 @@ class CxbRevSunriseS1(CxbBase):
# ShopListSkinEffect load
ret_str += "\r\n#ShopListSkinEffect\r\n"
with open(
r"titles/cxb/rss1_data/Shop/ShopList_SkinEffect.csv", encoding="shift-jis"
r"titles/cxb/data/rss1/Shop/ShopList_SkinEffect.csv", encoding="shift-jis"
) as shop:
lines = shop.readlines()
for line in lines:
@ -103,7 +103,7 @@ class CxbRevSunriseS1(CxbBase):
# ShopListSkinNotes load
ret_str += "\r\n#ShopListSkinNotes\r\n"
with open(
r"titles/cxb/rss1_data/Shop/ShopList_SkinNotes.csv", encoding="shift-jis"
r"titles/cxb/data/rss1/Shop/ShopList_SkinNotes.csv", encoding="shift-jis"
) as shop:
lines = shop.readlines()
for line in lines:
@ -112,7 +112,7 @@ class CxbRevSunriseS1(CxbBase):
# ShopListTitle load
ret_str += "\r\n#ShopListTitle\r\n"
with open(
r"titles/cxb/rss1_data/Shop/ShopList_Title.csv", encoding="utf-8"
r"titles/cxb/data/rss1/Shop/ShopList_Title.csv", encoding="utf-8"
) as shop:
lines = shop.readlines()
for line in lines:
@ -140,7 +140,7 @@ class CxbRevSunriseS1(CxbBase):
@cached(lifetime=86400)
def handle_data_news_list_request(self, data: Dict) -> Dict:
ret_str = ""
with open(r"titles/cxb/rss1_data/NewsList.csv", encoding="UTF-8") as news:
with open(r"titles/cxb/data/rss1/NewsList.csv", encoding="UTF-8") as news:
lines = news.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
@ -155,7 +155,7 @@ class CxbRevSunriseS1(CxbBase):
@cached(lifetime=86400)
def handle_data_random_music_list_request(self, data: Dict) -> Dict:
ret_str = ""
with open(r"titles/cxb/rss1_data/MusicArchiveList.csv") as music:
with open(r"titles/cxb/data/rss1/MusicArchiveList.csv") as music:
lines = music.readlines()
count = 0
for line in lines:
@ -169,7 +169,7 @@ class CxbRevSunriseS1(CxbBase):
@cached(lifetime=86400)
def handle_data_license_request(self, data: Dict) -> Dict:
ret_str = ""
with open(r"titles/cxb/rss1_data/License.csv", encoding="UTF-8") as licenses:
with open(r"titles/cxb/data/rss1/License.csv", encoding="UTF-8") as licenses:
lines = licenses.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
@ -179,7 +179,7 @@ class CxbRevSunriseS1(CxbBase):
def handle_data_course_list_request(self, data: Dict) -> Dict:
ret_str = ""
with open(
r"titles/cxb/rss1_data/Course/CourseList.csv", encoding="UTF-8"
r"titles/cxb/data/rss1/Course/CourseList.csv", encoding="UTF-8"
) as course:
lines = course.readlines()
for line in lines:
@ -191,7 +191,7 @@ class CxbRevSunriseS1(CxbBase):
extra_num = int(data["dldate"]["filetype"][-4:])
ret_str = ""
with open(
rf"titles/cxb/rss1_data/Course/Cs{extra_num}.csv", encoding="shift-jis"
rf"titles/cxb/data/rss1/Course/Cs{extra_num}.csv", encoding="shift-jis"
) as course:
lines = course.readlines()
for line in lines:
@ -229,7 +229,7 @@ class CxbRevSunriseS1(CxbBase):
def handle_data_partnerxxxx_request(self, data: Dict) -> Dict:
partner_num = int(data["dldate"]["filetype"][-4:])
ret_str = f"{partner_num},,{partner_num},1,10000,\r\n"
with open(r"titles/cxb/rss1_data/Partner0000.csv") as partner:
with open(r"titles/cxb/data/rss1/Partner0000.csv") as partner:
lines = partner.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"

View File

@ -7,9 +7,9 @@ from datetime import datetime
from core.config import CoreConfig
from core.data import Data, cached
from titles.cxb.config import CxbConfig
from titles.cxb.base import CxbBase
from titles.cxb.const import CxbConstants
from .config import CxbConfig
from .base import CxbBase
from .const import CxbConstants
class CxbRevSunriseS2(CxbBase):
@ -23,7 +23,7 @@ class CxbRevSunriseS2(CxbBase):
@cached(lifetime=86400)
def handle_data_music_list_request(self, data: Dict) -> Dict:
ret_str = ""
with open(r"titles/cxb/rss2_data/MusicArchiveList.csv") as music:
with open(r"titles/cxb/data/rss2/MusicArchiveList.csv") as music:
lines = music.readlines()
for line in lines:
line_split = line.split(",")
@ -36,7 +36,7 @@ class CxbRevSunriseS2(CxbBase):
# ItemListIcon load
ret_str = "#ItemListIcon\r\n"
with open(
r"titles/cxb/rss2_data/Item/ItemList_Icon.csv", encoding="utf-8"
r"titles/cxb/data/rss2/Item/ItemList_Icon.csv", encoding="utf-8"
) as item:
lines = item.readlines()
for line in lines:
@ -45,7 +45,7 @@ class CxbRevSunriseS2(CxbBase):
# ItemListTitle load
ret_str += "\r\n#ItemListTitle\r\n"
with open(
r"titles/cxb/rss2_data/Item/ItemList_Title.csv", encoding="utf-8"
r"titles/cxb/data/rss2/Item/ItemList_Title.csv", encoding="utf-8"
) as item:
lines = item.readlines()
for line in lines:
@ -58,7 +58,7 @@ class CxbRevSunriseS2(CxbBase):
# ShopListIcon load
ret_str = "#ShopListIcon\r\n"
with open(
r"titles/cxb/rss2_data/Shop/ShopList_Icon.csv", encoding="utf-8"
r"titles/cxb/data/rss2/Shop/ShopList_Icon.csv", encoding="utf-8"
) as shop:
lines = shop.readlines()
for line in lines:
@ -67,7 +67,7 @@ class CxbRevSunriseS2(CxbBase):
# ShopListMusic load
ret_str += "\r\n#ShopListMusic\r\n"
with open(
r"titles/cxb/rss2_data/Shop/ShopList_Music.csv", encoding="utf-8"
r"titles/cxb/data/rss2/Shop/ShopList_Music.csv", encoding="utf-8"
) as shop:
lines = shop.readlines()
for line in lines:
@ -76,7 +76,7 @@ class CxbRevSunriseS2(CxbBase):
# ShopListSale load
ret_str += "\r\n#ShopListSale\r\n"
with open(
r"titles/cxb/rss2_data/Shop/ShopList_Sale.csv", encoding="shift-jis"
r"titles/cxb/data/rss2/Shop/ShopList_Sale.csv", encoding="shift-jis"
) as shop:
lines = shop.readlines()
for line in lines:
@ -85,7 +85,7 @@ class CxbRevSunriseS2(CxbBase):
# ShopListSkinBg load
ret_str += "\r\n#ShopListSkinBg\r\n"
with open(
r"titles/cxb/rss2_data/Shop/ShopList_SkinBg.csv", encoding="shift-jis"
r"titles/cxb/data/rss2/Shop/ShopList_SkinBg.csv", encoding="shift-jis"
) as shop:
lines = shop.readlines()
for line in lines:
@ -94,7 +94,7 @@ class CxbRevSunriseS2(CxbBase):
# ShopListSkinEffect load
ret_str += "\r\n#ShopListSkinEffect\r\n"
with open(
r"titles/cxb/rss2_data/Shop/ShopList_SkinEffect.csv", encoding="shift-jis"
r"titles/cxb/data/rss2/Shop/ShopList_SkinEffect.csv", encoding="shift-jis"
) as shop:
lines = shop.readlines()
for line in lines:
@ -103,7 +103,7 @@ class CxbRevSunriseS2(CxbBase):
# ShopListSkinNotes load
ret_str += "\r\n#ShopListSkinNotes\r\n"
with open(
r"titles/cxb/rss2_data/Shop/ShopList_SkinNotes.csv", encoding="shift-jis"
r"titles/cxb/data/rss2/Shop/ShopList_SkinNotes.csv", encoding="shift-jis"
) as shop:
lines = shop.readlines()
for line in lines:
@ -112,7 +112,7 @@ class CxbRevSunriseS2(CxbBase):
# ShopListTitle load
ret_str += "\r\n#ShopListTitle\r\n"
with open(
r"titles/cxb/rss2_data/Shop/ShopList_Title.csv", encoding="utf-8"
r"titles/cxb/data/rss2/Shop/ShopList_Title.csv", encoding="utf-8"
) as shop:
lines = shop.readlines()
for line in lines:
@ -140,7 +140,7 @@ class CxbRevSunriseS2(CxbBase):
@cached(lifetime=86400)
def handle_data_news_list_request(self, data: Dict) -> Dict:
ret_str = ""
with open(r"titles/cxb/rss2_data/NewsList.csv", encoding="UTF-8") as news:
with open(r"titles/cxb/data/rss2/NewsList.csv", encoding="UTF-8") as news:
lines = news.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
@ -155,7 +155,7 @@ class CxbRevSunriseS2(CxbBase):
@cached(lifetime=86400)
def handle_data_random_music_list_request(self, data: Dict) -> Dict:
ret_str = ""
with open(r"titles/cxb/rss2_data/MusicArchiveList.csv") as music:
with open(r"titles/cxb/data/rss2/MusicArchiveList.csv") as music:
lines = music.readlines()
count = 0
for line in lines:
@ -169,7 +169,7 @@ class CxbRevSunriseS2(CxbBase):
@cached(lifetime=86400)
def handle_data_license_request(self, data: Dict) -> Dict:
ret_str = ""
with open(r"titles/cxb/rss2_data/License.csv", encoding="UTF-8") as licenses:
with open(r"titles/cxb/data/rss2/License.csv", encoding="UTF-8") as licenses:
lines = licenses.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
@ -179,7 +179,7 @@ class CxbRevSunriseS2(CxbBase):
def handle_data_course_list_request(self, data: Dict) -> Dict:
ret_str = ""
with open(
r"titles/cxb/rss2_data/Course/CourseList.csv", encoding="UTF-8"
r"titles/cxb/data/rss2/Course/CourseList.csv", encoding="UTF-8"
) as course:
lines = course.readlines()
for line in lines:
@ -191,7 +191,7 @@ class CxbRevSunriseS2(CxbBase):
extra_num = int(data["dldate"]["filetype"][-4:])
ret_str = ""
with open(
rf"titles/cxb/rss2_data/Course/Cs{extra_num}.csv", encoding="shift-jis"
rf"titles/cxb/data/rss2/Course/Cs{extra_num}.csv", encoding="shift-jis"
) as course:
lines = course.readlines()
for line in lines:
@ -229,7 +229,7 @@ class CxbRevSunriseS2(CxbBase):
def handle_data_partnerxxxx_request(self, data: Dict) -> Dict:
partner_num = int(data["dldate"]["filetype"][-4:])
ret_str = f"{partner_num},,{partner_num},1,10000,\r\n"
with open(r"titles/cxb/rss2_data/Partner0000.csv") as partner:
with open(r"titles/cxb/data/rss2/Partner0000.csv") as partner:
lines = partner.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"

View File

@ -7,4 +7,4 @@ index = DivaServlet
database = DivaData
reader = DivaReader
game_codes = [DivaConstants.GAME_CODE]
current_schema_version = 5
current_schema_version = 6

View File

@ -402,7 +402,7 @@ class DivaBase:
response += f"&lv_num={profile['lv_num']}"
response += f"&lv_pnt={profile['lv_pnt']}"
response += f"&vcld_pts={profile['vcld_pts']}"
response += f"&skn_eqp={profile['use_pv_skn_eqp']}"
response += f"&skn_eqp={profile['skn_eqp']}"
response += f"&btn_se_eqp={profile['btn_se_eqp']}"
response += f"&sld_se_eqp={profile['sld_se_eqp']}"
response += f"&chn_sld_se_eqp={profile['chn_sld_se_eqp']}"

View File

@ -7,17 +7,19 @@ import json
import urllib.parse
import base64
from os import path
from typing import Tuple
from typing import Tuple, Dict, List
from core.config import CoreConfig
from titles.diva.config import DivaConfig
from titles.diva.const import DivaConstants
from titles.diva.base import DivaBase
from core.title import BaseServlet
from core.utils import Utils
from .config import DivaConfig
from .const import DivaConstants
from .base import DivaBase
class DivaServlet:
class DivaServlet(BaseServlet):
def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None:
self.core_cfg = core_cfg
super().__init__(core_cfg, cfg_dir)
self.game_cfg = DivaConfig()
if path.exists(f"{cfg_dir}/{DivaConstants.CONFIG_NAME}"):
self.game_cfg.update(
@ -49,10 +51,22 @@ class DivaServlet:
level=self.game_cfg.server.loglevel, logger=self.logger, fmt=log_fmt_str
)
def get_endpoint_matchers(self) -> Tuple[List[Tuple[str, str, Dict]], List[Tuple[str, str, Dict]]]:
return (
[],
[("render_POST", "/DivaServlet/", {})]
)
def get_allnet_info(self, game_code: str, game_ver: int, keychip: str) -> Tuple[str, str]:
if not self.core_cfg.server.is_using_proxy and Utils.get_title_port(self.core_cfg) != 80:
return (f"http://{self.core_cfg.title.hostname}:{Utils.get_title_port(self.core_cfg)}/DivaServlet/", "")
return (f"http://{self.core_cfg.title.hostname}/DivaServlet/", "")
@classmethod
def get_allnet_info(
def is_game_enabled(
cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str
) -> Tuple[bool, str, str]:
) -> bool:
game_cfg = DivaConfig()
if path.exists(f"{cfg_dir}/{DivaConstants.CONFIG_NAME}"):
game_cfg.update(
@ -60,20 +74,13 @@ class DivaServlet:
)
if not game_cfg.server.enable:
return (False, "", "")
return False
if core_cfg.server.is_develop:
return (
True,
f"http://{core_cfg.title.hostname}:{core_cfg.title.port}/{game_code}/$v/",
"",
)
return True
return (True, f"http://{core_cfg.title.hostname}/{game_code}/$v/", "")
def render_POST(self, req: Request, version: int, url_path: str) -> bytes:
req_raw = req.content.getvalue()
url_header = req.getAllHeaders()
def render_POST(self, request: Request, game_code: str, matchers: Dict) -> bytes:
req_raw = request.content.getvalue()
url_header = request.getAllHeaders()
# Ping Dispatch
if "THIS_STRING_SEPARATES" in str(url_header):
@ -148,7 +155,7 @@ class DivaServlet:
"utf-8"
)
req.responseHeaders.addRawHeader(b"content-type", b"text/plain")
request.responseHeaders.addRawHeader(b"content-type", b"text/plain")
self.logger.debug(
f"Response cmd={req_data['cmd']}&req_id={req_data['req_id']}&stat=ok{resp}"
)

View File

@ -51,6 +51,7 @@ profile = Table(
Column("rgo_sts", Integer, nullable=False, server_default="1"),
Column("lv_efct_id", Integer, nullable=False, server_default="0"),
Column("lv_plt_id", Integer, nullable=False, server_default="1"),
Column("skn_eqp", Integer, nullable=False, server_default="0"),
Column("passwd_stat", Integer, nullable=False, server_default="0"),
Column("passwd", String(12), nullable=False, server_default="**********"),
Column(

Some files were not shown because too many files have changed in this diff Show More