Compare commits

...

33 Commits

Author SHA1 Message Date
257b0dae87 diva: fix render_POST 2024-01-12 12:18:23 -05:00
c6f2f7e0ef Merge branch 'develop' into diva_handler_classes 2024-01-12 12:15:28 -05:00
6b158bf18d diva: fix StageResultResponse 2023-12-06 18:24:20 -05:00
1a616cba41 diva: change how requests are decoded 2023-11-26 20:43:10 -05:00
d2d02f9483 diva: add list decoders 2023-11-26 19:24:36 -05:00
024bc352d7 diva: fix attend response 2023-11-26 02:43:49 -05:00
4831aa5628 diva: add encoder helper methods 2023-11-26 01:41:25 -05:00
dd1c7667e0 diva: fix ip logging 2023-11-20 11:29:10 -05:00
2ec1dcadef Merge branch 'develop' into diva_handler_classes 2023-11-20 10:49:48 -05:00
81c13f5008 diva: fix typing 2023-11-08 21:23:54 -05:00
572b8ddbe5 Merge branch 'develop' into diva_handler_classes 2023-11-08 21:22:24 -05:00
2b02ed8684 diva: add stage_result, end handlers 2023-10-05 23:47:50 -04:00
b2a01d20d5 diva: add configurable banner_msg 2023-10-05 12:29:02 -04:00
5e03193819 diva: fix start, spend_credit, and get_pv_pd requests 2023-10-04 23:58:26 -04:00
f836b5dd21 diva: fix start request 2023-10-04 23:25:10 -04:00
53c74c6511 diva: add register, fixes, start still busted 2023-10-04 02:18:04 -04:00
7322bc60dc Merge branch 'develop' into diva_handler_classes 2023-10-04 00:53:10 -04:00
c2ee09bce0 diva: integrate latest changes 2023-05-02 11:41:35 -04:00
dfea392b03 Merge branch 'develop' into diva_handler_classes 2023-05-02 11:36:14 -04:00
998aa70929 diva: fix for responses that haven't been updated yet 2023-04-01 23:45:28 -04:00
8df1325613 diva: implement StartResponse 2023-04-01 23:19:44 -04:00
6ff7827918 diva: fill out StartResponse 2023-04-01 23:05:10 -04:00
a36170f2c3 Merge branch 'develop' into diva_handler_classes 2023-04-01 22:23:13 -04:00
48144b53f0 fix attend 2023-03-17 01:35:51 -04:00
3b852ea739 Merge branch 'develop' into diva_handler_classes 2023-03-16 22:32:18 -04:00
21c9b23617 diva: add game_init and attend handlers 2023-03-13 04:04:08 -04:00
3076bdf575 Merge branch 'develop' into diva_handler_classes 2023-03-13 02:27:12 -04:00
5be25f89ff format with black 2023-03-09 11:50:11 -05:00
5443d24352 Merge branch 'develop' into diva_handler_classes 2023-03-09 11:49:51 -05:00
Hay1tsme
56927c049f clean up index.py 2023-02-26 11:40:13 -05:00
Hay1tsme
2b81ba206c remove unused code from index.py 2023-02-25 23:40:50 -05:00
Hay1tsme
9718e822f3 add commas to urlencode safe param 2023-02-25 19:12:48 -05:00
Hay1tsme
b07e3d6a94 add base request/response classes 2023-02-25 18:50:35 -05:00
9 changed files with 794 additions and 277 deletions

View File

@ -1,6 +1,7 @@
server:
enable: True
loglevel: "info"
banner_msg: ""
mods:
unlock_all_modules: True

View File

@ -8,6 +8,7 @@ from core.config import CoreConfig
from titles.diva.config import DivaConfig
from titles.diva.const import DivaConstants
from titles.diva.database import DivaData
from titles.diva.handlers import *
class DivaBase:
@ -20,8 +21,8 @@ class DivaBase:
self.game = DivaConstants.GAME_CODE
self.version = DivaConstants.VER_PROJECT_DIVA_ARCADE_FUTURE_TONE
dt = datetime.datetime.now()
self.time_lut = urllib.parse.quote(dt.strftime("%Y-%m-%d %H:%M:%S:16.0"))
dt = datetime.now()
self.time_lut = parse.quote(dt.strftime("%Y-%m-%d %H:%M:%S:16.0"))
async def handle_test_request(self, data: Dict) -> Dict:
return ""
@ -29,24 +30,32 @@ class DivaBase:
async def handle_game_init_request(self, data: Dict) -> Dict:
return f""
async def handle_attend_request(self, data: Dict) -> Dict:
async def handle_attend_request(self, data: bytes) -> str:
req = AttendRequest(data)
resp = AttendResponse(req.req_id)
for i in [0, 3, 4, 5, 7, 9, 10, 11, 12, 13]:
resp.atnd_prm1[i] = 0
resp.atnd_prm1[8] = 100
resp.atnd_prm2[:6] = [30, 10, 100, 4, 1, 50]
resp.atnd_prm3[0] = 100
resp.atnd_prm3[1] = 0
resp.atnd_prm3[10:13] = [2, 3, 4]
resp.atnd_prm3[16:19] = [3, 4, 5]
resp.atnd_prm3[22:25] = [4, 5, 6]
resp.atnd_prm3[80:] = [0] * 20
resp.atnd_prm3[28:79] = [5,6,7,4,4,4,9,10,14,5,10,10,25,20,50,30,90,5,
10,10,25,20,50,30,90,5,10,10,25,20,50,30,90,5,10,10,25,20,50,30,90,
5,10,10,25,20,50,30,90,10,30]
return resp.make()
async def handle_ping_request(self, data: bytes) -> str:
encoded = "&"
params = {
"atnd_prm1": "0,1,1,0,0,0,1,0,100,0,0,0,0,0,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1",
"atnd_prm2": "30,10,100,4,1,50,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1",
"atnd_prm3": "100,0,1,1,1,1,1,1,1,1,2,3,4,1,1,1,3,4,5,1,1,1,4,5,6,1,1,1,5,6,7,4,4,4,9,10,14,5,10,10,25,20,50,30,90,5,10,10,25,20,50,30,90,5,10,10,25,20,50,30,90,5,10,10,25,20,50,30,90,5,10,10,25,20,50,30,90,10,30,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0",
"atnd_lut": f"{self.time_lut}",
}
encoded += urllib.parse.urlencode(params)
encoded = encoded.replace("%2C", ",")
return encoded
async def handle_ping_request(self, data: Dict) -> Dict:
encoded = "&"
params = {
"ping_b_msg": f"Welcome to {self.core_cfg.server.name} network!",
"ping_b_msg": f"Welcome to {self.core_cfg.server.name} network!" if not self.game_config.server.banner_msg else self.game_config.server.banner_msg,
"ping_m_msg": "xxx",
"atnd_lut": f"{self.time_lut}",
"fi_lut": f"{self.time_lut}",
@ -82,9 +91,8 @@ class DivaBase:
"nblss_ltt_ed_tm": "2019-09-22 12:00:00.0",
}
encoded += urllib.parse.urlencode(params)
encoded += parse.urlencode(params, safe=",")
encoded = encoded.replace("+", "%20")
encoded = encoded.replace("%2C", ",")
return encoded
@ -133,8 +141,8 @@ class DivaBase:
with open(r"titles/diva/data/ShopCatalog.dat", encoding="utf-8") as shop:
lines = shop.readlines()
for line in lines:
line = urllib.parse.quote(line) + ","
catalog += f"{urllib.parse.quote(line)}"
line = parse.quote(line) + ","
catalog += f"{parse.quote(line)}"
else:
for shop in shopList:
@ -153,8 +161,8 @@ class DivaBase:
+ ","
+ str(shop["type"])
)
line = urllib.parse.quote(line) + ","
catalog += f"{urllib.parse.quote(line)}"
line = parse.quote(line) + ","
catalog += f"{parse.quote(line)}"
catalog = catalog.replace("+", "%20")
@ -198,8 +206,8 @@ class DivaBase:
with open(r"titles/diva/data/ItemCatalog.dat", encoding="utf-8") as item:
lines = item.readlines()
for line in lines:
line = urllib.parse.quote(line) + ","
catalog += f"{urllib.parse.quote(line)}"
line = parse.quote(line) + ","
catalog += f"{parse.quote(line)}"
else:
for item in itemList:
@ -218,8 +226,8 @@ class DivaBase:
+ ","
+ str(item["type"])
)
line = urllib.parse.quote(line) + ","
catalog += f"{urllib.parse.quote(line)}"
line = parse.quote(line) + ","
catalog += f"{parse.quote(line)}"
catalog = catalog.replace("+", "%20")
@ -276,11 +284,11 @@ class DivaBase:
"fi_add_vp": "20,5",
"fi_mul_vp": "1,2",
"fi_st": "2019-01-01 00:00:00.0,2019-01-01 00:00:00.0",
"fi_et": "2029-01-01 00:00:00.0,2029-01-01 00:00:00.0",
"fi_lut": "{self.time_lut}",
"fi_et": "2029-01-01 00:00:00.0,2029-01-01 00:00:00.0", # TODO: make this last longer
"fi_lut": f"{self.time_lut}",
}
encoded += urllib.parse.urlencode(params)
encoded += parse.urlencode(params)
encoded = encoded.replace("+", "%20")
encoded = encoded.replace("%2C", ",")
@ -302,7 +310,7 @@ class DivaBase:
with open(r"titles/diva/data/QuestInfo.dat", encoding="utf-8") as shop:
lines = shop.readlines()
for line in lines:
quest += f"{urllib.parse.quote(line)},"
quest += f"{parse.quote(line)},"
response = ""
response += f"&qi_lut={self.time_lut}"
@ -330,7 +338,7 @@ class DivaBase:
+ ","
+ str(quests["quest_enable"])
)
quest += f"{urllib.parse.quote(line)}%0A,"
quest += f"{parse.quote(line)}%0A,"
responseline = f"{quest[:-1]},"
for i in range(len(questList), 59):
@ -380,130 +388,72 @@ class DivaBase:
async def handle_pstd_item_ng_lst_request(self, data: Dict) -> Dict:
return f""
async def handle_pre_start_request(self, data: Dict) -> str:
profile = await self.data.profile.get_profile(data["aime_id"], self.version)
profile_shop = await self.data.item.get_shop(data["aime_id"], self.version)
async def handle_pre_start_request(self, data: bytes) -> str:
req = PreStartRequest(data)
resp = PreStartResponse(req.req_id, req.aime_id)
profile = await self.data.profile.get_profile(req.aime_id, self.version)
profile_shop = await self.data.item.get_shop(req.aime_id, self.version)
if profile is None:
return f"&ps_result=-3"
else:
response = "&ps_result=1"
response += "&accept_idx=100"
response += "&nblss_ltt_stts=-1"
response += "&nblss_ltt_tckt=-1"
response += "&nblss_ltt_is_opn=-1"
response += f"&pd_id={data['aime_id']}"
response += f"&player_name={profile['player_name']}"
response += f"&sort_kind={profile['player_name']}"
response += f"&lv_efct_id={profile['lv_efct_id']}"
response += f"&lv_plt_id={profile['lv_plt_id']}"
response += f"&lv_str={profile['lv_str']}"
response += f"&lv_num={profile['lv_num']}"
response += f"&lv_pnt={profile['lv_pnt']}"
response += f"&vcld_pts={profile['vcld_pts']}"
response += f"&skn_eqp={profile['skn_eqp']}"
response += f"&btn_se_eqp={profile['btn_se_eqp']}"
response += f"&sld_se_eqp={profile['sld_se_eqp']}"
response += f"&chn_sld_se_eqp={profile['chn_sld_se_eqp']}"
response += f"&sldr_tch_se_eqp={profile['sldr_tch_se_eqp']}"
response += f"&passwd_stat={profile['passwd_stat']}"
# Store stuff to add to rework
response += f"&mdl_eqp_tm={self.time_lut}"
profile_dict = profile._asdict()
profile_dict.pop("id")
profile_dict.pop("user")
profile_dict.pop("version")
mdl_eqp_ary = "-999,-999,-999"
for k, v in profile_dict.items():
if hasattr(resp, k):
setattr(resp, k, v)
# get the common_modules from the profile shop
if profile_shop:
mdl_eqp_ary = profile_shop["mdl_eqp_ary"]
if profile_shop is not None and profile_shop:
resp.mdl_eqp_ary = profile_shop["mdl_eqp_ary"]
response += f"&mdl_eqp_ary={mdl_eqp_ary}"
return resp.make()
return response
async def handle_registration_request(self, data: Dict) -> Dict:
await self.data.profile.create_profile(
self.version, data["aime_id"], data["player_name"]
async def handle_registration_request(self, data: bytes) -> str:
req = RegisterRequest(data)
pd_id = await self.data.profile.create_profile(
self.version, req.aime_id, req.player_name
)
return f"&cd_adm_result=1&pd_id={data['aime_id']}"
if pd_id is None:
return "&cd_adm_result=-1"
return RegisterResponse(req.req_id, req.aime_id).make()
async def handle_start_request(self, data: Dict) -> Dict:
profile = await self.data.profile.get_profile(data["pd_id"], self.version)
profile_shop = await self.data.item.get_shop(data["pd_id"], self.version)
async def handle_start_request(self, data: bytes) -> str:
req = StartRequest(data)
profile = await self.data.profile.get_profile(req.pd_id, self.version)
profile_shop = await self.data.item.get_shop(req.pd_id, self.version)
if profile is None:
return
resp = StartResponse(req.req_id, req.pd_id, profile['player_name'])
profile_dict = profile._asdict()
profile_dict.pop("id")
profile_dict.pop("user")
profile_dict.pop("version")
for k, v in profile_dict.items():
if hasattr(resp, k):
setattr(resp, k, v)
mdl_have = "F" * 250
# generate the mdl_have string if "unlock_all_modules" is disabled
if not self.game_config.mods.unlock_all_modules:
mdl_have = await self.data.module.get_modules_have_string(
data["pd_id"], self.version
resp.mdl_have = await self.data.module.get_modules_have_string(
req.pd_id, self.version
)
cstmz_itm_have = "F" * 250
# generate the cstmz_itm_have string if "unlock_all_items" is disabled
if not self.game_config.mods.unlock_all_items:
cstmz_itm_have = await self.data.customize.get_customize_items_have_string(
data["pd_id"], self.version
resp.cstmz_itm_have = await self.data.customize.get_customize_items_have_string(
req.pd_id, self.version
)
response = f"&pd_id={data['pd_id']}"
response += "&start_result=1"
response += "&accept_idx=100"
response += f"&hp_vol={profile['hp_vol']}"
response += f"&btn_se_vol={profile['btn_se_vol']}"
response += f"&btn_se_vol2={profile['btn_se_vol2']}"
response += f"&sldr_se_vol2={profile['sldr_se_vol2']}"
response += f"&sort_kind={profile['sort_kind']}"
response += f"&player_name={profile['player_name']}"
response += f"&lv_num={profile['lv_num']}"
response += f"&lv_pnt={profile['lv_pnt']}"
response += f"&lv_efct_id={profile['lv_efct_id']}"
response += f"&lv_plt_id={profile['lv_plt_id']}"
response += f"&mdl_have={mdl_have}"
response += f"&cstmz_itm_have={cstmz_itm_have}"
response += f"&use_pv_mdl_eqp={int(profile['use_pv_mdl_eqp'])}"
response += f"&use_mdl_pri={int(profile['use_mdl_pri'])}"
response += f"&use_pv_skn_eqp={int(profile['use_pv_skn_eqp'])}"
response += f"&use_pv_btn_se_eqp={int(profile['use_pv_btn_se_eqp'])}"
response += f"&use_pv_sld_se_eqp={int(profile['use_pv_sld_se_eqp'])}"
response += f"&use_pv_chn_sld_se_eqp={int(profile['use_pv_chn_sld_se_eqp'])}"
response += f"&use_pv_sldr_tch_se_eqp={int(profile['use_pv_sldr_tch_se_eqp'])}"
response += f"&vcld_pts={profile['lv_efct_id']}"
response += f"&nxt_pv_id={profile['nxt_pv_id']}"
response += f"&nxt_dffclty={profile['nxt_dffclty']}"
response += f"&nxt_edtn={profile['nxt_edtn']}"
response += f"&dsp_clr_brdr={profile['dsp_clr_brdr']}"
response += f"&dsp_intrm_rnk={profile['dsp_intrm_rnk']}"
response += f"&dsp_clr_sts={profile['dsp_clr_sts']}"
response += f"&rgo_sts={profile['rgo_sts']}"
# Contest progress
response += f"&cv_cid=-1,-1,-1,-1"
response += f"&cv_sc=-1,-1,-1,-1"
response += f"&cv_bv=-1,-1,-1,-1"
response += f"&cv_bv=-1,-1,-1,-1"
response += f"&cv_bf=-1,-1,-1,-1"
# Contest now playing id, return -1 if no current playing contest
response += f"&cnp_cid={profile['cnp_cid']}"
response += f"&cnp_val={profile['cnp_val']}"
# border can be 0=bronzem 1=silver, 2=gold
response += f"&cnp_rr={profile['cnp_rr']}"
# only show contest specifier if it is not empty
response += f"&cnp_sp={profile['cnp_sp']}" if profile["cnp_sp"] != "" else ""
# To be fully fixed
if "my_qst_id" not in profile:
response += f"&my_qst_id=-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1"
response += f"&my_qst_sts=0,0,0,0,0,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1"
else:
response += f"&my_qst_id={profile['my_qst_id']}"
response += f"&my_qst_sts={profile['my_qst_sts']}"
response += f"&my_qst_prgrs=0,0,0,0,0,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1"
response += f"&my_qst_et=2022-06-19%2010%3A28%3A52.0,2022-06-19%2010%3A28%3A52.0,2022-06-19%2010%3A28%3A52.0,2100-01-01%2008%3A59%3A59.0,2100-01-01%2008%3A59%3A59.0,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx"
if "my_qst_id" in profile:
resp.my_qst_id = profile['my_qst_id']
resp.my_qst_sts = profile['my_qst_sts']
# define a helper class to store all counts for clear, great,
# excellent and perfect
@ -524,7 +474,7 @@ class DivaBase:
}
# get clear status from user scores
pv_records = await self.data.score.get_best_scores(data["pd_id"])
pv_records = await self.data.score.get_best_scores(req.pd_id)
clear_status = "0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0"
if pv_records is not None:
@ -560,46 +510,33 @@ class DivaBase:
clear_status = ",".join(map(str, clear_list))
response += f"&clr_sts={clear_status}"
# Store stuff to add to rework
response += f"&mdl_eqp_tm={self.time_lut}"
mdl_eqp_ary = "-999,-999,-999"
c_itm_eqp_ary = "-999,-999,-999,-999,-999,-999,-999,-999,-999,-999,-999,-999"
ms_itm_flg_ary = "1,1,1,1,1,1,1,1,1,1,1,1"
resp.clr_sts = clear_status
# get the common_modules, customize_items and customize_item_flags
# from the profile shop
if profile_shop:
mdl_eqp_ary = profile_shop["mdl_eqp_ary"]
c_itm_eqp_ary = profile_shop["c_itm_eqp_ary"]
ms_itm_flg_ary = profile_shop["ms_itm_flg_ary"]
resp.mdl_eqp_ary = profile_shop["mdl_eqp_ary"]
resp.c_itm_eqp_ary = profile_shop["c_itm_eqp_ary"]
resp.ms_itm_flg_ary = profile_shop["ms_itm_flg_ary"]
response += f"&mdl_eqp_ary={mdl_eqp_ary}"
response += f"&c_itm_eqp_ary={c_itm_eqp_ary}"
response += f"&ms_itm_flg_ary={ms_itm_flg_ary}"
return resp.make()
return response
async def handle_pd_unlock_request(self, data: bytes) -> str:
pass
async def handle_pd_unlock_request(self, data: Dict) -> Dict:
return f""
async def handle_spend_credit_request(self, data: Dict) -> Dict:
profile = await self.data.profile.get_profile(data["pd_id"], self.version)
async def handle_spend_credit_request(self, data: bytes) -> str:
req = SpendCreditRequest(data)
profile = await self.data.profile.get_profile(req.pd_id, self.version)
if profile is None:
return
response = ""
resp = SpendCreditResponse(req.req_id)
resp.vcld_pts = profile['vcld_pts']
resp.lv_str = profile['lv_str']
resp.lv_efct_id = profile['lv_efct_id']
resp.lv_plt_id = profile['lv_plt_id']
response += "&cmpgn_rslt=-1,-1,x,-1,-1,x,x,-1,x,-1,-1,x,-1,-1,x,x,-1,x,-1,-1,x,-1,-1,x,x,-1,x,-1,-1,x,-1,-1,x,x,-1,x,-1,-1,x,-1,-1,x,x,-1,x,-1,-1,x,-1,-1,x,x,-1,x"
response += "&cmpgn_rslt_num=0"
response += f"&vcld_pts={profile['vcld_pts']}"
response += f"&lv_str={profile['lv_str']}"
response += f"&lv_efct_id={profile['lv_efct_id']}"
response += f"&lv_plt_id={profile['lv_plt_id']}"
return response
return resp.make()
def _get_pv_pd_result(
self,
@ -663,31 +600,31 @@ class DivaBase:
return pv_result
async def task_generateScoreData(self, data: Dict, pd_by_pv_id, song):
async def task_generateScoreData(self, pd_id: int, difficulty: int, pd_by_pv_id: str, song: int):
if int(song) > 0:
# the request do not send a edition so just perform a query best score and ranking for each edition.
# 0=ORIGINAL, 1=EXTRA
pd_db_song_0 = await self.data.score.get_best_user_score(
data["pd_id"], int(song), data["difficulty"], edition=0
pd_id, int(song), difficulty, edition=0
)
pd_db_song_1 = await self.data.score.get_best_user_score(
data["pd_id"], int(song), data["difficulty"], edition=1
pd_id, int(song), difficulty, edition=1
)
pd_db_ranking_0, pd_db_ranking_1 = None, None
if pd_db_song_0:
pd_db_ranking_0 = await self.data.score.get_global_ranking(
data["pd_id"], int(song), data["difficulty"], edition=0
pd_id, int(song), difficulty, edition=0
)
if pd_db_song_1:
pd_db_ranking_1 = await self.data.score.get_global_ranking(
data["pd_id"], int(song), data["difficulty"], edition=1
pd_id, int(song), difficulty, edition=1
)
pd_db_customize = await self.data.pv_customize.get_pv_customize(
data["pd_id"], int(song)
pd_id, int(song)
)
# generate the pv_result string with the ORIGINAL edition and the EXTRA edition appended
@ -705,14 +642,14 @@ class DivaBase:
pd_by_pv_id.append(",")
async def handle_get_pv_pd_request(self, data: Dict) -> Dict:
song_id = data["pd_pv_id_lst"].split(",")
req = GetPvPdRequest(data)
pv = ""
threads = []
pd_by_pv_id = []
for song in song_id:
thread_ScoreData = Thread(target=await self.task_generateScoreData(data, pd_by_pv_id, song))
for song in req.pd_pv_id_lst:
thread_ScoreData = Thread(target=await self.task_generateScoreData(req.pd_id, req.difficulty, pd_by_pv_id, song))
threads.append(thread_ScoreData)
for x in threads:
@ -724,44 +661,48 @@ class DivaBase:
for x in pd_by_pv_id:
pv += x
resp = GetPvPdResponse(req.req_id)
resp.pd_by_pv_id = pv[:-1]
response = ""
response += f"&pd_by_pv_id={pv[:-1]}"
response += "&pdddt_flg=0"
response += f"&pdddt_tm={self.time_lut}"
return response
return resp.make()
async def handle_stage_start_request(self, data: Dict) -> Dict:
return f""
async def handle_stage_result_request(self, data: Dict) -> Dict:
profile = await self.data.profile.get_profile(data["pd_id"], self.version)
async def handle_stage_result_request(self, data: bytes) -> str:
req = StageResultRequest(data)
resp = StageResultResponse(req.cmd, req.req_id)
profile = await self.data.profile.get_profile(req.pd_id, self.version)
pd_song_list = data["stg_ply_pv_id"].split(",")
pd_song_difficulty = data["stg_difficulty"].split(",")
pd_song_edition = data["stg_edtn"].split(",")
pd_song_max_score = data["stg_score"].split(",")
pd_song_max_atn_pnt = data["stg_atn_pnt"].split(",")
pd_song_ranking = data["stg_clr_kind"].split(",")
pd_song_sort_kind = data["sort_kind"]
pd_song_cool_cnt = data["stg_cool_cnt"].split(",")
pd_song_fine_cnt = data["stg_fine_cnt"].split(",")
pd_song_safe_cnt = data["stg_safe_cnt"].split(",")
pd_song_sad_cnt = data["stg_sad_cnt"].split(",")
pd_song_worst_cnt = data["stg_wt_wg_cnt"].split(",")
pd_song_max_combo = data["stg_max_cmb"].split(",")
pd_song_list = req.stg_ply_pv_id
pd_song_difficulty = req.stg_difficulty
pd_song_edition = req.stg_edtn
pd_song_max_score = req.stg_score
pd_song_max_atn_pnt = req.stg_atn_pnt
pd_song_ranking = req.stg_clr_kind
pd_song_sort_kind = req.sort_kind
pd_song_cool_cnt = req.stg_cool_cnt
pd_song_fine_cnt = req.stg_fine_cnt
pd_song_safe_cnt = req.stg_safe_cnt
pd_song_sad_cnt = req.stg_sad_cnt
pd_song_worst_cnt = req.stg_wt_wg_cnt
pd_song_max_combo = req.stg_max_cmb
for index, value in enumerate(pd_song_list):
if "-1" not in pd_song_list[index]:
if pd_song_list[index] > 0:
profile_pd_db_song = await self.data.score.get_best_user_score(
data["pd_id"],
req.pd_id,
pd_song_list[index],
pd_song_difficulty[index],
pd_song_edition[index],
)
if profile_pd_db_song is None:
await self.data.score.put_best_score(
data["pd_id"],
req.pd_id,
self.version,
pd_song_list[index],
pd_song_difficulty[index],
@ -778,7 +719,7 @@ class DivaBase:
pd_song_max_combo[index],
)
await self.data.score.put_playlog(
data["pd_id"],
req.pd_id,
self.version,
pd_song_list[index],
pd_song_difficulty[index],
@ -796,7 +737,7 @@ class DivaBase:
)
elif int(pd_song_max_score[index]) >= int(profile_pd_db_song["score"]):
await self.data.score.put_best_score(
data["pd_id"],
req.pd_id,
self.version,
pd_song_list[index],
pd_song_difficulty[index],
@ -813,7 +754,7 @@ class DivaBase:
pd_song_max_combo[index],
)
await self.data.score.put_playlog(
data["pd_id"],
req.pd_id,
self.version,
pd_song_list[index],
pd_song_difficulty[index],
@ -831,7 +772,7 @@ class DivaBase:
)
elif int(pd_song_max_score[index]) != int(profile_pd_db_song["score"]):
await self.data.score.put_playlog(
data["pd_id"],
req.pd_id,
self.version,
pd_song_list[index],
pd_song_difficulty[index],
@ -860,6 +801,20 @@ class DivaBase:
new_level = (total_atn_pnt // 13979) + 1
new_level_pnt = round((total_atn_pnt % 13979) / 13979 * 100)
resp.lv_num_old = int(profile['lv_num'])
resp.lv_pnt_old = int(profile['lv_pnt'])
resp.lv_num = new_level
resp.lv_str = profile['lv_str']
resp.lv_pnt = new_level_pnt
resp.lv_efct_id = int(profile['lv_efct_id'])
resp.lv_plt_id = int(profile['lv_plt_id'])
resp.vcld_pts = int(profile['vcld_pts'])
resp.prsnt_vcld_pts = int(profile['vcld_pts'])
if "my_qst_id" not in profile:
quests = profile['my_qst_id'].split(",")
for x in range(len(quests)):
resp.my_qst_id[x] = int(quests[x])
response = "&chllng_kind=-1"
response += f"&lv_num_old={int(profile['lv_num'])}"
response += f"&lv_pnt_old={int(profile['lv_pnt'])}"
@ -869,16 +824,16 @@ class DivaBase:
profile["user"],
lv_num=new_level,
lv_pnt=new_level_pnt,
vcld_pts=int(data["vcld_pts"]),
hp_vol=int(data["hp_vol"]),
btn_se_vol=int(data["btn_se_vol"]),
sldr_se_vol2=int(data["sldr_se_vol2"]),
sort_kind=int(data["sort_kind"]),
nxt_pv_id=int(data["ply_pv_id"]),
nxt_dffclty=int(data["nxt_dffclty"]),
nxt_edtn=int(data["nxt_edtn"]),
my_qst_id=data["my_qst_id"],
my_qst_sts=data["my_qst_sts"],
vcld_pts=req.vcld_pts,
hp_vol=req.hp_vol,
btn_se_vol=req.btn_se_vol,
sldr_se_vol2=req.sldr_se_vol2,
sort_kind=req.sort_kind,
nxt_pv_id=req.ply_pv_id,
nxt_dffclty=req.nxt_dffclty,
nxt_edtn=req.nxt_edtn,
my_qst_id=req.my_qst_id,
my_qst_sts=req.my_qst_sts,
)
response += f"&lv_num={new_level}"
@ -911,17 +866,18 @@ class DivaBase:
response += "&my_ccd_r_hnd=-1,-1,-1,-1,-1"
response += "&my_ccd_r_vp=-1,-1,-1,-1,-1"
return response
return resp.make()
async def handle_end_request(self, data: Dict) -> Dict:
profile = await self.data.profile.get_profile(data["pd_id"], self.version)
async def handle_end_request(self, data: bytes) -> str:
req = EndRequest(data)
profile = await self.data.profile.get_profile(req.pd_id, self.version)
await self.data.profile.update_profile(
profile["user"], my_qst_id=data["my_qst_id"], my_qst_sts=data["my_qst_sts"]
profile["user"], my_qst_id=req.my_qst_id, my_qst_sts=req.my_qst_sts
)
return f""
return None
async def handle_shop_exit_request(self, data: Dict) -> Dict:
async def handle_shop_exit_request(self, data: bytes) -> str:
await self.data.item.put_shop(
data["pd_id"],
self.version,

View File

@ -18,6 +18,12 @@ class DivaServerConfig:
self.__config, "diva", "server", "loglevel", default="info"
)
)
@property
def banner_msg(self) -> str:
CoreConfig.get_config_field(
self.__config, "diva", "server", "banner_msg", default=""
)
class DivaModsConfig:

View File

@ -6,6 +6,8 @@ class DivaConstants:
VER_PROJECT_DIVA_ARCADE = 0
VER_PROJECT_DIVA_ARCADE_FUTURE_TONE = 1
LUT_TIME_FMT = "%Y-%m-%d %H:%M:%S:16.0"
VERSION_NAMES = ("Project Diva Arcade", "Project Diva Arcade Future Tone")
@classmethod

View File

@ -0,0 +1,3 @@
from .base import *
from .user import *
from .pv import *

View File

@ -0,0 +1,275 @@
from urllib import parse
from urllib.parse import quote
from datetime import datetime
from typing import Union, Dict, List, Any
from ..const import DivaConstants
def lazy_http_form_parse(src: Union[str, bytes]) -> Dict[bytes, bytes]:
out = {}
if type(src) == str:
src = src.encode()
for param in src.split(b"&"):
kvp = param.split(b"=")
out[parse.unquote(kvp[0])] = parse.unquote(kvp[1])
return out
class DivaRequestParseException(Exception):
"""
Exception raised when there is a fault in parsing a diva request,
either due to a malformed request, or missing required items
"""
def __init__(self, message: str) -> None:
self.message = message
super().__init__(self.message)
class BaseBinaryRequest:
cmd: str
req_id: str
def __init__(self, raw: bytes) -> None:
self.raw = raw
self.raw_dict = dict(parse.parse_qsl(raw))
if "cmd" not in self.raw_dict:
raise DivaRequestParseException(f"cmd not in request data {self.raw_dict}")
if "req_id" not in self.raw_dict:
raise DivaRequestParseException(
f"req_id not in request data {self.raw_dict}"
)
for k, v in self.raw_dict.items():
setattr(self, k, v)
class BaseRequest:
def __init__(self, raw: Union[str, bytes]) -> None:
self.raw = raw
try:
self.raw_dict: Dict[str, str] = lazy_http_form_parse(raw)
except UnicodeDecodeError as e:
raise DivaRequestParseException(f"Could not decode data {raw} - {e}")
if "cmd" not in self.raw_dict:
raise DivaRequestParseException(f"cmd not in request data {self.raw_dict}")
if "req_id" not in self.raw_dict:
raise DivaRequestParseException(
f"req_id not in request data {self.raw_dict}"
)
if "place_id" not in self.raw_dict:
raise DivaRequestParseException(
f"place_id not in request data {self.raw_dict}"
)
if "start_up_mode" not in self.raw_dict:
raise DivaRequestParseException(
f"start_up_mode not in request data {self.raw_dict}"
)
if "cmm_dly_mod" not in self.raw_dict:
raise DivaRequestParseException(
f"cmm_dly_mod not in request data {self.raw_dict}"
)
if "cmm_dly_sec" not in self.raw_dict:
raise DivaRequestParseException(
f"cmm_dly_sec not in request data {self.raw_dict}"
)
if "cmm_err_mod" not in self.raw_dict:
raise DivaRequestParseException(
f"cmm_err_mod not in request data {self.raw_dict}"
)
if "region_code" not in self.raw_dict:
raise DivaRequestParseException(
f"region_code not in request data {self.raw_dict}"
)
if "time_stamp" not in self.raw_dict:
raise DivaRequestParseException(
f"time_stamp not in request data {self.raw_dict}"
)
self.cmd: str = self.raw_dict.get('cmd')
self.req_id: str = self.raw_dict.get('req_id')
self.game_id: str = self.raw_dict.get('game_id')
self.r_rev: str = self.raw_dict.get('r_rev')
self.kc_serial: str = self.raw_dict.get('kc_serial')
self.b_serial: str = self.raw_dict.get('b_serial')
self.country_code: str = self.raw_dict.get('country_code')
self.place_id = int(self.raw_dict['place_id'], 16)
self.start_up_mode = int(self.raw_dict.get('start_up_mode'))
self.cmm_dly_mod = int(self.raw_dict.get('cmm_dly_mod'))
self.cmm_dly_sec = int(self.raw_dict.get('cmm_dly_sec'))
self.cmm_err_mod = int(self.raw_dict.get('cmm_err_mod'))
self.region_code = int(self.raw_dict.get('region_code'))
# datetime.now().astimezone().replace(microsecond=0).isoformat()
self.time_stamp = datetime.strptime(self.raw_dict.get('time_stamp'), "%Y-%m-%dT%H:%M:%S%z")
class BaseResponse:
def __init__(self, cmd_id: str, req_id: int) -> None:
self.cmd = cmd_id
self.req_id = req_id
self.stat = "ok"
def make(self) -> str:
itms: List[str] = []
for k, v in vars(self).items():
if type(v) == int:
itms.append(encode_int(k, v))
elif type(v) == bool:
itms.append(encode_bool(k, v))
elif type(v) == datetime:
itms.append(encode_date(k, v))
elif type(v) == list:
itms.append(encode_list(k, v))
else:
itms.append(encode_str(k, v))
return "&".join(itms)
class GameInitRequest(BaseRequest):
def __init__(self, raw: Union[str, bytes]) -> None:
super().__init__(raw)
class AttendRequest(BaseRequest):
def __init__(self, raw: Union[str, bytes]) -> None:
super().__init__(raw)
if 'power_on' not in self.raw_dict:
raise DivaRequestParseException(
f"power_on not in request data {self.raw_dict}"
)
if 'is_bb' not in self.raw_dict:
raise DivaRequestParseException(
f"is_bb not in request data {self.raw_dict}"
)
self.power_on = int(self.raw_dict.get('power_on'))
self.is_bb = bool(int(self.raw_dict.get('is_bb')))
class AttendResponse(BaseResponse):
def __init__(self, req_id: int) -> None:
super().__init__("attend", req_id)
self.atnd_prm1 = [1] * 100
self.atnd_prm2 = [1] * 100
self.atnd_prm3 = [1] * 100
self.atnd_lut = datetime.now()
def make(self) -> str:
return quote(super().make(), safe=",&=")
class SpendCreditRequest(BaseRequest):
def __init__(self, raw: Union[str, bytes]) -> None:
super().__init__(raw)
if 'pd_id' not in self.raw_dict:
raise DivaRequestParseException(
f"pd_id not in request data {self.raw_dict}"
)
if 'my_qst_id' not in self.raw_dict:
raise DivaRequestParseException(
f"my_qst_id not in request data {self.raw_dict}"
)
if 'my_qst_sts' not in self.raw_dict:
raise DivaRequestParseException(
f"my_qst_sts not in request data {self.raw_dict}"
)
if 'crdt_typ' not in self.raw_dict:
raise DivaRequestParseException(
f"crdt_typ not in request data {self.raw_dict}"
)
if 'cmpgn_id' not in self.raw_dict:
raise DivaRequestParseException(
f"cmpgn_id not in request data {self.raw_dict}"
)
if 'cmpgn_pb' not in self.raw_dict:
raise DivaRequestParseException(
f"cmpgn_pb not in request data {self.raw_dict}"
)
self.pd_id = int(self.raw_dict.get('pd_id'))
self.my_qst_id = decode_list_int(self.raw_dict.get('my_qst_id'))
self.my_qst_sts = decode_list_int(self.raw_dict.get('my_qst_sts'))
self.crdt_typ = int(self.raw_dict.get('crdt_typ'))
self.cmpgn_id = decode_list_int(self.raw_dict.get('cmpgn_id'))
self.cmpgn_pb = decode_list_int(self.raw_dict.get('cmpgn_pb'))
class SpendCreditResponse(BaseResponse):
def __init__(self, req_id: int) -> None:
super().__init__("spend_credit", req_id)
self.cmpgn_rslt = ",".join(["-1,-1,x,-1,-1,x,x,-1,x"] * 6)
self.cmpgn_rslt_num = 0
self.vcld_pts = 0
self.lv_str = ""
self.lv_efct_id = 0
self.lv_plt_id = 0
def encode_int(key: str, val: Union[int, None] = None) -> str:
if type(val) != int:
val = -1
return f"{key}={val}"
def encode_bool(key: str, val: Union[bool, None] = None) -> str:
if not val:
return encode_int(key, 0)
return encode_int(key, 1)
def encode_str(key: str, val: Union[str, None] = None, urlencode_val: bool = False) -> str:
if type(val) != str:
val = "xxx"
if urlencode_val:
val = quote(val)
return f"{key}={val}"
def encode_date(key: str, val: Union[datetime, None], urlencode_val: bool = True, fmt: str = DivaConstants.LUT_TIME_FMT) -> str:
if type(val) != datetime:
val = datetime.now().astimezone()
val = val.replace(microsecond=0)
dt_fmt = val.strftime(fmt)
if urlencode_val:
dt_fmt = quote(dt_fmt)
return f"{key}={dt_fmt}"
def encode_list(key: str, val: Union[List[Any], None], urlencode_final_val: bool = False, urlencode_vals: bool = False) -> str:
if not val:
return f"{key}="
for x in range(len(val)):
if val[x] is None:
val[x] = "x"
if type(val[x]) == datetime:
val[x] = val[x].replace(microsecond=0).strftime(DivaConstants.LUT_TIME_FMT)
elif type(val[x]) == bool:
val[x] = str(int(val[x]))
elif type(val[x]) == int:
val[x] = str(val[x])
if urlencode_vals:
val[x] = quote(val[x])
all_vals = ",".join(val)
if urlencode_final_val:
all_vals = quote(all_vals)
return f"{key}={all_vals}"
def decode_list_int(val: str) -> List[int]:
return [int(x) for x in val.split(",")]
def decode_list(val: str) -> List[str]:
return [x for x in val.split(",")]

156
titles/diva/handlers/pv.py Normal file
View File

@ -0,0 +1,156 @@
from typing import Union, List
from titles.diva.handlers.base import (
BaseRequest,
BaseResponse,
DivaRequestParseException,
decode_list_int
)
from datetime import datetime
from urllib import parse
from ..const import DivaConstants
class GetPvPdRequest(BaseRequest):
def __init__(self, raw: Union[str, bytes]) -> None:
super().__init__(raw)
try:
self.pd_id = int(self.raw_dict.get('pd_id'))
self.accept_idx = int(self.raw_dict.get('accept_idx'))
self.start_idx = int(self.raw_dict.get('start_idx'))
self.difficulty = int(self.raw_dict.get('difficulty'))
self.pd_pv_id_lst: List[int] = [int(x) for x in self.pd_pv_id_lst.split(',')]
except AttributeError as e:
raise DivaRequestParseException(f"GetPvPdRequest: {e}")
class GetPvPdResponse(BaseResponse):
def __init__(self, req_id: int) -> None:
super().__init__("get_pv_pd", req_id)
self.pd_by_pv_id = ""
self.pdddt_flg = 0
self.pdddt_tm = parse.quote(datetime.now().strftime(DivaConstants.LUT_TIME_FMT))
class StageResultRequest(BaseRequest):
def __init__(self, raw: Union[str, bytes]) -> None:
super().__init__(raw)
self.pd_id = int(self.raw_dict.get('pd_id'))
self.accept_idx = int(self.raw_dict.get('accept_idx'))
self.start_idx = int(self.raw_dict.get('start_idx'))
self.hp_vol = int(self.raw_dict.get('hp_vol'))
self.btn_se_vol = int(self.raw_dict.get('btn_se_vol'))
self.btn_se_vol2 = int(self.raw_dict.get('btn_se_vol2'))
self.sldr_se_vol2 = int(self.raw_dict.get('sldr_se_vol2'))
self.use_pv_mdl_eqp = int(self.raw_dict.get('use_pv_mdl_eqp'))
self.vcld_pts = int(self.raw_dict.get('vcld_pts'))
self.nxt_pv_id = int(self.raw_dict.get('nxt_pv_id'))
self.nxt_dffclty = int(self.raw_dict.get('nxt_dffclty'))
self.nxt_edtn = int(self.raw_dict.get('nxt_edtn'))
self.sort_kind = int(self.raw_dict.get('sort_kind'))
self.nblss_ltt_stts = int(self.raw_dict.get('nblss_ltt_stts'))
self.nblss_ltt_tckt = int(self.raw_dict.get('nblss_ltt_tckt'))
self.free_play = int(self.raw_dict.get('free_play'))
self.game_type = int(self.raw_dict.get('game_type'))
self.ply_pv_id = int(self.raw_dict.get('ply_pv_id'))
self.ttl_vp_add = int(self.raw_dict.get('ttl_vp_add'))
self.ttl_vp_sub = int(self.raw_dict.get('ttl_vp_sub'))
self.continue_cnt = int(self.raw_dict.get('continue_cnt'))
self.cr_cid = int(self.raw_dict.get('cr_cid'))
self.cr_sc = int(self.raw_dict.get('cr_sc'))
self.cr_tv = int(self.raw_dict.get('cr_tv'))
self.cr_if = int(self.raw_dict.get('cr_if'))
self.my_qst_id: List[int] = decode_list_int(self.raw_dict.get('my_qst_id'))
self.my_qst_sts: List[int] = decode_list_int(self.raw_dict.get('my_qst_sts'))
self.stg_difficulty: List[int] = decode_list_int(self.raw_dict.get('stg_difficulty'))
self.stg_edtn: List[int] = decode_list_int(self.raw_dict.get('stg_edtn'))
self.stg_ply_pv_id: List[int] = decode_list_int(self.raw_dict.get('stg_ply_pv_id'))
self.stg_sel_pv_id: List[int] = decode_list_int(self.raw_dict.get('stg_sel_pv_id'))
self.stg_scrpt_ver: List[int] = decode_list_int(self.raw_dict.get('stg_scrpt_ver'))
self.stg_score: List[int] = decode_list_int(self.raw_dict.get('stg_score'))
self.stg_chllng_kind: List[int] = decode_list_int(self.raw_dict.get('stg_chllng_kind'))
self.stg_chllng_result: List[int] = decode_list_int(self.raw_dict.get('stg_chllng_result'))
self.stg_clr_kind: List[int] = decode_list_int(self.raw_dict.get('stg_clr_kind'))
self.stg_vcld_pts: List[int] = decode_list_int(self.raw_dict.get('stg_vcld_pts'))
self.stg_cool_cnt: List[int] = decode_list_int(self.raw_dict.get('stg_cool_cnt'))
self.stg_cool_pct: List[int] = decode_list_int(self.raw_dict.get('stg_cool_pct'))
self.stg_fine_cnt: List[int] = decode_list_int(self.raw_dict.get('stg_fine_cnt'))
self.stg_fine_pct: List[int] = decode_list_int(self.raw_dict.get('stg_fine_pct'))
self.stg_safe_cnt: List[int] = decode_list_int(self.raw_dict.get('stg_safe_cnt'))
self.stg_safe_pct: List[int] = decode_list_int(self.raw_dict.get('stg_safe_pct'))
self.stg_sad_cnt: List[int] = decode_list_int(self.raw_dict.get('stg_sad_cnt'))
self.stg_sad_pct: List[int] = decode_list_int(self.raw_dict.get('stg_sad_pct'))
self.stg_wt_wg_cnt: List[int] = decode_list_int(self.raw_dict.get('stg_wt_wg_cnt'))
self.stg_wt_wg_pct: List[int] = decode_list_int(self.raw_dict.get('stg_wt_wg_pct'))
self.stg_max_cmb: List[int] = decode_list_int(self.raw_dict.get('stg_max_cmb'))
self.stg_chance_tm: List[int] = decode_list_int(self.raw_dict.get('stg_chance_tm'))
self.stg_sm_hl: List[int] = decode_list_int(self.raw_dict.get('stg_sm_hl'))
self.stg_atn_pnt: List[int] = decode_list_int(self.raw_dict.get('stg_atn_pnt'))
self.stg_skin_id: List[int] = decode_list_int(self.raw_dict.get('stg_skin_id'))
self.stg_btn_se: List[int] = decode_list_int(self.raw_dict.get('stg_btn_se'))
self.stg_btn_se_vol: List[int] = decode_list_int(self.raw_dict.get('stg_btn_se_vol'))
self.stg_sld_se: List[int] = decode_list_int(self.raw_dict.get('stg_sld_se'))
self.stg_chn_sld_se: List[int] = decode_list_int(self.raw_dict.get('stg_chn_sld_se'))
self.stg_sldr_tch_se: List[int] = decode_list_int(self.raw_dict.get('stg_sldr_tch_se'))
self.stg_mdl_id: List[int] = decode_list_int(self.raw_dict.get('stg_mdl_id'))
self.stg_sel_mdl_id: List[int] = decode_list_int(self.raw_dict.get('stg_sel_mdl_id'))
self.stg_rvl_pd_id: List[int] = decode_list_int(self.raw_dict.get('stg_rvl_pd_id'))
self.stg_rvl_wl: List[int] = decode_list_int(self.raw_dict.get('stg_rvl_wl'))
self.stg_cpt_rslt: List[int] = decode_list_int(self.raw_dict.get('stg_cpt_rslt'))
self.stg_sld_scr: List[int] = decode_list_int(self.raw_dict.get('stg_sld_scr'))
self.stg_is_sr_gm: List[int] = decode_list_int(self.raw_dict.get('stg_is_sr_gm'))
self.stg_pv_brnch_rslt: List[int] = decode_list_int(self.raw_dict.get('stg_pv_brnch_rslt'))
self.stg_vcl_chg: List[int] = decode_list_int(self.raw_dict.get('stg_vcl_chg'))
self.stg_c_itm_id: List[int] = decode_list_int(self.raw_dict.get('stg_c_itm_id'))
self.stg_ms_itm_flg: List[int] = decode_list_int(self.raw_dict.get('stg_ms_itm_flg'))
self.stg_rgo: List[int] = decode_list_int(self.raw_dict.get('stg_rgo'))
self.stg_ss_num: List[int] = decode_list_int(self.raw_dict.get('stg_ss_num'))
self.stg_is_cs_scs: List[int] = decode_list_int(self.raw_dict.get('stg_is_cs_scs'))
self.stg_is_nppg_use: List[int] = decode_list_int(self.raw_dict.get('stg_is_nppg_use'))
self.stg_p_std_lo_id: List[int] = decode_list_int(self.raw_dict.get('stg_p_std_lo_id'))
self.stg_p_std_is_to: List[int] = decode_list_int(self.raw_dict.get('stg_p_std_is_to'))
self.stg_p_std_is_ccu: List[int] = decode_list_int(self.raw_dict.get('stg_p_std_is_ccu'))
self.stg_p_std_is_tiu: List[int] = decode_list_int(self.raw_dict.get('stg_p_std_is_tiu'))
self.stg_p_std_is_iu: List[int] = decode_list_int(self.raw_dict.get('stg_p_std_is_iu'))
self.stg_p_std_is_npu: List[int] = decode_list_int(self.raw_dict.get('stg_p_std_is_npu'))
self.stg_p_std_is_du: List[int] = decode_list_int(self.raw_dict.get('stg_p_std_is_du'))
self.gu_cmd: List[int] = decode_list_int(self.raw_dict.get('gu_cmd'))
self.mdl_eqp_cmn_ary: List[int] = decode_list_int(self.raw_dict.get('mdl_eqp_cmn_ary'))
self.c_itm_eqp_cmn_ary: List[int] = decode_list_int(self.raw_dict.get('c_itm_eqp_cmn_ary'))
self.ms_itm_flg_cmn_ary: List[int] = decode_list_int(self.raw_dict.get('ms_itm_flg_cmn_ary'))
self.mdl_eqp_pv_ary: List[int] = decode_list_int(self.raw_dict.get('mdl_eqp_pv_ary'))
self.c_itm_eqp_pv_ary: List[int] = decode_list_int(self.raw_dict.get('c_itm_eqp_pv_ary'))
self.ms_itm_flg_pv_ary: List[int] = decode_list_int(self.raw_dict.get('ms_itm_flg_pv_ary'))
self.stg_mdl_s_sts: List[int] = decode_list_int(self.raw_dict.get('stg_mdl_s_sts'))
self.cr_sp: List[int] = decode_list_int(parse.unquote(self.raw_dict.get('cr_sp')))
class StageResultResponse(BaseResponse):
def __init__(self, req_id: int) -> None:
super().__init__("stage_result", req_id)
self.chllng_kind = -1
self.lv_num_old = 0
self.lv_pnt_old = 0
self.lv_num = 0
self.lv_str = 0
self.lv_pnt = 0
self.lv_efct_id = 0
self.lv_plt_id = 0
self.vcld_pts = 0
self.prsnt_vcld_pts = 0
self.cerwd_kind = -1
self.cerwd_value = -1
self.cerwd_str_0 = "***"
self.cerwd_str_1 = "***"
self.ttl_str_ary = ["xxx"] * 5
self.ttl_plt_id_ary = ["xxx"] * 5
self.ttl_desc_ary = ["xxx"] * 5
self.skin_id_ary = ["xxx"] * 5
self.skin_name_ary = ["xxx"] * 5
self.skin_illust_ary = ["xxx"] * 5
self.skin_desc_ary = ["xxx"] * 5
self.my_qst_id = [-1] * 25
self.my_qst_r_qid = [-1] * 25
self.my_qst_r_knd = [-1] * 25
self.my_qst_r_vl = [-1] * 25
self.my_qst_r_nflg = [-1] * 25
self.my_ccd_r_qid = [-1] * 5
self.my_ccd_r_hnd = [-1] * 5
self.my_ccd_r_vp = [-1] * 5

View File

@ -0,0 +1,140 @@
from typing import Union
from titles.diva.handlers.base import (
BaseRequest,
BaseResponse,
DivaRequestParseException,
)
from datetime import datetime
from urllib import parse
from ..const import DivaConstants
class PreStartRequest(BaseRequest):
def __init__(self, raw: str) -> None:
super().__init__(raw)
self.pmm: str = self.raw_dict.get('pmm')
self.idm: str = self.raw_dict.get('idm')
self.mmgameid: str = self.raw_dict.get('mmgameid')
self.mmuid: str = self.raw_dict.get('mmuid')
self.a_code: str = self.raw_dict.get('a_code')
self.aime_id: str = self.raw_dict.get('aime_id')
self.aime_a_code: str = self.raw_dict.get('aime_a_code')
self.key_obj_type = int(self.raw_dict.get('key_obj_type'))
self.exec_vu = int(self.raw_dict.get('exec_vu'))
class PreStartResponse(BaseResponse):
def __init__(self, req_id: int, pd_id: int) -> None:
super().__init__("pre_start", req_id)
self.ps_result = 1
self.pd_id = pd_id
self.accept_idx = 100
self.nblss_ltt_stts = -1
self.nblss_ltt_tckt = -1
self.nblss_ltt_is_opn = -1
self.player_name: str = ""
self.sort_kind: str = ""
self.lv_efct_id: str = ""
self.lv_plt_id: str = ""
self.lv_str: str = ""
self.lv_num: str = ""
self.lv_pnt: str = ""
self.vcld_pts: str = ""
self.skn_eqp: str = ""
self.btn_se_eqp: str = ""
self.sld_se_eqp: str = ""
self.chn_sld_se_eqp: str = ""
self.sldr_tch_se_eqp: str = ""
self.passwd_stat: str = ""
self.mdl_eqp_tm: str = ""
# Ideally this would be a real array that would get converted later
# But this is how it's stored in the db, so w/e for now
self.mdl_eqp_ary = "-999,-999,-999"
class StartRequest(BaseRequest):
def __init__(self, raw: str) -> None:
super().__init__(raw)
self.pd_id = int(self.raw_dict.get('pd_id'))
self.accept_idx = int(self.raw_dict.get('accept_idx'))
class StartResponse(BaseResponse):
def __init__(self, req_id: int, pv_id: int, pv_name: str) -> None:
super().__init__("start", req_id)
self.pd_id: int = pv_id
self.start_result: int = 1
self.accept_idx: int = 100
self.hp_vol: int = 0
self.btn_se_vol: int = 1
self.btn_se_vol2: int = 1
self.sldr_se_vol2: int = 1
self.sort_kind: int = 1
self.player_name: str = pv_name
self.lv_num: int = 1
self.lv_pnt: int = 0
self.lv_efct_id: int = 1
self.lv_plt_id: int = 1
self.mdl_have: str = "F" * 250
self.cstmz_itm_have: str = "F" * 250
self.use_pv_mdl_eqp: int = 0
self.use_mdl_pri: int = 0
self.use_pv_skn_eqp: int = 1
self.use_pv_btn_se_eqp: int = 1
self.use_pv_sld_se_eqp: int = 1
self.use_pv_chn_sld_se_eqp: int = 1
self.use_pv_sldr_tch_se_eqp: int = 1
self.vcld_pts: int = 0
self.nxt_pv_id: int = 1
self.nxt_dffclty: int = 1
self.nxt_edtn: int = 0
self.dsp_clr_brdr: int = 0
self.dsp_intrm_rnk: int = 0
self.dsp_clr_sts: int = 0
self.rgo_sts: int = 0
self.cv_cid: str = "-1,-1,-1,-1"
self.cv_sc: str = "-1,-1,-1,-1"
#self.cv_bv: str = "-1,-1,-1,-1"
self.cv_bv: str = "-1,-1,-1,-1"
self.cv_bf: str = "-1,-1,-1,-1"
self.cnp_cid=-1
self.cnp_val=-1
self.cnp_rr=-1
self.my_qst_id: str = ",".join(["-1"] * 25)
self.my_qst_sts: str = ",".join("0" * 5) + "," + ",".join(["-1"] * 20)
self.my_qst_prgrs: str = ",".join("0" * 5) + "," + ",".join(["-1"] * 20)
self.my_qst_et: str = ",".join([parse.quote(datetime.now().strftime(DivaConstants.LUT_TIME_FMT))] * 5) + "," + ",".join(["xxx"] * 20)
self.clr_sts: str = ",".join("0" * 5) + "," + ",".join(["-1"] * 20)
self.mdl_eqp_tm: str = parse.quote(datetime.now().strftime(DivaConstants.LUT_TIME_FMT))
self.mdl_eqp_ary = ",".join(["-999"] * 3)
self.c_itm_eqp_ary = ",".join(["-999"] * 12)
self.ms_itm_flg_ary = ",".join(["1"] * 12)
class RegisterRequest(BaseRequest):
def __init__(self, raw: str) -> None:
super().__init__(raw)
self.pmm: str = self.raw_dict.get('pmm')
self.idm: str = self.raw_dict.get('idm')
self.mmgameid: str = self.raw_dict.get('mmgameid')
self.mmuid: str = self.raw_dict.get('mmuid')
self.a_code: str = self.raw_dict.get('a_code')
self.aime_a_code: str = self.raw_dict.get('aime_a_code')
self.player_name: str = self.raw_dict.get('player_name')
self.passwd: str = self.raw_dict.get('passwd')
self.aime_id = int(self.raw_dict.get('aime_id'))
self.key_obj_type = int(self.raw_dict.get('key_obj_type'))
class RegisterResponse(BaseResponse):
def __init__(self, req_id: int, pv_id: int) -> None:
super().__init__("register", req_id)
self.cd_adm_result: int = 1
self.pd_id: int = pv_id
class EndRequest(BaseRequest):
def __init__(self, raw: Union[str, bytes]) -> None:
self.my_qst_id: str = self.raw_dict.get('my_qst_id')
self.my_qst_sts: str = self.raw_dict.get('my_qst_sts')
super().__init__(raw)
try:
self.pd_id = int(self.raw_dict.get('pd_id'))
except AttributeError as e:
raise DivaRequestParseException(f"EndRequest: {e}")

View File

@ -6,11 +6,11 @@ import logging, coloredlogs
from logging.handlers import TimedRotatingFileHandler
import zlib
import json
import urllib.parse
import base64
from os import path
from typing import Tuple, Dict, List
from titles.diva.handlers.base import *
from core.config import CoreConfig
from core.title import BaseServlet
from core.utils import Utils
@ -84,74 +84,52 @@ class DivaServlet(BaseServlet):
url_header = request.headers
# Ping Dispatch
if "THIS_STRING_SEPARATES" in str(url_header):
if "THIS_STRING_SEPARATES" in url_header:
binary_request = req_raw.splitlines()
binary_cmd_decoded = binary_request[3].decode("utf-8")
binary_array = binary_cmd_decoded.split("&")
url_data = binary_cmd_decoded # for logging
req_cls = BaseBinaryRequest(binary_cmd_decoded)
bin_req_data = {}
else:
json_string = json.dumps(
req_raw.decode("utf-8")
) # Take the response and decode as UTF-8 and dump
b64string = json_string.replace(
r"\n", "\n"
) # Remove all \n and separate them as new lines
gz_string = base64.b64decode(b64string) # Decompressing the base64 string
for kvp in binary_array:
split_bin = kvp.split("=")
bin_req_data[split_bin[0]] = split_bin[1]
try:
url_data = zlib.decompress(gz_string) # Decompressing the gzip
except zlib.error as e:
self.logger.error(f"Failed to defalte! {e} -> {gz_string}")
return PlainTextResponse("stat=0")
self.logger.info(f"Binary {bin_req_data['cmd']} Request")
self.logger.debug(bin_req_data)
try:
req_cls = BaseRequest(url_data)
except DivaRequestParseException as e:
self.logger.error(e)
return PlainTextResponse("stat=0")
handler = getattr(self.base, f"handle_{bin_req_data['cmd']}_request")
resp = handler(bin_req_data)
self.logger.debug(
f"Response cmd={bin_req_data['cmd']}&req_id={bin_req_data['req_id']}&stat=ok{resp}"
)
return PlainTextResponse(f"cmd={bin_req_data['cmd']}&req_id={bin_req_data['req_id']}&stat=ok{resp}")
# Main Dispatch
json_string = json.dumps(
req_raw.decode("utf-8")
) # Take the response and decode as UTF-8 and dump
b64string = json_string.replace(
r"\n", "\n"
) # Remove all \n and separate them as new lines
gz_string = base64.b64decode(b64string) # Decompressing the base64 string
try:
url_data = zlib.decompress(gz_string).decode(
"utf-8"
) # Decompressing the gzip
except zlib.error as e:
self.logger.error(f"Failed to defalte! {e} -> {gz_string}")
return PlainTextResponse("stat=0")
req_kvp = urllib.parse.unquote(url_data)
req_data = {}
# We then need to split each parts with & so we can reuse them to fill out the requests
splitted_request = str.split(req_kvp, "&")
for kvp in splitted_request:
split = kvp.split("=")
req_data[split[0]] = split[1]
self.logger.info(f"{req_data['cmd']} Request")
self.logger.debug(req_data)
func_to_find = f"handle_{req_data['cmd']}_request"
# Load the requests
try:
handler = getattr(self.base, func_to_find)
resp = await handler(req_data)
except AttributeError as e:
self.logger.warning(f"Unhandled {req_data['cmd']} request {e}")
return PlainTextResponse(f"cmd={req_data['cmd']}&req_id={req_data['req_id']}&stat=ok")
except Exception as e:
self.logger.error(f"Error handling method {func_to_find} {e}")
return PlainTextResponse(f"cmd={req_data['cmd']}&req_id={req_data['req_id']}&stat=ok")
self.logger.debug(
f"Response cmd={req_data['cmd']}&req_id={req_data['req_id']}&stat=ok{resp}"
self.logger.debug(f"Request: {url_data}\nHeaders: {url_header}")
self.logger.info(
f"{req_cls.cmd} request from {req_cls.kc_serial}/{req_cls.b_serial} at {Utils.get_ip_addr(request)}"
)
return PlainTextResponse(f"cmd={req_data['cmd']}&req_id={req_data['req_id']}&stat=ok{resp}")
handler_str = f"handle_{req_cls.cmd}_request"
if not hasattr(self.base, handler_str):
self.logger.warn(f"Unhandled cmd {req_cls.cmd}")
return PlainTextResponse(BaseResponse(req_cls.cmd, req_cls.req_id).make())
handler = getattr(self.base, handler_str)
response = handler(req_cls.raw)
if response is None or response == "":
response = BaseResponse(req_cls.cmd, req_cls.req_id).make()
if not response.startswith("cmd="):
response = f"cmd={req_cls.cmd}&req_id={req_cls.req_id}&stat=ok" + response
self.logger.debug(f"Response: {response}")
return PlainTextResponse(response)