add support for Maimai DX CN 2023 (舞萌DX 2023)

This commit is contained in:
Error063 2024-05-12 03:19:12 +08:00
parent d09f3e9907
commit dfa6b80e1b
11 changed files with 440 additions and 42 deletions

3
.gitignore vendored
View File

@ -160,4 +160,5 @@ config/*
deliver/* deliver/*
*.gz *.gz
dbdump-*.json dbdump-*.json
/config/

View File

@ -1,3 +1,4 @@
import hashlib
from typing import Dict, List, Any, Optional, Tuple, Union, Final from typing import Dict, List, Any, Optional, Tuple, Union, Final
import logging, coloredlogs import logging, coloredlogs
from logging.handlers import TimedRotatingFileHandler from logging.handlers import TimedRotatingFileHandler
@ -133,7 +134,7 @@ class AllnetServlet:
request_ip = Utils.get_ip_addr(request) request_ip = Utils.get_ip_addr(request)
pragma_header = request.getHeader('Pragma') pragma_header = request.getHeader('Pragma')
is_dfi = pragma_header is not None and pragma_header == "DFI" is_dfi = pragma_header is not None and pragma_header == "DFI"
try: try:
if is_dfi: if is_dfi:
req_urlencode = self.from_dfi(request.content.getvalue()) req_urlencode = self.from_dfi(request.content.getvalue())
@ -166,7 +167,7 @@ class AllnetServlet:
self.logger.debug(f"Allnet request: {vars(req)}") self.logger.debug(f"Allnet request: {vars(req)}")
machine = self.data.arcade.get_machine(req.serial) machine = self.data.arcade.get_machine(req.serial)
if machine is None and not self.config.server.allow_unregistered_serials: if machine is None and not self.config.server.allow_unregistered_serials:
msg = f"Unrecognised serial {req.serial} attempted allnet auth from {request_ip}." msg = f"Unrecognised serial {req.serial} attempted allnet auth from {request_ip}."
self.data.base.log_event( self.data.base.log_event(
@ -191,7 +192,7 @@ class AllnetServlet:
resp.stat = ALLNET_STAT.bad_shop.value resp.stat = ALLNET_STAT.bad_shop.value
resp_dict = {k: v for k, v in vars(resp).items() if v is not None} resp_dict = {k: v for k, v in vars(resp).items() if v is not None}
return (urllib.parse.unquote(urllib.parse.urlencode(resp_dict)) + "\n").encode("utf-8") return (urllib.parse.unquote(urllib.parse.urlencode(resp_dict)) + "\n").encode("utf-8")
elif (not arcade["ip"] or arcade["ip"] is None) and self.config.server.strict_ip_checking: elif (not arcade["ip"] or arcade["ip"] is None) and self.config.server.strict_ip_checking:
msg = f"Serial {req.serial} attempted allnet auth from bad IP {req.ip}, but arcade {arcade['id']} has no IP set! (strict checking enabled)." msg = f"Serial {req.serial} attempted allnet auth from bad IP {req.ip}, but arcade {arcade['id']} has no IP set! (strict checking enabled)."
self.data.base.log_event( self.data.base.log_event(
@ -234,7 +235,7 @@ class AllnetServlet:
resp.client_timezone = ( # lmao resp.client_timezone = ( # lmao
arcade["timezone"] if arcade["timezone"] is not None else "+0900" if req.format_ver == 3 else "+09:00" arcade["timezone"] if arcade["timezone"] is not None else "+0900" if req.format_ver == 3 else "+09:00"
) )
if req.game_id not in TitleServlet.title_registry: if req.game_id not in TitleServlet.title_registry:
if not self.config.server.is_develop: if not self.config.server.is_develop:
msg = f"Unrecognised game {req.game_id} attempted allnet auth from {request_ip}." msg = f"Unrecognised game {req.game_id} attempted allnet auth from {request_ip}."
@ -253,14 +254,14 @@ class AllnetServlet:
) )
resp.uri = f"http://{self.config.title.hostname}:{self.config.title.port}/{req.game_id}/{req.ver.replace('.', '')}/" resp.uri = f"http://{self.config.title.hostname}:{self.config.title.port}/{req.game_id}/{req.ver.replace('.', '')}/"
resp.host = f"{self.config.title.hostname}:{self.config.title.port}" resp.host = f"{self.config.title.hostname}:{self.config.title.port}"
resp_dict = {k: v for k, v in vars(resp).items() if v is not None} resp_dict = {k: v for k, v in vars(resp).items() if v is not None}
resp_str = urllib.parse.unquote(urllib.parse.urlencode(resp_dict)) resp_str = urllib.parse.unquote(urllib.parse.urlencode(resp_dict))
self.logger.debug(f"Allnet response: {resp_str}") self.logger.debug(f"Allnet response: {resp_str}")
return (resp_str + "\n").encode("utf-8") return (resp_str + "\n").encode("utf-8")
int_ver = req.ver.replace(".", "") int_ver = req.ver.replace(".", "")
resp.uri, resp.host = TitleServlet.title_registry[req.game_id].get_allnet_info(req.game_id, int(int_ver), req.serial) resp.uri, resp.host = TitleServlet.title_registry[req.game_id].get_allnet_info(req.game_id, int(int_ver), req.serial)
@ -270,7 +271,7 @@ class AllnetServlet:
resp_dict = {k: v for k, v in vars(resp).items() if v is not None} resp_dict = {k: v for k, v in vars(resp).items() if v is not None}
resp_str = urllib.parse.unquote(urllib.parse.urlencode(resp_dict)) resp_str = urllib.parse.unquote(urllib.parse.urlencode(resp_dict))
self.logger.debug(f"Allnet response: {resp_dict}") self.logger.debug(f"Allnet response: {resp_dict}")
resp_str += "\n" resp_str += "\n"
"""if is_dfi: """if is_dfi:
@ -283,7 +284,7 @@ class AllnetServlet:
request_ip = Utils.get_ip_addr(request) request_ip = Utils.get_ip_addr(request)
pragma_header = request.getHeader('Pragma') pragma_header = request.getHeader('Pragma')
is_dfi = pragma_header is not None and pragma_header == "DFI" is_dfi = pragma_header is not None and pragma_header == "DFI"
try: try:
if is_dfi: if is_dfi:
req_urlencode = self.from_dfi(request.content.getvalue()) req_urlencode = self.from_dfi(request.content.getvalue())
@ -348,7 +349,7 @@ class AllnetServlet:
if path.exists(f"{self.config.allnet.update_cfg_folder}/{req_file}"): if path.exists(f"{self.config.allnet.update_cfg_folder}/{req_file}"):
self.logger.info(f"Request for DL INI file {req_file} from {Utils.get_ip_addr(request)} successful") self.logger.info(f"Request for DL INI file {req_file} from {Utils.get_ip_addr(request)} successful")
self.data.base.log_event("allnet", "DLORDER_INI_SENT", logging.INFO, f"{Utils.get_ip_addr(request)} successfully recieved {req_file}") self.data.base.log_event("allnet", "DLORDER_INI_SENT", logging.INFO, f"{Utils.get_ip_addr(request)} successfully recieved {req_file}")
return open( return open(
f"{self.config.allnet.update_cfg_folder}/{req_file}", "rb" f"{self.config.allnet.update_cfg_folder}/{req_file}", "rb"
).read() ).read()
@ -364,14 +365,14 @@ class AllnetServlet:
except Exception as e: except Exception as e:
self.logger.warning(f"Failed to parse DL Report: {e}") self.logger.warning(f"Failed to parse DL Report: {e}")
return "NG" return "NG"
dl_data_type = DLIMG_TYPE.app dl_data_type = DLIMG_TYPE.app
dl_data = req_dict.get("appimage", {}) dl_data = req_dict.get("appimage", {})
if dl_data is None or not dl_data: if dl_data is None or not dl_data:
dl_data_type = DLIMG_TYPE.opt dl_data_type = DLIMG_TYPE.opt
dl_data = req_dict.get("optimage", {}) dl_data = req_dict.get("optimage", {})
if dl_data is None or not dl_data: if dl_data is None or not dl_data:
self.logger.warning(f"Failed to parse DL Report: Invalid format - contains neither appimage nor optimage") self.logger.warning(f"Failed to parse DL Report: Invalid format - contains neither appimage nor optimage")
return "NG" return "NG"
@ -381,10 +382,10 @@ class AllnetServlet:
if not rep.validate(): if not rep.validate():
self.logger.warning(f"Failed to parse DL Report: Invalid format - {rep.err}") self.logger.warning(f"Failed to parse DL Report: Invalid format - {rep.err}")
return "NG" return "NG"
msg = f"{rep.serial} @ {client_ip} reported {rep.rep_type.name} download state {rep.rf_state.name} for {rep.gd} v{rep.dav}:"\ msg = f"{rep.serial} @ {client_ip} reported {rep.rep_type.name} download state {rep.rf_state.name} for {rep.gd} v{rep.dav}:"\
f" {rep.tdsc}/{rep.tsc} segments downloaded for working files {rep.wfl} with {rep.dfl if rep.dfl else 'none'} complete." f" {rep.tdsc}/{rep.tsc} segments downloaded for working files {rep.wfl} with {rep.dfl if rep.dfl else 'none'} complete."
self.data.base.log_event("allnet", "DL_REPORT", logging.INFO, msg, dl_data) self.data.base.log_event("allnet", "DL_REPORT", logging.INFO, msg, dl_data)
self.logger.info(msg) self.logger.info(msg)
@ -393,7 +394,7 @@ class AllnetServlet:
def handle_loaderstaterecorder(self, request: Request, match: Dict) -> bytes: def handle_loaderstaterecorder(self, request: Request, match: Dict) -> bytes:
req_data = request.content.getvalue() req_data = request.content.getvalue()
sections = req_data.decode("utf-8").split("\r\n") sections = req_data.decode("utf-8").split("\r\n")
req_dict = dict(urllib.parse.parse_qsl(sections[0])) req_dict = dict(urllib.parse.parse_qsl(sections[0]))
serial: Union[str, None] = req_dict.get("serial", None) serial: Union[str, None] = req_dict.get("serial", None)
@ -407,21 +408,21 @@ class AllnetServlet:
self.logger.info(f"LoaderStateRecorder Request from {ip} {serial}: {num_files_dld}/{num_files_to_dl} Files download (State: {dl_state})") self.logger.info(f"LoaderStateRecorder Request from {ip} {serial}: {num_files_dld}/{num_files_to_dl} Files download (State: {dl_state})")
return "OK".encode() return "OK".encode()
def handle_alive(self, request: Request, match: Dict) -> bytes: def handle_alive(self, request: Request, match: Dict) -> bytes:
return "OK".encode() return "OK".encode()
def handle_billing_request(self, request: Request, _: Dict): def handle_billing_request(self, request: Request, _: Dict):
req_raw = request.content.getvalue() req_raw = request.content.getvalue()
if request.getHeader('Content-Type') == "application/octet-stream": if request.getHeader('Content-Type') == "application/octet-stream":
req_unzip = zlib.decompressobj(-zlib.MAX_WBITS).decompress(req_raw) req_unzip = zlib.decompressobj(-zlib.MAX_WBITS).decompress(req_raw)
else: else:
req_unzip = req_raw req_unzip = req_raw
req_dict = self.billing_req_to_dict(req_unzip) req_dict = self.billing_req_to_dict(req_unzip)
request_ip = Utils.get_ip_addr(request) request_ip = Utils.get_ip_addr(request)
if req_dict is None: if req_dict is None:
self.logger.error(f"Failed to parse request {request.content.getvalue()}") self.logger.error(f"Failed to parse request {request.content.getvalue()}")
return b"" return b""
@ -441,7 +442,7 @@ class AllnetServlet:
for x in range(1, len(req_dict)): for x in range(1, len(req_dict)):
if not req_dict[x]: if not req_dict[x]:
continue continue
try: try:
tmp = TraceData(req_dict[x]) tmp = TraceData(req_dict[x])
if tmp.trace_type == TraceDataType.CHARGE: if tmp.trace_type == TraceDataType.CHARGE:
@ -450,14 +451,14 @@ class AllnetServlet:
tmp = TraceDataEvent(req_dict[x]) tmp = TraceDataEvent(req_dict[x])
elif tmp.trace_type == TraceDataType.CREDIT: elif tmp.trace_type == TraceDataType.CREDIT:
tmp = TraceDataCredit(req_dict[x]) tmp = TraceDataCredit(req_dict[x])
traces.append(tmp) traces.append(tmp)
except KeyError as e: except KeyError as e:
self.logger.warn(f"Tracelog failed to parse: {e}") self.logger.warn(f"Tracelog failed to parse: {e}")
kc_serial_bytes = req.keychipid.encode() kc_serial_bytes = req.keychipid.encode()
machine = self.data.arcade.get_machine(req.keychipid) machine = self.data.arcade.get_machine(req.keychipid)
if machine is None and not self.config.server.allow_unregistered_serials: if machine is None and not self.config.server.allow_unregistered_serials:
@ -512,6 +513,47 @@ class AllnetServlet:
self.logger.info(f"Ping from {Utils.get_ip_addr(request)}") self.logger.info(f"Ping from {Utils.get_ip_addr(request)}")
return b"naomi ok" return b"naomi ok"
# def handle_qr_alive(self, request: Request, _: Dict):
# return b"alive"
#
# def handle_qr_lookup(self, request: Request, _: Dict) -> bytes:
# req = json.loads(request.content.getvalue())
# access_code = req["qrCode"][-20:]
# timestamp = req["timestamp"]
#
# try:
# userId = self.chimedb.handle_lookup(access_code)
# data = json.dumps({
# "userID": userId,
# "errorID": 0,
# "timestamp": timestamp,
# "key": self.hash_data(userId, timestamp)
# })
# except Exception as e:
#
# self.logger.error(e.with_traceback(None))
#
# data = json.dumps({
# "userID": -1,
# "errorID": 1,
# "timestamp": timestamp,
# "key": self.hash_data(-1, timestamp)
# })
#
# self.logger.info(data)
# return data.encode()
#
#
# def hash_data(self, chip_id, timestamp):
# input_string = f"{chip_id}{timestamp}XcW5FW4cPArBXEk4vzKz3CIrMuA5EVVW"
# hash_object = hashlib.sha256(input_string.encode('utf-8'))
# hex_dig = hash_object.hexdigest()
#
# formatted_hex = format(int(hex_dig, 16), '064x').upper()
#
# return formatted_hex
def billing_req_to_dict(self, data: bytes): def billing_req_to_dict(self, data: bytes):
""" """
Parses an billing request string into a python dictionary Parses an billing request string into a python dictionary

118
core/chimedb.py Normal file
View File

@ -0,0 +1,118 @@
import hashlib
import json
import logging
from logging.handlers import TimedRotatingFileHandler
from typing import Dict
import coloredlogs
from twisted.web.http import Request
from core.config import CoreConfig
from core.data import Data
class ChimeServlet:
def __init__(self, core_cfg: CoreConfig, cfg_folder: str) -> None:
self.config = core_cfg
self.config_folder = cfg_folder
self.data = Data(core_cfg)
self.logger = logging.getLogger("chimedb")
if not hasattr(self.logger, "initted"):
log_fmt_str = "[%(asctime)s] Chimedb | %(levelname)s | %(message)s"
log_fmt = logging.Formatter(log_fmt_str)
fileHandler = TimedRotatingFileHandler(
"{0}/{1}.log".format(self.config.server.log_dir, "aimedb"),
when="d",
backupCount=10,
)
fileHandler.setFormatter(log_fmt)
consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(log_fmt)
self.logger.addHandler(fileHandler)
self.logger.addHandler(consoleHandler)
self.logger.setLevel(self.config.aimedb.loglevel)
coloredlogs.install(
level=core_cfg.aimedb.loglevel, logger=self.logger, fmt=log_fmt_str
)
self.logger.initted = True
self.logger.info("Serving")
def handle_qr_alive(self, request: Request, _: Dict):
return b"alive"
def handle_qr_lookup(self, request: Request, _: Dict) -> bytes:
req = json.loads(request.content.getvalue())
access_code = req["qrCode"][-20:]
timestamp = req["timestamp"]
try:
userId = self._lookup(access_code)
data = json.dumps({
"userID": userId,
"errorID": 0,
"timestamp": timestamp,
"key": self._hash_key(userId, timestamp)
})
except Exception as e:
self.logger.error(e.with_traceback(None))
data = json.dumps({
"userID": -1,
"errorID": 1,
"timestamp": timestamp,
"key": self._hash_key(-1, timestamp)
})
return data.encode()
def _hash_key(self, chip_id, timestamp):
input_string = f"{chip_id}{timestamp}{self.config.chime.key}"
hash_object = hashlib.sha256(input_string.encode('utf-8'))
hex_dig = hash_object.hexdigest()
formatted_hex = format(int(hex_dig, 16), '064x').upper()
return formatted_hex
def _lookup(self, access_code):
user_id = self.data.card.get_user_id_from_card(access_code)
self.logger.info(f"access_code {access_code} -> user_id {user_id}")
if not user_id or user_id <= 0:
user_id = self._register(access_code)
return user_id
def _register(self, access_code):
user_id = -1
if self.config.server.allow_user_registration:
user_id = self.data.user.create_user()
if user_id is None:
self.logger.error("Failed to register user!")
user_id = -1
else:
card_id = self.data.card.create_card(user_id, access_code)
if card_id is None:
self.logger.error("Failed to register card!")
user_id = -1
self.logger.info(
f"Register access code {access_code} -> user_id {user_id}"
)
else:
self.logger.info(f"Registration blocked!: access code {access_code}")
return user_id

View File

@ -351,6 +351,22 @@ class MuchaConfig:
self.__config, "core", "mucha", "hostname", default="localhost" self.__config, "core", "mucha", "hostname", default="localhost"
) )
class ChimeConfig:
def __init__(self, parent_config: "CoreConfig") -> None:
self.__config = parent_config
@property
def enable(self) -> int:
return CoreConfig.get_config_field(
self.__config, "core", "chime", "enable", default=False
)
@property
def key(self) -> str:
return CoreConfig.get_config_field(
self.__config, "core", "chime", "key", default=""
)
class CoreConfig(dict): class CoreConfig(dict):
def __init__(self) -> None: def __init__(self) -> None:
@ -362,6 +378,7 @@ class CoreConfig(dict):
self.billing = BillingConfig(self) self.billing = BillingConfig(self)
self.aimedb = AimedbConfig(self) self.aimedb = AimedbConfig(self)
self.mucha = MuchaConfig(self) self.mucha = MuchaConfig(self)
self.chime = ChimeConfig(self)
@classmethod @classmethod
def str_to_loglevel(cls, level_str: str): def str_to_loglevel(cls, level_str: str):

View File

@ -63,3 +63,7 @@ mucha:
enable: False enable: False
hostname: "localhost" hostname: "localhost"
loglevel: "info" loglevel: "info"
chime:
enable: False
key: ""

View File

@ -13,6 +13,9 @@ from twisted.web.http import Request
from routes import Mapper from routes import Mapper
from threading import Thread from threading import Thread
from core.chimedb import ChimeServlet
class HttpDispatcher(resource.Resource): class HttpDispatcher(resource.Resource):
def __init__(self, cfg: CoreConfig, config_dir: str): def __init__(self, cfg: CoreConfig, config_dir: str):
super().__init__() super().__init__()
@ -25,6 +28,7 @@ class HttpDispatcher(resource.Resource):
self.title = TitleServlet(cfg, config_dir) self.title = TitleServlet(cfg, config_dir)
self.allnet = AllnetServlet(cfg, config_dir) self.allnet = AllnetServlet(cfg, config_dir)
self.mucha = MuchaServlet(cfg, config_dir) self.mucha = MuchaServlet(cfg, config_dir)
self.chime = ChimeServlet(cfg, config_dir)
self.map_get.connect( self.map_get.connect(
"allnet_downloadorder_ini", "allnet_downloadorder_ini",
@ -144,6 +148,30 @@ class HttpDispatcher(resource.Resource):
conditions=dict(method=["POST"]), conditions=dict(method=["POST"]),
) )
# Chime
if cfg.chime.enable:
self.map_post.connect(
"chime_qr_alive",
"/qrcode/api/alive_check",
controller="chime",
action="handle_qr_alive",
conditions=dict(method=["POST"]),
)
self.map_post.connect(
"chime_chime_alive",
"/wc_aime/api/alive_check",
controller="chime",
action="handle_qr_alive",
conditions=dict(method=["POST"]),
)
self.map_post.connect(
"chime_qr_lookup",
"/wc_aime/api/get_data",
controller="chime",
action="handle_qr_lookup",
conditions=dict(method=["POST"]),
)
for code, game in self.title.title_registry.items(): for code, game in self.title.title_registry.items():
get_matchers, post_matchers = game.get_endpoint_matchers() get_matchers, post_matchers = game.get_endpoint_matchers()

View File

@ -8,6 +8,7 @@ database = Mai2Data
reader = Mai2Reader reader = Mai2Reader
game_codes = [ game_codes = [
Mai2Constants.GAME_CODE_DX, Mai2Constants.GAME_CODE_DX,
Mai2Constants.GAME_CODE_DX_CN,
Mai2Constants.GAME_CODE_FINALE, Mai2Constants.GAME_CODE_FINALE,
Mai2Constants.GAME_CODE_MILK, Mai2Constants.GAME_CODE_MILK,
Mai2Constants.GAME_CODE_MURASAKI, Mai2Constants.GAME_CODE_MURASAKI,

43
titles/mai2/cn2023.py Normal file
View File

@ -0,0 +1,43 @@
from typing import Dict
from core.config import CoreConfig
from titles.mai2.festival import Mai2Festival
from titles.mai2.const import Mai2Constants
from titles.mai2.config import Mai2Config
class Mai2CN2023(Mai2Festival):
def __init__(self, cfg: CoreConfig, game_cfg: Mai2Config) -> None:
super().__init__(cfg, game_cfg)
self.version = Mai2Constants.VER_MAIMAI_DX_FESTIVAL
def handle_get_game_setting_api_request(self, data: Dict):
return {
"gameSetting": {
"isMaintenance": False,
"requestInterval": 1800,
"rebootStartTime": "2020-01-01 07:00:00.0",
"rebootEndTime": "2020-01-01 07:59:59.0",
"movieUploadLimit": 100,
"movieStatus": 1,
"movieServerUri": "",
"deliverServerUri": "",
"oldServerUri": "",
"usbDlServerUri": "",
"rebootInterval": 0,
},
"isAouAccession": False,
}
def handle_get_user_extend_api_request(self, data: Dict) -> Dict:
extend = self.data.profile.get_profile_extend(data["userId"], self.version)
if extend is None:
return
extend_dict = extend._asdict()
extend_dict.pop("id")
extend_dict.pop("user")
extend_dict.pop("version")
extend_dict["isPhotoAgree"] = False
return {"userId": data["userId"], "userExtend": extend_dict}

View File

@ -28,6 +28,51 @@ class Mai2Constants:
GAME_CODE_MILK = "SDDZ" GAME_CODE_MILK = "SDDZ"
GAME_CODE_FINALE = "SDEY" GAME_CODE_FINALE = "SDEY"
GAME_CODE_DX = "SDEZ" GAME_CODE_DX = "SDEZ"
GAME_CODE_DX_CN = "SDGB"
ChnHandlerMap_2023 = {
"df7e9c35e8f1d31dc327d583408e90d0": "GetGameChargeApi",
"1ac0c2058d942a18399610a37dd20358": "GetGameEventApi",
"372ef22cc41839e3b8c4707cad5324ee": "GetGameNgMusicIdApi",
"86fe0258e34b7fa265791edfbb6366aa": "GetGameRankingApi",
"86adcd106b5e4c4949f86e534010bb99": "GetGameSettingApi",
"bc5548b3ff3548140901279b17349e9c": "GetGameTournamentInfoApi",
"f9f83fc287e241c3d2a941ab5c9845a9": "GetTransferFriendApi",
"4f24fc175e4502e405d3650ecac5ee20": "GetUserActivityApi",
"48cd2ccd926ebe544e38fbd1495d1210": "GetUserCardApi",
"b98d4b35dadcc3c121971802b892aa26": "GetUserCharacterApi",
"49df62c94b5bf9c6092880a91166671e": "GetUserChargeApi",
"26474eb8e8f83f522c3fa017c7d3c6f6": "GetUserCourseApi",
"40ed28c7e603f7559cd8d16c6399e6eb": "GetUserDataApi",
"40cd997e881eb29040e20c7b5b2b22af": "GetUserExtendApi",
"943671dac71ea85ebe850856887cde3e": "GetUserFavoriteApi",
"4bcf0a53b49884c582801620fc8fda89": "GetUserFriendSeasonRankingApi",
"14e68e6a7a4cce9b1b87e91365271230": "GetUserGhostApi",
"a0cf80f9feb2261a9975bfbde6b36261": "GetUserItemApi",
"f8cbf404789ebadeefde45db227960a9": "GetUserLoginBonusApi",
"4a800f9ad31e9463895a024ad8c8f92e": "GetUserMapApi",
"d3bd8bb98e306472a16873db22a9f52f": "GetUserMusicApi",
"b2a3e39bc0932be284ffc7a24b8942f6": "GetUserOptionApi",
"defaae160852c0438f5c61fedfa85628": "GetUserPortraitApi",
"7b75e0110f28db678a8ea1839956a3ae": "GetUserPreviewApi",
"edafd9a8158b9a2b768c504e5e9beef7": "GetUserRatingApi",
"b58c88ca93d22577b1f661eb638ab353": "GetUserRecommendRateMusicApi",
"d3b1c5303f39dc883121e3bc2752dc32": "GetUserRecommendSelectionMusicApi",
"bfaa99205d692df04acd9152019b1158": "GetUserRegionApi",
"262f6b3ee97d3ade84fbb1c77f1a3f8f": "GetUserScoreRankingApi",
"92b4b94dfb05fbab78f7cb9f3362b86e": "PingApi",
"e632f71d878ca3d663a869c3eaf4a6d3": "UploadUserChargelogApi",
"24f785ce415736c9fcaf192a2d781054": "UploadUserPhotoApi",
"fa332c56cea3f545a3dd4417bae27aea": "UploadUserPlaylogApi",
"3123dd873f5cf3647a3f8f7e28e63950": "UploadUserPortraitApi",
"732e94bf8e87711b2016d9765f0e3ec4": "UpsertClientBookkeepingApi",
"43c43ca5ff0fc70b9a89db11334714c0": "UpsertClientSettingApi",
"df76314dfb75cd41b787213f45ddc639": "UpsertClientTestmodeApi",
"60d168b9b3792a6824ee20b02a385b48": "UpsertClientUploadApi",
"58896357b8cf354aa942a694828d2ca4": "UpsertUserAllApi",
"0223710b103248981096f31b1024cedd": "UserLoginApi",
"cc30d041222440909b40ef617de972f1": "UserLogoutApi",
}
CONFIG_NAME = "mai2.yaml" CONFIG_NAME = "mai2.yaml"
@ -54,6 +99,17 @@ class Mai2Constants:
VER_MAIMAI_DX_FESTIVAL = 19 VER_MAIMAI_DX_FESTIVAL = 19
VER_MAIMAI_DX_FESTIVAL_PLUS = 20 VER_MAIMAI_DX_FESTIVAL_PLUS = 20
ChnHandlerMapMatch = {
VER_MAIMAI_DX_FESTIVAL: ChnHandlerMap_2023
}
ChnAesMatch = {
VER_MAIMAI_DX_FESTIVAL: [
b"F2Rc8F0x2Ly6LiIFy9K>s_Y0Bum62H;R",
b"PR12H;E2Brw@5kJ<"
]
}
VERSION_STRING = ( VERSION_STRING = (
"maimai", "maimai",
"maimai PLUS", "maimai PLUS",

42
titles/mai2/cryption.py Normal file
View File

@ -0,0 +1,42 @@
import zlib
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
import base64
class CipherAES:
def __init__(self,AES_KEY,AES_IV, BLOCK_SIZE=128, KEY_SIZE=256):
self.BLOCK_SIZE = BLOCK_SIZE
self.KEY_SIZE = KEY_SIZE
self.AES_KEY = AES_KEY
self.AES_IV = AES_IV
def _pad(self,data):
block_size = self.BLOCK_SIZE // 8
padding_length = block_size - len(data) % block_size
return data + bytes([padding_length]) * padding_length
def _unpad(self, padded_data):
pad_char = padded_data[-1]
if not 1 <= pad_char <= self.BLOCK_SIZE // 8:
raise ValueError("Invalid padding")
return padded_data[:-pad_char]
def encrypt(self, plaintext):
if isinstance(plaintext, str):
plaintext = plaintext.encode('utf-8')
backend = default_backend()
cipher = Cipher(algorithms.AES(self.AES_KEY), modes.CBC(self.AES_IV), backend=backend)
encryptor = cipher.encryptor()
padded_plaintext = self._pad(plaintext)
ciphertext = encryptor.update(padded_plaintext) + encryptor.finalize()
return ciphertext
def decrypt(self, ciphertext):
backend = default_backend()
cipher = Cipher(algorithms.AES(self.AES_KEY), modes.CBC(self.AES_IV), backend=backend)
decryptor = cipher.decryptor()
decrypted_data = decryptor.update(ciphertext) + decryptor.finalize()
return self._unpad(decrypted_data)

View File

@ -13,6 +13,8 @@ from typing import Tuple, List, Dict
from core.config import CoreConfig from core.config import CoreConfig
from core.utils import Utils from core.utils import Utils
from core.title import BaseServlet from core.title import BaseServlet
from . import cryption
from .cn2023 import Mai2CN2023
from .config import Mai2Config from .config import Mai2Config
from .const import Mai2Constants from .const import Mai2Constants
from .base import Mai2Base from .base import Mai2Base
@ -60,6 +62,30 @@ class Mai2Servlet(BaseServlet):
Mai2FestivalPlus, Mai2FestivalPlus,
] ]
self.cn_versions = [
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
Mai2CN2023,
None,
]
self.logger = logging.getLogger("mai2") self.logger = logging.getLogger("mai2")
if not hasattr(self.logger, "initted"): if not hasattr(self.logger, "initted"):
log_fmt_str = "[%(asctime)s] Mai2 | %(levelname)s | %(message)s" log_fmt_str = "[%(asctime)s] Mai2 | %(levelname)s | %(message)s"
@ -232,12 +258,11 @@ class Mai2Servlet(BaseServlet):
def handle_mai2(self, request: Request, game_code: str, matchers: Dict) -> bytes: def handle_mai2(self, request: Request, game_code: str, matchers: Dict) -> bytes:
endpoint = matchers['endpoint'] endpoint = matchers['endpoint']
version = int(matchers['version']) version = int(matchers['version'])
if endpoint.lower() == "ping":
return zlib.compress(b'{"returnCode": "1"}')
req_raw = request.content.getvalue() req_raw = request.content.getvalue()
internal_ver = 0 internal_ver = 0
client_ip = Utils.get_ip_addr(request) client_ip = Utils.get_ip_addr(request)
if version < 105: # 1.0 if version < 105: # 1.0
internal_ver = Mai2Constants.VER_MAIMAI_DX internal_ver = Mai2Constants.VER_MAIMAI_DX
elif version >= 105 and version < 110: # PLUS elif version >= 105 and version < 110: # PLUS
@ -250,11 +275,43 @@ class Mai2Servlet(BaseServlet):
internal_ver = Mai2Constants.VER_MAIMAI_DX_UNIVERSE internal_ver = Mai2Constants.VER_MAIMAI_DX_UNIVERSE
elif version >= 125 and version < 130: # UNiVERSE PLUS elif version >= 125 and version < 130: # UNiVERSE PLUS
internal_ver = Mai2Constants.VER_MAIMAI_DX_UNIVERSE_PLUS internal_ver = Mai2Constants.VER_MAIMAI_DX_UNIVERSE_PLUS
elif version >= 130 and version < 135: # FESTiVAL elif version >= 130 and version < 135: # FESTiVAL OR CN2023
internal_ver = Mai2Constants.VER_MAIMAI_DX_FESTIVAL internal_ver = Mai2Constants.VER_MAIMAI_DX_FESTIVAL
elif version >= 135: # FESTiVAL PLUS elif version >= 135: # FESTiVAL PLUS
internal_ver = Mai2Constants.VER_MAIMAI_DX_FESTIVAL_PLUS internal_ver = Mai2Constants.VER_MAIMAI_DX_FESTIVAL_PLUS
is_cn = False
if endpoint in Mai2Constants.ChnHandlerMapMatch[internal_ver].keys():
endpoint = Mai2Constants.ChnHandlerMapMatch[internal_ver][endpoint]
is_cn = True
if endpoint.endswith("MaimaiChn"):
is_cn = True
endpoint = endpoint[:-9]
cn_crypto = cryption.CipherAES(Mai2Constants.ChnAesMatch[internal_ver][0], Mai2Constants.ChnAesMatch[internal_ver][1])
try:
unzip = zlib.decompress(req_raw)
except zlib.error as e:
self.logger.error(
f"Failed to decompress v{version} {endpoint} request -> {e}"
)
return zlib.compress(cn_crypto.encrypt(b'{"stat": "0"}') if is_cn else b'{"stat": "0"}')
try:
decrypted = cn_crypto.decrypt(unzip)
is_cn = True
except:
self.logger.info(f"Failed to decrypt v{version} {endpoint} request, maybe not encrypted!")
is_cn = False
decrypted = unzip
req_data = json.loads(decrypted)
if endpoint.lower() == "ping":
return zlib.compress(cn_crypto.encrypt(b'{"returnCode": "1"}') if is_cn else b'{"returnCode": "1"}')
if ( if (
request.getHeader("Mai-Encoding") is not None request.getHeader("Mai-Encoding") is not None
or request.getHeader("X-Mai-Encoding") is not None or request.getHeader("X-Mai-Encoding") is not None
@ -269,22 +326,11 @@ class Mai2Servlet(BaseServlet):
f"Encryption v{enc_ver} - User-Agent: {request.getHeader('User-Agent')}" f"Encryption v{enc_ver} - User-Agent: {request.getHeader('User-Agent')}"
) )
try:
unzip = zlib.decompress(req_raw)
except zlib.error as e:
self.logger.error(
f"Failed to decompress v{version} {endpoint} request -> {e}"
)
return zlib.compress(b'{"stat": "0"}')
req_data = json.loads(unzip)
self.logger.info(f"v{version} {endpoint} request from {client_ip}") self.logger.info(f"v{version} {endpoint} request from {client_ip}")
self.logger.debug(req_data) self.logger.debug(req_data)
func_to_find = "handle_" + inflection.underscore(endpoint) + "_request" func_to_find = "handle_" + inflection.underscore(endpoint) + "_request"
handler_cls = self.versions[internal_ver](self.core_cfg, self.game_cfg) handler_cls = self.versions[internal_ver](self.core_cfg, self.game_cfg) if not is_cn else self.cn_versions[internal_ver](self.core_cfg, self.game_cfg)
if not hasattr(handler_cls, func_to_find): if not hasattr(handler_cls, func_to_find):
self.logger.warning(f"Unhandled v{version} request {endpoint}") self.logger.warning(f"Unhandled v{version} request {endpoint}")
@ -297,14 +343,14 @@ class Mai2Servlet(BaseServlet):
except Exception as e: except Exception as e:
self.logger.error(f"Error handling v{version} method {endpoint} - {e}") self.logger.error(f"Error handling v{version} method {endpoint} - {e}")
return zlib.compress(b'{"stat": "0"}') return zlib.compress(cn_crypto.encrypt(b'{"stat": "0"}') if is_cn else b'{"stat": "0"}')
if resp == None: if resp == None:
resp = {"returnCode": 1} resp = {"returnCode": 1}
self.logger.debug(f"Response {resp}") self.logger.debug(f"Response {resp}")
return zlib.compress(json.dumps(resp, ensure_ascii=False).encode("utf-8")) return zlib.compress(cn_crypto.encrypt(json.dumps(resp, ensure_ascii=False).encode("utf-8")) if is_cn else json.dumps(resp, ensure_ascii=False).encode("utf-8"))
def handle_old_srv(self, request: Request, game_code: str, matchers: Dict) -> bytes: def handle_old_srv(self, request: Request, game_code: str, matchers: Dict) -> bytes:
endpoint = matchers['endpoint'] endpoint = matchers['endpoint']