artemis/titles/sao/index.py

157 lines
5.8 KiB
Python
Raw Normal View History

from typing import Tuple, Dict, List
from twisted.web.http import Request
import yaml
import logging, coloredlogs
from logging.handlers import TimedRotatingFileHandler
from os import path
2023-11-09 16:51:50 +00:00
from Crypto.Cipher import Blowfish
2023-11-09 23:11:58 +00:00
from hashlib import md5
2023-11-13 18:47:17 +00:00
import secrets
from core import CoreConfig, Utils
from core.title import BaseServlet
from titles.sao.config import SaoConfig
from titles.sao.const import SaoConstants
from titles.sao.base import SaoBase
from titles.sao.handlers.base import *
class SaoServlet(BaseServlet):
def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None:
super().__init__(core_cfg, cfg_dir)
self.config_dir = cfg_dir
self.game_cfg = SaoConfig()
if path.exists(f"{cfg_dir}/sao.yaml"):
self.game_cfg.update(yaml.safe_load(open(f"{cfg_dir}/sao.yaml")))
self.logger = logging.getLogger("sao")
if not hasattr(self.logger, "inited"):
log_fmt_str = "[%(asctime)s] SAO | %(levelname)s | %(message)s"
log_fmt = logging.Formatter(log_fmt_str)
fileHandler = TimedRotatingFileHandler(
"{0}/{1}.log".format(self.core_cfg.server.log_dir, "sao"),
encoding="utf8",
when="d",
backupCount=10,
)
fileHandler.setFormatter(log_fmt)
consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(log_fmt)
self.logger.addHandler(fileHandler)
self.logger.addHandler(consoleHandler)
self.logger.setLevel(self.game_cfg.server.loglevel)
coloredlogs.install(
level=self.game_cfg.server.loglevel, logger=self.logger, fmt=log_fmt_str
)
self.logger.inited = True
self.base = SaoBase(core_cfg, self.game_cfg)
2023-11-09 23:11:58 +00:00
self.static_hash = None
if self.game_cfg.hash.verify_hash:
2023-11-10 02:00:51 +00:00
self.static_hash = md5(self.game_cfg.hash.hash_base.encode()).digest() # Greate hashing guys, really validates the data
def get_endpoint_matchers(self) -> Tuple[List[Tuple[str, str, Dict]], List[Tuple[str, str, Dict]]]:
return (
[],
[("render_POST", "/{datecode}/proto/if/{category}/{endpoint}", {})]
)
@classmethod
def is_game_enabled(cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str) -> bool:
game_cfg = SaoConfig()
if path.exists(f"{cfg_dir}/{SaoConstants.CONFIG_NAME}"):
game_cfg.update(
yaml.safe_load(open(f"{cfg_dir}/{SaoConstants.CONFIG_NAME}"))
)
if not game_cfg.server.enable:
return False
return True
def get_allnet_info(self, game_code: str, game_ver: int, keychip: str) -> Tuple[str, str]:
2023-12-10 23:35:59 +00:00
port_ssl = Utils.get_title_port_ssl(self.core_cfg)
port_normal = Utils.get_title_port(self.core_cfg)
proto = "http"
port = f":{port_normal}" if not self.core_cfg.server.is_using_proxy and port_normal != 80 else ""
if self.game_cfg.server.use_https:
proto = "https"
port = f":{port_ssl}" if not self.core_cfg.server.is_using_proxy and port_ssl != 443 else ""
2023-11-11 04:20:30 +00:00
2023-12-10 23:35:59 +00:00
return (f"{proto}://{self.core_cfg.title.hostname}{port}/", "")
def get_mucha_info(self, core_cfg: CoreConfig, cfg_dir: str) -> Tuple[bool, str]:
if not self.game_cfg.server.enable:
return (False, "")
return (True, "SAO1")
def render_POST(self, request: Request, game_code: str, matchers: Dict) -> bytes:
endpoint = matchers.get('endpoint', '')
request.responseHeaders.addRawHeader(b"content-type", b"text/html; charset=utf-8")
2023-11-10 01:56:03 +00:00
iv = b""
2023-11-09 18:16:32 +00:00
req_raw = request.content.read()
2023-12-10 23:35:59 +00:00
if len(req_raw) < 40:
self.logger.warn(f"Malformed request to {endpoint} - {req_raw.hex()}")
return b""
2023-11-09 18:16:32 +00:00
req_header = SaoRequestHeader(req_raw)
cmd_str = f"{req_header.cmd:04x}"
2023-11-09 23:11:58 +00:00
if self.game_cfg.hash.verify_hash and self.static_hash != req_header.hash:
self.logger.error(f"Hash mismatch! Expecting {self.static_hash} but recieved {req_header.hash}")
return b""
2023-11-09 18:16:32 +00:00
if self.game_cfg.crypt.enable:
2023-11-09 23:11:58 +00:00
iv = req_raw[40:48]
2023-11-10 02:00:51 +00:00
cipher = Blowfish.new(self.game_cfg.crypt.key.encode(), Blowfish.MODE_CBC, iv)
2023-11-09 23:11:58 +00:00
crypt_data = req_raw[48:]
2023-11-09 18:16:32 +00:00
req_data = cipher.decrypt(crypt_data)
2023-11-09 23:11:58 +00:00
self.logger.debug(f"Decrypted {req_data.hex()} with IV {iv.hex()}")
2023-11-09 18:16:32 +00:00
else:
2023-11-09 23:11:58 +00:00
req_data = req_raw[40:]
2023-11-09 18:16:32 +00:00
handler = getattr(self.base, f"handle_{cmd_str}", self.base.handle_noop)
self.logger.info(f"{endpoint} - {cmd_str} request")
self.logger.debug(f"Request: {req_raw.hex()}")
resp = handler(req_header, req_data)
2023-11-15 03:08:09 +00:00
if resp is None:
resp = SaoNoopResponse(req_header.cmd + 1).make()
2023-11-15 03:11:24 +00:00
if type(resp) == bytes:
pass
2023-11-15 03:08:09 +00:00
elif issubclass(resp, SaoBaseResponse):
resp = resp.make()
2023-11-10 02:09:55 +00:00
2023-11-15 03:11:24 +00:00
else:
self.logger.error(f"Unknown response type {type(resp)}")
return b""
2023-07-05 16:35:00 +00:00
self.logger.debug(f"Response: {resp.hex()}")
2023-11-10 01:56:03 +00:00
if self.game_cfg.crypt.enable:
2023-11-13 18:47:17 +00:00
iv = secrets.token_bytes(8)
2023-11-10 02:05:30 +00:00
data_to_crypt = resp[24:]
while len(data_to_crypt) % 8 != 0:
data_to_crypt += b"\x00"
2023-11-10 02:00:51 +00:00
cipher = Blowfish.new(self.game_cfg.crypt.key.encode(), Blowfish.MODE_CBC, iv)
2023-11-10 02:05:30 +00:00
data_crypt = cipher.encrypt(data_to_crypt)
2023-11-10 02:23:19 +00:00
crypt_data_len = len(data_crypt) + len(iv)
2023-11-10 02:17:18 +00:00
tmp = struct.pack("!I", crypt_data_len) # does it want the length of the encrypted response??
resp = resp[:20] + tmp + iv + data_crypt
2023-11-10 01:56:33 +00:00
self.logger.debug(f"Encrypted Response: {resp.hex()}")
2023-11-10 01:56:03 +00:00
2023-07-05 16:35:00 +00:00
return resp