from twisted.web.http import Request
import yaml
import logging
import coloredlogs
from logging.handlers import TimedRotatingFileHandler
import zlib
import json
import base64
from os import path
from typing import Tuple

from titles.diva.handlers.base import *
from core.config import CoreConfig
from titles.diva.config import DivaConfig
from titles.diva.const import DivaConstants
from titles.diva.base import DivaBase


class DivaServlet:
    """Decode incoming Diva POST requests and dispatch them to ``DivaBase``.

    The servlet loads the game configuration from ``cfg_dir``, sets up the
    ``"diva"`` logger, and routes each decoded request to the
    ``handle_<cmd>_request`` method of ``DivaBase``.
    """

    def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None:
        """Load the game config, build the handler object and set up logging.

        Args:
            core_cfg: Core server configuration.
            cfg_dir: Directory that may contain ``DivaConstants.CONFIG_NAME``.
        """
        self.core_cfg = core_cfg
        self.game_cfg = self._load_game_cfg(cfg_dir)

        self.base = DivaBase(core_cfg, self.game_cfg)

        self.logger = logging.getLogger("diva")
        log_fmt_str = "[%(asctime)s] Diva | %(levelname)s | %(message)s"

        # ``logging.getLogger`` returns the same object for the same name, so
        # only attach the handlers once; otherwise constructing the servlet a
        # second time would duplicate every log line.
        if not self.logger.handlers:
            log_fmt = logging.Formatter(log_fmt_str)

            fileHandler = TimedRotatingFileHandler(
                "{0}/{1}.log".format(self.core_cfg.server.log_dir, "diva"),
                encoding="utf8",
                when="d",
                backupCount=10,
            )
            fileHandler.setFormatter(log_fmt)

            consoleHandler = logging.StreamHandler()
            consoleHandler.setFormatter(log_fmt)

            self.logger.addHandler(fileHandler)
            self.logger.addHandler(consoleHandler)

        self.logger.setLevel(self.game_cfg.server.loglevel)
        coloredlogs.install(
            level=self.game_cfg.server.loglevel, logger=self.logger, fmt=log_fmt_str
        )

    @staticmethod
    def _load_game_cfg(cfg_dir: str) -> DivaConfig:
        """Return a ``DivaConfig`` updated from the YAML file in ``cfg_dir``, if present."""
        game_cfg = DivaConfig()
        cfg_file = f"{cfg_dir}/{DivaConstants.CONFIG_NAME}"
        if path.exists(cfg_file):
            # Use a context manager so the file handle is closed promptly.
            with open(cfg_file) as cfg_fp:
                game_cfg.update(yaml.safe_load(cfg_fp))
        return game_cfg

    @classmethod
    def get_allnet_info(
        cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str
    ) -> Tuple[bool, str, str]:
        """Return the enabled flag and title URI for this game.

        Args:
            game_code: Game code placed into the returned URI.
            core_cfg: Core server configuration (hostname, port, develop flag).
            cfg_dir: Directory that may contain ``DivaConstants.CONFIG_NAME``.

        Returns:
            ``(False, "", "")`` when the game server is disabled in the game
            config. Otherwise ``(True, uri, "")`` where ``uri`` includes the
            title port only when ``core_cfg.server.is_develop`` is set.
        """
        game_cfg = cls._load_game_cfg(cfg_dir)

        if not game_cfg.server.enable:
            return (False, "", "")

        if core_cfg.server.is_develop:
            return (
                True,
                f"http://{core_cfg.title.hostname}:{core_cfg.title.port}/{game_code}/$v/",
                "",
            )

        return (True, f"http://{core_cfg.title.hostname}/{game_code}/$v/", "")

    def render_POST(self, req: Request, version: int, url_path: str) -> bytes:
        """Decode a POST request, run the matching handler and return the reply.

        Requests whose headers contain ``THIS_STRING_SEPARATES`` are treated as
        binary requests: the command string is read from the fourth line of the
        body. All other requests are base64-decoded and zlib-inflated.

        Args:
            req: The incoming twisted request.
            version: Not used by this method.
            url_path: Not used by this method.

        Returns:
            The encoded response body, or ``b"stat=0"`` when the request
            cannot be decoded or parsed.
        """
        req_raw = req.content.getvalue()
        url_header = req.getAllHeaders()

        req.responseHeaders.addRawHeader(b"content-type", b"text/plain")

        if "THIS_STRING_SEPARATES" in str(url_header):
            binary_request = req_raw.splitlines()
            try:
                binary_cmd_decoded = binary_request[3].decode("utf-8")
            except (IndexError, UnicodeDecodeError) as e:
                # Body has fewer than four lines or the command is not UTF-8.
                self.logger.error(f"Malformed binary request! {e}")
                return b"stat=0"

            req_cls = BaseBinaryRequest(binary_cmd_decoded)
            # Bind url_data here too so the debug log below works for both
            # request kinds.
            url_data = binary_cmd_decoded

        else:
            try:
                # json.dumps yields a quoted string with escaped newlines ...
                json_string = json.dumps(req_raw.decode("utf-8"))
                # ... which are turned back into real newlines here.
                b64string = json_string.replace(r"\n", "\n")
                # Non-validating b64decode discards characters outside the
                # base64 alphabet (the quotes and newlines).
                gz_string = base64.b64decode(b64string)
            except ValueError as e:
                # UnicodeDecodeError and binascii.Error both derive from
                # ValueError.
                self.logger.error(f"Failed to decode request! {e}")
                return b"stat=0"

            try:
                url_data = zlib.decompress(gz_string)  # Inflate the zlib payload
            except zlib.error as e:
                self.logger.error(f"Failed to defalte! {e} -> {gz_string}")
                return b"stat=0"

            try:
                req_cls = BaseRequest(url_data)
            except DivaRequestParseException as e:
                self.logger.error(e)
                return b"stat=0"

        self.logger.debug(f"Request: {url_data}\nHeaders: {url_header}")
        self.logger.info(
            f"{req_cls.cmd} request from {req_cls.kc_serial}/{req_cls.b_serial} at {req.getClientAddress().host}"
        )

        handler_str = f"handle_{req_cls.cmd}_request"

        if not hasattr(self.base, handler_str):
            # No handler for this command: answer with a bare base response.
            self.logger.warning(f"Unhandled cmd {req_cls.cmd}")
            return BaseResponse(req_cls.cmd, req_cls.req_id).make().encode()

        handler = getattr(self.base, handler_str)

        response = handler(req_cls.raw)
        if response is None or response == "":
            response = BaseResponse(req_cls.cmd, req_cls.req_id).make()

        if not response.startswith("cmd="):
            # Handler returned only the payload; prepend the standard prefix.
            response = f"cmd={req_cls.cmd}&req_id={req_cls.req_id}&stat=ok&" + response

        self.logger.debug(f"Response: {response}")

        return response.encode(errors="ignore")