Compare commits

...

13 Commits

Author SHA1 Message Date
Hay1tsme
d7f9eed3f2 adb: update register 2023-06-03 13:52:32 -05:00
Hay1tsme
46dd8fe84f adb: see previous 2023-06-03 13:41:03 -05:00
Hay1tsme
d167bfe63d adb: comparability with 3.9 and below 2023-06-03 13:39:28 -05:00
ad9bef618f adb: add felica classes 2023-05-26 18:02:20 -04:00
7ac50868b2 ADB: ADBLookupResponse 2023-05-22 17:58:11 -04:00
8f29c796b1 adb: type checking fix 2023-05-22 17:36:15 -04:00
afef15fb00 adb: remove references to AIMEDB_RESPONSE_CODES 2023-05-22 17:30:24 -04:00
a16c85a1e2 adb: fix typo 2023-05-22 17:25:46 -04:00
a4c8897024 adb: hotfix 2023-05-22 17:23:28 -04:00
e9368b2e57 ADB: update touch 2023-05-22 17:17:07 -04:00
2c40343d9f adb: update log and log2 2023-05-22 17:11:09 -04:00
3e5c210bc4 adb: update lookup and lookup2 2023-05-22 17:08:58 -04:00
4b90d2ca40 begin adb redux, add base classes 2023-05-22 16:15:10 -04:00
6 changed files with 263 additions and 108 deletions

View File

@ -0,0 +1,4 @@
from .base import ADBBaseRequest, ADBBaseResponse, ADBHeader, ADBHeaderException
from .lookup import ADBLookupRequest, ADBLookupResponse
from .touch import ADBTouchRequest, ADBTouchResponse
from .felica import ADBFelicaLookupRequest, ADBFelicaLookupResponse, ADBFelicaLookup2Request, ADBFelicaLookup2Response

60
core/adb_handlers/base.py Normal file
View File

@ -0,0 +1,60 @@
import struct
from construct import Struct, Int16ul, Const
class ADBHeaderException(Exception):
pass
# everything is LE
class ADBHeader:
def __init__(self, magic: int, unknown: int, cmd: int, length: int, status: int) -> None:
self.magic = magic # u16
self.unknown = unknown # u16
self.cmd = cmd # u16
self.length = length # u16
self.status = status # u16
self.validate()
@classmethod
def from_data(cls, data: bytes) -> "ADBHeader":
magic, unknown, cmd, length, status = struct.unpack_from("<5H", data)
return cls(magic, unknown, cmd, length, status)
def validate(self) -> bool:
if self.magic != 0xa13e:
raise ADBHeaderException(f"Magic {self.magic} != 0xa13e")
return True
def make(self) -> bytes:
resp_struct = Struct(
"magic" / Int16ul, #Const(b">\xa1"),
"unknown" / Int16ul,
"response_code" / Int16ul,
"length" / Int16ul,
"status" / Int16ul,
)
return resp_struct.build(dict(
magic=self.magic,
unknown=self.unknown,
response_code=self.cmd,
length=self.length,
status=self.status,
))
class ADBBaseRequest:
def __init__(self, data: bytes) -> None:
self.head = ADBHeader.from_data(data)
class ADBBaseResponse:
def __init__(self, code: int = 0, length: int = 10, status: int = 1) -> None:
self.head = ADBHeader(0xa13e, 0x3087, code, length, status)
def append_padding(self, data: bytes):
"""Appends 0s to the end of the data until it's at the correct size"""
padding_size = self.head.length - len(data)
data += bytes(padding_size)
return data
def make(self) -> bytes:
return self.head.make()

View File

@ -0,0 +1,56 @@
from construct import Struct, Int32sl, Padding, Int8ub
from typing import Union
from .base import *
class ADBFelicaLookupRequest(ADBBaseRequest):
def __init__(self, data: bytes) -> None:
super().__init__(data)
self.idm = data[0x20:0x28].hex()
self.pmm = data[0x28:0x30].hex()
class ADBFelicaLookupResponse(ADBBaseResponse):
def __init__(self, code: int = 0, length: int = 10, status: int = 1, access_code: Union[str, None] = None) -> None:
super().__init__(code, length, status)
self.access_code = access_code if access_code is not None else "00000000000000000000"
def make(self) -> bytes:
header = super().make()
resp_struct = Struct(
Padding(26),
"access_code" / Int8ub[10],
)
return header + resp_struct.build(dict(
access_code = self.access_code
))
class ADBFelicaLookup2Request(ADBBaseRequest):
def __init__(self, data: bytes) -> None:
super().__init__(data)
self.idm = data[0x30:0x38].hex()
self.pmm = data[0x38:0x40].hex()
class ADBFelicaLookup2Response(ADBBaseResponse):
def __init__(self, code: int = 0, length: int = 10, status: int = 1, user_id: Union[int, None] = None, access_code: Union[str, None] = None) -> None:
super().__init__(code, length, status)
self.user_id = user_id if user_id is not None else -1
self.access_code = access_code if access_code is not None else "00000000000000000000"
def make(self) -> bytes:
header = super().make()
resp_struct = Struct(
Padding(22),
"user_id" / Int32sl,
"unk1" / Int32sl,
"access_code" / Int8ub[10],
"unk2" / Int32sl
)
return header + resp_struct.build(dict(
user_id = self.user_id,
unk1 = -1,
access_code = self.access_code,
unk2 = 1,
))

View File

@ -0,0 +1,25 @@
from construct import Struct, Int32sl, Padding
from typing import Union
from .base import *
class ADBLookupRequest(ADBBaseRequest):
def __init__(self, data: bytes) -> None:
super().__init__(data)
self.access_code = data[0x20:0x2A].hex()
class ADBLookupResponse(ADBBaseResponse):
def __init__(self, code: int = 0, length: int = 10, status: int = 1, user_id: Union[int, None] = None) -> None:
super().__init__(code, length, status)
self.user_id = user_id if user_id is not None else -1
def make(self):
header = super().make()
resp_struct = Struct(
Padding(0x16),
"user_id" / Int32sl
)
return header + resp_struct.build(dict(
user_id = self.user_id
))

View File

@ -0,0 +1,28 @@
from construct import Struct, Int16ul, Padding
from .base import *
class ADBTouchRequest(ADBBaseRequest):
def __init__(self, data: bytes) -> None:
super().__init__(data)
self.access_code = data[0x20:0x2A].hex()
class ADBTouchResponse(ADBBaseResponse):
def __init__(self, code: int = 0, length: int = 10, status: int = 1) -> None:
super().__init__(code, length, status)
self.unk1 = 0x006F
self.unk2 = 0x0001
def make(self):
header = super().make()
resp_struct = Struct(
Padding(0x05),
"unk1" / Int16ul,
Padding(2),
"unk2" / Int16ul,
)
return header + resp_struct.build(dict(
unk1 = self.unk1,
unk2 = self.unk2
))

View File

@ -2,27 +2,18 @@ from twisted.internet.protocol import Factory, Protocol
import logging, coloredlogs import logging, coloredlogs
from Crypto.Cipher import AES from Crypto.Cipher import AES
import struct import struct
from typing import Dict, Any from typing import Dict, Tuple, Callable, Union
from typing_extensions import Final
from logging.handlers import TimedRotatingFileHandler from logging.handlers import TimedRotatingFileHandler
from core.config import CoreConfig from core.config import CoreConfig
from core.data import Data from core.data import Data
from .adb_handlers import *
class AimedbProtocol(Protocol): class AimedbProtocol(Protocol):
AIMEDB_RESPONSE_CODES = { CMD_CODE_GOODBYE: Final[int] = 0x66
"felica_lookup": 0x03, request_list: Dict[int, Tuple[Callable[[bytes, int], Union[ADBBaseResponse, bytes]], int, str]] = {}
"lookup": 0x06,
"log": 0x0A,
"campaign": 0x0C,
"touch": 0x0E,
"lookup2": 0x10,
"felica_lookup2": 0x12,
"log2": 0x14,
"hello": 0x65,
}
request_list: Dict[int, Any] = {}
def __init__(self, core_cfg: CoreConfig) -> None: def __init__(self, core_cfg: CoreConfig) -> None:
self.logger = logging.getLogger("aimedb") self.logger = logging.getLogger("aimedb")
@ -32,16 +23,19 @@ class AimedbProtocol(Protocol):
self.logger.error("!!!KEY NOT SET!!!") self.logger.error("!!!KEY NOT SET!!!")
exit(1) exit(1)
self.request_list[0x01] = self.handle_felica_lookup self.register_handler(0x01, 0x03, self.handle_felica_lookup, 'felica_lookup')
self.request_list[0x04] = self.handle_lookup self.register_handler(0x04, 0x06, self.handle_lookup, 'lookup')
self.request_list[0x05] = self.handle_register self.register_handler(0x05, 0x06, self.handle_register, 'register')
self.request_list[0x09] = self.handle_log self.register_handler(0x09, 0x0A, self.handle_log, 'log')
self.request_list[0x0B] = self.handle_campaign self.register_handler(0x0B, 0x0C, self.handle_campaign, 'campaign')
self.request_list[0x0D] = self.handle_touch self.register_handler(0x0D, 0x0E, self.handle_touch, 'touch')
self.request_list[0x0F] = self.handle_lookup2 self.register_handler(0x0F, 0x10, self.handle_lookup, 'lookup2')
self.request_list[0x11] = self.handle_felica_lookup2 self.register_handler(0x11, 0x12, self.handle_felica_lookup2, 'felica_lookup2')
self.request_list[0x13] = self.handle_log2 self.register_handler(0x13, 0x14, self.handle_log2, 'log2')
self.request_list[0x64] = self.handle_hello self.register_handler(0x64, 0x65, self.handle_hello, 'hello')
def register_handler(self, cmd: int, resp:int, handler: Callable[[bytes, int], Union[ADBBaseResponse, bytes]], name: str) -> None:
self.request_list[cmd] = (handler, resp, name)
def append_padding(self, data: bytes): def append_padding(self, data: bytes):
"""Appends 0s to the end of the data until it's at the correct size""" """Appends 0s to the end of the data until it's at the correct size"""
@ -63,70 +57,70 @@ class AimedbProtocol(Protocol):
try: try:
decrypted = cipher.decrypt(data) decrypted = cipher.decrypt(data)
except:
self.logger.error(f"Failed to decrypt {data.hex()}") except Exception as e:
self.logger.error(f"Failed to decrypt {data.hex()} because {e}")
return None return None
self.logger.debug(f"{self.transport.getPeer().host} wrote {decrypted.hex()}") self.logger.debug(f"{self.transport.getPeer().host} wrote {decrypted.hex()}")
if not decrypted[1] == 0xA1 and not decrypted[0] == 0x3E: head = ADBHeader.from_data(decrypted)
self.logger.error(f"Bad magic")
return None
req_code = decrypted[4] if head.cmd == self.CMD_CODE_GOODBYE:
# Goodbye just requires a disconnect, no reply
if req_code == 0x66:
self.logger.info(f"goodbye from {self.transport.getPeer().host}") self.logger.info(f"goodbye from {self.transport.getPeer().host}")
self.transport.loseConnection() self.transport.loseConnection()
return return
try: handler, resp_code, name = self.request_list.get(head.cmd, (self.handle_default, 0x00, 'default'))
resp = self.request_list[req_code](decrypted)
encrypted = cipher.encrypt(resp) if resp_code == 0:
self.logger.debug(f"Response {resp.hex()}") self.logger.warning(f"No handler for cmd {hex(head.cmd)}")
else:
self.logger.info(f"{name} from {self.transport.getPeer().host}")
resp = handler(decrypted, resp_code)
if type(resp) == ADBBaseResponse or issubclass(type(resp), ADBBaseResponse):
resp_bytes = resp.make()
if len(resp_bytes) != resp.head.length:
resp_bytes = self.append_padding(resp_bytes)
elif type(resp) == bytes:
resp_bytes = resp
else:
raise TypeError(f"Unsupported type returned by ADB handler for {name}: {type(resp)}")
try:
encrypted = cipher.encrypt(resp_bytes)
self.logger.debug(f"Response {resp_bytes.hex()}")
self.transport.write(encrypted) self.transport.write(encrypted)
except KeyError: except Exception as e:
self.logger.error(f"Unknown command code {hex(req_code)}") self.logger.error(f"Failed to encrypt {resp_bytes.hex()} because {e}")
return None
def handle_default(self, data: bytes, resp_code: int, length: int = 10) -> ADBBaseResponse:
return ADBBaseResponse(resp_code, length)
except ValueError as e: def handle_campaign(self, data: bytes, resp_code: int) -> ADBBaseResponse:
self.logger.error(f"Failed to encrypt {resp.hex()} because {e}") return self.handle_default(data, resp_code, 0x0200)
return None
def handle_campaign(self, data: bytes) -> bytes: def handle_hello(self, data: bytes, resp_code: int) -> ADBBaseResponse:
self.logger.info(f"campaign from {self.transport.getPeer().host}") return self.handle_default(data, resp_code, 0x0020)
ret = struct.pack(
"<5H",
0xA13E,
0x3087,
self.AIMEDB_RESPONSE_CODES["campaign"],
0x0200,
0x0001,
)
return self.append_padding(ret)
def handle_hello(self, data: bytes) -> bytes: def handle_lookup(self, data: bytes, resp_code: int) -> ADBBaseResponse:
self.logger.info(f"hello from {self.transport.getPeer().host}") req = ADBLookupRequest(data)
ret = struct.pack( user_id = self.data.card.get_user_id_from_card(req.access_code)
"<5H", 0xA13E, 0x3087, self.AIMEDB_RESPONSE_CODES["hello"], 0x0020, 0x0001
)
return self.append_padding(ret)
def handle_lookup(self, data: bytes) -> bytes:
luid = data[0x20:0x2A].hex()
user_id = self.data.card.get_user_id_from_card(access_code=luid)
if user_id is None:
user_id = -1
ret = ADBLookupResponse(resp_code, 0x0130, 1, user_id)
self.logger.info( self.logger.info(
f"lookup from {self.transport.getPeer().host}: luid {luid} -> user_id {user_id}" f"access_code {req.access_code} -> user_id {ret.user_id}"
) )
ret = struct.pack( """
"<5H", 0xA13E, 0x3087, self.AIMEDB_RESPONSE_CODES["lookup"], 0x0130, 0x0001
)
ret += bytes(0x20 - len(ret)) ret += bytes(0x20 - len(ret))
if user_id is None: if user_id is None:
@ -134,16 +128,10 @@ class AimedbProtocol(Protocol):
else: else:
ret += struct.pack("<l", user_id) ret += struct.pack("<l", user_id)
return self.append_padding(ret) return self.append_padding(ret)
"""
return ret
def handle_lookup2(self, data: bytes) -> bytes: def handle_felica_lookup(self, data: bytes, resp_code: int) -> bytes:
self.logger.info(f"lookup2")
ret = bytearray(self.handle_lookup(data))
ret[4] = self.AIMEDB_RESPONSE_CODES["lookup2"]
return bytes(ret)
def handle_felica_lookup(self, data: bytes) -> bytes:
idm = data[0x20:0x28].hex() idm = data[0x20:0x28].hex()
pmm = data[0x28:0x30].hex() pmm = data[0x28:0x30].hex()
access_code = self.data.card.to_access_code(idm) access_code = self.data.card.to_access_code(idm)
@ -155,7 +143,7 @@ class AimedbProtocol(Protocol):
"<5H", "<5H",
0xA13E, 0xA13E,
0x3087, 0x3087,
self.AIMEDB_RESPONSE_CODES["felica_lookup"], resp_code,
0x0030, 0x0030,
0x0001, 0x0001,
) )
@ -164,7 +152,7 @@ class AimedbProtocol(Protocol):
return self.append_padding(ret) return self.append_padding(ret)
def handle_felica_lookup2(self, data: bytes) -> bytes: def handle_felica_lookup2(self, data: bytes, resp_code: int) -> bytes:
idm = data[0x30:0x38].hex() idm = data[0x30:0x38].hex()
pmm = data[0x38:0x40].hex() pmm = data[0x38:0x40].hex()
access_code = self.data.card.to_access_code(idm) access_code = self.data.card.to_access_code(idm)
@ -181,7 +169,7 @@ class AimedbProtocol(Protocol):
"<5H", "<5H",
0xA13E, 0xA13E,
0x3087, 0x3087,
self.AIMEDB_RESPONSE_CODES["felica_lookup2"], resp_code,
0x0140, 0x0140,
0x0001, 0x0001,
) )
@ -192,18 +180,19 @@ class AimedbProtocol(Protocol):
return self.append_padding(ret) return self.append_padding(ret)
def handle_touch(self, data: bytes) -> bytes: def handle_touch(self, data: bytes, resp_code: int) -> ADBBaseResponse:
self.logger.info(f"touch from {self.transport.getPeer().host}") """
ret = struct.pack( ret = struct.pack(
"<5H", 0xA13E, 0x3087, self.AIMEDB_RESPONSE_CODES["touch"], 0x0050, 0x0001 "<5H", 0xA13E, 0x3087, self.AIMEDB_RESPONSE_CODES["touch"], 0x0050, 0x0001
) )
ret += bytes(5) ret += bytes(5)
ret += struct.pack("<3H", 0x6F, 0, 1) ret += struct.pack("<3H", 0x6F, 0, 1)
"""
return self.append_padding(ret) return ADBTouchResponse(resp_code, 0x0050)
def handle_register(self, data: bytes) -> bytes: def handle_register(self, data: bytes, resp_code: int) -> bytes:
luid = data[0x20:0x2A].hex() req = ADBLookupRequest(data)
if self.config.server.allow_user_registration: if self.config.server.allow_user_registration:
user_id = self.data.user.create_user() user_id = self.data.user.create_user()
@ -212,52 +201,45 @@ class AimedbProtocol(Protocol):
self.logger.error("Failed to register user!") self.logger.error("Failed to register user!")
else: else:
card_id = self.data.card.create_card(user_id, luid) card_id = self.data.card.create_card(user_id, req.access_code)
if card_id is None: if card_id is None:
user_id = -1 user_id = -1
self.logger.error("Failed to register card!") self.logger.error("Failed to register card!")
self.logger.info( self.logger.info(
f"register from {self.transport.getPeer().host}: luid {luid} -> user_id {user_id}" f"Register access code {req.access_code} -> user_id {user_id}"
) )
else: else:
self.logger.info( self.logger.info(
f"register from {self.transport.getPeer().host} blocked!: luid {luid}" f"Registration blocked!: access code {req.access_code}"
) )
user_id = -1 user_id = -1
resp = ADBLookupResponse(resp_code, 0x30, 1 if user_id > 0 else 0)
resp.user_id = user_id
"""
ret = struct.pack( ret = struct.pack(
"<5H", "<5H",
0xA13E, 0xA13E,
0x3087, 0x3087,
self.AIMEDB_RESPONSE_CODES["lookup"], resp_code,
0x0030, 0x0030,
0x0001 if user_id > -1 else 0, 0x0001 if user_id > -1 else 0,
) )
ret += bytes(0x20 - len(ret)) ret += bytes(0x20 - len(ret))
ret += struct.pack("<l", user_id) ret += struct.pack("<l", user_id)
"""
return resp
return self.append_padding(ret) def handle_log(self, data: bytes, resp_code: int) -> bytes:
def handle_log(self, data: bytes) -> bytes:
# TODO: Save aimedb logs # TODO: Save aimedb logs
self.logger.info(f"log from {self.transport.getPeer().host}") return self.handle_default(data, resp_code, 0x0020)
ret = struct.pack(
"<5H", 0xA13E, 0x3087, self.AIMEDB_RESPONSE_CODES["log"], 0x0020, 0x0001
)
return self.append_padding(ret)
def handle_log2(self, data: bytes) -> bytes: def handle_log2(self, data: bytes, resp_code: int) -> bytes:
self.logger.info(f"log2 from {self.transport.getPeer().host}") # TODO: Save aimedb logs
ret = struct.pack( return self.handle_default(data, resp_code, 0x0040)
"<5H", 0xA13E, 0x3087, self.AIMEDB_RESPONSE_CODES["log2"], 0x0040, 0x0001
)
ret += bytes(22)
ret += struct.pack("H", 1)
return self.append_padding(ret)
class AimedbFactory(Factory): class AimedbFactory(Factory):