From 8748b3d94910b6e04204dcb7c4483a546ecd0e5c Mon Sep 17 00:00:00 2001 From: FGO Date: Mon, 1 Apr 2024 19:01:03 +0000 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=20titles/FGOA/index.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- titles/FGOA/index.py | 123 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 123 insertions(+) create mode 100644 titles/FGOA/index.py diff --git a/titles/FGOA/index.py b/titles/FGOA/index.py new file mode 100644 index 0000000..b1d5a22 --- /dev/null +++ b/titles/FGOA/index.py @@ -0,0 +1,123 @@ +import json +import inflection +import yaml +import string +import logging +import coloredlogs +import zlib +import base64 +import urllib.parse + +from os import path +from typing import Dict, List, Tuple +from logging.handlers import TimedRotatingFileHandler + +from starlette.routing import Route +from starlette.responses import Response +from starlette.requests import Request +from starlette.responses import PlainTextResponse + +from core.config import CoreConfig +from core.title import BaseServlet +from core.utils import Utils + +from titles.fgoa.base import FGOABase +from titles.fgoa.config import FGOAConfig +from titles.fgoa.const import FGOAConstants + +class FGOAServlet(BaseServlet): + def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None: + self.core_cfg = core_cfg + self.game_cfg = FGOAConfig() + if path.exists(f"{cfg_dir}/{FGOAConstants.CONFIG_NAME}"): + self.game_cfg.update( + yaml.safe_load(open(f"{cfg_dir}/{FGOAConstants.CONFIG_NAME}")) + ) + + self.versions = [ + FGOABase(core_cfg, self.game_cfg), + ] + + self.logger = logging.getLogger("fgoa") + log_fmt_str = "[%(asctime)s] FGOA | %(levelname)s | %(message)s" + log_fmt = logging.Formatter(log_fmt_str) + fileHandler = TimedRotatingFileHandler( + "{0}/{1}.log".format(self.core_cfg.server.log_dir, "fgoa"), + encoding="utf8", + when="d", + backupCount=10, + ) + + fileHandler.setFormatter(log_fmt) + + consoleHandler = logging.StreamHandler() + consoleHandler.setFormatter(log_fmt) + + self.logger.addHandler(fileHandler) + self.logger.addHandler(consoleHandler) + + self.logger.setLevel(self.game_cfg.server.loglevel) + coloredlogs.install( + level=self.game_cfg.server.loglevel, logger=self.logger, fmt=log_fmt_str + ) + + @classmethod + def is_game_enabled(cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str) -> bool: + game_cfg = FGOAConfig() + + if path.exists(f"{cfg_dir}/{FGOAConstants.CONFIG_NAME}"): + game_cfg.update( + yaml.safe_load(open(f"{cfg_dir}/{FGOAConstants.CONFIG_NAME}")) + ) + + if not game_cfg.server.enable: + return False + + return True + + def get_routes(self) -> List[Route]: + return [ + Route("/SDEJ/{version:int}/{endpoint:str}", self.render_POST, methods=['POST']) + ] + + def get_allnet_info(self, game_code: str, game_ver: int, keychip: str) -> Tuple[str, str]: + if not self.core_cfg.server.is_using_proxy and Utils.get_title_port(self.core_cfg) != 80: + return (f"http://{self.core_cfg.server.hostname}:{Utils.get_title_port(self.core_cfg)}/{game_code}/{game_ver}", self.core_cfg.server.hostname) + + return (f"http://{self.core_cfg.server.hostname}/{game_code}/{game_ver}", self.core_cfg.server.hostname) + + + async def render_POST(self, request: Request) -> bytes: + version: int = request.path_params.get('version') + endpoint: str = request.path_params.get('endpoint') + req_raw = await request.body() + internal_ver = 0 + client_ip = Utils.get_ip_addr(request) + + if all(c in string.hexdigits for c in endpoint) and len(endpoint) == 32: + # If we get a 32 character long hex string, it's a hash and we're + # doing encrypted. The likelyhood of false positives is low but + # technically not 0 + self.logger.error("Encryption not supported at this time") + + self.logger.debug(req_raw) + + self.logger.info(f"v{version} {endpoint} request from {client_ip}") + + func_to_find = "handle_" + inflection.underscore(endpoint) + "_request" + + try: + handler = getattr(self.versions[internal_ver], func_to_find) + resp = await handler(req_raw) + + except Exception as e: + self.logger.error(f"Error handling v{version} method {endpoint} - {e}") + raise + return Response(zlib.compress(b'{"stat": "0"}')) + + if resp is None: + resp = {"returnCode": 1} + + self.logger.debug(f"Response {resp}") + + return Response(zlib.compress(json.dumps(resp, ensure_ascii=False).encode("utf-8"))) \ No newline at end of file