From fd50a7ee6832f1b9c7f2afc540bcf74d17a9e7ad Mon Sep 17 00:00:00 2001 From: Hay1tsme Date: Mon, 14 Aug 2023 03:32:03 +0000 Subject: [PATCH] aimedb_redux (#30) Update AimeDB from new [documentation](https://minori.tendokyu.moe/docs/allnet/aimedb/) of the protocol. Reviewed-on: https://gitea.tendokyu.moe/Hay1tsme/artemis/pulls/30 --- core/adb_handlers/__init__.py | 6 + core/adb_handlers/base.py | 163 ++++++++++++++++ core/adb_handlers/campaign.py | 114 +++++++++++ core/adb_handlers/felica.py | 72 +++++++ core/adb_handlers/log.py | 23 +++ core/adb_handlers/lookup.py | 70 +++++++ core/aimedb.py | 353 ++++++++++++++++++---------------- 7 files changed, 637 insertions(+), 164 deletions(-) create mode 100644 core/adb_handlers/__init__.py create mode 100644 core/adb_handlers/base.py create mode 100644 core/adb_handlers/campaign.py create mode 100644 core/adb_handlers/felica.py create mode 100644 core/adb_handlers/log.py create mode 100644 core/adb_handlers/lookup.py diff --git a/core/adb_handlers/__init__.py b/core/adb_handlers/__init__.py new file mode 100644 index 0000000..a5c401f --- /dev/null +++ b/core/adb_handlers/__init__.py @@ -0,0 +1,6 @@ +from .base import ADBBaseRequest, ADBBaseResponse, ADBHeader, ADBHeaderException, PortalRegStatus, LogStatus, ADBStatus +from .base import CompanyCodes, ReaderFwVer, CMD_CODE_GOODBYE, HEADER_SIZE +from .lookup import ADBLookupRequest, ADBLookupResponse, ADBLookupExResponse +from .campaign import ADBCampaignClearRequest, ADBCampaignClearResponse, ADBCampaignResponse, ADBOldCampaignRequest, ADBOldCampaignResponse +from .felica import ADBFelicaLookupRequest, ADBFelicaLookupResponse, ADBFelicaLookup2Request, ADBFelicaLookup2Response +from .log import ADBLogExRequest, ADBLogRequest, ADBStatusLogRequest diff --git a/core/adb_handlers/base.py b/core/adb_handlers/base.py new file mode 100644 index 0000000..32fd1c0 --- /dev/null +++ b/core/adb_handlers/base.py @@ -0,0 +1,163 @@ +import struct +from construct import Struct, Int16ul, Int32ul, PaddedString +from enum import Enum +import re +from typing import Union, Final + +class LogStatus(Enum): + NONE = 0 + START = 1 + CONTINUE = 2 + END = 3 + OTHER = 4 + +class PortalRegStatus(Enum): + NO_REG = 0 + PORTAL = 1 + SEGA_ID = 2 + +class ADBStatus(Enum): + UNKNOWN = 0 + GOOD = 1 + BAD_AMIE_ID = 2 + ALREADY_REG = 3 + BAN_SYS_USER = 4 + BAN_SYS = 5 + BAN_USER = 6 + BAN_GEN = 7 + LOCK_SYS_USER = 8 + LOCK_SYS = 9 + LOCK_USER = 10 + +class CompanyCodes(Enum): + NONE = 0 + SEGA = 1 + BAMCO = 2 + KONAMI = 3 + TAITO = 4 + +class ReaderFwVer(Enum): # Newer readers use a singly byte value + NONE = 0 + TN32_10 = 1 + TN32_12 = 2 + OTHER = 9 + + def __str__(self) -> str: + if self == self.TN32_10: + return "TN32MSEC003S F/W Ver1.0" + elif self == self.TN32_12: + return "TN32MSEC003S F/W Ver1.2" + elif self == self.NONE: + return "Not Specified" + elif self == self.OTHER: + return "Unknown/Other" + else: + raise ValueError(f"Bad ReaderFwVer value {self.value}") + + @classmethod + def from_byte(self, byte: bytes) -> Union["ReaderFwVer", int]: + try: + i = int.from_bytes(byte, 'little') + try: + return ReaderFwVer(i) + except ValueError: + return i + except TypeError: + return 0 + +class ADBHeaderException(Exception): + pass + +HEADER_SIZE: Final[int] = 0x20 +CMD_CODE_GOODBYE: Final[int] = 0x66 + +# everything is LE +class ADBHeader: + def __init__(self, magic: int, protocol_ver: int, cmd: int, length: int, status: int, game_id: Union[str, bytes], store_id: int, keychip_id: Union[str, bytes]) -> None: + self.magic = magic # u16 + self.protocol_ver = protocol_ver # u16 + self.cmd = cmd # u16 + self.length = length # u16 + self.status = ADBStatus(status) # u16 + self.game_id = game_id # 4 char + \x00 + self.store_id = store_id # u32 + self.keychip_id = keychip_id# 11 char + \x00 + + if type(self.game_id) == bytes: + self.game_id = self.game_id.decode() + + if type(self.keychip_id) == bytes: + self.keychip_id = self.keychip_id.decode() + + self.game_id = self.game_id.replace("\0", "") + self.keychip_id = self.keychip_id.replace("\0", "") + if self.cmd != CMD_CODE_GOODBYE: # Games for some reason send no data with goodbye + self.validate() + + @classmethod + def from_data(cls, data: bytes) -> "ADBHeader": + magic, protocol_ver, cmd, length, status, game_id, store_id, keychip_id = struct.unpack_from("<5H6sI12s", data) + head = cls(magic, protocol_ver, cmd, length, status, game_id, store_id, keychip_id) + + if head.length != len(data): + raise ADBHeaderException(f"Length is incorrect! Expect {head.length}, got {len(data)}") + + return head + + def validate(self) -> bool: + if self.magic != 0xa13e: + raise ADBHeaderException(f"Magic {self.magic} != 0xa13e") + + if self.protocol_ver < 0x1000: + raise ADBHeaderException(f"Protocol version {hex(self.protocol_ver)} is invalid!") + + if re.fullmatch(r"^S[0-9A-Z]{3}[P]?$", self.game_id) is None: + raise ADBHeaderException(f"Game ID {self.game_id} is invalid!") + + if self.store_id == 0: + raise ADBHeaderException(f"Store ID cannot be 0!") + + if re.fullmatch(r"^A[0-9]{2}[E|X][0-9]{2}[A-HJ-NP-Z][0-9]{4}$", self.keychip_id) is None: + raise ADBHeaderException(f"Keychip ID {self.keychip_id} is invalid!") + + return True + + def make(self) -> bytes: + resp_struct = Struct( + "magic" / Int16ul, + "unknown" / Int16ul, + "response_code" / Int16ul, + "length" / Int16ul, + "status" / Int16ul, + "game_id" / PaddedString(6, 'utf_8'), + "store_id" / Int32ul, + "keychip_id" / PaddedString(12, 'utf_8'), + ) + + return resp_struct.build(dict( + magic=self.magic, + unknown=self.protocol_ver, + response_code=self.cmd, + length=self.length, + status=self.status.value, + game_id = self.game_id, + store_id = self.store_id, + keychip_id = self.keychip_id, + )) + +class ADBBaseRequest: + def __init__(self, data: bytes) -> None: + self.head = ADBHeader.from_data(data) + +class ADBBaseResponse: + def __init__(self, code: int = 0, length: int = 0x20, status: int = 1, game_id: str = "SXXX", store_id: int = 1, keychip_id: str = "A69E01A8888") -> None: + self.head = ADBHeader(0xa13e, 0x3087, code, length, status, game_id, store_id, keychip_id) + + def append_padding(self, data: bytes): + """Appends 0s to the end of the data until it's at the correct size""" + padding_size = self.head.length - len(data) + data += bytes(padding_size) + return data + + def make(self) -> bytes: + return self.head.make() diff --git a/core/adb_handlers/campaign.py b/core/adb_handlers/campaign.py new file mode 100644 index 0000000..936a4c3 --- /dev/null +++ b/core/adb_handlers/campaign.py @@ -0,0 +1,114 @@ +from construct import Struct, Int16ul, Padding, Bytes, Int32ul, Int32sl + +from .base import * + +class Campaign: + def __init__(self) -> None: + self.id = 0 + self.name = "" + self.announce_date = 0 + self.start_date = 0 + self.end_date = 0 + self.distrib_start_date = 0 + self.distrib_end_date = 0 + + def make(self) -> bytes: + name_padding = bytes(128 - len(self.name)) + return Struct( + "id" / Int32ul, + "name" / Bytes(128), + "announce_date" / Int32ul, + "start_date" / Int32ul, + "end_date" / Int32ul, + "distrib_start_date" / Int32ul, + "distrib_end_date" / Int32ul, + Padding(8), + ).build(dict( + id = self.id, + name = self.name.encode() + name_padding, + announce_date = self.announce_date, + start_date = self.start_date, + end_date = self.end_date, + distrib_start_date = self.distrib_start_date, + distrib_end_date = self.distrib_end_date, + )) + +class CampaignClear: + def __init__(self) -> None: + self.id = 0 + self.entry_flag = 0 + self.clear_flag = 0 + + def make(self) -> bytes: + return Struct( + "id" / Int32ul, + "entry_flag" / Int32ul, + "clear_flag" / Int32ul, + Padding(4), + ).build(dict( + id = self.id, + entry_flag = self.entry_flag, + clear_flag = self.clear_flag, + )) + +class ADBCampaignResponse(ADBBaseResponse): + def __init__(self, game_id: str = "SXXX", store_id: int = 1, keychip_id: str = "A69E01A8888", code: int = 0x0C, length: int = 0x200, status: int = 1) -> None: + super().__init__(code, length, status, game_id, store_id, keychip_id) + self.campaigns = [Campaign(), Campaign(), Campaign()] + + def make(self) -> bytes: + body = b"" + + for c in self.campaigns: + body += c.make() + + self.head.length = HEADER_SIZE + len(body) + return self.head.make() + body + +class ADBOldCampaignRequest(ADBBaseRequest): + def __init__(self, data: bytes) -> None: + super().__init__(data) + self.campaign_id = struct.unpack_from(" None: + super().__init__(code, length, status, game_id, store_id, keychip_id) + self.info0 = 0 + self.info1 = 0 + self.info2 = 0 + self.info3 = 0 + + def make(self) -> bytes: + resp_struct = Struct( + "info0" / Int32sl, + "info1" / Int32sl, + "info2" / Int32sl, + "info3" / Int32sl, + ).build( + info0 = self.info0, + info1 = self.info1, + info2 = self.info2, + info3 = self.info3, + ) + + self.head.length = HEADER_SIZE + len(resp_struct) + return self.head.make() + resp_struct + +class ADBCampaignClearRequest(ADBBaseRequest): + def __init__(self, data: bytes) -> None: + super().__init__(data) + self.aime_id = struct.unpack_from(" None: + super().__init__(code, length, status, game_id, store_id, keychip_id) + self.campaign_clear_status = [CampaignClear(), CampaignClear(), CampaignClear()] + + def make(self) -> bytes: + body = b"" + + for c in self.campaign_clear_status: + body += c.make() + + self.head.length = HEADER_SIZE + len(body) + return self.head.make() + body diff --git a/core/adb_handlers/felica.py b/core/adb_handlers/felica.py new file mode 100644 index 0000000..7e659d4 --- /dev/null +++ b/core/adb_handlers/felica.py @@ -0,0 +1,72 @@ +from construct import Struct, Int32sl, Padding, Int8ub, Int16sl +from typing import Union +from .base import * + +class ADBFelicaLookupRequest(ADBBaseRequest): + def __init__(self, data: bytes) -> None: + super().__init__(data) + idm, pmm = struct.unpack_from(">QQ", data, 0x20) + self.idm = hex(idm)[2:].upper() + self.pmm = hex(pmm)[2:].upper() + +class ADBFelicaLookupResponse(ADBBaseResponse): + def __init__(self, access_code: str = None, game_id: str = "SXXX", store_id: int = 1, keychip_id: str = "A69E01A8888", code: int = 0x03, length: int = 0x30, status: int = 1) -> None: + super().__init__(code, length, status, game_id, store_id, keychip_id) + self.access_code = access_code if access_code is not None else "00000000000000000000" + + def make(self) -> bytes: + resp_struct = Struct( + "felica_idx" / Int32ul, + "access_code" / Int8ub[10], + Padding(2) + ).build(dict( + felica_idx = 0, + access_code = bytes.fromhex(self.access_code) + )) + + self.head.length = HEADER_SIZE + len(resp_struct) + + return self.head.make() + resp_struct + +class ADBFelicaLookup2Request(ADBBaseRequest): + def __init__(self, data: bytes) -> None: + super().__init__(data) + self.random = struct.unpack_from("<16s", data, 0x20)[0] + idm, pmm = struct.unpack_from(">QQ", data, 0x30) + self.card_key_ver, self.write_ct, self.maca, company, fw_ver, self.dfc = struct.unpack_from("<16s16sQccH", data, 0x40) + self.idm = hex(idm)[2:].upper() + self.pmm = hex(pmm)[2:].upper() + self.company = CompanyCodes(company) + self.fw_ver = ReaderFwVer.from_byte(fw_ver) + +class ADBFelicaLookup2Response(ADBBaseResponse): + def __init__(self, user_id: Union[int, None] = None, access_code: Union[str, None] = None, game_id: str = "SXXX", store_id: int = 1, keychip_id: str = "A69E01A8888", code: int = 0x12, length: int = 0x130, status: int = 1) -> None: + super().__init__(code, length, status, game_id, store_id, keychip_id) + self.user_id = user_id if user_id is not None else -1 + self.access_code = access_code if access_code is not None else "00000000000000000000" + self.company = CompanyCodes.SEGA + self.portal_status = PortalRegStatus.NO_REG + + def make(self) -> bytes: + resp_struct = Struct( + "user_id" / Int32sl, + "relation1" / Int16sl, + "relation2" / Int16sl, + "access_code" / Int8ub[10], + "portal_status" / Int8ub, + "company_code" / Int8ub, + Padding(8), + "auth_key" / Int8ub[256], + ).build(dict( + user_id = self.user_id, + relation1 = -1, # Unsupported + relation2 = -1, # Unsupported + access_code = bytes.fromhex(self.access_code), + portal_status = self.portal_status.value, + company_code = self.company.value, + auth_key = [0] * 256 # Unsupported + )) + + self.head.length = HEADER_SIZE + len(resp_struct) + + return self.head.make() + resp_struct diff --git a/core/adb_handlers/log.py b/core/adb_handlers/log.py new file mode 100644 index 0000000..8be36f9 --- /dev/null +++ b/core/adb_handlers/log.py @@ -0,0 +1,23 @@ +from construct import Struct, Int32sl, Padding, Int8sl +from typing import Union + +from .base import * + +class ADBStatusLogRequest(ADBBaseRequest): + def __init__(self, data: bytes) -> None: + super().__init__(data) + self.aime_id, status = struct.unpack_from(" None: + super().__init__(data) + self.aime_id, status, self.user_id, self.credit_ct, self.bet_ct, self.won_ct = struct.unpack_from(" None: + super().__init__(data) + self.aime_id, status, self.user_id, self.credit_ct, self.bet_ct, self.won_ct, self.local_time, \ + self.tseq, self.place_id, self.num_logs = struct.unpack_from(" None: + super().__init__(data) + self.access_code = data[0x20:0x2A].hex() + company_code, fw_version, self.serial_number = struct.unpack_from(" None: + super().__init__(code, length, status, game_id, store_id, keychip_id) + self.user_id = user_id if user_id is not None else -1 + self.portal_reg = PortalRegStatus.NO_REG + + def make(self): + resp_struct = Struct( + "user_id" / Int32sl, + "portal_reg" / Int8sl, + Padding(11) + ) + + body = resp_struct.build(dict( + user_id = self.user_id, + portal_reg = self.portal_reg.value + )) + + self.head.length = HEADER_SIZE + len(body) + return self.head.make() + body + +class ADBLookupExResponse(ADBBaseResponse): + def __init__(self, user_id: Union[int, None], game_id: str = "SXXX", store_id: int = 1, keychip_id: str = "A69E01A8888", + code: int = 0x10, length: int = 0x130, status: int = 1) -> None: + super().__init__(code, length, status, game_id, store_id, keychip_id) + self.user_id = user_id if user_id is not None else -1 + self.portal_reg = PortalRegStatus.NO_REG + + def make(self): + resp_struct = Struct( + "user_id" / Int32sl, + "portal_reg" / Int8sl, + Padding(3), + "auth_key" / Int8sl[256], + "relation1" / Int32sl, + "relation2" / Int32sl, + ) + + body = resp_struct.build(dict( + user_id = self.user_id, + portal_reg = self.portal_reg.value, + auth_key = [0] * 256, + relation1 = -1, + relation2 = -1 + )) + + self.head.length = HEADER_SIZE + len(body) + return self.head.make() + body diff --git a/core/aimedb.py b/core/aimedb.py index 39d6373..970eef5 100644 --- a/core/aimedb.py +++ b/core/aimedb.py @@ -2,27 +2,17 @@ from twisted.internet.protocol import Factory, Protocol import logging, coloredlogs from Crypto.Cipher import AES import struct -from typing import Dict, Any +from typing import Dict, Tuple, Callable, Union +from typing_extensions import Final from logging.handlers import TimedRotatingFileHandler from core.config import CoreConfig from core.data import Data +from .adb_handlers import * class AimedbProtocol(Protocol): - AIMEDB_RESPONSE_CODES = { - "felica_lookup": 0x03, - "lookup": 0x06, - "log": 0x0A, - "campaign": 0x0C, - "touch": 0x0E, - "lookup2": 0x10, - "felica_lookup2": 0x12, - "log2": 0x14, - "hello": 0x65, - } - - request_list: Dict[int, Any] = {} + request_list: Dict[int, Tuple[Callable[[bytes, int], Union[ADBBaseResponse, bytes]], int, str]] = {} def __init__(self, core_cfg: CoreConfig) -> None: self.logger = logging.getLogger("aimedb") @@ -32,16 +22,27 @@ class AimedbProtocol(Protocol): self.logger.error("!!!KEY NOT SET!!!") exit(1) - self.request_list[0x01] = self.handle_felica_lookup - self.request_list[0x04] = self.handle_lookup - self.request_list[0x05] = self.handle_register - self.request_list[0x09] = self.handle_log - self.request_list[0x0B] = self.handle_campaign - self.request_list[0x0D] = self.handle_touch - self.request_list[0x0F] = self.handle_lookup2 - self.request_list[0x11] = self.handle_felica_lookup2 - self.request_list[0x13] = self.handle_log2 - self.request_list[0x64] = self.handle_hello + self.register_handler(0x01, 0x03, self.handle_felica_lookup, 'felica_lookup') + self.register_handler(0x02, 0x03, self.handle_felica_register, 'felica_register') + + self.register_handler(0x04, 0x06, self.handle_lookup, 'lookup') + self.register_handler(0x05, 0x06, self.handle_register, 'register') + + self.register_handler(0x07, 0x08, self.handle_status_log, 'status_log') + self.register_handler(0x09, 0x0A, self.handle_log, 'aime_log') + + self.register_handler(0x0B, 0x0C, self.handle_campaign, 'campaign') + self.register_handler(0x0D, 0x0E, self.handle_campaign_clear, 'campaign_clear') + + self.register_handler(0x0F, 0x10, self.handle_lookup_ex, 'lookup_ex') + self.register_handler(0x11, 0x12, self.handle_felica_lookup_ex, 'felica_lookup_ex') + + self.register_handler(0x13, 0x14, self.handle_log_ex, 'aime_log_ex') + self.register_handler(0x64, 0x65, self.handle_hello, 'hello') + self.register_handler(0x66, 0, self.handle_goodbye, 'goodbye') + + def register_handler(self, cmd: int, resp:int, handler: Callable[[bytes, int], Union[ADBBaseResponse, bytes]], name: str) -> None: + self.request_list[cmd] = (handler, resp, name) def append_padding(self, data: bytes): """Appends 0s to the end of the data until it's at the correct size""" @@ -63,202 +64,226 @@ class AimedbProtocol(Protocol): try: decrypted = cipher.decrypt(data) - except Exception: - self.logger.error(f"Failed to decrypt {data.hex()}") + + except Exception as e: + self.logger.error(f"Failed to decrypt {data.hex()} because {e}") return None self.logger.debug(f"{self.transport.getPeer().host} wrote {decrypted.hex()}") - if not decrypted[1] == 0xA1 and not decrypted[0] == 0x3E: - self.logger.error(f"Bad magic") - return None + try: + head = ADBHeader.from_data(decrypted) + + except ADBHeaderException as e: + self.logger.error(f"Error parsing ADB header: {e}") + try: + encrypted = cipher.encrypt(ADBBaseResponse().make()) + self.transport.write(encrypted) - req_code = decrypted[4] - - if req_code == 0x66: - self.logger.info(f"goodbye from {self.transport.getPeer().host}") - self.transport.loseConnection() + except Exception as e: + self.logger.error(f"Failed to encrypt default response because {e}") + return - try: - resp = self.request_list[req_code](decrypted) - encrypted = cipher.encrypt(resp) - self.logger.debug(f"Response {resp.hex()}") + handler, resp_code, name = self.request_list.get(head.cmd, (self.handle_default, None, 'default')) + + if resp_code is None: + self.logger.warning(f"No handler for cmd {hex(head.cmd)}") + + elif resp_code > 0: + self.logger.info(f"{name} from {head.keychip_id} ({head.game_id}) @ {self.transport.getPeer().host}") + + resp = handler(decrypted, resp_code) + + if type(resp) == ADBBaseResponse or issubclass(type(resp), ADBBaseResponse): + resp_bytes = resp.make() + if len(resp_bytes) != resp.head.length: + resp_bytes = self.append_padding(resp_bytes) + + elif type(resp) == bytes: + resp_bytes = resp + + elif resp is None: # Nothing to send, probably a goodbye + return + + else: + raise TypeError(f"Unsupported type returned by ADB handler for {name}: {type(resp)}") + + try: + encrypted = cipher.encrypt(resp_bytes) + self.logger.debug(f"Response {resp_bytes.hex()}") self.transport.write(encrypted) - except KeyError: - self.logger.error(f"Unknown command code {hex(req_code)}") - return None + except Exception as e: + self.logger.error(f"Failed to encrypt {resp_bytes.hex()} because {e}") + + def handle_default(self, data: bytes, resp_code: int, length: int = 0x20) -> ADBBaseResponse: + req = ADBHeader.from_data(data) + return ADBBaseResponse(resp_code, length, 1, req.game_id, req.store_id, req.keychip_id) - except ValueError as e: - self.logger.error(f"Failed to encrypt {resp.hex()} because {e}") - return None + def handle_hello(self, data: bytes, resp_code: int) -> ADBBaseResponse: + return self.handle_default(data, resp_code) - def handle_campaign(self, data: bytes) -> bytes: - self.logger.info(f"campaign from {self.transport.getPeer().host}") - ret = struct.pack( - "<5H", - 0xA13E, - 0x3087, - self.AIMEDB_RESPONSE_CODES["campaign"], - 0x0200, - 0x0001, - ) - return self.append_padding(ret) + def handle_campaign(self, data: bytes, resp_code: int) -> ADBBaseResponse: + h = ADBHeader.from_data(data) + if h.protocol_ver >= 0x3030: + req = h + resp = ADBCampaignResponse(req.game_id, req.store_id, req.keychip_id) - def handle_hello(self, data: bytes) -> bytes: - self.logger.info(f"hello from {self.transport.getPeer().host}") - ret = struct.pack( - "<5H", 0xA13E, 0x3087, self.AIMEDB_RESPONSE_CODES["hello"], 0x0020, 0x0001 - ) - return self.append_padding(ret) - - def handle_lookup(self, data: bytes) -> bytes: - luid = data[0x20:0x2A].hex() - user_id = self.data.card.get_user_id_from_card(access_code=luid) - - if user_id is None: - user_id = -1 - - self.logger.info( - f"lookup from {self.transport.getPeer().host}: luid {luid} -> user_id {user_id}" - ) - - ret = struct.pack( - "<5H", 0xA13E, 0x3087, self.AIMEDB_RESPONSE_CODES["lookup"], 0x0130, 0x0001 - ) - ret += bytes(0x20 - len(ret)) - - if user_id is None: - ret += struct.pack(" bytes: - self.logger.info(f"lookup2") + def handle_lookup(self, data: bytes, resp_code: int) -> ADBBaseResponse: + req = ADBLookupRequest(data) + user_id = self.data.card.get_user_id_from_card(req.access_code) - ret = bytearray(self.handle_lookup(data)) - ret[4] = self.AIMEDB_RESPONSE_CODES["lookup2"] - - return bytes(ret) - - def handle_felica_lookup(self, data: bytes) -> bytes: - idm = data[0x20:0x28].hex() - pmm = data[0x28:0x30].hex() - access_code = self.data.card.to_access_code(idm) + ret = ADBLookupResponse(user_id, req.head.game_id, req.head.store_id, req.head.keychip_id) + self.logger.info( - f"felica_lookup from {self.transport.getPeer().host}: idm {idm} pmm {pmm} -> access_code {access_code}" + f"access_code {req.access_code} -> user_id {ret.user_id}" ) + return ret - ret = struct.pack( - "<5H", - 0xA13E, - 0x3087, - self.AIMEDB_RESPONSE_CODES["felica_lookup"], - 0x0030, - 0x0001, + def handle_lookup_ex(self, data: bytes, resp_code: int) -> ADBBaseResponse: + req = ADBLookupRequest(data) + user_id = self.data.card.get_user_id_from_card(req.access_code) + + ret = ADBLookupExResponse(user_id, req.head.game_id, req.head.store_id, req.head.keychip_id) + + self.logger.info( + f"access_code {req.access_code} -> user_id {ret.user_id}" ) - ret += bytes(26) - ret += bytes.fromhex(access_code) + return ret - return self.append_padding(ret) + def handle_felica_lookup(self, data: bytes, resp_code: int) -> bytes: + """ + On official, I think a card has to be registered for this to actually work, but + I'm making the executive decision to not implement that and just kick back our + faux generated access code. The real felica IDm -> access code conversion is done + on the ADB server, which we do not and will not ever have access to. Because we can + assure that all IDms will be unique, this basic 0-padded hex -> int conversion will + be fine. + """ + req = ADBFelicaLookupRequest(data) + ac = self.data.card.to_access_code(req.idm) + self.logger.info( + f"idm {req.idm} ipm {req.pmm} -> access_code {ac}" + ) + return ADBFelicaLookupResponse(ac, req.head.game_id, req.head.store_id, req.head.keychip_id) - def handle_felica_lookup2(self, data: bytes) -> bytes: - idm = data[0x30:0x38].hex() - pmm = data[0x38:0x40].hex() - access_code = self.data.card.to_access_code(idm) + def handle_felica_register(self, data: bytes, resp_code: int) -> bytes: + """ + I've never seen this used. + """ + req = ADBFelicaLookupRequest(data) + ac = self.data.card.to_access_code(req.idm) + + if self.config.server.allow_user_registration: + user_id = self.data.user.create_user() + + if user_id is None: + self.logger.error("Failed to register user!") + user_id = -1 + + else: + card_id = self.data.card.create_card(user_id, ac) + + if card_id is None: + self.logger.error("Failed to register card!") + user_id = -1 + + self.logger.info( + f"Register access code {ac} (IDm: {req.idm} PMm: {req.pmm}) -> user_id {user_id}" + ) + + else: + self.logger.info( + f"Registration blocked!: access code {ac} (IDm: {req.idm} PMm: {req.pmm})" + ) + + return ADBFelicaLookupResponse(ac, req.head.game_id, req.head.store_id, req.head.keychip_id) + + def handle_felica_lookup_ex(self, data: bytes, resp_code: int) -> bytes: + req = ADBFelicaLookup2Request(data) + access_code = self.data.card.to_access_code(req.idm) user_id = self.data.card.get_user_id_from_card(access_code=access_code) if user_id is None: user_id = -1 self.logger.info( - f"felica_lookup2 from {self.transport.getPeer().host}: idm {idm} ipm {pmm} -> access_code {access_code} user_id {user_id}" + f"idm {req.idm} ipm {req.pmm} -> access_code {access_code} user_id {user_id}" ) - ret = struct.pack( - "<5H", - 0xA13E, - 0x3087, - self.AIMEDB_RESPONSE_CODES["felica_lookup2"], - 0x0140, - 0x0001, - ) - ret += bytes(22) - ret += struct.pack(" ADBBaseResponse: + req = ADBCampaignClearRequest(data) - def handle_touch(self, data: bytes) -> bytes: - self.logger.info(f"touch from {self.transport.getPeer().host}") - ret = struct.pack( - "<5H", 0xA13E, 0x3087, self.AIMEDB_RESPONSE_CODES["touch"], 0x0050, 0x0001 - ) - ret += bytes(5) - ret += struct.pack("<3H", 0x6F, 0, 1) + resp = ADBCampaignClearResponse(req.head.game_id, req.head.store_id, req.head.keychip_id) - return self.append_padding(ret) + # We don't support campaign stuff + return resp - def handle_register(self, data: bytes) -> bytes: - luid = data[0x20:0x2A].hex() + def handle_register(self, data: bytes, resp_code: int) -> bytes: + req = ADBLookupRequest(data) + user_id = -1 + if self.config.server.allow_user_registration: user_id = self.data.user.create_user() if user_id is None: - user_id = -1 self.logger.error("Failed to register user!") + user_id = -1 else: - card_id = self.data.card.create_card(user_id, luid) + card_id = self.data.card.create_card(user_id, req.access_code) if card_id is None: - user_id = -1 self.logger.error("Failed to register card!") + user_id = -1 self.logger.info( - f"register from {self.transport.getPeer().host}: luid {luid} -> user_id {user_id}" + f"Register access code {req.access_code} -> user_id {user_id}" ) else: self.logger.info( - f"register from {self.transport.getPeer().host} blocked!: luid {luid}" + f"Registration blocked!: access code {req.access_code}" ) - user_id = -1 - ret = struct.pack( - "<5H", - 0xA13E, - 0x3087, - self.AIMEDB_RESPONSE_CODES["lookup"], - 0x0030, - 0x0001 if user_id > -1 else 0, - ) - ret += bytes(0x20 - len(ret)) - ret += struct.pack(" bytes: - # TODO: Save aimedb logs - self.logger.info(f"log from {self.transport.getPeer().host}") - ret = struct.pack( - "<5H", 0xA13E, 0x3087, self.AIMEDB_RESPONSE_CODES["log"], 0x0020, 0x0001 - ) - return self.append_padding(ret) + # TODO: Save these in some capacity, as deemed relevant + def handle_status_log(self, data: bytes, resp_code: int) -> bytes: + req = ADBStatusLogRequest(data) + self.logger.info(f"User {req.aime_id} logged {req.status.name} event") + return ADBBaseResponse(resp_code, 0x20, 1, req.head.game_id, req.head.store_id, req.head.keychip_id) - def handle_log2(self, data: bytes) -> bytes: - self.logger.info(f"log2 from {self.transport.getPeer().host}") - ret = struct.pack( - "<5H", 0xA13E, 0x3087, self.AIMEDB_RESPONSE_CODES["log2"], 0x0040, 0x0001 - ) - ret += bytes(22) - ret += struct.pack("H", 1) + def handle_log(self, data: bytes, resp_code: int) -> bytes: + req = ADBLogRequest(data) + self.logger.info(f"User {req.aime_id} logged {req.status.name} event, credit_ct: {req.credit_ct} bet_ct: {req.bet_ct} won_ct: {req.won_ct}") + return ADBBaseResponse(resp_code, 0x20, 1, req.head.game_id, req.head.store_id, req.head.keychip_id) - return self.append_padding(ret) + def handle_log_ex(self, data: bytes, resp_code: int) -> bytes: + req = ADBLogExRequest(data) + self.logger.info(f"User {req.aime_id} logged {req.status.name} event, credit_ct: {req.credit_ct} bet_ct: {req.bet_ct} won_ct: {req.won_ct}") + return ADBBaseResponse(resp_code, 0x20, 1, req.head.game_id, req.head.store_id, req.head.keychip_id) + def handle_goodbye(self, data: bytes, resp_code: int) -> None: + self.logger.info(f"goodbye from {self.transport.getPeer().host}") + self.transport.loseConnection() + return class AimedbFactory(Factory): protocol = AimedbProtocol