import json import inflection import yaml import string import logging import coloredlogs import zlib import base64 import urllib.parse from os import path from typing import Dict, List, Tuple from logging.handlers import TimedRotatingFileHandler from starlette.routing import Route from starlette.responses import Response from starlette.requests import Request from starlette.responses import PlainTextResponse from core.config import CoreConfig from core.title import BaseServlet from core.utils import Utils from titles.fgoa.base import FGOABase from titles.fgoa.config import FGOAConfig from titles.fgoa.const import FGOAConstants class FGOAServlet(BaseServlet): def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None: self.core_cfg = core_cfg self.game_cfg = FGOAConfig() if path.exists(f"{cfg_dir}/{FGOAConstants.CONFIG_NAME}"): self.game_cfg.update( yaml.safe_load(open(f"{cfg_dir}/{FGOAConstants.CONFIG_NAME}")) ) self.versions = [ FGOABase(core_cfg, self.game_cfg), ] self.logger = logging.getLogger("fgoa") log_fmt_str = "[%(asctime)s] FGOA | %(levelname)s | %(message)s" log_fmt = logging.Formatter(log_fmt_str) fileHandler = TimedRotatingFileHandler( "{0}/{1}.log".format(self.core_cfg.server.log_dir, "fgoa"), encoding="utf8", when="d", backupCount=10, ) fileHandler.setFormatter(log_fmt) consoleHandler = logging.StreamHandler() consoleHandler.setFormatter(log_fmt) self.logger.addHandler(fileHandler) self.logger.addHandler(consoleHandler) self.logger.setLevel(self.game_cfg.server.loglevel) coloredlogs.install( level=self.game_cfg.server.loglevel, logger=self.logger, fmt=log_fmt_str ) @classmethod def is_game_enabled(cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str) -> bool: game_cfg = FGOAConfig() if path.exists(f"{cfg_dir}/{FGOAConstants.CONFIG_NAME}"): game_cfg.update( yaml.safe_load(open(f"{cfg_dir}/{FGOAConstants.CONFIG_NAME}")) ) if not game_cfg.server.enable: return False return True def get_routes(self) -> List[Route]: return [ Route("/SDEJ/{version:int}/{endpoint:str}", self.render_POST, methods=['POST']) ] def get_allnet_info(self, game_code: str, game_ver: int, keychip: str) -> Tuple[str, str]: if not self.core_cfg.server.is_using_proxy and Utils.get_title_port(self.core_cfg) != 80: return (f"http://{self.core_cfg.server.hostname}:{Utils.get_title_port(self.core_cfg)}/{game_code}/{game_ver}", self.core_cfg.server.hostname) return (f"http://{self.core_cfg.server.hostname}/{game_code}/{game_ver}", self.core_cfg.server.hostname) async def render_POST(self, request: Request) -> bytes: version: int = request.path_params.get('version') endpoint: str = request.path_params.get('endpoint') req_raw = await request.body() internal_ver = 0 client_ip = Utils.get_ip_addr(request) if all(c in string.hexdigits for c in endpoint) and len(endpoint) == 32: # If we get a 32 character long hex string, it's a hash and we're # doing encrypted. The likelyhood of false positives is low but # technically not 0 self.logger.error("Encryption not supported at this time") self.logger.debug(req_raw) self.logger.info(f"v{version} {endpoint} request from {client_ip}") func_to_find = "handle_" + inflection.underscore(endpoint) + "_request" try: handler = getattr(self.versions[internal_ver], func_to_find) resp = await handler(req_raw) except Exception as e: self.logger.error(f"Error handling v{version} method {endpoint} - {e}") raise return Response(zlib.compress(b'{"stat": "0"}')) if resp is None: resp = {"returnCode": 1} self.logger.debug(f"Response {resp}") return Response(zlib.compress(json.dumps(resp, ensure_ascii=False).encode("utf-8")))