add support for Maimai DX CN 2023 (舞萌DX 2023), add support for chime #139

Closed
Error063 wants to merge 1 commits from Error063/artemis:maidx-cn-2023 into develop
8 changed files with 240 additions and 26 deletions

View File

@ -10,6 +10,7 @@ from os import environ, path, mkdir, W_OK, access
from typing import List
from core import CoreConfig, TitleServlet, MuchaServlet, AllnetServlet, BillingServlet, AimedbServlette
from core.chimedb import ChimeServlet
from core.frontend import FrontendServlet
async def dummy_rt(request: Request):
@ -86,6 +87,15 @@ if not cfg.allnet.standalone:
Route("/dl/ini/{file:str}", allnet.handle_dlorder_ini),
]
if cfg.chimedb.enable:
chimedb = ChimeServlet(cfg, cfg_dir)
route_lst += [
Route("/wc_aime/api/alive_check", chimedb.handle_qr_alive, methods=["POST"]),
Route("/qrcode/api/alive_check", chimedb.handle_qr_alive, methods=["POST"]),
Route("/wc_aime/api/get_data", chimedb.handle_qr_lookup, methods=["POST"])
]
for code, game in title.title_registry.items():
route_lst += game.get_routes()

123
core/chimedb.py Normal file
View File

@ -0,0 +1,123 @@
import hashlib
import json
import logging
from logging.handlers import TimedRotatingFileHandler
from typing import Dict
import coloredlogs
from starlette.responses import PlainTextResponse
from twisted.web.http import Request
from core.config import CoreConfig
from core.data import Data
class ChimeServlet:
def __init__(self, core_cfg: CoreConfig, cfg_folder: str) -> None:
self.config = core_cfg
self.config_folder = cfg_folder
self.data = Data(core_cfg)
self.logger = logging.getLogger("chimedb")
if not hasattr(self.logger, "initted"):
log_fmt_str = "[%(asctime)s] Chimedb | %(levelname)s | %(message)s"
log_fmt = logging.Formatter(log_fmt_str)
fileHandler = TimedRotatingFileHandler(
"{0}/{1}.log".format(self.config.server.log_dir, "chimedb"),
when="d",
backupCount=10,
)
fileHandler.setFormatter(log_fmt)
consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(log_fmt)
self.logger.addHandler(fileHandler)
self.logger.addHandler(consoleHandler)
self.logger.setLevel(self.config.aimedb.loglevel)
coloredlogs.install(
level=core_cfg.aimedb.loglevel, logger=self.logger, fmt=log_fmt_str
)
self.logger.initted = True
if not core_cfg.chimedb.key:
self.logger.error("!!!KEY NOT SET!!!")
exit(1)
self.logger.info("Serving")
async def handle_qr_alive(self, request: Request):
return PlainTextResponse("alive")
async def handle_qr_lookup(self, request: Request) -> bytes:
req = json.loads(await request.body())
access_code = req["qrCode"][-20:]
timestamp = req["timestamp"]
try:
userId = await self._lookup(access_code)
data = json.dumps({
"userID": userId,
"errorID": 0,
"timestamp": timestamp,
"key": self._hash_key(userId, timestamp)
})
except Exception as e:
self.logger.error(e.with_traceback(None))
data = json.dumps({
"userID": -1,
"errorID": 1,
"timestamp": timestamp,
"key": self._hash_key(-1, timestamp)
})
return PlainTextResponse(data)
def _hash_key(self, chip_id, timestamp):
input_string = f"{chip_id}{timestamp}{self.config.chimedb.key}"
hash_object = hashlib.sha256(input_string.encode('utf-8'))
hex_dig = hash_object.hexdigest()
formatted_hex = format(int(hex_dig, 16), '064x').upper()
return formatted_hex
async def _lookup(self, access_code):
user_id = await self.data.card.get_user_id_from_card(access_code)
self.logger.info(f"access_code {access_code} -> user_id {user_id}")
if not user_id or user_id <= 0:
user_id = await self._register(access_code)
return user_id
async def _register(self, access_code):
user_id = -1
if self.config.server.allow_user_registration:
user_id = await self.data.user.create_user()
if user_id is None:
self.logger.error("Failed to register user!")
user_id = -1
else:
card_id = await self.data.card.create_card(user_id, access_code)
if card_id is None:
self.logger.error("Failed to register card!")
user_id = -1
self.logger.info(
f"Register access code {access_code} -> user_id {user_id}"
)
else:
self.logger.info(f"Registration blocked!: access code {access_code}")
return user_id

View File

@ -370,6 +370,32 @@ class MuchaConfig:
)
)
class ChimedbConfig:
def __init__(self, parent_config: "CoreConfig") -> None:
self.__config = parent_config
@property
def enable(self) -> bool:
return CoreConfig.get_config_field(
self.__config, "core", "chimedb", "enable", default=True
)
@property
def loglevel(self) -> int:
return CoreConfig.str_to_loglevel(
CoreConfig.get_config_field(
self.__config, "core", "chimedb", "loglevel", default="info"
)
)
@property
def key(self) -> str:
return CoreConfig.get_config_field(
self.__config, "core", "chimedb", "key", default=""
)
class CoreConfig(dict):
def __init__(self) -> None:
self.server = ServerConfig(self)
@ -380,6 +406,7 @@ class CoreConfig(dict):
self.billing = BillingConfig(self)
self.aimedb = AimedbConfig(self)
self.mucha = MuchaConfig(self)
self.chimedb = ChimedbConfig(self)
@classmethod
def str_to_loglevel(cls, level_str: str):

42
core/crypto.py Normal file
View File

@ -0,0 +1,42 @@
import zlib
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
import base64
class CipherAES:
def __init__(self,AES_KEY,AES_IV, BLOCK_SIZE=128, KEY_SIZE=256):
self.BLOCK_SIZE = BLOCK_SIZE
self.KEY_SIZE = KEY_SIZE
self.AES_KEY = AES_KEY
self.AES_IV = AES_IV
def _pad(self,data):
block_size = self.BLOCK_SIZE // 8
padding_length = block_size - len(data) % block_size
return data + bytes([padding_length]) * padding_length
def _unpad(self, padded_data):
pad_char = padded_data[-1]
if not 1 <= pad_char <= self.BLOCK_SIZE // 8:
raise ValueError("Invalid padding")
return padded_data[:-pad_char]
def encrypt(self, plaintext):
if isinstance(plaintext, str):
plaintext = plaintext.encode('utf-8')
backend = default_backend()
cipher = Cipher(algorithms.AES(self.AES_KEY), modes.CBC(self.AES_IV), backend=backend)
encryptor = cipher.encryptor()
padded_plaintext = self._pad(plaintext)
ciphertext = encryptor.update(padded_plaintext) + encryptor.finalize()
return ciphertext
def decrypt(self, ciphertext):
backend = default_backend()
cipher = Cipher(algorithms.AES(self.AES_KEY), modes.CBC(self.AES_IV), backend=backend)
decryptor = cipher.decryptor()
decrypted_data = decryptor.update(ciphertext) + decryptor.finalize()
return self._unpad(decrypted_data)

View File

@ -64,3 +64,7 @@ aimedb:
mucha:
loglevel: "info"
chimedb:
enable: False
key: ""

View File

@ -16,4 +16,5 @@ game_codes = [
Mai2Constants.GAME_CODE_GREEN,
Mai2Constants.GAME_CODE,
Mai2Constants.GAME_CODE_DX_INT,
Mai2Constants.GAME_CODE_DX_CHN,
]

View File

@ -29,6 +29,7 @@ class Mai2Constants:
GAME_CODE_FINALE = "SDEY"
GAME_CODE_DX = "SDEZ"
GAME_CODE_DX_INT = "SDGA"
GAME_CODE_DX_CHN = "SDGB"
CONFIG_NAME = "mai2.yaml"

View File

@ -11,12 +11,11 @@ from logging.handlers import TimedRotatingFileHandler
from os import path, mkdir
from typing import Tuple, List, Dict
from Crypto.Hash import MD5
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
from core.config import CoreConfig
from core.utils import Utils
from core.title import BaseServlet
from core.crypto import CipherAES
from .config import Mai2Config
from .const import Mai2Constants
from .base import Mai2Base
@ -321,6 +320,19 @@ class Mai2Servlet(BaseServlet):
internal_ver = Mai2Constants.VER_MAIMAI_DX_FESTIVAL
elif version >= 135 and version < 140: # FESTiVAL PLUS
internal_ver = Mai2Constants.VER_MAIMAI_DX_FESTIVAL_PLUS
elif game_code == "SDGB": # CN
if 130 <= version < 135: # Maimai DX CN 2023 (FESTiVAL)
internal_ver = Mai2Constants.VER_MAIMAI_DX_FESTIVAL
try:
unzip = zlib.decompress(req_raw)
except zlib.error as e:
self.logger.error(
f"Failed to decompress v{version} {endpoint} request -> {e}"
)
return Response(zlib.compress(b'{"stat": "0"}'))
if all(c in string.hexdigits for c in endpoint) and len(endpoint) == 32:
# If we get a 32 character long hex string, it's a hash and we're
@ -342,13 +354,12 @@ class Mai2Servlet(BaseServlet):
endpoint = self.hash_table[internal_ver][endpoint.lower()]
try:
crypt = AES.new(
crypt = CipherAES(
bytes.fromhex(self.game_cfg.crypto.keys[internal_ver][0]),
AES.MODE_CBC,
bytes.fromhex(self.game_cfg.crypto.keys[internal_ver][1]),
)
req_raw = crypt.decrypt(req_raw)
decrypted = crypt.decrypt(unzip)
except Exception as e:
self.logger.error(
@ -359,6 +370,10 @@ class Mai2Servlet(BaseServlet):
return Response(zlib.compress(b'{"stat": "0"}'))
encrypted = True
req_data = json.loads(decrypted if encrypted else unzip)
if (
not encrypted
@ -371,25 +386,17 @@ class Mai2Servlet(BaseServlet):
)
return Response(zlib.compress(b'{"stat": "0"}'))
try:
unzip = zlib.decompress(req_raw)
except zlib.error as e:
self.logger.error(
f"Failed to decompress v{version} {endpoint} request -> {e}"
)
return Response(zlib.compress(b'{"stat": "0"}'))
req_data = json.loads(unzip)
self.logger.info(f"v{version} {endpoint} request from {client_ip}")
self.logger.debug(req_data)
endpoint = (
endpoint.replace("MaimaiExp", "")
if game_code == Mai2Constants.GAME_CODE_DX_INT
else endpoint
)
if game_code == Mai2Constants.GAME_CODE_DX_INT:
endpoint = endpoint.replace("MaimaiExp", "")
elif game_code == Mai2Constants.GAME_CODE_DX_CHN:
endpoint = endpoint.replace("MaimaiChn", "")
func_to_find = "handle_" + inflection.underscore(endpoint) + "_request"
handler_cls = self.versions[internal_ver](self.core_cfg, self.game_cfg)
@ -411,20 +418,19 @@ class Mai2Servlet(BaseServlet):
self.logger.debug(f"Response {resp}")
zipped = zlib.compress(json.dumps(resp, ensure_ascii=False).encode("utf-8"))
if not encrypted or version < 110:
return Response(zipped)
return Response(zlib.compress(json.dumps(resp, ensure_ascii=False).encode("utf-8")))
padded = pad(zipped, 16)
# padded = pad(zipped, 16)
crypt = AES.new(
crypt = CipherAES(
bytes.fromhex(self.game_cfg.crypto.keys[internal_ver][0]),
AES.MODE_CBC,
bytes.fromhex(self.game_cfg.crypto.keys[internal_ver][1]),
)
return Response(crypt.encrypt(padded))
return Response(
zlib.compress(crypt.encrypt(json.dumps(resp, ensure_ascii=False).encode("utf-8")))
)
async def handle_old_srv(self, request: Request) -> bytes: