# Forked from Hay1tsme/artemis (144 lines, 4.5 KiB, Python).
from urllib import parse
|
|
from datetime import datetime
|
|
from typing import Union, Dict
|
|
|
|
|
|
class DivaRequestParseException(Exception):
    """
    Signals that a diva request could not be parsed, either because the
    request was malformed or because a required item was absent.

    The text given to the constructor is kept on ``message`` and is also
    the exception's string form.
    """

    def __init__(self, message: str) -> None:
        super().__init__(message)
        self.message = message
|
|
|
|
|
|
class BaseBinaryRequest:
    """
    Minimal parsed view of a URL-encoded diva request body.

    The body is parsed with ``urllib.parse.parse_qsl`` and every field found
    is exposed as a text attribute on the instance, named after the field.

    Attributes:
        raw: The body exactly as it was handed to the constructor.
        raw_dict: The fields as returned by ``parse_qsl`` (bytes keys and
            values when ``raw`` is bytes).
        cmd: Value of the required ``cmd`` field.
        req_id: Value of the required ``req_id`` field.

    Raises:
        DivaRequestParseException: If ``cmd`` or ``req_id`` is missing.
    """

    cmd: str
    req_id: str

    def __init__(self, raw: bytes) -> None:
        self.raw = raw
        self.raw_dict = dict(parse.parse_qsl(raw))

        # parse_qsl mirrors its input type: a bytes body yields bytes keys
        # and values. Looking up the str key "cmd" in that dict can never
        # succeed, and setattr() needs a str name, so work on a text view.
        fields = {
            self._as_text(key): self._as_text(value)
            for key, value in self.raw_dict.items()
        }

        for required in ("cmd", "req_id"):
            if required not in fields:
                raise DivaRequestParseException(
                    f"{required} not in request data {self.raw_dict}"
                )

        # Iterate items(), not the dict itself: iterating the dict yields
        # keys only, which cannot be unpacked into (name, value) pairs.
        for name, value in fields.items():
            setattr(self, name, value)

    @staticmethod
    def _as_text(item: Union[str, bytes]) -> str:
        """Return ``item`` as str, decoding it first when it is bytes."""
        return item.decode() if isinstance(item, bytes) else item
|
|
|
|
|
|
class BaseRequest:
    """
    Parsed form of a URL-encoded diva request.

    Every field in the body becomes an attribute named after the field. All
    values are text, except the ones listed below, which are converted after
    parsing.

    Attributes:
        raw: The body exactly as it was handed to the constructor.
        raw_dict: The parsed fields with bytes keys and bytes values.
        place_id, start_up_mode, cmm_dly_mod, cmm_dly_sec, cmm_err_mod,
        region_code: Required fields, converted to ``int``.
        time_stamp: Required field, parsed with the format
            ``%Y-%m-%dT%H:%M:%S%z`` into a ``datetime``.

    Raises:
        DivaRequestParseException: If any required field is missing.
        ValueError: If an integer field or ``time_stamp`` cannot be
            converted.
    """

    cmd: str
    req_id: str
    game_id: str
    r_rev: str
    kc_serial: str
    b_serial: str
    country_code: str
    place_id: int
    start_up_mode: int
    cmm_dly_mod: int
    cmm_dly_sec: int
    cmm_err_mod: int
    region_code: int
    time_stamp: datetime

    # Fields every request must carry, in the order they are checked.
    _REQUIRED_FIELDS = (
        "cmd",
        "req_id",
        "place_id",
        "start_up_mode",
        "cmm_dly_mod",
        "cmm_dly_sec",
        "cmm_err_mod",
        "region_code",
        "time_stamp",
    )

    def __init__(self, raw: Union[str, bytes]) -> None:
        self.raw = raw

        # parse_qsl mirrors its input type. The checks and decode() calls
        # below expect bytes keys/values, so a str body is encoded first;
        # otherwise every str request would be rejected as missing "cmd".
        payload = raw.encode() if isinstance(raw, str) else raw
        self.raw_dict: Dict[bytes, bytes] = dict(parse.parse_qsl(payload))

        # Read the required-field list before any request field is copied
        # onto the instance, so request data cannot shadow it.
        for required in self._REQUIRED_FIELDS:
            if required.encode() not in self.raw_dict:
                raise DivaRequestParseException(
                    f"{required} not in request data {self.raw_dict}"
                )

        for key, value in self.raw_dict.items():
            setattr(self, key.decode(), value.decode())

        self.place_id = int(self.place_id)
        self.start_up_mode = int(self.start_up_mode)
        self.cmm_dly_mod = int(self.cmm_dly_mod)
        self.cmm_dly_sec = int(self.cmm_dly_sec)
        self.cmm_err_mod = int(self.cmm_err_mod)
        self.region_code = int(self.region_code)

        # Per the original author's note, the expected text is what
        # datetime.now().astimezone().replace(microsecond=0).isoformat()
        # produces.
        self.time_stamp = datetime.strptime(self.time_stamp, "%Y-%m-%dT%H:%M:%S%z")
|
|
|
|
|
|
class BaseResponse:
    """
    Common part of every response: the command name, the id of the request
    being answered, and a status that starts out as ``"ok"``.
    """

    def __init__(self, cmd_id: str, req_id: int) -> None:
        self.stat = "ok"
        self.req_id = req_id
        self.cmd = cmd_id

    def make(self) -> str:
        """Return the response text: ``cmd``, ``req_id`` and ``stat`` joined by ``&``."""
        return "cmd={}&req_id={}&stat={}".format(self.cmd, self.req_id, self.stat)
|
|
|
|
class GameInitRequest(BaseRequest):
    """
    Request type that adds nothing to ``BaseRequest``.

    Construction takes the same single ``raw`` argument and is handled
    entirely by the inherited ``BaseRequest`` constructor.
    """
|
|
|
|
class AttendRequest(BaseRequest):
    """
    Request that, on top of the ``BaseRequest`` fields, carries ``power_on``
    and ``is_bb``.

    Attributes:
        power_on: The ``power_on`` field converted to ``int``.
        is_bb: The ``is_bb`` field converted to ``int`` and then ``bool``.

    Raises:
        DivaRequestParseException: If a field required by ``BaseRequest`` is
            missing.
        AttributeError: If the request has no ``power_on`` field.
        ValueError: If ``power_on`` or ``is_bb`` is not an integer.
    """

    def __init__(self, raw: Union[str, bytes]) -> None:
        super().__init__(raw)
        self.power_on = int(self.power_on)
        # is_bb used to be computed from power_on, which discarded the
        # request's own is_bb value. Read the real field; when the request
        # does not send one, fall back to the previous power_on-derived
        # value so such requests behave exactly as before.
        # NOTE(review): confirm against the client that is_bb is always sent.
        self.is_bb = bool(int(getattr(self, "is_bb", self.power_on)))
|
|
|
|
class AttendResponse(BaseResponse):
    """
    Response that extends the base ``cmd``/``req_id``/``stat`` text with
    three parameter lists and a timestamp.

    Attributes:
        atnd_prm1, atnd_prm2, atnd_prm3: Lists of 100 ones by default, sent
            as comma-separated values.
        atnd_lut: ``datetime.now()`` at construction time.
    """

    def __init__(self, cmd_id: str, req_id: int) -> None:
        super().__init__(cmd_id, req_id)
        # Three independent lists, so editing one does not affect the others.
        self.atnd_prm1, self.atnd_prm2, self.atnd_prm3 = (
            [1] * 100 for _ in range(3)
        )
        self.atnd_lut = datetime.now()

    def make(self) -> str:
        """Return the base response text followed by the attend fields."""
        head = super().make()

        def comma_joined(values) -> str:
            return ','.join(map(str, values))

        # NOTE(review): the timestamp is percent-quoted here and then encoded
        # again by urlencode below, so it goes out double-encoded. Kept as is;
        # confirm this is what the client expects.
        stamp = parse.quote(self.atnd_lut.strftime('%Y-%m-%d %H:%M:%S:16.0'))

        fields = {
            "atnd_prm1": comma_joined(self.atnd_prm1),
            "atnd_prm2": comma_joined(self.atnd_prm2),
            "atnd_prm3": comma_joined(self.atnd_prm3),
            "atnd_lut": stamp,
        }
        # safe="," keeps the list separators literal instead of %2C.
        tail = parse.urlencode(fields, safe=",")
        return head + "&" + tail
|
|
|