Compare commits

...

16 Commits

6 changed files with 441 additions and 218 deletions

View File

@ -1,13 +1,14 @@
import datetime
from datetime import datetime
from typing import Any, List, Dict
import logging
import json
import urllib
from urllib import parse
from core.config import CoreConfig
from titles.diva.config import DivaConfig
from titles.diva.const import DivaConstants
from titles.diva.database import DivaData
from titles.diva.handlers import *
class DivaBase:
@ -20,30 +21,39 @@ class DivaBase:
self.game = DivaConstants.GAME_CODE
self.version = DivaConstants.VER_PROJECT_DIVA_ARCADE_FUTURE_TONE
dt = datetime.datetime.now()
self.time_lut = urllib.parse.quote(dt.strftime("%Y-%m-%d %H:%M:%S:16.0"))
dt = datetime.now()
self.time_lut = parse.quote(dt.strftime("%Y-%m-%d %H:%M:%S:16.0"))
def handle_test_request(self, data: Dict) -> Dict:
return ""
def handle_test_request(self, data: bytes) -> str:
pass
def handle_game_init_request(self, data: Dict) -> Dict:
return f""
def handle_game_init_request(self, data: bytes) -> str:
req = GameInitRequest(data)
return None
def handle_attend_request(self, data: Dict) -> Dict:
encoded = "&"
params = {
"atnd_prm1": "0,1,1,0,0,0,1,0,100,0,0,0,0,0,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1",
"atnd_prm2": "30,10,100,4,1,50,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1",
"atnd_prm3": "100,0,1,1,1,1,1,1,1,1,2,3,4,1,1,1,3,4,5,1,1,1,4,5,6,1,1,1,5,6,7,4,4,4,9,10,14,5,10,10,25,20,50,30,90,5,10,10,25,20,50,30,90,5,10,10,25,20,50,30,90,5,10,10,25,20,50,30,90,5,10,10,25,20,50,30,90,10,30,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0",
"atnd_lut": f"{self.time_lut}",
}
def handle_attend_request(self, data: bytes) -> str:
req = AttendRequest(data)
resp = AttendResponse(req.cmd, req.req_id)
encoded += urllib.parse.urlencode(params)
encoded = encoded.replace("%2C", ",")
for i in [0, 3, 4, 5, 7, 9, 10, 11, 12, 13]:
resp.atnd_prm1[i] = 0
resp.atnd_prm1[8] = 100
return encoded
resp.atnd_prm2[:6] = [30, 10, 100, 4, 1, 50]
def handle_ping_request(self, data: Dict) -> Dict:
resp.atnd_prm3[0] = 100
resp.atnd_prm3[1] = 0
resp.atnd_prm3[10:13] = [2, 3, 4]
resp.atnd_prm3[16:19] = [3, 4, 5]
resp.atnd_prm3[22:25] = [4, 5, 6]
resp.atnd_prm3[80:] = [0] * 20
resp.atnd_prm3[28:79] = [5,6,7,4,4,4,9,10,14,5,10,10,25,20,50,30,90,5,
10,10,25,20,50,30,90,5,10,10,25,20,50,30,90,5,10,10,25,20,50,30,90,
5,10,10,25,20,50,30,90,10,30]
return resp.make()
def handle_ping_request(self, data: bytes) -> str:
encoded = "&"
params = {
"ping_b_msg": f"Welcome to {self.core_cfg.server.name} network!",
@ -82,13 +92,13 @@ class DivaBase:
"nblss_ltt_ed_tm": "2019-09-22 12:00:00.0",
}
encoded += urllib.parse.urlencode(params)
encoded += parse.urlencode(params)
encoded = encoded.replace("+", "%20")
encoded = encoded.replace("%2C", ",")
return encoded
def handle_pv_list_request(self, data: Dict) -> Dict:
def handle_pv_list_request(self, data: bytes) -> str:
pvlist = ""
with open(r"titles/diva/data/PvList0.dat", encoding="utf-8") as shop:
lines = shop.readlines()
@ -125,7 +135,7 @@ class DivaBase:
return response
def handle_shop_catalog_request(self, data: Dict) -> Dict:
def handle_shop_catalog_request(self, data: bytes) -> str:
catalog = ""
shopList = self.data.static.get_enabled_shops(self.version)
@ -133,8 +143,8 @@ class DivaBase:
with open(r"titles/diva/data/ShopCatalog.dat", encoding="utf-8") as shop:
lines = shop.readlines()
for line in lines:
line = urllib.parse.quote(line) + ","
catalog += f"{urllib.parse.quote(line)}"
line = parse.quote(line) + ","
catalog += f"{parse.quote(line)}"
else:
for shop in shopList:
@ -153,8 +163,8 @@ class DivaBase:
+ ","
+ str(shop["type"])
)
line = urllib.parse.quote(line) + ","
catalog += f"{urllib.parse.quote(line)}"
line = parse.quote(line) + ","
catalog += f"{parse.quote(line)}"
catalog = catalog.replace("+", "%20")
@ -163,7 +173,7 @@ class DivaBase:
return response
def handle_buy_module_request(self, data: Dict) -> Dict:
def handle_buy_module_request(self, data: bytes) -> str:
profile = self.data.profile.get_profile(data["pd_id"], self.version)
module = self.data.static.get_enabled_shop(self.version, int(data["mdl_id"]))
@ -190,7 +200,7 @@ class DivaBase:
return response
def handle_cstmz_itm_ctlg_request(self, data: Dict) -> Dict:
def handle_cstmz_itm_ctlg_request(self, data: bytes) -> str:
catalog = ""
itemList = self.data.static.get_enabled_items(self.version)
@ -198,8 +208,8 @@ class DivaBase:
with open(r"titles/diva/data/ItemCatalog.dat", encoding="utf-8") as item:
lines = item.readlines()
for line in lines:
line = urllib.parse.quote(line) + ","
catalog += f"{urllib.parse.quote(line)}"
line = parse.quote(line) + ","
catalog += f"{parse.quote(line)}"
else:
for item in itemList:
@ -218,8 +228,8 @@ class DivaBase:
+ ","
+ str(item["type"])
)
line = urllib.parse.quote(line) + ","
catalog += f"{urllib.parse.quote(line)}"
line = parse.quote(line) + ","
catalog += f"{parse.quote(line)}"
catalog = catalog.replace("+", "%20")
@ -228,7 +238,7 @@ class DivaBase:
return response
def handle_buy_cstmz_itm_request(self, data: Dict) -> Dict:
def handle_buy_cstmz_itm_request(self, data: bytes) -> str:
profile = self.data.profile.get_profile(data["pd_id"], self.version)
item = self.data.static.get_enabled_item(
self.version, int(data["cstmz_itm_id"])
@ -263,7 +273,7 @@ class DivaBase:
return response
def handle_festa_info_request(self, data: Dict) -> Dict:
def handle_festa_info_request(self, data: bytes) -> str:
encoded = "&"
params = {
"fi_id": "1,2",
@ -280,13 +290,13 @@ class DivaBase:
"fi_lut": "{self.time_lut}",
}
encoded += urllib.parse.urlencode(params)
encoded += parse.urlencode(params)
encoded = encoded.replace("+", "%20")
encoded = encoded.replace("%2C", ",")
return encoded
def handle_contest_info_request(self, data: Dict) -> Dict:
def handle_contest_info_request(self, data: bytes) -> str:
response = ""
response += f"&ci_lut={self.time_lut}"
@ -294,7 +304,7 @@ class DivaBase:
return response
def handle_qst_inf_request(self, data: Dict) -> Dict:
def handle_qst_inf_request(self, data: bytes) -> str:
quest = ""
questList = self.data.static.get_enabled_quests(self.version)
@ -302,7 +312,7 @@ class DivaBase:
with open(r"titles/diva/data/QuestInfo.dat", encoding="utf-8") as shop:
lines = shop.readlines()
for line in lines:
quest += f"{urllib.parse.quote(line)},"
quest += f"{parse.quote(line)},"
response = ""
response += f"&qi_lut={self.time_lut}"
@ -330,7 +340,7 @@ class DivaBase:
+ ","
+ str(quests["quest_enable"])
)
quest += f"{urllib.parse.quote(line)}%0A,"
quest += f"{parse.quote(line)}%0A,"
responseline = f"{quest[:-1]},"
for i in range(len(questList), 59):
@ -344,45 +354,47 @@ class DivaBase:
return response
def handle_nv_ranking_request(self, data: Dict) -> Dict:
return f""
def handle_nv_ranking_request(self, data: bytes) -> str:
pass
def handle_ps_ranking_request(self, data: Dict) -> Dict:
return f""
def handle_ps_ranking_request(self, data: bytes) -> str:
pass
def handle_ng_word_request(self, data: Dict) -> Dict:
return f""
def handle_ng_word_request(self, data: bytes) -> str:
pass
def handle_rmt_wp_list_request(self, data: Dict) -> Dict:
return f""
def handle_rmt_wp_list_request(self, data: bytes) -> str:
pass
def handle_pv_def_chr_list_request(self, data: Dict) -> Dict:
return f""
def handle_pv_def_chr_list_request(self, data: bytes) -> str:
pass
def handle_pv_ng_mdl_list_request(self, data: Dict) -> Dict:
return f""
def handle_pv_ng_mdl_list_request(self, data: bytes) -> str:
pass
def handle_cstmz_itm_ng_mdl_lst_request(self, data: Dict) -> Dict:
return f""
def handle_cstmz_itm_ng_mdl_lst_request(self, data: bytes) -> str:
pass
def handle_banner_info_request(self, data: Dict) -> Dict:
return f""
def handle_banner_info_request(self, data: bytes) -> str:
pass
def handle_banner_data_request(self, data: Dict) -> Dict:
return f""
def handle_banner_data_request(self, data: bytes) -> str:
pass
def handle_cm_ply_info_request(self, data: Dict) -> Dict:
return f""
def handle_cm_ply_info_request(self, data: bytes) -> str:
pass
def handle_pstd_h_ctrl_request(self, data: Dict) -> Dict:
return f""
def handle_pstd_h_ctrl_request(self, data: bytes) -> str:
pass
def handle_pstd_item_ng_lst_request(self, data: Dict) -> Dict:
return f""
def handle_pstd_item_ng_lst_request(self, data: bytes) -> str:
pass
def handle_pre_start_request(self, data: Dict) -> str:
profile = self.data.profile.get_profile(data["aime_id"], self.version)
profile_shop = self.data.item.get_shop(data["aime_id"], self.version)
def handle_pre_start_request(self, data: bytes) -> str:
req = PreStartRequest(data)
resp = PreStartResponse(req.cmd, req.req_id, req.aime_id)
profile = self.data.profile.get_profile(req.aime_id, self.version)
profile_shop = self.data.item.get_shop(req.aime_id, self.version)
if profile is None:
return f"&ps_result=-3"
@ -408,76 +420,73 @@ class DivaBase:
response += f"&sldr_tch_se_eqp={profile['sldr_tch_se_eqp']}"
response += f"&passwd_stat={profile['passwd_stat']}"
# Store stuff to add to rework
response += f"&mdl_eqp_tm={self.time_lut}"
profile_dict = profile._asdict()
profile_dict.pop("id")
profile_dict.pop("user")
profile_dict.pop("version")
mdl_eqp_ary = "-999,-999,-999"
for k, v in profile_dict.items():
if hasattr(resp, k):
setattr(resp, k, v)
# get the common_modules from the profile shop
if profile_shop:
mdl_eqp_ary = profile_shop["mdl_eqp_ary"]
if profile_shop is not None and profile_shop:
resp.mdl_eqp_ary = profile_shop["mdl_eqp_ary"]
response += f"&mdl_eqp_ary={mdl_eqp_ary}"
return resp.make()
return response
def handle_registration_request(self, data: Dict) -> Dict:
def handle_registration_request(self, data: bytes) -> str:
self.data.profile.create_profile(
self.version, data["aime_id"], data["player_name"]
)
return f"&cd_adm_result=1&pd_id={data['aime_id']}"
def handle_start_request(self, data: Dict) -> Dict:
profile = self.data.profile.get_profile(data["pd_id"], self.version)
profile_shop = self.data.item.get_shop(data["pd_id"], self.version)
def handle_start_request(self, data: bytes) -> str:
req = StartRequest(data)
profile = self.data.profile.get_profile(req.pd_id, self.version)
profile_shop = self.data.item.get_shop(req.pd_id, self.version)
if profile is None:
return
resp = StartResponse(req.cmd, req.req_id, req.pd_id, profile['player_name'])
mdl_have = "F" * 250
# generate the mdl_have string if "unlock_all_modules" is disabled
if not self.game_config.mods.unlock_all_modules:
mdl_have = self.data.module.get_modules_have_string(
resp.mdl_have = self.data.module.get_modules_have_string(
data["pd_id"], self.version
)
cstmz_itm_have = "F" * 250
# generate the cstmz_itm_have string if "unlock_all_items" is disabled
if not self.game_config.mods.unlock_all_items:
cstmz_itm_have = self.data.customize.get_customize_items_have_string(
resp.cstmz_itm_have = self.data.customize.get_customize_items_have_string(
data["pd_id"], self.version
)
response = f"&pd_id={data['pd_id']}"
response += "&start_result=1"
response += "&accept_idx=100"
response += f"&hp_vol={profile['hp_vol']}"
response += f"&btn_se_vol={profile['btn_se_vol']}"
response += f"&btn_se_vol2={profile['btn_se_vol2']}"
response += f"&sldr_se_vol2={profile['sldr_se_vol2']}"
response += f"&sort_kind={profile['sort_kind']}"
response += f"&player_name={profile['player_name']}"
response += f"&lv_num={profile['lv_num']}"
response += f"&lv_pnt={profile['lv_pnt']}"
response += f"&lv_efct_id={profile['lv_efct_id']}"
response += f"&lv_plt_id={profile['lv_plt_id']}"
response += f"&mdl_have={mdl_have}"
response += f"&cstmz_itm_have={cstmz_itm_have}"
response += f"&use_pv_mdl_eqp={int(profile['use_pv_mdl_eqp'])}"
response += f"&use_mdl_pri={int(profile['use_mdl_pri'])}"
response += f"&use_pv_skn_eqp={int(profile['use_pv_skn_eqp'])}"
response += f"&use_pv_btn_se_eqp={int(profile['use_pv_btn_se_eqp'])}"
response += f"&use_pv_sld_se_eqp={int(profile['use_pv_sld_se_eqp'])}"
response += f"&use_pv_chn_sld_se_eqp={int(profile['use_pv_chn_sld_se_eqp'])}"
response += f"&use_pv_sldr_tch_se_eqp={int(profile['use_pv_sldr_tch_se_eqp'])}"
response += f"&vcld_pts={profile['lv_efct_id']}"
response += f"&nxt_pv_id={profile['nxt_pv_id']}"
response += f"&nxt_dffclty={profile['nxt_dffclty']}"
response += f"&nxt_edtn={profile['nxt_edtn']}"
response += f"&dsp_clr_brdr={profile['dsp_clr_brdr']}"
response += f"&dsp_intrm_rnk={profile['dsp_intrm_rnk']}"
response += f"&dsp_clr_sts={profile['dsp_clr_sts']}"
response += f"&rgo_sts={profile['rgo_sts']}"
resp.pd_id = data['pd_id']
resp.hp_vol = {profile['hp_vol']}
resp.btn_se_vol = {profile['btn_se_vol']}
resp.btn_se_vol2 = {profile['btn_se_vol2']}
resp.sldr_se_vol2 = {profile['sldr_se_vol2']}
resp.sort_kind = {profile['sort_kind']}
resp.player_name = {profile['player_name']}
resp.lv_num = {profile['lv_num']}
resp.lv_pnt = {profile['lv_pnt']}
resp.lv_efct_id = {profile['lv_efct_id']}
resp.lv_plt_id = {profile['lv_plt_id']}
resp.use_pv_mdl_eqp = {int(profile['use_pv_mdl_eqp'])}
resp.use_mdl_pri = {int(profile['use_mdl_pri'])}
resp.use_pv_skn_eqp = {int(profile['use_pv_skn_eqp'])}
resp.use_pv_btn_se_eqp = {int(profile['use_pv_btn_se_eqp'])}
resp.use_pv_sld_se_eqp = {int(profile['use_pv_sld_se_eqp'])}
resp.use_pv_chn_sld_se_eqp = {int(profile['use_pv_chn_sld_se_eqp'])}
resp.use_pv_sldr_tch_se_eqp = {int(profile['use_pv_sldr_tch_se_eqp'])}
resp.vcld_pts = {profile['lv_efct_id']}
resp.nxt_pv_id = {profile['nxt_pv_id']}
resp.nxt_dffclty = {profile['nxt_dffclty']}
resp.nxt_edtn = {profile['nxt_edtn']}
resp.dsp_clr_brdr = {profile['dsp_clr_brdr']}
resp.dsp_intrm_rnk = {profile['dsp_intrm_rnk']}
resp.dsp_clr_sts = {profile['dsp_clr_sts']}
resp.rgo_sts = {profile['rgo_sts']}
# Contest progress
response += f"&cv_cid=-1,-1,-1,-1"
@ -495,12 +504,9 @@ class DivaBase:
response += f"&cnp_sp={profile['cnp_sp']}" if profile["cnp_sp"] != "" else ""
# To be fully fixed
if "my_qst_id" not in profile:
response += f"&my_qst_id=-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1"
response += f"&my_qst_sts=0,0,0,0,0,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1"
else:
response += f"&my_qst_id={profile['my_qst_id']}"
response += f"&my_qst_sts={profile['my_qst_sts']}"
if "my_qst_id" in profile:
resp.my_qst_id = {profile['my_qst_id']}
resp.my_qst_sts = {profile['my_qst_sts']}
response += f"&my_qst_prgrs=0,0,0,0,0,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1"
response += f"&my_qst_et=2022-06-19%2010%3A28%3A52.0,2022-06-19%2010%3A28%3A52.0,2022-06-19%2010%3A28%3A52.0,2100-01-01%2008%3A59%3A59.0,2100-01-01%2008%3A59%3A59.0,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx"
@ -565,27 +571,19 @@ class DivaBase:
# Store stuff to add to rework
response += f"&mdl_eqp_tm={self.time_lut}"
mdl_eqp_ary = "-999,-999,-999"
c_itm_eqp_ary = "-999,-999,-999,-999,-999,-999,-999,-999,-999,-999,-999,-999"
ms_itm_flg_ary = "1,1,1,1,1,1,1,1,1,1,1,1"
# get the common_modules, customize_items and customize_item_flags
# from the profile shop
if profile_shop:
mdl_eqp_ary = profile_shop["mdl_eqp_ary"]
c_itm_eqp_ary = profile_shop["c_itm_eqp_ary"]
ms_itm_flg_ary = profile_shop["ms_itm_flg_ary"]
resp.mdl_eqp_ary = profile_shop["mdl_eqp_ary"]
resp.c_itm_eqp_ary = profile_shop["c_itm_eqp_ary"]
resp.ms_itm_flg_ary = profile_shop["ms_itm_flg_ary"]
response += f"&mdl_eqp_ary={mdl_eqp_ary}"
response += f"&c_itm_eqp_ary={c_itm_eqp_ary}"
response += f"&ms_itm_flg_ary={ms_itm_flg_ary}"
return resp.make()
return response
def handle_pd_unlock_request(self, data: bytes) -> str:
pass
def handle_pd_unlock_request(self, data: Dict) -> Dict:
return f""
def handle_spend_credit_request(self, data: Dict) -> Dict:
def handle_spend_credit_request(self, data: bytes) -> str:
profile = self.data.profile.get_profile(data["pd_id"], self.version)
if profile is None:
return
@ -663,7 +661,7 @@ class DivaBase:
return pv_result
def handle_get_pv_pd_request(self, data: Dict) -> Dict:
def handle_get_pv_pd_request(self, data: bytes) -> str:
song_id = data["pd_pv_id_lst"].split(",")
pv = ""
@ -703,9 +701,9 @@ class DivaBase:
self.logger.debug(f"pv_result = {pv_result}")
pv += urllib.parse.quote(pv_result)
pv += parse.quote(pv_result)
else:
pv += urllib.parse.quote(f"{song}***")
pv += parse.quote(f"{song}***")
pv += ","
response = ""
@ -715,10 +713,10 @@ class DivaBase:
return response
def handle_stage_start_request(self, data: Dict) -> Dict:
return f""
def handle_stage_start_request(self, data: bytes) -> str:
pass
def handle_stage_result_request(self, data: Dict) -> Dict:
def handle_stage_result_request(self, data: bytes) -> str:
profile = self.data.profile.get_profile(data["pd_id"], self.version)
pd_song_list = data["stg_ply_pv_id"].split(",")
@ -897,7 +895,7 @@ class DivaBase:
return response
def handle_end_request(self, data: Dict) -> Dict:
def handle_end_request(self, data: bytes) -> str:
profile = self.data.profile.get_profile(data["pd_id"], self.version)
self.data.profile.update_profile(
@ -905,7 +903,7 @@ class DivaBase:
)
return f""
def handle_shop_exit_request(self, data: Dict) -> Dict:
def handle_shop_exit_request(self, data: bytes) -> str:
self.data.item.put_shop(
data["pd_id"],
self.version,

View File

@ -6,6 +6,8 @@ class DivaConstants:
VER_PROJECT_DIVA_ARCADE = 0
VER_PROJECT_DIVA_ARCADE_FUTURE_TONE = 1
LUT_TIME_FMT = "%Y-%m-%d %H:%M:%S:16.0"
VERSION_NAMES = ("Project Diva Arcade", "Project Diva Arcade Future Tone")
@classmethod

View File

@ -0,0 +1,2 @@
from titles.diva.handlers.base import *
from titles.diva.handlers.user import *

View File

@ -0,0 +1,143 @@
from urllib import parse
from datetime import datetime
from typing import Union, Dict
class DivaRequestParseException(Exception):
"""
Exception raised when there is a fault in parsing a diva request,
either due to a malformed request, or missing required items
"""
def __init__(self, message: str) -> None:
self.message = message
super().__init__(self.message)
class BaseBinaryRequest:
cmd: str
req_id: str
def __init__(self, raw: bytes) -> None:
self.raw = raw
self.raw_dict = dict(parse.parse_qsl(raw))
if "cmd" not in self.raw_dict:
raise DivaRequestParseException(f"cmd not in request data {self.raw_dict}")
if "req_id" not in self.raw_dict:
raise DivaRequestParseException(
f"req_id not in request data {self.raw_dict}"
)
for k, v in self.raw_dict:
setattr(self, k, v)
class BaseRequest:
cmd: str
req_id: str
game_id: str
r_rev: str
kc_serial: str
b_serial: str
country_code: str
def __init__(self, raw: Union[str, bytes]) -> None:
self.raw = raw
self.raw_dict: Dict[bytes, bytes] = dict(parse.parse_qsl(raw))
if b"cmd" not in self.raw_dict:
raise DivaRequestParseException(f"cmd not in request data {self.raw_dict}")
if b"req_id" not in self.raw_dict:
raise DivaRequestParseException(
f"req_id not in request data {self.raw_dict}"
)
if b"place_id" not in self.raw_dict:
raise DivaRequestParseException(
f"place_id not in request data {self.raw_dict}"
)
if b"start_up_mode" not in self.raw_dict:
raise DivaRequestParseException(
f"start_up_mode not in request data {self.raw_dict}"
)
if b"cmm_dly_mod" not in self.raw_dict:
raise DivaRequestParseException(
f"cmm_dly_mod not in request data {self.raw_dict}"
)
if b"cmm_dly_sec" not in self.raw_dict:
raise DivaRequestParseException(
f"cmm_dly_sec not in request data {self.raw_dict}"
)
if b"cmm_err_mod" not in self.raw_dict:
raise DivaRequestParseException(
f"cmm_err_mod not in request data {self.raw_dict}"
)
if b"region_code" not in self.raw_dict:
raise DivaRequestParseException(
f"region_code not in request data {self.raw_dict}"
)
if b"time_stamp" not in self.raw_dict:
raise DivaRequestParseException(
f"time_stamp not in request data {self.raw_dict}"
)
for k, v in self.raw_dict.items():
setattr(self, k.decode(), v.decode())
self.place_id = int(self.place_id)
self.start_up_mode = int(self.start_up_mode)
self.cmm_dly_mod = int(self.cmm_dly_mod)
self.cmm_dly_sec = int(self.cmm_dly_sec)
self.cmm_err_mod = int(self.cmm_err_mod)
self.region_code = int(self.region_code)
# datetime.now().astimezone().replace(microsecond=0).isoformat()
self.time_stamp = datetime.strptime(self.time_stamp, "%Y-%m-%dT%H:%M:%S%z")
class BaseResponse:
def __init__(self, cmd_id: str, req_id: int) -> None:
self.cmd = cmd_id
self.req_id = req_id
self.stat = "ok"
def make(self) -> str:
return f"cmd={self.cmd}&req_id={self.req_id}&stat={self.stat}"
class GameInitRequest(BaseRequest):
def __init__(self, raw: Union[str, bytes]) -> None:
super().__init__(raw)
class AttendRequest(BaseRequest):
def __init__(self, raw: Union[str, bytes]) -> None:
super().__init__(raw)
self.power_on = int(self.power_on)
self.is_bb = bool(int(self.power_on))
class AttendResponse(BaseResponse):
def __init__(self, cmd_id: str, req_id: int) -> None:
super().__init__(cmd_id, req_id)
self.atnd_prm1 = [1] * 100
self.atnd_prm2 = [1] * 100
self.atnd_prm3 = [1] * 100
self.atnd_lut = datetime.now()
def make(self) -> str:
ret = super().make()
ret_dict = {
"atnd_prm1": ','.join([str(i) for i in self.atnd_prm1]),
"atnd_prm2": ','.join([str(i) for i in self.atnd_prm2]),
"atnd_prm3": ','.join([str(i) for i in self.atnd_prm3]),
"atnd_lut": parse.quote(self.atnd_lut.strftime('%Y-%m-%d %H:%M:%S:16.0'))
}
ret += "&" + parse.urlencode(ret_dict, safe=",")
return ret

View File

@ -0,0 +1,111 @@
from titles.diva.handlers.base import (
BaseRequest,
BaseResponse,
DivaRequestParseException,
)
from datetime import datetime
from urllib import parse
from ..const import DivaConstants
class PreStartRequest(BaseRequest):
pmm: str
idm: str
mmgameid: str
mmuid: str
a_code: str
aime_id: str
aime_a_code: str
def __init__(self, raw: str) -> None:
super().__init__(raw)
try:
self.key_obj_type = int(self.key_obj_type)
self.exec_vu = int(self.exec_vu)
except AttributeError as e:
raise DivaRequestParseException(f"PreStartRequest: {e}")
class PreStartResponse(BaseResponse):
player_name: str
sort_kind: str
lv_efct_id: str
lv_plt_id: str
lv_str: str
lv_num: str
lv_pnt: str
vcld_pts: str
skn_eqp: str
btn_se_eqp: str
sld_se_eqp: str
chn_sld_se_eqp: str
sldr_tch_se_eqp: str
passwd_stat: str
mdl_eqp_tm: str
def __init__(self, cmd_id: str, req_id: int, pd_id: int) -> None:
super().__init__(cmd_id, req_id)
self.ps_result = 1
self.pd_id = pd_id
self.accept_idx = 100
self.nblss_ltt_stts = -1
self.nblss_ltt_tckt = -1
self.nblss_ltt_is_opn = -1
# Ideally this would be a real array that would get converted later
# But this is how it's stored in the db, so w/e for now
self.mdl_eqp_ary = "-999,-999,-999"
class StartRequest(BaseRequest):
def __init__(self, raw: str) -> None:
super().__init__(raw)
try:
self.pd_id = int(self.pd_id)
self.accept_idx = int(self.accept_idx)
except AttributeError as e:
raise DivaRequestParseException(f"StartRequest: {e}")
class StartResponse(BaseResponse):
def __init__(self, cmd_id: str, req_id: int, pv_id: int, pv_name: str) -> None:
super().__init__(cmd_id, req_id)
self.pd_id: int = pv_id
self.start_result: int = 1
self.accept_idx: int = 100
self.hp_vol: int = 0
self.btn_se_vol: int = 1
self.btn_se_vol2: int = 1
self.sldr_se_vol2: int = 1
self.sort_kind: int = 1
self.player_name: str = pv_name
self.lv_num: int = 1
self.lv_pnt: int = 0
self.lv_efct_id: int = 1
self.lv_plt_id: int = 1
self.mdl_have: str = "F" * 250
self.cstmz_itm_have: str = "F" * 250
self.use_pv_mdl_eqp: int = 0
self.use_mdl_pri: int = 0
self.use_pv_skn_eqp: int = 1
self.use_pv_btn_se_eqp: int = 1
self.use_pv_sld_se_eqp: int = 1
self.use_pv_chn_sld_se_eqp: int = 1
self.use_pv_sldr_tch_se_eqp: int = 1
self.vcld_pts: int = 0
self.nxt_pv_id: int = 1
self.nxt_dffclty: int = 1
self.nxt_edtn: int = 0
self.dsp_clr_brdr: int = 0
self.dsp_intrm_rnk: int = 0
self.dsp_clr_sts: int = 0
self.rgo_sts: int = 0
self.my_qst_id: str = ",".join(["-1"] * 25)
self.my_qst_sts: str = ",".join("0" * 5) + "," + ",".join(["-1"] * 20)
self.my_qst_prgrs: str = ",".join("0" * 5) + "," + ",".join(["-1"] * 20)
self.my_qst_et: str = ",".join([parse.quote(datetime.now().strftime(DivaConstants.LUT_TIME_FMT))] * 5) + "," + ",".join(["xxx"] * 20)
self.clr_sts: str = ",".join("0" * 5) + "," + ",".join(["-1"] * 20)
self.mdl_eqp_tm: str = parse.quote(datetime.now().strftime(DivaConstants.LUT_TIME_FMT))
self.mdl_eqp_ary = ",".join(["-999"] * 3)
self.c_itm_eqp_ary = ",".join(["-999"] * 12)
self.ms_itm_flg_ary = ",".join(["1"] * 12)

View File

@ -4,11 +4,11 @@ import logging, coloredlogs
from logging.handlers import TimedRotatingFileHandler
import zlib
import json
import urllib.parse
import base64
from os import path
from typing import Tuple
from titles.diva.handlers.base import *
from core.config import CoreConfig
from titles.diva.config import DivaConfig
from titles.diva.const import DivaConstants
@ -74,87 +74,54 @@ class DivaServlet:
def render_POST(self, req: Request, version: int, url_path: str) -> bytes:
req_raw = req.content.getvalue()
url_header = req.getAllHeaders()
req.responseHeaders.addRawHeader(b"content-type", b"text/plain")
# Ping Dispatch
if "THIS_STRING_SEPARATES" in str(url_header):
binary_request = req_raw.splitlines()
binary_cmd_decoded = binary_request[3].decode("utf-8")
binary_array = binary_cmd_decoded.split("&")
bin_req_data = {}
req_cls = BaseBinaryRequest(binary_cmd_decoded)
for kvp in binary_array:
split_bin = kvp.split("=")
bin_req_data[split_bin[0]] = split_bin[1]
else:
json_string = json.dumps(
req_raw.decode("utf-8")
) # Take the response and decode as UTF-8 and dump
b64string = json_string.replace(
r"\n", "\n"
) # Remove all \n and separate them as new lines
gz_string = base64.b64decode(b64string) # Decompressing the base64 string
self.logger.info(f"Binary {bin_req_data['cmd']} Request")
self.logger.debug(bin_req_data)
try:
url_data = zlib.decompress(gz_string) # Decompressing the gzip
except zlib.error as e:
self.logger.error(f"Failed to defalte! {e} -> {gz_string}")
return b"stat=0"
handler = getattr(self.base, f"handle_{bin_req_data['cmd']}_request")
resp = handler(bin_req_data)
try:
req_cls = BaseRequest(url_data)
except DivaRequestParseException as e:
self.logger.error(e)
return b"stat=0"
self.logger.debug(
f"Response cmd={bin_req_data['cmd']}&req_id={bin_req_data['req_id']}&stat=ok{resp}"
)
return f"cmd={bin_req_data['cmd']}&req_id={bin_req_data['req_id']}&stat=ok{resp}".encode(
"utf-8"
)
# Main Dispatch
json_string = json.dumps(
req_raw.decode("utf-8")
) # Take the response and decode as UTF-8 and dump
b64string = json_string.replace(
r"\n", "\n"
) # Remove all \n and separate them as new lines
gz_string = base64.b64decode(b64string) # Decompressing the base64 string
try:
url_data = zlib.decompress(gz_string).decode(
"utf-8"
) # Decompressing the gzip
except zlib.error as e:
self.logger.error(f"Failed to defalte! {e} -> {gz_string}")
return "stat=0"
req_kvp = urllib.parse.unquote(url_data)
req_data = {}
# We then need to split each parts with & so we can reuse them to fill out the requests
splitted_request = str.split(req_kvp, "&")
for kvp in splitted_request:
split = kvp.split("=")
req_data[split[0]] = split[1]
self.logger.info(f"{req_data['cmd']} Request")
self.logger.debug(req_data)
func_to_find = f"handle_{req_data['cmd']}_request"
# Load the requests
try:
handler = getattr(self.base, func_to_find)
resp = handler(req_data)
except AttributeError as e:
self.logger.warning(f"Unhandled {req_data['cmd']} request {e}")
return f"cmd={req_data['cmd']}&req_id={req_data['req_id']}&stat=ok".encode(
"utf-8"
)
except Exception as e:
self.logger.error(f"Error handling method {func_to_find} {e}")
return f"cmd={req_data['cmd']}&req_id={req_data['req_id']}&stat=ok".encode(
"utf-8"
)
req.responseHeaders.addRawHeader(b"content-type", b"text/plain")
self.logger.debug(
f"Response cmd={req_data['cmd']}&req_id={req_data['req_id']}&stat=ok{resp}"
self.logger.debug(f"Request: {url_data}\nHeaders: {url_header}")
self.logger.info(
f"{req_cls.cmd} request from {req_cls.kc_serial}/{req_cls.b_serial} at {req.getClientAddress().host}"
)
return (
f"cmd={req_data['cmd']}&req_id={req_data['req_id']}&stat=ok{resp}".encode(
"utf-8"
)
)
handler_str = f"handle_{req_cls.cmd}_request"
if not hasattr(self.base, handler_str):
self.logger.warn(f"Unhandled cmd {req_cls.cmd}")
return BaseResponse(req_cls.cmd, req_cls.req_id).make().encode()
handler = getattr(self.base, handler_str)
response = handler(req_cls.raw)
if response is None or response == "":
response = BaseResponse(req_cls.cmd, req_cls.req_id).make()
if not response.startswith("cmd="):
response = f"cmd={req_cls.cmd}&req_id={req_cls.req_id}&stat=ok" + response
self.logger.debug(f"Response: {response}")
return response.encode(errors="ignore")