from starlette.requests import Request from starlette.routing import Route from starlette.responses import Response import logging import coloredlogs from logging.handlers import TimedRotatingFileHandler import zlib import yaml import json import inflection import string from Crypto.Cipher import AES from Crypto.Util.Padding import pad from Crypto.Protocol.KDF import PBKDF2 from Crypto.Hash import SHA1 from os import path from typing import Tuple, Dict, List from core import CoreConfig, Utils from core.title import BaseServlet from .config import ChuniConfig from .const import ChuniConstants from .base import ChuniBase from .plus import ChuniPlus from .air import ChuniAir from .airplus import ChuniAirPlus from .star import ChuniStar from .starplus import ChuniStarPlus from .amazon import ChuniAmazon from .amazonplus import ChuniAmazonPlus from .crystal import ChuniCrystal from .crystalplus import ChuniCrystalPlus from .paradise import ChuniParadise from .new import ChuniNew from .newplus import ChuniNewPlus from .sun import ChuniSun from .sunplus import ChuniSunPlus from .luminous import ChuniLuminous class ChuniServlet(BaseServlet): def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None: super().__init__(core_cfg, cfg_dir) self.game_cfg = ChuniConfig() self.hash_table: Dict[str, Dict[str, str]] = {} if path.exists(f"{cfg_dir}/{ChuniConstants.CONFIG_NAME}"): self.game_cfg.update( yaml.safe_load(open(f"{cfg_dir}/{ChuniConstants.CONFIG_NAME}")) ) self.versions = [ ChuniBase, ChuniPlus, ChuniAir, ChuniAirPlus, ChuniStar, ChuniStarPlus, ChuniAmazon, ChuniAmazonPlus, ChuniCrystal, ChuniCrystalPlus, ChuniParadise, ChuniNew, ChuniNewPlus, ChuniSun, ChuniSunPlus, ChuniLuminous, ] self.logger = logging.getLogger("chuni") if not hasattr(self.logger, "inited"): log_fmt_str = "[%(asctime)s] Chunithm | %(levelname)s | %(message)s" log_fmt = logging.Formatter(log_fmt_str) fileHandler = TimedRotatingFileHandler( "{0}/{1}.log".format(self.core_cfg.server.log_dir, "chuni"), encoding="utf8", when="d", backupCount=10, ) fileHandler.setFormatter(log_fmt) consoleHandler = logging.StreamHandler() consoleHandler.setFormatter(log_fmt) self.logger.addHandler(fileHandler) self.logger.addHandler(consoleHandler) self.logger.setLevel(self.game_cfg.server.loglevel) coloredlogs.install( level=self.game_cfg.server.loglevel, logger=self.logger, fmt=log_fmt_str ) self.logger.inited = True known_iter_counts = { ChuniConstants.VER_CHUNITHM_CRYSTAL_PLUS: 67, ChuniConstants.VER_CHUNITHM_PARADISE: 44, f"{ChuniConstants.VER_CHUNITHM_PARADISE}_int": 25, ChuniConstants.VER_CHUNITHM_NEW: 54, f"{ChuniConstants.VER_CHUNITHM_NEW}_int": 49, ChuniConstants.VER_CHUNITHM_NEW_PLUS: 25, ChuniConstants.VER_CHUNITHM_SUN: 70, ChuniConstants.VER_CHUNITHM_SUN_PLUS: 36, ChuniConstants.VER_CHUNITHM_LUMINOUS: 8, } for version, keys in self.game_cfg.crypto.keys.items(): if len(keys) < 3: continue if isinstance(version, int): version_idx = version else: version_idx = int(version.split("_")[0]) salt = bytes.fromhex(keys[2]) if len(keys) >= 4: iter_count = keys[3] elif (iter_count := known_iter_counts.get(version)) is None: self.logger.error( "Number of iteration rounds for version %s is not known, but it is not specified in the config", version, ) continue self.hash_table[version] = {} method_list = [ method for method in dir(self.versions[version_idx]) if not method.startswith("__") ] for method in method_list: method_fixed = inflection.camelize(method)[6:-7] # This only applies for CHUNITHM NEW International and later for some reason. # CHUNITHM SUPERSTAR (PLUS) did not add "Exp" to the endpoint when hashing. if ( isinstance(version, str) and version.endswith("_int") and version_idx >= ChuniConstants.VER_CHUNITHM_NEW ): method_fixed += "C3Exp" hash = PBKDF2( method_fixed, salt, 128, count=iter_count, hmac_hash_module=SHA1, ) hashed_name = hash.hex()[:32] # truncate unused bytes like the game does self.hash_table[version][hashed_name] = method_fixed self.logger.debug( f"Hashed v{version} method {method_fixed} with {salt} to get {hashed_name}" ) @classmethod def is_game_enabled( cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str ) -> bool: game_cfg = ChuniConfig() if path.exists(f"{cfg_dir}/{ChuniConstants.CONFIG_NAME}"): game_cfg.update( yaml.safe_load(open(f"{cfg_dir}/{ChuniConstants.CONFIG_NAME}")) ) if not game_cfg.server.enable: return False return True def get_allnet_info(self, game_code: str, game_ver: int, keychip: str) -> Tuple[str, str]: if not self.core_cfg.server.is_using_proxy and Utils.get_title_port(self.core_cfg) != 80: return (f"http://{self.core_cfg.server.hostname}:{Utils.get_title_port(self.core_cfg)}/{game_code}/{game_ver}/", self.core_cfg.server.hostname) return (f"http://{self.core_cfg.server.hostname}/{game_code}/{game_ver}/", self.core_cfg.server.hostname) def get_routes(self) -> List[Route]: return [ Route("/{game:str}/{version:int}/ChuniServlet/{endpoint:str}", self.render_POST, methods=['POST']), Route("/{game:str}/{version:int}/ChuniServlet/MatchingServer/{endpoint:str}", self.render_POST, methods=['POST']), ] async def render_POST(self, request: Request) -> bytes: endpoint: str = request.path_params.get('endpoint') version: int = request.path_params.get('version') game_code: str = request.path_params.get('game') if endpoint.lower() == "ping": return Response(zlib.compress(b'{"returnCode": "1"}')) req_raw = await request.body() encrtped = False internal_ver = 0 client_ip = Utils.get_ip_addr(request) if game_code == "SDHD" or game_code == "SDBT": # JP if version < 105: # 1.0 internal_ver = ChuniConstants.VER_CHUNITHM elif version >= 105 and version < 110: # PLUS internal_ver = ChuniConstants.VER_CHUNITHM_PLUS elif version >= 110 and version < 115: # AIR internal_ver = ChuniConstants.VER_CHUNITHM_AIR elif version >= 115 and version < 120: # AIR PLUS internal_ver = ChuniConstants.VER_CHUNITHM_AIR_PLUS elif version >= 120 and version < 125: # STAR internal_ver = ChuniConstants.VER_CHUNITHM_STAR elif version >= 125 and version < 130: # STAR PLUS internal_ver = ChuniConstants.VER_CHUNITHM_STAR_PLUS elif version >= 130 and version < 135: # AMAZON internal_ver = ChuniConstants.VER_CHUNITHM_AMAZON elif version >= 135 and version < 140: # AMAZON PLUS internal_ver = ChuniConstants.VER_CHUNITHM_AMAZON_PLUS elif version >= 140 and version < 145: # CRYSTAL internal_ver = ChuniConstants.VER_CHUNITHM_CRYSTAL elif version >= 145 and version < 150: # CRYSTAL PLUS internal_ver = ChuniConstants.VER_CHUNITHM_CRYSTAL_PLUS elif version >= 150 and version < 200: # PARADISE internal_ver = ChuniConstants.VER_CHUNITHM_PARADISE elif version >= 200 and version < 205: # NEW!! internal_ver = ChuniConstants.VER_CHUNITHM_NEW elif version >= 205 and version < 210: # NEW PLUS!! internal_ver = ChuniConstants.VER_CHUNITHM_NEW_PLUS elif version >= 210 and version < 215: # SUN internal_ver = ChuniConstants.VER_CHUNITHM_SUN elif version >= 215 and version < 220: # SUN PLUS internal_ver = ChuniConstants.VER_CHUNITHM_SUN_PLUS elif version >= 220: # LUMINOUS internal_ver = ChuniConstants.VER_CHUNITHM_LUMINOUS elif game_code == "SDGS": # Int if version < 110: # SUPERSTAR / SUPERSTAR PLUS internal_ver = ChuniConstants.VER_CHUNITHM_PARADISE # SUPERSTAR / SUPERSTAR PLUS worked fine with it elif version >= 110 and version < 115: # NEW internal_ver = ChuniConstants.VER_CHUNITHM_NEW elif version >= 115 and version < 120: # NEW PLUS!! internal_ver = ChuniConstants.VER_CHUNITHM_NEW_PLUS elif version >= 120 and version < 125: # SUN internal_ver = ChuniConstants.VER_CHUNITHM_SUN elif version >= 125 and version < 130: # SUN PLUS internal_ver = ChuniConstants.VER_CHUNITHM_SUN_PLUS elif version >= 130: # LUMINOUS internal_ver = ChuniConstants.VER_CHUNITHM_LUMINOUS if all(c in string.hexdigits for c in endpoint) and len(endpoint) == 32: # If we get a 32 character long hex string, it's a hash and we're # doing encrypted. The likelihood of false positives is low but # technically not 0 if game_code == "SDGS": crypto_cfg_key = f"{internal_ver}_int" hash_table_key = f"{internal_ver}_int" else: crypto_cfg_key = internal_ver hash_table_key = internal_ver if internal_ver < ChuniConstants.VER_CHUNITHM_NEW: endpoint = request.headers.get("User-Agent").split("#")[0] else: if hash_table_key not in self.hash_table: self.logger.error( f"v{version} does not support encryption or no keys entered" ) return Response(zlib.compress(b'{"stat": "0"}')) elif endpoint.lower() not in self.hash_table[hash_table_key]: self.logger.error( f"No hash found for v{version} endpoint {endpoint}" ) return Response(zlib.compress(b'{"stat": "0"}')) endpoint = self.hash_table[hash_table_key][endpoint.lower()] try: crypt = AES.new( bytes.fromhex(self.game_cfg.crypto.keys[crypto_cfg_key][0]), AES.MODE_CBC, bytes.fromhex(self.game_cfg.crypto.keys[crypto_cfg_key][1]), ) req_raw = crypt.decrypt(req_raw) except Exception as e: self.logger.error( f"Failed to decrypt v{version} request to {endpoint} -> {e}" ) return Response(zlib.compress(b'{"stat": "0"}')) encrtped = True if ( not encrtped and self.game_cfg.crypto.encrypted_only and internal_ver >= ChuniConstants.VER_CHUNITHM_CRYSTAL_PLUS ): self.logger.error( f"Unencrypted v{version} {endpoint} request, but config is set to encrypted only: {req_raw}" ) return Response(zlib.compress(b'{"stat": "0"}')) try: unzip = zlib.decompress(req_raw) except zlib.error as e: self.logger.error( f"Failed to decompress v{version} {endpoint} request -> {e}" ) return Response(zlib.compress(b'{"stat": "0"}')) req_data = json.loads(unzip) self.logger.info(f"v{version} {endpoint} request from {client_ip}") self.logger.debug(req_data) if game_code == "SDGS" and version >= 110: endpoint = endpoint.replace("C3Exp", "") elif game_code == "SDGS" and version < 110: endpoint = endpoint.replace("Exp", "") else: endpoint = endpoint func_to_find = "handle_" + inflection.underscore(endpoint) + "_request" handler_cls = self.versions[internal_ver](self.core_cfg, self.game_cfg) if not hasattr(handler_cls, func_to_find): self.logger.warning(f"Unhandled v{version} request {endpoint}") resp = {"returnCode": 1} else: try: handler = getattr(handler_cls, func_to_find) resp = await handler(req_data) except Exception as e: self.logger.error(f"Error handling v{version} method {endpoint} - {e}") return Response(zlib.compress(b'{"stat": "0"}')) if resp is None: resp = {"returnCode": 1} self.logger.debug(f"Response {resp}") zipped = zlib.compress(json.dumps(resp, ensure_ascii=False).encode("utf-8")) if not encrtped: return Response(zipped) padded = pad(zipped, 16) crypt = AES.new( bytes.fromhex(self.game_cfg.crypto.keys[internal_ver][0]), AES.MODE_CBC, bytes.fromhex(self.game_cfg.crypto.keys[internal_ver][1]), ) return Response(crypt.encrypt(padded))