from starlette.requests import Request from starlette.responses import PlainTextResponse from starlette.routing import Route import yaml import logging, coloredlogs from logging.handlers import TimedRotatingFileHandler import zlib import json import base64 from os import path from typing import Tuple, Dict, List from titles.diva.handlers.base import * from core.config import CoreConfig from core.title import BaseServlet from core.utils import Utils from .config import DivaConfig from .const import DivaConstants from .base import DivaBase class DivaServlet(BaseServlet): def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None: super().__init__(core_cfg, cfg_dir) self.game_cfg = DivaConfig() if path.exists(f"{cfg_dir}/{DivaConstants.CONFIG_NAME}"): self.game_cfg.update( yaml.safe_load(open(f"{cfg_dir}/{DivaConstants.CONFIG_NAME}")) ) self.base = DivaBase(core_cfg, self.game_cfg) self.logger = logging.getLogger("diva") log_fmt_str = "[%(asctime)s] Diva | %(levelname)s | %(message)s" log_fmt = logging.Formatter(log_fmt_str) fileHandler = TimedRotatingFileHandler( "{0}/{1}.log".format(self.core_cfg.server.log_dir, "diva"), encoding="utf8", when="d", backupCount=10, ) fileHandler.setFormatter(log_fmt) consoleHandler = logging.StreamHandler() consoleHandler.setFormatter(log_fmt) self.logger.addHandler(fileHandler) self.logger.addHandler(consoleHandler) self.logger.setLevel(self.game_cfg.server.loglevel) coloredlogs.install( level=self.game_cfg.server.loglevel, logger=self.logger, fmt=log_fmt_str ) def get_routes(self) -> List[Route]: return [ Route("/DivaServlet/", self.render_POST, methods=['POST']) ] def get_allnet_info(self, game_code: str, game_ver: int, keychip: str) -> Tuple[str, str]: if not self.core_cfg.server.is_using_proxy and Utils.get_title_port(self.core_cfg) != 80: return (f"http://{self.core_cfg.server.hostname}:{Utils.get_title_port(self.core_cfg)}/DivaServlet/", self.core_cfg.server.hostname) return (f"http://{self.core_cfg.server.hostname}/DivaServlet/", self.core_cfg.server.hostname) @classmethod def is_game_enabled( cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str ) -> bool: game_cfg = DivaConfig() if path.exists(f"{cfg_dir}/{DivaConstants.CONFIG_NAME}"): game_cfg.update( yaml.safe_load(open(f"{cfg_dir}/{DivaConstants.CONFIG_NAME}")) ) if not game_cfg.server.enable: return False return True async def render_POST(self, request: Request, game_code: str, matchers: Dict) -> bytes: req_raw = await request.body() url_header = request.headers # Ping Dispatch if "THIS_STRING_SEPARATES" in url_header: binary_request = req_raw.splitlines() binary_cmd_decoded = binary_request[3].decode("utf-8") url_data = binary_cmd_decoded # for logging req_cls = BaseBinaryRequest(binary_cmd_decoded) else: json_string = json.dumps( req_raw.decode("utf-8") ) # Take the response and decode as UTF-8 and dump b64string = json_string.replace( r"\n", "\n" ) # Remove all \n and separate them as new lines gz_string = base64.b64decode(b64string) # Decompressing the base64 string try: url_data = zlib.decompress(gz_string) # Decompressing the gzip except zlib.error as e: self.logger.error(f"Failed to defalte! {e} -> {gz_string}") return PlainTextResponse("stat=0") try: req_cls = BaseRequest(url_data) except DivaRequestParseException as e: self.logger.error(e) return PlainTextResponse("stat=0") self.logger.debug(f"Request: {url_data}\nHeaders: {url_header}") self.logger.info( f"{req_cls.cmd} request from {req_cls.kc_serial}/{req_cls.b_serial} at {Utils.get_ip_addr(request)}" ) handler_str = f"handle_{req_cls.cmd}_request" if not hasattr(self.base, handler_str): self.logger.warn(f"Unhandled cmd {req_cls.cmd}") return PlainTextResponse(BaseResponse(req_cls.cmd, req_cls.req_id).make()) handler = getattr(self.base, handler_str) response = handler(req_cls.raw) if response is None or response == "": response = BaseResponse(req_cls.cmd, req_cls.req_id).make() if not response.startswith("cmd="): response = f"cmd={req_cls.cmd}&req_id={req_cls.req_id}&stat=ok" + response self.logger.debug(f"Response: {response}") return PlainTextResponse(response)