aimedb_redux (#30)

Update AimeDB from new [documentation](https://minori.tendokyu.moe/docs/allnet/aimedb/) of the protocol.
Reviewed-on: Hay1tsme/artemis#30
This commit is contained in:
2023-08-14 03:32:03 +00:00
parent 9e3a51a57a
commit fd50a7ee68
7 changed files with 637 additions and 164 deletions

View File

@ -0,0 +1,6 @@
from .base import ADBBaseRequest, ADBBaseResponse, ADBHeader, ADBHeaderException, PortalRegStatus, LogStatus, ADBStatus
from .base import CompanyCodes, ReaderFwVer, CMD_CODE_GOODBYE, HEADER_SIZE
from .lookup import ADBLookupRequest, ADBLookupResponse, ADBLookupExResponse
from .campaign import ADBCampaignClearRequest, ADBCampaignClearResponse, ADBCampaignResponse, ADBOldCampaignRequest, ADBOldCampaignResponse
from .felica import ADBFelicaLookupRequest, ADBFelicaLookupResponse, ADBFelicaLookup2Request, ADBFelicaLookup2Response
from .log import ADBLogExRequest, ADBLogRequest, ADBStatusLogRequest

163
core/adb_handlers/base.py Normal file
View File

@ -0,0 +1,163 @@
import struct
from construct import Struct, Int16ul, Int32ul, PaddedString
from enum import Enum
import re
from typing import Union, Final
class LogStatus(Enum):
NONE = 0
START = 1
CONTINUE = 2
END = 3
OTHER = 4
class PortalRegStatus(Enum):
NO_REG = 0
PORTAL = 1
SEGA_ID = 2
class ADBStatus(Enum):
UNKNOWN = 0
GOOD = 1
BAD_AMIE_ID = 2
ALREADY_REG = 3
BAN_SYS_USER = 4
BAN_SYS = 5
BAN_USER = 6
BAN_GEN = 7
LOCK_SYS_USER = 8
LOCK_SYS = 9
LOCK_USER = 10
class CompanyCodes(Enum):
NONE = 0
SEGA = 1
BAMCO = 2
KONAMI = 3
TAITO = 4
class ReaderFwVer(Enum): # Newer readers use a singly byte value
NONE = 0
TN32_10 = 1
TN32_12 = 2
OTHER = 9
def __str__(self) -> str:
if self == self.TN32_10:
return "TN32MSEC003S F/W Ver1.0"
elif self == self.TN32_12:
return "TN32MSEC003S F/W Ver1.2"
elif self == self.NONE:
return "Not Specified"
elif self == self.OTHER:
return "Unknown/Other"
else:
raise ValueError(f"Bad ReaderFwVer value {self.value}")
@classmethod
def from_byte(self, byte: bytes) -> Union["ReaderFwVer", int]:
try:
i = int.from_bytes(byte, 'little')
try:
return ReaderFwVer(i)
except ValueError:
return i
except TypeError:
return 0
class ADBHeaderException(Exception):
pass
HEADER_SIZE: Final[int] = 0x20
CMD_CODE_GOODBYE: Final[int] = 0x66
# everything is LE
class ADBHeader:
def __init__(self, magic: int, protocol_ver: int, cmd: int, length: int, status: int, game_id: Union[str, bytes], store_id: int, keychip_id: Union[str, bytes]) -> None:
self.magic = magic # u16
self.protocol_ver = protocol_ver # u16
self.cmd = cmd # u16
self.length = length # u16
self.status = ADBStatus(status) # u16
self.game_id = game_id # 4 char + \x00
self.store_id = store_id # u32
self.keychip_id = keychip_id# 11 char + \x00
if type(self.game_id) == bytes:
self.game_id = self.game_id.decode()
if type(self.keychip_id) == bytes:
self.keychip_id = self.keychip_id.decode()
self.game_id = self.game_id.replace("\0", "")
self.keychip_id = self.keychip_id.replace("\0", "")
if self.cmd != CMD_CODE_GOODBYE: # Games for some reason send no data with goodbye
self.validate()
@classmethod
def from_data(cls, data: bytes) -> "ADBHeader":
magic, protocol_ver, cmd, length, status, game_id, store_id, keychip_id = struct.unpack_from("<5H6sI12s", data)
head = cls(magic, protocol_ver, cmd, length, status, game_id, store_id, keychip_id)
if head.length != len(data):
raise ADBHeaderException(f"Length is incorrect! Expect {head.length}, got {len(data)}")
return head
def validate(self) -> bool:
if self.magic != 0xa13e:
raise ADBHeaderException(f"Magic {self.magic} != 0xa13e")
if self.protocol_ver < 0x1000:
raise ADBHeaderException(f"Protocol version {hex(self.protocol_ver)} is invalid!")
if re.fullmatch(r"^S[0-9A-Z]{3}[P]?$", self.game_id) is None:
raise ADBHeaderException(f"Game ID {self.game_id} is invalid!")
if self.store_id == 0:
raise ADBHeaderException(f"Store ID cannot be 0!")
if re.fullmatch(r"^A[0-9]{2}[E|X][0-9]{2}[A-HJ-NP-Z][0-9]{4}$", self.keychip_id) is None:
raise ADBHeaderException(f"Keychip ID {self.keychip_id} is invalid!")
return True
def make(self) -> bytes:
resp_struct = Struct(
"magic" / Int16ul,
"unknown" / Int16ul,
"response_code" / Int16ul,
"length" / Int16ul,
"status" / Int16ul,
"game_id" / PaddedString(6, 'utf_8'),
"store_id" / Int32ul,
"keychip_id" / PaddedString(12, 'utf_8'),
)
return resp_struct.build(dict(
magic=self.magic,
unknown=self.protocol_ver,
response_code=self.cmd,
length=self.length,
status=self.status.value,
game_id = self.game_id,
store_id = self.store_id,
keychip_id = self.keychip_id,
))
class ADBBaseRequest:
def __init__(self, data: bytes) -> None:
self.head = ADBHeader.from_data(data)
class ADBBaseResponse:
def __init__(self, code: int = 0, length: int = 0x20, status: int = 1, game_id: str = "SXXX", store_id: int = 1, keychip_id: str = "A69E01A8888") -> None:
self.head = ADBHeader(0xa13e, 0x3087, code, length, status, game_id, store_id, keychip_id)
def append_padding(self, data: bytes):
"""Appends 0s to the end of the data until it's at the correct size"""
padding_size = self.head.length - len(data)
data += bytes(padding_size)
return data
def make(self) -> bytes:
return self.head.make()

View File

@ -0,0 +1,114 @@
from construct import Struct, Int16ul, Padding, Bytes, Int32ul, Int32sl
from .base import *
class Campaign:
def __init__(self) -> None:
self.id = 0
self.name = ""
self.announce_date = 0
self.start_date = 0
self.end_date = 0
self.distrib_start_date = 0
self.distrib_end_date = 0
def make(self) -> bytes:
name_padding = bytes(128 - len(self.name))
return Struct(
"id" / Int32ul,
"name" / Bytes(128),
"announce_date" / Int32ul,
"start_date" / Int32ul,
"end_date" / Int32ul,
"distrib_start_date" / Int32ul,
"distrib_end_date" / Int32ul,
Padding(8),
).build(dict(
id = self.id,
name = self.name.encode() + name_padding,
announce_date = self.announce_date,
start_date = self.start_date,
end_date = self.end_date,
distrib_start_date = self.distrib_start_date,
distrib_end_date = self.distrib_end_date,
))
class CampaignClear:
def __init__(self) -> None:
self.id = 0
self.entry_flag = 0
self.clear_flag = 0
def make(self) -> bytes:
return Struct(
"id" / Int32ul,
"entry_flag" / Int32ul,
"clear_flag" / Int32ul,
Padding(4),
).build(dict(
id = self.id,
entry_flag = self.entry_flag,
clear_flag = self.clear_flag,
))
class ADBCampaignResponse(ADBBaseResponse):
def __init__(self, game_id: str = "SXXX", store_id: int = 1, keychip_id: str = "A69E01A8888", code: int = 0x0C, length: int = 0x200, status: int = 1) -> None:
super().__init__(code, length, status, game_id, store_id, keychip_id)
self.campaigns = [Campaign(), Campaign(), Campaign()]
def make(self) -> bytes:
body = b""
for c in self.campaigns:
body += c.make()
self.head.length = HEADER_SIZE + len(body)
return self.head.make() + body
class ADBOldCampaignRequest(ADBBaseRequest):
def __init__(self, data: bytes) -> None:
super().__init__(data)
self.campaign_id = struct.unpack_from("<I", data, 0x20)
class ADBOldCampaignResponse(ADBBaseResponse):
def __init__(self, game_id: str = "SXXX", store_id: int = 1, keychip_id: str = "A69E01A8888", code: int = 0x0C, length: int = 0x30, status: int = 1) -> None:
super().__init__(code, length, status, game_id, store_id, keychip_id)
self.info0 = 0
self.info1 = 0
self.info2 = 0
self.info3 = 0
def make(self) -> bytes:
resp_struct = Struct(
"info0" / Int32sl,
"info1" / Int32sl,
"info2" / Int32sl,
"info3" / Int32sl,
).build(
info0 = self.info0,
info1 = self.info1,
info2 = self.info2,
info3 = self.info3,
)
self.head.length = HEADER_SIZE + len(resp_struct)
return self.head.make() + resp_struct
class ADBCampaignClearRequest(ADBBaseRequest):
def __init__(self, data: bytes) -> None:
super().__init__(data)
self.aime_id = struct.unpack_from("<i", data, 0x20)
class ADBCampaignClearResponse(ADBBaseResponse):
def __init__(self, game_id: str = "SXXX", store_id: int = 1, keychip_id: str = "A69E01A8888", code: int = 0x0E, length: int = 0x50, status: int = 1) -> None:
super().__init__(code, length, status, game_id, store_id, keychip_id)
self.campaign_clear_status = [CampaignClear(), CampaignClear(), CampaignClear()]
def make(self) -> bytes:
body = b""
for c in self.campaign_clear_status:
body += c.make()
self.head.length = HEADER_SIZE + len(body)
return self.head.make() + body

View File

@ -0,0 +1,72 @@
from construct import Struct, Int32sl, Padding, Int8ub, Int16sl
from typing import Union
from .base import *
class ADBFelicaLookupRequest(ADBBaseRequest):
def __init__(self, data: bytes) -> None:
super().__init__(data)
idm, pmm = struct.unpack_from(">QQ", data, 0x20)
self.idm = hex(idm)[2:].upper()
self.pmm = hex(pmm)[2:].upper()
class ADBFelicaLookupResponse(ADBBaseResponse):
def __init__(self, access_code: str = None, game_id: str = "SXXX", store_id: int = 1, keychip_id: str = "A69E01A8888", code: int = 0x03, length: int = 0x30, status: int = 1) -> None:
super().__init__(code, length, status, game_id, store_id, keychip_id)
self.access_code = access_code if access_code is not None else "00000000000000000000"
def make(self) -> bytes:
resp_struct = Struct(
"felica_idx" / Int32ul,
"access_code" / Int8ub[10],
Padding(2)
).build(dict(
felica_idx = 0,
access_code = bytes.fromhex(self.access_code)
))
self.head.length = HEADER_SIZE + len(resp_struct)
return self.head.make() + resp_struct
class ADBFelicaLookup2Request(ADBBaseRequest):
def __init__(self, data: bytes) -> None:
super().__init__(data)
self.random = struct.unpack_from("<16s", data, 0x20)[0]
idm, pmm = struct.unpack_from(">QQ", data, 0x30)
self.card_key_ver, self.write_ct, self.maca, company, fw_ver, self.dfc = struct.unpack_from("<16s16sQccH", data, 0x40)
self.idm = hex(idm)[2:].upper()
self.pmm = hex(pmm)[2:].upper()
self.company = CompanyCodes(company)
self.fw_ver = ReaderFwVer.from_byte(fw_ver)
class ADBFelicaLookup2Response(ADBBaseResponse):
def __init__(self, user_id: Union[int, None] = None, access_code: Union[str, None] = None, game_id: str = "SXXX", store_id: int = 1, keychip_id: str = "A69E01A8888", code: int = 0x12, length: int = 0x130, status: int = 1) -> None:
super().__init__(code, length, status, game_id, store_id, keychip_id)
self.user_id = user_id if user_id is not None else -1
self.access_code = access_code if access_code is not None else "00000000000000000000"
self.company = CompanyCodes.SEGA
self.portal_status = PortalRegStatus.NO_REG
def make(self) -> bytes:
resp_struct = Struct(
"user_id" / Int32sl,
"relation1" / Int16sl,
"relation2" / Int16sl,
"access_code" / Int8ub[10],
"portal_status" / Int8ub,
"company_code" / Int8ub,
Padding(8),
"auth_key" / Int8ub[256],
).build(dict(
user_id = self.user_id,
relation1 = -1, # Unsupported
relation2 = -1, # Unsupported
access_code = bytes.fromhex(self.access_code),
portal_status = self.portal_status.value,
company_code = self.company.value,
auth_key = [0] * 256 # Unsupported
))
self.head.length = HEADER_SIZE + len(resp_struct)
return self.head.make() + resp_struct

23
core/adb_handlers/log.py Normal file
View File

@ -0,0 +1,23 @@
from construct import Struct, Int32sl, Padding, Int8sl
from typing import Union
from .base import *
class ADBStatusLogRequest(ADBBaseRequest):
def __init__(self, data: bytes) -> None:
super().__init__(data)
self.aime_id, status = struct.unpack_from("<II", data, 0x20)
self.status = LogStatus(status)
class ADBLogRequest(ADBBaseRequest):
def __init__(self, data: bytes) -> None:
super().__init__(data)
self.aime_id, status, self.user_id, self.credit_ct, self.bet_ct, self.won_ct = struct.unpack_from("<IIQiii", data, 0x20)
self.status = LogStatus(status)
class ADBLogExRequest(ADBBaseRequest):
def __init__(self, data: bytes) -> None:
super().__init__(data)
self.aime_id, status, self.user_id, self.credit_ct, self.bet_ct, self.won_ct, self.local_time, \
self.tseq, self.place_id, self.num_logs = struct.unpack_from("<IIQiii4xQiII", data, 0x20)
self.status = LogStatus(status)

View File

@ -0,0 +1,70 @@
from construct import Struct, Int32sl, Padding, Int8sl
from typing import Union
from .base import *
class ADBLookupException(Exception):
pass
class ADBLookupRequest(ADBBaseRequest):
def __init__(self, data: bytes) -> None:
super().__init__(data)
self.access_code = data[0x20:0x2A].hex()
company_code, fw_version, self.serial_number = struct.unpack_from("<bbI", data, 0x2A)
try:
self.company_code = CompanyCodes(company_code)
except ValueError as e:
raise ADBLookupException(f"Invalid company code - {e}")
self.fw_version = ReaderFwVer.from_byte(fw_version)
class ADBLookupResponse(ADBBaseResponse):
def __init__(self, user_id: Union[int, None], game_id: str = "SXXX", store_id: int = 1, keychip_id: str = "A69E01A8888", code: int = 0x06, length: int = 0x30, status: int = 1) -> None:
super().__init__(code, length, status, game_id, store_id, keychip_id)
self.user_id = user_id if user_id is not None else -1
self.portal_reg = PortalRegStatus.NO_REG
def make(self):
resp_struct = Struct(
"user_id" / Int32sl,
"portal_reg" / Int8sl,
Padding(11)
)
body = resp_struct.build(dict(
user_id = self.user_id,
portal_reg = self.portal_reg.value
))
self.head.length = HEADER_SIZE + len(body)
return self.head.make() + body
class ADBLookupExResponse(ADBBaseResponse):
def __init__(self, user_id: Union[int, None], game_id: str = "SXXX", store_id: int = 1, keychip_id: str = "A69E01A8888",
code: int = 0x10, length: int = 0x130, status: int = 1) -> None:
super().__init__(code, length, status, game_id, store_id, keychip_id)
self.user_id = user_id if user_id is not None else -1
self.portal_reg = PortalRegStatus.NO_REG
def make(self):
resp_struct = Struct(
"user_id" / Int32sl,
"portal_reg" / Int8sl,
Padding(3),
"auth_key" / Int8sl[256],
"relation1" / Int32sl,
"relation2" / Int32sl,
)
body = resp_struct.build(dict(
user_id = self.user_id,
portal_reg = self.portal_reg.value,
auth_key = [0] * 256,
relation1 = -1,
relation2 = -1
))
self.head.length = HEADER_SIZE + len(body)
return self.head.make() + body