artemis/core/aimedb.py
2023-06-03 13:52:32 -05:00

280 lines
9.5 KiB
Python

from twisted.internet.protocol import Factory, Protocol
import logging, coloredlogs
from Crypto.Cipher import AES
import struct
from typing import Dict, Tuple, Callable, Union
from typing_extensions import Final
from logging.handlers import TimedRotatingFileHandler
from core.config import CoreConfig
from core.data import Data
from .adb_handlers import *
class AimedbProtocol(Protocol):
    """Twisted protocol answering AES-ECB encrypted aimedb requests.

    Each chunk handed to :meth:`dataReceived` is decrypted with the
    configured key, dispatched on the command code found in its header, and
    answered with an encrypted response built by the matching handler.
    """

    # Command code of the "goodbye" packet; it is answered by closing the
    # connection rather than by sending a reply.
    CMD_CODE_GOODBYE: Final[int] = 0x66

    # Maps request command code -> (handler, response code, handler name).
    # Declared here for typing only; the dict itself is created per instance
    # in ``__init__`` so the bound-method handlers of one connection are
    # never used to serve another connection.
    request_list: Dict[
        int,
        Tuple[Callable[[bytes, int], Union[ADBBaseResponse, bytes]], int, str],
    ]

    def __init__(self, core_cfg: CoreConfig) -> None:
        """Set up logging, data access and the command dispatch table.

        Args:
            core_cfg: Core configuration; ``core_cfg.aimedb.key`` supplies
                the AES key used by :meth:`dataReceived`.

        Raises:
            SystemExit: With exit status 1 when no key is configured.
        """
        self.logger = logging.getLogger("aimedb")
        self.config = core_cfg
        self.data = Data(core_cfg)
        self.request_list = {}

        if core_cfg.aimedb.key == "":
            self.logger.error("!!!KEY NOT SET!!!")
            # Same effect as exit(1), without relying on the ``site`` helper.
            raise SystemExit(1)

        self.register_handler(0x01, 0x03, self.handle_felica_lookup, 'felica_lookup')
        self.register_handler(0x04, 0x06, self.handle_lookup, 'lookup')
        self.register_handler(0x05, 0x06, self.handle_register, 'register')
        self.register_handler(0x09, 0x0A, self.handle_log, 'log')
        self.register_handler(0x0B, 0x0C, self.handle_campaign, 'campaign')
        self.register_handler(0x0D, 0x0E, self.handle_touch, 'touch')
        self.register_handler(0x0F, 0x10, self.handle_lookup, 'lookup2')
        self.register_handler(0x11, 0x12, self.handle_felica_lookup2, 'felica_lookup2')
        self.register_handler(0x13, 0x14, self.handle_log2, 'log2')
        self.register_handler(0x64, 0x65, self.handle_hello, 'hello')

    def register_handler(
        self,
        cmd: int,
        resp: int,
        handler: Callable[[bytes, int], Union[ADBBaseResponse, bytes]],
        name: str,
    ) -> None:
        """Associate a request command code with its handler.

        Args:
            cmd: Command code read from the request header.
            resp: Response code passed to ``handler`` when it is invoked.
            handler: Callable taking ``(decrypted_request, resp)``.
            name: Human-readable name used in log messages.
        """
        self.request_list[cmd] = (handler, resp, name)

    def append_padding(self, data: bytes) -> bytes:
        """Zero-pad ``data`` up to the length declared in its header.

        The target length is the little-endian unsigned 16-bit value stored
        at byte offset 6 of ``data``. Data that is already at least that
        long is returned unchanged (a negative pad size would otherwise make
        ``bytes()`` raise ``ValueError``).
        """
        (declared_length,) = struct.unpack_from("<H", data, 6)
        padding_size = declared_length - len(data)

        if padding_size < 0:
            self.logger.warning(
                f"Response is {len(data)} bytes but header declares {declared_length}; not padding"
            )
            return data

        return data + bytes(padding_size)

    def connectionMade(self) -> None:
        """Log the peer address of a newly opened connection."""
        self.logger.debug(f"{self.transport.getPeer().host} Connected")

    def connectionLost(self, reason) -> None:
        """Log the peer address and the reason a connection was closed."""
        self.logger.debug(
            f"{self.transport.getPeer().host} Disconnected - {reason.value}"
        )

    def dataReceived(self, data: bytes) -> None:
        """Decrypt a request, dispatch it and write the encrypted response.

        The received chunk is treated as one complete packet; no buffering
        across calls is performed. Commands without a registered handler are
        answered by :meth:`handle_default` with response code 0.

        Raises:
            TypeError: If a handler returns something other than an
                ``ADBBaseResponse`` or ``bytes``.
        """
        cipher = AES.new(self.config.aimedb.key.encode(), AES.MODE_ECB)

        try:
            decrypted = cipher.decrypt(data)
        except Exception as e:
            self.logger.error(f"Failed to decrypt {data.hex()} because {e}")
            return None

        self.logger.debug(f"{self.transport.getPeer().host} wrote {decrypted.hex()}")

        head = ADBHeader.from_data(decrypted)

        if head.cmd == self.CMD_CODE_GOODBYE:
            # Goodbye just requires a disconnect, no reply
            self.logger.info(f"goodbye from {self.transport.getPeer().host}")
            self.transport.loseConnection()
            return

        handler, resp_code, name = self.request_list.get(
            head.cmd, (self.handle_default, 0x00, 'default')
        )

        # A response code of 0 only comes from the fallback tuple above.
        if resp_code == 0:
            self.logger.warning(f"No handler for cmd {hex(head.cmd)}")
        else:
            self.logger.info(f"{name} from {self.transport.getPeer().host}")

        resp = handler(decrypted, resp_code)

        if isinstance(resp, ADBBaseResponse):
            resp_bytes = resp.make()
            # Serialised responses may be shorter than the length advertised
            # in their header; pad them out to that length.
            if len(resp_bytes) != resp.head.length:
                resp_bytes = self.append_padding(resp_bytes)
        elif isinstance(resp, bytes):
            resp_bytes = resp
        else:
            raise TypeError(f"Unsupported type returned by ADB handler for {name}: {type(resp)}")

        try:
            encrypted = cipher.encrypt(resp_bytes)
            self.logger.debug(f"Response {resp_bytes.hex()}")
            self.transport.write(encrypted)
        except Exception as e:
            self.logger.error(f"Failed to encrypt {resp_bytes.hex()} because {e}")

    def handle_default(self, data: bytes, resp_code: int, length: int = 10) -> ADBBaseResponse:
        """Return a bare ``ADBBaseResponse`` with the given code and length."""
        return ADBBaseResponse(resp_code, length)

    def handle_campaign(self, data: bytes, resp_code: int) -> ADBBaseResponse:
        """Answer a campaign request with a default response of length 0x0200."""
        return self.handle_default(data, resp_code, 0x0200)

    def handle_hello(self, data: bytes, resp_code: int) -> ADBBaseResponse:
        """Answer a hello request with a default response of length 0x0020."""
        return self.handle_default(data, resp_code, 0x0020)

    def handle_lookup(self, data: bytes, resp_code: int) -> ADBBaseResponse:
        """Look up the user id belonging to the access code in the request."""
        req = ADBLookupRequest(data)
        user_id = self.data.card.get_user_id_from_card(req.access_code)

        ret = ADBLookupResponse(resp_code, 0x0130, 1, user_id)

        self.logger.info(
            f"access_code {req.access_code} -> user_id {ret.user_id}"
        )
        return ret

    def handle_felica_lookup(self, data: bytes, resp_code: int) -> bytes:
        """Convert a FeliCa IDm into an access code.

        The IDm is read from request bytes 0x20-0x27 and the PMm from bytes
        0x28-0x2F (the PMm is only logged). Returns the raw response bytes,
        zero-padded to the 0x0030 length written in the header.
        """
        idm = data[0x20:0x28].hex()
        pmm = data[0x28:0x30].hex()
        access_code = self.data.card.to_access_code(idm)
        self.logger.info(
            f"felica_lookup from {self.transport.getPeer().host}: idm {idm} pmm {pmm} -> access_code {access_code}"
        )

        # Five little-endian uint16 header fields; the fourth (offset 6) is
        # the total length that append_padding() pads up to.
        ret = struct.pack(
            "<5H",
            0xA13E,
            0x3087,
            resp_code,
            0x0030,
            0x0001,
        )
        ret += bytes(26)
        ret += bytes.fromhex(access_code)

        return self.append_padding(ret)

    def handle_felica_lookup2(self, data: bytes, resp_code: int) -> bytes:
        """Convert a FeliCa IDm into an access code and resolve its user id.

        The IDm is read from request bytes 0x30-0x37 and the PMm from bytes
        0x38-0x3F (the PMm is only logged). A user id of -1 is sent when no
        user is associated with the card. Returns the raw response bytes,
        zero-padded to the 0x0140 length written in the header.
        """
        idm = data[0x30:0x38].hex()
        pmm = data[0x38:0x40].hex()
        access_code = self.data.card.to_access_code(idm)
        user_id = self.data.card.get_user_id_from_card(access_code=access_code)

        if user_id is None:
            user_id = -1

        self.logger.info(
            f"felica_lookup2 from {self.transport.getPeer().host}: idm {idm} pmm {pmm} -> access_code {access_code} user_id {user_id}"
        )

        # Five little-endian uint16 header fields; the fourth (offset 6) is
        # the total length that append_padding() pads up to.
        ret = struct.pack(
            "<5H",
            0xA13E,
            0x3087,
            resp_code,
            0x0140,
            0x0001,
        )
        ret += bytes(22)
        # user_id as int32, then -1 as int64.
        # NOTE(review): the original author's note calls the -1 field
        # "ext_id" - confirm against the protocol description.
        ret += struct.pack("<lq", user_id, -1)
        ret += bytes.fromhex(access_code)
        ret += struct.pack("<l", 1)

        return self.append_padding(ret)

    def handle_touch(self, data: bytes, resp_code: int) -> ADBBaseResponse:
        """Answer a touch request with an ``ADBTouchResponse`` of length 0x0050."""
        return ADBTouchResponse(resp_code, 0x0050)

    def handle_register(self, data: bytes, resp_code: int) -> ADBBaseResponse:
        """Create a user and card for the access code in the request.

        Registration only happens when
        ``config.server.allow_user_registration`` is set. The response
        carries the new user id, or -1 when registration is disabled or
        either the user or the card could not be created.
        """
        req = ADBLookupRequest(data)

        if self.config.server.allow_user_registration:
            user_id = self.data.user.create_user()

            if user_id is None:
                user_id = -1
                self.logger.error("Failed to register user!")
            else:
                card_id = self.data.card.create_card(user_id, req.access_code)

                if card_id is None:
                    user_id = -1
                    self.logger.error("Failed to register card!")

            self.logger.info(
                f"Register access code {req.access_code} -> user_id {user_id}"
            )
        else:
            self.logger.info(
                f"Registration blocked!: access code {req.access_code}"
            )
            user_id = -1

        resp = ADBLookupResponse(resp_code, 0x30, 1 if user_id > 0 else 0)
        resp.user_id = user_id
        return resp

    def handle_log(self, data: bytes, resp_code: int) -> ADBBaseResponse:
        """Acknowledge a log request with a default response of length 0x0020."""
        # TODO: Save aimedb logs
        return self.handle_default(data, resp_code, 0x0020)

    def handle_log2(self, data: bytes, resp_code: int) -> ADBBaseResponse:
        """Acknowledge a log2 request with a default response of length 0x0040."""
        # TODO: Save aimedb logs
        return self.handle_default(data, resp_code, 0x0040)
class AimedbFactory(Factory):
    """Factory that configures the ``aimedb`` logger and builds protocols."""

    protocol = AimedbProtocol

    def __init__(self, cfg: CoreConfig) -> None:
        """Configure logging and validate the aimedb configuration.

        Args:
            cfg: Core configuration. ``cfg.server.log_dir`` is the directory
                of the rotating log file; ``cfg.aimedb`` supplies
                ``loglevel``, ``key`` and ``port``.

        Raises:
            SystemExit: With exit status 1 when no key is configured.
        """
        self.config = cfg
        log_fmt_str = "[%(asctime)s] Aimedb | %(levelname)s | %(message)s"
        self.logger = logging.getLogger("aimedb")

        # ``logging.getLogger`` returns the same logger object on every call,
        # so only attach handlers the first time; otherwise a second factory
        # would make every message appear twice.
        needs_setup = not self.logger.handlers

        if needs_setup:
            log_fmt = logging.Formatter(log_fmt_str)

            file_handler = TimedRotatingFileHandler(
                f"{self.config.server.log_dir}/aimedb.log",
                when="d",
                backupCount=10,
            )
            file_handler.setFormatter(log_fmt)

            console_handler = logging.StreamHandler()
            console_handler.setFormatter(log_fmt)

            self.logger.addHandler(file_handler)
            self.logger.addHandler(console_handler)

        self.logger.setLevel(self.config.aimedb.loglevel)

        if needs_setup:
            coloredlogs.install(
                level=cfg.aimedb.loglevel, logger=self.logger, fmt=log_fmt_str
            )

        if self.config.aimedb.key == "":
            self.logger.error("Please set 'key' field in your config file.")
            # Same effect as exit(1), without relying on the ``site`` helper.
            raise SystemExit(1)

        self.logger.info(f"Ready on port {self.config.aimedb.port}")

    def buildProtocol(self, addr):
        """Create the protocol instance serving a new connection.

        ``addr`` is accepted for interface compatibility and is not used.
        """
        proto = AimedbProtocol(self.config)
        # Twisted's default buildProtocol sets this attribute; keep the same
        # contract when overriding it.
        proto.factory = self
        return proto