from urllib import parse from urllib.parse import quote from datetime import datetime from typing import Union, Dict, List, Any from ..const import DivaConstants def lazy_http_form_parse(src: Union[str, bytes]) -> Dict[bytes, bytes]: out = {} if type(src) == str: src = src.encode() for param in src.split(b"&"): kvp = param.split(b"=") out[parse.unquote(kvp[0])] = parse.unquote(kvp[1]) return out class DivaRequestParseException(Exception): """ Exception raised when there is a fault in parsing a diva request, either due to a malformed request, or missing required items """ def __init__(self, message: str) -> None: self.message = message super().__init__(self.message) class BaseBinaryRequest: cmd: str req_id: str def __init__(self, raw: bytes) -> None: self.raw = raw self.raw_dict = dict(parse.parse_qsl(raw)) if "cmd" not in self.raw_dict: raise DivaRequestParseException(f"cmd not in request data {self.raw_dict}") if "req_id" not in self.raw_dict: raise DivaRequestParseException( f"req_id not in request data {self.raw_dict}" ) for k, v in self.raw_dict.items(): setattr(self, k, v) class BaseRequest: def __init__(self, raw: Union[str, bytes]) -> None: self.raw = raw try: self.raw_dict: Dict[str, str] = lazy_http_form_parse(raw) except UnicodeDecodeError as e: raise DivaRequestParseException(f"Could not decode data {raw} - {e}") if "cmd" not in self.raw_dict: raise DivaRequestParseException(f"cmd not in request data {self.raw_dict}") if "req_id" not in self.raw_dict: raise DivaRequestParseException( f"req_id not in request data {self.raw_dict}" ) if "place_id" not in self.raw_dict: raise DivaRequestParseException( f"place_id not in request data {self.raw_dict}" ) if "start_up_mode" not in self.raw_dict: raise DivaRequestParseException( f"start_up_mode not in request data {self.raw_dict}" ) if "cmm_dly_mod" not in self.raw_dict: raise DivaRequestParseException( f"cmm_dly_mod not in request data {self.raw_dict}" ) if "cmm_dly_sec" not in self.raw_dict: raise DivaRequestParseException( f"cmm_dly_sec not in request data {self.raw_dict}" ) if "cmm_err_mod" not in self.raw_dict: raise DivaRequestParseException( f"cmm_err_mod not in request data {self.raw_dict}" ) if "region_code" not in self.raw_dict: raise DivaRequestParseException( f"region_code not in request data {self.raw_dict}" ) if "time_stamp" not in self.raw_dict: raise DivaRequestParseException( f"time_stamp not in request data {self.raw_dict}" ) self.cmd: str self.req_id: str self.game_id: str self.r_rev: str self.kc_serial: str self.b_serial: str self.country_code: str for k, v in self.raw_dict.items(): setattr(self, k, v) self.place_id = int(self.place_id, 16) self.start_up_mode = int(self.start_up_mode) self.cmm_dly_mod = int(self.cmm_dly_mod) self.cmm_dly_sec = int(self.cmm_dly_sec) self.cmm_err_mod = int(self.cmm_err_mod) self.region_code = int(self.region_code) # datetime.now().astimezone().replace(microsecond=0).isoformat() self.time_stamp = datetime.strptime(self.time_stamp, "%Y-%m-%dT%H:%M:%S%z") class BaseResponse: def __init__(self, cmd_id: str, req_id: int) -> None: self.cmd = cmd_id self.req_id = req_id self.stat = "ok" def make(self) -> str: itms: List[str] = [] for k, v in vars(self).items(): if type(v) == int: itms.append(encode_int(k, v)) elif type(v) == bool: itms.append(encode_bool(k, v)) elif type(v) == datetime: itms.append(encode_date(k, v)) elif type(v) == list: itms.append(encode_list(k, v)) else: itms.append(encode_str(k, v)) return "&".join(itms) class GameInitRequest(BaseRequest): def __init__(self, raw: Union[str, bytes]) -> None: super().__init__(raw) class AttendRequest(BaseRequest): def __init__(self, raw: Union[str, bytes]) -> None: super().__init__(raw) self.power_on = int(self.power_on) self.is_bb = bool(int(self.power_on)) class AttendResponse(BaseResponse): def __init__(self, req_id: int) -> None: super().__init__("attend", req_id) self.atnd_prm1 = [1] * 100 self.atnd_prm2 = [1] * 100 self.atnd_prm3 = [1] * 100 self.atnd_lut = datetime.now() def make(self) -> str: return quote(super().make(), safe=",&=") class SpendCreditRequest(BaseRequest): def __init__(self, raw: Union[str, bytes]) -> None: super().__init__(raw) try: self.pd_id = int(self.pd_id) self.my_qst_id = [int(x) for x in self.my_qst_id.split(",")] self.my_qst_sts = [int(x) for x in self.my_qst_sts.split(",")] self.crdt_typ = int(self.crdt_typ) self.cmpgn_id = [int(x) for x in self.cmpgn_id.split(",")] self.cmpgn_pb = [int(x) for x in self.cmpgn_pb.split(",")] except AttributeError as e: raise DivaRequestParseException(f"StartRequest: {e}") class SpendCreditResponse(BaseResponse): def __init__(self, req_id: int) -> None: super().__init__("spend_credit", req_id) self.cmpgn_rslt = ",".join(["-1,-1,x,-1,-1,x,x,-1,x"] * 6) self.cmpgn_rslt_num = 0 self.vcld_pts = 0 self.lv_str = "" self.lv_efct_id = 0 self.lv_plt_id = 0 def encode_int(key: str, val: Union[int, None] = None) -> str: if type(val) != int: val = -1 return f"{key}={val}" def encode_bool(key: str, val: Union[bool, None] = None) -> str: if not val: return encode_int(key, 0) return encode_int(key, 1) def encode_str(key: str, val: Union[str, None] = None, urlencode_val: bool = False) -> str: if type(val) != str: val = "xxx" if urlencode_val: val = quote(val) return f"{key}={val}" def encode_date(key: str, val: Union[datetime, None], urlencode_val: bool = True, fmt: str = DivaConstants.LUT_TIME_FMT) -> str: if type(val) != datetime: val = datetime.now().astimezone() val = val.replace(microsecond=0) dt_fmt = val.strftime(fmt) if urlencode_val: dt_fmt = quote(dt_fmt) return f"{key}={dt_fmt}" def encode_list(key: str, val: Union[List[Any], None], urlencode_final_val: bool = False, urlencode_vals: bool = False) -> str: if not val: return f"{key}=" for x in range(len(val)): if val[x] is None: val[x] = "x" if type(val[x]) == datetime: val[x] = val[x].replace(microsecond=0).strftime(DivaConstants.LUT_TIME_FMT) elif type(val[x]) == bool: val[x] = str(int(val[x])) elif type(val[x]) == int: val[x] = str(val[x]) if urlencode_vals: val[x] = quote(val[x]) all_vals = ",".join(val) if urlencode_final_val: all_vals = quote(all_vals) return f"{key}={all_vals}" def decode_list_int(val: str) -> List[int]: return [int(x) for x in val.split(",")] def decode_list(val: str) -> List[str]: return [x for x in val.split(",")]