From 3e5c210bc42d2b795584dd06412765936bb3e2ac Mon Sep 17 00:00:00 2001 From: Hay1tsme Date: Mon, 22 May 2023 17:08:58 -0400 Subject: [PATCH] adb: update lookup and lookup2 --- core/adb_handlers/__init__.py | 1 + core/adb_handlers/base.py | 34 +++++++++++++++++----------------- core/adb_handlers/lookup.py | 25 +++++++++++++++++++++++++ core/aimedb.py | 32 ++++++++++++++------------------ 4 files changed, 57 insertions(+), 35 deletions(-) create mode 100644 core/adb_handlers/lookup.py diff --git a/core/adb_handlers/__init__.py b/core/adb_handlers/__init__.py index 1fe05d6..dde5623 100644 --- a/core/adb_handlers/__init__.py +++ b/core/adb_handlers/__init__.py @@ -1 +1,2 @@ from .base import ADBBaseRequest, ADBBaseResponse, ADBHeader, ADBHeaderException +from .lookup import ADBLookupRequest, ADBLookupResponse diff --git a/core/adb_handlers/base.py b/core/adb_handlers/base.py index 8590327..02f3159 100644 --- a/core/adb_handlers/base.py +++ b/core/adb_handlers/base.py @@ -25,6 +25,22 @@ class ADBHeader: raise ADBHeaderException(f"Magic {self.magic} != 0xa13e") return True + def make(self) -> bytes: + resp_struct = Struct( + "magic" / Int16ul, #Const(b">\xa1"), + "unknown" / Int16ul, + "response_code" / Int16ul, + "length" / Int16ul, + "status" / Int16ul, + ) + + response = resp_struct.build(dict( + magic=self.magic, + unknown=self.unknown, + response_code=self.cmd, + length=self.length, + status=self.status, + )) class ADBBaseRequest: def __init__(self, data: bytes) -> None: @@ -41,20 +57,4 @@ class ADBBaseResponse: return data def make(self) -> bytes: - resp_struct = Struct( - "magic" / Int16ul, #Const(b">\xa1"), - "unknown" / Int16ul, - "response_code" / Int16ul, - "length" / Int16ul, - "status" / Int16ul, - ) - - response = resp_struct.build(dict( - magic=self.head.magic, - unknown=self.head.unknown, - response_code=self.head.cmd, - length=self.head.length, - status=self.head.status, - )) - - return self.append_padding(response) + return self.head.make() diff --git a/core/adb_handlers/lookup.py b/core/adb_handlers/lookup.py new file mode 100644 index 0000000..20aa36d --- /dev/null +++ b/core/adb_handlers/lookup.py @@ -0,0 +1,25 @@ +from construct import Struct, AlignedStruct, Int32sl, Int16ul, Padding + +from .base import * + + +class ADBLookupRequest(ADBBaseRequest): + def __init__(self, data: bytes) -> None: + super().__init__(data) + self.access_code = data[0x20:0x2A].hex() + +class ADBLookupResponse(ADBBaseResponse): + def __init__(self, code: int = 0, length: int = 10, status: int = 1, user_id: int | None = None) -> None: + super().__init__(code, length, status) + self.user_id = user_id if user_id is not None else -1 + + def make(self): + header = super().make() + resp_struct = Struct( + Padding(0x10), + "user_id" / Int32sl + ) + + return header + resp_struct.build(dict( + user_id = self.user_id + )) diff --git a/core/aimedb.py b/core/aimedb.py index 574f0ef..acb1f09 100644 --- a/core/aimedb.py +++ b/core/aimedb.py @@ -84,13 +84,15 @@ class AimedbProtocol(Protocol): if type(resp) == ADBBaseResponse: resp_bytes = resp.make() + if len(resp_bytes) != resp.head.length: + resp_bytes = self.append_padding(resp_bytes) elif type(resp) == bytes: resp_bytes = resp else: raise TypeError(f"Unsupported type returned by ADB handler for {name}: {type(resp)}") - + try: encrypted = cipher.encrypt(resp_bytes) self.logger.debug(f"Response {resp_bytes.hex()}") @@ -108,20 +110,17 @@ class AimedbProtocol(Protocol): def handle_hello(self, data: bytes, resp_code: int) -> ADBBaseResponse: return self.handle_default(data, resp_code, 0x0020) - def handle_lookup(self, data: bytes, resp_code: int) -> bytes: - luid = data[0x20:0x2A].hex() - user_id = self.data.card.get_user_id_from_card(access_code=luid) - - if user_id is None: - user_id = -1 + def handle_lookup(self, data: bytes, resp_code: int) -> ADBBaseResponse: + req = ADBLookupRequest(data) + user_id = self.data.card.get_user_id_from_card(req.access_code) + ret = ADBLookupResponse(resp_code, 0x0130, 1, user_id) + self.logger.info( - f"lookup from {self.transport.getPeer().host}: luid {luid} -> user_id {user_id}" + f"access_code {req.access_code} -> user_id {ret.user_id}" ) - ret = struct.pack( - "<5H", 0xA13E, 0x3087, self.AIMEDB_RESPONSE_CODES["lookup"], 0x0130, 0x0001 - ) + """ ret += bytes(0x20 - len(ret)) if user_id is None: @@ -129,14 +128,11 @@ class AimedbProtocol(Protocol): else: ret += struct.pack(" bytes: - self.logger.info(f"lookup2") - - ret = bytearray(self.handle_lookup(data)) - ret[4] = self.AIMEDB_RESPONSE_CODES["lookup2"] - - return bytes(ret) + def handle_lookup2(self, data: bytes, resp_code: int) -> ADBBaseResponse: + return self.handle_lookup(data, resp_code) def handle_felica_lookup(self, data: bytes, resp_code: int) -> bytes: idm = data[0x20:0x28].hex()