chuni: add method hashing support

This commit is contained in:
Hay1tsme 2023-03-16 21:27:03 -04:00
parent 2af7751504
commit 8b718c601f
1 changed files with 54 additions and 22 deletions

View File

@ -8,8 +8,10 @@ import inflection
import string
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
from Crypto.Protocol.KDF import PBKDF2
from Crypto.Hash import SHA1
from os import path
from typing import Tuple
from typing import Tuple, Dict
from core import CoreConfig, Utils
from titles.chuni.config import ChuniConfig
@ -33,27 +35,43 @@ class ChuniServlet:
def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None:
self.core_cfg = core_cfg
self.game_cfg = ChuniConfig()
self.hash_table: Dict[Dict[str, str]] = {}
if path.exists(f"{cfg_dir}/{ChuniConstants.CONFIG_NAME}"):
self.game_cfg.update(
yaml.safe_load(open(f"{cfg_dir}/{ChuniConstants.CONFIG_NAME}"))
)
self.versions = [
ChuniBase(core_cfg, self.game_cfg),
ChuniPlus(core_cfg, self.game_cfg),
ChuniAir(core_cfg, self.game_cfg),
ChuniAirPlus(core_cfg, self.game_cfg),
ChuniStar(core_cfg, self.game_cfg),
ChuniStarPlus(core_cfg, self.game_cfg),
ChuniAmazon(core_cfg, self.game_cfg),
ChuniAmazonPlus(core_cfg, self.game_cfg),
ChuniCrystal(core_cfg, self.game_cfg),
ChuniCrystalPlus(core_cfg, self.game_cfg),
ChuniParadise(core_cfg, self.game_cfg),
ChuniNew(core_cfg, self.game_cfg),
ChuniNewPlus(core_cfg, self.game_cfg),
ChuniBase,
ChuniPlus,
ChuniAir,
ChuniAirPlus,
ChuniStar,
ChuniStarPlus,
ChuniAmazon,
ChuniAmazonPlus,
ChuniCrystal,
ChuniCrystalPlus,
ChuniParadise,
ChuniNew,
ChuniNewPlus,
]
for version, keys in self.game_cfg.crypto.keys.items():
if len(keys) < 3:
continue
self.hash_table[version] = {}
method_list = [method for method in dir(self.versions[version]) if not method.startswith('__')]
for method in method_list:
method_fixed = inflection.camelize(method)
hash = PBKDF2(method_fixed, bytes.fromhex(keys[2]), 128, count=44, hmac_hash_module=SHA1)
self.hash_table[version][hash.hex()] = method_fixed
self.logger.debug(f"Hashed v{version} method {method_fixed} with {bytes.fromhex(keys[2])} to get {hash.hex()}")
self.logger = logging.getLogger("chuni")
if not hasattr(self.logger, "inited"):
@ -144,25 +162,38 @@ class ChuniServlet:
# If we get a 32 character long hex string, it's a hash and we're
# doing encrypted. The likelyhood of false positives is low but
# technically not 0
endpoint = request.getHeader("User-Agent").split("#")[0]
if internal_ver < ChuniConstants.VER_CHUNITHM_NEW:
endpoint = request.getHeader("User-Agent").split("#")[0]
else:
if internal_ver not in self.hash_table:
self.logger.error(f"v{version} does not support encryption or no keys entered")
return zlib.compress(b'{"stat": "0"}')
elif endpoint.lower() not in self.hash_table[internal_ver]:
self.logger.error(f"No hash found for v{version} endpoint {endpoint}")
return zlib.compress(b'{"stat": "0"}')
endpoint = self.hash_table[internal_ver][endpoint.lower()]
try:
crypt = AES.new(
bytes.fromhex(self.game_cfg.crypto.keys[str(internal_ver)][0]),
bytes.fromhex(self.game_cfg.crypto.keys[internal_ver][0]),
AES.MODE_CBC,
bytes.fromhex(self.game_cfg.crypto.keys[str(internal_ver)][1]),
bytes.fromhex(self.game_cfg.crypto.keys[internal_ver][1]),
)
req_raw = crypt.decrypt(req_raw)
except:
except Exception as e:
self.logger.error(
f"Failed to decrypt v{version} request to {endpoint} -> {req_raw}"
f"Failed to decrypt v{version} request to {endpoint} -> {e}"
)
return zlib.compress(b'{"stat": "0"}')
encrtped = True
if not encrtped and self.game_cfg.crypto.encrypted_only:
if not encrtped and self.game_cfg.crypto.encrypted_only and internal_ver >= ChuniConstants.VER_CHUNITHM_CRYSTAL_PLUS:
self.logger.error(
f"Unencrypted v{version} {endpoint} request, but config is set to encrypted only: {req_raw}"
)
@ -185,14 +216,15 @@ class ChuniServlet:
self.logger.debug(req_data)
func_to_find = "handle_" + inflection.underscore(endpoint) + "_request"
handler_cls = self.versions[internal_ver](self.core_cfg, self.game_cfg)
if not hasattr(self.versions[internal_ver], func_to_find):
if not hasattr(handler_cls, func_to_find):
self.logger.warning(f"Unhandled v{version} request {endpoint}")
resp = {"returnCode": 1}
else:
try:
handler = getattr(self.versions[internal_ver], func_to_find)
handler = getattr(handler_cls, func_to_find)
resp = handler(req_data)
except Exception as e: