from datetime import datetime
from typing import Any, Collection, Dict, Union
from urllib import parse

# Fields every BaseRequest must carry, in the order they are validated.
_REQUIRED_FIELDS = (
    "cmd",
    "req_id",
    "place_id",
    "start_up_mode",
    "cmm_dly_mod",
    "cmm_dly_sec",
    "cmm_err_mod",
    "region_code",
    "time_stamp",
)

# Required fields that are converted from their string form to int.
_INT_FIELDS = (
    "place_id",
    "start_up_mode",
    "cmm_dly_mod",
    "cmm_dly_sec",
    "cmm_err_mod",
    "region_code",
)


def lazy_http_form_parse(src: Union[str, bytes]) -> Dict[str, str]:
    """
    Parse a form-encoded body (``k1=v1&k2=v2``) into a dict of strings.

    Unlike ``parse.parse_qsl`` this only percent-decodes; a literal ``+`` is
    kept as-is instead of being turned into a space.

    Each parameter is split on its first ``=`` only, so values may themselves
    contain ``=``. A parameter without ``=`` maps to an empty string, and
    empty segments (empty input, doubled or trailing ``&``) are ignored.
    When a key repeats, the last occurrence wins.
    """
    if isinstance(src, str):
        src = src.encode()

    out: Dict[str, str] = {}
    for param in src.split(b"&"):
        if not param:
            continue
        key, _, value = param.partition(b"=")
        # parse.unquote() returns str even for bytes input.
        out[parse.unquote(key)] = parse.unquote(value)
    return out


def _assign_fields(target: object, fields: Dict[str, Any]) -> None:
    """Copy each parsed request field onto *target* as an attribute."""
    for name, value in fields.items():
        # Field names come from the client. Names such as "__class__" or
        # "__dict__" make setattr() raise TypeError, so they are left in the
        # parsed dict only and never applied to the object.
        if name.startswith("__"):
            continue
        setattr(target, name, value)


class DivaRequestParseException(Exception):
    """
    Exception raised when there is a fault in parsing a diva request,
    either due to a malformed request, or missing required items
    """

    def __init__(self, message: str) -> None:
        self.message = message
        super().__init__(self.message)


class BaseBinaryRequest:
    """
    Request parsed with ``parse.parse_qsl`` that must carry ``cmd`` and
    ``req_id``.

    Every parsed field is exposed as an attribute and in ``raw_dict``. Field
    names are always ``str``; values keep whatever type ``parse_qsl``
    produced, i.e. ``bytes`` for ``bytes`` input.
    NOTE(review): ``cmd``/``req_id`` are annotated ``str`` but are ``bytes``
    when ``raw`` is ``bytes`` - confirm what callers expect.

    Raises:
        DivaRequestParseException: if the data cannot be decoded, or ``cmd``
            or ``req_id`` is missing.
    """

    cmd: str
    req_id: str

    def __init__(self, raw: bytes) -> None:
        self.raw = raw
        try:
            # parse_qsl() returns bytes keys for bytes input; decode them so
            # the str lookups and setattr() below work.
            self.raw_dict: Dict[str, Any] = {
                (key.decode() if isinstance(key, bytes) else key): value
                for key, value in parse.parse_qsl(raw)
            }
        except UnicodeError as e:
            raise DivaRequestParseException(
                f"Could not decode data {raw} - {e}"
            ) from e

        for field in ("cmd", "req_id"):
            if field not in self.raw_dict:
                raise DivaRequestParseException(
                    f"{field} not in request data {self.raw_dict}"
                )

        _assign_fields(self, self.raw_dict)


class BaseRequest:
    """
    Common part of every form-encoded diva request.

    The body is parsed with ``lazy_http_form_parse`` and every field becomes
    an attribute. The numeric required fields are converted to ``int`` and
    ``time_stamp`` to a timezone-aware ``datetime``. Fields annotated below
    but absent from the required list (``game_id``, ``r_rev``, ...) are only
    set when the request contains them.

    Raises:
        DivaRequestParseException: if the data cannot be decoded or a
            required field is missing.
        ValueError: if a numeric field or ``time_stamp`` is malformed
            (propagated unchanged from ``int()`` / ``datetime.strptime``).
    """

    cmd: str
    req_id: str
    game_id: str
    r_rev: str
    kc_serial: str
    b_serial: str
    country_code: str
    place_id: int
    start_up_mode: int
    cmm_dly_mod: int
    cmm_dly_sec: int
    cmm_err_mod: int
    region_code: int
    time_stamp: datetime

    def __init__(self, raw: Union[str, bytes]) -> None:
        self.raw = raw
        try:
            self.raw_dict: Dict[str, str] = lazy_http_form_parse(raw)
        except UnicodeDecodeError as e:
            raise DivaRequestParseException(
                f"Could not decode data {raw} - {e}"
            ) from e

        # Checked in a fixed order so the first missing field is reported.
        for field in _REQUIRED_FIELDS:
            if field not in self.raw_dict:
                raise DivaRequestParseException(
                    f"{field} not in request data {self.raw_dict}"
                )

        _assign_fields(self, self.raw_dict)

        for field in _INT_FIELDS:
            setattr(self, field, int(self.raw_dict[field]))

        # Same layout as
        # datetime.now().astimezone().replace(microsecond=0).isoformat()
        self.time_stamp = datetime.strptime(
            self.raw_dict["time_stamp"], "%Y-%m-%dT%H:%M:%S%z"
        )


class BaseResponse:
    """
    Base response: serialises its instance attributes as ``key=value`` pairs.

    Values are written with ``str()`` and are not percent-encoded; booleans
    are written as ``0``/``1``.
    """

    def __init__(self, cmd_id: str, req_id: int) -> None:
        self.cmd = cmd_id
        self.req_id = req_id
        self.stat = "ok"

    def _join_fields(self, exclude: Collection[str] = ()) -> str:
        """Join all instance attributes not named in *exclude* with ``&``."""
        parts = []
        for key, value in vars(self).items():
            if key in exclude:
                continue
            if isinstance(value, bool):
                value = int(value)
            parts.append(f"{key}={value}")
        return "&".join(parts)

    def make(self) -> str:
        """Return the response body, fields in attribute-definition order."""
        return self._join_fields()


class GameInitRequest(BaseRequest):
    """Game-init request; carries nothing beyond the BaseRequest fields."""


class AttendRequest(BaseRequest):
    """
    Attend request: BaseRequest plus ``power_on`` (int) and ``is_bb`` (bool).

    Raises:
        DivaRequestParseException: additionally, if ``power_on`` is missing.
        ValueError: additionally, if ``power_on`` or ``is_bb`` is not an
            integer string.
    """

    power_on: int
    is_bb: bool

    def __init__(self, raw: Union[str, bytes]) -> None:
        super().__init__(raw)
        if "power_on" not in self.raw_dict:
            raise DivaRequestParseException(
                f"power_on not in request data {self.raw_dict}"
            )
        self.power_on = int(self.raw_dict["power_on"])
        # NOTE(review): is_bb used to be computed from power_on, which looks
        # like a copy/paste slip. The request's own is_bb is used when it is
        # present; the old power_on-derived value remains the fallback.
        # Confirm against the protocol.
        self.is_bb = bool(int(self.raw_dict.get("is_bb", self.power_on)))


class AttendResponse(BaseResponse):
    """
    Attend response: three 100-element parameter lists (all ``1`` by
    default) and a last-update timestamp set to the construction time.
    """

    def __init__(self, cmd_id: str, req_id: int) -> None:
        super().__init__(cmd_id, req_id)
        self.atnd_prm1 = [1] * 100
        self.atnd_prm2 = [1] * 100
        self.atnd_prm3 = [1] * 100
        self.atnd_lut = datetime.now()

    def make(self) -> str:
        """
        Return the response body.

        The common fields come first; the ``atnd_*`` fields follow in their
        encoded form only (comma-joined lists, formatted timestamp). They are
        excluded from the generic serialisation so that their raw ``list`` /
        ``datetime`` representations are not emitted alongside as duplicates.
        """
        encoded = {
            "atnd_prm1": ",".join(str(i) for i in self.atnd_prm1),
            "atnd_prm2": ",".join(str(i) for i in self.atnd_prm2),
            "atnd_prm3": ",".join(str(i) for i in self.atnd_prm3),
            # NOTE(review): the value is percent-encoded here and again by
            # urlencode() below, and the format ends in a literal ":16.0".
            # Both are kept exactly as they were - confirm they are intended.
            "atnd_lut": parse.quote(
                self.atnd_lut.strftime("%Y-%m-%d %H:%M:%S:16.0")
            ),
        }
        head = self._join_fields(exclude=encoded)
        tail = parse.urlencode(encoded, safe=",")
        return f"{head}&{tail}" if head else tail