artemis/titles/diva/index.py
2023-11-20 11:29:10 -05:00

134 lines
4.8 KiB
Python

from twisted.web.http import Request
import yaml
import logging, coloredlogs
from logging.handlers import TimedRotatingFileHandler
import zlib
import json
import base64
from os import path
from typing import Tuple, Dict, List
from titles.diva.handlers.base import *
from core.config import CoreConfig
from core.title import BaseServlet
from core.utils import Utils
from .config import DivaConfig
from .const import DivaConstants
from .base import DivaBase
class DivaServlet(BaseServlet):
def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None:
super().__init__(core_cfg, cfg_dir)
self.game_cfg = DivaConfig()
if path.exists(f"{cfg_dir}/{DivaConstants.CONFIG_NAME}"):
self.game_cfg.update(
yaml.safe_load(open(f"{cfg_dir}/{DivaConstants.CONFIG_NAME}"))
)
self.base = DivaBase(core_cfg, self.game_cfg)
self.logger = logging.getLogger("diva")
log_fmt_str = "[%(asctime)s] Diva | %(levelname)s | %(message)s"
log_fmt = logging.Formatter(log_fmt_str)
fileHandler = TimedRotatingFileHandler(
"{0}/{1}.log".format(self.core_cfg.server.log_dir, "diva"),
encoding="utf8",
when="d",
backupCount=10,
)
fileHandler.setFormatter(log_fmt)
consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(log_fmt)
self.logger.addHandler(fileHandler)
self.logger.addHandler(consoleHandler)
self.logger.setLevel(self.game_cfg.server.loglevel)
coloredlogs.install(
level=self.game_cfg.server.loglevel, logger=self.logger, fmt=log_fmt_str
)
def get_endpoint_matchers(self) -> Tuple[List[Tuple[str, str, Dict]], List[Tuple[str, str, Dict]]]:
return (
[],
[("render_POST", "/DivaServlet/", {})]
)
def get_allnet_info(self, game_code: str, game_ver: int, keychip: str) -> Tuple[str, str]:
if not self.core_cfg.server.is_using_proxy and Utils.get_title_port(self.core_cfg) != 80:
return (f"http://{self.core_cfg.title.hostname}:{Utils.get_title_port(self.core_cfg)}/DivaServlet/", "")
return (f"http://{self.core_cfg.title.hostname}/DivaServlet/", "")
@classmethod
def is_game_enabled(
cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str
) -> bool:
game_cfg = DivaConfig()
if path.exists(f"{cfg_dir}/{DivaConstants.CONFIG_NAME}"):
game_cfg.update(
yaml.safe_load(open(f"{cfg_dir}/{DivaConstants.CONFIG_NAME}"))
)
if not game_cfg.server.enable:
return False
return True
def render_POST(self, request: Request, game_code: str, matchers: Dict) -> bytes:
req_raw = request.content.getvalue()
url_header = request.getAllHeaders()
if "THIS_STRING_SEPARATES" in str(url_header):
binary_request = req_raw.splitlines()
binary_cmd_decoded = binary_request[3].decode("utf-8")
url_data = binary_cmd_decoded # for logging
req_cls = BaseBinaryRequest(binary_cmd_decoded)
else:
json_string = json.dumps(
req_raw.decode("utf-8")
) # Take the response and decode as UTF-8 and dump
b64string = json_string.replace(
r"\n", "\n"
) # Remove all \n and separate them as new lines
gz_string = base64.b64decode(b64string) # Decompressing the base64 string
try:
url_data = zlib.decompress(gz_string) # Decompressing the gzip
except zlib.error as e:
self.logger.error(f"Failed to defalte! {e} -> {gz_string}")
return b"stat=0"
try:
req_cls = BaseRequest(url_data)
except DivaRequestParseException as e:
self.logger.error(e)
return b"stat=0"
self.logger.debug(f"Request: {url_data}\nHeaders: {url_header}")
self.logger.info(
f"{req_cls.cmd} request from {req_cls.kc_serial}/{req_cls.b_serial} at {Utils.get_ip_addr(request)}"
)
handler_str = f"handle_{req_cls.cmd}_request"
if not hasattr(self.base, handler_str):
self.logger.warn(f"Unhandled cmd {req_cls.cmd}")
return BaseResponse(req_cls.cmd, req_cls.req_id).make().encode()
handler = getattr(self.base, handler_str)
response = handler(req_cls.raw)
if response is None or response == "":
response = BaseResponse(req_cls.cmd, req_cls.req_id).make()
if not response.startswith("cmd="):
response = f"cmd={req_cls.cmd}&req_id={req_cls.req_id}&stat=ok" + response
self.logger.debug(f"Response: {response}")
return response.encode(errors="ignore")