add support for Maimai DX CN 2023 (舞萌DX 2023), add support for chime #139
10
core/app.py
10
core/app.py
@ -10,6 +10,7 @@ from os import environ, path, mkdir, W_OK, access
|
||||
from typing import List
|
||||
|
||||
from core import CoreConfig, TitleServlet, MuchaServlet, AllnetServlet, BillingServlet, AimedbServlette
|
||||
from core.chimedb import ChimeServlet
|
||||
from core.frontend import FrontendServlet
|
||||
|
||||
async def dummy_rt(request: Request):
|
||||
@ -86,6 +87,15 @@ if not cfg.allnet.standalone:
|
||||
Route("/dl/ini/{file:str}", allnet.handle_dlorder_ini),
|
||||
]
|
||||
|
||||
if cfg.chimedb.enable:
|
||||
chimedb = ChimeServlet(cfg, cfg_dir)
|
||||
route_lst += [
|
||||
Route("/wc_aime/api/alive_check", chimedb.handle_qr_alive, methods=["POST"]),
|
||||
Route("/qrcode/api/alive_check", chimedb.handle_qr_alive, methods=["POST"]),
|
||||
Route("/wc_aime/api/get_data", chimedb.handle_qr_lookup, methods=["POST"])
|
||||
]
|
||||
|
||||
|
||||
for code, game in title.title_registry.items():
|
||||
route_lst += game.get_routes()
|
||||
|
||||
|
123
core/chimedb.py
Normal file
123
core/chimedb.py
Normal file
@ -0,0 +1,123 @@
|
||||
import hashlib
|
||||
import json
|
||||
import logging
|
||||
from logging.handlers import TimedRotatingFileHandler
|
||||
from typing import Dict
|
||||
|
||||
import coloredlogs
|
||||
from starlette.responses import PlainTextResponse
|
||||
from twisted.web.http import Request
|
||||
|
||||
from core.config import CoreConfig
|
||||
from core.data import Data
|
||||
|
||||
|
||||
class ChimeServlet:
|
||||
def __init__(self, core_cfg: CoreConfig, cfg_folder: str) -> None:
|
||||
self.config = core_cfg
|
||||
self.config_folder = cfg_folder
|
||||
|
||||
self.data = Data(core_cfg)
|
||||
|
||||
self.logger = logging.getLogger("chimedb")
|
||||
if not hasattr(self.logger, "initted"):
|
||||
log_fmt_str = "[%(asctime)s] Chimedb | %(levelname)s | %(message)s"
|
||||
log_fmt = logging.Formatter(log_fmt_str)
|
||||
|
||||
fileHandler = TimedRotatingFileHandler(
|
||||
"{0}/{1}.log".format(self.config.server.log_dir, "chimedb"),
|
||||
when="d",
|
||||
backupCount=10,
|
||||
)
|
||||
fileHandler.setFormatter(log_fmt)
|
||||
|
||||
consoleHandler = logging.StreamHandler()
|
||||
consoleHandler.setFormatter(log_fmt)
|
||||
|
||||
self.logger.addHandler(fileHandler)
|
||||
self.logger.addHandler(consoleHandler)
|
||||
|
||||
self.logger.setLevel(self.config.aimedb.loglevel)
|
||||
coloredlogs.install(
|
||||
level=core_cfg.aimedb.loglevel, logger=self.logger, fmt=log_fmt_str
|
||||
)
|
||||
self.logger.initted = True
|
||||
|
||||
if not core_cfg.chimedb.key:
|
||||
self.logger.error("!!!KEY NOT SET!!!")
|
||||
exit(1)
|
||||
|
||||
self.logger.info("Serving")
|
||||
|
||||
async def handle_qr_alive(self, request: Request):
|
||||
return PlainTextResponse("alive")
|
||||
|
||||
async def handle_qr_lookup(self, request: Request) -> bytes:
|
||||
req = json.loads(await request.body())
|
||||
access_code = req["qrCode"][-20:]
|
||||
timestamp = req["timestamp"]
|
||||
|
||||
try:
|
||||
userId = await self._lookup(access_code)
|
||||
data = json.dumps({
|
||||
"userID": userId,
|
||||
"errorID": 0,
|
||||
"timestamp": timestamp,
|
||||
"key": self._hash_key(userId, timestamp)
|
||||
})
|
||||
except Exception as e:
|
||||
|
||||
self.logger.error(e.with_traceback(None))
|
||||
|
||||
data = json.dumps({
|
||||
"userID": -1,
|
||||
"errorID": 1,
|
||||
"timestamp": timestamp,
|
||||
"key": self._hash_key(-1, timestamp)
|
||||
})
|
||||
|
||||
return PlainTextResponse(data)
|
||||
|
||||
|
||||
def _hash_key(self, chip_id, timestamp):
|
||||
input_string = f"{chip_id}{timestamp}{self.config.chimedb.key}"
|
||||
hash_object = hashlib.sha256(input_string.encode('utf-8'))
|
||||
hex_dig = hash_object.hexdigest()
|
||||
|
||||
formatted_hex = format(int(hex_dig, 16), '064x').upper()
|
||||
|
||||
return formatted_hex
|
||||
|
||||
async def _lookup(self, access_code):
|
||||
user_id = await self.data.card.get_user_id_from_card(access_code)
|
||||
|
||||
self.logger.info(f"access_code {access_code} -> user_id {user_id}")
|
||||
|
||||
if not user_id or user_id <= 0:
|
||||
user_id = await self._register(access_code)
|
||||
|
||||
return user_id
|
||||
|
||||
async def _register(self, access_code):
|
||||
user_id = -1
|
||||
|
||||
if self.config.server.allow_user_registration:
|
||||
user_id = await self.data.user.create_user()
|
||||
|
||||
if user_id is None:
|
||||
self.logger.error("Failed to register user!")
|
||||
user_id = -1
|
||||
else:
|
||||
card_id = await self.data.card.create_card(user_id, access_code)
|
||||
|
||||
if card_id is None:
|
||||
self.logger.error("Failed to register card!")
|
||||
user_id = -1
|
||||
|
||||
self.logger.info(
|
||||
f"Register access code {access_code} -> user_id {user_id}"
|
||||
)
|
||||
else:
|
||||
self.logger.info(f"Registration blocked!: access code {access_code}")
|
||||
|
||||
return user_id
|
@ -370,6 +370,32 @@ class MuchaConfig:
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
class ChimedbConfig:
|
||||
def __init__(self, parent_config: "CoreConfig") -> None:
|
||||
self.__config = parent_config
|
||||
|
||||
@property
|
||||
def enable(self) -> bool:
|
||||
return CoreConfig.get_config_field(
|
||||
self.__config, "core", "chimedb", "enable", default=True
|
||||
)
|
||||
|
||||
@property
|
||||
def loglevel(self) -> int:
|
||||
return CoreConfig.str_to_loglevel(
|
||||
CoreConfig.get_config_field(
|
||||
self.__config, "core", "chimedb", "loglevel", default="info"
|
||||
)
|
||||
)
|
||||
|
||||
@property
|
||||
def key(self) -> str:
|
||||
return CoreConfig.get_config_field(
|
||||
self.__config, "core", "chimedb", "key", default=""
|
||||
)
|
||||
|
||||
|
||||
class CoreConfig(dict):
|
||||
def __init__(self) -> None:
|
||||
self.server = ServerConfig(self)
|
||||
@ -380,6 +406,7 @@ class CoreConfig(dict):
|
||||
self.billing = BillingConfig(self)
|
||||
self.aimedb = AimedbConfig(self)
|
||||
self.mucha = MuchaConfig(self)
|
||||
self.chimedb = ChimedbConfig(self)
|
||||
|
||||
@classmethod
|
||||
def str_to_loglevel(cls, level_str: str):
|
||||
|
42
core/crypto.py
Normal file
42
core/crypto.py
Normal file
@ -0,0 +1,42 @@
|
||||
import zlib
|
||||
|
||||
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
import base64
|
||||
|
||||
class CipherAES:
|
||||
def __init__(self,AES_KEY,AES_IV, BLOCK_SIZE=128, KEY_SIZE=256):
|
||||
self.BLOCK_SIZE = BLOCK_SIZE
|
||||
self.KEY_SIZE = KEY_SIZE
|
||||
self.AES_KEY = AES_KEY
|
||||
self.AES_IV = AES_IV
|
||||
|
||||
def _pad(self,data):
|
||||
block_size = self.BLOCK_SIZE // 8
|
||||
padding_length = block_size - len(data) % block_size
|
||||
return data + bytes([padding_length]) * padding_length
|
||||
|
||||
def _unpad(self, padded_data):
|
||||
pad_char = padded_data[-1]
|
||||
if not 1 <= pad_char <= self.BLOCK_SIZE // 8:
|
||||
raise ValueError("Invalid padding")
|
||||
return padded_data[:-pad_char]
|
||||
|
||||
def encrypt(self, plaintext):
|
||||
if isinstance(plaintext, str):
|
||||
plaintext = plaintext.encode('utf-8')
|
||||
backend = default_backend()
|
||||
cipher = Cipher(algorithms.AES(self.AES_KEY), modes.CBC(self.AES_IV), backend=backend)
|
||||
encryptor = cipher.encryptor()
|
||||
|
||||
padded_plaintext = self._pad(plaintext)
|
||||
ciphertext = encryptor.update(padded_plaintext) + encryptor.finalize()
|
||||
return ciphertext
|
||||
|
||||
def decrypt(self, ciphertext):
|
||||
backend = default_backend()
|
||||
cipher = Cipher(algorithms.AES(self.AES_KEY), modes.CBC(self.AES_IV), backend=backend)
|
||||
decryptor = cipher.decryptor()
|
||||
|
||||
decrypted_data = decryptor.update(ciphertext) + decryptor.finalize()
|
||||
return self._unpad(decrypted_data)
|
@ -64,3 +64,7 @@ aimedb:
|
||||
|
||||
mucha:
|
||||
loglevel: "info"
|
||||
|
||||
chimedb:
|
||||
enable: False
|
||||
key: ""
|
||||
|
@ -16,4 +16,5 @@ game_codes = [
|
||||
Mai2Constants.GAME_CODE_GREEN,
|
||||
Mai2Constants.GAME_CODE,
|
||||
Mai2Constants.GAME_CODE_DX_INT,
|
||||
Mai2Constants.GAME_CODE_DX_CHN,
|
||||
]
|
||||
|
@ -29,6 +29,7 @@ class Mai2Constants:
|
||||
GAME_CODE_FINALE = "SDEY"
|
||||
GAME_CODE_DX = "SDEZ"
|
||||
GAME_CODE_DX_INT = "SDGA"
|
||||
GAME_CODE_DX_CHN = "SDGB"
|
||||
|
||||
CONFIG_NAME = "mai2.yaml"
|
||||
|
||||
|
@ -11,12 +11,11 @@ from logging.handlers import TimedRotatingFileHandler
|
||||
from os import path, mkdir
|
||||
from typing import Tuple, List, Dict
|
||||
from Crypto.Hash import MD5
|
||||
from Crypto.Cipher import AES
|
||||
from Crypto.Util.Padding import pad
|
||||
|
||||
from core.config import CoreConfig
|
||||
from core.utils import Utils
|
||||
from core.title import BaseServlet
|
||||
from core.crypto import CipherAES
|
||||
from .config import Mai2Config
|
||||
from .const import Mai2Constants
|
||||
from .base import Mai2Base
|
||||
@ -321,6 +320,19 @@ class Mai2Servlet(BaseServlet):
|
||||
internal_ver = Mai2Constants.VER_MAIMAI_DX_FESTIVAL
|
||||
elif version >= 135 and version < 140: # FESTiVAL PLUS
|
||||
internal_ver = Mai2Constants.VER_MAIMAI_DX_FESTIVAL_PLUS
|
||||
elif game_code == "SDGB": # CN
|
||||
if 130 <= version < 135: # Maimai DX CN 2023 (FESTiVAL)
|
||||
internal_ver = Mai2Constants.VER_MAIMAI_DX_FESTIVAL
|
||||
|
||||
try:
|
||||
unzip = zlib.decompress(req_raw)
|
||||
|
||||
except zlib.error as e:
|
||||
self.logger.error(
|
||||
f"Failed to decompress v{version} {endpoint} request -> {e}"
|
||||
)
|
||||
return Response(zlib.compress(b'{"stat": "0"}'))
|
||||
|
||||
|
||||
if all(c in string.hexdigits for c in endpoint) and len(endpoint) == 32:
|
||||
# If we get a 32 character long hex string, it's a hash and we're
|
||||
@ -342,13 +354,12 @@ class Mai2Servlet(BaseServlet):
|
||||
endpoint = self.hash_table[internal_ver][endpoint.lower()]
|
||||
|
||||
try:
|
||||
crypt = AES.new(
|
||||
crypt = CipherAES(
|
||||
bytes.fromhex(self.game_cfg.crypto.keys[internal_ver][0]),
|
||||
AES.MODE_CBC,
|
||||
bytes.fromhex(self.game_cfg.crypto.keys[internal_ver][1]),
|
||||
)
|
||||
|
||||
req_raw = crypt.decrypt(req_raw)
|
||||
decrypted = crypt.decrypt(unzip)
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(
|
||||
@ -360,6 +371,10 @@ class Mai2Servlet(BaseServlet):
|
||||
|
||||
encrypted = True
|
||||
|
||||
|
||||
req_data = json.loads(decrypted if encrypted else unzip)
|
||||
|
||||
|
||||
if (
|
||||
not encrypted
|
||||
and self.game_cfg.crypto.encrypted_only
|
||||
@ -371,25 +386,17 @@ class Mai2Servlet(BaseServlet):
|
||||
)
|
||||
return Response(zlib.compress(b'{"stat": "0"}'))
|
||||
|
||||
try:
|
||||
unzip = zlib.decompress(req_raw)
|
||||
|
||||
except zlib.error as e:
|
||||
self.logger.error(
|
||||
f"Failed to decompress v{version} {endpoint} request -> {e}"
|
||||
)
|
||||
return Response(zlib.compress(b'{"stat": "0"}'))
|
||||
|
||||
req_data = json.loads(unzip)
|
||||
|
||||
self.logger.info(f"v{version} {endpoint} request from {client_ip}")
|
||||
self.logger.debug(req_data)
|
||||
|
||||
endpoint = (
|
||||
endpoint.replace("MaimaiExp", "")
|
||||
if game_code == Mai2Constants.GAME_CODE_DX_INT
|
||||
else endpoint
|
||||
)
|
||||
if game_code == Mai2Constants.GAME_CODE_DX_INT:
|
||||
endpoint = endpoint.replace("MaimaiExp", "")
|
||||
elif game_code == Mai2Constants.GAME_CODE_DX_CHN:
|
||||
endpoint = endpoint.replace("MaimaiChn", "")
|
||||
|
||||
|
||||
func_to_find = "handle_" + inflection.underscore(endpoint) + "_request"
|
||||
handler_cls = self.versions[internal_ver](self.core_cfg, self.game_cfg)
|
||||
|
||||
@ -411,20 +418,19 @@ class Mai2Servlet(BaseServlet):
|
||||
|
||||
self.logger.debug(f"Response {resp}")
|
||||
|
||||
zipped = zlib.compress(json.dumps(resp, ensure_ascii=False).encode("utf-8"))
|
||||
|
||||
if not encrypted or version < 110:
|
||||
return Response(zipped)
|
||||
return Response(zlib.compress(json.dumps(resp, ensure_ascii=False).encode("utf-8")))
|
||||
|
||||
padded = pad(zipped, 16)
|
||||
# padded = pad(zipped, 16)
|
||||
|
||||
crypt = AES.new(
|
||||
crypt = CipherAES(
|
||||
bytes.fromhex(self.game_cfg.crypto.keys[internal_ver][0]),
|
||||
AES.MODE_CBC,
|
||||
bytes.fromhex(self.game_cfg.crypto.keys[internal_ver][1]),
|
||||
)
|
||||
|
||||
return Response(crypt.encrypt(padded))
|
||||
return Response(
|
||||
zlib.compress(crypt.encrypt(json.dumps(resp, ensure_ascii=False).encode("utf-8")))
|
||||
)
|
||||
|
||||
|
||||
async def handle_old_srv(self, request: Request) -> bytes:
|
||||
|
Loading…
Reference in New Issue
Block a user