Compare commits

...

33 Commits

Author SHA1 Message Date
257b0dae87 diva: fix render_POST 2024-01-12 12:18:23 -05:00
c6f2f7e0ef Merge branch 'develop' into diva_handler_classes 2024-01-12 12:15:28 -05:00
6b158bf18d diva: fix StageResultResponse 2023-12-06 18:24:20 -05:00
1a616cba41 diva: change how requests are decoded 2023-11-26 20:43:10 -05:00
d2d02f9483 diva: add list decoders 2023-11-26 19:24:36 -05:00
024bc352d7 diva: fix attend response 2023-11-26 02:43:49 -05:00
4831aa5628 diva: add encoder helper methods 2023-11-26 01:41:25 -05:00
dd1c7667e0 diva: fix ip logging 2023-11-20 11:29:10 -05:00
2ec1dcadef Merge branch 'develop' into diva_handler_classes 2023-11-20 10:49:48 -05:00
81c13f5008 diva: fix typing 2023-11-08 21:23:54 -05:00
572b8ddbe5 Merge branch 'develop' into diva_handler_classes 2023-11-08 21:22:24 -05:00
2b02ed8684 diva: add stage_result, end handlers 2023-10-05 23:47:50 -04:00
b2a01d20d5 diva: add configurable banner_msg 2023-10-05 12:29:02 -04:00
5e03193819 diva: fix start, spend_credit, and get_pv_pd requests 2023-10-04 23:58:26 -04:00
f836b5dd21 diva: fix start request 2023-10-04 23:25:10 -04:00
53c74c6511 diva: add register, fixes, start still busted 2023-10-04 02:18:04 -04:00
7322bc60dc Merge branch 'develop' into diva_handler_classes 2023-10-04 00:53:10 -04:00
c2ee09bce0 diva: integrate latest changes 2023-05-02 11:41:35 -04:00
dfea392b03 Merge branch 'develop' into diva_handler_classes 2023-05-02 11:36:14 -04:00
998aa70929 diva: fix for responses that haven't been updated yet 2023-04-01 23:45:28 -04:00
8df1325613 diva: implement StartResponse 2023-04-01 23:19:44 -04:00
6ff7827918 diva: fill out StartResponse 2023-04-01 23:05:10 -04:00
a36170f2c3 Merge branch 'develop' into diva_handler_classes 2023-04-01 22:23:13 -04:00
48144b53f0 fix attend 2023-03-17 01:35:51 -04:00
3b852ea739 Merge branch 'develop' into diva_handler_classes 2023-03-16 22:32:18 -04:00
21c9b23617 diva: add game_init and attend handlers 2023-03-13 04:04:08 -04:00
3076bdf575 Merge branch 'develop' into diva_handler_classes 2023-03-13 02:27:12 -04:00
5be25f89ff format with black 2023-03-09 11:50:11 -05:00
5443d24352 Merge branch 'develop' into diva_handler_classes 2023-03-09 11:49:51 -05:00
Hay1tsme
56927c049f clean up index.py 2023-02-26 11:40:13 -05:00
Hay1tsme
2b81ba206c remove unused code from index.py 2023-02-25 23:40:50 -05:00
Hay1tsme
9718e822f3 add commas to urlencode safe param 2023-02-25 19:12:48 -05:00
Hay1tsme
b07e3d6a94 add base request/response classes 2023-02-25 18:50:35 -05:00
9 changed files with 794 additions and 277 deletions

View File

@ -1,6 +1,7 @@
server: server:
enable: True enable: True
loglevel: "info" loglevel: "info"
banner_msg: ""
mods: mods:
unlock_all_modules: True unlock_all_modules: True

View File

@ -8,6 +8,7 @@ from core.config import CoreConfig
from titles.diva.config import DivaConfig from titles.diva.config import DivaConfig
from titles.diva.const import DivaConstants from titles.diva.const import DivaConstants
from titles.diva.database import DivaData from titles.diva.database import DivaData
from titles.diva.handlers import *
class DivaBase: class DivaBase:
@ -20,8 +21,8 @@ class DivaBase:
self.game = DivaConstants.GAME_CODE self.game = DivaConstants.GAME_CODE
self.version = DivaConstants.VER_PROJECT_DIVA_ARCADE_FUTURE_TONE self.version = DivaConstants.VER_PROJECT_DIVA_ARCADE_FUTURE_TONE
dt = datetime.datetime.now() dt = datetime.now()
self.time_lut = urllib.parse.quote(dt.strftime("%Y-%m-%d %H:%M:%S:16.0")) self.time_lut = parse.quote(dt.strftime("%Y-%m-%d %H:%M:%S:16.0"))
async def handle_test_request(self, data: Dict) -> Dict: async def handle_test_request(self, data: Dict) -> Dict:
return "" return ""
@ -29,24 +30,32 @@ class DivaBase:
async def handle_game_init_request(self, data: Dict) -> Dict: async def handle_game_init_request(self, data: Dict) -> Dict:
return f"" return f""
async def handle_attend_request(self, data: Dict) -> Dict: async def handle_attend_request(self, data: bytes) -> str:
req = AttendRequest(data)
resp = AttendResponse(req.req_id)
for i in [0, 3, 4, 5, 7, 9, 10, 11, 12, 13]:
resp.atnd_prm1[i] = 0
resp.atnd_prm1[8] = 100
resp.atnd_prm2[:6] = [30, 10, 100, 4, 1, 50]
resp.atnd_prm3[0] = 100
resp.atnd_prm3[1] = 0
resp.atnd_prm3[10:13] = [2, 3, 4]
resp.atnd_prm3[16:19] = [3, 4, 5]
resp.atnd_prm3[22:25] = [4, 5, 6]
resp.atnd_prm3[80:] = [0] * 20
resp.atnd_prm3[28:79] = [5,6,7,4,4,4,9,10,14,5,10,10,25,20,50,30,90,5,
10,10,25,20,50,30,90,5,10,10,25,20,50,30,90,5,10,10,25,20,50,30,90,
5,10,10,25,20,50,30,90,10,30]
return resp.make()
async def handle_ping_request(self, data: bytes) -> str:
encoded = "&" encoded = "&"
params = { params = {
"atnd_prm1": "0,1,1,0,0,0,1,0,100,0,0,0,0,0,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1", "ping_b_msg": f"Welcome to {self.core_cfg.server.name} network!" if not self.game_config.server.banner_msg else self.game_config.server.banner_msg,
"atnd_prm2": "30,10,100,4,1,50,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1",
"atnd_prm3": "100,0,1,1,1,1,1,1,1,1,2,3,4,1,1,1,3,4,5,1,1,1,4,5,6,1,1,1,5,6,7,4,4,4,9,10,14,5,10,10,25,20,50,30,90,5,10,10,25,20,50,30,90,5,10,10,25,20,50,30,90,5,10,10,25,20,50,30,90,5,10,10,25,20,50,30,90,10,30,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0",
"atnd_lut": f"{self.time_lut}",
}
encoded += urllib.parse.urlencode(params)
encoded = encoded.replace("%2C", ",")
return encoded
async def handle_ping_request(self, data: Dict) -> Dict:
encoded = "&"
params = {
"ping_b_msg": f"Welcome to {self.core_cfg.server.name} network!",
"ping_m_msg": "xxx", "ping_m_msg": "xxx",
"atnd_lut": f"{self.time_lut}", "atnd_lut": f"{self.time_lut}",
"fi_lut": f"{self.time_lut}", "fi_lut": f"{self.time_lut}",
@ -82,9 +91,8 @@ class DivaBase:
"nblss_ltt_ed_tm": "2019-09-22 12:00:00.0", "nblss_ltt_ed_tm": "2019-09-22 12:00:00.0",
} }
encoded += urllib.parse.urlencode(params) encoded += parse.urlencode(params, safe=",")
encoded = encoded.replace("+", "%20") encoded = encoded.replace("+", "%20")
encoded = encoded.replace("%2C", ",")
return encoded return encoded
@ -133,8 +141,8 @@ class DivaBase:
with open(r"titles/diva/data/ShopCatalog.dat", encoding="utf-8") as shop: with open(r"titles/diva/data/ShopCatalog.dat", encoding="utf-8") as shop:
lines = shop.readlines() lines = shop.readlines()
for line in lines: for line in lines:
line = urllib.parse.quote(line) + "," line = parse.quote(line) + ","
catalog += f"{urllib.parse.quote(line)}" catalog += f"{parse.quote(line)}"
else: else:
for shop in shopList: for shop in shopList:
@ -153,8 +161,8 @@ class DivaBase:
+ "," + ","
+ str(shop["type"]) + str(shop["type"])
) )
line = urllib.parse.quote(line) + "," line = parse.quote(line) + ","
catalog += f"{urllib.parse.quote(line)}" catalog += f"{parse.quote(line)}"
catalog = catalog.replace("+", "%20") catalog = catalog.replace("+", "%20")
@ -198,8 +206,8 @@ class DivaBase:
with open(r"titles/diva/data/ItemCatalog.dat", encoding="utf-8") as item: with open(r"titles/diva/data/ItemCatalog.dat", encoding="utf-8") as item:
lines = item.readlines() lines = item.readlines()
for line in lines: for line in lines:
line = urllib.parse.quote(line) + "," line = parse.quote(line) + ","
catalog += f"{urllib.parse.quote(line)}" catalog += f"{parse.quote(line)}"
else: else:
for item in itemList: for item in itemList:
@ -218,8 +226,8 @@ class DivaBase:
+ "," + ","
+ str(item["type"]) + str(item["type"])
) )
line = urllib.parse.quote(line) + "," line = parse.quote(line) + ","
catalog += f"{urllib.parse.quote(line)}" catalog += f"{parse.quote(line)}"
catalog = catalog.replace("+", "%20") catalog = catalog.replace("+", "%20")
@ -276,11 +284,11 @@ class DivaBase:
"fi_add_vp": "20,5", "fi_add_vp": "20,5",
"fi_mul_vp": "1,2", "fi_mul_vp": "1,2",
"fi_st": "2019-01-01 00:00:00.0,2019-01-01 00:00:00.0", "fi_st": "2019-01-01 00:00:00.0,2019-01-01 00:00:00.0",
"fi_et": "2029-01-01 00:00:00.0,2029-01-01 00:00:00.0", "fi_et": "2029-01-01 00:00:00.0,2029-01-01 00:00:00.0", # TODO: make this last longer
"fi_lut": "{self.time_lut}", "fi_lut": f"{self.time_lut}",
} }
encoded += urllib.parse.urlencode(params) encoded += parse.urlencode(params)
encoded = encoded.replace("+", "%20") encoded = encoded.replace("+", "%20")
encoded = encoded.replace("%2C", ",") encoded = encoded.replace("%2C", ",")
@ -302,7 +310,7 @@ class DivaBase:
with open(r"titles/diva/data/QuestInfo.dat", encoding="utf-8") as shop: with open(r"titles/diva/data/QuestInfo.dat", encoding="utf-8") as shop:
lines = shop.readlines() lines = shop.readlines()
for line in lines: for line in lines:
quest += f"{urllib.parse.quote(line)}," quest += f"{parse.quote(line)},"
response = "" response = ""
response += f"&qi_lut={self.time_lut}" response += f"&qi_lut={self.time_lut}"
@ -330,7 +338,7 @@ class DivaBase:
+ "," + ","
+ str(quests["quest_enable"]) + str(quests["quest_enable"])
) )
quest += f"{urllib.parse.quote(line)}%0A," quest += f"{parse.quote(line)}%0A,"
responseline = f"{quest[:-1]}," responseline = f"{quest[:-1]},"
for i in range(len(questList), 59): for i in range(len(questList), 59):
@ -380,130 +388,72 @@ class DivaBase:
async def handle_pstd_item_ng_lst_request(self, data: Dict) -> Dict: async def handle_pstd_item_ng_lst_request(self, data: Dict) -> Dict:
return f"" return f""
async def handle_pre_start_request(self, data: Dict) -> str: async def handle_pre_start_request(self, data: bytes) -> str:
profile = await self.data.profile.get_profile(data["aime_id"], self.version) req = PreStartRequest(data)
profile_shop = await self.data.item.get_shop(data["aime_id"], self.version) resp = PreStartResponse(req.req_id, req.aime_id)
profile = await self.data.profile.get_profile(req.aime_id, self.version)
profile_shop = await self.data.item.get_shop(req.aime_id, self.version)
if profile is None: if profile is None:
return f"&ps_result=-3" return f"&ps_result=-3"
else:
response = "&ps_result=1"
response += "&accept_idx=100"
response += "&nblss_ltt_stts=-1"
response += "&nblss_ltt_tckt=-1"
response += "&nblss_ltt_is_opn=-1"
response += f"&pd_id={data['aime_id']}"
response += f"&player_name={profile['player_name']}"
response += f"&sort_kind={profile['player_name']}"
response += f"&lv_efct_id={profile['lv_efct_id']}"
response += f"&lv_plt_id={profile['lv_plt_id']}"
response += f"&lv_str={profile['lv_str']}"
response += f"&lv_num={profile['lv_num']}"
response += f"&lv_pnt={profile['lv_pnt']}"
response += f"&vcld_pts={profile['vcld_pts']}"
response += f"&skn_eqp={profile['skn_eqp']}"
response += f"&btn_se_eqp={profile['btn_se_eqp']}"
response += f"&sld_se_eqp={profile['sld_se_eqp']}"
response += f"&chn_sld_se_eqp={profile['chn_sld_se_eqp']}"
response += f"&sldr_tch_se_eqp={profile['sldr_tch_se_eqp']}"
response += f"&passwd_stat={profile['passwd_stat']}"
# Store stuff to add to rework profile_dict = profile._asdict()
response += f"&mdl_eqp_tm={self.time_lut}" profile_dict.pop("id")
profile_dict.pop("user")
profile_dict.pop("version")
mdl_eqp_ary = "-999,-999,-999" for k, v in profile_dict.items():
if hasattr(resp, k):
setattr(resp, k, v)
# get the common_modules from the profile shop if profile_shop is not None and profile_shop:
if profile_shop: resp.mdl_eqp_ary = profile_shop["mdl_eqp_ary"]
mdl_eqp_ary = profile_shop["mdl_eqp_ary"]
response += f"&mdl_eqp_ary={mdl_eqp_ary}" return resp.make()
return response async def handle_registration_request(self, data: bytes) -> str:
req = RegisterRequest(data)
async def handle_registration_request(self, data: Dict) -> Dict: pd_id = await self.data.profile.create_profile(
await self.data.profile.create_profile( self.version, req.aime_id, req.player_name
self.version, data["aime_id"], data["player_name"]
) )
return f"&cd_adm_result=1&pd_id={data['aime_id']}" if pd_id is None:
return "&cd_adm_result=-1"
return RegisterResponse(req.req_id, req.aime_id).make()
async def handle_start_request(self, data: Dict) -> Dict: async def handle_start_request(self, data: bytes) -> str:
profile = await self.data.profile.get_profile(data["pd_id"], self.version) req = StartRequest(data)
profile_shop = await self.data.item.get_shop(data["pd_id"], self.version) profile = await self.data.profile.get_profile(req.pd_id, self.version)
profile_shop = await self.data.item.get_shop(req.pd_id, self.version)
if profile is None: if profile is None:
return return
resp = StartResponse(req.req_id, req.pd_id, profile['player_name'])
profile_dict = profile._asdict()
profile_dict.pop("id")
profile_dict.pop("user")
profile_dict.pop("version")
for k, v in profile_dict.items():
if hasattr(resp, k):
setattr(resp, k, v)
mdl_have = "F" * 250
# generate the mdl_have string if "unlock_all_modules" is disabled # generate the mdl_have string if "unlock_all_modules" is disabled
if not self.game_config.mods.unlock_all_modules: if not self.game_config.mods.unlock_all_modules:
mdl_have = await self.data.module.get_modules_have_string( resp.mdl_have = await self.data.module.get_modules_have_string(
data["pd_id"], self.version req.pd_id, self.version
) )
cstmz_itm_have = "F" * 250
# generate the cstmz_itm_have string if "unlock_all_items" is disabled # generate the cstmz_itm_have string if "unlock_all_items" is disabled
if not self.game_config.mods.unlock_all_items: if not self.game_config.mods.unlock_all_items:
cstmz_itm_have = await self.data.customize.get_customize_items_have_string( resp.cstmz_itm_have = await self.data.customize.get_customize_items_have_string(
data["pd_id"], self.version req.pd_id, self.version
) )
response = f"&pd_id={data['pd_id']}"
response += "&start_result=1"
response += "&accept_idx=100"
response += f"&hp_vol={profile['hp_vol']}"
response += f"&btn_se_vol={profile['btn_se_vol']}"
response += f"&btn_se_vol2={profile['btn_se_vol2']}"
response += f"&sldr_se_vol2={profile['sldr_se_vol2']}"
response += f"&sort_kind={profile['sort_kind']}"
response += f"&player_name={profile['player_name']}"
response += f"&lv_num={profile['lv_num']}"
response += f"&lv_pnt={profile['lv_pnt']}"
response += f"&lv_efct_id={profile['lv_efct_id']}"
response += f"&lv_plt_id={profile['lv_plt_id']}"
response += f"&mdl_have={mdl_have}"
response += f"&cstmz_itm_have={cstmz_itm_have}"
response += f"&use_pv_mdl_eqp={int(profile['use_pv_mdl_eqp'])}"
response += f"&use_mdl_pri={int(profile['use_mdl_pri'])}"
response += f"&use_pv_skn_eqp={int(profile['use_pv_skn_eqp'])}"
response += f"&use_pv_btn_se_eqp={int(profile['use_pv_btn_se_eqp'])}"
response += f"&use_pv_sld_se_eqp={int(profile['use_pv_sld_se_eqp'])}"
response += f"&use_pv_chn_sld_se_eqp={int(profile['use_pv_chn_sld_se_eqp'])}"
response += f"&use_pv_sldr_tch_se_eqp={int(profile['use_pv_sldr_tch_se_eqp'])}"
response += f"&vcld_pts={profile['lv_efct_id']}"
response += f"&nxt_pv_id={profile['nxt_pv_id']}"
response += f"&nxt_dffclty={profile['nxt_dffclty']}"
response += f"&nxt_edtn={profile['nxt_edtn']}"
response += f"&dsp_clr_brdr={profile['dsp_clr_brdr']}"
response += f"&dsp_intrm_rnk={profile['dsp_intrm_rnk']}"
response += f"&dsp_clr_sts={profile['dsp_clr_sts']}"
response += f"&rgo_sts={profile['rgo_sts']}"
# Contest progress
response += f"&cv_cid=-1,-1,-1,-1"
response += f"&cv_sc=-1,-1,-1,-1"
response += f"&cv_bv=-1,-1,-1,-1"
response += f"&cv_bv=-1,-1,-1,-1"
response += f"&cv_bf=-1,-1,-1,-1"
# Contest now playing id, return -1 if no current playing contest
response += f"&cnp_cid={profile['cnp_cid']}"
response += f"&cnp_val={profile['cnp_val']}"
# border can be 0=bronzem 1=silver, 2=gold
response += f"&cnp_rr={profile['cnp_rr']}"
# only show contest specifier if it is not empty
response += f"&cnp_sp={profile['cnp_sp']}" if profile["cnp_sp"] != "" else ""
# To be fully fixed # To be fully fixed
if "my_qst_id" not in profile: if "my_qst_id" in profile:
response += f"&my_qst_id=-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1" resp.my_qst_id = profile['my_qst_id']
response += f"&my_qst_sts=0,0,0,0,0,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1" resp.my_qst_sts = profile['my_qst_sts']
else:
response += f"&my_qst_id={profile['my_qst_id']}"
response += f"&my_qst_sts={profile['my_qst_sts']}"
response += f"&my_qst_prgrs=0,0,0,0,0,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1"
response += f"&my_qst_et=2022-06-19%2010%3A28%3A52.0,2022-06-19%2010%3A28%3A52.0,2022-06-19%2010%3A28%3A52.0,2100-01-01%2008%3A59%3A59.0,2100-01-01%2008%3A59%3A59.0,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx"
# define a helper class to store all counts for clear, great, # define a helper class to store all counts for clear, great,
# excellent and perfect # excellent and perfect
@ -524,7 +474,7 @@ class DivaBase:
} }
# get clear status from user scores # get clear status from user scores
pv_records = await self.data.score.get_best_scores(data["pd_id"]) pv_records = await self.data.score.get_best_scores(req.pd_id)
clear_status = "0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0" clear_status = "0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0"
if pv_records is not None: if pv_records is not None:
@ -560,46 +510,33 @@ class DivaBase:
clear_status = ",".join(map(str, clear_list)) clear_status = ",".join(map(str, clear_list))
response += f"&clr_sts={clear_status}" resp.clr_sts = clear_status
# Store stuff to add to rework
response += f"&mdl_eqp_tm={self.time_lut}"
mdl_eqp_ary = "-999,-999,-999"
c_itm_eqp_ary = "-999,-999,-999,-999,-999,-999,-999,-999,-999,-999,-999,-999"
ms_itm_flg_ary = "1,1,1,1,1,1,1,1,1,1,1,1"
# get the common_modules, customize_items and customize_item_flags # get the common_modules, customize_items and customize_item_flags
# from the profile shop # from the profile shop
if profile_shop: if profile_shop:
mdl_eqp_ary = profile_shop["mdl_eqp_ary"] resp.mdl_eqp_ary = profile_shop["mdl_eqp_ary"]
c_itm_eqp_ary = profile_shop["c_itm_eqp_ary"] resp.c_itm_eqp_ary = profile_shop["c_itm_eqp_ary"]
ms_itm_flg_ary = profile_shop["ms_itm_flg_ary"] resp.ms_itm_flg_ary = profile_shop["ms_itm_flg_ary"]
response += f"&mdl_eqp_ary={mdl_eqp_ary}" return resp.make()
response += f"&c_itm_eqp_ary={c_itm_eqp_ary}"
response += f"&ms_itm_flg_ary={ms_itm_flg_ary}"
return response async def handle_pd_unlock_request(self, data: bytes) -> str:
pass
async def handle_pd_unlock_request(self, data: Dict) -> Dict: async def handle_spend_credit_request(self, data: bytes) -> str:
return f"" req = SpendCreditRequest(data)
profile = await self.data.profile.get_profile(req.pd_id, self.version)
async def handle_spend_credit_request(self, data: Dict) -> Dict:
profile = await self.data.profile.get_profile(data["pd_id"], self.version)
if profile is None: if profile is None:
return return
response = "" resp = SpendCreditResponse(req.req_id)
resp.vcld_pts = profile['vcld_pts']
resp.lv_str = profile['lv_str']
resp.lv_efct_id = profile['lv_efct_id']
resp.lv_plt_id = profile['lv_plt_id']
response += "&cmpgn_rslt=-1,-1,x,-1,-1,x,x,-1,x,-1,-1,x,-1,-1,x,x,-1,x,-1,-1,x,-1,-1,x,x,-1,x,-1,-1,x,-1,-1,x,x,-1,x,-1,-1,x,-1,-1,x,x,-1,x,-1,-1,x,-1,-1,x,x,-1,x" return resp.make()
response += "&cmpgn_rslt_num=0"
response += f"&vcld_pts={profile['vcld_pts']}"
response += f"&lv_str={profile['lv_str']}"
response += f"&lv_efct_id={profile['lv_efct_id']}"
response += f"&lv_plt_id={profile['lv_plt_id']}"
return response
def _get_pv_pd_result( def _get_pv_pd_result(
self, self,
@ -663,31 +600,31 @@ class DivaBase:
return pv_result return pv_result
async def task_generateScoreData(self, data: Dict, pd_by_pv_id, song): async def task_generateScoreData(self, pd_id: int, difficulty: int, pd_by_pv_id: str, song: int):
if int(song) > 0: if int(song) > 0:
# the request do not send a edition so just perform a query best score and ranking for each edition. # the request do not send a edition so just perform a query best score and ranking for each edition.
# 0=ORIGINAL, 1=EXTRA # 0=ORIGINAL, 1=EXTRA
pd_db_song_0 = await self.data.score.get_best_user_score( pd_db_song_0 = await self.data.score.get_best_user_score(
data["pd_id"], int(song), data["difficulty"], edition=0 pd_id, int(song), difficulty, edition=0
) )
pd_db_song_1 = await self.data.score.get_best_user_score( pd_db_song_1 = await self.data.score.get_best_user_score(
data["pd_id"], int(song), data["difficulty"], edition=1 pd_id, int(song), difficulty, edition=1
) )
pd_db_ranking_0, pd_db_ranking_1 = None, None pd_db_ranking_0, pd_db_ranking_1 = None, None
if pd_db_song_0: if pd_db_song_0:
pd_db_ranking_0 = await self.data.score.get_global_ranking( pd_db_ranking_0 = await self.data.score.get_global_ranking(
data["pd_id"], int(song), data["difficulty"], edition=0 pd_id, int(song), difficulty, edition=0
) )
if pd_db_song_1: if pd_db_song_1:
pd_db_ranking_1 = await self.data.score.get_global_ranking( pd_db_ranking_1 = await self.data.score.get_global_ranking(
data["pd_id"], int(song), data["difficulty"], edition=1 pd_id, int(song), difficulty, edition=1
) )
pd_db_customize = await self.data.pv_customize.get_pv_customize( pd_db_customize = await self.data.pv_customize.get_pv_customize(
data["pd_id"], int(song) pd_id, int(song)
) )
# generate the pv_result string with the ORIGINAL edition and the EXTRA edition appended # generate the pv_result string with the ORIGINAL edition and the EXTRA edition appended
@ -705,14 +642,14 @@ class DivaBase:
pd_by_pv_id.append(",") pd_by_pv_id.append(",")
async def handle_get_pv_pd_request(self, data: Dict) -> Dict: async def handle_get_pv_pd_request(self, data: Dict) -> Dict:
song_id = data["pd_pv_id_lst"].split(",") req = GetPvPdRequest(data)
pv = "" pv = ""
threads = [] threads = []
pd_by_pv_id = [] pd_by_pv_id = []
for song in song_id: for song in req.pd_pv_id_lst:
thread_ScoreData = Thread(target=await self.task_generateScoreData(data, pd_by_pv_id, song)) thread_ScoreData = Thread(target=await self.task_generateScoreData(req.pd_id, req.difficulty, pd_by_pv_id, song))
threads.append(thread_ScoreData) threads.append(thread_ScoreData)
for x in threads: for x in threads:
@ -724,44 +661,48 @@ class DivaBase:
for x in pd_by_pv_id: for x in pd_by_pv_id:
pv += x pv += x
resp = GetPvPdResponse(req.req_id)
resp.pd_by_pv_id = pv[:-1]
response = "" response = ""
response += f"&pd_by_pv_id={pv[:-1]}" response += f"&pd_by_pv_id={pv[:-1]}"
response += "&pdddt_flg=0" response += "&pdddt_flg=0"
response += f"&pdddt_tm={self.time_lut}" response += f"&pdddt_tm={self.time_lut}"
return response return resp.make()
async def handle_stage_start_request(self, data: Dict) -> Dict: async def handle_stage_start_request(self, data: Dict) -> Dict:
return f"" return f""
async def handle_stage_result_request(self, data: Dict) -> Dict: async def handle_stage_result_request(self, data: bytes) -> str:
profile = await self.data.profile.get_profile(data["pd_id"], self.version) req = StageResultRequest(data)
resp = StageResultResponse(req.cmd, req.req_id)
profile = await self.data.profile.get_profile(req.pd_id, self.version)
pd_song_list = data["stg_ply_pv_id"].split(",") pd_song_list = req.stg_ply_pv_id
pd_song_difficulty = data["stg_difficulty"].split(",") pd_song_difficulty = req.stg_difficulty
pd_song_edition = data["stg_edtn"].split(",") pd_song_edition = req.stg_edtn
pd_song_max_score = data["stg_score"].split(",") pd_song_max_score = req.stg_score
pd_song_max_atn_pnt = data["stg_atn_pnt"].split(",") pd_song_max_atn_pnt = req.stg_atn_pnt
pd_song_ranking = data["stg_clr_kind"].split(",") pd_song_ranking = req.stg_clr_kind
pd_song_sort_kind = data["sort_kind"] pd_song_sort_kind = req.sort_kind
pd_song_cool_cnt = data["stg_cool_cnt"].split(",") pd_song_cool_cnt = req.stg_cool_cnt
pd_song_fine_cnt = data["stg_fine_cnt"].split(",") pd_song_fine_cnt = req.stg_fine_cnt
pd_song_safe_cnt = data["stg_safe_cnt"].split(",") pd_song_safe_cnt = req.stg_safe_cnt
pd_song_sad_cnt = data["stg_sad_cnt"].split(",") pd_song_sad_cnt = req.stg_sad_cnt
pd_song_worst_cnt = data["stg_wt_wg_cnt"].split(",") pd_song_worst_cnt = req.stg_wt_wg_cnt
pd_song_max_combo = data["stg_max_cmb"].split(",") pd_song_max_combo = req.stg_max_cmb
for index, value in enumerate(pd_song_list): for index, value in enumerate(pd_song_list):
if "-1" not in pd_song_list[index]: if pd_song_list[index] > 0:
profile_pd_db_song = await self.data.score.get_best_user_score( profile_pd_db_song = await self.data.score.get_best_user_score(
data["pd_id"], req.pd_id,
pd_song_list[index], pd_song_list[index],
pd_song_difficulty[index], pd_song_difficulty[index],
pd_song_edition[index], pd_song_edition[index],
) )
if profile_pd_db_song is None: if profile_pd_db_song is None:
await self.data.score.put_best_score( await self.data.score.put_best_score(
data["pd_id"], req.pd_id,
self.version, self.version,
pd_song_list[index], pd_song_list[index],
pd_song_difficulty[index], pd_song_difficulty[index],
@ -778,7 +719,7 @@ class DivaBase:
pd_song_max_combo[index], pd_song_max_combo[index],
) )
await self.data.score.put_playlog( await self.data.score.put_playlog(
data["pd_id"], req.pd_id,
self.version, self.version,
pd_song_list[index], pd_song_list[index],
pd_song_difficulty[index], pd_song_difficulty[index],
@ -796,7 +737,7 @@ class DivaBase:
) )
elif int(pd_song_max_score[index]) >= int(profile_pd_db_song["score"]): elif int(pd_song_max_score[index]) >= int(profile_pd_db_song["score"]):
await self.data.score.put_best_score( await self.data.score.put_best_score(
data["pd_id"], req.pd_id,
self.version, self.version,
pd_song_list[index], pd_song_list[index],
pd_song_difficulty[index], pd_song_difficulty[index],
@ -813,7 +754,7 @@ class DivaBase:
pd_song_max_combo[index], pd_song_max_combo[index],
) )
await self.data.score.put_playlog( await self.data.score.put_playlog(
data["pd_id"], req.pd_id,
self.version, self.version,
pd_song_list[index], pd_song_list[index],
pd_song_difficulty[index], pd_song_difficulty[index],
@ -831,7 +772,7 @@ class DivaBase:
) )
elif int(pd_song_max_score[index]) != int(profile_pd_db_song["score"]): elif int(pd_song_max_score[index]) != int(profile_pd_db_song["score"]):
await self.data.score.put_playlog( await self.data.score.put_playlog(
data["pd_id"], req.pd_id,
self.version, self.version,
pd_song_list[index], pd_song_list[index],
pd_song_difficulty[index], pd_song_difficulty[index],
@ -860,6 +801,20 @@ class DivaBase:
new_level = (total_atn_pnt // 13979) + 1 new_level = (total_atn_pnt // 13979) + 1
new_level_pnt = round((total_atn_pnt % 13979) / 13979 * 100) new_level_pnt = round((total_atn_pnt % 13979) / 13979 * 100)
resp.lv_num_old = int(profile['lv_num'])
resp.lv_pnt_old = int(profile['lv_pnt'])
resp.lv_num = new_level
resp.lv_str = profile['lv_str']
resp.lv_pnt = new_level_pnt
resp.lv_efct_id = int(profile['lv_efct_id'])
resp.lv_plt_id = int(profile['lv_plt_id'])
resp.vcld_pts = int(profile['vcld_pts'])
resp.prsnt_vcld_pts = int(profile['vcld_pts'])
if "my_qst_id" not in profile:
quests = profile['my_qst_id'].split(",")
for x in range(len(quests)):
resp.my_qst_id[x] = int(quests[x])
response = "&chllng_kind=-1" response = "&chllng_kind=-1"
response += f"&lv_num_old={int(profile['lv_num'])}" response += f"&lv_num_old={int(profile['lv_num'])}"
response += f"&lv_pnt_old={int(profile['lv_pnt'])}" response += f"&lv_pnt_old={int(profile['lv_pnt'])}"
@ -869,16 +824,16 @@ class DivaBase:
profile["user"], profile["user"],
lv_num=new_level, lv_num=new_level,
lv_pnt=new_level_pnt, lv_pnt=new_level_pnt,
vcld_pts=int(data["vcld_pts"]), vcld_pts=req.vcld_pts,
hp_vol=int(data["hp_vol"]), hp_vol=req.hp_vol,
btn_se_vol=int(data["btn_se_vol"]), btn_se_vol=req.btn_se_vol,
sldr_se_vol2=int(data["sldr_se_vol2"]), sldr_se_vol2=req.sldr_se_vol2,
sort_kind=int(data["sort_kind"]), sort_kind=req.sort_kind,
nxt_pv_id=int(data["ply_pv_id"]), nxt_pv_id=req.ply_pv_id,
nxt_dffclty=int(data["nxt_dffclty"]), nxt_dffclty=req.nxt_dffclty,
nxt_edtn=int(data["nxt_edtn"]), nxt_edtn=req.nxt_edtn,
my_qst_id=data["my_qst_id"], my_qst_id=req.my_qst_id,
my_qst_sts=data["my_qst_sts"], my_qst_sts=req.my_qst_sts,
) )
response += f"&lv_num={new_level}" response += f"&lv_num={new_level}"
@ -911,17 +866,18 @@ class DivaBase:
response += "&my_ccd_r_hnd=-1,-1,-1,-1,-1" response += "&my_ccd_r_hnd=-1,-1,-1,-1,-1"
response += "&my_ccd_r_vp=-1,-1,-1,-1,-1" response += "&my_ccd_r_vp=-1,-1,-1,-1,-1"
return response return resp.make()
async def handle_end_request(self, data: Dict) -> Dict: async def handle_end_request(self, data: bytes) -> str:
profile = await self.data.profile.get_profile(data["pd_id"], self.version) req = EndRequest(data)
profile = await self.data.profile.get_profile(req.pd_id, self.version)
await self.data.profile.update_profile( await self.data.profile.update_profile(
profile["user"], my_qst_id=data["my_qst_id"], my_qst_sts=data["my_qst_sts"] profile["user"], my_qst_id=req.my_qst_id, my_qst_sts=req.my_qst_sts
) )
return f"" return None
async def handle_shop_exit_request(self, data: Dict) -> Dict: async def handle_shop_exit_request(self, data: bytes) -> str:
await self.data.item.put_shop( await self.data.item.put_shop(
data["pd_id"], data["pd_id"],
self.version, self.version,

View File

@ -18,6 +18,12 @@ class DivaServerConfig:
self.__config, "diva", "server", "loglevel", default="info" self.__config, "diva", "server", "loglevel", default="info"
) )
) )
@property
def banner_msg(self) -> str:
CoreConfig.get_config_field(
self.__config, "diva", "server", "banner_msg", default=""
)
class DivaModsConfig: class DivaModsConfig:

View File

@ -6,6 +6,8 @@ class DivaConstants:
VER_PROJECT_DIVA_ARCADE = 0 VER_PROJECT_DIVA_ARCADE = 0
VER_PROJECT_DIVA_ARCADE_FUTURE_TONE = 1 VER_PROJECT_DIVA_ARCADE_FUTURE_TONE = 1
LUT_TIME_FMT = "%Y-%m-%d %H:%M:%S:16.0"
VERSION_NAMES = ("Project Diva Arcade", "Project Diva Arcade Future Tone") VERSION_NAMES = ("Project Diva Arcade", "Project Diva Arcade Future Tone")
@classmethod @classmethod

View File

@ -0,0 +1,3 @@
from .base import *
from .user import *
from .pv import *

View File

@ -0,0 +1,275 @@
from urllib import parse
from urllib.parse import quote
from datetime import datetime
from typing import Union, Dict, List, Any
from ..const import DivaConstants
def lazy_http_form_parse(src: Union[str, bytes]) -> Dict[bytes, bytes]:
out = {}
if type(src) == str:
src = src.encode()
for param in src.split(b"&"):
kvp = param.split(b"=")
out[parse.unquote(kvp[0])] = parse.unquote(kvp[1])
return out
class DivaRequestParseException(Exception):
"""
Exception raised when there is a fault in parsing a diva request,
either due to a malformed request, or missing required items
"""
def __init__(self, message: str) -> None:
self.message = message
super().__init__(self.message)
class BaseBinaryRequest:
cmd: str
req_id: str
def __init__(self, raw: bytes) -> None:
self.raw = raw
self.raw_dict = dict(parse.parse_qsl(raw))
if "cmd" not in self.raw_dict:
raise DivaRequestParseException(f"cmd not in request data {self.raw_dict}")
if "req_id" not in self.raw_dict:
raise DivaRequestParseException(
f"req_id not in request data {self.raw_dict}"
)
for k, v in self.raw_dict.items():
setattr(self, k, v)
class BaseRequest:
def __init__(self, raw: Union[str, bytes]) -> None:
self.raw = raw
try:
self.raw_dict: Dict[str, str] = lazy_http_form_parse(raw)
except UnicodeDecodeError as e:
raise DivaRequestParseException(f"Could not decode data {raw} - {e}")
if "cmd" not in self.raw_dict:
raise DivaRequestParseException(f"cmd not in request data {self.raw_dict}")
if "req_id" not in self.raw_dict:
raise DivaRequestParseException(
f"req_id not in request data {self.raw_dict}"
)
if "place_id" not in self.raw_dict:
raise DivaRequestParseException(
f"place_id not in request data {self.raw_dict}"
)
if "start_up_mode" not in self.raw_dict:
raise DivaRequestParseException(
f"start_up_mode not in request data {self.raw_dict}"
)
if "cmm_dly_mod" not in self.raw_dict:
raise DivaRequestParseException(
f"cmm_dly_mod not in request data {self.raw_dict}"
)
if "cmm_dly_sec" not in self.raw_dict:
raise DivaRequestParseException(
f"cmm_dly_sec not in request data {self.raw_dict}"
)
if "cmm_err_mod" not in self.raw_dict:
raise DivaRequestParseException(
f"cmm_err_mod not in request data {self.raw_dict}"
)
if "region_code" not in self.raw_dict:
raise DivaRequestParseException(
f"region_code not in request data {self.raw_dict}"
)
if "time_stamp" not in self.raw_dict:
raise DivaRequestParseException(
f"time_stamp not in request data {self.raw_dict}"
)
self.cmd: str = self.raw_dict.get('cmd')
self.req_id: str = self.raw_dict.get('req_id')
self.game_id: str = self.raw_dict.get('game_id')
self.r_rev: str = self.raw_dict.get('r_rev')
self.kc_serial: str = self.raw_dict.get('kc_serial')
self.b_serial: str = self.raw_dict.get('b_serial')
self.country_code: str = self.raw_dict.get('country_code')
self.place_id = int(self.raw_dict['place_id'], 16)
self.start_up_mode = int(self.raw_dict.get('start_up_mode'))
self.cmm_dly_mod = int(self.raw_dict.get('cmm_dly_mod'))
self.cmm_dly_sec = int(self.raw_dict.get('cmm_dly_sec'))
self.cmm_err_mod = int(self.raw_dict.get('cmm_err_mod'))
self.region_code = int(self.raw_dict.get('region_code'))
# datetime.now().astimezone().replace(microsecond=0).isoformat()
self.time_stamp = datetime.strptime(self.raw_dict.get('time_stamp'), "%Y-%m-%dT%H:%M:%S%z")
class BaseResponse:
def __init__(self, cmd_id: str, req_id: int) -> None:
self.cmd = cmd_id
self.req_id = req_id
self.stat = "ok"
def make(self) -> str:
itms: List[str] = []
for k, v in vars(self).items():
if type(v) == int:
itms.append(encode_int(k, v))
elif type(v) == bool:
itms.append(encode_bool(k, v))
elif type(v) == datetime:
itms.append(encode_date(k, v))
elif type(v) == list:
itms.append(encode_list(k, v))
else:
itms.append(encode_str(k, v))
return "&".join(itms)
class GameInitRequest(BaseRequest):
def __init__(self, raw: Union[str, bytes]) -> None:
super().__init__(raw)
class AttendRequest(BaseRequest):
def __init__(self, raw: Union[str, bytes]) -> None:
super().__init__(raw)
if 'power_on' not in self.raw_dict:
raise DivaRequestParseException(
f"power_on not in request data {self.raw_dict}"
)
if 'is_bb' not in self.raw_dict:
raise DivaRequestParseException(
f"is_bb not in request data {self.raw_dict}"
)
self.power_on = int(self.raw_dict.get('power_on'))
self.is_bb = bool(int(self.raw_dict.get('is_bb')))
class AttendResponse(BaseResponse):
def __init__(self, req_id: int) -> None:
super().__init__("attend", req_id)
self.atnd_prm1 = [1] * 100
self.atnd_prm2 = [1] * 100
self.atnd_prm3 = [1] * 100
self.atnd_lut = datetime.now()
def make(self) -> str:
return quote(super().make(), safe=",&=")
class SpendCreditRequest(BaseRequest):
def __init__(self, raw: Union[str, bytes]) -> None:
super().__init__(raw)
if 'pd_id' not in self.raw_dict:
raise DivaRequestParseException(
f"pd_id not in request data {self.raw_dict}"
)
if 'my_qst_id' not in self.raw_dict:
raise DivaRequestParseException(
f"my_qst_id not in request data {self.raw_dict}"
)
if 'my_qst_sts' not in self.raw_dict:
raise DivaRequestParseException(
f"my_qst_sts not in request data {self.raw_dict}"
)
if 'crdt_typ' not in self.raw_dict:
raise DivaRequestParseException(
f"crdt_typ not in request data {self.raw_dict}"
)
if 'cmpgn_id' not in self.raw_dict:
raise DivaRequestParseException(
f"cmpgn_id not in request data {self.raw_dict}"
)
if 'cmpgn_pb' not in self.raw_dict:
raise DivaRequestParseException(
f"cmpgn_pb not in request data {self.raw_dict}"
)
self.pd_id = int(self.raw_dict.get('pd_id'))
self.my_qst_id = decode_list_int(self.raw_dict.get('my_qst_id'))
self.my_qst_sts = decode_list_int(self.raw_dict.get('my_qst_sts'))
self.crdt_typ = int(self.raw_dict.get('crdt_typ'))
self.cmpgn_id = decode_list_int(self.raw_dict.get('cmpgn_id'))
self.cmpgn_pb = decode_list_int(self.raw_dict.get('cmpgn_pb'))
class SpendCreditResponse(BaseResponse):
def __init__(self, req_id: int) -> None:
super().__init__("spend_credit", req_id)
self.cmpgn_rslt = ",".join(["-1,-1,x,-1,-1,x,x,-1,x"] * 6)
self.cmpgn_rslt_num = 0
self.vcld_pts = 0
self.lv_str = ""
self.lv_efct_id = 0
self.lv_plt_id = 0
def encode_int(key: str, val: Union[int, None] = None) -> str:
if type(val) != int:
val = -1
return f"{key}={val}"
def encode_bool(key: str, val: Union[bool, None] = None) -> str:
if not val:
return encode_int(key, 0)
return encode_int(key, 1)
def encode_str(key: str, val: Union[str, None] = None, urlencode_val: bool = False) -> str:
if type(val) != str:
val = "xxx"
if urlencode_val:
val = quote(val)
return f"{key}={val}"
def encode_date(key: str, val: Union[datetime, None], urlencode_val: bool = True, fmt: str = DivaConstants.LUT_TIME_FMT) -> str:
if type(val) != datetime:
val = datetime.now().astimezone()
val = val.replace(microsecond=0)
dt_fmt = val.strftime(fmt)
if urlencode_val:
dt_fmt = quote(dt_fmt)
return f"{key}={dt_fmt}"
def encode_list(key: str, val: Union[List[Any], None], urlencode_final_val: bool = False, urlencode_vals: bool = False) -> str:
if not val:
return f"{key}="
for x in range(len(val)):
if val[x] is None:
val[x] = "x"
if type(val[x]) == datetime:
val[x] = val[x].replace(microsecond=0).strftime(DivaConstants.LUT_TIME_FMT)
elif type(val[x]) == bool:
val[x] = str(int(val[x]))
elif type(val[x]) == int:
val[x] = str(val[x])
if urlencode_vals:
val[x] = quote(val[x])
all_vals = ",".join(val)
if urlencode_final_val:
all_vals = quote(all_vals)
return f"{key}={all_vals}"
def decode_list_int(val: str) -> List[int]:
return [int(x) for x in val.split(",")]
def decode_list(val: str) -> List[str]:
return [x for x in val.split(",")]

156
titles/diva/handlers/pv.py Normal file
View File

@ -0,0 +1,156 @@
from typing import Union, List
from titles.diva.handlers.base import (
BaseRequest,
BaseResponse,
DivaRequestParseException,
decode_list_int
)
from datetime import datetime
from urllib import parse
from ..const import DivaConstants
class GetPvPdRequest(BaseRequest):
def __init__(self, raw: Union[str, bytes]) -> None:
super().__init__(raw)
try:
self.pd_id = int(self.raw_dict.get('pd_id'))
self.accept_idx = int(self.raw_dict.get('accept_idx'))
self.start_idx = int(self.raw_dict.get('start_idx'))
self.difficulty = int(self.raw_dict.get('difficulty'))
self.pd_pv_id_lst: List[int] = [int(x) for x in self.pd_pv_id_lst.split(',')]
except AttributeError as e:
raise DivaRequestParseException(f"GetPvPdRequest: {e}")
class GetPvPdResponse(BaseResponse):
def __init__(self, req_id: int) -> None:
super().__init__("get_pv_pd", req_id)
self.pd_by_pv_id = ""
self.pdddt_flg = 0
self.pdddt_tm = parse.quote(datetime.now().strftime(DivaConstants.LUT_TIME_FMT))
class StageResultRequest(BaseRequest):
def __init__(self, raw: Union[str, bytes]) -> None:
super().__init__(raw)
self.pd_id = int(self.raw_dict.get('pd_id'))
self.accept_idx = int(self.raw_dict.get('accept_idx'))
self.start_idx = int(self.raw_dict.get('start_idx'))
self.hp_vol = int(self.raw_dict.get('hp_vol'))
self.btn_se_vol = int(self.raw_dict.get('btn_se_vol'))
self.btn_se_vol2 = int(self.raw_dict.get('btn_se_vol2'))
self.sldr_se_vol2 = int(self.raw_dict.get('sldr_se_vol2'))
self.use_pv_mdl_eqp = int(self.raw_dict.get('use_pv_mdl_eqp'))
self.vcld_pts = int(self.raw_dict.get('vcld_pts'))
self.nxt_pv_id = int(self.raw_dict.get('nxt_pv_id'))
self.nxt_dffclty = int(self.raw_dict.get('nxt_dffclty'))
self.nxt_edtn = int(self.raw_dict.get('nxt_edtn'))
self.sort_kind = int(self.raw_dict.get('sort_kind'))
self.nblss_ltt_stts = int(self.raw_dict.get('nblss_ltt_stts'))
self.nblss_ltt_tckt = int(self.raw_dict.get('nblss_ltt_tckt'))
self.free_play = int(self.raw_dict.get('free_play'))
self.game_type = int(self.raw_dict.get('game_type'))
self.ply_pv_id = int(self.raw_dict.get('ply_pv_id'))
self.ttl_vp_add = int(self.raw_dict.get('ttl_vp_add'))
self.ttl_vp_sub = int(self.raw_dict.get('ttl_vp_sub'))
self.continue_cnt = int(self.raw_dict.get('continue_cnt'))
self.cr_cid = int(self.raw_dict.get('cr_cid'))
self.cr_sc = int(self.raw_dict.get('cr_sc'))
self.cr_tv = int(self.raw_dict.get('cr_tv'))
self.cr_if = int(self.raw_dict.get('cr_if'))
self.my_qst_id: List[int] = decode_list_int(self.raw_dict.get('my_qst_id'))
self.my_qst_sts: List[int] = decode_list_int(self.raw_dict.get('my_qst_sts'))
self.stg_difficulty: List[int] = decode_list_int(self.raw_dict.get('stg_difficulty'))
self.stg_edtn: List[int] = decode_list_int(self.raw_dict.get('stg_edtn'))
self.stg_ply_pv_id: List[int] = decode_list_int(self.raw_dict.get('stg_ply_pv_id'))
self.stg_sel_pv_id: List[int] = decode_list_int(self.raw_dict.get('stg_sel_pv_id'))
self.stg_scrpt_ver: List[int] = decode_list_int(self.raw_dict.get('stg_scrpt_ver'))
self.stg_score: List[int] = decode_list_int(self.raw_dict.get('stg_score'))
self.stg_chllng_kind: List[int] = decode_list_int(self.raw_dict.get('stg_chllng_kind'))
self.stg_chllng_result: List[int] = decode_list_int(self.raw_dict.get('stg_chllng_result'))
self.stg_clr_kind: List[int] = decode_list_int(self.raw_dict.get('stg_clr_kind'))
self.stg_vcld_pts: List[int] = decode_list_int(self.raw_dict.get('stg_vcld_pts'))
self.stg_cool_cnt: List[int] = decode_list_int(self.raw_dict.get('stg_cool_cnt'))
self.stg_cool_pct: List[int] = decode_list_int(self.raw_dict.get('stg_cool_pct'))
self.stg_fine_cnt: List[int] = decode_list_int(self.raw_dict.get('stg_fine_cnt'))
self.stg_fine_pct: List[int] = decode_list_int(self.raw_dict.get('stg_fine_pct'))
self.stg_safe_cnt: List[int] = decode_list_int(self.raw_dict.get('stg_safe_cnt'))
self.stg_safe_pct: List[int] = decode_list_int(self.raw_dict.get('stg_safe_pct'))
self.stg_sad_cnt: List[int] = decode_list_int(self.raw_dict.get('stg_sad_cnt'))
self.stg_sad_pct: List[int] = decode_list_int(self.raw_dict.get('stg_sad_pct'))
self.stg_wt_wg_cnt: List[int] = decode_list_int(self.raw_dict.get('stg_wt_wg_cnt'))
self.stg_wt_wg_pct: List[int] = decode_list_int(self.raw_dict.get('stg_wt_wg_pct'))
self.stg_max_cmb: List[int] = decode_list_int(self.raw_dict.get('stg_max_cmb'))
self.stg_chance_tm: List[int] = decode_list_int(self.raw_dict.get('stg_chance_tm'))
self.stg_sm_hl: List[int] = decode_list_int(self.raw_dict.get('stg_sm_hl'))
self.stg_atn_pnt: List[int] = decode_list_int(self.raw_dict.get('stg_atn_pnt'))
self.stg_skin_id: List[int] = decode_list_int(self.raw_dict.get('stg_skin_id'))
self.stg_btn_se: List[int] = decode_list_int(self.raw_dict.get('stg_btn_se'))
self.stg_btn_se_vol: List[int] = decode_list_int(self.raw_dict.get('stg_btn_se_vol'))
self.stg_sld_se: List[int] = decode_list_int(self.raw_dict.get('stg_sld_se'))
self.stg_chn_sld_se: List[int] = decode_list_int(self.raw_dict.get('stg_chn_sld_se'))
self.stg_sldr_tch_se: List[int] = decode_list_int(self.raw_dict.get('stg_sldr_tch_se'))
self.stg_mdl_id: List[int] = decode_list_int(self.raw_dict.get('stg_mdl_id'))
self.stg_sel_mdl_id: List[int] = decode_list_int(self.raw_dict.get('stg_sel_mdl_id'))
self.stg_rvl_pd_id: List[int] = decode_list_int(self.raw_dict.get('stg_rvl_pd_id'))
self.stg_rvl_wl: List[int] = decode_list_int(self.raw_dict.get('stg_rvl_wl'))
self.stg_cpt_rslt: List[int] = decode_list_int(self.raw_dict.get('stg_cpt_rslt'))
self.stg_sld_scr: List[int] = decode_list_int(self.raw_dict.get('stg_sld_scr'))
self.stg_is_sr_gm: List[int] = decode_list_int(self.raw_dict.get('stg_is_sr_gm'))
self.stg_pv_brnch_rslt: List[int] = decode_list_int(self.raw_dict.get('stg_pv_brnch_rslt'))
self.stg_vcl_chg: List[int] = decode_list_int(self.raw_dict.get('stg_vcl_chg'))
self.stg_c_itm_id: List[int] = decode_list_int(self.raw_dict.get('stg_c_itm_id'))
self.stg_ms_itm_flg: List[int] = decode_list_int(self.raw_dict.get('stg_ms_itm_flg'))
self.stg_rgo: List[int] = decode_list_int(self.raw_dict.get('stg_rgo'))
self.stg_ss_num: List[int] = decode_list_int(self.raw_dict.get('stg_ss_num'))
self.stg_is_cs_scs: List[int] = decode_list_int(self.raw_dict.get('stg_is_cs_scs'))
self.stg_is_nppg_use: List[int] = decode_list_int(self.raw_dict.get('stg_is_nppg_use'))
self.stg_p_std_lo_id: List[int] = decode_list_int(self.raw_dict.get('stg_p_std_lo_id'))
self.stg_p_std_is_to: List[int] = decode_list_int(self.raw_dict.get('stg_p_std_is_to'))
self.stg_p_std_is_ccu: List[int] = decode_list_int(self.raw_dict.get('stg_p_std_is_ccu'))
self.stg_p_std_is_tiu: List[int] = decode_list_int(self.raw_dict.get('stg_p_std_is_tiu'))
self.stg_p_std_is_iu: List[int] = decode_list_int(self.raw_dict.get('stg_p_std_is_iu'))
self.stg_p_std_is_npu: List[int] = decode_list_int(self.raw_dict.get('stg_p_std_is_npu'))
self.stg_p_std_is_du: List[int] = decode_list_int(self.raw_dict.get('stg_p_std_is_du'))
self.gu_cmd: List[int] = decode_list_int(self.raw_dict.get('gu_cmd'))
self.mdl_eqp_cmn_ary: List[int] = decode_list_int(self.raw_dict.get('mdl_eqp_cmn_ary'))
self.c_itm_eqp_cmn_ary: List[int] = decode_list_int(self.raw_dict.get('c_itm_eqp_cmn_ary'))
self.ms_itm_flg_cmn_ary: List[int] = decode_list_int(self.raw_dict.get('ms_itm_flg_cmn_ary'))
self.mdl_eqp_pv_ary: List[int] = decode_list_int(self.raw_dict.get('mdl_eqp_pv_ary'))
self.c_itm_eqp_pv_ary: List[int] = decode_list_int(self.raw_dict.get('c_itm_eqp_pv_ary'))
self.ms_itm_flg_pv_ary: List[int] = decode_list_int(self.raw_dict.get('ms_itm_flg_pv_ary'))
self.stg_mdl_s_sts: List[int] = decode_list_int(self.raw_dict.get('stg_mdl_s_sts'))
self.cr_sp: List[int] = decode_list_int(parse.unquote(self.raw_dict.get('cr_sp')))
class StageResultResponse(BaseResponse):
def __init__(self, req_id: int) -> None:
super().__init__("stage_result", req_id)
self.chllng_kind = -1
self.lv_num_old = 0
self.lv_pnt_old = 0
self.lv_num = 0
self.lv_str = 0
self.lv_pnt = 0
self.lv_efct_id = 0
self.lv_plt_id = 0
self.vcld_pts = 0
self.prsnt_vcld_pts = 0
self.cerwd_kind = -1
self.cerwd_value = -1
self.cerwd_str_0 = "***"
self.cerwd_str_1 = "***"
self.ttl_str_ary = ["xxx"] * 5
self.ttl_plt_id_ary = ["xxx"] * 5
self.ttl_desc_ary = ["xxx"] * 5
self.skin_id_ary = ["xxx"] * 5
self.skin_name_ary = ["xxx"] * 5
self.skin_illust_ary = ["xxx"] * 5
self.skin_desc_ary = ["xxx"] * 5
self.my_qst_id = [-1] * 25
self.my_qst_r_qid = [-1] * 25
self.my_qst_r_knd = [-1] * 25
self.my_qst_r_vl = [-1] * 25
self.my_qst_r_nflg = [-1] * 25
self.my_ccd_r_qid = [-1] * 5
self.my_ccd_r_hnd = [-1] * 5
self.my_ccd_r_vp = [-1] * 5

View File

@ -0,0 +1,140 @@
from typing import Union
from titles.diva.handlers.base import (
BaseRequest,
BaseResponse,
DivaRequestParseException,
)
from datetime import datetime
from urllib import parse
from ..const import DivaConstants
class PreStartRequest(BaseRequest):
def __init__(self, raw: str) -> None:
super().__init__(raw)
self.pmm: str = self.raw_dict.get('pmm')
self.idm: str = self.raw_dict.get('idm')
self.mmgameid: str = self.raw_dict.get('mmgameid')
self.mmuid: str = self.raw_dict.get('mmuid')
self.a_code: str = self.raw_dict.get('a_code')
self.aime_id: str = self.raw_dict.get('aime_id')
self.aime_a_code: str = self.raw_dict.get('aime_a_code')
self.key_obj_type = int(self.raw_dict.get('key_obj_type'))
self.exec_vu = int(self.raw_dict.get('exec_vu'))
class PreStartResponse(BaseResponse):
def __init__(self, req_id: int, pd_id: int) -> None:
super().__init__("pre_start", req_id)
self.ps_result = 1
self.pd_id = pd_id
self.accept_idx = 100
self.nblss_ltt_stts = -1
self.nblss_ltt_tckt = -1
self.nblss_ltt_is_opn = -1
self.player_name: str = ""
self.sort_kind: str = ""
self.lv_efct_id: str = ""
self.lv_plt_id: str = ""
self.lv_str: str = ""
self.lv_num: str = ""
self.lv_pnt: str = ""
self.vcld_pts: str = ""
self.skn_eqp: str = ""
self.btn_se_eqp: str = ""
self.sld_se_eqp: str = ""
self.chn_sld_se_eqp: str = ""
self.sldr_tch_se_eqp: str = ""
self.passwd_stat: str = ""
self.mdl_eqp_tm: str = ""
# Ideally this would be a real array that would get converted later
# But this is how it's stored in the db, so w/e for now
self.mdl_eqp_ary = "-999,-999,-999"
class StartRequest(BaseRequest):
def __init__(self, raw: str) -> None:
super().__init__(raw)
self.pd_id = int(self.raw_dict.get('pd_id'))
self.accept_idx = int(self.raw_dict.get('accept_idx'))
class StartResponse(BaseResponse):
def __init__(self, req_id: int, pv_id: int, pv_name: str) -> None:
super().__init__("start", req_id)
self.pd_id: int = pv_id
self.start_result: int = 1
self.accept_idx: int = 100
self.hp_vol: int = 0
self.btn_se_vol: int = 1
self.btn_se_vol2: int = 1
self.sldr_se_vol2: int = 1
self.sort_kind: int = 1
self.player_name: str = pv_name
self.lv_num: int = 1
self.lv_pnt: int = 0
self.lv_efct_id: int = 1
self.lv_plt_id: int = 1
self.mdl_have: str = "F" * 250
self.cstmz_itm_have: str = "F" * 250
self.use_pv_mdl_eqp: int = 0
self.use_mdl_pri: int = 0
self.use_pv_skn_eqp: int = 1
self.use_pv_btn_se_eqp: int = 1
self.use_pv_sld_se_eqp: int = 1
self.use_pv_chn_sld_se_eqp: int = 1
self.use_pv_sldr_tch_se_eqp: int = 1
self.vcld_pts: int = 0
self.nxt_pv_id: int = 1
self.nxt_dffclty: int = 1
self.nxt_edtn: int = 0
self.dsp_clr_brdr: int = 0
self.dsp_intrm_rnk: int = 0
self.dsp_clr_sts: int = 0
self.rgo_sts: int = 0
self.cv_cid: str = "-1,-1,-1,-1"
self.cv_sc: str = "-1,-1,-1,-1"
#self.cv_bv: str = "-1,-1,-1,-1"
self.cv_bv: str = "-1,-1,-1,-1"
self.cv_bf: str = "-1,-1,-1,-1"
self.cnp_cid=-1
self.cnp_val=-1
self.cnp_rr=-1
self.my_qst_id: str = ",".join(["-1"] * 25)
self.my_qst_sts: str = ",".join("0" * 5) + "," + ",".join(["-1"] * 20)
self.my_qst_prgrs: str = ",".join("0" * 5) + "," + ",".join(["-1"] * 20)
self.my_qst_et: str = ",".join([parse.quote(datetime.now().strftime(DivaConstants.LUT_TIME_FMT))] * 5) + "," + ",".join(["xxx"] * 20)
self.clr_sts: str = ",".join("0" * 5) + "," + ",".join(["-1"] * 20)
self.mdl_eqp_tm: str = parse.quote(datetime.now().strftime(DivaConstants.LUT_TIME_FMT))
self.mdl_eqp_ary = ",".join(["-999"] * 3)
self.c_itm_eqp_ary = ",".join(["-999"] * 12)
self.ms_itm_flg_ary = ",".join(["1"] * 12)
class RegisterRequest(BaseRequest):
def __init__(self, raw: str) -> None:
super().__init__(raw)
self.pmm: str = self.raw_dict.get('pmm')
self.idm: str = self.raw_dict.get('idm')
self.mmgameid: str = self.raw_dict.get('mmgameid')
self.mmuid: str = self.raw_dict.get('mmuid')
self.a_code: str = self.raw_dict.get('a_code')
self.aime_a_code: str = self.raw_dict.get('aime_a_code')
self.player_name: str = self.raw_dict.get('player_name')
self.passwd: str = self.raw_dict.get('passwd')
self.aime_id = int(self.raw_dict.get('aime_id'))
self.key_obj_type = int(self.raw_dict.get('key_obj_type'))
class RegisterResponse(BaseResponse):
def __init__(self, req_id: int, pv_id: int) -> None:
super().__init__("register", req_id)
self.cd_adm_result: int = 1
self.pd_id: int = pv_id
class EndRequest(BaseRequest):
def __init__(self, raw: Union[str, bytes]) -> None:
self.my_qst_id: str = self.raw_dict.get('my_qst_id')
self.my_qst_sts: str = self.raw_dict.get('my_qst_sts')
super().__init__(raw)
try:
self.pd_id = int(self.raw_dict.get('pd_id'))
except AttributeError as e:
raise DivaRequestParseException(f"EndRequest: {e}")

View File

@ -6,11 +6,11 @@ import logging, coloredlogs
from logging.handlers import TimedRotatingFileHandler from logging.handlers import TimedRotatingFileHandler
import zlib import zlib
import json import json
import urllib.parse
import base64 import base64
from os import path from os import path
from typing import Tuple, Dict, List from typing import Tuple, Dict, List
from titles.diva.handlers.base import *
from core.config import CoreConfig from core.config import CoreConfig
from core.title import BaseServlet from core.title import BaseServlet
from core.utils import Utils from core.utils import Utils
@ -84,74 +84,52 @@ class DivaServlet(BaseServlet):
url_header = request.headers url_header = request.headers
# Ping Dispatch # Ping Dispatch
if "THIS_STRING_SEPARATES" in str(url_header): if "THIS_STRING_SEPARATES" in url_header:
binary_request = req_raw.splitlines() binary_request = req_raw.splitlines()
binary_cmd_decoded = binary_request[3].decode("utf-8") binary_cmd_decoded = binary_request[3].decode("utf-8")
binary_array = binary_cmd_decoded.split("&") url_data = binary_cmd_decoded # for logging
req_cls = BaseBinaryRequest(binary_cmd_decoded)
bin_req_data = {} else:
json_string = json.dumps(
req_raw.decode("utf-8")
) # Take the response and decode as UTF-8 and dump
b64string = json_string.replace(
r"\n", "\n"
) # Remove all \n and separate them as new lines
gz_string = base64.b64decode(b64string) # Decompressing the base64 string
for kvp in binary_array: try:
split_bin = kvp.split("=") url_data = zlib.decompress(gz_string) # Decompressing the gzip
bin_req_data[split_bin[0]] = split_bin[1] except zlib.error as e:
self.logger.error(f"Failed to defalte! {e} -> {gz_string}")
return PlainTextResponse("stat=0")
self.logger.info(f"Binary {bin_req_data['cmd']} Request") try:
self.logger.debug(bin_req_data) req_cls = BaseRequest(url_data)
except DivaRequestParseException as e:
self.logger.error(e)
return PlainTextResponse("stat=0")
handler = getattr(self.base, f"handle_{bin_req_data['cmd']}_request") self.logger.debug(f"Request: {url_data}\nHeaders: {url_header}")
resp = handler(bin_req_data) self.logger.info(
f"{req_cls.cmd} request from {req_cls.kc_serial}/{req_cls.b_serial} at {Utils.get_ip_addr(request)}"
self.logger.debug(
f"Response cmd={bin_req_data['cmd']}&req_id={bin_req_data['req_id']}&stat=ok{resp}"
)
return PlainTextResponse(f"cmd={bin_req_data['cmd']}&req_id={bin_req_data['req_id']}&stat=ok{resp}")
# Main Dispatch
json_string = json.dumps(
req_raw.decode("utf-8")
) # Take the response and decode as UTF-8 and dump
b64string = json_string.replace(
r"\n", "\n"
) # Remove all \n and separate them as new lines
gz_string = base64.b64decode(b64string) # Decompressing the base64 string
try:
url_data = zlib.decompress(gz_string).decode(
"utf-8"
) # Decompressing the gzip
except zlib.error as e:
self.logger.error(f"Failed to defalte! {e} -> {gz_string}")
return PlainTextResponse("stat=0")
req_kvp = urllib.parse.unquote(url_data)
req_data = {}
# We then need to split each parts with & so we can reuse them to fill out the requests
splitted_request = str.split(req_kvp, "&")
for kvp in splitted_request:
split = kvp.split("=")
req_data[split[0]] = split[1]
self.logger.info(f"{req_data['cmd']} Request")
self.logger.debug(req_data)
func_to_find = f"handle_{req_data['cmd']}_request"
# Load the requests
try:
handler = getattr(self.base, func_to_find)
resp = await handler(req_data)
except AttributeError as e:
self.logger.warning(f"Unhandled {req_data['cmd']} request {e}")
return PlainTextResponse(f"cmd={req_data['cmd']}&req_id={req_data['req_id']}&stat=ok")
except Exception as e:
self.logger.error(f"Error handling method {func_to_find} {e}")
return PlainTextResponse(f"cmd={req_data['cmd']}&req_id={req_data['req_id']}&stat=ok")
self.logger.debug(
f"Response cmd={req_data['cmd']}&req_id={req_data['req_id']}&stat=ok{resp}"
) )
return PlainTextResponse(f"cmd={req_data['cmd']}&req_id={req_data['req_id']}&stat=ok{resp}") handler_str = f"handle_{req_cls.cmd}_request"
if not hasattr(self.base, handler_str):
self.logger.warn(f"Unhandled cmd {req_cls.cmd}")
return PlainTextResponse(BaseResponse(req_cls.cmd, req_cls.req_id).make())
handler = getattr(self.base, handler_str)
response = handler(req_cls.raw)
if response is None or response == "":
response = BaseResponse(req_cls.cmd, req_cls.req_id).make()
if not response.startswith("cmd="):
response = f"cmd={req_cls.cmd}&req_id={req_cls.req_id}&stat=ok" + response
self.logger.debug(f"Response: {response}")
return PlainTextResponse(response)