begin adb redux, add base classes

This commit is contained in:
Hay1tsme 2023-05-22 16:15:10 -04:00
parent b9fd4f294d
commit 4b90d2ca40
3 changed files with 124 additions and 68 deletions

View File

@ -0,0 +1 @@
from .base import ADBBaseRequest, ADBBaseResponse, ADBHeader, ADBHeaderException

60
core/adb_handlers/base.py Normal file
View File

@ -0,0 +1,60 @@
import struct
from construct import Struct, Int16ul, Const
class ADBHeaderException(Exception):
pass
# everything is LE
class ADBHeader:
def __init__(self, magic: int, unknown: int, cmd: int, length: int, status: int) -> None:
self.magic = magic # u16
self.unknown = unknown # u16
self.cmd = cmd # u16
self.length = length # u16
self.status = status # u16
self.validate()
@classmethod
def from_data(cls, data: bytes) -> "ADBHeader":
magic, unknown, cmd, length, status = struct.unpack_from("<5H", data)
return cls(magic, unknown, cmd, length, status)
def validate(self) -> bool:
if self.magic != 0xa13e:
raise ADBHeaderException(f"Magic {self.magic} != 0xa13e")
return True
class ADBBaseRequest:
def __init__(self, data: bytes) -> None:
self.head = ADBHeader.from_data(data)
class ADBBaseResponse:
def __init__(self, code: int = 0, length: int = 10, status: int = 1) -> None:
self.head = ADBHeader(0xa13e, 0x3087, code, length, status)
def append_padding(self, data: bytes):
"""Appends 0s to the end of the data until it's at the correct size"""
padding_size = self.head.length - len(data)
data += bytes(padding_size)
return data
def make(self) -> bytes:
resp_struct = Struct(
"magic" / Int16ul, #Const(b">\xa1"),
"unknown" / Int16ul,
"response_code" / Int16ul,
"length" / Int16ul,
"status" / Int16ul,
)
response = resp_struct.build(dict(
magic=self.head.magic,
unknown=self.head.unknown,
response_code=self.head.cmd,
length=self.head.length,
status=self.head.status,
))
return self.append_padding(response)

View File

@ -2,27 +2,18 @@ from twisted.internet.protocol import Factory, Protocol
import logging, coloredlogs
from Crypto.Cipher import AES
import struct
from typing import Dict, Any
from typing import Dict, Tuple, Callable
from typing_extensions import Final
from logging.handlers import TimedRotatingFileHandler
from core.config import CoreConfig
from core.data import Data
from .adb_handlers import *
class AimedbProtocol(Protocol):
AIMEDB_RESPONSE_CODES = {
"felica_lookup": 0x03,
"lookup": 0x06,
"log": 0x0A,
"campaign": 0x0C,
"touch": 0x0E,
"lookup2": 0x10,
"felica_lookup2": 0x12,
"log2": 0x14,
"hello": 0x65,
}
request_list: Dict[int, Any] = {}
CMD_CODE_GOODBYE: Final[int] = 0x66
request_list: Dict[int, Tuple[Callable[[bytes, int], ADBBaseResponse | bytes], int, str]] = {}
def __init__(self, core_cfg: CoreConfig) -> None:
self.logger = logging.getLogger("aimedb")
@ -32,16 +23,19 @@ class AimedbProtocol(Protocol):
self.logger.error("!!!KEY NOT SET!!!")
exit(1)
self.request_list[0x01] = self.handle_felica_lookup
self.request_list[0x04] = self.handle_lookup
self.request_list[0x05] = self.handle_register
self.request_list[0x09] = self.handle_log
self.request_list[0x0B] = self.handle_campaign
self.request_list[0x0D] = self.handle_touch
self.request_list[0x0F] = self.handle_lookup2
self.request_list[0x11] = self.handle_felica_lookup2
self.request_list[0x13] = self.handle_log2
self.request_list[0x64] = self.handle_hello
self.register_handler(0x01, 0x03, self.handle_felica_lookup, 'felica_lookup')
self.register_handler(0x04, 0x06, self.handle_lookup, 'lookup')
self.register_handler(0x05, 0x06, self.handle_register, 'register')
self.register_handler(0x09, 0x0A, self.handle_log, 'log')
self.register_handler(0x0B, 0x0C, self.handle_campaign, 'campaign')
self.register_handler(0x0D, 0x0E, self.handle_touch, 'touch')
self.register_handler(0x0F, 0x10, self.handle_lookup2, 'lookup2')
self.register_handler(0x11, 0x12, self.handle_felica_lookup2, 'felica_lookup2')
self.register_handler(0x13, 0x14, self.handle_log2, 'felica_log2')
self.register_handler(0x64, 0x65, self.handle_hello, 'felica_hello')
def register_handler(self, cmd: int, resp:int, handler: Callable[[bytes, int], ADBBaseResponse | bytes], name: str) -> None:
self.request_list[cmd] = (handler, resp, name)
def append_padding(self, data: bytes):
"""Appends 0s to the end of the data until it's at the correct size"""
@ -63,57 +57,58 @@ class AimedbProtocol(Protocol):
try:
decrypted = cipher.decrypt(data)
except:
self.logger.error(f"Failed to decrypt {data.hex()}")
except Exception as e:
self.logger.error(f"Failed to decrypt {data.hex()} because {e}")
return None
self.logger.debug(f"{self.transport.getPeer().host} wrote {decrypted.hex()}")
if not decrypted[1] == 0xA1 and not decrypted[0] == 0x3E:
self.logger.error(f"Bad magic")
return None
head = ADBHeader.from_data(decrypted)
req_code = decrypted[4]
if req_code == 0x66:
if head.cmd == self.CMD_CODE_GOODBYE:
# Goodbye just requires a disconnect, no reply
self.logger.info(f"goodbye from {self.transport.getPeer().host}")
self.transport.loseConnection()
return
try:
resp = self.request_list[req_code](decrypted)
encrypted = cipher.encrypt(resp)
self.logger.debug(f"Response {resp.hex()}")
handler, resp_code, name = self.request_list.get(head.cmd, (self.handle_default, 0x00, 'default'))
if resp_code == 0:
self.logger.warning(f"No handler for cmd {hex(head.cmd)}")
else:
self.logger.info(f"{name} from {self.transport.getPeer().host}")
resp = handler(decrypted, resp_code)
if type(resp) == ADBBaseResponse:
resp_bytes = resp.make()
elif type(resp) == bytes:
resp_bytes = resp
else:
raise TypeError(f"Unsupported type returned by ADB handler for {name}: {type(resp)}")
try:
encrypted = cipher.encrypt(resp_bytes)
self.logger.debug(f"Response {resp_bytes.hex()}")
self.transport.write(encrypted)
except KeyError:
self.logger.error(f"Unknown command code {hex(req_code)}")
return None
except Exception as e:
self.logger.error(f"Failed to encrypt {resp_bytes.hex()} because {e}")
def handle_default(self, data: bytes, resp_code: int, length: int = 10) -> ADBBaseResponse:
return ADBBaseResponse(resp_code, length)
except ValueError as e:
self.logger.error(f"Failed to encrypt {resp.hex()} because {e}")
return None
def handle_campaign(self, data: bytes, resp_code: int) -> ADBBaseResponse:
return self.handle_default(data, resp_code, 0x0200)
def handle_campaign(self, data: bytes) -> bytes:
self.logger.info(f"campaign from {self.transport.getPeer().host}")
ret = struct.pack(
"<5H",
0xA13E,
0x3087,
self.AIMEDB_RESPONSE_CODES["campaign"],
0x0200,
0x0001,
)
return self.append_padding(ret)
def handle_hello(self, data: bytes, resp_code: int) -> ADBBaseResponse:
return self.handle_default(data, resp_code, 0x0020)
def handle_hello(self, data: bytes) -> bytes:
self.logger.info(f"hello from {self.transport.getPeer().host}")
ret = struct.pack(
"<5H", 0xA13E, 0x3087, self.AIMEDB_RESPONSE_CODES["hello"], 0x0020, 0x0001
)
return self.append_padding(ret)
def handle_lookup(self, data: bytes) -> bytes:
def handle_lookup(self, data: bytes, resp_code: int) -> bytes:
luid = data[0x20:0x2A].hex()
user_id = self.data.card.get_user_id_from_card(access_code=luid)
@ -135,7 +130,7 @@ class AimedbProtocol(Protocol):
ret += struct.pack("<l", user_id)
return self.append_padding(ret)
def handle_lookup2(self, data: bytes) -> bytes:
def handle_lookup2(self, data: bytes, resp_code: int) -> bytes:
self.logger.info(f"lookup2")
ret = bytearray(self.handle_lookup(data))
@ -143,7 +138,7 @@ class AimedbProtocol(Protocol):
return bytes(ret)
def handle_felica_lookup(self, data: bytes) -> bytes:
def handle_felica_lookup(self, data: bytes, resp_code: int) -> bytes:
idm = data[0x20:0x28].hex()
pmm = data[0x28:0x30].hex()
access_code = self.data.card.to_access_code(idm)
@ -164,7 +159,7 @@ class AimedbProtocol(Protocol):
return self.append_padding(ret)
def handle_felica_lookup2(self, data: bytes) -> bytes:
def handle_felica_lookup2(self, data: bytes, resp_code: int) -> bytes:
idm = data[0x30:0x38].hex()
pmm = data[0x38:0x40].hex()
access_code = self.data.card.to_access_code(idm)
@ -192,7 +187,7 @@ class AimedbProtocol(Protocol):
return self.append_padding(ret)
def handle_touch(self, data: bytes) -> bytes:
def handle_touch(self, data: bytes, resp_code: int) -> bytes:
self.logger.info(f"touch from {self.transport.getPeer().host}")
ret = struct.pack(
"<5H", 0xA13E, 0x3087, self.AIMEDB_RESPONSE_CODES["touch"], 0x0050, 0x0001
@ -202,7 +197,7 @@ class AimedbProtocol(Protocol):
return self.append_padding(ret)
def handle_register(self, data: bytes) -> bytes:
def handle_register(self, data: bytes, resp_code: int) -> bytes:
luid = data[0x20:0x2A].hex()
if self.config.server.allow_user_registration:
user_id = self.data.user.create_user()
@ -241,7 +236,7 @@ class AimedbProtocol(Protocol):
return self.append_padding(ret)
def handle_log(self, data: bytes) -> bytes:
def handle_log(self, data: bytes, resp_code: int) -> bytes:
# TODO: Save aimedb logs
self.logger.info(f"log from {self.transport.getPeer().host}")
ret = struct.pack(
@ -249,7 +244,7 @@ class AimedbProtocol(Protocol):
)
return self.append_padding(ret)
def handle_log2(self, data: bytes) -> bytes:
def handle_log2(self, data: bytes, resp_code: int) -> bytes:
self.logger.info(f"log2 from {self.transport.getPeer().host}")
ret = struct.pack(
"<5H", 0xA13E, 0x3087, self.AIMEDB_RESPONSE_CODES["log2"], 0x0040, 0x0001