forked from Hay1tsme/artemis
remove unused code from index.py
This commit is contained in:
parent
9718e822f3
commit
2b81ba206c
@ -1,13 +1,14 @@
|
||||
import datetime
|
||||
from datetime import datetime
|
||||
from typing import Any, List, Dict
|
||||
import logging
|
||||
import json
|
||||
import urllib
|
||||
from urllib import parse
|
||||
|
||||
from core.config import CoreConfig
|
||||
from titles.diva.config import DivaConfig
|
||||
from titles.diva.const import DivaConstants
|
||||
from titles.diva.database import DivaData
|
||||
from titles.diva.handlers import *
|
||||
|
||||
class DivaBase():
|
||||
def __init__(self, cfg: CoreConfig, game_cfg: DivaConfig) -> None:
|
||||
@ -19,16 +20,16 @@ class DivaBase():
|
||||
self.game = DivaConstants.GAME_CODE
|
||||
self.version = DivaConstants.VER_PROJECT_DIVA_ARCADE_FUTURE_TONE
|
||||
|
||||
dt = datetime.datetime.now()
|
||||
self.time_lut=urllib.parse.quote(dt.strftime("%Y-%m-%d %H:%M:%S:16.0"))
|
||||
dt = datetime.now()
|
||||
self.time_lut=parse.quote(dt.strftime("%Y-%m-%d %H:%M:%S:16.0"))
|
||||
|
||||
def handle_test_request(self, data: Dict) -> Dict:
|
||||
def handle_test_request(self, data: bytes) -> str:
|
||||
return ""
|
||||
|
||||
def handle_game_init_request(self, data: Dict) -> Dict:
|
||||
def handle_game_init_request(self, data: bytes) -> str:
|
||||
return ( f'' )
|
||||
|
||||
def handle_attend_request(self, data: Dict) -> Dict:
|
||||
def handle_attend_request(self, data: bytes) -> str:
|
||||
encoded = "&"
|
||||
params = {
|
||||
'atnd_prm1': '0,1,1,0,0,0,1,0,100,0,0,0,0,0,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1',
|
||||
@ -37,12 +38,12 @@ class DivaBase():
|
||||
'atnd_lut': f'{self.time_lut}',
|
||||
}
|
||||
|
||||
encoded += urllib.parse.urlencode(params)
|
||||
encoded += parse.urlencode(params)
|
||||
encoded = encoded.replace("%2C", ",")
|
||||
|
||||
return encoded
|
||||
|
||||
def handle_ping_request(self, data: Dict) -> Dict:
|
||||
def handle_ping_request(self, data: bytes) -> str:
|
||||
encoded = "&"
|
||||
params = {
|
||||
'ping_b_msg': f'Welcome to {self.core_cfg.server.name} network!',
|
||||
@ -81,13 +82,13 @@ class DivaBase():
|
||||
'nblss_ltt_ed_tm': "2019-09-22 12:00:00.0",
|
||||
}
|
||||
|
||||
encoded += urllib.parse.urlencode(params)
|
||||
encoded += parse.urlencode(params)
|
||||
encoded = encoded.replace("+", "%20")
|
||||
encoded = encoded.replace("%2C", ",")
|
||||
|
||||
return encoded
|
||||
|
||||
def handle_pv_list_request(self, data: Dict) -> Dict:
|
||||
def handle_pv_list_request(self, data: bytes) -> str:
|
||||
pvlist = ""
|
||||
with open(r"titles/diva/data/PvList0.dat", encoding="utf-8") as shop:
|
||||
lines = shop.readlines()
|
||||
@ -124,7 +125,7 @@ class DivaBase():
|
||||
|
||||
return ( response )
|
||||
|
||||
def handle_shop_catalog_request(self, data: Dict) -> Dict:
|
||||
def handle_shop_catalog_request(self, data: bytes) -> str:
|
||||
catalog = ""
|
||||
|
||||
shopList = self.data.static.get_enabled_shops(self.version)
|
||||
@ -132,14 +133,14 @@ class DivaBase():
|
||||
with open(r"titles/diva/data/ShopCatalog.dat", encoding="utf-8") as shop:
|
||||
lines = shop.readlines()
|
||||
for line in lines:
|
||||
line = urllib.parse.quote(line) + ","
|
||||
catalog += f"{urllib.parse.quote(line)}"
|
||||
line = parse.quote(line) + ","
|
||||
catalog += f"{parse.quote(line)}"
|
||||
|
||||
else:
|
||||
for shop in shopList:
|
||||
line = str(shop["shopId"]) + "," + str(shop['unknown_0']) + "," + shop['name'] + "," + str(shop['points']) + "," + shop['start_date'] + "," + shop['end_date'] + "," + str(shop["type"])
|
||||
line = urllib.parse.quote(line) + ","
|
||||
catalog += f"{urllib.parse.quote(line)}"
|
||||
line = parse.quote(line) + ","
|
||||
catalog += f"{parse.quote(line)}"
|
||||
|
||||
catalog = catalog.replace("+", "%20")
|
||||
|
||||
@ -148,7 +149,7 @@ class DivaBase():
|
||||
|
||||
return ( response )
|
||||
|
||||
def handle_buy_module_request(self, data: Dict) -> Dict:
|
||||
def handle_buy_module_request(self, data: bytes) -> str:
|
||||
profile = self.data.profile.get_profile(data["pd_id"], self.version)
|
||||
module = self.data.static.get_enabled_shop(self.version, int(data["mdl_id"]))
|
||||
|
||||
@ -178,7 +179,7 @@ class DivaBase():
|
||||
|
||||
return response
|
||||
|
||||
def handle_cstmz_itm_ctlg_request(self, data: Dict) -> Dict:
|
||||
def handle_cstmz_itm_ctlg_request(self, data: bytes) -> str:
|
||||
catalog = ""
|
||||
|
||||
itemList = self.data.static.get_enabled_items(self.version)
|
||||
@ -186,14 +187,14 @@ class DivaBase():
|
||||
with open(r"titles/diva/data/ItemCatalog.dat", encoding="utf-8") as item:
|
||||
lines = item.readlines()
|
||||
for line in lines:
|
||||
line = urllib.parse.quote(line) + ","
|
||||
catalog += f"{urllib.parse.quote(line)}"
|
||||
line = parse.quote(line) + ","
|
||||
catalog += f"{parse.quote(line)}"
|
||||
|
||||
else:
|
||||
for item in itemList:
|
||||
line = str(item["itemId"]) + "," + str(item['unknown_0']) + "," + item['name'] + "," + str(item['points']) + "," + item['start_date'] + "," + item['end_date'] + "," + str(item["type"])
|
||||
line = urllib.parse.quote(line) + ","
|
||||
catalog += f"{urllib.parse.quote(line)}"
|
||||
line = parse.quote(line) + ","
|
||||
catalog += f"{parse.quote(line)}"
|
||||
|
||||
catalog = catalog.replace("+", "%20")
|
||||
|
||||
@ -202,7 +203,7 @@ class DivaBase():
|
||||
|
||||
return ( response )
|
||||
|
||||
def handle_buy_cstmz_itm_request(self, data: Dict) -> Dict:
|
||||
def handle_buy_cstmz_itm_request(self, data: bytes) -> str:
|
||||
profile = self.data.profile.get_profile(data["pd_id"], self.version)
|
||||
item = self.data.static.get_enabled_item(self.version, int(data["cstmz_itm_id"]))
|
||||
|
||||
@ -234,7 +235,7 @@ class DivaBase():
|
||||
|
||||
return response
|
||||
|
||||
def handle_festa_info_request(self, data: Dict) -> Dict:
|
||||
def handle_festa_info_request(self, data: bytes) -> str:
|
||||
encoded = "&"
|
||||
params = {
|
||||
'fi_id': '1,-1',
|
||||
@ -250,13 +251,13 @@ class DivaBase():
|
||||
'fi_lut': '{self.time_lut}',
|
||||
}
|
||||
|
||||
encoded += urllib.parse.urlencode(params)
|
||||
encoded += parse.urlencode(params)
|
||||
encoded = encoded.replace("+", "%20")
|
||||
encoded = encoded.replace("%2C", ",")
|
||||
|
||||
return encoded
|
||||
|
||||
def handle_contest_info_request(self, data: Dict) -> Dict:
|
||||
def handle_contest_info_request(self, data: bytes) -> str:
|
||||
response = ""
|
||||
|
||||
response += f"&ci_lut={self.time_lut}"
|
||||
@ -264,7 +265,7 @@ class DivaBase():
|
||||
|
||||
return ( response )
|
||||
|
||||
def handle_qst_inf_request(self, data: Dict) -> Dict:
|
||||
def handle_qst_inf_request(self, data: bytes) -> str:
|
||||
quest = ""
|
||||
|
||||
questList = self.data.static.get_enabled_quests(self.version)
|
||||
@ -272,7 +273,7 @@ class DivaBase():
|
||||
with open(r"titles/diva/data/QuestInfo.dat", encoding="utf-8") as shop:
|
||||
lines = shop.readlines()
|
||||
for line in lines:
|
||||
quest += f"{urllib.parse.quote(line)},"
|
||||
quest += f"{parse.quote(line)},"
|
||||
|
||||
response = ""
|
||||
response += f"&qi_lut={self.time_lut}"
|
||||
@ -280,7 +281,7 @@ class DivaBase():
|
||||
else:
|
||||
for quests in questList:
|
||||
line = str(quests["questId"]) + "," + str(quests['quest_order']) + "," + str(quests['kind']) + "," + str(quests['unknown_0']) + "," + quests['start_datetime'] + "," + quests['end_datetime'] + "," + quests["name"] + "," + str(quests["unknown_1"]) + "," + str(quests["unknown_2"]) + "," + str(quests["quest_enable"])
|
||||
quest += f"{urllib.parse.quote(line)}%0A,"
|
||||
quest += f"{parse.quote(line)}%0A,"
|
||||
|
||||
responseline = f"{quest[:-1]},"
|
||||
for i in range(len(questList),59):
|
||||
@ -294,88 +295,71 @@ class DivaBase():
|
||||
|
||||
return ( response )
|
||||
|
||||
def handle_nv_ranking_request(self, data: Dict) -> Dict:
|
||||
def handle_nv_ranking_request(self, data: bytes) -> str:
|
||||
return ( f'' )
|
||||
|
||||
def handle_ps_ranking_request(self, data: Dict) -> Dict:
|
||||
def handle_ps_ranking_request(self, data: bytes) -> str:
|
||||
return ( f'' )
|
||||
|
||||
def handle_ng_word_request(self, data: Dict) -> Dict:
|
||||
def handle_ng_word_request(self, data: bytes) -> str:
|
||||
return ( f'' )
|
||||
|
||||
def handle_rmt_wp_list_request(self, data: Dict) -> Dict:
|
||||
def handle_rmt_wp_list_request(self, data: bytes) -> str:
|
||||
return ( f'' )
|
||||
|
||||
def handle_pv_def_chr_list_request(self, data: Dict) -> Dict:
|
||||
def handle_pv_def_chr_list_request(self, data: bytes) -> str:
|
||||
return ( f'' )
|
||||
|
||||
def handle_pv_ng_mdl_list_request(self, data: Dict) -> Dict:
|
||||
def handle_pv_ng_mdl_list_request(self, data: bytes) -> str:
|
||||
return ( f'' )
|
||||
|
||||
def handle_cstmz_itm_ng_mdl_lst_request(self, data: Dict) -> Dict:
|
||||
def handle_cstmz_itm_ng_mdl_lst_request(self, data: bytes) -> str:
|
||||
return ( f'' )
|
||||
|
||||
def handle_banner_info_request(self, data: Dict) -> Dict:
|
||||
def handle_banner_info_request(self, data: bytes) -> str:
|
||||
return ( f'' )
|
||||
|
||||
def handle_banner_data_request(self, data: Dict) -> Dict:
|
||||
def handle_banner_data_request(self, data: bytes) -> str:
|
||||
return ( f'' )
|
||||
|
||||
def handle_cm_ply_info_request(self, data: Dict) -> Dict:
|
||||
def handle_cm_ply_info_request(self, data: bytes) -> str:
|
||||
return ( f'' )
|
||||
|
||||
def handle_pstd_h_ctrl_request(self, data: Dict) -> Dict:
|
||||
def handle_pstd_h_ctrl_request(self, data: bytes) -> str:
|
||||
return ( f'' )
|
||||
|
||||
def handle_pstd_item_ng_lst_request(self, data: Dict) -> Dict:
|
||||
def handle_pstd_item_ng_lst_request(self, data: bytes) -> str:
|
||||
return ( f'' )
|
||||
|
||||
def handle_pre_start_request(self, data: Dict) -> str:
|
||||
profile = self.data.profile.get_profile(data["aime_id"], self.version)
|
||||
profile_shop = self.data.item.get_shop(data["aime_id"], self.version)
|
||||
def handle_pre_start_request(self, data: bytes) -> str:
|
||||
req = PreStartRequest(data)
|
||||
resp = PreStartResponse(req.cmd, req.req_id, req.aime_id)
|
||||
profile = self.data.profile.get_profile(req.aime_id, self.version)
|
||||
profile_shop = self.data.item.get_shop(req.aime_id, self.version)
|
||||
|
||||
if profile is None:
|
||||
return f"&ps_result=-3"
|
||||
else:
|
||||
response = "&ps_result=1"
|
||||
response += "&accept_idx=100"
|
||||
response += "&nblss_ltt_stts=-1"
|
||||
response += "&nblss_ltt_tckt=-1"
|
||||
response += "&nblss_ltt_is_opn=-1"
|
||||
response += f"&pd_id={data['aime_id']}"
|
||||
response += f"&player_name={profile['player_name']}"
|
||||
response += f"&sort_kind={profile['player_name']}"
|
||||
response += f"&lv_efct_id={profile['lv_efct_id']}"
|
||||
response += f"&lv_plt_id={profile['lv_plt_id']}"
|
||||
response += f"&lv_str={profile['lv_str']}"
|
||||
response += f"&lv_num={profile['lv_num']}"
|
||||
response += f"&lv_pnt={profile['lv_pnt']}"
|
||||
response += f"&vcld_pts={profile['vcld_pts']}"
|
||||
response += f"&skn_eqp={profile['use_pv_skn_eqp']}"
|
||||
response += f"&btn_se_eqp={profile['use_pv_btn_se_eqp']}"
|
||||
response += f"&sld_se_eqp={profile['use_pv_sld_se_eqp']}"
|
||||
response += f"&chn_sld_se_eqp={profile['use_pv_chn_sld_se_eqp']}"
|
||||
response += f"&sldr_tch_se_eqp={profile['use_pv_sldr_tch_se_eqp']}"
|
||||
response += f"&passwd_stat={profile['passwd_stat']}"
|
||||
resp.ps_result = -3
|
||||
return resp.make()
|
||||
|
||||
# Store stuff to add to rework
|
||||
response += f"&mdl_eqp_tm={self.time_lut}"
|
||||
profile_dict = profile._asdict()
|
||||
profile_dict.pop("id")
|
||||
profile_dict.pop("user")
|
||||
profile_dict.pop("version")
|
||||
|
||||
mdl_eqp_ary = "-999,-999,-999"
|
||||
for k, v in profile_dict.items():
|
||||
if hasattr(resp, k):
|
||||
setattr(resp, k, v)
|
||||
|
||||
# get the common_modules from the profile shop
|
||||
if profile_shop:
|
||||
mdl_eqp_ary = profile_shop["mdl_eqp_ary"]
|
||||
if profile_shop is not None and profile_shop:
|
||||
resp.mdl_eqp_ary = profile_shop["mdl_eqp_ary"]
|
||||
|
||||
response += f"&mdl_eqp_ary={mdl_eqp_ary}"
|
||||
return resp.make()
|
||||
|
||||
return response
|
||||
|
||||
def handle_registration_request(self, data: Dict) -> Dict:
|
||||
def handle_registration_request(self, data: bytes) -> str:
|
||||
self.data.profile.create_profile(self.version, data["aime_id"], data["player_name"])
|
||||
return (f"&cd_adm_result=1&pd_id={data['aime_id']}")
|
||||
|
||||
def handle_start_request(self, data: Dict) -> Dict:
|
||||
def handle_start_request(self, data: bytes) -> str:
|
||||
profile = self.data.profile.get_profile(data["pd_id"], self.version)
|
||||
profile_shop = self.data.item.get_shop(data["pd_id"], self.version)
|
||||
if profile is None:
|
||||
@ -455,10 +439,10 @@ class DivaBase():
|
||||
|
||||
return ( response )
|
||||
|
||||
def handle_pd_unlock_request(self, data: Dict) -> Dict:
|
||||
def handle_pd_unlock_request(self, data: bytes) -> str:
|
||||
return ( f'' )
|
||||
|
||||
def handle_spend_credit_request(self, data: Dict) -> Dict:
|
||||
def handle_spend_credit_request(self, data: bytes) -> str:
|
||||
profile = self.data.profile.get_profile(data["pd_id"], self.version)
|
||||
if profile is None: return
|
||||
|
||||
@ -529,7 +513,7 @@ class DivaBase():
|
||||
|
||||
return pv_result
|
||||
|
||||
def handle_get_pv_pd_request(self, data: Dict) -> Dict:
|
||||
def handle_get_pv_pd_request(self, data: bytes) -> str:
|
||||
song_id = data["pd_pv_id_lst"].split(",")
|
||||
pv = ""
|
||||
|
||||
@ -555,9 +539,9 @@ class DivaBase():
|
||||
|
||||
self.logger.debug(f"pv_result = {pv_result}")
|
||||
|
||||
pv += urllib.parse.quote(pv_result)
|
||||
pv += parse.quote(pv_result)
|
||||
else:
|
||||
pv += urllib.parse.quote(f"{song}***")
|
||||
pv += parse.quote(f"{song}***")
|
||||
pv += ","
|
||||
|
||||
response = ""
|
||||
@ -567,10 +551,10 @@ class DivaBase():
|
||||
|
||||
return ( response )
|
||||
|
||||
def handle_stage_start_request(self, data: Dict) -> Dict:
|
||||
def handle_stage_start_request(self, data: bytes) -> str:
|
||||
return ( f'' )
|
||||
|
||||
def handle_stage_result_request(self, data: Dict) -> Dict:
|
||||
def handle_stage_result_request(self, data: bytes) -> str:
|
||||
|
||||
profile = self.data.profile.get_profile(data["pd_id"], self.version)
|
||||
|
||||
@ -665,7 +649,7 @@ class DivaBase():
|
||||
|
||||
return ( response )
|
||||
|
||||
def handle_end_request(self, data: Dict) -> Dict:
|
||||
def handle_end_request(self, data: bytes) -> str:
|
||||
profile = self.data.profile.get_profile(data["pd_id"], self.version)
|
||||
|
||||
self.data.profile.update_profile(
|
||||
@ -675,7 +659,7 @@ class DivaBase():
|
||||
)
|
||||
return (f'')
|
||||
|
||||
def handle_shop_exit_request(self, data: Dict) -> Dict:
|
||||
def handle_shop_exit_request(self, data: bytes) -> str:
|
||||
self.data.item.put_shop(data["pd_id"], self.version, data["mdl_eqp_cmn_ary"], data["c_itm_eqp_cmn_ary"], data["ms_itm_flg_cmn_ary"])
|
||||
if int(data["use_pv_mdl_eqp"]) == 1:
|
||||
self.data.pv_customize.put_pv_customize(data["pd_id"], self.version, data["ply_pv_id"],
|
||||
|
2
titles/diva/handlers/__init__.py
Normal file
2
titles/diva/handlers/__init__.py
Normal file
@ -0,0 +1,2 @@
|
||||
from titles.diva.handlers.base import *
|
||||
from titles.diva.handlers.user import *
|
@ -1,4 +1,6 @@
|
||||
from urllib import parse
|
||||
from datetime import datetime
|
||||
from typing import Union
|
||||
|
||||
class DivaRequestParseException(Exception):
|
||||
"""
|
||||
@ -9,17 +11,72 @@ class DivaRequestParseException(Exception):
|
||||
self.message = message
|
||||
super().__init__(self.message)
|
||||
|
||||
class BaseBinaryRequest:
|
||||
cmd: str
|
||||
req_id: str
|
||||
def __init__(self, raw: bytes) -> None:
|
||||
self.raw = raw
|
||||
self.raw_dict = dict(parse.parse_qsl(raw))
|
||||
|
||||
if 'cmd' not in self.raw_dict:
|
||||
raise DivaRequestParseException(f"cmd not in request data {self.raw_dict}")
|
||||
|
||||
if 'req_id' not in self.raw_dict:
|
||||
raise DivaRequestParseException(f"req_id not in request data {self.raw_dict}")
|
||||
|
||||
for k, v in self.raw_dict:
|
||||
setattr(self, k, v)
|
||||
|
||||
class BaseRequest:
|
||||
def __init__(self, raw: str) -> None:
|
||||
self.raw = dict(parse.parse_qsl(raw))
|
||||
cmd: str
|
||||
req_id: str
|
||||
game_id: str
|
||||
r_rev: str
|
||||
kc_serial: str
|
||||
b_serial: str
|
||||
country_code: str
|
||||
def __init__(self, raw: Union[str, bytes]) -> None:
|
||||
self.raw = raw
|
||||
self.raw_dict = dict(parse.parse_qsl(raw))
|
||||
|
||||
if 'cmd' not in self.raw:
|
||||
raise DivaRequestParseException(f"cmd not in request data {self.raw}")
|
||||
self.cmd = raw['cmd']
|
||||
if 'cmd' not in self.raw_dict:
|
||||
raise DivaRequestParseException(f"cmd not in request data {self.raw_dict}")
|
||||
|
||||
if 'req_id' not in self.raw:
|
||||
raise DivaRequestParseException(f"req_id not in request data {self.raw}")
|
||||
self.req_id = raw['req_id']
|
||||
if 'req_id' not in self.raw_dict:
|
||||
raise DivaRequestParseException(f"req_id not in request data {self.raw_dict}")
|
||||
|
||||
if 'place_id' not in self.raw_dict:
|
||||
raise DivaRequestParseException(f"place_id not in request data {self.raw_dict}")
|
||||
|
||||
if 'start_up_mode' not in self.raw_dict:
|
||||
raise DivaRequestParseException(f"start_up_mode not in request data {self.raw_dict}")
|
||||
|
||||
if 'cmm_dly_mod' not in self.raw_dict:
|
||||
raise DivaRequestParseException(f"cmm_dly_mod not in request data {self.raw_dict}")
|
||||
|
||||
if 'cmm_dly_sec' not in self.raw_dict:
|
||||
raise DivaRequestParseException(f"cmm_dly_sec not in request data {self.raw_dict}")
|
||||
|
||||
if 'cmm_err_mod' not in self.raw_dict:
|
||||
raise DivaRequestParseException(f"cmm_err_mod not in request data {self.raw_dict}")
|
||||
|
||||
if 'region_code' not in self.raw_dict:
|
||||
raise DivaRequestParseException(f"region_code not in request data {self.raw_dict}")
|
||||
|
||||
if 'time_stamp' not in self.raw_dict:
|
||||
raise DivaRequestParseException(f"time_stamp not in request data {self.raw_dict}")
|
||||
|
||||
for k, v in self.raw_dict:
|
||||
setattr(self, k, v)
|
||||
|
||||
self.place_id = int(self.place_id)
|
||||
self.start_up_mode = int(self.start_up_mode)
|
||||
self.cmm_dly_mod = int(self.cmm_dly_mod)
|
||||
self.cmm_dly_sec = int(self.cmm_dly_sec)
|
||||
self.cmm_err_mod = int(self.cmm_err_mod)
|
||||
self.region_code = int(self.region_code)
|
||||
# datetime.now().astimezone().replace(microsecond=0).isoformat()
|
||||
self.time_stamp = datetime.strptime(self.time_stamp, "%Y-%m-%dT%H:%M:%S%z")
|
||||
|
||||
class BaseResponse:
|
||||
def __init__(self, cmd_id: str, req_id: int) -> None:
|
||||
@ -31,8 +88,12 @@ class BaseResponse:
|
||||
all_vars = vars(self)
|
||||
all_vars.pop('cmd')
|
||||
all_vars.pop('req_id')
|
||||
all_vars.pop('stat')
|
||||
|
||||
ret = f"cmd={self.cmd}&req_id={self.req_id}"
|
||||
ret += parse.urlencode(all_vars, doseq=True, quote_via=parse.quote, safe=",")
|
||||
ret = f"cmd={self.cmd}&req_id={self.req_id}&stat={self.stat}"
|
||||
additional_params = parse.urlencode(all_vars, doseq=True, quote_via=parse.quote, safe=",")
|
||||
|
||||
if additional_params != '':
|
||||
ret += f"&{additional_params}"
|
||||
|
||||
return ret
|
||||
|
60
titles/diva/handlers/user.py
Normal file
60
titles/diva/handlers/user.py
Normal file
@ -0,0 +1,60 @@
|
||||
from titles.diva.handlers.base import BaseRequest, BaseResponse, DivaRequestParseException
|
||||
|
||||
class PreStartRequest(BaseRequest):
|
||||
pmm: str
|
||||
idm: str
|
||||
mmgameid: str
|
||||
mmuid: str
|
||||
a_code: str
|
||||
aime_id: str
|
||||
aime_a_code: str
|
||||
def __init__(self, raw: str) -> None:
|
||||
super().__init__(raw)
|
||||
try:
|
||||
self.key_obj_type = int(self.key_obj_type)
|
||||
self.exec_vu = int(self.exec_vu)
|
||||
|
||||
except AttributeError as e:
|
||||
raise DivaRequestParseException(f"PreStartRequest: {e}")
|
||||
|
||||
class PreStartResponse(BaseResponse):
|
||||
player_name: str
|
||||
sort_kind: str
|
||||
lv_efct_id: str
|
||||
lv_plt_id: str
|
||||
lv_str: str
|
||||
lv_num: str
|
||||
lv_pnt: str
|
||||
vcld_pts: str
|
||||
skn_eqp: str
|
||||
btn_se_eqp: str
|
||||
sld_se_eqp: str
|
||||
chn_sld_se_eqp: str
|
||||
sldr_tch_se_eqp: str
|
||||
passwd_stat: str
|
||||
mdl_eqp_tm: str
|
||||
def __init__(self, cmd_id: str, req_id: int, pd_id: int) -> None:
|
||||
super().__init__(cmd_id, req_id)
|
||||
self.ps_result = 1
|
||||
self.pd_id = pd_id
|
||||
self.accept_idx = 100
|
||||
self.nblss_ltt_stts = -1
|
||||
self.nblss_ltt_tckt = -1
|
||||
self.nblss_ltt_is_opn = -1
|
||||
# Ideally this would be a real array that would get converted later
|
||||
# But this is how it's stored in the db, so w/e for now
|
||||
self.mdl_eqp_ary = "-999,-999,-999"
|
||||
|
||||
class StartRequest(BaseRequest):
|
||||
def __init__(self, raw: str) -> None:
|
||||
super().__init__(raw)
|
||||
try:
|
||||
self.pd_id = int(self.pd_id)
|
||||
self.accept_idx = int(self.accept_idx)
|
||||
|
||||
except AttributeError as e:
|
||||
raise DivaRequestParseException(f"StartRequest: {e}")
|
||||
|
||||
class StartResponse(BaseResponse):
|
||||
def __init__(self, cmd_id: str, req_id: int) -> None:
|
||||
super().__init__(cmd_id, req_id)
|
@ -4,9 +4,9 @@ import logging, coloredlogs
|
||||
from logging.handlers import TimedRotatingFileHandler
|
||||
import zlib
|
||||
import json
|
||||
import urllib.parse
|
||||
import base64
|
||||
|
||||
from titles.diva.handlers.base import *
|
||||
from core.config import CoreConfig
|
||||
from titles.diva.config import DivaConfig
|
||||
from titles.diva.base import DivaBase
|
||||
@ -39,67 +39,39 @@ class DivaServlet():
|
||||
def render_POST(self, req: Request, version: int, url_path: str) -> bytes:
|
||||
req_raw = req.content.getvalue()
|
||||
url_header = req.getAllHeaders()
|
||||
req.responseHeaders.addRawHeader(b"content-type", b"text/plain")
|
||||
|
||||
#Ping Dispatch
|
||||
if "THIS_STRING_SEPARATES"in str(url_header):
|
||||
if "THIS_STRING_SEPARATES" in str(url_header):
|
||||
binary_request = req_raw.splitlines()
|
||||
binary_cmd_decoded = binary_request[3].decode("utf-8")
|
||||
binary_array = binary_cmd_decoded.split('&')
|
||||
self.logger.debug(f"Binary request {binary_request}")
|
||||
|
||||
bin_req_data = {}
|
||||
req_cls = BaseBinaryRequest(binary_cmd_decoded)
|
||||
|
||||
for kvp in binary_array:
|
||||
split_bin = kvp.split("=")
|
||||
bin_req_data[split_bin[0]] = split_bin[1]
|
||||
|
||||
self.logger.info(f"Binary {bin_req_data['cmd']} Request")
|
||||
self.logger.debug(bin_req_data)
|
||||
|
||||
handler = getattr(self.base, f"handle_{bin_req_data['cmd']}_request")
|
||||
resp = handler(bin_req_data)
|
||||
|
||||
self.logger.debug(f"Response cmd={bin_req_data['cmd']}&req_id={bin_req_data['req_id']}&stat=ok{resp}")
|
||||
return f"cmd={bin_req_data['cmd']}&req_id={bin_req_data['req_id']}&stat=ok{resp}".encode('utf-8')
|
||||
|
||||
#Main Dispatch
|
||||
else:
|
||||
json_string = json.dumps(req_raw.decode("utf-8")) #Take the response and decode as UTF-8 and dump
|
||||
b64string = json_string.replace(r'\n', '\n') # Remove all \n and separate them as new lines
|
||||
gz_string = base64.b64decode(b64string) # Decompressing the base64 string
|
||||
|
||||
try:
|
||||
url_data = zlib.decompress( gz_string ).decode("utf-8") # Decompressing the gzip
|
||||
url_data = zlib.decompress( gz_string ).decode("utf-8", errors="replace") # Decompressing the gzip
|
||||
except zlib.error as e:
|
||||
self.logger.error(f"Failed to defalte! {e} -> {gz_string}")
|
||||
return "stat=0"
|
||||
|
||||
req_kvp = urllib.parse.unquote(url_data)
|
||||
req_data = {}
|
||||
|
||||
# We then need to split each parts with & so we can reuse them to fill out the requests
|
||||
splitted_request = str.split(req_kvp, "&")
|
||||
for kvp in splitted_request:
|
||||
split = kvp.split("=")
|
||||
req_data[split[0]] = split[1]
|
||||
|
||||
self.logger.info(f"{req_data['cmd']} Request")
|
||||
self.logger.debug(req_data)
|
||||
|
||||
func_to_find = f"handle_{req_data['cmd']}_request"
|
||||
|
||||
# Load the requests
|
||||
try:
|
||||
handler = getattr(self.base, func_to_find)
|
||||
resp = handler(req_data)
|
||||
req_cls = BaseRequest(url_data)
|
||||
except DivaRequestParseException as e:
|
||||
self.logger.error(e)
|
||||
return b""
|
||||
|
||||
except AttributeError as e:
|
||||
self.logger.warning(f"Unhandled {req_data['cmd']} request {e}")
|
||||
return f"cmd={req_data['cmd']}&req_id={req_data['req_id']}&stat=ok".encode('utf-8')
|
||||
self.logger.debug(f"Request: {req_cls.raw_dict}\nHeaders: {url_header}")
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error handling method {func_to_find} {e}")
|
||||
return f"cmd={req_data['cmd']}&req_id={req_data['req_id']}&stat=ok".encode('utf-8')
|
||||
handler_str = f"handle_{req_cls.cmd}_request"
|
||||
|
||||
req.responseHeaders.addRawHeader(b"content-type", b"text/plain")
|
||||
self.logger.debug(f"Response cmd={req_data['cmd']}&req_id={req_data['req_id']}&stat=ok{resp}")
|
||||
if not hasattr(self.base, handler_str):
|
||||
self.logger.warn(f"Unhandled cmd {req_cls.cmd}")
|
||||
return BaseResponse(req_cls.cmd, req_cls.req_id).make().encode()
|
||||
|
||||
return f"cmd={req_data['cmd']}&req_id={req_data['req_id']}&stat=ok{resp}".encode('utf-8')
|
||||
handler = getattr(self.base, handler_str)
|
||||
return handler(req_cls.raw).encode(errors="ignore")
|
||||
|
Loading…
Reference in New Issue
Block a user