artemis/core/aimedb.py

411 lines
17 KiB
Python

import logging, coloredlogs
from Crypto.Cipher import AES
from typing import Dict, Tuple, Callable, Union, Optional
import asyncio
from logging.handlers import TimedRotatingFileHandler
from core.config import CoreConfig
from core.utils import create_sega_auth_key
from core.data import Data
from .adb_handlers import *
class AimedbServlette():
    """
    Asyncio TCP server that answers AimeDB (ADB) requests.

    Every packet is AES-ECB encrypted with ``core_cfg.aimedb.key``. After
    decryption the ADB header is parsed and the request is dispatched, by its
    command id, to a handler registered through :meth:`register_handler`.
    Handlers are coroutines taking ``(decrypted_packet, response_code)`` and
    returning either an ``ADBBaseResponse`` (serialised with ``.make()``) or
    raw ``bytes``.
    """

    # Command id -> (handler coroutine, response code, human readable name).
    # The class-level dict is kept so ``AimedbServlette.request_list`` still
    # exists; every instance shadows it with its own dict in __init__ so that
    # bound handlers of one instance never leak into another.
    request_list: Dict[int, Tuple[Callable[[bytes, int], Union[ADBBaseResponse, bytes]], int, str]] = {}

    # Size in bytes of the zero-padded auth key field filled in by the *_ex lookups.
    _AUTH_KEY_LEN = 256

    def __init__(self, core_cfg: CoreConfig) -> None:
        """
        Set up logging, validate the configuration and register all handlers.

        Args:
            core_cfg: Core configuration; ``aimedb`` and ``server`` sections are read.

        Raises:
            SystemExit: If no AimeDB key is configured.
        """
        self.config = core_cfg
        self.data = Data(core_cfg)
        self.request_list = {}
        # Strong reference to the task created by start(); see start().
        self._server_task: Optional[asyncio.Task] = None

        self.logger = logging.getLogger("aimedb")
        # The logger is process-global, so only attach handlers the first time.
        if not hasattr(self.logger, "initted"):
            log_fmt_str = "[%(asctime)s] Aimedb | %(levelname)s | %(message)s"
            log_fmt = logging.Formatter(log_fmt_str)

            fileHandler = TimedRotatingFileHandler(
                "{0}/{1}.log".format(self.config.server.log_dir, "aimedb"),
                when="d",
                backupCount=10,
            )
            fileHandler.setFormatter(log_fmt)

            consoleHandler = logging.StreamHandler()
            consoleHandler.setFormatter(log_fmt)

            self.logger.addHandler(fileHandler)
            self.logger.addHandler(consoleHandler)

            self.logger.setLevel(self.config.aimedb.loglevel)
            coloredlogs.install(
                level=core_cfg.aimedb.loglevel, logger=self.logger, fmt=log_fmt_str
            )
            self.logger.initted = True

        if not core_cfg.aimedb.key:
            self.logger.error("!!!KEY NOT SET!!!")
            # SystemExit instead of the site-provided exit(), which is not
            # guaranteed to exist (python -S, frozen builds).
            raise SystemExit(1)

        self.register_handler(0x01, 0x03, self.handle_felica_lookup, 'felica_lookup')
        self.register_handler(0x02, 0x03, self.handle_felica_register, 'felica_register')
        self.register_handler(0x04, 0x06, self.handle_lookup, 'lookup')
        self.register_handler(0x05, 0x06, self.handle_register, 'register')
        self.register_handler(0x07, 0x08, self.handle_status_log, 'status_log')
        self.register_handler(0x09, 0x0A, self.handle_log, 'aime_log')
        self.register_handler(0x0B, 0x0C, self.handle_campaign, 'campaign')
        self.register_handler(0x0D, 0x0E, self.handle_campaign_clear, 'campaign_clear')
        self.register_handler(0x0F, 0x10, self.handle_lookup_ex, 'lookup_ex')
        self.register_handler(0x11, 0x12, self.handle_felica_lookup_ex, 'felica_lookup_ex')
        self.register_handler(0x13, 0x14, self.handle_log_ex, 'aime_log_ex')
        self.register_handler(0x64, 0x65, self.handle_hello, 'hello')

    def register_handler(self, cmd: int, resp:int, handler: Callable[[bytes, int], Union[ADBBaseResponse, bytes]], name: str) -> None:
        """
        Register ``handler`` for request command ``cmd``.

        Args:
            cmd: Request command id as found in the ADB header.
            resp: Response code handed to the handler as its second argument.
            handler: Coroutine function ``(data, resp_code) -> response``.
            name: Name used in log messages.
        """
        self.request_list[cmd] = (handler, resp, name)

    def start(self) -> None:
        """
        Schedule the TCP server on the running event loop.

        Listens on ``aimedb.listen_address`` when set, otherwise on
        ``server.listen_address``. Must be called while an event loop is
        running because it uses ``asyncio.create_task``.
        """
        self.logger.info(f"Start on port {self.config.aimedb.port}")
        addr = self.config.aimedb.listen_address or self.config.server.listen_address
        # Keep a reference: the event loop only holds weak references to
        # tasks, so an unreferenced task may be garbage-collected.
        self._server_task = asyncio.create_task(
            asyncio.start_server(self.dataReceived, addr, self.config.aimedb.port)
        )

    @staticmethod
    def _peer_address(writer: asyncio.StreamWriter) -> str:
        """Return the peer's host for logging, or ``"unknown"`` if unavailable."""
        peer = writer.get_extra_info('peername')
        return peer[0] if peer else "unknown"

    async def dataReceived(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
        """
        Per-connection loop: read packets and process them until the peer leaves.

        The writer is always closed on exit so the transport is not leaked,
        whether the peer closed the connection, the connection was reset, a
        goodbye was processed, or a handler raised.
        """
        self.logger.debug(f"Connection made from {self._peer_address(writer)}")
        try:
            while True:
                data: bytes = await reader.read(4096)
                if not data:
                    self.logger.debug("Connection closed")
                    return

                await self.process_data(data, reader, writer)
                # process_data closes the writer on a goodbye; nothing left to flush.
                if writer.is_closing():
                    return
                await writer.drain()

        except ConnectionResetError:
            self.logger.debug("Connection reset, disconnecting")

        finally:
            writer.close()

    async def process_data(self, data: bytes, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> Optional[bytes]:
        """
        Decrypt one packet, dispatch it to its handler and write the encrypted reply.

        Args:
            data: Raw (encrypted) bytes read from the socket.
            reader: Stream reader of the connection (unused here).
            writer: Stream writer the encrypted response is written to.

        Returns:
            Always ``None``; the response is written to ``writer``.

        Raises:
            TypeError: If a handler returns something other than an
                ``ADBBaseResponse``, ``bytes`` or ``None``.
        """
        addr = self._peer_address(writer)
        cipher = AES.new(self.config.aimedb.key.encode(), AES.MODE_ECB)

        try:
            decrypted = cipher.decrypt(data)

        except Exception as e:
            self.logger.error(f"Failed to decrypt {data.hex()} because {e}")
            return

        self.logger.debug(f"{addr} wrote {decrypted.hex()}")

        try:
            head = ADBHeader.from_data(decrypted)

        except ADBHeaderException as e:
            self.logger.error(f"Error parsing ADB header: {e}")
            # Answer with a default response so the client is not left waiting.
            try:
                encrypted = cipher.encrypt(ADBBaseResponse().make())
                writer.write(encrypted)
                await writer.drain()

            except Exception as e:
                self.logger.error(f"Failed to encrypt default response because {e}")

            return

        if head.keychip_id == "ABCD1234567" or head.store_id == 0xfff0:
            self.logger.warning(f"Request from uninitialized AMLib: {vars(head)}")

        # 0x66 is the goodbye command: no reply, just drop the connection.
        if head.cmd == 0x66:
            self.logger.info("Goodbye")
            writer.close()
            return

        handler, resp_code, name = self.request_list.get(head.cmd, (self.handle_default, None, 'default'))

        if resp_code is None:
            # NOTE(review): handle_default is then called with resp_code=None;
            # confirm ADBBaseResponse tolerates that.
            self.logger.warning(f"No handler for cmd {hex(head.cmd)}")

        elif resp_code > 0:
            self.logger.info(f"{name} from {head.keychip_id} ({head.game_id}) @ {addr}")

        resp = await handler(decrypted, resp_code)

        if isinstance(resp, ADBBaseResponse):
            resp_bytes = resp.make()

        elif isinstance(resp, bytes):
            resp_bytes = resp

        elif resp is None: # Nothing to send, probably a goodbye
            self.logger.warning(f"None return by handler for {name}")
            return

        else:
            self.logger.error(f"Unsupported type returned by ADB handler for {name}: {type(resp)}")
            raise TypeError(f"Unsupported type returned by ADB handler for {name}: {type(resp)}")

        try:
            encrypted = cipher.encrypt(resp_bytes)
            self.logger.debug(f"Response {resp_bytes.hex()}")
            writer.write(encrypted)

        except Exception as e:
            self.logger.error(f"Failed to encrypt {resp_bytes.hex()} because {e}")

    @staticmethod
    def _apply_card_status(resp, is_banned, is_locked) -> None:
        """Set ``resp.head.status`` from the card's ban/lock flags; untouched if neither is set."""
        if is_banned and is_locked:
            resp.head.status = ADBStatus.BAN_SYS_USER
        elif is_banned:
            resp.head.status = ADBStatus.BAN_SYS
        elif is_locked:
            resp.head.status = ADBStatus.LOCK_USER

    def _make_auth_key(self, user_id: int, head) -> Optional[bytes]:
        """
        Build the NUL-padded auth key for ``user_id`` from the request header.

        Returns ``None`` when ``create_sega_auth_key`` yields no key.
        """
        auth_key = create_sega_auth_key(user_id, head.game_id, head.store_id, head.keychip_id, self.config.aimedb.id_secret, self.config.aimedb.id_lifetime_seconds)
        if auth_key is None:
            return None

        self.logger.debug(f"Generated auth token {auth_key}")
        # Pad on the encoded bytes so the field length is right even if the
        # key ever contains non-ASCII characters.
        return auth_key.encode().ljust(self._AUTH_KEY_LEN, b"\0")

    async def handle_default(self, data: bytes, resp_code: int, length: int = 0x20) -> ADBBaseResponse:
        """Return a bare response with status 1 that echoes the request header fields."""
        req = ADBHeader.from_data(data)
        return ADBBaseResponse(resp_code, length, 1, req.game_id, req.store_id, req.keychip_id, req.protocol_ver)

    async def handle_hello(self, data: bytes, resp_code: int) -> ADBBaseResponse:
        """Answer a hello with the default response."""
        return await self.handle_default(data, resp_code)

    async def handle_campaign(self, data: bytes, resp_code: int) -> ADBBaseResponse:
        """
        Answer a campaign request with a response built from the request header.

        Protocol versions below 0x3030 use the legacy request/response layout.
        """
        h = ADBHeader.from_data(data)
        if h.protocol_ver >= 0x3030:
            req = h
            resp = ADBCampaignResponse.from_req(req)

        else:
            req = ADBOldCampaignRequest(data)

            self.logger.info(f"Legacy campaign request for campaign {req.campaign_id} (protocol version {hex(h.protocol_ver)})")
            resp = ADBOldCampaignResponse.from_req(req.head)

        # We don't currently support campaigns
        return resp

    async def handle_lookup(self, data: bytes, resp_code: int) -> ADBBaseResponse:
        """
        Resolve an access code to a user id and report the card's ban/lock status.

        For known users the card's last-login is updated, and for access codes
        starting with "010" or "3" the reported serial number is stored as chip id.
        """
        req = ADBLookupRequest(data)
        user_id = await self.data.card.get_user_id_from_card(req.access_code)
        is_banned = await self.data.card.get_card_banned(req.access_code)
        is_locked = await self.data.card.get_card_locked(req.access_code)

        ret = ADBLookupResponse.from_req(req.head, user_id)
        self._apply_card_status(ret, is_banned, is_locked)

        self.logger.info(
            f"access_code {req.access_code} -> user_id {ret.user_id}"
        )

        if user_id and user_id > 0:
            await self.data.card.update_card_last_login(req.access_code)
            if req.access_code.startswith(("010", "3")):
                await self.data.card.set_chip_id_by_access_code(req.access_code, req.serial_number)
                self.logger.info(f"Attempt to set chip id to {req.serial_number} for access code {req.access_code}")
        return ret

    async def handle_lookup_ex(self, data: bytes, resp_code: int) -> ADBBaseResponse:
        """
        Extended lookup: like :meth:`handle_lookup`, plus an auth key.

        The auth key is only generated for known users and only when
        ``aimedb.id_secret`` is configured.
        """
        req = ADBLookupRequest(data)
        user_id = await self.data.card.get_user_id_from_card(req.access_code)

        is_banned = await self.data.card.get_card_banned(req.access_code)
        is_locked = await self.data.card.get_card_locked(req.access_code)

        ret = ADBLookupExResponse.from_req(req.head, user_id)
        self._apply_card_status(ret, is_banned, is_locked)

        self.logger.info(
            f"access_code {req.access_code} -> user_id {ret.user_id}"
        )

        known_user = bool(user_id and user_id > 0)

        if known_user and self.config.aimedb.id_secret:
            auth_key_full = self._make_auth_key(user_id, req.head)
            if auth_key_full is not None:
                ret.auth_key = auth_key_full

        if known_user:
            await self.data.card.update_card_last_login(req.access_code)
        return ret

    async def handle_felica_lookup(self, data: bytes, resp_code: int) -> ADBBaseResponse:
        """
        On official, the IDm is used as a key to look up the stored access code in a large
        database. We do not have access to that database so we have to make do with what we got.
        Interestingly, namco games are able to read S_PAD0 and send the server the correct access
        code, but aimedb doesn't. Until somebody either enters the correct code manually, or scans
        on a game that reads it correctly from the card, this will have to do. It's the same conversion
        used on the big boy networks.
        """
        req = ADBFelicaLookupRequest(data)
        card = await self.data.card.get_card_by_idm(req.idm)
        if not card:
            # Unknown IDm: derive the access code, and remember the IDm if a
            # card with that derived access code already exists.
            ac = self.data.card.to_access_code(req.idm)
            test = await self.data.card.get_card_by_access_code(ac)
            if test:
                await self.data.card.set_idm_by_access_code(ac, req.idm)

        else:
            ac = card['access_code']

        self.logger.info(
            f"idm {req.idm} ipm {req.pmm} -> access_code {ac}"
        )
        return ADBFelicaLookupResponse.from_req(req.head, ac)

    async def handle_felica_register(self, data: bytes, resp_code: int) -> ADBBaseResponse:
        """
        Used to register felica mobile access codes. Will never be used on our network
        because we don't implement felica_lookup properly.

        Creates a user and a card for the access code derived from the IDm
        when ``server.allow_user_registration`` is enabled.
        """
        req = ADBFelicaLookupRequest(data)
        ac = self.data.card.to_access_code(req.idm)

        # Default to "no user" so the check below is safe when registration
        # is disabled (previously user_id was unbound on that path).
        user_id = -1

        if self.config.server.allow_user_registration:
            user_id = await self.data.user.create_user()

            if user_id is None:
                self.logger.error("Failed to register user!")
                user_id = -1

            else:
                card_id = await self.data.card.create_card(user_id, ac)

                if card_id is None:
                    self.logger.error("Failed to register card!")
                    user_id = -1

            self.logger.info(
                f"Register access code {ac} (IDm: {req.idm} PMm: {req.pmm}) -> user_id {user_id}"
            )

        else:
            self.logger.info(
                f"Registration blocked!: access code {ac} (IDm: {req.idm} PMm: {req.pmm})"
            )

        if user_id > 0:
            await self.data.card.update_card_last_login(ac)
        return ADBFelicaLookupResponse.from_req(req.head, ac)

    async def handle_felica_lookup_ex(self, data: bytes, resp_code: int) -> ADBBaseResponse:
        """
        Extended FeliCa lookup: resolve an IDm to access code and user id.

        Falls back to the access code derived from the IDm when no card is
        stored for the IDm. For known users the ban/lock status is reported,
        an auth key is attached (when ``aimedb.id_secret`` is configured) and
        the card's last-login is updated.
        """
        req = ADBFelicaLookup2Request(data)
        user_id = None
        card = await self.data.card.get_card_by_idm(req.idm)
        if not card:
            access_code = self.data.card.to_access_code(req.idm)
            card = await self.data.card.get_card_by_access_code(access_code)
            if card:
                user_id = card['user']
                await self.data.card.set_idm_by_access_code(access_code, req.idm)

        else:
            user_id = card['user']
            access_code = card['access_code']

        if user_id is None:
            user_id = -1

        self.logger.info(
            f"idm {req.idm} ipm {req.pmm} -> access_code {access_code} user_id {user_id}"
        )

        resp = ADBFelicaLookup2Response.from_req(req.head, user_id, access_code)

        # user_id > 0 implies a card row was found above.
        if user_id > 0:
            self._apply_card_status(resp, card['is_banned'], card['is_locked'])

            if self.config.aimedb.id_secret:
                auth_key_full = self._make_auth_key(user_id, req.head)
                if auth_key_full is not None:
                    resp.auth_key = auth_key_full

            await self.data.card.update_card_last_login(access_code)
        return resp

    async def handle_campaign_clear(self, data: bytes, resp_code: int) -> ADBBaseResponse:
        """Answer a campaign-clear request with a response built from its header."""
        req = ADBCampaignClearRequest(data)

        resp = ADBCampaignClearResponse.from_req(req.head)

        # We don't support campaign stuff
        return resp

    async def handle_register(self, data: bytes, resp_code: int) -> ADBBaseResponse:
        """
        Register a new user and card for the access code in the request.

        Registration only happens when ``server.allow_user_registration`` is
        enabled. On failure or when blocked, the response carries a
        non-positive user id and status ``BAN_SYS``.
        """
        req = ADBLookupRequest(data)
        user_id = -1

        if self.config.server.allow_user_registration:
            user_id = await self.data.user.create_user()

            if user_id is None:
                self.logger.error("Failed to register user!")
                user_id = -1

            else:
                card_id = await self.data.card.create_card(user_id, req.access_code)

                if card_id is None:
                    self.logger.error("Failed to register card!")
                    user_id = -1

            self.logger.info(
                f"Register access code {req.access_code} -> user_id {user_id}"
            )

        else:
            self.logger.info(
                f"Registration blocked!: access code {req.access_code}"
            )

        if user_id > 0:
            if req.access_code.startswith(("010", "3")):
                await self.data.card.set_chip_id_by_access_code(req.access_code, req.serial_number)
                self.logger.info(f"Attempt to set chip id to {req.serial_number} for access code {req.access_code}")

            elif req.access_code.startswith("0008"):
                idm = self.data.card.to_idm(req.access_code)
                await self.data.card.set_idm_by_access_code(req.access_code, idm)
                self.logger.info(f"Attempt to set IDm to {idm} for access code {req.access_code}")

        resp = ADBLookupResponse.from_req(req.head, user_id)
        if resp.user_id <= 0:
            resp.head.status = ADBStatus.BAN_SYS # Closest we can get to a "You cannot register"

        else:
            await self.data.card.update_card_last_login(req.access_code)

        return resp

    # TODO: Save these in some capacity, as deemed relevant
    async def handle_status_log(self, data: bytes, resp_code: int) -> ADBBaseResponse:
        """Log the reported status event and acknowledge it with a bare response."""
        req = ADBStatusLogRequest(data)
        self.logger.info(f"User {req.aime_id} logged {req.status.name} event")
        return ADBBaseResponse(resp_code, 0x20, 1, req.head.game_id, req.head.store_id, req.head.keychip_id, req.head.protocol_ver)

    async def handle_log(self, data: bytes, resp_code: int) -> ADBBaseResponse:
        """Log the reported event with its credit/bet/won counters and acknowledge it."""
        req = ADBLogRequest(data)
        self.logger.info(f"User {req.aime_id} logged {req.status.name} event, credit_ct: {req.credit_ct} bet_ct: {req.bet_ct} won_ct: {req.won_ct}")
        return ADBBaseResponse(resp_code, 0x20, 1, req.head.game_id, req.head.store_id, req.head.keychip_id, req.head.protocol_ver)

    async def handle_log_ex(self, data: bytes, resp_code: int) -> ADBBaseResponse:
        """Log every entry of a batched log request and acknowledge it."""
        req = ADBLogExRequest(data)
        self.logger.info(f"Recieved {req.num_logs} or {len(req.logs)} logs")

        # Never index past the entries actually parsed, even if the header
        # claims more of them.
        for x in range(min(req.num_logs, len(req.logs))):
            entry = req.logs[x]
            self.logger.debug(f"User {entry.aime_id} logged {entry.status.name} event, credit_ct: {entry.credit_ct} bet_ct: {entry.bet_ct} won_ct: {entry.won_ct}")

        return ADBLogExResponse.from_req(req.head)