From 090b3148d8e57d6ecd5178c0f7740ef0e737de63 Mon Sep 17 00:00:00 2001 From: Midorica Date: Sat, 9 Sep 2023 12:03:42 -0400 Subject: [PATCH] adding encryption support for ongeki --- example_config/ongeki.yaml | 3 + titles/ongeki/config.py | 21 ++++++ titles/ongeki/index.py | 129 +++++++++++++++++++++++++++++++------ 3 files changed, 133 insertions(+), 20 deletions(-) diff --git a/example_config/ongeki.yaml b/example_config/ongeki.yaml index 90233b3..e4088c0 100644 --- a/example_config/ongeki.yaml +++ b/example_config/ongeki.yaml @@ -35,3 +35,6 @@ version: card_maker: 1.30.01 7: card_maker: 1.35.03 + +crypto: + encrypted_only: False \ No newline at end of file diff --git a/titles/ongeki/config.py b/titles/ongeki/config.py index c20b1ed..2321af6 100644 --- a/titles/ongeki/config.py +++ b/titles/ongeki/config.py @@ -48,9 +48,30 @@ class OngekiCardMakerVersionConfig: self.__config, "ongeki", "version", default={} ).get(version) +class OngekiCryptoConfig: + def __init__(self, parent_config: "OngekiConfig") -> None: + self.__config = parent_config + + @property + def keys(self) -> Dict: + """ + in the form of: + internal_version: [key, iv] + all values are hex strings + """ + return CoreConfig.get_config_field( + self.__config, "ongeki", "crypto", "keys", default={} + ) + + @property + def encrypted_only(self) -> bool: + return CoreConfig.get_config_field( + self.__config, "ongeki", "crypto", "encrypted_only", default=False + ) class OngekiConfig(dict): def __init__(self) -> None: self.server = OngekiServerConfig(self) self.gachas = OngekiGachaConfig(self) self.version = OngekiCardMakerVersionConfig(self) + self.crypto = OngekiCryptoConfig(self) diff --git a/titles/ongeki/index.py b/titles/ongeki/index.py index af206e9..a89e8c2 100644 --- a/titles/ongeki/index.py +++ b/titles/ongeki/index.py @@ -7,6 +7,10 @@ import logging import coloredlogs import zlib from logging.handlers import TimedRotatingFileHandler +from Crypto.Cipher import AES +from Crypto.Util.Padding import pad +from Crypto.Protocol.KDF import PBKDF2 +from Crypto.Hash import SHA1 from os import path from typing import Tuple @@ -28,6 +32,7 @@ class OngekiServlet: def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None: self.core_cfg = core_cfg self.game_cfg = OngekiConfig() + self.hash_table: Dict[Dict[str, str]] = {} if path.exists(f"{cfg_dir}/{OngekiConstants.CONFIG_NAME}"): self.game_cfg.update( yaml.safe_load(open(f"{cfg_dir}/{OngekiConstants.CONFIG_NAME}")) @@ -45,27 +50,60 @@ class OngekiServlet: ] self.logger = logging.getLogger("ongeki") - log_fmt_str = "[%(asctime)s] Ongeki | %(levelname)s | %(message)s" - log_fmt = logging.Formatter(log_fmt_str) - fileHandler = TimedRotatingFileHandler( - "{0}/{1}.log".format(self.core_cfg.server.log_dir, "ongeki"), - encoding="utf8", - when="d", - backupCount=10, - ) - fileHandler.setFormatter(log_fmt) + if not hasattr(self.logger, "inited"): + log_fmt_str = "[%(asctime)s] Ongeki | %(levelname)s | %(message)s" + log_fmt = logging.Formatter(log_fmt_str) + fileHandler = TimedRotatingFileHandler( + "{0}/{1}.log".format(self.core_cfg.server.log_dir, "ongeki"), + encoding="utf8", + when="d", + backupCount=10, + ) - consoleHandler = logging.StreamHandler() - consoleHandler.setFormatter(log_fmt) + fileHandler.setFormatter(log_fmt) - self.logger.addHandler(fileHandler) - self.logger.addHandler(consoleHandler) + consoleHandler = logging.StreamHandler() + consoleHandler.setFormatter(log_fmt) - self.logger.setLevel(self.game_cfg.server.loglevel) - coloredlogs.install( - level=self.game_cfg.server.loglevel, logger=self.logger, fmt=log_fmt_str - ) + self.logger.addHandler(fileHandler) + self.logger.addHandler(consoleHandler) + + self.logger.setLevel(self.game_cfg.server.loglevel) + coloredlogs.install( + level=self.game_cfg.server.loglevel, logger=self.logger, fmt=log_fmt_str + ) + self.logger.inited = True + + for version, keys in self.game_cfg.crypto.keys.items(): + if len(keys) < 3: + continue + + self.hash_table[version] = {} + + method_list = [ + method + for method in dir(self.versions[version]) + if not method.startswith("__") + ] + for method in method_list: + method_fixed = inflection.camelize(method)[6:-7] + # number of iterations is 64 on Bright Memory + iter_count = 64 + hash = PBKDF2( + method_fixed, + bytes.fromhex(keys[2]), + 128, + count=iter_count, + hmac_hash_module=SHA1, + ) + + hashed_name = hash.hex()[:32] # truncate unused bytes like the game does + self.hash_table[version][hashed_name] = method_fixed + + self.logger.debug( + f"Hashed v{version} method {method_fixed} with {bytes.fromhex(keys[2])} to get {hash.hex()[:32]}" + ) @classmethod def get_allnet_info( @@ -100,6 +138,7 @@ class OngekiServlet: req_raw = request.content.getvalue() url_split = url_path.split("/") + encrtped = False internal_ver = 0 endpoint = url_split[len(url_split) - 1] client_ip = Utils.get_ip_addr(request) @@ -125,8 +164,45 @@ class OngekiServlet: # If we get a 32 character long hex string, it's a hash and we're # doing encrypted. The likelyhood of false positives is low but # technically not 0 - self.logger.error("Encryption not supported at this time") - return b"" + if internal_ver not in self.hash_table: + self.logger.error( + f"v{version} does not support encryption or no keys entered" + ) + return zlib.compress(b'{"stat": "0"}') + + elif endpoint.lower() not in self.hash_table[internal_ver]: + self.logger.error( + f"No hash found for v{version} endpoint {endpoint}" + ) + return zlib.compress(b'{"stat": "0"}') + + endpoint = self.hash_table[internal_ver][endpoint.lower()] + + try: + crypt = AES.new( + bytes.fromhex(self.game_cfg.crypto.keys[internal_ver][0]), + AES.MODE_CBC, + bytes.fromhex(self.game_cfg.crypto.keys[internal_ver][1]), + ) + + req_raw = crypt.decrypt(req_raw) + + except Exception as e: + self.logger.error( + f"Failed to decrypt v{version} request to {endpoint} -> {e}" + ) + return zlib.compress(b'{"stat": "0"}') + + encrtped = True + + if ( + not encrtped + and self.game_cfg.crypto.encrypted_only + ): + self.logger.error( + f"Unencrypted v{version} {endpoint} request, but config is set to encrypted only: {req_raw}" + ) + return zlib.compress(b'{"stat": "0"}') try: unzip = zlib.decompress(req_raw) @@ -163,4 +239,17 @@ class OngekiServlet: self.logger.debug(f"Response {resp}") - return zlib.compress(json.dumps(resp, ensure_ascii=False).encode("utf-8")) + zipped = zlib.compress(json.dumps(resp, ensure_ascii=False).encode("utf-8")) + + if not encrtped: + return zipped + + padded = pad(zipped, 16) + + crypt = AES.new( + bytes.fromhex(self.game_cfg.crypto.keys[internal_ver][0]), + AES.MODE_CBC, + bytes.fromhex(self.game_cfg.crypto.keys[internal_ver][1]), + ) + + return crypt.encrypt(padded) \ No newline at end of file