artemis/titles/idz/index.py

175 lines
5.9 KiB
Python

from twisted.web.http import Request
import yaml
import logging
import coloredlogs
from logging.handlers import TimedRotatingFileHandler
from os import path
from typing import Tuple, List
from twisted.internet import reactor, endpoints
from twisted.web import server, resource
import importlib
from core.config import CoreConfig
from .config import IDZConfig
from .const import IDZConstants
from .userdb import IDZUserDBFactory, IDZUserDBWeb, IDZKey
from .echo import IDZEcho
from .handlers import IDZHandlerLoadConfigB
class IDZServlet:
def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None:
self.core_cfg = core_cfg
self.game_cfg = IDZConfig()
if path.exists(f"{cfg_dir}/{IDZConstants.CONFIG_NAME}"):
self.game_cfg.update(
yaml.safe_load(open(f"{cfg_dir}/{IDZConstants.CONFIG_NAME}"))
)
self.logger = logging.getLogger("idz")
if not hasattr(self.logger, "inited"):
log_fmt_str = "[%(asctime)s] IDZ | %(levelname)s | %(message)s"
log_fmt = logging.Formatter(log_fmt_str)
fileHandler = TimedRotatingFileHandler(
"{0}/{1}.log".format(self.core_cfg.server.log_dir, "idz"),
encoding="utf8",
when="d",
backupCount=10,
)
self.rsa_keys: List[IDZKey] = []
fileHandler.setFormatter(log_fmt)
consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(log_fmt)
self.logger.addHandler(fileHandler)
self.logger.addHandler(consoleHandler)
self.logger.setLevel(self.game_cfg.server.loglevel)
coloredlogs.install(
level=self.game_cfg.server.loglevel, logger=self.logger, fmt=log_fmt_str
)
self.logger.inited = True
@classmethod
def rsaHashKeyN(cls, data):
hash_ = 0
for i in data:
hash_ = (
hash_ * IDZConstants.HASH_MUL + (i ^ IDZConstants.HASH_XOR)
^ IDZConstants.HASH_LUT[i & 0xF]
)
hash_ &= 0xFFFFFFFF
return hash_
@classmethod
def get_allnet_info(
cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str
) -> Tuple[bool, str, str]:
game_cfg = IDZConfig()
if path.exists(f"{cfg_dir}/{IDZConstants.CONFIG_NAME}"):
game_cfg.update(
yaml.safe_load(open(f"{cfg_dir}/{IDZConstants.CONFIG_NAME}"))
)
if not game_cfg.server.enable:
return (False, "", "")
if len(game_cfg.rsa_keys) <= 0 or not game_cfg.server.aes_key:
logging.getLogger("idz").error("IDZ: No RSA/AES keys! IDZ cannot start")
return (False, "", "")
hostname = (
core_cfg.title.hostname
if not game_cfg.server.hostname
else game_cfg.server.hostname
)
return (
True,
f"",
f"{hostname}:{game_cfg.ports.userdb}",
)
def setup(self):
for key in self.game_cfg.rsa_keys:
if "N" not in key or "d" not in key or "e" not in key:
self.logger.error(f"Invalid IDZ key {key}")
continue
hashN = self.rsaHashKeyN(str(key["N"]).encode())
self.rsa_keys.append(IDZKey(key["N"], key["d"], key["e"], hashN))
if len(self.rsa_keys) <= 0:
self.logger.error("No valid RSA keys provided! IDZ cannot start!")
return
handler_map = [{} for _ in range(IDZConstants.NUM_VERS)]
handler_mod = mod = importlib.import_module(f"titles.idz.handlers")
for cls_name in dir(handler_mod):
if cls_name.startswith("__"):
continue
try:
mod = getattr(handler_mod, cls_name)
mod_cmds: List = getattr(mod, "cmd_codes")
while len(mod_cmds) < IDZConstants.NUM_VERS:
mod_cmds.append(None)
for i in range(len(mod_cmds)):
if mod_cmds[i] is None:
mod_cmds[i] = mod_cmds[i - 1]
handler_map[i][mod_cmds[i]] = mod
except AttributeError as e:
continue
endpoints.serverFromString(
reactor,
f"tcp:{self.game_cfg.ports.userdb}:interface={self.core_cfg.server.listen_address}",
).listen(
IDZUserDBFactory(self.core_cfg, self.game_cfg, self.rsa_keys, handler_map)
)
reactor.listenUDP(
self.game_cfg.ports.echo, IDZEcho(self.core_cfg, self.game_cfg)
)
reactor.listenUDP(
self.game_cfg.ports.echo + 1, IDZEcho(self.core_cfg, self.game_cfg)
)
reactor.listenUDP(
self.game_cfg.ports.match, IDZEcho(self.core_cfg, self.game_cfg)
)
reactor.listenUDP(
self.game_cfg.ports.userdb + 1, IDZEcho(self.core_cfg, self.game_cfg)
)
self.logger.info(f"UserDB Listening on port {self.game_cfg.ports.userdb}")
def render_POST(self, request: Request, version: int, url_path: str) -> bytes:
req_raw = request.content.getvalue()
self.logger.info(f"IDZ POST request: {url_path} - {req_raw}")
return b""
def render_GET(self, request: Request, version: int, url_path: str) -> bytes:
self.logger.info(f"IDZ GET request: {url_path}")
request.responseHeaders.setRawHeaders(
"Content-Type", [b"text/plain; charset=utf-8"]
)
request.responseHeaders.setRawHeaders(
"Last-Modified", [b"Sun, 23 Apr 2023 05:33:20 GMT"]
)
news = (
self.game_cfg.server.news
if self.game_cfg.server.news
else f"Welcome to Initial D Arcade Stage Zero on {self.core_cfg.server.name}!"
)
news += "\r\n"
news = "1979/01/01 00:00:00 2099/12/31 23:59:59 " + news
return news.encode()