Compare commits

...

13 Commits

Author SHA1 Message Date
Hay1tsme
d7f9eed3f2 adb: update register 2023-06-03 13:52:32 -05:00
Hay1tsme
46dd8fe84f adb: see previous 2023-06-03 13:41:03 -05:00
Hay1tsme
d167bfe63d adb: comparability with 3.9 and below 2023-06-03 13:39:28 -05:00
ad9bef618f adb: add felica classes 2023-05-26 18:02:20 -04:00
7ac50868b2 ADB: ADBLookupResponse 2023-05-22 17:58:11 -04:00
8f29c796b1 adb: type checking fix 2023-05-22 17:36:15 -04:00
afef15fb00 adb: remove references to AIMEDB_RESPONSE_CODES 2023-05-22 17:30:24 -04:00
a16c85a1e2 adb: fix typo 2023-05-22 17:25:46 -04:00
a4c8897024 adb: hotfix 2023-05-22 17:23:28 -04:00
e9368b2e57 ADB: update touch 2023-05-22 17:17:07 -04:00
2c40343d9f adb: update log and log2 2023-05-22 17:11:09 -04:00
3e5c210bc4 adb: update lookup and lookup2 2023-05-22 17:08:58 -04:00
4b90d2ca40 begin adb redux, add base classes 2023-05-22 16:15:10 -04:00
6 changed files with 263 additions and 108 deletions

View File

@ -0,0 +1,4 @@
from .base import ADBBaseRequest, ADBBaseResponse, ADBHeader, ADBHeaderException
from .lookup import ADBLookupRequest, ADBLookupResponse
from .touch import ADBTouchRequest, ADBTouchResponse
from .felica import ADBFelicaLookupRequest, ADBFelicaLookupResponse, ADBFelicaLookup2Request, ADBFelicaLookup2Response

60
core/adb_handlers/base.py Normal file
View File

@ -0,0 +1,60 @@
import struct
from construct import Struct, Int16ul, Const
class ADBHeaderException(Exception):
pass
# everything is LE
class ADBHeader:
def __init__(self, magic: int, unknown: int, cmd: int, length: int, status: int) -> None:
self.magic = magic # u16
self.unknown = unknown # u16
self.cmd = cmd # u16
self.length = length # u16
self.status = status # u16
self.validate()
@classmethod
def from_data(cls, data: bytes) -> "ADBHeader":
magic, unknown, cmd, length, status = struct.unpack_from("<5H", data)
return cls(magic, unknown, cmd, length, status)
def validate(self) -> bool:
if self.magic != 0xa13e:
raise ADBHeaderException(f"Magic {self.magic} != 0xa13e")
return True
def make(self) -> bytes:
resp_struct = Struct(
"magic" / Int16ul, #Const(b">\xa1"),
"unknown" / Int16ul,
"response_code" / Int16ul,
"length" / Int16ul,
"status" / Int16ul,
)
return resp_struct.build(dict(
magic=self.magic,
unknown=self.unknown,
response_code=self.cmd,
length=self.length,
status=self.status,
))
class ADBBaseRequest:
def __init__(self, data: bytes) -> None:
self.head = ADBHeader.from_data(data)
class ADBBaseResponse:
def __init__(self, code: int = 0, length: int = 10, status: int = 1) -> None:
self.head = ADBHeader(0xa13e, 0x3087, code, length, status)
def append_padding(self, data: bytes):
"""Appends 0s to the end of the data until it's at the correct size"""
padding_size = self.head.length - len(data)
data += bytes(padding_size)
return data
def make(self) -> bytes:
return self.head.make()

View File

@ -0,0 +1,56 @@
from construct import Struct, Int32sl, Padding, Int8ub
from typing import Union
from .base import *
class ADBFelicaLookupRequest(ADBBaseRequest):
def __init__(self, data: bytes) -> None:
super().__init__(data)
self.idm = data[0x20:0x28].hex()
self.pmm = data[0x28:0x30].hex()
class ADBFelicaLookupResponse(ADBBaseResponse):
def __init__(self, code: int = 0, length: int = 10, status: int = 1, access_code: Union[str, None] = None) -> None:
super().__init__(code, length, status)
self.access_code = access_code if access_code is not None else "00000000000000000000"
def make(self) -> bytes:
header = super().make()
resp_struct = Struct(
Padding(26),
"access_code" / Int8ub[10],
)
return header + resp_struct.build(dict(
access_code = self.access_code
))
class ADBFelicaLookup2Request(ADBBaseRequest):
def __init__(self, data: bytes) -> None:
super().__init__(data)
self.idm = data[0x30:0x38].hex()
self.pmm = data[0x38:0x40].hex()
class ADBFelicaLookup2Response(ADBBaseResponse):
def __init__(self, code: int = 0, length: int = 10, status: int = 1, user_id: Union[int, None] = None, access_code: Union[str, None] = None) -> None:
super().__init__(code, length, status)
self.user_id = user_id if user_id is not None else -1
self.access_code = access_code if access_code is not None else "00000000000000000000"
def make(self) -> bytes:
header = super().make()
resp_struct = Struct(
Padding(22),
"user_id" / Int32sl,
"unk1" / Int32sl,
"access_code" / Int8ub[10],
"unk2" / Int32sl
)
return header + resp_struct.build(dict(
user_id = self.user_id,
unk1 = -1,
access_code = self.access_code,
unk2 = 1,
))

View File

@ -0,0 +1,25 @@
from construct import Struct, Int32sl, Padding
from typing import Union
from .base import *
class ADBLookupRequest(ADBBaseRequest):
def __init__(self, data: bytes) -> None:
super().__init__(data)
self.access_code = data[0x20:0x2A].hex()
class ADBLookupResponse(ADBBaseResponse):
def __init__(self, code: int = 0, length: int = 10, status: int = 1, user_id: Union[int, None] = None) -> None:
super().__init__(code, length, status)
self.user_id = user_id if user_id is not None else -1
def make(self):
header = super().make()
resp_struct = Struct(
Padding(0x16),
"user_id" / Int32sl
)
return header + resp_struct.build(dict(
user_id = self.user_id
))

View File

@ -0,0 +1,28 @@
from construct import Struct, Int16ul, Padding
from .base import *
class ADBTouchRequest(ADBBaseRequest):
def __init__(self, data: bytes) -> None:
super().__init__(data)
self.access_code = data[0x20:0x2A].hex()
class ADBTouchResponse(ADBBaseResponse):
def __init__(self, code: int = 0, length: int = 10, status: int = 1) -> None:
super().__init__(code, length, status)
self.unk1 = 0x006F
self.unk2 = 0x0001
def make(self):
header = super().make()
resp_struct = Struct(
Padding(0x05),
"unk1" / Int16ul,
Padding(2),
"unk2" / Int16ul,
)
return header + resp_struct.build(dict(
unk1 = self.unk1,
unk2 = self.unk2
))

View File

@ -2,27 +2,18 @@ from twisted.internet.protocol import Factory, Protocol
import logging, coloredlogs
from Crypto.Cipher import AES
import struct
from typing import Dict, Any
from typing import Dict, Tuple, Callable, Union
from typing_extensions import Final
from logging.handlers import TimedRotatingFileHandler
from core.config import CoreConfig
from core.data import Data
from .adb_handlers import *
class AimedbProtocol(Protocol):
AIMEDB_RESPONSE_CODES = {
"felica_lookup": 0x03,
"lookup": 0x06,
"log": 0x0A,
"campaign": 0x0C,
"touch": 0x0E,
"lookup2": 0x10,
"felica_lookup2": 0x12,
"log2": 0x14,
"hello": 0x65,
}
request_list: Dict[int, Any] = {}
CMD_CODE_GOODBYE: Final[int] = 0x66
request_list: Dict[int, Tuple[Callable[[bytes, int], Union[ADBBaseResponse, bytes]], int, str]] = {}
def __init__(self, core_cfg: CoreConfig) -> None:
self.logger = logging.getLogger("aimedb")
@ -32,16 +23,19 @@ class AimedbProtocol(Protocol):
self.logger.error("!!!KEY NOT SET!!!")
exit(1)
self.request_list[0x01] = self.handle_felica_lookup
self.request_list[0x04] = self.handle_lookup
self.request_list[0x05] = self.handle_register
self.request_list[0x09] = self.handle_log
self.request_list[0x0B] = self.handle_campaign
self.request_list[0x0D] = self.handle_touch
self.request_list[0x0F] = self.handle_lookup2
self.request_list[0x11] = self.handle_felica_lookup2
self.request_list[0x13] = self.handle_log2
self.request_list[0x64] = self.handle_hello
self.register_handler(0x01, 0x03, self.handle_felica_lookup, 'felica_lookup')
self.register_handler(0x04, 0x06, self.handle_lookup, 'lookup')
self.register_handler(0x05, 0x06, self.handle_register, 'register')
self.register_handler(0x09, 0x0A, self.handle_log, 'log')
self.register_handler(0x0B, 0x0C, self.handle_campaign, 'campaign')
self.register_handler(0x0D, 0x0E, self.handle_touch, 'touch')
self.register_handler(0x0F, 0x10, self.handle_lookup, 'lookup2')
self.register_handler(0x11, 0x12, self.handle_felica_lookup2, 'felica_lookup2')
self.register_handler(0x13, 0x14, self.handle_log2, 'log2')
self.register_handler(0x64, 0x65, self.handle_hello, 'hello')
def register_handler(self, cmd: int, resp:int, handler: Callable[[bytes, int], Union[ADBBaseResponse, bytes]], name: str) -> None:
self.request_list[cmd] = (handler, resp, name)
def append_padding(self, data: bytes):
"""Appends 0s to the end of the data until it's at the correct size"""
@ -63,70 +57,70 @@ class AimedbProtocol(Protocol):
try:
decrypted = cipher.decrypt(data)
except:
self.logger.error(f"Failed to decrypt {data.hex()}")
except Exception as e:
self.logger.error(f"Failed to decrypt {data.hex()} because {e}")
return None
self.logger.debug(f"{self.transport.getPeer().host} wrote {decrypted.hex()}")
if not decrypted[1] == 0xA1 and not decrypted[0] == 0x3E:
self.logger.error(f"Bad magic")
return None
head = ADBHeader.from_data(decrypted)
req_code = decrypted[4]
if req_code == 0x66:
if head.cmd == self.CMD_CODE_GOODBYE:
# Goodbye just requires a disconnect, no reply
self.logger.info(f"goodbye from {self.transport.getPeer().host}")
self.transport.loseConnection()
return
try:
resp = self.request_list[req_code](decrypted)
encrypted = cipher.encrypt(resp)
self.logger.debug(f"Response {resp.hex()}")
handler, resp_code, name = self.request_list.get(head.cmd, (self.handle_default, 0x00, 'default'))
if resp_code == 0:
self.logger.warning(f"No handler for cmd {hex(head.cmd)}")
else:
self.logger.info(f"{name} from {self.transport.getPeer().host}")
resp = handler(decrypted, resp_code)
if type(resp) == ADBBaseResponse or issubclass(type(resp), ADBBaseResponse):
resp_bytes = resp.make()
if len(resp_bytes) != resp.head.length:
resp_bytes = self.append_padding(resp_bytes)
elif type(resp) == bytes:
resp_bytes = resp
else:
raise TypeError(f"Unsupported type returned by ADB handler for {name}: {type(resp)}")
try:
encrypted = cipher.encrypt(resp_bytes)
self.logger.debug(f"Response {resp_bytes.hex()}")
self.transport.write(encrypted)
except KeyError:
self.logger.error(f"Unknown command code {hex(req_code)}")
return None
except Exception as e:
self.logger.error(f"Failed to encrypt {resp_bytes.hex()} because {e}")
def handle_default(self, data: bytes, resp_code: int, length: int = 10) -> ADBBaseResponse:
return ADBBaseResponse(resp_code, length)
except ValueError as e:
self.logger.error(f"Failed to encrypt {resp.hex()} because {e}")
return None
def handle_campaign(self, data: bytes, resp_code: int) -> ADBBaseResponse:
return self.handle_default(data, resp_code, 0x0200)
def handle_campaign(self, data: bytes) -> bytes:
self.logger.info(f"campaign from {self.transport.getPeer().host}")
ret = struct.pack(
"<5H",
0xA13E,
0x3087,
self.AIMEDB_RESPONSE_CODES["campaign"],
0x0200,
0x0001,
)
return self.append_padding(ret)
def handle_hello(self, data: bytes, resp_code: int) -> ADBBaseResponse:
return self.handle_default(data, resp_code, 0x0020)
def handle_hello(self, data: bytes) -> bytes:
self.logger.info(f"hello from {self.transport.getPeer().host}")
ret = struct.pack(
"<5H", 0xA13E, 0x3087, self.AIMEDB_RESPONSE_CODES["hello"], 0x0020, 0x0001
)
return self.append_padding(ret)
def handle_lookup(self, data: bytes) -> bytes:
luid = data[0x20:0x2A].hex()
user_id = self.data.card.get_user_id_from_card(access_code=luid)
if user_id is None:
user_id = -1
def handle_lookup(self, data: bytes, resp_code: int) -> ADBBaseResponse:
req = ADBLookupRequest(data)
user_id = self.data.card.get_user_id_from_card(req.access_code)
ret = ADBLookupResponse(resp_code, 0x0130, 1, user_id)
self.logger.info(
f"lookup from {self.transport.getPeer().host}: luid {luid} -> user_id {user_id}"
f"access_code {req.access_code} -> user_id {ret.user_id}"
)
ret = struct.pack(
"<5H", 0xA13E, 0x3087, self.AIMEDB_RESPONSE_CODES["lookup"], 0x0130, 0x0001
)
"""
ret += bytes(0x20 - len(ret))
if user_id is None:
@ -134,16 +128,10 @@ class AimedbProtocol(Protocol):
else:
ret += struct.pack("<l", user_id)
return self.append_padding(ret)
"""
return ret
def handle_lookup2(self, data: bytes) -> bytes:
self.logger.info(f"lookup2")
ret = bytearray(self.handle_lookup(data))
ret[4] = self.AIMEDB_RESPONSE_CODES["lookup2"]
return bytes(ret)
def handle_felica_lookup(self, data: bytes) -> bytes:
def handle_felica_lookup(self, data: bytes, resp_code: int) -> bytes:
idm = data[0x20:0x28].hex()
pmm = data[0x28:0x30].hex()
access_code = self.data.card.to_access_code(idm)
@ -155,7 +143,7 @@ class AimedbProtocol(Protocol):
"<5H",
0xA13E,
0x3087,
self.AIMEDB_RESPONSE_CODES["felica_lookup"],
resp_code,
0x0030,
0x0001,
)
@ -164,7 +152,7 @@ class AimedbProtocol(Protocol):
return self.append_padding(ret)
def handle_felica_lookup2(self, data: bytes) -> bytes:
def handle_felica_lookup2(self, data: bytes, resp_code: int) -> bytes:
idm = data[0x30:0x38].hex()
pmm = data[0x38:0x40].hex()
access_code = self.data.card.to_access_code(idm)
@ -181,7 +169,7 @@ class AimedbProtocol(Protocol):
"<5H",
0xA13E,
0x3087,
self.AIMEDB_RESPONSE_CODES["felica_lookup2"],
resp_code,
0x0140,
0x0001,
)
@ -192,18 +180,19 @@ class AimedbProtocol(Protocol):
return self.append_padding(ret)
def handle_touch(self, data: bytes) -> bytes:
self.logger.info(f"touch from {self.transport.getPeer().host}")
def handle_touch(self, data: bytes, resp_code: int) -> ADBBaseResponse:
"""
ret = struct.pack(
"<5H", 0xA13E, 0x3087, self.AIMEDB_RESPONSE_CODES["touch"], 0x0050, 0x0001
)
ret += bytes(5)
ret += struct.pack("<3H", 0x6F, 0, 1)
"""
return self.append_padding(ret)
return ADBTouchResponse(resp_code, 0x0050)
def handle_register(self, data: bytes) -> bytes:
luid = data[0x20:0x2A].hex()
def handle_register(self, data: bytes, resp_code: int) -> bytes:
req = ADBLookupRequest(data)
if self.config.server.allow_user_registration:
user_id = self.data.user.create_user()
@ -212,52 +201,45 @@ class AimedbProtocol(Protocol):
self.logger.error("Failed to register user!")
else:
card_id = self.data.card.create_card(user_id, luid)
card_id = self.data.card.create_card(user_id, req.access_code)
if card_id is None:
user_id = -1
self.logger.error("Failed to register card!")
self.logger.info(
f"register from {self.transport.getPeer().host}: luid {luid} -> user_id {user_id}"
f"Register access code {req.access_code} -> user_id {user_id}"
)
else:
self.logger.info(
f"register from {self.transport.getPeer().host} blocked!: luid {luid}"
f"Registration blocked!: access code {req.access_code}"
)
user_id = -1
resp = ADBLookupResponse(resp_code, 0x30, 1 if user_id > 0 else 0)
resp.user_id = user_id
"""
ret = struct.pack(
"<5H",
0xA13E,
0x3087,
self.AIMEDB_RESPONSE_CODES["lookup"],
resp_code,
0x0030,
0x0001 if user_id > -1 else 0,
)
ret += bytes(0x20 - len(ret))
ret += struct.pack("<l", user_id)
"""
return resp
return self.append_padding(ret)
def handle_log(self, data: bytes) -> bytes:
def handle_log(self, data: bytes, resp_code: int) -> bytes:
# TODO: Save aimedb logs
self.logger.info(f"log from {self.transport.getPeer().host}")
ret = struct.pack(
"<5H", 0xA13E, 0x3087, self.AIMEDB_RESPONSE_CODES["log"], 0x0020, 0x0001
)
return self.append_padding(ret)
return self.handle_default(data, resp_code, 0x0020)
def handle_log2(self, data: bytes) -> bytes:
self.logger.info(f"log2 from {self.transport.getPeer().host}")
ret = struct.pack(
"<5H", 0xA13E, 0x3087, self.AIMEDB_RESPONSE_CODES["log2"], 0x0040, 0x0001
)
ret += bytes(22)
ret += struct.pack("H", 1)
return self.append_padding(ret)
def handle_log2(self, data: bytes, resp_code: int) -> bytes:
# TODO: Save aimedb logs
return self.handle_default(data, resp_code, 0x0040)
class AimedbFactory(Factory):