from typing import Tuple, Dict, List
from twisted.web.http import Request
import yaml
import logging
import coloredlogs
from logging.handlers import TimedRotatingFileHandler
from os import path
from Crypto.Cipher import Blowfish
from hashlib import md5
import secrets
import struct

from core import CoreConfig, Utils
from core.title import BaseServlet
from titles.sao.config import SaoConfig
from titles.sao.const import SaoConstants
from titles.sao.base import SaoBase
from titles.sao.handlers.base import *


class SaoServlet(BaseServlet):
    """Title servlet that receives SAO protocol POST requests and dispatches them.

    Each request is parsed into a ``SaoRequestHeader``, optionally verified
    against a static MD5 hash, optionally decrypted with Blowfish-CBC, and
    routed to the ``handle_<cmd>`` method on ``SaoBase`` that matches the
    request's command id (falling back to ``handle_noop``).
    """

    def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None:
        """Load the game config, set up the ``sao`` logger and build the handler.

        Args:
            core_cfg: Core server configuration.
            cfg_dir: Directory that may contain ``sao.yaml``.
        """
        super().__init__(core_cfg, cfg_dir)
        self.config_dir = cfg_dir
        self.game_cfg = SaoConfig()

        cfg_file = f"{cfg_dir}/sao.yaml"
        if path.exists(cfg_file):
            # Context manager so the config file handle is closed promptly.
            with open(cfg_file) as cfg_fp:
                self.game_cfg.update(yaml.safe_load(cfg_fp))

        self.logger = logging.getLogger("sao")
        # The "inited" marker keeps handlers from being attached twice when
        # the servlet is constructed more than once in the same process.
        if not hasattr(self.logger, "inited"):
            log_fmt_str = "[%(asctime)s] SAO | %(levelname)s | %(message)s"
            log_fmt = logging.Formatter(log_fmt_str)

            fileHandler = TimedRotatingFileHandler(
                "{0}/{1}.log".format(self.core_cfg.server.log_dir, "sao"),
                encoding="utf8",
                when="d",
                backupCount=10,
            )
            fileHandler.setFormatter(log_fmt)

            consoleHandler = logging.StreamHandler()
            consoleHandler.setFormatter(log_fmt)

            self.logger.addHandler(fileHandler)
            self.logger.addHandler(consoleHandler)

            self.logger.setLevel(self.game_cfg.server.loglevel)
            coloredlogs.install(
                level=self.game_cfg.server.loglevel,
                logger=self.logger,
                fmt=log_fmt_str,
            )
            self.logger.inited = True

        self.base = SaoBase(core_cfg, self.game_cfg)

        # Pre-compute the digest every request header is compared against.
        self.static_hash = None
        if self.game_cfg.hash.verify_hash:
            # Great hashing guys, really validates the data
            self.static_hash = md5(self.game_cfg.hash.hash_base.encode()).digest()

    def get_endpoint_matchers(
        self,
    ) -> Tuple[List[Tuple[str, str, Dict]], List[Tuple[str, str, Dict]]]:
        """Return the route tables for this title.

        Returns:
            A pair of lists of ``(method_name, url_pattern, options)`` tuples.
            The first list is empty; the second routes
            ``/{datecode}/proto/if/{category}/{endpoint}`` to ``render_POST``.
        """
        return (
            [],
            [("render_POST", "/{datecode}/proto/if/{category}/{endpoint}", {})],
        )

    @classmethod
    def is_game_enabled(cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str) -> bool:
        """Report whether the title is enabled in its config file.

        Args:
            game_code: Unused here.
            core_cfg: Unused here.
            cfg_dir: Directory that may contain ``SaoConstants.CONFIG_NAME``.

        Returns:
            ``False`` when ``server.enable`` is falsy, ``True`` otherwise.
        """
        game_cfg = SaoConfig()

        cfg_file = f"{cfg_dir}/{SaoConstants.CONFIG_NAME}"
        if path.exists(cfg_file):
            # Context manager so the config file handle is closed promptly.
            with open(cfg_file) as cfg_fp:
                game_cfg.update(yaml.safe_load(cfg_fp))

        if not game_cfg.server.enable:
            return False

        return True

    def get_allnet_info(self, game_code: str, game_ver: int, keychip: str) -> Tuple[str, str]:
        """Build the base URL handed out for this title.

        The port is omitted from the URL when the server sits behind a proxy
        or when it is the scheme's default (80 for http, 443 for https).

        Args:
            game_code: Unused here.
            game_ver: Unused here.
            keychip: Unused here.

        Returns:
            ``(uri, "")`` where ``uri`` is ``<proto>://<hostname>[:port]/``.
        """
        port_ssl = Utils.get_title_port_ssl(self.core_cfg)
        port_normal = Utils.get_title_port(self.core_cfg)
        behind_proxy = self.core_cfg.server.is_using_proxy

        if self.game_cfg.server.use_https:
            proto = "https"
            port = f":{port_ssl}" if not behind_proxy and port_ssl != 443 else ""
        else:
            proto = "http"
            port = f":{port_normal}" if not behind_proxy and port_normal != 80 else ""

        return (f"{proto}://{self.core_cfg.title.hostname}{port}/", "")

    def get_mucha_info(self, core_cfg: CoreConfig, cfg_dir: str) -> Tuple[bool, str]:
        """Return ``(True, "SAO1")`` when the title is enabled, else ``(False, "")``."""
        if not self.game_cfg.server.enable:
            return (False, "")

        return (True, "SAO1")

    def render_POST(self, request: Request, game_code: str, matchers: Dict) -> bytes:
        """Handle one protocol request and return the raw response bytes.

        Args:
            request: Incoming Twisted request; its body is the raw packet.
            game_code: Unused here.
            matchers: URL captures; only ``endpoint`` is read (for logging).

        Returns:
            The serialized (and, if enabled, encrypted) response, or ``b""``
            when the request is malformed, fails the hash check, or the
            handler returns an unsupported type.
        """
        endpoint = matchers.get("endpoint", "")
        request.responseHeaders.addRawHeader(b"content-type", b"text/html; charset=utf-8")

        req_raw = request.content.read()
        # Everything past byte 40 is treated as payload below, so anything
        # shorter cannot carry a full header.
        if len(req_raw) < 40:
            self.logger.warning(f"Malformed request to {endpoint} - {req_raw.hex()}")
            return b""

        req_header = SaoRequestHeader(req_raw)
        cmd_str = f"{req_header.cmd:04x}"

        if self.game_cfg.hash.verify_hash and self.static_hash != req_header.hash:
            self.logger.error(f"Hash mismatch! Expecting {self.static_hash} but recieved {req_header.hash}")
            return b""

        if self.game_cfg.crypt.enable:
            iv = req_raw[40:48]
            crypt_data = req_raw[48:]
            # Blowfish-CBC needs an 8-byte IV and block-aligned ciphertext;
            # reject bad input instead of letting the cipher raise ValueError.
            if len(iv) != 8 or len(crypt_data) % 8 != 0:
                self.logger.warning(f"Malformed encrypted request to {endpoint} - {req_raw.hex()}")
                return b""

            cipher = Blowfish.new(self.game_cfg.crypt.key.encode(), Blowfish.MODE_CBC, iv)
            req_data = cipher.decrypt(crypt_data)
            self.logger.debug(f"Decrypted {req_data.hex()} with IV {iv.hex()}")
        else:
            req_data = req_raw[40:]

        # Dispatch on the hex command id; unknown commands go to handle_noop.
        handler = getattr(self.base, f"handle_{cmd_str}", self.base.handle_noop)
        self.logger.info(f"{endpoint} - {cmd_str} request")
        self.logger.debug(f"Request: {req_raw.hex()}")
        resp = handler(req_header, req_data)

        if resp is None:
            resp = SaoNoopResponse(req_header.cmd + 1).make()

        # Handlers may return raw bytes or a response object to serialize.
        # isinstance (not issubclass) because resp is an instance, and
        # issubclass raises TypeError for non-class arguments.
        if isinstance(resp, SaoBaseResponse):
            resp = resp.make()
        elif not isinstance(resp, bytes):
            self.logger.error(f"Unknown response type {type(resp)}")
            return b""

        self.logger.debug(f"Response: {resp.hex()}")

        if self.game_cfg.crypt.enable:
            iv = secrets.token_bytes(8)
            data_to_crypt = resp[24:]
            # Zero-pad up to the next 8-byte block boundary.
            data_to_crypt += b"\x00" * (-len(data_to_crypt) % 8)

            cipher = Blowfish.new(self.game_cfg.crypt.key.encode(), Blowfish.MODE_CBC, iv)
            data_crypt = cipher.encrypt(data_to_crypt)
            crypt_data_len = len(data_crypt) + len(iv)
            tmp = struct.pack("!I", crypt_data_len)  # does it want the length of the encrypted response??
            # Keep the first 20 header bytes, then the new length, IV and ciphertext.
            resp = resp[:20] + tmp + iv + data_crypt
            self.logger.debug(f"Encrypted Response: {resp.hex()}")

        return resp