adb: add from_req

This commit is contained in:
Hay1tsme 2023-08-19 01:35:37 -04:00
parent 904ea10920
commit 8ea82ffe1a
5 changed files with 63 additions and 18 deletions

View File

@ -150,8 +150,12 @@ class ADBBaseRequest:
self.head = ADBHeader.from_data(data)
class ADBBaseResponse:
def __init__(self, code: int = 0, length: int = 0x20, status: int = 1, game_id: str = "SXXX", store_id: int = 1, keychip_id: str = "A69E01A8888") -> None:
self.head = ADBHeader(0xa13e, 0x3087, code, length, status, game_id, store_id, keychip_id)
def __init__(self, code: int = 0, length: int = 0x20, status: int = 1, game_id: str = "SXXX", store_id: int = 1, keychip_id: str = "A69E01A8888", protocol_ver: int = 0x3087) -> None:
self.head = ADBHeader(0xa13e, protocol_ver, code, length, status, game_id, store_id, keychip_id)
@classmethod
def from_req(cls, req: ADBHeader, cmd: int, length: int = 0x20, status: int = 1) -> "ADBBaseResponse":
return cls(cmd, length, status, req.game_id, req.store_id, req.keychip_id, req.protocol_ver)
def append_padding(self, data: bytes):
"""Appends 0s to the end of the data until it's at the correct size"""

View File

@ -55,6 +55,12 @@ class ADBCampaignResponse(ADBBaseResponse):
def __init__(self, game_id: str = "SXXX", store_id: int = 1, keychip_id: str = "A69E01A8888", code: int = 0x0C, length: int = 0x200, status: int = 1) -> None:
super().__init__(code, length, status, game_id, store_id, keychip_id)
self.campaigns = [Campaign(), Campaign(), Campaign()]
@classmethod
def from_req(cls, req: ADBHeader) -> "ADBCampaignResponse":
c = cls(req.game_id, req.store_id, req.keychip_id)
c.head.protocol_ver = req.protocol_ver
return c
def make(self) -> bytes:
body = b""
@ -78,6 +84,12 @@ class ADBOldCampaignResponse(ADBBaseResponse):
self.info2 = 0
self.info3 = 0
@classmethod
def from_req(cls, req: ADBHeader) -> "ADBCampaignResponse":
c = cls(req.game_id, req.store_id, req.keychip_id)
c.head.protocol_ver = req.protocol_ver
return c
def make(self) -> bytes:
resp_struct = Struct(
"info0" / Int32sl,
@ -103,7 +115,13 @@ class ADBCampaignClearResponse(ADBBaseResponse):
def __init__(self, game_id: str = "SXXX", store_id: int = 1, keychip_id: str = "A69E01A8888", code: int = 0x0E, length: int = 0x50, status: int = 1) -> None:
super().__init__(code, length, status, game_id, store_id, keychip_id)
self.campaign_clear_status = [CampaignClear(), CampaignClear(), CampaignClear()]
@classmethod
def from_req(cls, req: ADBHeader) -> "ADBCampaignResponse":
c = cls(req.game_id, req.store_id, req.keychip_id)
c.head.protocol_ver = req.protocol_ver
return c
def make(self) -> bytes:
body = b""

View File

@ -13,7 +13,13 @@ class ADBFelicaLookupResponse(ADBBaseResponse):
def __init__(self, access_code: str = None, game_id: str = "SXXX", store_id: int = 1, keychip_id: str = "A69E01A8888", code: int = 0x03, length: int = 0x30, status: int = 1) -> None:
super().__init__(code, length, status, game_id, store_id, keychip_id)
self.access_code = access_code if access_code is not None else "00000000000000000000"
@classmethod
def from_req(cls, req: ADBHeader, access_code: str = None) -> "ADBFelicaLookupResponse":
c = cls(access_code, req.game_id, req.store_id, req.keychip_id)
c.head.protocol_ver = req.protocol_ver
return c
def make(self) -> bytes:
resp_struct = Struct(
"felica_idx" / Int32ul,
@ -47,6 +53,12 @@ class ADBFelicaLookup2Response(ADBBaseResponse):
self.company = CompanyCodes.SEGA
self.portal_status = PortalRegStatus.NO_REG
@classmethod
def from_req(cls, req: ADBHeader, user_id: Union[int, None] = None, access_code: Union[str, None] = None) -> "ADBFelicaLookup2Response":
c = cls(user_id, access_code, req.game_id, req.store_id, req.keychip_id)
c.head.protocol_ver = req.protocol_ver
return c
def make(self) -> bytes:
resp_struct = Struct(
"user_id" / Int32sl,

View File

@ -19,13 +19,18 @@ class ADBLookupRequest(ADBBaseRequest):
self.fw_version = ReaderFwVer.from_byte(fw_version)
class ADBLookupResponse(ADBBaseResponse):
def __init__(self, user_id: Union[int, None], game_id: str = "SXXX", store_id: int = 1, keychip_id: str = "A69E01A8888", code: int = 0x06, length: int = 0x30, status: int = 1) -> None:
super().__init__(code, length, status, game_id, store_id, keychip_id)
self.user_id = user_id if user_id is not None else -1
self.portal_reg = PortalRegStatus.NO_REG
@classmethod
def from_req(cls, req: ADBHeader, user_id: Union[int, None]) -> "ADBLookupResponse":
c = cls(user_id, req.game_id, req.store_id, req.keychip_id)
c.head.protocol_ver = req.protocol_ver
return c
def make(self):
resp_struct = Struct(
"user_id" / Int32sl,
@ -48,6 +53,12 @@ class ADBLookupExResponse(ADBBaseResponse):
self.user_id = user_id if user_id is not None else -1
self.portal_reg = PortalRegStatus.NO_REG
@classmethod
def from_req(cls, req: ADBHeader, user_id: Union[int, None]) -> "ADBLookupExResponse":
c = cls(user_id, req.game_id, req.store_id, req.keychip_id)
c.head.protocol_ver = req.protocol_ver
return c
def make(self):
resp_struct = Struct(
"user_id" / Int32sl,

View File

@ -119,7 +119,7 @@ class AimedbProtocol(Protocol):
def handle_default(self, data: bytes, resp_code: int, length: int = 0x20) -> ADBBaseResponse:
req = ADBHeader.from_data(data)
return ADBBaseResponse(resp_code, length, 1, req.game_id, req.store_id, req.keychip_id)
return ADBBaseResponse(resp_code, length, 1, req.game_id, req.store_id, req.keychip_id, req.protocol_ver)
def handle_hello(self, data: bytes, resp_code: int) -> ADBBaseResponse:
return self.handle_default(data, resp_code)
@ -128,13 +128,13 @@ class AimedbProtocol(Protocol):
h = ADBHeader.from_data(data)
if h.protocol_ver >= 0x3030:
req = h
resp = ADBCampaignResponse(req.game_id, req.store_id, req.keychip_id)
resp = ADBCampaignResponse.from_req(req)
else:
req = ADBOldCampaignRequest(data)
self.logger.info(f"Legacy campaign request for campaign {req.campaign_id} (protocol version {hex(h.protocol_ver)})")
resp = ADBOldCampaignResponse(req.head.game_id, req.head.store_id, req.head.keychip_id)
resp = ADBOldCampaignResponse.from_req(req.head)
# We don't currently support campaigns
return resp
@ -143,7 +143,7 @@ class AimedbProtocol(Protocol):
req = ADBLookupRequest(data)
user_id = self.data.card.get_user_id_from_card(req.access_code)
ret = ADBLookupResponse(user_id, req.head.game_id, req.head.store_id, req.head.keychip_id)
ret = ADBLookupResponse.from_req(req.head, user_id)
self.logger.info(
f"access_code {req.access_code} -> user_id {ret.user_id}"
@ -154,7 +154,7 @@ class AimedbProtocol(Protocol):
req = ADBLookupRequest(data)
user_id = self.data.card.get_user_id_from_card(req.access_code)
ret = ADBLookupExResponse(user_id, req.head.game_id, req.head.store_id, req.head.keychip_id)
ret = ADBLookupExResponse.from_req(req.head, user_id)
self.logger.info(
f"access_code {req.access_code} -> user_id {ret.user_id}"
@ -175,7 +175,7 @@ class AimedbProtocol(Protocol):
self.logger.info(
f"idm {req.idm} ipm {req.pmm} -> access_code {ac}"
)
return ADBFelicaLookupResponse(ac, req.head.game_id, req.head.store_id, req.head.keychip_id)
return ADBFelicaLookupResponse.from_req(req.head, ac)
def handle_felica_register(self, data: bytes, resp_code: int) -> bytes:
"""
@ -207,7 +207,7 @@ class AimedbProtocol(Protocol):
f"Registration blocked!: access code {ac} (IDm: {req.idm} PMm: {req.pmm})"
)
return ADBFelicaLookupResponse(ac, req.head.game_id, req.head.store_id, req.head.keychip_id)
return ADBFelicaLookupResponse.from_req(req.head, ac)
def handle_felica_lookup_ex(self, data: bytes, resp_code: int) -> bytes:
req = ADBFelicaLookup2Request(data)
@ -221,12 +221,12 @@ class AimedbProtocol(Protocol):
f"idm {req.idm} ipm {req.pmm} -> access_code {access_code} user_id {user_id}"
)
return ADBFelicaLookup2Response(user_id, access_code, req.head.game_id, req.head.store_id, req.head.keychip_id)
return ADBFelicaLookup2Response.from_req(req.head, user_id, access_code)
def handle_campaign_clear(self, data: bytes, resp_code: int) -> ADBBaseResponse:
req = ADBCampaignClearRequest(data)
resp = ADBCampaignClearResponse(req.head.game_id, req.head.store_id, req.head.keychip_id)
resp = ADBCampaignClearResponse.from_req(req.head)
# We don't support campaign stuff
return resp
@ -258,7 +258,7 @@ class AimedbProtocol(Protocol):
f"Registration blocked!: access code {req.access_code}"
)
resp = ADBLookupResponse(user_id, req.head.game_id, req.head.store_id, req.head.keychip_id)
resp = ADBLookupResponse.from_req(req.head, user_id)
if resp.user_id <= 0:
resp.head.status = ADBStatus.BAN_SYS # Closest we can get to a "You cannot register"
@ -268,17 +268,17 @@ class AimedbProtocol(Protocol):
def handle_status_log(self, data: bytes, resp_code: int) -> bytes:
req = ADBStatusLogRequest(data)
self.logger.info(f"User {req.aime_id} logged {req.status.name} event")
return ADBBaseResponse(resp_code, 0x20, 1, req.head.game_id, req.head.store_id, req.head.keychip_id)
return ADBBaseResponse(resp_code, 0x20, 1, req.head.game_id, req.head.store_id, req.head.keychip_id, req.head.protocol_ver)
def handle_log(self, data: bytes, resp_code: int) -> bytes:
req = ADBLogRequest(data)
self.logger.info(f"User {req.aime_id} logged {req.status.name} event, credit_ct: {req.credit_ct} bet_ct: {req.bet_ct} won_ct: {req.won_ct}")
return ADBBaseResponse(resp_code, 0x20, 1, req.head.game_id, req.head.store_id, req.head.keychip_id)
return ADBBaseResponse(resp_code, 0x20, 1, req.head.game_id, req.head.store_id, req.head.keychip_id, req.head.protocol_ver)
def handle_log_ex(self, data: bytes, resp_code: int) -> bytes:
req = ADBLogExRequest(data)
self.logger.info(f"User {req.aime_id} logged {req.status.name} event, credit_ct: {req.credit_ct} bet_ct: {req.bet_ct} won_ct: {req.won_ct}")
return ADBBaseResponse(resp_code, 0x20, 1, req.head.game_id, req.head.store_id, req.head.keychip_id)
return ADBBaseResponse(resp_code, 0x20, 1, req.head.game_id, req.head.store_id, req.head.keychip_id, req.head.protocol_ver)
def handle_goodbye(self, data: bytes, resp_code: int) -> None:
self.logger.info(f"goodbye from {self.transport.getPeer().host}")