artemis/titles/idac/index.py

168 lines
5.9 KiB
Python

import json
import traceback
import inflection
import yaml
import logging
import coloredlogs
from os import path
from typing import Dict, List, Tuple
from logging.handlers import TimedRotatingFileHandler
from twisted.web import server
from twisted.web.http import Request
from twisted.internet import reactor, endpoints
from core.config import CoreConfig
from core.utils import Utils
from titles.idac.base import IDACBase
from titles.idac.season2 import IDACSeason2
from titles.idac.config import IDACConfig
from titles.idac.const import IDACConstants
from titles.idac.echo import IDACEchoUDP
from titles.idac.matching import IDACMatching
class IDACServlet:
def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None:
self.core_cfg = core_cfg
self.game_cfg = IDACConfig()
if path.exists(f"{cfg_dir}/{IDACConstants.CONFIG_NAME}"):
self.game_cfg.update(
yaml.safe_load(open(f"{cfg_dir}/{IDACConstants.CONFIG_NAME}"))
)
self.versions = [
IDACBase(core_cfg, self.game_cfg),
IDACSeason2(core_cfg, self.game_cfg)
]
self.logger = logging.getLogger("idac")
log_fmt_str = "[%(asctime)s] IDAC | %(levelname)s | %(message)s"
log_fmt = logging.Formatter(log_fmt_str)
fileHandler = TimedRotatingFileHandler(
"{0}/{1}.log".format(self.core_cfg.server.log_dir, "idac"),
encoding="utf8",
when="d",
backupCount=10,
)
fileHandler.setFormatter(log_fmt)
consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(log_fmt)
self.logger.addHandler(fileHandler)
self.logger.addHandler(consoleHandler)
self.logger.setLevel(self.game_cfg.server.loglevel)
coloredlogs.install(
level=self.game_cfg.server.loglevel, logger=self.logger, fmt=log_fmt_str
)
@classmethod
def is_game_enabled(cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str) -> bool:
game_cfg = IDACConfig()
if path.exists(f"{cfg_dir}/{IDACConstants.CONFIG_NAME}"):
game_cfg.update(
yaml.safe_load(open(f"{cfg_dir}/{IDACConstants.CONFIG_NAME}"))
)
if not game_cfg.server.enable:
return False
return True
def get_endpoint_matchers(self) -> Tuple[List[Tuple[str, str, Dict]], List[Tuple[str, str, Dict]]]:
return (
[],
[("render_POST", "/SDGT/{version}/initiald/{category}/{endpoint}", {})]
)
def get_allnet_info(
self, game_code: str, game_ver: int, keychip: str
) -> Tuple[bool, str, str]:
title_port_int = Utils.get_title_port(self.core_cfg)
t_port = f":{title_port_int}" if title_port_int and not self.core_cfg.server.is_using_proxy else ""
return (
f"",
# requires http or else it defaults to https
f"http://{self.core_cfg.title.hostname}{t_port}/{game_code}/{game_ver}/",
)
def render_POST(self, request: Request, game_code: int, matchers: Dict) -> bytes:
req_raw = request.content.getvalue()
internal_ver = 0
version = int(matchers['version'])
category = matchers['category']
endpoint = matchers['endpoint']
client_ip = Utils.get_ip_addr(request)
if version >= 100 and version < 140: # IDAC Season 1
internal_ver = IDACConstants.VER_IDAC_SEASON_1
elif version >= 140 and version < 171: # IDAC Season 2
internal_ver = IDACConstants.VER_IDAC_SEASON_2
header_application = self.decode_header(request.getAllHeaders())
req_data = json.loads(req_raw)
self.logger.info(f"v{version} {endpoint} request from {client_ip}")
self.logger.debug(f"Headers: {header_application}")
self.logger.debug(req_data)
# func_to_find = "handle_" + inflection.underscore(endpoint) + "_request"
func_to_find = "handle_"
func_to_find += f"{category.lower()}_" if not category == "" else ""
func_to_find += f"{endpoint.lower()}_request"
if not hasattr(self.versions[internal_ver], func_to_find):
self.logger.warning(f"Unhandled v{version} request {endpoint}")
return '{"status_code": "0"}'.encode("utf-8")
resp = None
try:
handler = getattr(self.versions[internal_ver], func_to_find)
resp = handler(req_data, header_application)
except Exception as e:
traceback.print_exc()
self.logger.error(f"Error handling v{version} method {endpoint} - {e}")
return '{"status_code": "0"}'.encode("utf-8")
if resp is None:
resp = {"status_code": "0"}
self.logger.debug(f"Response {resp}")
return json.dumps(resp, ensure_ascii=False).encode("utf-8")
def decode_header(self, data: Dict) -> Dict:
app: str = data[b"application"].decode()
ret = {}
for x in app.split(", "):
y = x.split("=")
ret[y[0]] = y[1].replace('"', "")
return ret
def setup(self):
if self.game_cfg.server.enable:
endpoints.serverFromString(
reactor,
f"tcp:{self.game_cfg.server.matching}:interface={self.core_cfg.server.listen_address}",
).listen(server.Site(IDACMatching(self.core_cfg, self.game_cfg)))
reactor.listenUDP(
self.game_cfg.server.echo1,
IDACEchoUDP(self.core_cfg, self.game_cfg, self.game_cfg.server.echo1),
)
reactor.listenUDP(
self.game_cfg.server.echo2,
IDACEchoUDP(self.core_cfg, self.game_cfg, self.game_cfg.server.echo2),
)
self.logger.info(f"Matching listening on {self.game_cfg.server.matching} with echos on {self.game_cfg.server.echo1} and {self.game_cfg.server.echo2}")