forked from Dniel97/artemis
402 lines
15 KiB
Python
402 lines
15 KiB
Python
from typing import Dict, Any, Optional
|
|
import logging, coloredlogs
|
|
from logging.handlers import TimedRotatingFileHandler
|
|
from starlette.requests import Request
|
|
from starlette.responses import PlainTextResponse
|
|
from datetime import datetime
|
|
from Crypto.Cipher import Blowfish
|
|
import pytz
|
|
|
|
from .config import CoreConfig
|
|
from .utils import Utils
|
|
from .title import TitleServlet
|
|
from .data import Data
|
|
from .const import *
|
|
|
|
class MuchaServlet:
|
|
mucha_registry: Dict[str, Dict[str, str]] = {}
|
|
def __init__(self, cfg: CoreConfig, cfg_dir: str) -> None:
|
|
self.config = cfg
|
|
self.config_dir = cfg_dir
|
|
|
|
self.logger = logging.getLogger("mucha")
|
|
log_fmt_str = "[%(asctime)s] Mucha | %(levelname)s | %(message)s"
|
|
log_fmt = logging.Formatter(log_fmt_str)
|
|
|
|
fileHandler = TimedRotatingFileHandler(
|
|
"{0}/{1}.log".format(self.config.server.log_dir, "mucha"),
|
|
when="d",
|
|
backupCount=10,
|
|
)
|
|
fileHandler.setFormatter(log_fmt)
|
|
|
|
consoleHandler = logging.StreamHandler()
|
|
consoleHandler.setFormatter(log_fmt)
|
|
|
|
self.logger.addHandler(fileHandler)
|
|
self.logger.addHandler(consoleHandler)
|
|
|
|
self.logger.setLevel(cfg.mucha.loglevel)
|
|
coloredlogs.install(level=cfg.mucha.loglevel, logger=self.logger, fmt=log_fmt_str)
|
|
|
|
self.data = Data(cfg)
|
|
|
|
for _, mod in TitleServlet.title_registry.items():
|
|
enabled, game_cds, netids = mod.get_mucha_info(self.config, self.config_dir)
|
|
if enabled:
|
|
for x in range(len(game_cds)):
|
|
self.mucha_registry[game_cds[x]] = { "netid_prefix": netids[x] }
|
|
|
|
self.logger.info(f"Serving {len(self.mucha_registry)} games")
|
|
|
|
async def handle_boardauth(self, request: Request) -> bytes:
|
|
bod = await request.body()
|
|
req_dict = self.mucha_preprocess(bod)
|
|
client_ip = Utils.get_ip_addr(request)
|
|
|
|
if req_dict is None:
|
|
self.logger.error(
|
|
f"Error processing mucha request {bod}"
|
|
)
|
|
return PlainTextResponse("RESULTS=000")
|
|
|
|
req = MuchaAuthRequest(req_dict)
|
|
self.logger.debug(f"Mucha request {vars(req)}")
|
|
|
|
if not req.gameCd or not req.gameVer or not req.sendDate or not req.countryCd or not req.serialNum:
|
|
self.logger.warn(f"Missing required fields - {vars(req)}")
|
|
return PlainTextResponse("RESULTS=000")
|
|
|
|
minfo = self.mucha_registry.get(req.gameCd, {})
|
|
|
|
if not minfo:
|
|
self.logger.warning(f"Unknown gameCd {req.gameCd} from {client_ip}")
|
|
return PlainTextResponse("RESULTS=000")
|
|
|
|
b_key = b""
|
|
for x in range(8):
|
|
b_key += req.sendDate[(x - 1) & 7].encode()
|
|
|
|
b_iv = b_key # what the fuck namco
|
|
|
|
cipher = Blowfish.new(b_key, Blowfish.MODE_CBC, b_iv)
|
|
try:
|
|
sn_decrypt = cipher.decrypt(bytes.fromhex(req.serialNum))[:12].decode()
|
|
except Exception as e:
|
|
self.logger.error(f"Decrypt SN {req.serialNum} failed! - {e}")
|
|
return PlainTextResponse("RESULTS=000")
|
|
|
|
self.logger.info(f"Boardauth request from {sn_decrypt} ({client_ip}) for {req.gameVer}")
|
|
|
|
resp = MuchaAuthResponse(
|
|
f"{self.config.server.hostname}{':' + str(self.config.server.port) if not self.config.server.is_using_proxy else ''}"
|
|
)
|
|
|
|
netid = minfo.get('netid_prefix', "ABxN") + sn_decrypt[5:]
|
|
|
|
cab = await self.data.arcade.get_machine(netid)
|
|
if cab:
|
|
arcade = await self.data.arcade.get_arcade(cab['id'])
|
|
if not arcade:
|
|
self.logger.error(f"Failed to get arcade with id {cab['id']}")
|
|
return PlainTextResponse("RESULTS=000")
|
|
|
|
resp.AREA_0 = arcade["region_id"] or AllnetJapanRegionId.AICHI.name
|
|
resp.AREA_0_EN = arcade["region_id"] or AllnetJapanRegionId.AICHI.name
|
|
resp.AREA_FULL_0 = arcade["region_id"] or AllnetJapanRegionId.AICHI.name
|
|
resp.AREA_FULL_0_EN = arcade["region_id"] or AllnetJapanRegionId.AICHI.name
|
|
|
|
resp.AREA_1 = arcade["country"] or cab['country'] or AllnetCountryCode.JAPAN.value
|
|
resp.AREA_1_EN = arcade["country"] or cab['country'] or AllnetCountryCode.JAPAN.value
|
|
resp.AREA_FULL_1 = arcade["country"] or cab['country'] or AllnetCountryCode.JAPAN.value
|
|
resp.AREA_FULL_1_EN = arcade["country"] or cab['country'] or AllnetCountryCode.JAPAN.value
|
|
|
|
resp.AREA_2 = arcade["city"] if arcade["city"] else ""
|
|
resp.AREA_2_EN = arcade["city"] if arcade["city"] else ""
|
|
resp.AREA_FULL_2 = arcade["city"] if arcade["city"] else ""
|
|
resp.AREA_FULL_2_EN = arcade["city"] if arcade["city"] else ""
|
|
|
|
resp.AREA_3 = ""
|
|
resp.AREA_3_EN = ""
|
|
resp.AREA_FULL_3 = ""
|
|
resp.AREA_FULL_3_EN = ""
|
|
|
|
resp.PREFECTURE_ID = arcade['region_id']
|
|
resp.COUNTRY_CD = arcade['country'] or cab['country'] or AllnetCountryCode.JAPAN.value
|
|
resp.PLACE_ID = req.placeId if req.placeId else f"{arcade['country'] or cab['country'] or AllnetCountryCode.JAPAN.value}{arcade['id']:04X}"
|
|
resp.SHOP_NAME = arcade['name']
|
|
resp.SHOP_NAME_EN = arcade['name']
|
|
resp.SHOP_NICKNAME = arcade['nickname']
|
|
resp.SHOP_NICKNAME_EN = arcade['nickname']
|
|
|
|
elif self.config.server.allow_unregistered_serials:
|
|
self.logger.info(f"Allow unknown serial {netid} ({sn_decrypt}) to auth")
|
|
|
|
else:
|
|
self.logger.warn(f'Auth failed for NetID {netid}')
|
|
return PlainTextResponse("RESULTS=000")
|
|
|
|
self.logger.debug(f"Mucha response {vars(resp)}")
|
|
|
|
return PlainTextResponse(self.mucha_postprocess(vars(resp)))
|
|
|
|
async def handle_updatecheck(self, request: Request) -> bytes:
|
|
bod = await request.body()
|
|
req_dict = self.mucha_preprocess(bod)
|
|
client_ip = Utils.get_ip_addr(request)
|
|
|
|
if req_dict is None:
|
|
self.logger.error(
|
|
f"Error processing mucha request {bod}"
|
|
)
|
|
return PlainTextResponse("RESULTS=000")
|
|
|
|
req = MuchaUpdateRequest(req_dict)
|
|
self.logger.info(f"Updatecheck request from {req.serialNum} ({client_ip}) for {req.gameVer}")
|
|
self.logger.debug(f"Mucha request {vars(req)}")
|
|
|
|
if req.gameCd not in self.mucha_registry:
|
|
self.logger.warning(f"Unknown gameCd {req.gameCd}")
|
|
return PlainTextResponse("RESULTS=000")
|
|
|
|
resp = MuchaUpdateResponse(req.gameVer, f"{self.config.server.hostname}{':' + str(self.config.server.port) if not self.config.server.is_using_proxy else ''}")
|
|
|
|
self.logger.debug(f"Mucha response {vars(resp)}")
|
|
|
|
return PlainTextResponse(self.mucha_postprocess(vars(resp)))
|
|
|
|
async def handle_dlstate(self, request: Request) -> bytes:
|
|
bod = await request.body()
|
|
req_dict = self.mucha_preprocess(bod)
|
|
client_ip = Utils.get_ip_addr(request)
|
|
|
|
if req_dict is None:
|
|
self.logger.error(
|
|
f"Error processing mucha request {bod}"
|
|
)
|
|
return PlainTextResponse("RESULTS=000")
|
|
|
|
req = MuchaDownloadStateRequest(req_dict)
|
|
self.logger.info(f"DownloadState request from {req.serialNum} ({client_ip}) for {req.gameCd} -> {req.updateVer}")
|
|
self.logger.debug(f"request {vars(req)}")
|
|
return PlainTextResponse("RESULTS=001")
|
|
|
|
def mucha_preprocess(self, data: bytes) -> Optional[Dict]:
|
|
try:
|
|
ret: Dict[str, Any] = {}
|
|
|
|
for x in data.decode().split("&"):
|
|
kvp = x.split("=")
|
|
if len(kvp) == 2:
|
|
ret[kvp[0]] = kvp[1]
|
|
|
|
return ret
|
|
|
|
except Exception:
|
|
self.logger.error(f"Error processing mucha request {data}")
|
|
return None
|
|
|
|
def mucha_postprocess(self, data: dict) -> Optional[bytes]:
|
|
try:
|
|
urlencode = "&".join(f"{k}={v}" for k, v in data.items())
|
|
|
|
return urlencode.encode()
|
|
|
|
except Exception:
|
|
self.logger.error("Error processing mucha response")
|
|
return None
|
|
|
|
|
|
class MuchaAuthRequest:
|
|
def __init__(self, request: Dict) -> None:
|
|
# gameCd + boardType + countryCd + version
|
|
self.gameVer = request.get("gameVer", "")
|
|
self.sendDate = request.get("sendDate", "") # %Y%m%d
|
|
self.serialNum = request.get("serialNum", "")
|
|
self.gameCd = request.get("gameCd", "")
|
|
self.boardType = request.get("boardType", "")
|
|
self.boardId = request.get("boardId", "")
|
|
self.mac = request.get("mac", "")
|
|
self.placeId = request.get("placeId", "")
|
|
self.storeRouterIp = request.get("storeRouterIp", "")
|
|
self.countryCd = request.get("countryCd", "")
|
|
self.useToken = request.get("useToken", "")
|
|
self.allToken = request.get("allToken", "")
|
|
|
|
|
|
class MuchaAuthResponse:
|
|
def __init__(self, mucha_url: str) -> None:
|
|
self.RESULTS = "001"
|
|
self.AUTH_INTERVAL = "86400"
|
|
self.SERVER_TIME = datetime.strftime(datetime.now(), "%Y%m%d%H%M")
|
|
self.UTC_SERVER_TIME = datetime.strftime(datetime.now(pytz.UTC), "%Y%m%d%H%M")
|
|
|
|
self.CHARGE_URL = f"https://{mucha_url}/charge/"
|
|
self.FILE_URL = f"https://{mucha_url}/file/"
|
|
self.URL_1 = f"https://{mucha_url}/url1/"
|
|
self.URL_2 = f"https://{mucha_url}/url2/"
|
|
self.URL_3 = f"https://{mucha_url}/url3/"
|
|
|
|
self.PLACE_ID = "JPN123"
|
|
self.COUNTRY_CD = "JPN"
|
|
self.SHOP_NAME = "TestShop!"
|
|
self.SHOP_NICKNAME = "TestShop"
|
|
self.AREA_0 = "008"
|
|
self.AREA_1 = "009"
|
|
self.AREA_2 = "010"
|
|
self.AREA_3 = "011"
|
|
self.AREA_FULL_0 = ""
|
|
self.AREA_FULL_1 = ""
|
|
self.AREA_FULL_2 = ""
|
|
self.AREA_FULL_3 = ""
|
|
|
|
self.SHOP_NAME_EN = "TestShop!"
|
|
self.SHOP_NICKNAME_EN = "TestShop"
|
|
self.AREA_0_EN = "008"
|
|
self.AREA_1_EN = "009"
|
|
self.AREA_2_EN = "010"
|
|
self.AREA_3_EN = "011"
|
|
self.AREA_FULL_0_EN = ""
|
|
self.AREA_FULL_1_EN = ""
|
|
self.AREA_FULL_2_EN = ""
|
|
self.AREA_FULL_3_EN = ""
|
|
|
|
self.PREFECTURE_ID = "1"
|
|
self.EXPIRATION_DATE = "null"
|
|
self.USE_TOKEN = "0"
|
|
self.CONSUME_TOKEN = "0"
|
|
self.DONGLE_FLG = "1"
|
|
self.FORCE_BOOT = "0"
|
|
|
|
|
|
class MuchaUpdateRequest:
|
|
def __init__(self, request: Dict) -> None:
|
|
self.gameVer = request.get("gameVer", "")
|
|
self.gameCd = request.get("gameCd", "")
|
|
self.serialNum = request.get("serialNum", "")
|
|
self.countryCd = request.get("countryCd", "")
|
|
self.placeId = request.get("placeId", "")
|
|
self.storeRouterIp = request.get("storeRouterIp", "")
|
|
|
|
|
|
class MuchaUpdateResponse:
|
|
def __init__(self, game_ver: str, mucha_url: str) -> None:
|
|
self.RESULTS = "001"
|
|
self.EXE_VER = game_ver
|
|
|
|
self.UPDATE_VER_1 = game_ver
|
|
self.UPDATE_URL_1 = f"http://{mucha_url}/updUrl1/"
|
|
self.UPDATE_SIZE_1 = "20"
|
|
|
|
self.CHECK_CRC_1 = "0000000000000000"
|
|
self.CHECK_URL_1 = f"http://{mucha_url}/checkUrl/"
|
|
self.CHECK_SIZE_1 = "20"
|
|
|
|
self.INFO_SIZE_1 = "0"
|
|
self.COM_SIZE_1 = "0"
|
|
self.COM_TIME_1 = "0"
|
|
self.LAN_INFO_SIZE_1 = "0"
|
|
|
|
self.USER_ID = ""
|
|
self.PASSWORD = ""
|
|
|
|
"""
|
|
RESULTS
|
|
EXE_VER
|
|
|
|
UPDATE_VER_%d
|
|
UPDATE_URL_%d
|
|
UPDATE_SIZE_%d
|
|
|
|
CHECK_CRC_%d
|
|
CHECK_URL_%d
|
|
CHECK_SIZE_%d
|
|
|
|
INFO_SIZE_1
|
|
COM_SIZE_1
|
|
COM_TIME_1
|
|
LAN_INFO_SIZE_1
|
|
|
|
USER_ID
|
|
PASSWORD
|
|
"""
|
|
class MuchaUpdateResponseStub:
|
|
def __init__(self, game_ver: str) -> None:
|
|
self.RESULTS = "001"
|
|
self.UPDATE_VER_1 = game_ver
|
|
|
|
class MuchaDownloadStateRequest:
|
|
def __init__(self, request: Dict) -> None:
|
|
self.gameCd = request.get("gameCd", "")
|
|
self.updateVer = request.get("updateVer", "")
|
|
self.serialNum = request.get("serialNum", "")
|
|
self.fileSize = request.get("fileSize", "")
|
|
self.compFileSize = request.get("compFileSize", "")
|
|
self.boardId = request.get("boardId", "")
|
|
self.placeId = request.get("placeId", "")
|
|
self.storeRouterIp = request.get("storeRouterIp", "")
|
|
|
|
class MuchaDownloadErrorRequest:
|
|
def __init__(self, request: Dict) -> None:
|
|
self.gameCd = request.get("gameCd", "")
|
|
self.updateVer = request.get("updateVer", "")
|
|
self.serialNum = request.get("serialNum", "")
|
|
self.downloadUrl = request.get("downloadUrl", "")
|
|
self.errCd = request.get("errCd", "")
|
|
self.errMessage = request.get("errMessage", "")
|
|
self.boardId = request.get("boardId", "")
|
|
self.placeId = request.get("placeId", "")
|
|
self.storeRouterIp = request.get("storeRouterIp", "")
|
|
|
|
class MuchaRegiAuthRequest:
|
|
def __init__(self, request: Dict) -> None:
|
|
self.gameCd = request.get("gameCd", "")
|
|
self.serialNum = request.get("serialNum", "") # Encrypted
|
|
self.countryCd = request.get("countryCd", "")
|
|
self.registrationCd = request.get("registrationCd", "")
|
|
self.sendDate = request.get("sendDate", "")
|
|
self.useToken = request.get("useToken", "")
|
|
self.allToken = request.get("allToken", "")
|
|
self.placeId = request.get("placeId", "")
|
|
self.storeRouterIp = request.get("storeRouterIp", "")
|
|
|
|
class MuchaRegiAuthResponse:
|
|
def __init__(self) -> None:
|
|
self.RESULTS = "001" # 001 = success, 099, 098, 097 = fail, others = fail
|
|
self.ALL_TOKEN = "0" # Encrypted
|
|
self.ADD_TOKEN = "0" # Encrypted
|
|
|
|
class MuchaTokenStateRequest:
|
|
def __init__(self, request: Dict) -> None:
|
|
self.gameCd = request.get("gameCd", "")
|
|
self.serialNum = request.get("serialNum", "")
|
|
self.countryCd = request.get("countryCd", "")
|
|
self.useToken = request.get("useToken", "")
|
|
self.allToken = request.get("allToken", "")
|
|
self.placeId = request.get("placeId", "")
|
|
self.storeRouterIp = request.get("storeRouterIp", "")
|
|
|
|
class MuchaTokenStateResponse:
|
|
def __init__(self) -> None:
|
|
self.RESULTS = "001"
|
|
|
|
class MuchaTokenMarginStateRequest:
|
|
def __init__(self, request: Dict) -> None:
|
|
self.gameCd = request.get("gameCd", "")
|
|
self.serialNum = request.get("serialNum", "")
|
|
self.countryCd = request.get("countryCd", "")
|
|
self.placeId = request.get("placeId", "")
|
|
self.limitLowerToken = request.get("limitLowerToken", 0)
|
|
self.limitUpperToken = request.get("limitUpperToken", 0)
|
|
self.settlementMonth = request.get("settlementMonth", 0)
|
|
|
|
class MuchaTokenMarginStateResponse:
|
|
def __init__(self) -> None:
|
|
self.RESULTS = "001"
|
|
self.LIMIT_LOWER_TOKEN = 0
|
|
self.LIMIT_UPPER_TOKEN = 0
|
|
self.LAST_SETTLEMENT_MONTH = 0
|
|
self.LAST_LIMIT_LOWER_TOKEN = 0
|
|
self.LAST_LIMIT_UPPER_TOKEN = 0
|
|
self.SETTLEMENT_MONTH = 0
|