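# NOTE: repository-viewer residue captured together with this file, kept as
# comments so it is not parsed as code:
#   1 / 0 / Fork 0
#   artemis/titles/diva/index.py
#   128 lines / 4.4 KiB / Python
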
from twisted.web.http import Request
import yaml
import logging, coloredlogs
from logging.handlers import TimedRotatingFileHandler
import zlib
import json
import base64
from os import path
from typing import Tuple
from titles.diva.handlers.base import *
from core.config import CoreConfig
from titles.diva.config import DivaConfig
from titles.diva.const import DivaConstants
from titles.diva.base import DivaBase
class DivaServlet:
    """HTTP entry point for the Diva title.

    Loads the Diva game configuration from ``cfg_dir``, configures the shared
    ``"diva"`` logger, and dispatches POSTed commands to the matching
    ``handle_<cmd>_request`` method of :class:`DivaBase`.
    """

    def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None:
        """Load the game config and set up logging and the request handler base.

        Args:
            core_cfg: Core server configuration; ``server.log_dir`` is read to
                place the log file.
            cfg_dir: Directory searched for ``DivaConstants.CONFIG_NAME``.
        """
        self.core_cfg = core_cfg
        self.game_cfg = self._load_game_cfg(cfg_dir)
        self.base = DivaBase(core_cfg, self.game_cfg)
        self.logger = logging.getLogger("diva")

        # logging.getLogger returns the same logger object for the same name,
        # so without this guard every additional servlet instance would attach
        # another file/console handler and each record would be emitted
        # multiple times.
        if not self.logger.handlers:
            log_fmt_str = "[%(asctime)s] Diva | %(levelname)s | %(message)s"
            log_fmt = logging.Formatter(log_fmt_str)

            fileHandler = TimedRotatingFileHandler(
                "{0}/{1}.log".format(self.core_cfg.server.log_dir, "diva"),
                encoding="utf8",
                when="d",
                backupCount=10,
            )
            fileHandler.setFormatter(log_fmt)

            consoleHandler = logging.StreamHandler()
            consoleHandler.setFormatter(log_fmt)

            self.logger.addHandler(fileHandler)
            self.logger.addHandler(consoleHandler)
            self.logger.setLevel(self.game_cfg.server.loglevel)
            coloredlogs.install(
                level=self.game_cfg.server.loglevel,
                logger=self.logger,
                fmt=log_fmt_str,
            )

    @staticmethod
    def _load_game_cfg(cfg_dir: str) -> DivaConfig:
        """Return a DivaConfig, updated from the YAML file in ``cfg_dir`` if it exists."""
        game_cfg = DivaConfig()
        cfg_file = f"{cfg_dir}/{DivaConstants.CONFIG_NAME}"
        if path.exists(cfg_file):
            # Context manager so the file handle is closed deterministically.
            with open(cfg_file) as cfg_stream:
                game_cfg.update(yaml.safe_load(cfg_stream))
        return game_cfg

    @classmethod
    def get_allnet_info(
        cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str
    ) -> Tuple[bool, str, str]:
        """Report whether the title is enabled and which URL serves it.

        Args:
            game_code: Code inserted into the returned URL path.
            core_cfg: Core configuration; ``server.is_develop``,
                ``title.hostname`` and ``title.port`` are read.
            cfg_dir: Directory searched for ``DivaConstants.CONFIG_NAME``.

        Returns:
            ``(False, "", "")`` when the game config has ``server.enable``
            unset. Otherwise ``(True, url, "")`` where ``url`` is
            ``http://<hostname>[:<port>]/<game_code>/$v/``; the port is only
            included when ``core_cfg.server.is_develop`` is true.
        """
        game_cfg = cls._load_game_cfg(cfg_dir)

        if not game_cfg.server.enable:
            return (False, "", "")

        if core_cfg.server.is_develop:
            return (
                True,
                f"http://{core_cfg.title.hostname}:{core_cfg.title.port}/{game_code}/$v/",
                "",
            )

        return (True, f"http://{core_cfg.title.hostname}/{game_code}/$v/", "")

    def render_POST(self, req: Request, version: int, url_path: str) -> bytes:
        """Decode a POSTed Diva request, dispatch it and build the reply.

        Args:
            req: Incoming twisted request; its body and headers are read and a
                ``content-type: text/plain`` response header is added.
            version: Not used by this method.
            url_path: Not used by this method.

        Returns:
            The encoded response string. ``b"stat=0"`` is returned when the
            body cannot be decoded or parsed.
        """
        req_raw = req.content.getvalue()
        url_header = req.getAllHeaders()
        req.responseHeaders.addRawHeader(b"content-type", b"text/plain")

        if "THIS_STRING_SEPARATES" in str(url_header):
            # Binary request: the command string is taken from the fourth
            # line of the body.
            binary_request = req_raw.splitlines()
            try:
                binary_cmd_decoded = binary_request[3].decode("utf-8")
            except (IndexError, UnicodeDecodeError) as e:
                self.logger.error(f"Malformed binary request! {e} -> {req_raw}")
                return b"stat=0"

            req_cls = BaseBinaryRequest(binary_cmd_decoded)
            # Assigned on this path too so the debug log below cannot hit an
            # unbound local.
            url_data = binary_cmd_decoded
        else:
            try:
                # json.dumps yields a quoted string in which real newlines are
                # escaped as backslash-n; the replace turns them back into
                # newlines. The non-validating b64decode then discards every
                # character outside the base64 alphabet (quotes, newlines).
                json_string = json.dumps(req_raw.decode("utf-8"))
                b64string = json_string.replace(r"\n", "\n")
                gz_string = base64.b64decode(b64string)
            except ValueError as e:
                # UnicodeDecodeError and binascii.Error are both ValueError
                # subclasses; treat them like the other decode failures.
                self.logger.error(f"Failed to decode request! {e} -> {req_raw}")
                return b"stat=0"

            try:
                # The payload is a zlib-compressed stream.
                url_data = zlib.decompress(gz_string)
            except zlib.error as e:
                self.logger.error(f"Failed to defalte! {e} -> {gz_string}")
                return b"stat=0"

            try:
                req_cls = BaseRequest(url_data)
            except DivaRequestParseException as e:
                self.logger.error(e)
                return b"stat=0"

        self.logger.debug(f"Request: {url_data}\nHeaders: {url_header}")
        self.logger.info(
            f"{req_cls.cmd} request from {req_cls.kc_serial}/{req_cls.b_serial} at {req.getClientAddress().host}"
        )

        # Commands are routed by name to DivaBase.handle_<cmd>_request.
        handler = getattr(self.base, f"handle_{req_cls.cmd}_request", None)
        if handler is None:
            self.logger.warning(f"Unhandled cmd {req_cls.cmd}")
            return BaseResponse(req_cls.cmd, req_cls.req_id).make().encode()

        response = handler(req_cls.raw)
        if response is None or response == "":
            response = BaseResponse(req_cls.cmd, req_cls.req_id).make()

        # Handlers may return only the payload part; prepend the standard
        # prefix in that case.
        if not response.startswith("cmd="):
            response = f"cmd={req_cls.cmd}&req_id={req_cls.req_id}&stat=ok" + response

        self.logger.debug(f"Response: {response}")
        return response.encode(errors="ignore")