1
0
Fork 0

Compare commits

...

20 Commits

Author SHA1 Message Date
Hay1tsme c2ee09bce0 diva: integrate latest changes 2023-05-02 11:41:35 -04:00
Hay1tsme dfea392b03 Merge branch 'develop' into diva_handler_classes 2023-05-02 11:36:14 -04:00
Midorica 6ad5194bb8 Merge pull request 'Project Diva Arcade: Added Clear Status calculation + small improvements' (#18) from Dniel97/artemis:diva_clear_set into develop
Reviewed-on: Hay1tsme/artemis#18
2023-04-30 23:18:19 +00:00
Dniel97 a0793aa13a
diva: added clear set calculation + small improvements 2023-04-30 23:31:13 +02:00
Dniel97 7364181de1
Merge branch 'develop' into fork_develop 2023-04-30 20:39:35 +02:00
Hay1tsme 9d8762d3da Update 'readme.md' 2023-04-29 21:18:28 -04:00
Hay1tsme 998aa70929 diva: fix for responses that haven't been updated yet 2023-04-01 23:45:28 -04:00
Hay1tsme 8df1325613 diva: implement StartResponse 2023-04-01 23:19:44 -04:00
Hay1tsme 6ff7827918 diva: fill out StartResponse 2023-04-01 23:05:10 -04:00
Hay1tsme a36170f2c3 Merge branch 'develop' into diva_handler_classes 2023-04-01 22:23:13 -04:00
Hay1tsme 48144b53f0 fix attend 2023-03-17 01:35:51 -04:00
Hay1tsme 3b852ea739 Merge branch 'develop' into diva_handler_classes 2023-03-16 22:32:18 -04:00
Hay1tsme 21c9b23617 diva: add game_init and attend handlers 2023-03-13 04:04:08 -04:00
Hay1tsme 3076bdf575 Merge branch 'develop' into diva_handler_classes 2023-03-13 02:27:12 -04:00
Hay1tsme 5be25f89ff format with black 2023-03-09 11:50:11 -05:00
Hay1tsme 5443d24352 Merge branch 'develop' into diva_handler_classes 2023-03-09 11:49:51 -05:00
Hay1tsme 56927c049f clean up index.py 2023-02-26 11:40:13 -05:00
Hay1tsme 2b81ba206c remove unused code from index.py 2023-02-25 23:40:50 -05:00
Hay1tsme 9718e822f3 add commas to urlencode safe param 2023-02-25 19:12:48 -05:00
Hay1tsme b07e3d6a94 add base request/response classes 2023-02-25 18:50:35 -05:00
12 changed files with 557 additions and 235 deletions

View File

@ -0,0 +1,9 @@
ALTER TABLE diva_profile
DROP cnp_cid,
DROP cnp_val,
DROP cnp_rr,
DROP cnp_sp,
DROP btn_se_eqp,
DROP sld_se_eqp,
DROP chn_sld_se_eqp,
DROP sldr_tch_se_eqp;

View File

@ -0,0 +1,9 @@
ALTER TABLE diva_profile
ADD cnp_cid INT NOT NULL DEFAULT -1,
ADD cnp_val INT NOT NULL DEFAULT -1,
ADD cnp_rr INT NOT NULL DEFAULT -1,
ADD cnp_sp VARCHAR(255) NOT NULL DEFAULT "",
ADD btn_se_eqp INT NOT NULL DEFAULT -1,
ADD sld_se_eqp INT NOT NULL DEFAULT -1,
ADD chn_sld_se_eqp INT NOT NULL DEFAULT -1,
ADD sldr_tch_se_eqp INT NOT NULL DEFAULT -1;

View File

@ -2,7 +2,7 @@
A network service emulator for games running SEGA'S ALL.NET service, and similar.
# Supported games
Games listed below have been tested and confirmed working. Only game versions older then the current one in active use in arcades (n-0) or current game versions older then a year (y-1) are supported.
Games listed below have been tested and confirmed working. Only game versions older then the version currently active in arcades, or games versions that have not recieved a major update in over one year, are supported.
+ Chunithm
+ All versions up to New!! Plus

View File

@ -7,4 +7,4 @@ index = DivaServlet
database = DivaData
reader = DivaReader
game_codes = [DivaConstants.GAME_CODE]
current_schema_version = 4
current_schema_version = 5

View File

@ -1,13 +1,14 @@
import datetime
from datetime import datetime
from typing import Any, List, Dict
import logging
import json
import urllib
from urllib import parse
from core.config import CoreConfig
from titles.diva.config import DivaConfig
from titles.diva.const import DivaConstants
from titles.diva.database import DivaData
from titles.diva.handlers import *
class DivaBase:
@ -20,30 +21,39 @@ class DivaBase:
self.game = DivaConstants.GAME_CODE
self.version = DivaConstants.VER_PROJECT_DIVA_ARCADE_FUTURE_TONE
dt = datetime.datetime.now()
self.time_lut = urllib.parse.quote(dt.strftime("%Y-%m-%d %H:%M:%S:16.0"))
dt = datetime.now()
self.time_lut = parse.quote(dt.strftime("%Y-%m-%d %H:%M:%S:16.0"))
def handle_test_request(self, data: Dict) -> Dict:
return ""
def handle_test_request(self, data: bytes) -> str:
pass
def handle_game_init_request(self, data: Dict) -> Dict:
return f""
def handle_game_init_request(self, data: bytes) -> str:
req = GameInitRequest(data)
return None
def handle_attend_request(self, data: Dict) -> Dict:
encoded = "&"
params = {
"atnd_prm1": "0,1,1,0,0,0,1,0,100,0,0,0,0,0,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1",
"atnd_prm2": "30,10,100,4,1,50,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1",
"atnd_prm3": "100,0,1,1,1,1,1,1,1,1,2,3,4,1,1,1,3,4,5,1,1,1,4,5,6,1,1,1,5,6,7,4,4,4,9,10,14,5,10,10,25,20,50,30,90,5,10,10,25,20,50,30,90,5,10,10,25,20,50,30,90,5,10,10,25,20,50,30,90,5,10,10,25,20,50,30,90,10,30,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0",
"atnd_lut": f"{self.time_lut}",
}
def handle_attend_request(self, data: bytes) -> str:
req = AttendRequest(data)
resp = AttendResponse(req.cmd, req.req_id)
encoded += urllib.parse.urlencode(params)
encoded = encoded.replace("%2C", ",")
for i in [0, 3, 4, 5, 7, 9, 10, 11, 12, 13]:
resp.atnd_prm1[i] = 0
resp.atnd_prm1[8] = 100
return encoded
resp.atnd_prm2[:6] = [30, 10, 100, 4, 1, 50]
def handle_ping_request(self, data: Dict) -> Dict:
resp.atnd_prm3[0] = 100
resp.atnd_prm3[1] = 0
resp.atnd_prm3[10:13] = [2, 3, 4]
resp.atnd_prm3[16:19] = [3, 4, 5]
resp.atnd_prm3[22:25] = [4, 5, 6]
resp.atnd_prm3[80:] = [0] * 20
resp.atnd_prm3[28:79] = [5,6,7,4,4,4,9,10,14,5,10,10,25,20,50,30,90,5,
10,10,25,20,50,30,90,5,10,10,25,20,50,30,90,5,10,10,25,20,50,30,90,
5,10,10,25,20,50,30,90,10,30]
return resp.make()
def handle_ping_request(self, data: bytes) -> str:
encoded = "&"
params = {
"ping_b_msg": f"Welcome to {self.core_cfg.server.name} network!",
@ -82,13 +92,13 @@ class DivaBase:
"nblss_ltt_ed_tm": "2019-09-22 12:00:00.0",
}
encoded += urllib.parse.urlencode(params)
encoded += parse.urlencode(params)
encoded = encoded.replace("+", "%20")
encoded = encoded.replace("%2C", ",")
return encoded
def handle_pv_list_request(self, data: Dict) -> Dict:
def handle_pv_list_request(self, data: bytes) -> str:
pvlist = ""
with open(r"titles/diva/data/PvList0.dat", encoding="utf-8") as shop:
lines = shop.readlines()
@ -125,7 +135,7 @@ class DivaBase:
return response
def handle_shop_catalog_request(self, data: Dict) -> Dict:
def handle_shop_catalog_request(self, data: bytes) -> str:
catalog = ""
shopList = self.data.static.get_enabled_shops(self.version)
@ -133,8 +143,8 @@ class DivaBase:
with open(r"titles/diva/data/ShopCatalog.dat", encoding="utf-8") as shop:
lines = shop.readlines()
for line in lines:
line = urllib.parse.quote(line) + ","
catalog += f"{urllib.parse.quote(line)}"
line = parse.quote(line) + ","
catalog += f"{parse.quote(line)}"
else:
for shop in shopList:
@ -153,8 +163,8 @@ class DivaBase:
+ ","
+ str(shop["type"])
)
line = urllib.parse.quote(line) + ","
catalog += f"{urllib.parse.quote(line)}"
line = parse.quote(line) + ","
catalog += f"{parse.quote(line)}"
catalog = catalog.replace("+", "%20")
@ -163,7 +173,7 @@ class DivaBase:
return response
def handle_buy_module_request(self, data: Dict) -> Dict:
def handle_buy_module_request(self, data: bytes) -> str:
profile = self.data.profile.get_profile(data["pd_id"], self.version)
module = self.data.static.get_enabled_shop(self.version, int(data["mdl_id"]))
@ -190,7 +200,7 @@ class DivaBase:
return response
def handle_cstmz_itm_ctlg_request(self, data: Dict) -> Dict:
def handle_cstmz_itm_ctlg_request(self, data: bytes) -> str:
catalog = ""
itemList = self.data.static.get_enabled_items(self.version)
@ -198,8 +208,8 @@ class DivaBase:
with open(r"titles/diva/data/ItemCatalog.dat", encoding="utf-8") as item:
lines = item.readlines()
for line in lines:
line = urllib.parse.quote(line) + ","
catalog += f"{urllib.parse.quote(line)}"
line = parse.quote(line) + ","
catalog += f"{parse.quote(line)}"
else:
for item in itemList:
@ -218,8 +228,8 @@ class DivaBase:
+ ","
+ str(item["type"])
)
line = urllib.parse.quote(line) + ","
catalog += f"{urllib.parse.quote(line)}"
line = parse.quote(line) + ","
catalog += f"{parse.quote(line)}"
catalog = catalog.replace("+", "%20")
@ -228,7 +238,7 @@ class DivaBase:
return response
def handle_buy_cstmz_itm_request(self, data: Dict) -> Dict:
def handle_buy_cstmz_itm_request(self, data: bytes) -> str:
profile = self.data.profile.get_profile(data["pd_id"], self.version)
item = self.data.static.get_enabled_item(
self.version, int(data["cstmz_itm_id"])
@ -263,29 +273,30 @@ class DivaBase:
return response
def handle_festa_info_request(self, data: Dict) -> Dict:
def handle_festa_info_request(self, data: bytes) -> str:
encoded = "&"
params = {
"fi_id": "1,-1",
"fi_name": f"{self.core_cfg.server.name} Opening,xxx",
"fi_kind": "0,0",
"fi_id": "1,2",
"fi_name": f"{self.core_cfg.server.name} Opening,Project DIVA Festa",
# 0=PINK, 1=GREEN
"fi_kind": "1,0",
"fi_difficulty": "-1,-1",
"fi_pv_id_lst": "ALL,ALL",
"fi_attr": "7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF,7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF",
"fi_add_vp": "20,0",
"fi_mul_vp": "1,1",
"fi_st": "2022-06-17 17:00:00.0,2014-07-08 18:10:11.0",
"fi_et": "2029-01-01 10:00:00.0,2014-07-08 18:10:11.0",
"fi_add_vp": "20,5",
"fi_mul_vp": "1,2",
"fi_st": "2019-01-01 00:00:00.0,2019-01-01 00:00:00.0",
"fi_et": "2029-01-01 00:00:00.0,2029-01-01 00:00:00.0",
"fi_lut": "{self.time_lut}",
}
encoded += urllib.parse.urlencode(params)
encoded += parse.urlencode(params)
encoded = encoded.replace("+", "%20")
encoded = encoded.replace("%2C", ",")
return encoded
def handle_contest_info_request(self, data: Dict) -> Dict:
def handle_contest_info_request(self, data: bytes) -> str:
response = ""
response += f"&ci_lut={self.time_lut}"
@ -293,7 +304,7 @@ class DivaBase:
return response
def handle_qst_inf_request(self, data: Dict) -> Dict:
def handle_qst_inf_request(self, data: bytes) -> str:
quest = ""
questList = self.data.static.get_enabled_quests(self.version)
@ -301,7 +312,7 @@ class DivaBase:
with open(r"titles/diva/data/QuestInfo.dat", encoding="utf-8") as shop:
lines = shop.readlines()
for line in lines:
quest += f"{urllib.parse.quote(line)},"
quest += f"{parse.quote(line)},"
response = ""
response += f"&qi_lut={self.time_lut}"
@ -329,7 +340,7 @@ class DivaBase:
+ ","
+ str(quests["quest_enable"])
)
quest += f"{urllib.parse.quote(line)}%0A,"
quest += f"{parse.quote(line)}%0A,"
responseline = f"{quest[:-1]},"
for i in range(len(questList), 59):
@ -343,45 +354,47 @@ class DivaBase:
return response
def handle_nv_ranking_request(self, data: Dict) -> Dict:
return f""
def handle_nv_ranking_request(self, data: bytes) -> str:
pass
def handle_ps_ranking_request(self, data: Dict) -> Dict:
return f""
def handle_ps_ranking_request(self, data: bytes) -> str:
pass
def handle_ng_word_request(self, data: Dict) -> Dict:
return f""
def handle_ng_word_request(self, data: bytes) -> str:
pass
def handle_rmt_wp_list_request(self, data: Dict) -> Dict:
return f""
def handle_rmt_wp_list_request(self, data: bytes) -> str:
pass
def handle_pv_def_chr_list_request(self, data: Dict) -> Dict:
return f""
def handle_pv_def_chr_list_request(self, data: bytes) -> str:
pass
def handle_pv_ng_mdl_list_request(self, data: Dict) -> Dict:
return f""
def handle_pv_ng_mdl_list_request(self, data: bytes) -> str:
pass
def handle_cstmz_itm_ng_mdl_lst_request(self, data: Dict) -> Dict:
return f""
def handle_cstmz_itm_ng_mdl_lst_request(self, data: bytes) -> str:
pass
def handle_banner_info_request(self, data: Dict) -> Dict:
return f""
def handle_banner_info_request(self, data: bytes) -> str:
pass
def handle_banner_data_request(self, data: Dict) -> Dict:
return f""
def handle_banner_data_request(self, data: bytes) -> str:
pass
def handle_cm_ply_info_request(self, data: Dict) -> Dict:
return f""
def handle_cm_ply_info_request(self, data: bytes) -> str:
pass
def handle_pstd_h_ctrl_request(self, data: Dict) -> Dict:
return f""
def handle_pstd_h_ctrl_request(self, data: bytes) -> str:
pass
def handle_pstd_item_ng_lst_request(self, data: Dict) -> Dict:
return f""
def handle_pstd_item_ng_lst_request(self, data: bytes) -> str:
pass
def handle_pre_start_request(self, data: Dict) -> str:
profile = self.data.profile.get_profile(data["aime_id"], self.version)
profile_shop = self.data.item.get_shop(data["aime_id"], self.version)
def handle_pre_start_request(self, data: bytes) -> str:
req = PreStartRequest(data)
resp = PreStartResponse(req.cmd, req.req_id, req.aime_id)
profile = self.data.profile.get_profile(req.aime_id, self.version)
profile_shop = self.data.item.get_shop(req.aime_id, self.version)
if profile is None:
return f"&ps_result=-3"
@ -401,119 +414,176 @@ class DivaBase:
response += f"&lv_pnt={profile['lv_pnt']}"
response += f"&vcld_pts={profile['vcld_pts']}"
response += f"&skn_eqp={profile['use_pv_skn_eqp']}"
response += f"&btn_se_eqp={profile['use_pv_btn_se_eqp']}"
response += f"&sld_se_eqp={profile['use_pv_sld_se_eqp']}"
response += f"&chn_sld_se_eqp={profile['use_pv_chn_sld_se_eqp']}"
response += f"&sldr_tch_se_eqp={profile['use_pv_sldr_tch_se_eqp']}"
response += f"&btn_se_eqp={profile['btn_se_eqp']}"
response += f"&sld_se_eqp={profile['sld_se_eqp']}"
response += f"&chn_sld_se_eqp={profile['chn_sld_se_eqp']}"
response += f"&sldr_tch_se_eqp={profile['sldr_tch_se_eqp']}"
response += f"&passwd_stat={profile['passwd_stat']}"
# Store stuff to add to rework
response += f"&mdl_eqp_tm={self.time_lut}"
profile_dict = profile._asdict()
profile_dict.pop("id")
profile_dict.pop("user")
profile_dict.pop("version")
mdl_eqp_ary = "-999,-999,-999"
for k, v in profile_dict.items():
if hasattr(resp, k):
setattr(resp, k, v)
# get the common_modules from the profile shop
if profile_shop:
mdl_eqp_ary = profile_shop["mdl_eqp_ary"]
if profile_shop is not None and profile_shop:
resp.mdl_eqp_ary = profile_shop["mdl_eqp_ary"]
response += f"&mdl_eqp_ary={mdl_eqp_ary}"
return resp.make()
return response
def handle_registration_request(self, data: Dict) -> Dict:
def handle_registration_request(self, data: bytes) -> str:
self.data.profile.create_profile(
self.version, data["aime_id"], data["player_name"]
)
return f"&cd_adm_result=1&pd_id={data['aime_id']}"
def handle_start_request(self, data: Dict) -> Dict:
profile = self.data.profile.get_profile(data["pd_id"], self.version)
profile_shop = self.data.item.get_shop(data["pd_id"], self.version)
def handle_start_request(self, data: bytes) -> str:
req = StartRequest(data)
profile = self.data.profile.get_profile(req.pd_id, self.version)
profile_shop = self.data.item.get_shop(req.pd_id, self.version)
if profile is None:
return
resp = StartResponse(req.cmd, req.req_id, req.pd_id, profile['player_name'])
mdl_have = "F" * 250
# generate the mdl_have string if "unlock_all_modules" is disabled
if not self.game_config.mods.unlock_all_modules:
mdl_have = self.data.module.get_modules_have_string(
resp.mdl_have = self.data.module.get_modules_have_string(
data["pd_id"], self.version
)
cstmz_itm_have = "F" * 250
# generate the cstmz_itm_have string if "unlock_all_items" is disabled
if not self.game_config.mods.unlock_all_items:
cstmz_itm_have = self.data.customize.get_customize_items_have_string(
resp.cstmz_itm_have = self.data.customize.get_customize_items_have_string(
data["pd_id"], self.version
)
response = f"&pd_id={data['pd_id']}"
response += "&start_result=1"
resp.pd_id = data['pd_id']
resp.hp_vol = {profile['hp_vol']}
resp.btn_se_vol = {profile['btn_se_vol']}
resp.btn_se_vol2 = {profile['btn_se_vol2']}
resp.sldr_se_vol2 = {profile['sldr_se_vol2']}
resp.sort_kind = {profile['sort_kind']}
resp.player_name = {profile['player_name']}
resp.lv_num = {profile['lv_num']}
resp.lv_pnt = {profile['lv_pnt']}
resp.lv_efct_id = {profile['lv_efct_id']}
resp.lv_plt_id = {profile['lv_plt_id']}
resp.use_pv_mdl_eqp = {int(profile['use_pv_mdl_eqp'])}
resp.use_mdl_pri = {int(profile['use_mdl_pri'])}
resp.use_pv_skn_eqp = {int(profile['use_pv_skn_eqp'])}
resp.use_pv_btn_se_eqp = {int(profile['use_pv_btn_se_eqp'])}
resp.use_pv_sld_se_eqp = {int(profile['use_pv_sld_se_eqp'])}
resp.use_pv_chn_sld_se_eqp = {int(profile['use_pv_chn_sld_se_eqp'])}
resp.use_pv_sldr_tch_se_eqp = {int(profile['use_pv_sldr_tch_se_eqp'])}
resp.vcld_pts = {profile['lv_efct_id']}
resp.nxt_pv_id = {profile['nxt_pv_id']}
resp.nxt_dffclty = {profile['nxt_dffclty']}
resp.nxt_edtn = {profile['nxt_edtn']}
resp.dsp_clr_brdr = {profile['dsp_clr_brdr']}
resp.dsp_intrm_rnk = {profile['dsp_intrm_rnk']}
resp.dsp_clr_sts = {profile['dsp_clr_sts']}
resp.rgo_sts = {profile['rgo_sts']}
response += "&accept_idx=100"
response += f"&hp_vol={profile['hp_vol']}"
response += f"&btn_se_vol={profile['btn_se_vol']}"
response += f"&btn_se_vol2={profile['btn_se_vol2']}"
response += f"&sldr_se_vol2={profile['sldr_se_vol2']}"
response += f"&sort_kind={profile['sort_kind']}"
response += f"&player_name={profile['player_name']}"
response += f"&lv_num={profile['lv_num']}"
response += f"&lv_pnt={profile['lv_pnt']}"
response += f"&lv_efct_id={profile['lv_efct_id']}"
response += f"&lv_plt_id={profile['lv_plt_id']}"
response += f"&mdl_have={mdl_have}"
response += f"&cstmz_itm_have={cstmz_itm_have}"
response += f"&use_pv_mdl_eqp={int(profile['use_pv_mdl_eqp'])}"
response += f"&use_mdl_pri={int(profile['use_mdl_pri'])}"
response += f"&use_pv_skn_eqp={int(profile['use_pv_skn_eqp'])}"
response += f"&use_pv_btn_se_eqp={int(profile['use_pv_btn_se_eqp'])}"
response += f"&use_pv_sld_se_eqp={int(profile['use_pv_sld_se_eqp'])}"
response += f"&use_pv_chn_sld_se_eqp={int(profile['use_pv_chn_sld_se_eqp'])}"
response += f"&use_pv_sldr_tch_se_eqp={int(profile['use_pv_sldr_tch_se_eqp'])}"
response += f"&vcld_pts={profile['lv_efct_id']}"
response += f"&nxt_pv_id={profile['nxt_pv_id']}"
response += f"&nxt_dffclty={profile['nxt_dffclty']}"
response += f"&nxt_edtn={profile['nxt_edtn']}"
response += f"&dsp_clr_brdr={profile['dsp_clr_brdr']}"
response += f"&dsp_intrm_rnk={profile['dsp_intrm_rnk']}"
response += f"&dsp_clr_sts={profile['dsp_clr_sts']}"
response += f"&rgo_sts={profile['rgo_sts']}"
# Contest progress
response += f"&cv_cid=-1,-1,-1,-1"
response += f"&cv_sc=-1,-1,-1,-1"
response += f"&cv_bv=-1,-1,-1,-1"
response += f"&cv_bv=-1,-1,-1,-1"
response += f"&cv_bf=-1,-1,-1,-1"
# Contest now playing id, return -1 if no current playing contest
response += f"&cnp_cid={profile['cnp_cid']}"
response += f"&cnp_val={profile['cnp_val']}"
# border can be 0=bronzem 1=silver, 2=gold
response += f"&cnp_rr={profile['cnp_rr']}"
# only show contest specifier if it is not empty
response += f"&cnp_sp={profile['cnp_sp']}" if profile["cnp_sp"] != "" else ""
# To be fully fixed
if "my_qst_id" not in profile:
response += f"&my_qst_id=-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1"
response += f"&my_qst_sts=0,0,0,0,0,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1"
else:
response += f"&my_qst_id={profile['my_qst_id']}"
response += f"&my_qst_sts={profile['my_qst_sts']}"
if "my_qst_id" in profile:
resp.my_qst_id = {profile['my_qst_id']}
resp.my_qst_sts = {profile['my_qst_sts']}
response += f"&my_qst_prgrs=0,0,0,0,0,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1"
response += f"&my_qst_et=2022-06-19%2010%3A28%3A52.0,2022-06-19%2010%3A28%3A52.0,2022-06-19%2010%3A28%3A52.0,2100-01-01%2008%3A59%3A59.0,2100-01-01%2008%3A59%3A59.0,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx,xxx"
response += f"&clr_sts=0,0,0,0,0,0,0,0,56,52,35,6,6,3,1,0,0,0,0,0"
# define a helper class to store all counts for clear, great,
# excellent and perfect
class ClearSet:
def __init__(self):
self.clear = 0
self.great = 0
self.excellent = 0
self.perfect = 0
# create a dict to store the ClearSets per difficulty
clear_set_dict = {
0: ClearSet(), # easy
1: ClearSet(), # normal
2: ClearSet(), # hard
3: ClearSet(), # extreme
4: ClearSet(), # exExtreme
}
# get clear status from user scores
pv_records = self.data.score.get_best_scores(data["pd_id"])
clear_status = "0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0"
if pv_records is not None:
for score in pv_records:
if score["edition"] == 0:
# cheap and standard both count to "clear"
if score["clr_kind"] in {1, 2}:
clear_set_dict[score["difficulty"]].clear += 1
elif score["clr_kind"] == 3:
clear_set_dict[score["difficulty"]].great += 1
elif score["clr_kind"] == 4:
clear_set_dict[score["difficulty"]].excellent += 1
elif score["clr_kind"] == 5:
clear_set_dict[score["difficulty"]].perfect += 1
else:
# 4=ExExtreme
if score["clr_kind"] in {1, 2}:
clear_set_dict[4].clear += 1
elif score["clr_kind"] == 3:
clear_set_dict[4].great += 1
elif score["clr_kind"] == 4:
clear_set_dict[4].excellent += 1
elif score["clr_kind"] == 5:
clear_set_dict[4].perfect += 1
# now add all values to a list
clear_list = []
for clear_set in clear_set_dict.values():
clear_list.append(clear_set.clear)
clear_list.append(clear_set.great)
clear_list.append(clear_set.excellent)
clear_list.append(clear_set.perfect)
clear_status = ",".join(map(str, clear_list))
response += f"&clr_sts={clear_status}"
# Store stuff to add to rework
response += f"&mdl_eqp_tm={self.time_lut}"
mdl_eqp_ary = "-999,-999,-999"
c_itm_eqp_ary = "-999,-999,-999,-999,-999,-999,-999,-999,-999,-999,-999,-999"
ms_itm_flg_ary = "1,1,1,1,1,1,1,1,1,1,1,1"
# get the common_modules, customize_items and customize_item_flags
# from the profile shop
if profile_shop:
mdl_eqp_ary = profile_shop["mdl_eqp_ary"]
c_itm_eqp_ary = profile_shop["c_itm_eqp_ary"]
ms_itm_flg_ary = profile_shop["ms_itm_flg_ary"]
resp.mdl_eqp_ary = profile_shop["mdl_eqp_ary"]
resp.c_itm_eqp_ary = profile_shop["c_itm_eqp_ary"]
resp.ms_itm_flg_ary = profile_shop["ms_itm_flg_ary"]
response += f"&mdl_eqp_ary={mdl_eqp_ary}"
response += f"&c_itm_eqp_ary={c_itm_eqp_ary}"
response += f"&ms_itm_flg_ary={ms_itm_flg_ary}"
return resp.make()
return response
def handle_pd_unlock_request(self, data: bytes) -> str:
pass
def handle_pd_unlock_request(self, data: Dict) -> Dict:
return f""
def handle_spend_credit_request(self, data: Dict) -> Dict:
def handle_spend_credit_request(self, data: bytes) -> str:
profile = self.data.profile.get_profile(data["pd_id"], self.version)
if profile is None:
return
@ -591,7 +661,7 @@ class DivaBase:
return pv_result
def handle_get_pv_pd_request(self, data: Dict) -> Dict:
def handle_get_pv_pd_request(self, data: bytes) -> str:
song_id = data["pd_pv_id_lst"].split(",")
pv = ""
@ -631,9 +701,9 @@ class DivaBase:
self.logger.debug(f"pv_result = {pv_result}")
pv += urllib.parse.quote(pv_result)
pv += parse.quote(pv_result)
else:
pv += urllib.parse.quote(f"{song}***")
pv += parse.quote(f"{song}***")
pv += ","
response = ""
@ -643,10 +713,10 @@ class DivaBase:
return response
def handle_stage_start_request(self, data: Dict) -> Dict:
return f""
def handle_stage_start_request(self, data: bytes) -> str:
pass
def handle_stage_result_request(self, data: Dict) -> Dict:
def handle_stage_result_request(self, data: bytes) -> str:
profile = self.data.profile.get_profile(data["pd_id"], self.version)
pd_song_list = data["stg_ply_pv_id"].split(",")
@ -825,7 +895,7 @@ class DivaBase:
return response
def handle_end_request(self, data: Dict) -> Dict:
def handle_end_request(self, data: bytes) -> str:
profile = self.data.profile.get_profile(data["pd_id"], self.version)
self.data.profile.update_profile(
@ -833,7 +903,7 @@ class DivaBase:
)
return f""
def handle_shop_exit_request(self, data: Dict) -> Dict:
def handle_shop_exit_request(self, data: bytes) -> str:
self.data.item.put_shop(
data["pd_id"],
self.version,

View File

@ -6,6 +6,8 @@ class DivaConstants:
VER_PROJECT_DIVA_ARCADE = 0
VER_PROJECT_DIVA_ARCADE_FUTURE_TONE = 1
LUT_TIME_FMT = "%Y-%m-%d %H:%M:%S:16.0"
VERSION_NAMES = ("Project Diva Arcade", "Project Diva Arcade Future Tone")
@classmethod

View File

@ -0,0 +1,2 @@
from titles.diva.handlers.base import *
from titles.diva.handlers.user import *

View File

@ -0,0 +1,143 @@
from urllib import parse
from datetime import datetime
from typing import Union, Dict
class DivaRequestParseException(Exception):
"""
Exception raised when there is a fault in parsing a diva request,
either due to a malformed request, or missing required items
"""
def __init__(self, message: str) -> None:
self.message = message
super().__init__(self.message)
class BaseBinaryRequest:
cmd: str
req_id: str
def __init__(self, raw: bytes) -> None:
self.raw = raw
self.raw_dict = dict(parse.parse_qsl(raw))
if "cmd" not in self.raw_dict:
raise DivaRequestParseException(f"cmd not in request data {self.raw_dict}")
if "req_id" not in self.raw_dict:
raise DivaRequestParseException(
f"req_id not in request data {self.raw_dict}"
)
for k, v in self.raw_dict:
setattr(self, k, v)
class BaseRequest:
cmd: str
req_id: str
game_id: str
r_rev: str
kc_serial: str
b_serial: str
country_code: str
def __init__(self, raw: Union[str, bytes]) -> None:
self.raw = raw
self.raw_dict: Dict[bytes, bytes] = dict(parse.parse_qsl(raw))
if b"cmd" not in self.raw_dict:
raise DivaRequestParseException(f"cmd not in request data {self.raw_dict}")
if b"req_id" not in self.raw_dict:
raise DivaRequestParseException(
f"req_id not in request data {self.raw_dict}"
)
if b"place_id" not in self.raw_dict:
raise DivaRequestParseException(
f"place_id not in request data {self.raw_dict}"
)
if b"start_up_mode" not in self.raw_dict:
raise DivaRequestParseException(
f"start_up_mode not in request data {self.raw_dict}"
)
if b"cmm_dly_mod" not in self.raw_dict:
raise DivaRequestParseException(
f"cmm_dly_mod not in request data {self.raw_dict}"
)
if b"cmm_dly_sec" not in self.raw_dict:
raise DivaRequestParseException(
f"cmm_dly_sec not in request data {self.raw_dict}"
)
if b"cmm_err_mod" not in self.raw_dict:
raise DivaRequestParseException(
f"cmm_err_mod not in request data {self.raw_dict}"
)
if b"region_code" not in self.raw_dict:
raise DivaRequestParseException(
f"region_code not in request data {self.raw_dict}"
)
if b"time_stamp" not in self.raw_dict:
raise DivaRequestParseException(
f"time_stamp not in request data {self.raw_dict}"
)
for k, v in self.raw_dict.items():
setattr(self, k.decode(), v.decode())
self.place_id = int(self.place_id)
self.start_up_mode = int(self.start_up_mode)
self.cmm_dly_mod = int(self.cmm_dly_mod)
self.cmm_dly_sec = int(self.cmm_dly_sec)
self.cmm_err_mod = int(self.cmm_err_mod)
self.region_code = int(self.region_code)
# datetime.now().astimezone().replace(microsecond=0).isoformat()
self.time_stamp = datetime.strptime(self.time_stamp, "%Y-%m-%dT%H:%M:%S%z")
class BaseResponse:
def __init__(self, cmd_id: str, req_id: int) -> None:
self.cmd = cmd_id
self.req_id = req_id
self.stat = "ok"
def make(self) -> str:
return f"cmd={self.cmd}&req_id={self.req_id}&stat={self.stat}"
class GameInitRequest(BaseRequest):
def __init__(self, raw: Union[str, bytes]) -> None:
super().__init__(raw)
class AttendRequest(BaseRequest):
def __init__(self, raw: Union[str, bytes]) -> None:
super().__init__(raw)
self.power_on = int(self.power_on)
self.is_bb = bool(int(self.power_on))
class AttendResponse(BaseResponse):
def __init__(self, cmd_id: str, req_id: int) -> None:
super().__init__(cmd_id, req_id)
self.atnd_prm1 = [1] * 100
self.atnd_prm2 = [1] * 100
self.atnd_prm3 = [1] * 100
self.atnd_lut = datetime.now()
def make(self) -> str:
ret = super().make()
ret_dict = {
"atnd_prm1": ','.join([str(i) for i in self.atnd_prm1]),
"atnd_prm2": ','.join([str(i) for i in self.atnd_prm2]),
"atnd_prm3": ','.join([str(i) for i in self.atnd_prm3]),
"atnd_lut": parse.quote(self.atnd_lut.strftime('%Y-%m-%d %H:%M:%S:16.0'))
}
ret += "&" + parse.urlencode(ret_dict, safe=",")
return ret

View File

@ -0,0 +1,111 @@
from titles.diva.handlers.base import (
BaseRequest,
BaseResponse,
DivaRequestParseException,
)
from datetime import datetime
from urllib import parse
from ..const import DivaConstants
class PreStartRequest(BaseRequest):
pmm: str
idm: str
mmgameid: str
mmuid: str
a_code: str
aime_id: str
aime_a_code: str
def __init__(self, raw: str) -> None:
super().__init__(raw)
try:
self.key_obj_type = int(self.key_obj_type)
self.exec_vu = int(self.exec_vu)
except AttributeError as e:
raise DivaRequestParseException(f"PreStartRequest: {e}")
class PreStartResponse(BaseResponse):
player_name: str
sort_kind: str
lv_efct_id: str
lv_plt_id: str
lv_str: str
lv_num: str
lv_pnt: str
vcld_pts: str
skn_eqp: str
btn_se_eqp: str
sld_se_eqp: str
chn_sld_se_eqp: str
sldr_tch_se_eqp: str
passwd_stat: str
mdl_eqp_tm: str
def __init__(self, cmd_id: str, req_id: int, pd_id: int) -> None:
super().__init__(cmd_id, req_id)
self.ps_result = 1
self.pd_id = pd_id
self.accept_idx = 100
self.nblss_ltt_stts = -1
self.nblss_ltt_tckt = -1
self.nblss_ltt_is_opn = -1
# Ideally this would be a real array that would get converted later
# But this is how it's stored in the db, so w/e for now
self.mdl_eqp_ary = "-999,-999,-999"
class StartRequest(BaseRequest):
def __init__(self, raw: str) -> None:
super().__init__(raw)
try:
self.pd_id = int(self.pd_id)
self.accept_idx = int(self.accept_idx)
except AttributeError as e:
raise DivaRequestParseException(f"StartRequest: {e}")
class StartResponse(BaseResponse):
def __init__(self, cmd_id: str, req_id: int, pv_id: int, pv_name: str) -> None:
super().__init__(cmd_id, req_id)
self.pd_id: int = pv_id
self.start_result: int = 1
self.accept_idx: int = 100
self.hp_vol: int = 0
self.btn_se_vol: int = 1
self.btn_se_vol2: int = 1
self.sldr_se_vol2: int = 1
self.sort_kind: int = 1
self.player_name: str = pv_name
self.lv_num: int = 1
self.lv_pnt: int = 0
self.lv_efct_id: int = 1
self.lv_plt_id: int = 1
self.mdl_have: str = "F" * 250
self.cstmz_itm_have: str = "F" * 250
self.use_pv_mdl_eqp: int = 0
self.use_mdl_pri: int = 0
self.use_pv_skn_eqp: int = 1
self.use_pv_btn_se_eqp: int = 1
self.use_pv_sld_se_eqp: int = 1
self.use_pv_chn_sld_se_eqp: int = 1
self.use_pv_sldr_tch_se_eqp: int = 1
self.vcld_pts: int = 0
self.nxt_pv_id: int = 1
self.nxt_dffclty: int = 1
self.nxt_edtn: int = 0
self.dsp_clr_brdr: int = 0
self.dsp_intrm_rnk: int = 0
self.dsp_clr_sts: int = 0
self.rgo_sts: int = 0
self.my_qst_id: str = ",".join(["-1"] * 25)
self.my_qst_sts: str = ",".join("0" * 5) + "," + ",".join(["-1"] * 20)
self.my_qst_prgrs: str = ",".join("0" * 5) + "," + ",".join(["-1"] * 20)
self.my_qst_et: str = ",".join([parse.quote(datetime.now().strftime(DivaConstants.LUT_TIME_FMT))] * 5) + "," + ",".join(["xxx"] * 20)
self.clr_sts: str = ",".join("0" * 5) + "," + ",".join(["-1"] * 20)
self.mdl_eqp_tm: str = parse.quote(datetime.now().strftime(DivaConstants.LUT_TIME_FMT))
self.mdl_eqp_ary = ",".join(["-999"] * 3)
self.c_itm_eqp_ary = ",".join(["-999"] * 12)
self.ms_itm_flg_ary = ",".join(["1"] * 12)

View File

@ -4,11 +4,11 @@ import logging, coloredlogs
from logging.handlers import TimedRotatingFileHandler
import zlib
import json
import urllib.parse
import base64
from os import path
from typing import Tuple
from titles.diva.handlers.base import *
from core.config import CoreConfig
from titles.diva.config import DivaConfig
from titles.diva.const import DivaConstants
@ -74,87 +74,54 @@ class DivaServlet:
def render_POST(self, req: Request, version: int, url_path: str) -> bytes:
req_raw = req.content.getvalue()
url_header = req.getAllHeaders()
req.responseHeaders.addRawHeader(b"content-type", b"text/plain")
# Ping Dispatch
if "THIS_STRING_SEPARATES" in str(url_header):
binary_request = req_raw.splitlines()
binary_cmd_decoded = binary_request[3].decode("utf-8")
binary_array = binary_cmd_decoded.split("&")
bin_req_data = {}
req_cls = BaseBinaryRequest(binary_cmd_decoded)
for kvp in binary_array:
split_bin = kvp.split("=")
bin_req_data[split_bin[0]] = split_bin[1]
else:
json_string = json.dumps(
req_raw.decode("utf-8")
) # Take the response and decode as UTF-8 and dump
b64string = json_string.replace(
r"\n", "\n"
) # Remove all \n and separate them as new lines
gz_string = base64.b64decode(b64string) # Decompressing the base64 string
self.logger.info(f"Binary {bin_req_data['cmd']} Request")
self.logger.debug(bin_req_data)
try:
url_data = zlib.decompress(gz_string) # Decompressing the gzip
except zlib.error as e:
self.logger.error(f"Failed to defalte! {e} -> {gz_string}")
return b"stat=0"
handler = getattr(self.base, f"handle_{bin_req_data['cmd']}_request")
resp = handler(bin_req_data)
try:
req_cls = BaseRequest(url_data)
except DivaRequestParseException as e:
self.logger.error(e)
return b"stat=0"
self.logger.debug(
f"Response cmd={bin_req_data['cmd']}&req_id={bin_req_data['req_id']}&stat=ok{resp}"
)
return f"cmd={bin_req_data['cmd']}&req_id={bin_req_data['req_id']}&stat=ok{resp}".encode(
"utf-8"
)
# Main Dispatch
json_string = json.dumps(
req_raw.decode("utf-8")
) # Take the response and decode as UTF-8 and dump
b64string = json_string.replace(
r"\n", "\n"
) # Remove all \n and separate them as new lines
gz_string = base64.b64decode(b64string) # Decompressing the base64 string
try:
url_data = zlib.decompress(gz_string).decode(
"utf-8"
) # Decompressing the gzip
except zlib.error as e:
self.logger.error(f"Failed to defalte! {e} -> {gz_string}")
return "stat=0"
req_kvp = urllib.parse.unquote(url_data)
req_data = {}
# We then need to split each parts with & so we can reuse them to fill out the requests
splitted_request = str.split(req_kvp, "&")
for kvp in splitted_request:
split = kvp.split("=")
req_data[split[0]] = split[1]
self.logger.info(f"{req_data['cmd']} Request")
self.logger.debug(req_data)
func_to_find = f"handle_{req_data['cmd']}_request"
# Load the requests
try:
handler = getattr(self.base, func_to_find)
resp = handler(req_data)
except AttributeError as e:
self.logger.warning(f"Unhandled {req_data['cmd']} request {e}")
return f"cmd={req_data['cmd']}&req_id={req_data['req_id']}&stat=ok".encode(
"utf-8"
)
except Exception as e:
self.logger.error(f"Error handling method {func_to_find} {e}")
return f"cmd={req_data['cmd']}&req_id={req_data['req_id']}&stat=ok".encode(
"utf-8"
)
req.responseHeaders.addRawHeader(b"content-type", b"text/plain")
self.logger.debug(
f"Response cmd={req_data['cmd']}&req_id={req_data['req_id']}&stat=ok{resp}"
self.logger.debug(f"Request: {url_data}\nHeaders: {url_header}")
self.logger.info(
f"{req_cls.cmd} request from {req_cls.kc_serial}/{req_cls.b_serial} at {req.getClientAddress().host}"
)
return (
f"cmd={req_data['cmd']}&req_id={req_data['req_id']}&stat=ok{resp}".encode(
"utf-8"
)
)
handler_str = f"handle_{req_cls.cmd}_request"
if not hasattr(self.base, handler_str):
self.logger.warn(f"Unhandled cmd {req_cls.cmd}")
return BaseResponse(req_cls.cmd, req_cls.req_id).make().encode()
handler = getattr(self.base, handler_str)
response = handler(req_cls.raw)
if response is None or response == "":
response = BaseResponse(req_cls.cmd, req_cls.req_id).make()
if not response.startswith("cmd="):
response = f"cmd={req_cls.cmd}&req_id={req_cls.req_id}&stat=ok" + response
self.logger.debug(f"Response: {response}")
return response.encode(errors="ignore")

View File

@ -34,9 +34,17 @@ profile = Table(
Column("use_pv_sld_se_eqp", Boolean, nullable=False, server_default="0"),
Column("use_pv_chn_sld_se_eqp", Boolean, nullable=False, server_default="0"),
Column("use_pv_sldr_tch_se_eqp", Boolean, nullable=False, server_default="0"),
Column("btn_se_eqp", Integer, nullable=False, server_default="-1"),
Column("sld_se_eqp", Integer, nullable=False, server_default="-1"),
Column("chn_sld_se_eqp", Integer, nullable=False, server_default="-1"),
Column("sldr_tch_se_eqp", Integer, nullable=False, server_default="-1"),
Column("nxt_pv_id", Integer, nullable=False, server_default="708"),
Column("nxt_dffclty", Integer, nullable=False, server_default="2"),
Column("nxt_edtn", Integer, nullable=False, server_default="0"),
Column("cnp_cid", Integer, nullable=False, server_default="-1"),
Column("cnp_val", Integer, nullable=False, server_default="-1"),
Column("cnp_rr", Integer, nullable=False, server_default="-1"),
Column("cnp_sp", String(255), nullable=False, server_default=""),
Column("dsp_clr_brdr", Integer, nullable=False, server_default="7"),
Column("dsp_intrm_rnk", Integer, nullable=False, server_default="1"),
Column("dsp_clr_sts", Integer, nullable=False, server_default="1"),

View File

@ -3,6 +3,7 @@ from sqlalchemy.types import Integer, String, TIMESTAMP, JSON, Boolean
from sqlalchemy.schema import ForeignKey
from sqlalchemy.sql import func, select
from sqlalchemy.dialects.mysql import insert
from sqlalchemy.engine import Row
from typing import Optional, List, Dict, Any
from core.data.schema import BaseData, metadata
@ -167,7 +168,7 @@ class DivaScoreData(BaseData):
def get_best_user_score(
self, user_id: int, pv_id: int, difficulty: int, edition: int
) -> Optional[Dict]:
) -> Optional[Row]:
sql = score.select(
and_(
score.c.user == user_id,
@ -184,7 +185,7 @@ class DivaScoreData(BaseData):
def get_top3_scores(
self, pv_id: int, difficulty: int, edition: int
) -> Optional[List[Dict]]:
) -> Optional[List[Row]]:
sql = (
score.select(
and_(
@ -204,7 +205,7 @@ class DivaScoreData(BaseData):
def get_global_ranking(
self, user_id: int, pv_id: int, difficulty: int, edition: int
) -> Optional[List]:
) -> Optional[List[Row]]:
# get the subquery max score of a user with pv_id, difficulty and
# edition
sql_sub = (
@ -231,7 +232,7 @@ class DivaScoreData(BaseData):
return None
return result.fetchone()
def get_best_scores(self, user_id: int) -> Optional[List]:
def get_best_scores(self, user_id: int) -> Optional[List[Row]]:
sql = score.select(score.c.user == user_id)
result = self.execute(sql)