feat(core): Add support for Maimai DX CN

- Add ChimeDB configuration and related processing logic
- Implement network initialization and file download functionality for Maimai DX CN
- Update game version constants and configurations to support the CN version
- Optimize data encryption and decryption processes to meet CN requirements
This commit is contained in:
2025-01-08 18:57:22 +08:00
parent 0cf41ff389
commit 4e3af1b80f
9 changed files with 639 additions and 65 deletions

View File

@ -1,3 +1,5 @@
import string
import pytz import pytz
import base64 import base64
import zlib import zlib
@ -21,6 +23,7 @@ from Crypto.Signature import PKCS1_v1_5
from os import path, environ, mkdir, access, W_OK from os import path, environ, mkdir, access, W_OK
from .config import CoreConfig from .config import CoreConfig
from .crypto import CipherAES
from .utils import Utils from .utils import Utils
from .data import Data from .data import Data
from .const import * from .const import *
@ -311,6 +314,186 @@ class AllnetServlet:
return PlainTextResponse(resp_str) return PlainTextResponse(resp_str)
async def handle_cn_net_initialize(self, request: Request):
ua = request.headers.get("User-Agent", "")
game_id = ua.split(";")[0]
if game_id not in self.config.allnet.cn_allnet_encrypt_keys:
self.logger.error(f"Unrecognised game {game_id} attempted cn_net_initialize from {request.client.host}")
return PlainTextResponse(
content="",
status_code=404,
)
cryptor = CipherAES(
bytes.fromhex(self.config.allnet.cn_allnet_encrypt_keys[game_id][0]),
bytes.fromhex(self.config.allnet.cn_allnet_encrypt_keys[game_id][1]),
)
request_ip = Utils.get_ip_addr(request)
data = self.from_cn_request(await request.body(), cryptor)
self.logger.info(type(data))
try:
req_urlencode = data
req_dict = self.allnet_req_to_dict(req_urlencode)
self.logger.info(f"Allnet CN request: {req_urlencode}")
if req_dict is None:
raise AllnetRequestException()
req = AllnetCnInitializeRequest(req_dict[0])
# Validate the request. Currently we only validate the fields we plan on using
if not req.game_id or not req.ver or not req.serial:
raise AllnetRequestException(
f"Bad auth request params from {request_ip} - {vars(req)}"
)
except AllnetRequestException as e:
if e.message != "":
self.logger.error(e)
return PlainTextResponse()
resp = AllnetCnInitializeResponse(req.token)
self.logger.debug(f"Allnet request: {vars(req)}")
machine = await self.data.arcade.get_machine(req.serial)
if machine is None and not self.config.server.allow_unregistered_serials:
msg = f"Unrecognised serial {req.serial} attempted allnet auth from {request_ip}."
await self.data.base.log_event(
"allnet", "ALLNET_AUTH_UNKNOWN_SERIAL", logging.WARN, msg
)
self.logger.warning(msg)
resp.stat = ALLNET_STAT.bad_machine.value
resp_dict = {k: v for k, v in vars(resp).items() if v is not None}
self.logger.debug(f"Allnet response: {resp_dict}")
return PlainTextResponse(self.to_cn_response(resp_dict, cryptor))
if machine is not None:
arcade = await self.data.arcade.get_arcade(machine["arcade"])
if self.config.server.check_arcade_ip:
if arcade["ip"] and arcade["ip"] is not None and arcade["ip"] != req.ip:
msg = f"Serial {req.serial} attempted allnet auth from bad IP {req.ip} (expected {arcade['ip']})."
await self.data.base.log_event(
"allnet", "ALLNET_AUTH_BAD_IP", logging.ERROR, msg
)
self.logger.warning(msg)
resp.stat = ALLNET_STAT.bad_shop.value
resp_dict = {k: v for k, v in vars(resp).items() if v is not None}
return PlainTextResponse(urllib.parse.unquote(urllib.parse.urlencode(resp_dict)) + "\n")
elif (not arcade["ip"] or arcade["ip"] is None) and self.config.server.strict_ip_checking:
msg = f"Serial {req.serial} attempted allnet auth from bad IP {req.ip}, but arcade {arcade['id']} has no IP set! (strict checking enabled)."
await self.data.base.log_event(
"allnet", "ALLNET_AUTH_NO_SHOP_IP", logging.ERROR, msg
)
self.logger.warning(msg)
resp.stat = ALLNET_STAT.bad_shop.value
resp_dict = {k: v for k, v in vars(resp).items() if v is not None}
return PlainTextResponse(urllib.parse.unquote(urllib.parse.urlencode(resp_dict)) + "\n")
country = (
arcade["country"] if machine["country"] is None else machine["country"]
)
if country is None:
country = AllnetCountryCode.JAPAN.value
resp.country = country
resp.place_id = f"{arcade['id']:04X}"
resp.allnet_id = machine["id"]
resp.name = arcade["name"] if arcade["name"] is not None else ""
resp.nickname = arcade["nickname"] if arcade["nickname"] is not None else ""
resp.region0 = (
arcade["region_id"]
if arcade["region_id"] is not None
else AllnetJapanRegionId.AICHI.value
)
resp.region_name0 = (
arcade["state"]
if arcade["state"] is not None
else AllnetJapanRegionId.AICHI.name
)
resp.region_name1 = (
arcade["country"]
if arcade["country"] is not None
else AllnetCountryCode.JAPAN.value
)
resp.region_name2 = arcade["city"] if arcade["city"] is not None else ""
resp.client_timezone = ( # lmao
arcade["timezone"] if arcade["timezone"] is not None else "+0900" if req.format_ver == 3 else "+09:00"
)
if req.game_id not in TitleServlet.title_registry:
if not self.config.server.is_develop:
msg = f"Unrecognised game {req.game_id} attempted allnet auth from {request_ip}."
await self.data.base.log_event(
"allnet", "ALLNET_AUTH_UNKNOWN_GAME", logging.WARN, msg
)
self.logger.warning(msg)
resp.stat = ALLNET_STAT.bad_game.value
resp_dict = {k: v for k, v in vars(resp).items() if v is not None}
self.logger.debug(f"Allnet response: {resp_dict}")
return PlainTextResponse(self.to_cn_response(resp_dict, cryptor))
else:
self.logger.info(
f"Allowed unknown game {req.game_id} v{req.ver} to authenticate from {request_ip} due to 'is_develop' being enabled. S/N: {req.serial}"
)
resp.uri1 = f"http://{self.config.server.hostname}:{self.config.server.port}/{req.game_id}/{req.ver.replace('.', '')}/"
resp_dict = {k: v for k, v in vars(resp).items() if v is not None}
self.logger.debug(f"Allnet response: {resp_dict}")
return PlainTextResponse(self.to_cn_response(resp_dict, cryptor))
elif self.config.allnet.enable_game_id_whitelist and req.game_id not in self.config.allnet.game_id_whitelist:
if not self.config.server.is_develop:
msg = f"Disallowed game {req.game_id} attempted allnet auth from {request_ip}."
await self.data.base.log_event(
"allnet", "ALLNET_AUTH_DISALLOWED_GAME", logging.WARN, msg
)
self.logger.warning(msg)
resp.stat = ALLNET_STAT.bad_game.value
resp_dict = {k: v for k, v in vars(resp).items() if v is not None}
self.logger.debug(f"Allnet response: {resp_dict}")
return PlainTextResponse(self.to_cn_response(resp_dict, cryptor))
else:
self.logger.info(
f"Allowed the game {req.game_id} v{req.ver} to authenticate which was disallowed from {request_ip} due to 'is_develop' being enabled. S/N: {req.serial}"
)
resp.uri1 = f"http://{self.config.server.hostname}:{self.config.server.port}/{req.game_id}/{req.ver.replace('.', '')}/"
resp_dict = {k: v for k, v in vars(resp).items() if v is not None}
self.logger.debug(f"Allnet response: {resp_dict}")
return PlainTextResponse(self.to_cn_response(resp_dict, cryptor))
int_ver = req.ver.replace(".", "")
try:
resp.uri1, _ = TitleServlet.title_registry[req.game_id].get_allnet_info(req.game_id, int(int_ver),
req.serial)
except Exception as e:
self.logger.error(f"Error running get_allnet_info for {req.game_id} - {e}")
resp.stat = ALLNET_STAT.bad_game.value
resp_dict = {k: v for k, v in vars(resp).items() if v is not None}
return PlainTextResponse(urllib.parse.unquote(urllib.parse.urlencode(resp_dict)) + "\n")
# resp.uri1 = "https://maimai-gm.wahlap.com:42081/"
msg = f"{req.serial} authenticated from {request_ip}: {req.game_id} v{req.ver}"
await self.data.base.log_event("allnet", "ALLNET_AUTH_SUCCESS", logging.INFO, msg)
self.logger.info(msg)
resp_dict = {k: v for k, v in vars(resp).items() if v is not None}
self.logger.debug(f"Allnet response: {resp_dict}")
return PlainTextResponse(self.to_cn_response(resp_dict, cryptor))
async def handle_dlorder(self, request: Request): async def handle_dlorder(self, request: Request):
request_ip = Utils.get_ip_addr(request) request_ip = Utils.get_ip_addr(request)
pragma_header = request.headers.get('Pragma', "") pragma_header = request.headers.get('Pragma', "")
@ -396,6 +579,75 @@ class AllnetServlet:
return PlainTextResponse(res_str) return PlainTextResponse(res_str)
async def handle_cn_dlorder(self, request: Request):
ua = request.headers.get("User-Agent", "")
game_id = ua.split(";")[0]
if game_id not in self.config.allnet.cn_allnet_encrypt_keys:
self.logger.error(f"Unrecognised game {game_id} attempted cn_net_initialize from {request.client.host}")
return PlainTextResponse(
content="",
status_code=404,
)
cryptor = CipherAES(
bytes.fromhex(self.config.allnet.cn_allnet_encrypt_keys[game_id][0]),
bytes.fromhex(self.config.allnet.cn_allnet_encrypt_keys[game_id][1]),
)
request_ip = Utils.get_ip_addr(request)
data = self.from_cn_request(await request.body(), cryptor)
self.logger.info(f"CN DownloadOrder from {request_ip} -> {data}")
try:
req_urlencode = data
req_dict = self.allnet_req_to_dict(req_urlencode)
if req_dict is None:
raise AllnetRequestException()
req = AllnetDownloadOrderCnRequest(req_dict[0])
# Validate the request. Currently we only validate the fields we plan on using
if not req.game_id or not req.ver or not req.serial:
raise AllnetRequestException(
f"Bad download request params from {request_ip} - {vars(req)}"
)
except AllnetRequestException as e:
if e.message != "":
self.logger.error(e)
return PlainTextResponse()
self.logger.info(
f"DownloadOrder from {request_ip} -> {req.game_id} v{req.ver} serial {req.serial}"
)
resp = AllnetDownloadOrderCnResponse()
if (
not self.config.allnet.allow_online_updates
or not self.config.allnet.update_cfg_folder
):
resp_dict = {k: v for k, v in vars(resp).items() if v is not None}
return PlainTextResponse(self.to_cn_response(resp_dict, cryptor))
else: # TODO: Keychip check
if path.exists(
f"{self.config.allnet.update_cfg_folder}/{req.game_id}-{req.ver.replace('.', '')}-app.ini"
):
resp.uri = f"http://{self.config.server.hostname}:{self.config.server.port}/dl/ini/{req.game_id}-{req.ver.replace('.', '')}-app.ini"
if path.exists(
f"{self.config.allnet.update_cfg_folder}/{req.game_id}-{req.ver.replace('.', '')}-opt.ini"
):
resp.uri += f"|http://{self.config.server.hostname}:{self.config.server.port}/dl/ini/{req.game_id}-{req.ver.replace('.', '')}-opt.ini"
self.logger.debug(f"Sending download uri {resp.uri}")
await self.data.base.log_event("allnet", "DLORDER_REQ_SUCCESS", logging.INFO,
f"{Utils.get_ip_addr(request)} requested DL Order for {req.serial} {req.game_id} v{req.ver}")
resp_dict = {k: v for k, v in vars(resp).items() if v is not None}
return PlainTextResponse(self.to_cn_response(resp_dict, cryptor))
async def handle_dlorder_ini(self, request: Request) -> bytes: async def handle_dlorder_ini(self, request: Request) -> bytes:
req_file = request.path_params.get("file", "").replace("%0A", "").replace("\n", "") req_file = request.path_params.get("file", "").replace("%0A", "").replace("\n", "")
request_ip = Utils.get_ip_addr(request) request_ip = Utils.get_ip_addr(request)
@ -517,6 +769,16 @@ class AllnetServlet:
zipped = zlib.compress(unzipped) zipped = zlib.compress(unzipped)
return base64.b64encode(zipped) return base64.b64encode(zipped)
def get_pure_string(self, old_str: str) -> str:
return ''.join([char for char in old_str if char in string.printable])
def from_cn_request(self, request_data, cryptor) -> str:
return self.get_pure_string(cryptor.decrypt(request_data)[16:].decode("utf-8"))
def to_cn_response(self, response_dict, cryptor: CipherAES) -> bytes:
return cryptor.encrypt(b'\x00' * 16 + urllib.parse.unquote(urllib.parse.urlencode(response_dict)).encode("utf-8") + b"\r\n")
class BillingServlet: class BillingServlet:
def __init__(self, core_cfg: CoreConfig, cfg_folder: str) -> None: def __init__(self, core_cfg: CoreConfig, cfg_folder: str) -> None:
self.config = core_cfg self.config = core_cfg
@ -746,6 +1008,39 @@ class AllnetPowerOnResponse2(AllnetPowerOnResponse):
self.timezone = "+09:00" self.timezone = "+09:00"
self.res_class = "PowerOnResponseV2" self.res_class = "PowerOnResponseV2"
class AllnetCnInitializeRequest:
def __init__(self, req: Dict) -> None:
self.game_id: str = req.get("title_id", "")
self.ver: str = req.get("title_ver", "")
self.serial: str = req.get("client_id", "")
self.ip: str = req.get("ip", "127.0.0.1")
self.firm_ver: str = req.get("title_ver", "")
self.boot_ver: str = req.get("title_ver", "")
self.encode: str = req.get("encode", "EUC-JP")
self.hops = int(req.get("hops", "-1"))
self.format_ver = float(req.get("format_ver", "1.00"))
self.token: str = req.get("token", "0")
class AllnetCnInitializeResponse:
def __init__(self, token: str) -> None:
self.result = '1'
self.place_id = "0123"
self.uri1 = ""
self.uri2 = ""
self.name = "ARTEMiS"
self.nickname = "ARTEMiS"
self.setting = "1"
self.region0 = "1"
self.region_name0 = "W"
self.country = AllnetCountryCode.CHINA.value
self.location_type = "1"
self.utc_time = datetime.now(tz=pytz.timezone("UTC")).strftime(
"%Y-%m-%dT%H:%M:%SZ"
)
self.client_timezone = " 0800"
self.res_ver = "3"
self.token = token
class AllnetDownloadOrderRequest: class AllnetDownloadOrderRequest:
def __init__(self, req: Dict) -> None: def __init__(self, req: Dict) -> None:
self.game_id = req.get("game_id", "") self.game_id = req.get("game_id", "")
@ -753,6 +1048,18 @@ class AllnetDownloadOrderRequest:
self.serial = req.get("serial", "") self.serial = req.get("serial", "")
self.encode = req.get("encode", "") self.encode = req.get("encode", "")
class AllnetDownloadOrderCnRequest:
def __init__(self, req: Dict) -> None:
self.game_id = req.get("title_id", "")
self.ver = req.get("title_ver", "")
self.serial = req.get("client_id", "")
self.encode = req.get("encode", "")
class AllnetDownloadOrderCnResponse:
def __init__(self, stat: int = 1, uri: str = "|") -> None:
self.result = stat
self.uri = uri
class AllnetDownloadOrderResponse: class AllnetDownloadOrderResponse:
def __init__(self, stat: int = 1, serial: str = "", uri: str = "null") -> None: def __init__(self, stat: int = 1, serial: str = "", uri: str = "null") -> None:
self.stat = stat self.stat = stat
@ -1000,6 +1307,8 @@ route_lst = [
Route("/sys/servlet/DownloadOrder", allnet.handle_dlorder, methods=["GET", "POST"]), Route("/sys/servlet/DownloadOrder", allnet.handle_dlorder, methods=["GET", "POST"]),
Route("/sys/servlet/LoaderStateRecorder", allnet.handle_loaderstaterecorder, methods=["GET", "POST"]), Route("/sys/servlet/LoaderStateRecorder", allnet.handle_loaderstaterecorder, methods=["GET", "POST"]),
Route("/sys/servlet/Alive", allnet.handle_alive, methods=["GET", "POST"]), Route("/sys/servlet/Alive", allnet.handle_alive, methods=["GET", "POST"]),
Route("/net/initialize", allnet.handle_cn_net_initialize, methods=["POST"]),
Route("/net/delivery/instruction", allnet.handle_cn_dlorder, methods=["GET", "POST"]),
Route("/naomitest.html", allnet.handle_naomitest), Route("/naomitest.html", allnet.handle_naomitest),
] ]

View File

@ -11,6 +11,7 @@ from typing import List
from core import CoreConfig, TitleServlet, MuchaServlet from core import CoreConfig, TitleServlet, MuchaServlet
from core.allnet import AllnetServlet, BillingServlet from core.allnet import AllnetServlet, BillingServlet
from core.chimedb import ChimeServlet
from core.frontend import FrontendServlet from core.frontend import FrontendServlet
async def dummy_rt(request: Request): async def dummy_rt(request: Request):
@ -79,6 +80,8 @@ if not cfg.allnet.standalone:
Route("/sys/servlet/LoaderStateRecorder", allnet.handle_loaderstaterecorder, methods=["GET", "POST"]), Route("/sys/servlet/LoaderStateRecorder", allnet.handle_loaderstaterecorder, methods=["GET", "POST"]),
Route("/sys/servlet/Alive", allnet.handle_alive, methods=["GET", "POST"]), Route("/sys/servlet/Alive", allnet.handle_alive, methods=["GET", "POST"]),
Route("/naomitest.html", allnet.handle_naomitest), Route("/naomitest.html", allnet.handle_naomitest),
Route("/net/initialize", allnet.handle_cn_net_initialize, methods=["POST"]),
Route("/net/delivery/instruction", allnet.handle_cn_dlorder, methods=["GET", "POST"]),
] ]
if cfg.allnet.allow_online_updates: if cfg.allnet.allow_online_updates:
@ -87,6 +90,14 @@ if not cfg.allnet.standalone:
Route("/dl/ini/{file:str}", allnet.handle_dlorder_ini), Route("/dl/ini/{file:str}", allnet.handle_dlorder_ini),
] ]
if cfg.chimedb.enable:
chimedb = ChimeServlet(cfg, cfg_dir)
route_lst += [
Route("/wc_aime/api/alive_check", chimedb.handle_qr_alive, methods=["POST"]),
Route("/qrcode/api/alive_check", chimedb.handle_qr_alive, methods=["POST"]),
Route("/wc_aime/api/get_data", chimedb.handle_qr_lookup, methods=["POST"])
]
for code, game in title.title_registry.items(): for code, game in title.title_registry.items():
route_lst += game.get_routes() route_lst += game.get_routes()

139
core/chimedb.py Normal file
View File

@ -0,0 +1,139 @@
import hashlib
import json
import logging
from enum import Enum
from logging.handlers import TimedRotatingFileHandler
import coloredlogs
from starlette.responses import PlainTextResponse
from starlette.requests import Request
from core.config import CoreConfig
from core.data import Data
class ChimeDBStatus(Enum):
NONE = 0
READER_SETUP_FAIL = 1
READER_ACCESS_FAIL = 2
READER_INCOMPATIBLE = 3
DB_RESOLVE_FAIL = 4
DB_ACCESS_TIMEOUT = 5
DB_ACCESS_FAIL = 6
AIME_ID_INVALID = 7
NO_BOARD_INFO = 8
LOCK_BAN_SYSTEM_USER = 9
LOCK_BAN_SYSTEM = 10
LOCK_BAN_USER = 11
LOCK_BAN = 12
LOCK_SYSTEM_USER = 13
LOCK_SYSTEM = 14
LOCK_USER = 15
class ChimeServlet:
def __init__(self, core_cfg: CoreConfig, cfg_folder: str) -> None:
self.config = core_cfg
self.config_folder = cfg_folder
self.data = Data(core_cfg)
self.logger = logging.getLogger("chimedb")
if not hasattr(self.logger, "initted"):
log_fmt_str = "[%(asctime)s] Chimedb | %(levelname)s | %(message)s"
log_fmt = logging.Formatter(log_fmt_str)
fileHandler = TimedRotatingFileHandler(
"{0}/{1}.log".format(self.config.server.log_dir, "chimedb"),
when="d",
backupCount=10,
)
fileHandler.setFormatter(log_fmt)
consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(log_fmt)
self.logger.addHandler(fileHandler)
self.logger.addHandler(consoleHandler)
self.logger.setLevel(self.config.aimedb.loglevel)
coloredlogs.install(
level=core_cfg.aimedb.loglevel, logger=self.logger, fmt=log_fmt_str
)
self.logger.initted = True
if not core_cfg.chimedb.key:
self.logger.error("!!!KEY NOT SET!!!")
exit(1)
self.logger.info("Serving")
async def handle_qr_alive(self, request: Request):
return PlainTextResponse("alive")
async def handle_qr_lookup(self, request: Request) -> bytes:
req = json.loads(await request.body())
access_code = req["qrCode"][-20:]
timestamp = req["timestamp"]
try:
userId = await self._lookup(access_code)
data = json.dumps({
"userID": userId,
"errorID": 0,
"timestamp": timestamp,
"key": self._hash_key(userId, timestamp)
})
except Exception as e:
self.logger.error(e.with_traceback(None))
data = json.dumps({
"userID": -1,
"errorID": ChimeDBStatus.DB_ACCESS_FAIL,
"timestamp": timestamp,
"key": self._hash_key(-1, timestamp)
})
return PlainTextResponse(data)
def _hash_key(self, chip_id, timestamp):
input_string = f"{chip_id}{timestamp}{self.config.chimedb.key}"
hash_object = hashlib.sha256(input_string.encode('utf-8'))
hex_dig = hash_object.hexdigest()
formatted_hex = format(int(hex_dig, 16), '064x').upper()
return formatted_hex
async def _lookup(self, access_code):
user_id = await self.data.card.get_user_id_from_card(access_code)
self.logger.info(f"access_code {access_code} -> user_id {user_id}")
if not user_id or user_id <= 0:
user_id = await self._register(access_code)
return user_id
async def _register(self, access_code):
user_id = -1
if self.config.server.allow_user_registration:
user_id = await self.data.user.create_user()
if user_id is None:
self.logger.error("Failed to register user!")
user_id = -1
else:
card_id = await self.data.card.create_card(user_id, access_code)
if card_id is None:
self.logger.error("Failed to register card!")
user_id = -1
self.logger.info(
f"Register access code {access_code} -> user_id {user_id}"
)
else:
self.logger.info(f"Registration blocked!: access code {access_code}")
return user_id

View File

@ -346,6 +346,12 @@ class AllnetConfig:
return CoreConfig.get_config_field( return CoreConfig.get_config_field(
self.__config, "core", "allnet", "standalone", default=False self.__config, "core", "allnet", "standalone", default=False
) )
@property
def cn_allnet_encrypt_keys(self) -> dict[str, list[str]]:
return CoreConfig.get_config_field(
self.__config, "core", "allnet", "cn_allnet_encrypt_keys", default={}
)
@property @property
def port(self) -> int: def port(self) -> int:
@ -475,6 +481,30 @@ class MuchaConfig:
) )
) )
class ChimedbConfig:
def __init__(self, parent_config: "CoreConfig") -> None:
self.__config = parent_config
@property
def enable(self) -> bool:
return CoreConfig.get_config_field(
self.__config, "core", "chimedb", "enable", default=True
)
@property
def loglevel(self) -> int:
return CoreConfig.str_to_loglevel(
CoreConfig.get_config_field(
self.__config, "core", "chimedb", "loglevel", default="info"
)
)
@property
def key(self) -> str:
return CoreConfig.get_config_field(
self.__config, "core", "chimedb", "key", default=""
)
class CoreConfig(dict): class CoreConfig(dict):
def __init__(self) -> None: def __init__(self) -> None:
self.server = ServerConfig(self) self.server = ServerConfig(self)
@ -485,6 +515,8 @@ class CoreConfig(dict):
self.billing = BillingConfig(self) self.billing = BillingConfig(self)
self.aimedb = AimedbConfig(self) self.aimedb = AimedbConfig(self)
self.mucha = MuchaConfig(self) self.mucha = MuchaConfig(self)
self.chimedb = ChimedbConfig(self)
@classmethod @classmethod
def str_to_loglevel(cls, level_str: str): def str_to_loglevel(cls, level_str: str):

42
core/crypto.py Normal file
View File

@ -0,0 +1,42 @@
import zlib
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
import base64
class CipherAES:
def __init__(self,AES_KEY,AES_IV, BLOCK_SIZE=128, KEY_SIZE=256):
self.BLOCK_SIZE = BLOCK_SIZE
self.KEY_SIZE = KEY_SIZE
self.AES_KEY = AES_KEY
self.AES_IV = AES_IV
def _pad(self,data):
block_size = self.BLOCK_SIZE // 8
padding_length = block_size - len(data) % block_size
return data + bytes([padding_length]) * padding_length
def _unpad(self, padded_data):
pad_char = padded_data[-1]
if not 1 <= pad_char <= self.BLOCK_SIZE // 8:
raise Exception("Invalid padding")
return padded_data[:-pad_char]
def encrypt(self, plaintext):
if isinstance(plaintext, str):
plaintext = plaintext.encode('utf-8')
backend = default_backend()
cipher = Cipher(algorithms.AES(self.AES_KEY), modes.CBC(self.AES_IV), backend=backend)
encryptor = cipher.encryptor()
padded_plaintext = self._pad(plaintext)
ciphertext = encryptor.update(padded_plaintext) + encryptor.finalize()
return ciphertext
def decrypt(self, ciphertext):
backend = default_backend()
cipher = Cipher(algorithms.AES(self.AES_KEY), modes.CBC(self.AES_IV), backend=backend)
decryptor = cipher.decryptor()
decrypted_data = decryptor.update(ciphertext) + decryptor.finalize()
return self._unpad(decrypted_data)

View File

@ -45,6 +45,10 @@ allnet:
loglevel: "info" loglevel: "info"
allow_online_updates: False allow_online_updates: False
update_cfg_folder: "" update_cfg_folder: ""
cn_allnet_encrypt_keys:
GAMEID:
- '00000000000000000000000000000000'
- '00000000000000000000000000000000'
billing: billing:
standalone: True standalone: True
@ -64,4 +68,8 @@ aimedb:
id_lifetime_seconds: 86400 id_lifetime_seconds: 86400
mucha: mucha:
loglevel: "info" loglevel: "info"x
chimedb:
enable: False
key: ""

View File

@ -29,6 +29,7 @@ class Mai2Constants:
GAME_CODE_FINALE = "SDEY" GAME_CODE_FINALE = "SDEY"
GAME_CODE_DX = "SDEZ" GAME_CODE_DX = "SDEZ"
GAME_CODE_DX_INT = "SDGA" GAME_CODE_DX_INT = "SDGA"
GAME_CODE_DX_CHN = "SDGA"
CONFIG_NAME = "mai2.yaml" CONFIG_NAME = "mai2.yaml"
@ -56,6 +57,7 @@ class Mai2Constants:
VER_MAIMAI_DX_FESTIVAL_PLUS = 20 VER_MAIMAI_DX_FESTIVAL_PLUS = 20
VER_MAIMAI_DX_BUDDIES = 21 VER_MAIMAI_DX_BUDDIES = 21
VER_MAIMAI_DX_BUDDIES_PLUS = 22 VER_MAIMAI_DX_BUDDIES_PLUS = 22
VER_MAIMAI_DX_PRISM = 23
VERSION_STRING = ( VERSION_STRING = (
"maimai", "maimai",
@ -80,7 +82,8 @@ class Mai2Constants:
"maimai DX FESTiVAL", "maimai DX FESTiVAL",
"maimai DX FESTiVAL PLUS", "maimai DX FESTiVAL PLUS",
"maimai DX BUDDiES", "maimai DX BUDDiES",
"maimai DX BUDDiES PLUS" "maimai DX BUDDiES PLUS",
"maimai DX PRiSM",
) )
@classmethod @classmethod

View File

@ -57,13 +57,32 @@ class Mai2DX(Mai2Base):
"requestInterval": 1800, "requestInterval": 1800,
"rebootStartTime": reboot_start, "rebootStartTime": reboot_start,
"rebootEndTime": reboot_end, "rebootEndTime": reboot_end,
"rebootInterval": 0,
"movieUploadLimit": 100, "movieUploadLimit": 100,
"movieStatus": 1, "movieStatus": 1 if self.game_config.uploads.movies else 0,
"movieServerUri": "", "movieServerUri": "",
"deliverServerUri": "", "deliverServerUri": "",
"oldServerUri": self.old_server, "oldServerUri": self.old_server,
"usbDlServerUri": "", "usbDlServerUri": "",
"rebootInterval": 0, "maxCountRivalMusic": 100,
"replicationDelayLimit": 10,
"exclusionStartTime": "00:00:00",
"exclusionEndTime": "00:00:00",
"pingDisable": True,
"packetTimeout": 20000,
"packetTimeoutLong": 60000,
"packetRetryCount": 10,
"userDataDlErrTimeout": 300000,
"userDataDlErrRetryCount": 1000,
"userDataDlErrSamePacketRetryCount": 1000,
"userDataUpSkipTimeout": 0,
"userDataUpSkipRetryCount": 0,
"iconPhotoDisable": not self.game_config.uploads.photos,
"uploadPhotoDisable": not self.game_config.uploads.photos,
"nameEntryDisable": False,
"maxCountMusic": 0,
"maxCountItem": 0,
"packetRecreateCount": 0
}, },
"isAouAccession": False, "isAouAccession": False,
} }

View File

@ -15,6 +15,7 @@ from Crypto.Cipher import AES
from Crypto.Util.Padding import pad from Crypto.Util.Padding import pad
from core.config import CoreConfig from core.config import CoreConfig
from core.crypto import CipherAES
from core.utils import Utils from core.utils import Utils
from core.title import BaseServlet from core.title import BaseServlet
from .config import Mai2Config from .config import Mai2Config
@ -288,43 +289,54 @@ class Mai2Servlet(BaseServlet):
encrypted = False encrypted = False
if game_code == "SDEZ": # JP if game_code == "SDEZ": # JP
if version < 110: # 1.0 if version < 110: # 1.0
internal_ver = Mai2Constants.VER_MAIMAI_DX internal_ver = Mai2Constants.VER_MAIMAI_DX
elif version >= 110 and version < 114: # PLUS elif version >= 110 and version < 114: # PLUS
internal_ver = Mai2Constants.VER_MAIMAI_DX_PLUS internal_ver = Mai2Constants.VER_MAIMAI_DX_PLUS
elif version >= 114 and version < 117: # Splash elif version >= 114 and version < 117: # Splash
internal_ver = Mai2Constants.VER_MAIMAI_DX_SPLASH internal_ver = Mai2Constants.VER_MAIMAI_DX_SPLASH
elif version >= 117 and version < 120: # Splash PLUS elif version >= 117 and version < 120: # Splash PLUS
internal_ver = Mai2Constants.VER_MAIMAI_DX_SPLASH_PLUS internal_ver = Mai2Constants.VER_MAIMAI_DX_SPLASH_PLUS
elif version >= 120 and version < 125: # UNiVERSE elif version >= 120 and version < 125: # UNiVERSE
internal_ver = Mai2Constants.VER_MAIMAI_DX_UNIVERSE internal_ver = Mai2Constants.VER_MAIMAI_DX_UNIVERSE
elif version >= 125 and version < 130: # UNiVERSE PLUS elif version >= 125 and version < 130: # UNiVERSE PLUS
internal_ver = Mai2Constants.VER_MAIMAI_DX_UNIVERSE_PLUS internal_ver = Mai2Constants.VER_MAIMAI_DX_UNIVERSE_PLUS
elif version >= 130 and version < 135: # FESTiVAL elif version >= 130 and version < 135: # FESTiVAL
internal_ver = Mai2Constants.VER_MAIMAI_DX_FESTIVAL internal_ver = Mai2Constants.VER_MAIMAI_DX_FESTIVAL
elif version >= 135 and version < 140: # FESTiVAL PLUS elif version >= 135 and version < 140: # FESTiVAL PLUS
internal_ver = Mai2Constants.VER_MAIMAI_DX_FESTIVAL_PLUS internal_ver = Mai2Constants.VER_MAIMAI_DX_FESTIVAL_PLUS
elif version >= 140 and version < 145: # BUDDiES elif version >= 140 and version < 145: # BUDDiES
internal_ver = Mai2Constants.VER_MAIMAI_DX_BUDDIES internal_ver = Mai2Constants.VER_MAIMAI_DX_BUDDIES
elif version >= 145: # BUDDiES PLUS elif version >= 145: # BUDDiES PLUS
internal_ver = Mai2Constants.VER_MAIMAI_DX_BUDDIES_PLUS internal_ver = Mai2Constants.VER_MAIMAI_DX_BUDDIES_PLUS
elif game_code == "SDGA": # Int elif game_code == "SDGA": # Int
if version < 105: # 1.0 if version < 105: # 1.0
internal_ver = Mai2Constants.VER_MAIMAI_DX internal_ver = Mai2Constants.VER_MAIMAI_DX
elif version >= 105 and version < 110: # PLUS elif version >= 105 and version < 110: # PLUS
internal_ver = Mai2Constants.VER_MAIMAI_DX_PLUS internal_ver = Mai2Constants.VER_MAIMAI_DX_PLUS
elif version >= 110 and version < 115: # Splash elif version >= 110 and version < 115: # Splash
internal_ver = Mai2Constants.VER_MAIMAI_DX_SPLASH internal_ver = Mai2Constants.VER_MAIMAI_DX_SPLASH
elif version >= 115 and version < 120: # Splash PLUS elif version >= 115 and version < 120: # Splash PLUS
internal_ver = Mai2Constants.VER_MAIMAI_DX_SPLASH_PLUS internal_ver = Mai2Constants.VER_MAIMAI_DX_SPLASH_PLUS
elif version >= 120 and version < 125: # UNiVERSE elif version >= 120 and version < 125: # UNiVERSE
internal_ver = Mai2Constants.VER_MAIMAI_DX_UNIVERSE internal_ver = Mai2Constants.VER_MAIMAI_DX_UNIVERSE
elif version >= 125 and version < 130: # UNiVERSE PLUS elif version >= 125 and version < 130: # UNiVERSE PLUS
internal_ver = Mai2Constants.VER_MAIMAI_DX_UNIVERSE_PLUS internal_ver = Mai2Constants.VER_MAIMAI_DX_UNIVERSE_PLUS
elif version >= 130 and version < 135: # FESTiVAL elif version >= 130 and version < 135: # FESTiVAL
internal_ver = Mai2Constants.VER_MAIMAI_DX_FESTIVAL internal_ver = Mai2Constants.VER_MAIMAI_DX_FESTIVAL
elif version >= 135 and version < 140: # FESTiVAL PLUS elif version >= 135 and version < 140: # FESTiVAL PLUS
internal_ver = Mai2Constants.VER_MAIMAI_DX_FESTIVAL_PLUS internal_ver = Mai2Constants.VER_MAIMAI_DX_FESTIVAL_PLUS
elif game_code == "SDGB": # CN
if 100 <= version < 110: # Maimai DX CN
internal_ver = Mai2Constants.VER_MAIMAI_DX
elif 110 <= version < 120: # Maimai DX CN 2021
internal_ver = Mai2Constants.VER_MAIMAI_DX_SPLASH
elif 120 <= version < 130: # Maimai DX CN 2022
internal_ver = Mai2Constants.VER_MAIMAI_DX_UNIVERSE
elif 130 <= version < 140: # Maimai DX CN 2023 (FESTiVAL)
internal_ver = Mai2Constants.VER_MAIMAI_DX_FESTIVAL
elif 140 <= version < 150: # Maimai DX CN 2024 (BUDDiES)
internal_ver = Mai2Constants.VER_MAIMAI_DX_BUDDIES
if all(c in string.hexdigits for c in endpoint) and len(endpoint) == 32: if all(c in string.hexdigits for c in endpoint) and len(endpoint) == 32:
# If we get a 32 character long hex string, it's a hash and we're # If we get a 32 character long hex string, it's a hash and we're
@ -344,24 +356,6 @@ class Mai2Servlet(BaseServlet):
return Response(zlib.compress(b'{"stat": "0"}')) return Response(zlib.compress(b'{"stat": "0"}'))
endpoint = self.hash_table[internal_ver][endpoint.lower()] endpoint = self.hash_table[internal_ver][endpoint.lower()]
try:
crypt = AES.new(
bytes.fromhex(self.game_cfg.crypto.keys[internal_ver][0]),
AES.MODE_CBC,
bytes.fromhex(self.game_cfg.crypto.keys[internal_ver][1]),
)
req_raw = crypt.decrypt(req_raw)
except Exception as e:
self.logger.error(
"Failed to decrypt v%s request to %s",
version, endpoint,
exc_info=e,
)
return Response(zlib.compress(b'{"stat": "0"}'))
encrypted = True encrypted = True
if ( if (
@ -384,13 +378,31 @@ class Mai2Servlet(BaseServlet):
) )
return Response(zlib.compress(b'{"stat": "0"}')) return Response(zlib.compress(b'{"stat": "0"}'))
req_data = json.loads(unzip) if encrypted:
try:
crypt = CipherAES(
bytes.fromhex(self.game_cfg.crypto.keys[internal_ver][0]),
bytes.fromhex(self.game_cfg.crypto.keys[internal_ver][1]),
)
decrypted = crypt.decrypt(unzip)
except Exception as e:
self.logger.error(
"Failed to decrypt v%s request to %s",
version, endpoint,
exc_info=e,
)
return Response(zlib.compress(b'{"stat": "0"}'))
req_data = json.loads(decrypted)
else:
req_data = json.loads(unzip)
self.logger.info(f"v{version} {endpoint} request from {client_ip}") self.logger.info(f"v{version} {endpoint} request from {client_ip}")
self.logger.debug(req_data) self.logger.debug(req_data)
endpoint = ( endpoint = (
endpoint.replace("MaimaiExp", "") endpoint.replace("MaimaiExp", "").replace("MaimaiChn", "")
if game_code == Mai2Constants.GAME_CODE_DX_INT if game_code == Mai2Constants.GAME_CODE_DX_INT
else endpoint else endpoint
) )
@ -419,16 +431,15 @@ class Mai2Servlet(BaseServlet):
if not encrypted or version < 110: if not encrypted or version < 110:
return Response(zipped) return Response(zipped)
padded = pad(zipped, 16)
crypt = AES.new( crypt = CipherAES(
bytes.fromhex(self.game_cfg.crypto.keys[internal_ver][0]), bytes.fromhex(self.game_cfg.crypto.keys[internal_ver][0]),
AES.MODE_CBC,
bytes.fromhex(self.game_cfg.crypto.keys[internal_ver][1]), bytes.fromhex(self.game_cfg.crypto.keys[internal_ver][1]),
) )
return Response(crypt.encrypt(padded)) return Response(
zlib.compress(crypt.encrypt(json.dumps(resp, ensure_ascii=False).encode("utf-8")))
)
async def handle_old_srv(self, request: Request) -> bytes: async def handle_old_srv(self, request: Request) -> bytes: