artemis/titles/diva/handlers/base.py

275 lines
9.2 KiB
Python

from urllib import parse
from urllib.parse import quote
from datetime import datetime
from typing import Union, Dict, List, Any
from ..const import DivaConstants
def lazy_http_form_parse(src: Union[str, bytes]) -> Dict[bytes, bytes]:
out = {}
if type(src) == str:
src = src.encode()
for param in src.split(b"&"):
kvp = param.split(b"=")
out[parse.unquote(kvp[0])] = parse.unquote(kvp[1])
return out
class DivaRequestParseException(Exception):
"""
Exception raised when there is a fault in parsing a diva request,
either due to a malformed request, or missing required items
"""
def __init__(self, message: str) -> None:
self.message = message
super().__init__(self.message)
class BaseBinaryRequest:
cmd: str
req_id: str
def __init__(self, raw: bytes) -> None:
self.raw = raw
self.raw_dict = dict(parse.parse_qsl(raw))
if "cmd" not in self.raw_dict:
raise DivaRequestParseException(f"cmd not in request data {self.raw_dict}")
if "req_id" not in self.raw_dict:
raise DivaRequestParseException(
f"req_id not in request data {self.raw_dict}"
)
for k, v in self.raw_dict.items():
setattr(self, k, v)
class BaseRequest:
def __init__(self, raw: Union[str, bytes]) -> None:
self.raw = raw
try:
self.raw_dict: Dict[str, str] = lazy_http_form_parse(raw)
except UnicodeDecodeError as e:
raise DivaRequestParseException(f"Could not decode data {raw} - {e}")
if "cmd" not in self.raw_dict:
raise DivaRequestParseException(f"cmd not in request data {self.raw_dict}")
if "req_id" not in self.raw_dict:
raise DivaRequestParseException(
f"req_id not in request data {self.raw_dict}"
)
if "place_id" not in self.raw_dict:
raise DivaRequestParseException(
f"place_id not in request data {self.raw_dict}"
)
if "start_up_mode" not in self.raw_dict:
raise DivaRequestParseException(
f"start_up_mode not in request data {self.raw_dict}"
)
if "cmm_dly_mod" not in self.raw_dict:
raise DivaRequestParseException(
f"cmm_dly_mod not in request data {self.raw_dict}"
)
if "cmm_dly_sec" not in self.raw_dict:
raise DivaRequestParseException(
f"cmm_dly_sec not in request data {self.raw_dict}"
)
if "cmm_err_mod" not in self.raw_dict:
raise DivaRequestParseException(
f"cmm_err_mod not in request data {self.raw_dict}"
)
if "region_code" not in self.raw_dict:
raise DivaRequestParseException(
f"region_code not in request data {self.raw_dict}"
)
if "time_stamp" not in self.raw_dict:
raise DivaRequestParseException(
f"time_stamp not in request data {self.raw_dict}"
)
self.cmd: str = self.raw_dict.get('cmd')
self.req_id: str = self.raw_dict.get('req_id')
self.game_id: str = self.raw_dict.get('game_id')
self.r_rev: str = self.raw_dict.get('r_rev')
self.kc_serial: str = self.raw_dict.get('kc_serial')
self.b_serial: str = self.raw_dict.get('b_serial')
self.country_code: str = self.raw_dict.get('country_code')
self.place_id = int(self.raw_dict['place_id'], 16)
self.start_up_mode = int(self.raw_dict.get('start_up_mode'))
self.cmm_dly_mod = int(self.raw_dict.get('cmm_dly_mod'))
self.cmm_dly_sec = int(self.raw_dict.get('cmm_dly_sec'))
self.cmm_err_mod = int(self.raw_dict.get('cmm_err_mod'))
self.region_code = int(self.raw_dict.get('region_code'))
# datetime.now().astimezone().replace(microsecond=0).isoformat()
self.time_stamp = datetime.strptime(self.raw_dict.get('time_stamp'), "%Y-%m-%dT%H:%M:%S%z")
class BaseResponse:
def __init__(self, cmd_id: str, req_id: int) -> None:
self.cmd = cmd_id
self.req_id = req_id
self.stat = "ok"
def make(self) -> str:
itms: List[str] = []
for k, v in vars(self).items():
if type(v) == int:
itms.append(encode_int(k, v))
elif type(v) == bool:
itms.append(encode_bool(k, v))
elif type(v) == datetime:
itms.append(encode_date(k, v))
elif type(v) == list:
itms.append(encode_list(k, v))
else:
itms.append(encode_str(k, v))
return "&".join(itms)
class GameInitRequest(BaseRequest):
def __init__(self, raw: Union[str, bytes]) -> None:
super().__init__(raw)
class AttendRequest(BaseRequest):
def __init__(self, raw: Union[str, bytes]) -> None:
super().__init__(raw)
if 'power_on' not in self.raw_dict:
raise DivaRequestParseException(
f"power_on not in request data {self.raw_dict}"
)
if 'is_bb' not in self.raw_dict:
raise DivaRequestParseException(
f"is_bb not in request data {self.raw_dict}"
)
self.power_on = int(self.raw_dict.get('power_on'))
self.is_bb = bool(int(self.raw_dict.get('is_bb')))
class AttendResponse(BaseResponse):
def __init__(self, req_id: int) -> None:
super().__init__("attend", req_id)
self.atnd_prm1 = [1] * 100
self.atnd_prm2 = [1] * 100
self.atnd_prm3 = [1] * 100
self.atnd_lut = datetime.now()
def make(self) -> str:
return quote(super().make(), safe=",&=")
class SpendCreditRequest(BaseRequest):
def __init__(self, raw: Union[str, bytes]) -> None:
super().__init__(raw)
if 'pd_id' not in self.raw_dict:
raise DivaRequestParseException(
f"pd_id not in request data {self.raw_dict}"
)
if 'my_qst_id' not in self.raw_dict:
raise DivaRequestParseException(
f"my_qst_id not in request data {self.raw_dict}"
)
if 'my_qst_sts' not in self.raw_dict:
raise DivaRequestParseException(
f"my_qst_sts not in request data {self.raw_dict}"
)
if 'crdt_typ' not in self.raw_dict:
raise DivaRequestParseException(
f"crdt_typ not in request data {self.raw_dict}"
)
if 'cmpgn_id' not in self.raw_dict:
raise DivaRequestParseException(
f"cmpgn_id not in request data {self.raw_dict}"
)
if 'cmpgn_pb' not in self.raw_dict:
raise DivaRequestParseException(
f"cmpgn_pb not in request data {self.raw_dict}"
)
self.pd_id = int(self.raw_dict.get('pd_id'))
self.my_qst_id = decode_list_int(self.raw_dict.get('my_qst_id'))
self.my_qst_sts = decode_list_int(self.raw_dict.get('my_qst_sts'))
self.crdt_typ = int(self.raw_dict.get('crdt_typ'))
self.cmpgn_id = decode_list_int(self.raw_dict.get('cmpgn_id'))
self.cmpgn_pb = decode_list_int(self.raw_dict.get('cmpgn_pb'))
class SpendCreditResponse(BaseResponse):
def __init__(self, req_id: int) -> None:
super().__init__("spend_credit", req_id)
self.cmpgn_rslt = ",".join(["-1,-1,x,-1,-1,x,x,-1,x"] * 6)
self.cmpgn_rslt_num = 0
self.vcld_pts = 0
self.lv_str = ""
self.lv_efct_id = 0
self.lv_plt_id = 0
def encode_int(key: str, val: Union[int, None] = None) -> str:
if type(val) != int:
val = -1
return f"{key}={val}"
def encode_bool(key: str, val: Union[bool, None] = None) -> str:
if not val:
return encode_int(key, 0)
return encode_int(key, 1)
def encode_str(key: str, val: Union[str, None] = None, urlencode_val: bool = False) -> str:
if type(val) != str:
val = "xxx"
if urlencode_val:
val = quote(val)
return f"{key}={val}"
def encode_date(key: str, val: Union[datetime, None], urlencode_val: bool = True, fmt: str = DivaConstants.LUT_TIME_FMT) -> str:
if type(val) != datetime:
val = datetime.now().astimezone()
val = val.replace(microsecond=0)
dt_fmt = val.strftime(fmt)
if urlencode_val:
dt_fmt = quote(dt_fmt)
return f"{key}={dt_fmt}"
def encode_list(key: str, val: Union[List[Any], None], urlencode_final_val: bool = False, urlencode_vals: bool = False) -> str:
if not val:
return f"{key}="
for x in range(len(val)):
if val[x] is None:
val[x] = "x"
if type(val[x]) == datetime:
val[x] = val[x].replace(microsecond=0).strftime(DivaConstants.LUT_TIME_FMT)
elif type(val[x]) == bool:
val[x] = str(int(val[x]))
elif type(val[x]) == int:
val[x] = str(val[x])
if urlencode_vals:
val[x] = quote(val[x])
all_vals = ",".join(val)
if urlencode_final_val:
all_vals = quote(all_vals)
return f"{key}={all_vals}"
def decode_list_int(val: str) -> List[int]:
return [int(x) for x in val.split(",")]
def decode_list(val: str) -> List[str]:
return [x for x in val.split(",")]