3
2
forked from Dniel97/artemis

添加 titles/FGOA/index.py

This commit is contained in:
FGO 2024-04-01 19:01:03 +00:00
parent e3f412c40c
commit 8748b3d949

123
titles/FGOA/index.py Normal file
View File

@ -0,0 +1,123 @@
import json
import inflection
import yaml
import string
import logging
import coloredlogs
import zlib
import base64
import urllib.parse
from os import path
from typing import Dict, List, Tuple
from logging.handlers import TimedRotatingFileHandler
from starlette.routing import Route
from starlette.responses import Response
from starlette.requests import Request
from starlette.responses import PlainTextResponse
from core.config import CoreConfig
from core.title import BaseServlet
from core.utils import Utils
from titles.fgoa.base import FGOABase
from titles.fgoa.config import FGOAConfig
from titles.fgoa.const import FGOAConstants
class FGOAServlet(BaseServlet):
def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None:
self.core_cfg = core_cfg
self.game_cfg = FGOAConfig()
if path.exists(f"{cfg_dir}/{FGOAConstants.CONFIG_NAME}"):
self.game_cfg.update(
yaml.safe_load(open(f"{cfg_dir}/{FGOAConstants.CONFIG_NAME}"))
)
self.versions = [
FGOABase(core_cfg, self.game_cfg),
]
self.logger = logging.getLogger("fgoa")
log_fmt_str = "[%(asctime)s] FGOA | %(levelname)s | %(message)s"
log_fmt = logging.Formatter(log_fmt_str)
fileHandler = TimedRotatingFileHandler(
"{0}/{1}.log".format(self.core_cfg.server.log_dir, "fgoa"),
encoding="utf8",
when="d",
backupCount=10,
)
fileHandler.setFormatter(log_fmt)
consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(log_fmt)
self.logger.addHandler(fileHandler)
self.logger.addHandler(consoleHandler)
self.logger.setLevel(self.game_cfg.server.loglevel)
coloredlogs.install(
level=self.game_cfg.server.loglevel, logger=self.logger, fmt=log_fmt_str
)
@classmethod
def is_game_enabled(cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str) -> bool:
game_cfg = FGOAConfig()
if path.exists(f"{cfg_dir}/{FGOAConstants.CONFIG_NAME}"):
game_cfg.update(
yaml.safe_load(open(f"{cfg_dir}/{FGOAConstants.CONFIG_NAME}"))
)
if not game_cfg.server.enable:
return False
return True
def get_routes(self) -> List[Route]:
return [
Route("/SDEJ/{version:int}/{endpoint:str}", self.render_POST, methods=['POST'])
]
def get_allnet_info(self, game_code: str, game_ver: int, keychip: str) -> Tuple[str, str]:
if not self.core_cfg.server.is_using_proxy and Utils.get_title_port(self.core_cfg) != 80:
return (f"http://{self.core_cfg.server.hostname}:{Utils.get_title_port(self.core_cfg)}/{game_code}/{game_ver}", self.core_cfg.server.hostname)
return (f"http://{self.core_cfg.server.hostname}/{game_code}/{game_ver}", self.core_cfg.server.hostname)
async def render_POST(self, request: Request) -> bytes:
version: int = request.path_params.get('version')
endpoint: str = request.path_params.get('endpoint')
req_raw = await request.body()
internal_ver = 0
client_ip = Utils.get_ip_addr(request)
if all(c in string.hexdigits for c in endpoint) and len(endpoint) == 32:
# If we get a 32 character long hex string, it's a hash and we're
# doing encrypted. The likelyhood of false positives is low but
# technically not 0
self.logger.error("Encryption not supported at this time")
self.logger.debug(req_raw)
self.logger.info(f"v{version} {endpoint} request from {client_ip}")
func_to_find = "handle_" + inflection.underscore(endpoint) + "_request"
try:
handler = getattr(self.versions[internal_ver], func_to_find)
resp = await handler(req_raw)
except Exception as e:
self.logger.error(f"Error handling v{version} method {endpoint} - {e}")
raise
return Response(zlib.compress(b'{"stat": "0"}'))
if resp is None:
resp = {"returnCode": 1}
self.logger.debug(f"Response {resp}")
return Response(zlib.compress(json.dumps(resp, ensure_ascii=False).encode("utf-8")))