adding encryption support for ongeki

This commit is contained in:
Midorica 2023-09-09 12:03:42 -04:00
parent 238e39f415
commit 090b3148d8
3 changed files with 133 additions and 20 deletions

View File

@ -35,3 +35,6 @@ version:
card_maker: 1.30.01
7:
card_maker: 1.35.03
crypto:
encrypted_only: False

View File

@ -48,9 +48,30 @@ class OngekiCardMakerVersionConfig:
self.__config, "ongeki", "version", default={}
).get(version)
class OngekiCryptoConfig:
def __init__(self, parent_config: "OngekiConfig") -> None:
self.__config = parent_config
@property
def keys(self) -> Dict:
"""
in the form of:
internal_version: [key, iv]
all values are hex strings
"""
return CoreConfig.get_config_field(
self.__config, "ongeki", "crypto", "keys", default={}
)
@property
def encrypted_only(self) -> bool:
return CoreConfig.get_config_field(
self.__config, "ongeki", "crypto", "encrypted_only", default=False
)
class OngekiConfig(dict):
def __init__(self) -> None:
self.server = OngekiServerConfig(self)
self.gachas = OngekiGachaConfig(self)
self.version = OngekiCardMakerVersionConfig(self)
self.crypto = OngekiCryptoConfig(self)

View File

@ -7,6 +7,10 @@ import logging
import coloredlogs
import zlib
from logging.handlers import TimedRotatingFileHandler
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
from Crypto.Protocol.KDF import PBKDF2
from Crypto.Hash import SHA1
from os import path
from typing import Tuple
@ -28,6 +32,7 @@ class OngekiServlet:
def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None:
self.core_cfg = core_cfg
self.game_cfg = OngekiConfig()
self.hash_table: Dict[Dict[str, str]] = {}
if path.exists(f"{cfg_dir}/{OngekiConstants.CONFIG_NAME}"):
self.game_cfg.update(
yaml.safe_load(open(f"{cfg_dir}/{OngekiConstants.CONFIG_NAME}"))
@ -45,27 +50,60 @@ class OngekiServlet:
]
self.logger = logging.getLogger("ongeki")
log_fmt_str = "[%(asctime)s] Ongeki | %(levelname)s | %(message)s"
log_fmt = logging.Formatter(log_fmt_str)
fileHandler = TimedRotatingFileHandler(
"{0}/{1}.log".format(self.core_cfg.server.log_dir, "ongeki"),
encoding="utf8",
when="d",
backupCount=10,
)
fileHandler.setFormatter(log_fmt)
if not hasattr(self.logger, "inited"):
log_fmt_str = "[%(asctime)s] Ongeki | %(levelname)s | %(message)s"
log_fmt = logging.Formatter(log_fmt_str)
fileHandler = TimedRotatingFileHandler(
"{0}/{1}.log".format(self.core_cfg.server.log_dir, "ongeki"),
encoding="utf8",
when="d",
backupCount=10,
)
consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(log_fmt)
fileHandler.setFormatter(log_fmt)
self.logger.addHandler(fileHandler)
self.logger.addHandler(consoleHandler)
consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(log_fmt)
self.logger.setLevel(self.game_cfg.server.loglevel)
coloredlogs.install(
level=self.game_cfg.server.loglevel, logger=self.logger, fmt=log_fmt_str
)
self.logger.addHandler(fileHandler)
self.logger.addHandler(consoleHandler)
self.logger.setLevel(self.game_cfg.server.loglevel)
coloredlogs.install(
level=self.game_cfg.server.loglevel, logger=self.logger, fmt=log_fmt_str
)
self.logger.inited = True
for version, keys in self.game_cfg.crypto.keys.items():
if len(keys) < 3:
continue
self.hash_table[version] = {}
method_list = [
method
for method in dir(self.versions[version])
if not method.startswith("__")
]
for method in method_list:
method_fixed = inflection.camelize(method)[6:-7]
# number of iterations is 64 on Bright Memory
iter_count = 64
hash = PBKDF2(
method_fixed,
bytes.fromhex(keys[2]),
128,
count=iter_count,
hmac_hash_module=SHA1,
)
hashed_name = hash.hex()[:32] # truncate unused bytes like the game does
self.hash_table[version][hashed_name] = method_fixed
self.logger.debug(
f"Hashed v{version} method {method_fixed} with {bytes.fromhex(keys[2])} to get {hash.hex()[:32]}"
)
@classmethod
def get_allnet_info(
@ -100,6 +138,7 @@ class OngekiServlet:
req_raw = request.content.getvalue()
url_split = url_path.split("/")
encrtped = False
internal_ver = 0
endpoint = url_split[len(url_split) - 1]
client_ip = Utils.get_ip_addr(request)
@ -125,8 +164,45 @@ class OngekiServlet:
# If we get a 32 character long hex string, it's a hash and we're
# doing encrypted. The likelyhood of false positives is low but
# technically not 0
self.logger.error("Encryption not supported at this time")
return b""
if internal_ver not in self.hash_table:
self.logger.error(
f"v{version} does not support encryption or no keys entered"
)
return zlib.compress(b'{"stat": "0"}')
elif endpoint.lower() not in self.hash_table[internal_ver]:
self.logger.error(
f"No hash found for v{version} endpoint {endpoint}"
)
return zlib.compress(b'{"stat": "0"}')
endpoint = self.hash_table[internal_ver][endpoint.lower()]
try:
crypt = AES.new(
bytes.fromhex(self.game_cfg.crypto.keys[internal_ver][0]),
AES.MODE_CBC,
bytes.fromhex(self.game_cfg.crypto.keys[internal_ver][1]),
)
req_raw = crypt.decrypt(req_raw)
except Exception as e:
self.logger.error(
f"Failed to decrypt v{version} request to {endpoint} -> {e}"
)
return zlib.compress(b'{"stat": "0"}')
encrtped = True
if (
not encrtped
and self.game_cfg.crypto.encrypted_only
):
self.logger.error(
f"Unencrypted v{version} {endpoint} request, but config is set to encrypted only: {req_raw}"
)
return zlib.compress(b'{"stat": "0"}')
try:
unzip = zlib.decompress(req_raw)
@ -163,4 +239,17 @@ class OngekiServlet:
self.logger.debug(f"Response {resp}")
return zlib.compress(json.dumps(resp, ensure_ascii=False).encode("utf-8"))
zipped = zlib.compress(json.dumps(resp, ensure_ascii=False).encode("utf-8"))
if not encrtped:
return zipped
padded = pad(zipped, 16)
crypt = AES.new(
bytes.fromhex(self.game_cfg.crypto.keys[internal_ver][0]),
AES.MODE_CBC,
bytes.fromhex(self.game_cfg.crypto.keys[internal_ver][1]),
)
return crypt.encrypt(padded)