forked from Hay1tsme/artemis
Compare commits
20 Commits
master
...
diva_handl
Author | SHA1 | Date |
---|---|---|
Hay1tsme | c2ee09bce0 | |
Hay1tsme | dfea392b03 | |
Midorica | 6ad5194bb8 | |
Dniel97 | a0793aa13a | |
Dniel97 | 7364181de1 | |
Hay1tsme | 9d8762d3da | |
Hay1tsme | 998aa70929 | |
Hay1tsme | 8df1325613 | |
Hay1tsme | 6ff7827918 | |
Hay1tsme | a36170f2c3 | |
Hay1tsme | 48144b53f0 | |
Hay1tsme | 3b852ea739 | |
Hay1tsme | 21c9b23617 | |
Hay1tsme | 3076bdf575 | |
Hay1tsme | 5be25f89ff | |
Hay1tsme | 5443d24352 | |
Hay1tsme | 56927c049f | |
Hay1tsme | 2b81ba206c | |
Hay1tsme | 9718e822f3 | |
Hay1tsme | b07e3d6a94 |
|
@ -0,0 +1,9 @@
|
|||
ALTER TABLE diva_profile
|
||||
DROP cnp_cid,
|
||||
DROP cnp_val,
|
||||
DROP cnp_rr,
|
||||
DROP cnp_sp,
|
||||
DROP btn_se_eqp,
|
||||
DROP sld_se_eqp,
|
||||
DROP chn_sld_se_eqp,
|
||||
DROP sldr_tch_se_eqp;
|
|
@ -0,0 +1,9 @@
|
|||
ALTER TABLE diva_profile
|
||||
ADD cnp_cid INT NOT NULL DEFAULT -1,
|
||||
ADD cnp_val INT NOT NULL DEFAULT -1,
|
||||
ADD cnp_rr INT NOT NULL DEFAULT -1,
|
||||
ADD cnp_sp VARCHAR(255) NOT NULL DEFAULT "",
|
||||
ADD btn_se_eqp INT NOT NULL DEFAULT -1,
|
||||
ADD sld_se_eqp INT NOT NULL DEFAULT -1,
|
||||
ADD chn_sld_se_eqp INT NOT NULL DEFAULT -1,
|
||||
ADD sldr_tch_se_eqp INT NOT NULL DEFAULT -1;
|
|
@ -7,4 +7,4 @@ index = DivaServlet
|
|||
database = DivaData
|
||||
reader = DivaReader
|
||||
game_codes = [DivaConstants.GAME_CODE]
|
||||
current_schema_version = 4
|
||||
current_schema_version = 5
|
||||
|
|
|
@ -1,13 +1,14 @@
|
|||
import datetime
|
||||
from datetime import datetime
|
||||
from typing import Any, List, Dict
|
||||
import logging
|
||||
import json
|
||||
import urllib
|
||||
from urllib import parse
|
||||
|
||||
from core.config import CoreConfig
|
||||
from titles.diva.config import DivaConfig
|
||||
from titles.diva.const import DivaConstants
|
||||
from titles.diva.database import DivaData
|
||||
from titles.diva.handlers import *
|
||||
|
||||
|
||||
class DivaBase:
|
||||
|
@ -20,30 +21,39 @@ class DivaBase:
|
|||
self.game = DivaConstants.GAME_CODE
|
||||
self.version = DivaConstants.VER_PROJECT_DIVA_ARCADE_FUTURE_TONE
|
||||
|
||||
dt = datetime.datetime.now()
|
||||
self.time_lut = urllib.parse.quote(dt.strftime("%Y-%m-%d %H:%M:%S:16.0"))
|
||||
dt = datetime.now()
|
||||
self.time_lut = parse.quote(dt.strftime("%Y-%m-%d %H:%M:%S:16.0"))
|
||||
|
||||
def handle_test_request(self, data: Dict) -> Dict:
|
||||
return ""
|
||||
def handle_test_request(self, data: bytes) -> str:
|
||||
pass
|
||||
|
||||
def handle_game_init_request(self, data: Dict) -> Dict:
|
||||
return f""
|
||||
def handle_game_init_request(self, data: bytes) -> str:
|
||||
req = GameInitRequest(data)
|
||||
return None
|
||||
|
||||
def handle_attend_request(self, data: Dict) -> Dict:
|
||||
encoded = "&"
|
||||
params = {
|
||||
"atnd_prm1": "0,1,1,0,0,0,1,0,100,0,0,0,0,0,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1",
|
||||
"atnd_prm2": "30,10,100,4,1,50,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1",
|
||||
"atnd_prm3": "100,0,1,1,1,1,1,1,1,1,2,3,4,1,1,1,3,4,5,1,1,1,4,5,6,1,1,1,5,6,7,4,4,4,9,10,14,5,10,10,25,20,50,30,90,5,10,10,25,20,50,30,90,5,10,10,25,20,50,30,90,5,10,10,25,20,50,30,90,5,10,10,25,20,50,30,90,10,30,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0",
|
||||
"atnd_lut": f"{self.time_lut}",
|
||||
}
|
||||
def handle_attend_request(self, data: bytes) -> str:
|
||||
req = AttendRequest(data)
|
||||
resp = AttendResponse(req.cmd, req.req_id)
|
||||
|
||||
encoded += urllib.parse.urlencode(params)
|
||||
encoded = encoded.replace("%2C", ",")
|
||||
for i in [0, 3, 4, 5, 7, 9, 10, 11, 12, 13]:
|
||||
resp.atnd_prm1[i] = 0
|
||||
resp.atnd_prm1[8] = 100
|
||||
|
||||
return encoded
|
||||
resp.atnd_prm2[:6] = [30, 10, 100, 4, 1, 50]
|
||||
|
||||
def handle_ping_request(self, data: Dict) -> Dict:
|
||||
resp.atnd_prm3[0] = 100
|
||||
resp.atnd_prm3[1] = 0
|
||||
resp.atnd_prm3[10:13] = [2, 3, 4]
|
||||
resp.atnd_prm3[16:19] = [3, 4, 5]
|
||||
resp.atnd_prm3[22:25] = [4, 5, 6]
|
||||
resp.atnd_prm3[80:] = [0] * 20
|
||||
resp.atnd_prm3[28:79] = [5,6,7,4,4,4,9,10,14,5,10,10,25,20,50,30,90,5,
|
||||
10,10,25,20,50,30,90,5,10,10,25,20,50,30,90,5,10,10,25,20,50,30,90,
|
||||
5,10,10,25,20,50,30,90,10,30]
|
||||
|
||||
return resp.make()
|
||||
|
||||
def handle_ping_request(self, data: bytes) -> str:
|
||||
encoded = "&"
|
||||
params = {
|
||||
"ping_b_msg": f"Welcome to {self.core_cfg.server.name} network!",
|
||||
|
@ -82,13 +92,13 @@ class DivaBase:
|
|||
"nblss_ltt_ed_tm": "2019-09-22 12:00:00.0",
|
||||
}
|
||||
|
||||
encoded += urllib.parse.urlencode(params)
|
||||
encoded += parse.urlencode(params)
|
||||
encoded = encoded.replace("+", "%20")
|
||||
encoded = encoded.replace("%2C", ",")
|
||||
|
||||
return encoded
|
||||
|
||||
def handle_pv_list_request(self, data: Dict) -> Dict:
|
||||
def handle_pv_list_request(self, data: bytes) -> str:
|
||||
pvlist = ""
|
||||
with open(r"titles/diva/data/PvList0.dat", encoding="utf-8") as shop:
|
||||
lines = shop.readlines()
|
||||
|
@ -125,7 +135,7 @@ class DivaBase:
|
|||
|
||||
return response
|
||||
|
||||
def handle_shop_catalog_request(self, data: Dict) -> Dict:
|
||||
def handle_shop_catalog_request(self, data: bytes) -> str:
|
||||
catalog = ""
|
||||
|
||||
shopList = self.data.static.get_enabled_shops(self.version)
|
||||
|
@ -133,8 +143,8 @@ class DivaBase:
|
|||
with open(r"titles/diva/data/ShopCatalog.dat", encoding="utf-8") as shop:
|
||||
lines = shop.readlines()
|
||||
for line in lines:
|
||||
line = urllib.parse.quote(line) + ","
|
||||
catalog += f"{urllib.parse.quote(line)}"
|
||||
line = parse.quote(line) + ","
|
||||
catalog += f"{parse.quote(line)}"
|
||||
|
||||
else:
|
||||
for shop in shopList:
|
||||
|
@ -153,8 +163,8 @@ class DivaBase:
|
|||
+ ","
|
||||
+ str(shop["type"])
|
||||
)
|
||||
line = urllib.parse.quote(line) + ","
|
||||
catalog += f"{urllib.parse.quote(line)}"
|
||||
line = parse.quote(line) + ","
|
||||
catalog += f"{parse.quote(line)}"
|
||||
|
||||
catalog = catalog.replace("+", "%20")
|
||||
|
||||
|
@ -163,7 +173,7 @@ class DivaBase:
|
|||
|
||||
return response
|
||||
|
||||
def handle_buy_module_request(self, data: Dict) -> Dict:
|
||||
def handle_buy_module_request(self, data: bytes) -> str:
|
||||
profile = self.data.profile.get_profile(data["pd_id"], self.version)
|
||||
module = self.data.static.get_enabled_shop(self.version, int(data["mdl_id"]))
|
||||
|
||||
|
@ -190,7 +200,7 @@ class DivaBase:
|
|||
|
||||
return response
|
||||
|
||||
def handle_cstmz_itm_ctlg_request(self, data: Dict) -> Dict:
|
||||
def handle_cstmz_itm_ctlg_request(self, data: bytes) -> str:
|
||||
catalog = ""
|
||||
|
||||
itemList = self.data.static.get_enabled_items(self.version)
|
||||
|
@ -198,8 +208,8 @@ class DivaBase:
|
|||
with open(r"titles/diva/data/ItemCatalog.dat", encoding="utf-8") as item:
|
||||
lines = item.readlines()
|
||||
for line in lines:
|
||||
line = urllib.parse.quote(line) + ","
|
||||
catalog += f"{urllib.parse.quote(line)}"
|
||||
line = parse.quote(line) + ","
|
||||
catalog += f"{parse.quote(line)}"
|
||||
|
||||
else:
|
||||
for item in itemList:
|
||||
|
@ -218,8 +228,8 @@ class DivaBase:
|
|||
+ ","
|
||||
+ str(item["type"])
|
||||
)
|
||||
line = urllib.parse.quote(line) + ","
|
||||
catalog += f"{urllib.parse.quote(line)}"
|
||||
line = parse.quote(line) + ","
|
||||
catalog += f"{parse.quote(line)}"
|
||||
|
||||
catalog = catalog.replace("+", "%20")
|
||||
|
||||
|
@ -228,7 +238,7 @@ class DivaBase:
|
|||
|
||||
return response
|
||||
|
||||
def handle_buy_cstmz_itm_request(self, data: Dict) -> Dict:
|
||||
def handle_buy_cstmz_itm_request(self, data: bytes) -> str:
|
||||
profile = self.data.profile.get_profile(data["pd_id"], self.version)
|
||||
item = self.data.static.get_enabled_item(
|
||||
self.version, int(data["cstmz_itm_id"])
|
||||
|
@ -263,29 +273,30 @@ class DivaBase:
|
|||
|
||||
return response
|
||||
|
||||
def handle_festa_info_request(self, data: Dict) -> Dict:
|
||||
def handle_festa_info_request(self, data: bytes) -> str:
|
||||
encoded = "&"
|
||||
params = {
|
||||
"fi_id": "1,-1",
|
||||
"fi_name": f"{self.core_cfg.server.name} Opening,xxx",
|
||||
"fi_kind": "0,0",
|
||||
"fi_id": "1,2",
|
||||
"fi_name": f"{self.core_cfg.server.name} Opening,Project DIVA Festa",
|
||||
# 0=PINK, 1=GREEN
|
||||
"fi_kind": "1,0",
|
||||
"fi_difficulty": "-1,-1",
|
||||
"fi_pv_id_lst": "ALL,ALL",
|
||||
"fi_attr": "7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF,7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF",
|
||||
"fi_add_vp": "20,0",
|
||||
"fi_mul_vp": "1,1",
|
||||
"fi_st": "2022-06-17 17:00:00.0,2014-07-08 18:10:11.0",
|
||||
"fi_et": "2029-01-01 10:00:00.0,2014-07-08 18:10:11.0",
|
||||
"fi_add_vp": "20,5",
|
||||
"fi_mul_vp": "1,2",
|
||||
"fi_st": "2019-01-01 00:00:00.0,2019-01-01 00:00:00.0",
|
||||
"fi_et": "2029-01-01 00:00:00.0,2029-01-01 00:00:00.0",
|
||||
"fi_lut": "{self.time_lut}",
|
||||
}
|
||||
|
||||
encoded += urllib.parse.urlencode(params)
|
||||
encoded += parse.urlencode(params)
|
||||
encoded = encoded.replace("+", "%20")
|
||||
encoded = encoded.replace("%2C", ",")
|
||||
|
||||
return encoded
|
||||
|
||||
def handle_contest_info_request(self, data: Dict) -> Dict:
|
||||
def handle_contest_info_request(self, data: bytes) -> str:
|
||||
response = ""
|
||||
|
||||
response += f"&ci_lut={self.time_lut}"
|
||||
|
@ -293,7 +304,7 @@ class DivaBase:
|
|||
|
||||
return response
|
||||
|
||||
def handle_qst_inf_request(self, data: Dict) -> Dict:
|
||||
def handle_qst_inf_request(self, data: bytes) -> str:
|
||||
quest = ""
|
||||
|
||||
questList = self.data.static.get_enabled_quests(self.version)
|
||||
|
@ -301,7 +312,7 @@ class DivaBase:
|
|||
with open(r"titles/diva/data/QuestInfo.dat", encoding="utf-8") as shop:
|
||||
lines = shop.readlines()
|
||||
for line in lines:
|
||||
quest += f"{urllib.parse.quote(line)},"
|
||||
quest += f"{parse.quote(line)},"
|
||||
|
||||
response = ""
|
||||
response += f"&qi_lut={self.time_lut}"
|
||||
|
@ -329,7 +340,7 @@ class DivaBase:
|
|||
+ ","
|
||||
+ str(quests["quest_enable"])
|
||||
)
|
||||
quest += f"{urllib.parse.quote(line)}%0A,"
|
||||
quest += f"{parse.quote(line)}%0A,"
|
||||
|
||||
responseline = f"{quest[:-1]},"
|
||||
for i in range(len(questList), 59):
|
||||
|
@ -343,45 +354,47 @@ class DivaBase:
|
|||
|
||||
return response
|
||||
|
||||
def handle_nv_ranking_request(self, data: Dict) -> Dict:
|
||||
return f""
|
||||
def handle_nv_ranking_request(self, data: bytes) -> str:
|
||||
pass
|
||||
|
||||
def handle_ps_ranking_request(self, data: Dict) -> Dict:
|
||||
return f""
|
||||
def handle_ps_ranking_request(self, data: bytes) -> str:
|
||||
pass
|
||||
|
||||
def handle_ng_word_request(self, data: Dict) -> Dict:
|
||||
return f""
|
||||
def handle_ng_word_request(self, data: bytes) -> str:
|
||||
pass
|
||||
|
||||
def handle_rmt_wp_list_request(self, data: Dict) -> Dict:
|
||||
return f""
|
||||
def handle_rmt_wp_list_request(self, data: bytes) -> str:
|
||||
pass
|
||||
|
||||
def handle_pv_def_chr_list_request(self, data: Dict) -> Dict:
|
||||
return f""
|
||||
def handle_pv_def_chr_list_request(self, data: bytes) -> str:
|
||||
pass
|
||||
|
||||
def handle_pv_ng_mdl_list_request(self, data: Dict) -> Dict:
|
||||
return f""
|
||||
def handle_pv_ng_mdl_list_request(self, data: bytes) -> str:
|
||||
pass
|
||||
|
||||
def handle_cstmz_itm_ng_mdl_lst_request(self, data: Dict) -> Dict:
|
||||
return f""
|
||||
def handle_cstmz_itm_ng_mdl_lst_request(self, data: bytes) -> str:
|
||||
pass
|
||||
|
||||
def handle_banner_info_request(self, data: Dict) -> Dict:
|
||||
return f""
|
||||
def handle_banner_info_request(self, data: bytes) -> str:
|
||||
pass
|
||||
|
||||
def handle_banner_data_request(self, data: Dict) -> Dict:
|
||||
return f""
|
||||
def handle_banner_data_request(self, data: bytes) -> str:
|
||||
pass
|
||||
|
||||
def handle_cm_ply_info_request(self, data: Dict) -> Dict:
|
||||
return f""
|
||||
def handle_cm_ply_info_request(self, data: bytes) -> str:
|
||||
pass
|
||||
|
||||
def handle_pstd_h_ctrl_request(self, data: Dict) -> Dict:
|
||||
return f""
|
||||
def handle_pstd_h_ctrl_request(self, data: bytes) -> str:
|
||||
pass
|
||||
|
||||
def handle_pstd_item_ng_lst_request(self, data: Dict) -> Dict:
|
||||
return f""
|
||||
def handle_pstd_item_ng_lst_request(self, data: bytes) -> str:
|
||||
pass
|
||||
|
||||
def handle_pre_start_request(self, data: Dict) -> str:
|
||||
profile = self.data.profile.get_profile(data["aime_id"], self.version)
|
||||
profile_shop = self.data.item.get_shop(data["aime_id"], self.version)
|
||||
def handle_pre_start_request(self, data: bytes) -> str:
|
||||
req = PreStartRequest(data)
|
||||
resp = PreStartResponse(req.cmd, req.req_id, req.aime_id)
|
||||
profile = self.data.profile.get_profile(req.aime_id, self.version)
|
||||
profile_shop = self.data.item.get_shop(req.aime_id, self.version)
|
||||
|
||||
if profile is None:
|
||||
return f"&ps_result=-3"
|
||||
|
@ -401,119 +414,176 @@ class DivaBase:
|
|||
response += f"&lv_pnt={profile['lv_pnt']}"
|
||||
response += f"&vcld_pts={profile['vcld_pts']}"
|
||||
response += f"&skn_eqp={profile['use_pv_skn_eqp']}"
|
||||
response += f"&btn_se_eqp={profile['use_pv_btn_se_eqp']}"
|
||||
response += f"&sld_se_eqp={profile['use_pv_sld_se_eqp']}"
|
||||
response += f"&chn_sld_se_eqp={profile['use_pv_chn_sld_se_eqp']}"
|
||||
response += f"&sldr_tch_se_eqp={profile['use_pv_sldr_tch_se_eqp']}"
|
||||
response += f"&btn_se_eqp={profile['btn_se_eqp']}"
|
||||
response += f"&sld_se_eqp={profile['sld_se_eqp']}"
|
||||
response += f"&chn_sld_se_eqp={profile['chn_sld_se_eqp']}"
|
||||
response += f"&sldr_tch_se_eqp={profile['sldr_tch_se_eqp']}"
|
||||
response += f"&passwd_stat={profile['passwd_stat']}"
|
||||
|
||||
# Store stuff to add to rework
|
||||
response += f"&mdl_eqp_tm={self.time_lut}"
|
||||
profile_dict = profile._asdict()
|
||||
profile_dict.pop("id")
|
||||
profile_dict.pop("user")
|
||||
profile_dict.pop("version")
|
||||
|
||||
mdl_eqp_ary = "-999,-999,-999"
|
||||
for k, v in profile_dict.items():
|
||||
if hasattr(resp, k):
|
||||
setattr(resp, k, v)
|
||||
|
||||
# get the common_modules from the profile shop
|
||||
if profile_shop:
|
||||
mdl_eqp_ary = profile_shop["mdl_eqp_ary"]
|
||||
if profile_shop is not None and profile_shop:
|
||||
resp.mdl_eqp_ary = profile_shop["mdl_eqp_ary"]
|
||||
|
||||
response += f"&mdl_eqp_ary={mdl_eqp_ary}"
|
||||
return resp.make()
|
||||
|
||||
return response
|
||||
|
||||
def handle_registration_request(self, data: Dict) -> Dict:
|
||||
def handle_registration_request(self, data: bytes) -> str:
|
||||
self.data.profile.create_profile(
|
||||
self.version, data["aime_id"], data["player_name"]
|
||||
)
|
||||
return f"&cd_adm_result=1&pd_id={data['aime_id']}"
|
||||
|
||||
def handle_start_request(self, data: Dict) -> Dict:
|
||||
profile = self.data.profile.get_profile(data["pd_id"], self.version)
|
||||
profile_shop = self.data.item.get_shop(data["pd_id"], self.version)
|
||||
def handle_start_request(self, data: bytes) -> str:
|
||||
req = StartRequest(data)
|
||||
profile = self.data.profile.get_profile(req.pd_id, self.version)
|
||||
profile_shop = self.data.item.get_shop(req.pd_id, self.version)
|
||||
if profile is None:
|
||||
return
|
||||
|
||||
resp = StartResponse(req.cmd, req.req_id, req.pd_id, profile['player_name'])
|
||||
|
||||
mdl_have = "F" * 250
|
||||
# generate the mdl_have string if "unlock_all_modules" is disabled
|
||||
if not self.game_config.mods.unlock_all_modules:
|
||||
mdl_have = self.data.module.get_modules_have_string(
|
||||
resp.mdl_have = self.data.module.get_modules_have_string(
|
||||
data["pd_id"], self.version
|
||||
)
|
||||
|
||||
cstmz_itm_have = "F" * 250
|
||||
# generate the cstmz_itm_have string if "unlock_all_items" is disabled
|
||||
if not self.game_config.mods.unlock_all_items:
|
||||
cstmz_itm_have = self.data.customize.get_customize_items_have_string(
|
||||
resp.cstmz_itm_have = self.data.customize.get_customize_items_have_string(
|
||||
data["pd_id"], self.version
|
||||
)
|
||||
|
||||
response = f"&pd_id={data['pd_id']}"
|
||||
response += "&start_result=1"
|
||||
resp.pd_id = data['pd_id']
|
||||
resp.hp_vol = {profile['hp_vol']}
|
||||
resp.btn_se_vol = {profile['btn_se_vol']}
|
||||
resp.btn_se_vol2 = {profile['btn_se_vol2']}
|
||||
resp.sldr_se_vol2 = {profile['sldr_se_vol2']}
|
||||
resp.sort_kind = {profile['sort_kind']}
|
||||
resp.player_name = {profile['player_name']}
|
||||
resp.lv_num = {profile['lv_num']}
|
||||
resp.lv_pnt = {profile['lv_pnt']}
|
||||
resp.lv_efct_id = {profile['lv_efct_id']}
|
||||
resp.lv_plt_id = {profile['lv_plt_id']}
|
||||
resp.use_pv_mdl_eqp = {int(profile['use_pv_mdl_eqp'])}
|
||||
resp.use_mdl_pri = {int(profile['use_mdl_pri'])}
|
||||
resp.use_pv_skn_eqp = {int(profile['use_pv_skn_eqp'])}
|
||||
resp.use_pv_btn_se_eqp = {int(profile['use_pv_btn_se_eqp'])}
|
||||
resp.use_pv_sld_se_eqp = {int(profile['use_pv_sld_se_eqp'])}
|
||||
resp.use_pv_chn_sld_se_eqp = {int(profile['use_pv_chn_sld_se_eqp'])}
|
||||
resp.use_pv_sldr_tch_se_eqp = {int(profile['use_pv_sldr_tch_se_eqp'])}
|
||||
resp.vcld_pts = {profile['lv_efct_id']}
|
||||
resp.nxt_pv_id = {profile['nxt_pv_id']}
|
||||
resp.nxt_dffclty = {profile['nxt_dffclty']}
|
||||
resp.nxt_edtn = {profile['nxt_edtn']}
|
||||
resp.dsp_clr_brdr = {profile['dsp_clr_brdr']}
|
||||
resp.dsp_intrm_rnk = {profile['dsp_intrm_rnk']}
|
||||
resp.dsp_clr_sts = {profile['dsp_clr_sts']}
|
||||
resp.rgo_sts = {profile['rgo_sts']}
|
||||
|
||||
response += "&accept_idx=100"
|
||||
response += f"&hp_vol={profile['hp_vol']}"
|
||||
response += f"&btn_se_vol={profile['btn_se_vol']}"
|
||||
response += f"&btn_se_vol2={profile['btn_se_vol2']}"
|
||||
response += f"&sldr_se_vol2={profile['sldr_se_vol2']}"
|
||||
response += f"&sort_kind={profile['sort_kind']}"
|
||||
response += f"&player_name={profile['player_name']}"
|
||||
response += f"&lv_num={profile['lv_num']}"
|
||||
response += f"&lv_pnt={profile['lv_pnt']}"
|
||||
response += f"&lv_efct_id={profile['lv_efct_id']}"
|
||||
response += f"&lv_plt_id={profile['lv_plt_id']}"
|
||||
response += f"&mdl_have={mdl_have}"
|
||||
response += f"&cstmz_itm_have={cstmz_itm_have}"
|
||||
response += f"&use_pv_mdl_eqp={int(profile['use_pv_mdl_eqp'])}"
|
||||
response += f"&use_mdl_pri={int(profile['use_mdl_pri'])}"
|
||||
response += f"&use_pv_skn_eqp={int(profile['use_pv_skn_eqp'])}"
|
||||
response += f"&use_pv_btn_se_eqp={int(profile['use_pv_btn_se_eqp'])}"
|
||||
response += f"&use_pv_sld_se_eqp={int(profile['use_pv_sld_se_eqp'])}"
|
||||
response += f"&use_pv_chn_sld_se_eqp={int(profile['use_pv_chn_sld_se_eqp'])}"
|
||||
response += f"&use_pv_sldr_tch_se_eqp={int(profile['use_pv_sldr_tch_se_eqp'])}"
|
||||
response += f"&vcld_pts={profile['lv_efct_id']}"
|
||||
response += f"&nxt_pv_id={profile['nxt_pv_id']}"
|
||||
response += f"&nxt_dffclty={profile['nxt_dffclty']}"
|
||||
response += f"&nxt_edtn={profile['nxt_edtn']}"
|
||||
response += f"&dsp_clr_brdr={profile['dsp_clr_brdr']}"
|
||||
response += f"&dsp_intrm_rnk={profile['dsp_intrm_rnk']}"
|
||||
response += f"&dsp_clr_sts={profile['dsp_clr_sts']}"
|
||||
response += f"&rgo_sts={profile['rgo_sts']}"
|
||||
# Contest progress
|
||||
response += f"&cv_cid=-1,-1,-1,-1"
|
||||
response += f"&cv_sc=-1,-1,-1,-1"
|
||||
response += f"&cv_bv=-1,-1,-1,-1"
|
||||
response += f"&cv_bv=-1,-1,-1,-1"
|
||||
response += f"&cv_bf=-1,-1,-1,-1"
|
||||
|
||||
# Contest now playing id, return -1 if no current playing contest
|
||||
response += f"&cnp_cid={profile['cnp_cid']}"
|
||||
response += f"&cnp_val={profile['cnp_val']}"
|
||||
# border can be 0=bronzem 1=silver, 2=gold
|
||||
response += f"&cnp_rr={profile['cnp_rr']}"
|
||||
# only show contest specifier if it is not empty
|
||||
response += f"&cnp_sp={profile['cnp_sp']}" if profile["cnp_sp"] != "" else ""
|
||||
|
||||
# To be fully fixed
|
||||
if "my_qst_id" not in profile:
|
||||
response += f"&my_qst_id=-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1"
|
||||
response += f"&my_qst_sts=0,0,0,0,0,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1"
|
||||
else:
|
||||
response += f"&my_qst_id={profile['my_qst_id']}"
|
||||
response += f"&my_qst_sts={profile['my_qst_sts']}"
|
||||
if "my_qst_id" in profile:
|
||||
resp.my_qst_id = {profile['my_qst_id']}
|
||||
resp.my_qst_sts = {profile['my_qst_sts']}
|
||||
|
||||
response += f"&my_qst_prgrs=0,0,0,0,0,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1"
|
||||
response += f"&my_qst_et=2022-06-19%2010%3A28%3A52.0,2022-06-19%2010%3A28%3A52.0,2022-06-19%2010%3A28%3A52.0,2100-01-01%2008%3A59%3A59.0,2100-01-01%2008%3A59%3A59.0,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx"
|
||||
response += f"&clr_sts=0,0,0,0,0,0,0,0,56,52,35,6,6,3,1,0,0,0,0,0"
|
||||
|
||||
# define a helper class to store all counts for clear, great,
|
||||
# excellent and perfect
|
||||
class ClearSet:
|
||||
def __init__(self):
|
||||
self.clear = 0
|
||||
self.great = 0
|
||||
self.excellent = 0
|
||||
self.perfect = 0
|
||||
|
||||
# create a dict to store the ClearSets per difficulty
|
||||
clear_set_dict = {
|
||||
0: ClearSet(), # easy
|
||||
1: ClearSet(), # normal
|
||||
2: ClearSet(), # hard
|
||||
3: ClearSet(), # extreme
|
||||
4: ClearSet(), # exExtreme
|
||||
}
|
||||
|
||||
# get clear status from user scores
|
||||
pv_records = self.data.score.get_best_scores(data["pd_id"])
|
||||
clear_status = "0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0"
|
||||
|
||||
if pv_records is not None:
|
||||
for score in pv_records:
|
||||
if score["edition"] == 0:
|
||||
# cheap and standard both count to "clear"
|
||||
if score["clr_kind"] in {1, 2}:
|
||||
clear_set_dict[score["difficulty"]].clear += 1
|
||||
elif score["clr_kind"] == 3:
|
||||
clear_set_dict[score["difficulty"]].great += 1
|
||||
elif score["clr_kind"] == 4:
|
||||
clear_set_dict[score["difficulty"]].excellent += 1
|
||||
elif score["clr_kind"] == 5:
|
||||
clear_set_dict[score["difficulty"]].perfect += 1
|
||||
else:
|
||||
# 4=ExExtreme
|
||||
if score["clr_kind"] in {1, 2}:
|
||||
clear_set_dict[4].clear += 1
|
||||
elif score["clr_kind"] == 3:
|
||||
clear_set_dict[4].great += 1
|
||||
elif score["clr_kind"] == 4:
|
||||
clear_set_dict[4].excellent += 1
|
||||
elif score["clr_kind"] == 5:
|
||||
clear_set_dict[4].perfect += 1
|
||||
|
||||
# now add all values to a list
|
||||
clear_list = []
|
||||
for clear_set in clear_set_dict.values():
|
||||
clear_list.append(clear_set.clear)
|
||||
clear_list.append(clear_set.great)
|
||||
clear_list.append(clear_set.excellent)
|
||||
clear_list.append(clear_set.perfect)
|
||||
|
||||
clear_status = ",".join(map(str, clear_list))
|
||||
|
||||
response += f"&clr_sts={clear_status}"
|
||||
|
||||
# Store stuff to add to rework
|
||||
response += f"&mdl_eqp_tm={self.time_lut}"
|
||||
|
||||
mdl_eqp_ary = "-999,-999,-999"
|
||||
c_itm_eqp_ary = "-999,-999,-999,-999,-999,-999,-999,-999,-999,-999,-999,-999"
|
||||
ms_itm_flg_ary = "1,1,1,1,1,1,1,1,1,1,1,1"
|
||||
|
||||
# get the common_modules, customize_items and customize_item_flags
|
||||
# from the profile shop
|
||||
if profile_shop:
|
||||
mdl_eqp_ary = profile_shop["mdl_eqp_ary"]
|
||||
c_itm_eqp_ary = profile_shop["c_itm_eqp_ary"]
|
||||
ms_itm_flg_ary = profile_shop["ms_itm_flg_ary"]
|
||||
resp.mdl_eqp_ary = profile_shop["mdl_eqp_ary"]
|
||||
resp.c_itm_eqp_ary = profile_shop["c_itm_eqp_ary"]
|
||||
resp.ms_itm_flg_ary = profile_shop["ms_itm_flg_ary"]
|
||||
|
||||
response += f"&mdl_eqp_ary={mdl_eqp_ary}"
|
||||
response += f"&c_itm_eqp_ary={c_itm_eqp_ary}"
|
||||
response += f"&ms_itm_flg_ary={ms_itm_flg_ary}"
|
||||
return resp.make()
|
||||
|
||||
return response
|
||||
def handle_pd_unlock_request(self, data: bytes) -> str:
|
||||
pass
|
||||
|
||||
def handle_pd_unlock_request(self, data: Dict) -> Dict:
|
||||
return f""
|
||||
|
||||
def handle_spend_credit_request(self, data: Dict) -> Dict:
|
||||
def handle_spend_credit_request(self, data: bytes) -> str:
|
||||
profile = self.data.profile.get_profile(data["pd_id"], self.version)
|
||||
if profile is None:
|
||||
return
|
||||
|
@ -591,7 +661,7 @@ class DivaBase:
|
|||
|
||||
return pv_result
|
||||
|
||||
def handle_get_pv_pd_request(self, data: Dict) -> Dict:
|
||||
def handle_get_pv_pd_request(self, data: bytes) -> str:
|
||||
song_id = data["pd_pv_id_lst"].split(",")
|
||||
pv = ""
|
||||
|
||||
|
@ -631,9 +701,9 @@ class DivaBase:
|
|||
|
||||
self.logger.debug(f"pv_result = {pv_result}")
|
||||
|
||||
pv += urllib.parse.quote(pv_result)
|
||||
pv += parse.quote(pv_result)
|
||||
else:
|
||||
pv += urllib.parse.quote(f"{song}***")
|
||||
pv += parse.quote(f"{song}***")
|
||||
pv += ","
|
||||
|
||||
response = ""
|
||||
|
@ -643,10 +713,10 @@ class DivaBase:
|
|||
|
||||
return response
|
||||
|
||||
def handle_stage_start_request(self, data: Dict) -> Dict:
|
||||
return f""
|
||||
def handle_stage_start_request(self, data: bytes) -> str:
|
||||
pass
|
||||
|
||||
def handle_stage_result_request(self, data: Dict) -> Dict:
|
||||
def handle_stage_result_request(self, data: bytes) -> str:
|
||||
profile = self.data.profile.get_profile(data["pd_id"], self.version)
|
||||
|
||||
pd_song_list = data["stg_ply_pv_id"].split(",")
|
||||
|
@ -825,7 +895,7 @@ class DivaBase:
|
|||
|
||||
return response
|
||||
|
||||
def handle_end_request(self, data: Dict) -> Dict:
|
||||
def handle_end_request(self, data: bytes) -> str:
|
||||
profile = self.data.profile.get_profile(data["pd_id"], self.version)
|
||||
|
||||
self.data.profile.update_profile(
|
||||
|
@ -833,7 +903,7 @@ class DivaBase:
|
|||
)
|
||||
return f""
|
||||
|
||||
def handle_shop_exit_request(self, data: Dict) -> Dict:
|
||||
def handle_shop_exit_request(self, data: bytes) -> str:
|
||||
self.data.item.put_shop(
|
||||
data["pd_id"],
|
||||
self.version,
|
||||
|
|
|
@ -6,6 +6,8 @@ class DivaConstants:
|
|||
VER_PROJECT_DIVA_ARCADE = 0
|
||||
VER_PROJECT_DIVA_ARCADE_FUTURE_TONE = 1
|
||||
|
||||
LUT_TIME_FMT = "%Y-%m-%d %H:%M:%S:16.0"
|
||||
|
||||
VERSION_NAMES = ("Project Diva Arcade", "Project Diva Arcade Future Tone")
|
||||
|
||||
@classmethod
|
||||
|
|
|
@ -0,0 +1,2 @@
|
|||
from titles.diva.handlers.base import *
|
||||
from titles.diva.handlers.user import *
|
|
@ -0,0 +1,143 @@
|
|||
from urllib import parse
|
||||
from datetime import datetime
|
||||
from typing import Union, Dict
|
||||
|
||||
|
||||
class DivaRequestParseException(Exception):
|
||||
"""
|
||||
Exception raised when there is a fault in parsing a diva request,
|
||||
either due to a malformed request, or missing required items
|
||||
"""
|
||||
|
||||
def __init__(self, message: str) -> None:
|
||||
self.message = message
|
||||
super().__init__(self.message)
|
||||
|
||||
|
||||
class BaseBinaryRequest:
|
||||
cmd: str
|
||||
req_id: str
|
||||
|
||||
def __init__(self, raw: bytes) -> None:
|
||||
self.raw = raw
|
||||
self.raw_dict = dict(parse.parse_qsl(raw))
|
||||
|
||||
if "cmd" not in self.raw_dict:
|
||||
raise DivaRequestParseException(f"cmd not in request data {self.raw_dict}")
|
||||
|
||||
if "req_id" not in self.raw_dict:
|
||||
raise DivaRequestParseException(
|
||||
f"req_id not in request data {self.raw_dict}"
|
||||
)
|
||||
|
||||
for k, v in self.raw_dict:
|
||||
setattr(self, k, v)
|
||||
|
||||
|
||||
class BaseRequest:
|
||||
cmd: str
|
||||
req_id: str
|
||||
game_id: str
|
||||
r_rev: str
|
||||
kc_serial: str
|
||||
b_serial: str
|
||||
country_code: str
|
||||
|
||||
def __init__(self, raw: Union[str, bytes]) -> None:
|
||||
self.raw = raw
|
||||
self.raw_dict: Dict[bytes, bytes] = dict(parse.parse_qsl(raw))
|
||||
|
||||
if b"cmd" not in self.raw_dict:
|
||||
raise DivaRequestParseException(f"cmd not in request data {self.raw_dict}")
|
||||
|
||||
if b"req_id" not in self.raw_dict:
|
||||
raise DivaRequestParseException(
|
||||
f"req_id not in request data {self.raw_dict}"
|
||||
)
|
||||
|
||||
if b"place_id" not in self.raw_dict:
|
||||
raise DivaRequestParseException(
|
||||
f"place_id not in request data {self.raw_dict}"
|
||||
)
|
||||
|
||||
if b"start_up_mode" not in self.raw_dict:
|
||||
raise DivaRequestParseException(
|
||||
f"start_up_mode not in request data {self.raw_dict}"
|
||||
)
|
||||
|
||||
if b"cmm_dly_mod" not in self.raw_dict:
|
||||
raise DivaRequestParseException(
|
||||
f"cmm_dly_mod not in request data {self.raw_dict}"
|
||||
)
|
||||
|
||||
if b"cmm_dly_sec" not in self.raw_dict:
|
||||
raise DivaRequestParseException(
|
||||
f"cmm_dly_sec not in request data {self.raw_dict}"
|
||||
)
|
||||
|
||||
if b"cmm_err_mod" not in self.raw_dict:
|
||||
raise DivaRequestParseException(
|
||||
f"cmm_err_mod not in request data {self.raw_dict}"
|
||||
)
|
||||
|
||||
if b"region_code" not in self.raw_dict:
|
||||
raise DivaRequestParseException(
|
||||
f"region_code not in request data {self.raw_dict}"
|
||||
)
|
||||
|
||||
if b"time_stamp" not in self.raw_dict:
|
||||
raise DivaRequestParseException(
|
||||
f"time_stamp not in request data {self.raw_dict}"
|
||||
)
|
||||
|
||||
for k, v in self.raw_dict.items():
|
||||
setattr(self, k.decode(), v.decode())
|
||||
|
||||
self.place_id = int(self.place_id)
|
||||
self.start_up_mode = int(self.start_up_mode)
|
||||
self.cmm_dly_mod = int(self.cmm_dly_mod)
|
||||
self.cmm_dly_sec = int(self.cmm_dly_sec)
|
||||
self.cmm_err_mod = int(self.cmm_err_mod)
|
||||
self.region_code = int(self.region_code)
|
||||
# datetime.now().astimezone().replace(microsecond=0).isoformat()
|
||||
self.time_stamp = datetime.strptime(self.time_stamp, "%Y-%m-%dT%H:%M:%S%z")
|
||||
|
||||
|
||||
class BaseResponse:
|
||||
def __init__(self, cmd_id: str, req_id: int) -> None:
|
||||
self.cmd = cmd_id
|
||||
self.req_id = req_id
|
||||
self.stat = "ok"
|
||||
|
||||
def make(self) -> str:
|
||||
return f"cmd={self.cmd}&req_id={self.req_id}&stat={self.stat}"
|
||||
|
||||
class GameInitRequest(BaseRequest):
|
||||
def __init__(self, raw: Union[str, bytes]) -> None:
|
||||
super().__init__(raw)
|
||||
|
||||
class AttendRequest(BaseRequest):
|
||||
def __init__(self, raw: Union[str, bytes]) -> None:
|
||||
super().__init__(raw)
|
||||
self.power_on = int(self.power_on)
|
||||
self.is_bb = bool(int(self.power_on))
|
||||
|
||||
class AttendResponse(BaseResponse):
|
||||
def __init__(self, cmd_id: str, req_id: int) -> None:
|
||||
super().__init__(cmd_id, req_id)
|
||||
self.atnd_prm1 = [1] * 100
|
||||
self.atnd_prm2 = [1] * 100
|
||||
self.atnd_prm3 = [1] * 100
|
||||
self.atnd_lut = datetime.now()
|
||||
|
||||
def make(self) -> str:
|
||||
ret = super().make()
|
||||
ret_dict = {
|
||||
"atnd_prm1": ','.join([str(i) for i in self.atnd_prm1]),
|
||||
"atnd_prm2": ','.join([str(i) for i in self.atnd_prm2]),
|
||||
"atnd_prm3": ','.join([str(i) for i in self.atnd_prm3]),
|
||||
"atnd_lut": parse.quote(self.atnd_lut.strftime('%Y-%m-%d %H:%M:%S:16.0'))
|
||||
}
|
||||
ret += "&" + parse.urlencode(ret_dict, safe=",")
|
||||
return ret
|
||||
|
|
@ -0,0 +1,111 @@
|
|||
from titles.diva.handlers.base import (
|
||||
BaseRequest,
|
||||
BaseResponse,
|
||||
DivaRequestParseException,
|
||||
)
|
||||
from datetime import datetime
|
||||
from urllib import parse
|
||||
from ..const import DivaConstants
|
||||
|
||||
class PreStartRequest(BaseRequest):
|
||||
pmm: str
|
||||
idm: str
|
||||
mmgameid: str
|
||||
mmuid: str
|
||||
a_code: str
|
||||
aime_id: str
|
||||
aime_a_code: str
|
||||
|
||||
def __init__(self, raw: str) -> None:
|
||||
super().__init__(raw)
|
||||
try:
|
||||
self.key_obj_type = int(self.key_obj_type)
|
||||
self.exec_vu = int(self.exec_vu)
|
||||
|
||||
except AttributeError as e:
|
||||
raise DivaRequestParseException(f"PreStartRequest: {e}")
|
||||
|
||||
|
||||
class PreStartResponse(BaseResponse):
|
||||
player_name: str
|
||||
sort_kind: str
|
||||
lv_efct_id: str
|
||||
lv_plt_id: str
|
||||
lv_str: str
|
||||
lv_num: str
|
||||
lv_pnt: str
|
||||
vcld_pts: str
|
||||
skn_eqp: str
|
||||
btn_se_eqp: str
|
||||
sld_se_eqp: str
|
||||
chn_sld_se_eqp: str
|
||||
sldr_tch_se_eqp: str
|
||||
passwd_stat: str
|
||||
mdl_eqp_tm: str
|
||||
|
||||
def __init__(self, cmd_id: str, req_id: int, pd_id: int) -> None:
|
||||
super().__init__(cmd_id, req_id)
|
||||
self.ps_result = 1
|
||||
self.pd_id = pd_id
|
||||
self.accept_idx = 100
|
||||
self.nblss_ltt_stts = -1
|
||||
self.nblss_ltt_tckt = -1
|
||||
self.nblss_ltt_is_opn = -1
|
||||
# Ideally this would be a real array that would get converted later
|
||||
# But this is how it's stored in the db, so w/e for now
|
||||
self.mdl_eqp_ary = "-999,-999,-999"
|
||||
|
||||
|
||||
class StartRequest(BaseRequest):
|
||||
def __init__(self, raw: str) -> None:
|
||||
super().__init__(raw)
|
||||
try:
|
||||
self.pd_id = int(self.pd_id)
|
||||
self.accept_idx = int(self.accept_idx)
|
||||
|
||||
except AttributeError as e:
|
||||
raise DivaRequestParseException(f"StartRequest: {e}")
|
||||
|
||||
|
||||
class StartResponse(BaseResponse):
|
||||
def __init__(self, cmd_id: str, req_id: int, pv_id: int, pv_name: str) -> None:
|
||||
super().__init__(cmd_id, req_id)
|
||||
self.pd_id: int = pv_id
|
||||
self.start_result: int = 1
|
||||
self.accept_idx: int = 100
|
||||
self.hp_vol: int = 0
|
||||
self.btn_se_vol: int = 1
|
||||
self.btn_se_vol2: int = 1
|
||||
self.sldr_se_vol2: int = 1
|
||||
self.sort_kind: int = 1
|
||||
self.player_name: str = pv_name
|
||||
self.lv_num: int = 1
|
||||
self.lv_pnt: int = 0
|
||||
self.lv_efct_id: int = 1
|
||||
self.lv_plt_id: int = 1
|
||||
self.mdl_have: str = "F" * 250
|
||||
self.cstmz_itm_have: str = "F" * 250
|
||||
self.use_pv_mdl_eqp: int = 0
|
||||
self.use_mdl_pri: int = 0
|
||||
self.use_pv_skn_eqp: int = 1
|
||||
self.use_pv_btn_se_eqp: int = 1
|
||||
self.use_pv_sld_se_eqp: int = 1
|
||||
self.use_pv_chn_sld_se_eqp: int = 1
|
||||
self.use_pv_sldr_tch_se_eqp: int = 1
|
||||
self.vcld_pts: int = 0
|
||||
self.nxt_pv_id: int = 1
|
||||
self.nxt_dffclty: int = 1
|
||||
self.nxt_edtn: int = 0
|
||||
self.dsp_clr_brdr: int = 0
|
||||
self.dsp_intrm_rnk: int = 0
|
||||
self.dsp_clr_sts: int = 0
|
||||
self.rgo_sts: int = 0
|
||||
self.my_qst_id: str = ",".join(["-1"] * 25)
|
||||
self.my_qst_sts: str = ",".join("0" * 5) + "," + ",".join(["-1"] * 20)
|
||||
self.my_qst_prgrs: str = ",".join("0" * 5) + "," + ",".join(["-1"] * 20)
|
||||
self.my_qst_et: str = ",".join([parse.quote(datetime.now().strftime(DivaConstants.LUT_TIME_FMT))] * 5) + "," + ",".join(["xxx"] * 20)
|
||||
self.clr_sts: str = ",".join("0" * 5) + "," + ",".join(["-1"] * 20)
|
||||
self.mdl_eqp_tm: str = parse.quote(datetime.now().strftime(DivaConstants.LUT_TIME_FMT))
|
||||
self.mdl_eqp_ary = ",".join(["-999"] * 3)
|
||||
self.c_itm_eqp_ary = ",".join(["-999"] * 12)
|
||||
self.ms_itm_flg_ary = ",".join(["1"] * 12)
|
|
@ -4,11 +4,11 @@ import logging, coloredlogs
|
|||
from logging.handlers import TimedRotatingFileHandler
|
||||
import zlib
|
||||
import json
|
||||
import urllib.parse
|
||||
import base64
|
||||
from os import path
|
||||
from typing import Tuple
|
||||
|
||||
from titles.diva.handlers.base import *
|
||||
from core.config import CoreConfig
|
||||
from titles.diva.config import DivaConfig
|
||||
from titles.diva.const import DivaConstants
|
||||
|
@ -74,87 +74,54 @@ class DivaServlet:
|
|||
def render_POST(self, req: Request, version: int, url_path: str) -> bytes:
|
||||
req_raw = req.content.getvalue()
|
||||
url_header = req.getAllHeaders()
|
||||
req.responseHeaders.addRawHeader(b"content-type", b"text/plain")
|
||||
|
||||
# Ping Dispatch
|
||||
if "THIS_STRING_SEPARATES" in str(url_header):
|
||||
binary_request = req_raw.splitlines()
|
||||
binary_cmd_decoded = binary_request[3].decode("utf-8")
|
||||
binary_array = binary_cmd_decoded.split("&")
|
||||
|
||||
bin_req_data = {}
|
||||
req_cls = BaseBinaryRequest(binary_cmd_decoded)
|
||||
|
||||
for kvp in binary_array:
|
||||
split_bin = kvp.split("=")
|
||||
bin_req_data[split_bin[0]] = split_bin[1]
|
||||
else:
|
||||
json_string = json.dumps(
|
||||
req_raw.decode("utf-8")
|
||||
) # Take the response and decode as UTF-8 and dump
|
||||
b64string = json_string.replace(
|
||||
r"\n", "\n"
|
||||
) # Remove all \n and separate them as new lines
|
||||
gz_string = base64.b64decode(b64string) # Decompressing the base64 string
|
||||
|
||||
self.logger.info(f"Binary {bin_req_data['cmd']} Request")
|
||||
self.logger.debug(bin_req_data)
|
||||
try:
|
||||
url_data = zlib.decompress(gz_string) # Decompressing the gzip
|
||||
except zlib.error as e:
|
||||
self.logger.error(f"Failed to defalte! {e} -> {gz_string}")
|
||||
return b"stat=0"
|
||||
|
||||
handler = getattr(self.base, f"handle_{bin_req_data['cmd']}_request")
|
||||
resp = handler(bin_req_data)
|
||||
try:
|
||||
req_cls = BaseRequest(url_data)
|
||||
except DivaRequestParseException as e:
|
||||
self.logger.error(e)
|
||||
return b"stat=0"
|
||||
|
||||
self.logger.debug(
|
||||
f"Response cmd={bin_req_data['cmd']}&req_id={bin_req_data['req_id']}&stat=ok{resp}"
|
||||
)
|
||||
return f"cmd={bin_req_data['cmd']}&req_id={bin_req_data['req_id']}&stat=ok{resp}".encode(
|
||||
"utf-8"
|
||||
)
|
||||
|
||||
# Main Dispatch
|
||||
json_string = json.dumps(
|
||||
req_raw.decode("utf-8")
|
||||
) # Take the response and decode as UTF-8 and dump
|
||||
b64string = json_string.replace(
|
||||
r"\n", "\n"
|
||||
) # Remove all \n and separate them as new lines
|
||||
gz_string = base64.b64decode(b64string) # Decompressing the base64 string
|
||||
|
||||
try:
|
||||
url_data = zlib.decompress(gz_string).decode(
|
||||
"utf-8"
|
||||
) # Decompressing the gzip
|
||||
except zlib.error as e:
|
||||
self.logger.error(f"Failed to defalte! {e} -> {gz_string}")
|
||||
return "stat=0"
|
||||
|
||||
req_kvp = urllib.parse.unquote(url_data)
|
||||
req_data = {}
|
||||
|
||||
# We then need to split each parts with & so we can reuse them to fill out the requests
|
||||
splitted_request = str.split(req_kvp, "&")
|
||||
for kvp in splitted_request:
|
||||
split = kvp.split("=")
|
||||
req_data[split[0]] = split[1]
|
||||
|
||||
self.logger.info(f"{req_data['cmd']} Request")
|
||||
self.logger.debug(req_data)
|
||||
|
||||
func_to_find = f"handle_{req_data['cmd']}_request"
|
||||
|
||||
# Load the requests
|
||||
try:
|
||||
handler = getattr(self.base, func_to_find)
|
||||
resp = handler(req_data)
|
||||
|
||||
except AttributeError as e:
|
||||
self.logger.warning(f"Unhandled {req_data['cmd']} request {e}")
|
||||
return f"cmd={req_data['cmd']}&req_id={req_data['req_id']}&stat=ok".encode(
|
||||
"utf-8"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error handling method {func_to_find} {e}")
|
||||
return f"cmd={req_data['cmd']}&req_id={req_data['req_id']}&stat=ok".encode(
|
||||
"utf-8"
|
||||
)
|
||||
|
||||
req.responseHeaders.addRawHeader(b"content-type", b"text/plain")
|
||||
self.logger.debug(
|
||||
f"Response cmd={req_data['cmd']}&req_id={req_data['req_id']}&stat=ok{resp}"
|
||||
self.logger.debug(f"Request: {url_data}\nHeaders: {url_header}")
|
||||
self.logger.info(
|
||||
f"{req_cls.cmd} request from {req_cls.kc_serial}/{req_cls.b_serial} at {req.getClientAddress().host}"
|
||||
)
|
||||
|
||||
return (
|
||||
f"cmd={req_data['cmd']}&req_id={req_data['req_id']}&stat=ok{resp}".encode(
|
||||
"utf-8"
|
||||
)
|
||||
)
|
||||
handler_str = f"handle_{req_cls.cmd}_request"
|
||||
|
||||
if not hasattr(self.base, handler_str):
|
||||
self.logger.warn(f"Unhandled cmd {req_cls.cmd}")
|
||||
return BaseResponse(req_cls.cmd, req_cls.req_id).make().encode()
|
||||
|
||||
handler = getattr(self.base, handler_str)
|
||||
|
||||
response = handler(req_cls.raw)
|
||||
if response is None or response == "":
|
||||
response = BaseResponse(req_cls.cmd, req_cls.req_id).make()
|
||||
|
||||
if not response.startswith("cmd="):
|
||||
response = f"cmd={req_cls.cmd}&req_id={req_cls.req_id}&stat=ok" + response
|
||||
|
||||
self.logger.debug(f"Response: {response}")
|
||||
return response.encode(errors="ignore")
|
||||
|
|
|
@ -34,9 +34,17 @@ profile = Table(
|
|||
Column("use_pv_sld_se_eqp", Boolean, nullable=False, server_default="0"),
|
||||
Column("use_pv_chn_sld_se_eqp", Boolean, nullable=False, server_default="0"),
|
||||
Column("use_pv_sldr_tch_se_eqp", Boolean, nullable=False, server_default="0"),
|
||||
Column("btn_se_eqp", Integer, nullable=False, server_default="-1"),
|
||||
Column("sld_se_eqp", Integer, nullable=False, server_default="-1"),
|
||||
Column("chn_sld_se_eqp", Integer, nullable=False, server_default="-1"),
|
||||
Column("sldr_tch_se_eqp", Integer, nullable=False, server_default="-1"),
|
||||
Column("nxt_pv_id", Integer, nullable=False, server_default="708"),
|
||||
Column("nxt_dffclty", Integer, nullable=False, server_default="2"),
|
||||
Column("nxt_edtn", Integer, nullable=False, server_default="0"),
|
||||
Column("cnp_cid", Integer, nullable=False, server_default="-1"),
|
||||
Column("cnp_val", Integer, nullable=False, server_default="-1"),
|
||||
Column("cnp_rr", Integer, nullable=False, server_default="-1"),
|
||||
Column("cnp_sp", String(255), nullable=False, server_default=""),
|
||||
Column("dsp_clr_brdr", Integer, nullable=False, server_default="7"),
|
||||
Column("dsp_intrm_rnk", Integer, nullable=False, server_default="1"),
|
||||
Column("dsp_clr_sts", Integer, nullable=False, server_default="1"),
|
||||
|
|
|
@ -3,6 +3,7 @@ from sqlalchemy.types import Integer, String, TIMESTAMP, JSON, Boolean
|
|||
from sqlalchemy.schema import ForeignKey
|
||||
from sqlalchemy.sql import func, select
|
||||
from sqlalchemy.dialects.mysql import insert
|
||||
from sqlalchemy.engine import Row
|
||||
from typing import Optional, List, Dict, Any
|
||||
|
||||
from core.data.schema import BaseData, metadata
|
||||
|
@ -167,7 +168,7 @@ class DivaScoreData(BaseData):
|
|||
|
||||
def get_best_user_score(
|
||||
self, user_id: int, pv_id: int, difficulty: int, edition: int
|
||||
) -> Optional[Dict]:
|
||||
) -> Optional[Row]:
|
||||
sql = score.select(
|
||||
and_(
|
||||
score.c.user == user_id,
|
||||
|
@ -184,7 +185,7 @@ class DivaScoreData(BaseData):
|
|||
|
||||
def get_top3_scores(
|
||||
self, pv_id: int, difficulty: int, edition: int
|
||||
) -> Optional[List[Dict]]:
|
||||
) -> Optional[List[Row]]:
|
||||
sql = (
|
||||
score.select(
|
||||
and_(
|
||||
|
@ -204,7 +205,7 @@ class DivaScoreData(BaseData):
|
|||
|
||||
def get_global_ranking(
|
||||
self, user_id: int, pv_id: int, difficulty: int, edition: int
|
||||
) -> Optional[List]:
|
||||
) -> Optional[List[Row]]:
|
||||
# get the subquery max score of a user with pv_id, difficulty and
|
||||
# edition
|
||||
sql_sub = (
|
||||
|
@ -231,7 +232,7 @@ class DivaScoreData(BaseData):
|
|||
return None
|
||||
return result.fetchone()
|
||||
|
||||
def get_best_scores(self, user_id: int) -> Optional[List]:
|
||||
def get_best_scores(self, user_id: int) -> Optional[List[Row]]:
|
||||
sql = score.select(score.c.user == user_id)
|
||||
|
||||
result = self.execute(sql)
|
||||
|
|
Loading…
Reference in New Issue