From 8ea82ffe1aca1a6fcbe031b8d279ae3eebdb30ce Mon Sep 17 00:00:00 2001 From: Kevin Trocolli Date: Sat, 19 Aug 2023 01:35:37 -0400 Subject: [PATCH] adb: add from_req --- core/adb_handlers/base.py | 8 ++++++-- core/adb_handlers/campaign.py | 20 +++++++++++++++++++- core/adb_handlers/felica.py | 14 +++++++++++++- core/adb_handlers/lookup.py | 13 ++++++++++++- core/aimedb.py | 26 +++++++++++++------------- 5 files changed, 63 insertions(+), 18 deletions(-) diff --git a/core/adb_handlers/base.py b/core/adb_handlers/base.py index 32fd1c0..5523a29 100644 --- a/core/adb_handlers/base.py +++ b/core/adb_handlers/base.py @@ -150,8 +150,12 @@ class ADBBaseRequest: self.head = ADBHeader.from_data(data) class ADBBaseResponse: - def __init__(self, code: int = 0, length: int = 0x20, status: int = 1, game_id: str = "SXXX", store_id: int = 1, keychip_id: str = "A69E01A8888") -> None: - self.head = ADBHeader(0xa13e, 0x3087, code, length, status, game_id, store_id, keychip_id) + def __init__(self, code: int = 0, length: int = 0x20, status: int = 1, game_id: str = "SXXX", store_id: int = 1, keychip_id: str = "A69E01A8888", protocol_ver: int = 0x3087) -> None: + self.head = ADBHeader(0xa13e, protocol_ver, code, length, status, game_id, store_id, keychip_id) + + @classmethod + def from_req(cls, req: ADBHeader, cmd: int, length: int = 0x20, status: int = 1) -> "ADBBaseResponse": + return cls(cmd, length, status, req.game_id, req.store_id, req.keychip_id, req.protocol_ver) def append_padding(self, data: bytes): """Appends 0s to the end of the data until it's at the correct size""" diff --git a/core/adb_handlers/campaign.py b/core/adb_handlers/campaign.py index 936a4c3..a1a372e 100644 --- a/core/adb_handlers/campaign.py +++ b/core/adb_handlers/campaign.py @@ -55,6 +55,12 @@ class ADBCampaignResponse(ADBBaseResponse): def __init__(self, game_id: str = "SXXX", store_id: int = 1, keychip_id: str = "A69E01A8888", code: int = 0x0C, length: int = 0x200, status: int = 1) -> None: super().__init__(code, length, status, game_id, store_id, keychip_id) self.campaigns = [Campaign(), Campaign(), Campaign()] + + @classmethod + def from_req(cls, req: ADBHeader) -> "ADBCampaignResponse": + c = cls(req.game_id, req.store_id, req.keychip_id) + c.head.protocol_ver = req.protocol_ver + return c def make(self) -> bytes: body = b"" @@ -78,6 +84,12 @@ class ADBOldCampaignResponse(ADBBaseResponse): self.info2 = 0 self.info3 = 0 + @classmethod + def from_req(cls, req: ADBHeader) -> "ADBCampaignResponse": + c = cls(req.game_id, req.store_id, req.keychip_id) + c.head.protocol_ver = req.protocol_ver + return c + def make(self) -> bytes: resp_struct = Struct( "info0" / Int32sl, @@ -103,7 +115,13 @@ class ADBCampaignClearResponse(ADBBaseResponse): def __init__(self, game_id: str = "SXXX", store_id: int = 1, keychip_id: str = "A69E01A8888", code: int = 0x0E, length: int = 0x50, status: int = 1) -> None: super().__init__(code, length, status, game_id, store_id, keychip_id) self.campaign_clear_status = [CampaignClear(), CampaignClear(), CampaignClear()] - + + @classmethod + def from_req(cls, req: ADBHeader) -> "ADBCampaignResponse": + c = cls(req.game_id, req.store_id, req.keychip_id) + c.head.protocol_ver = req.protocol_ver + return c + def make(self) -> bytes: body = b"" diff --git a/core/adb_handlers/felica.py b/core/adb_handlers/felica.py index 04a8961..e2f8c11 100644 --- a/core/adb_handlers/felica.py +++ b/core/adb_handlers/felica.py @@ -13,7 +13,13 @@ class ADBFelicaLookupResponse(ADBBaseResponse): def __init__(self, access_code: str = None, game_id: str = "SXXX", store_id: int = 1, keychip_id: str = "A69E01A8888", code: int = 0x03, length: int = 0x30, status: int = 1) -> None: super().__init__(code, length, status, game_id, store_id, keychip_id) self.access_code = access_code if access_code is not None else "00000000000000000000" - + + @classmethod + def from_req(cls, req: ADBHeader, access_code: str = None) -> "ADBFelicaLookupResponse": + c = cls(access_code, req.game_id, req.store_id, req.keychip_id) + c.head.protocol_ver = req.protocol_ver + return c + def make(self) -> bytes: resp_struct = Struct( "felica_idx" / Int32ul, @@ -47,6 +53,12 @@ class ADBFelicaLookup2Response(ADBBaseResponse): self.company = CompanyCodes.SEGA self.portal_status = PortalRegStatus.NO_REG + @classmethod + def from_req(cls, req: ADBHeader, user_id: Union[int, None] = None, access_code: Union[str, None] = None) -> "ADBFelicaLookup2Response": + c = cls(user_id, access_code, req.game_id, req.store_id, req.keychip_id) + c.head.protocol_ver = req.protocol_ver + return c + def make(self) -> bytes: resp_struct = Struct( "user_id" / Int32sl, diff --git a/core/adb_handlers/lookup.py b/core/adb_handlers/lookup.py index 09a9bff..076fc0a 100644 --- a/core/adb_handlers/lookup.py +++ b/core/adb_handlers/lookup.py @@ -19,13 +19,18 @@ class ADBLookupRequest(ADBBaseRequest): self.fw_version = ReaderFwVer.from_byte(fw_version) - class ADBLookupResponse(ADBBaseResponse): def __init__(self, user_id: Union[int, None], game_id: str = "SXXX", store_id: int = 1, keychip_id: str = "A69E01A8888", code: int = 0x06, length: int = 0x30, status: int = 1) -> None: super().__init__(code, length, status, game_id, store_id, keychip_id) self.user_id = user_id if user_id is not None else -1 self.portal_reg = PortalRegStatus.NO_REG + @classmethod + def from_req(cls, req: ADBHeader, user_id: Union[int, None]) -> "ADBLookupResponse": + c = cls(user_id, req.game_id, req.store_id, req.keychip_id) + c.head.protocol_ver = req.protocol_ver + return c + def make(self): resp_struct = Struct( "user_id" / Int32sl, @@ -48,6 +53,12 @@ class ADBLookupExResponse(ADBBaseResponse): self.user_id = user_id if user_id is not None else -1 self.portal_reg = PortalRegStatus.NO_REG + @classmethod + def from_req(cls, req: ADBHeader, user_id: Union[int, None]) -> "ADBLookupExResponse": + c = cls(user_id, req.game_id, req.store_id, req.keychip_id) + c.head.protocol_ver = req.protocol_ver + return c + def make(self): resp_struct = Struct( "user_id" / Int32sl, diff --git a/core/aimedb.py b/core/aimedb.py index 970eef5..552205f 100644 --- a/core/aimedb.py +++ b/core/aimedb.py @@ -119,7 +119,7 @@ class AimedbProtocol(Protocol): def handle_default(self, data: bytes, resp_code: int, length: int = 0x20) -> ADBBaseResponse: req = ADBHeader.from_data(data) - return ADBBaseResponse(resp_code, length, 1, req.game_id, req.store_id, req.keychip_id) + return ADBBaseResponse(resp_code, length, 1, req.game_id, req.store_id, req.keychip_id, req.protocol_ver) def handle_hello(self, data: bytes, resp_code: int) -> ADBBaseResponse: return self.handle_default(data, resp_code) @@ -128,13 +128,13 @@ class AimedbProtocol(Protocol): h = ADBHeader.from_data(data) if h.protocol_ver >= 0x3030: req = h - resp = ADBCampaignResponse(req.game_id, req.store_id, req.keychip_id) + resp = ADBCampaignResponse.from_req(req) else: req = ADBOldCampaignRequest(data) self.logger.info(f"Legacy campaign request for campaign {req.campaign_id} (protocol version {hex(h.protocol_ver)})") - resp = ADBOldCampaignResponse(req.head.game_id, req.head.store_id, req.head.keychip_id) + resp = ADBOldCampaignResponse.from_req(req.head) # We don't currently support campaigns return resp @@ -143,7 +143,7 @@ class AimedbProtocol(Protocol): req = ADBLookupRequest(data) user_id = self.data.card.get_user_id_from_card(req.access_code) - ret = ADBLookupResponse(user_id, req.head.game_id, req.head.store_id, req.head.keychip_id) + ret = ADBLookupResponse.from_req(req.head, user_id) self.logger.info( f"access_code {req.access_code} -> user_id {ret.user_id}" @@ -154,7 +154,7 @@ class AimedbProtocol(Protocol): req = ADBLookupRequest(data) user_id = self.data.card.get_user_id_from_card(req.access_code) - ret = ADBLookupExResponse(user_id, req.head.game_id, req.head.store_id, req.head.keychip_id) + ret = ADBLookupExResponse.from_req(req.head, user_id) self.logger.info( f"access_code {req.access_code} -> user_id {ret.user_id}" @@ -175,7 +175,7 @@ class AimedbProtocol(Protocol): self.logger.info( f"idm {req.idm} ipm {req.pmm} -> access_code {ac}" ) - return ADBFelicaLookupResponse(ac, req.head.game_id, req.head.store_id, req.head.keychip_id) + return ADBFelicaLookupResponse.from_req(req.head, ac) def handle_felica_register(self, data: bytes, resp_code: int) -> bytes: """ @@ -207,7 +207,7 @@ class AimedbProtocol(Protocol): f"Registration blocked!: access code {ac} (IDm: {req.idm} PMm: {req.pmm})" ) - return ADBFelicaLookupResponse(ac, req.head.game_id, req.head.store_id, req.head.keychip_id) + return ADBFelicaLookupResponse.from_req(req.head, ac) def handle_felica_lookup_ex(self, data: bytes, resp_code: int) -> bytes: req = ADBFelicaLookup2Request(data) @@ -221,12 +221,12 @@ class AimedbProtocol(Protocol): f"idm {req.idm} ipm {req.pmm} -> access_code {access_code} user_id {user_id}" ) - return ADBFelicaLookup2Response(user_id, access_code, req.head.game_id, req.head.store_id, req.head.keychip_id) + return ADBFelicaLookup2Response.from_req(req.head, user_id, access_code) def handle_campaign_clear(self, data: bytes, resp_code: int) -> ADBBaseResponse: req = ADBCampaignClearRequest(data) - resp = ADBCampaignClearResponse(req.head.game_id, req.head.store_id, req.head.keychip_id) + resp = ADBCampaignClearResponse.from_req(req.head) # We don't support campaign stuff return resp @@ -258,7 +258,7 @@ class AimedbProtocol(Protocol): f"Registration blocked!: access code {req.access_code}" ) - resp = ADBLookupResponse(user_id, req.head.game_id, req.head.store_id, req.head.keychip_id) + resp = ADBLookupResponse.from_req(req.head, user_id) if resp.user_id <= 0: resp.head.status = ADBStatus.BAN_SYS # Closest we can get to a "You cannot register" @@ -268,17 +268,17 @@ class AimedbProtocol(Protocol): def handle_status_log(self, data: bytes, resp_code: int) -> bytes: req = ADBStatusLogRequest(data) self.logger.info(f"User {req.aime_id} logged {req.status.name} event") - return ADBBaseResponse(resp_code, 0x20, 1, req.head.game_id, req.head.store_id, req.head.keychip_id) + return ADBBaseResponse(resp_code, 0x20, 1, req.head.game_id, req.head.store_id, req.head.keychip_id, req.head.protocol_ver) def handle_log(self, data: bytes, resp_code: int) -> bytes: req = ADBLogRequest(data) self.logger.info(f"User {req.aime_id} logged {req.status.name} event, credit_ct: {req.credit_ct} bet_ct: {req.bet_ct} won_ct: {req.won_ct}") - return ADBBaseResponse(resp_code, 0x20, 1, req.head.game_id, req.head.store_id, req.head.keychip_id) + return ADBBaseResponse(resp_code, 0x20, 1, req.head.game_id, req.head.store_id, req.head.keychip_id, req.head.protocol_ver) def handle_log_ex(self, data: bytes, resp_code: int) -> bytes: req = ADBLogExRequest(data) self.logger.info(f"User {req.aime_id} logged {req.status.name} event, credit_ct: {req.credit_ct} bet_ct: {req.bet_ct} won_ct: {req.won_ct}") - return ADBBaseResponse(resp_code, 0x20, 1, req.head.game_id, req.head.store_id, req.head.keychip_id) + return ADBBaseResponse(resp_code, 0x20, 1, req.head.game_id, req.head.store_id, req.head.keychip_id, req.head.protocol_ver) def handle_goodbye(self, data: bytes, resp_code: int) -> None: self.logger.info(f"goodbye from {self.transport.getPeer().host}")