1
0
forked from Hay1tsme/artemis

initial api changes

This commit is contained in:
Hay1tsme 2023-10-19 19:07:19 -04:00
parent 54c77c047e
commit 046bd1d1f3
10 changed files with 236 additions and 168 deletions

View File

@ -16,10 +16,11 @@ from os import path
import urllib.parse
import math
from core.config import CoreConfig
from core.utils import Utils
from core.data import Data
from core.const import *
from .config import CoreConfig
from .utils import Utils
from .data import Data
from .const import *
from .title import TitleServlet
BILLING_DT_FORMAT: Final[str] = "%Y%m%d%H%M%S"
@ -39,7 +40,6 @@ class AllnetServlet:
self.config = core_cfg
self.config_folder = cfg_folder
self.data = Data(core_cfg)
self.uri_registry: Dict[str, Tuple[str, str]] = {}
self.logger = logging.getLogger("allnet")
if not hasattr(self.logger, "initialized"):
@ -70,18 +70,8 @@ class AllnetServlet:
if len(plugins) == 0:
self.logger.error("No games detected!")
for _, mod in plugins.items():
if hasattr(mod, "index") and hasattr(mod.index, "get_allnet_info"):
for code in mod.game_codes:
enabled, uri, host = mod.index.get_allnet_info(
code, self.config, self.config_folder
)
if enabled:
self.uri_registry[code] = (uri, host)
self.logger.info(
f"Serving {len(self.uri_registry)} game codes port {core_cfg.allnet.port}"
f"Serving {len(TitleServlet.title_registry)} game codes port {core_cfg.allnet.port}"
)
def handle_poweron(self, request: Request, _: Dict):
@ -190,7 +180,7 @@ class AllnetServlet:
arcade["timezone"] if arcade["timezone"] is not None else "+0900" if req.format_ver == 3 else "+09:00"
)
if req.game_id not in self.uri_registry:
if req.game_id not in TitleServlet.title_registry:
if not self.config.server.is_develop:
msg = f"Unrecognised game {req.game_id} attempted allnet auth from {request_ip}."
self.data.base.log_event(
@ -215,11 +205,9 @@ class AllnetServlet:
self.logger.debug(f"Allnet response: {resp_str}")
return (resp_str + "\n").encode("utf-8")
resp.uri, resp.host = self.uri_registry[req.game_id]
int_ver = req.ver.replace(".", "")
resp.uri = resp.uri.replace("$v", int_ver)
resp.host = resp.host.replace("$v", int_ver)
resp.uri, resp.host = TitleServlet.title_registry[req.game_id].get_allnet_info(req.game_id, int(int_ver), req.serial)
msg = f"{req.serial} authenticated from {request_ip}: {req.game_id} v{req.ver}"
self.data.base.log_event("allnet", "ALLNET_AUTH_SUCCESS", logging.INFO, msg)

View File

@ -36,6 +36,12 @@ class ServerConfig:
self.__config, "core", "server", "is_develop", default=True
)
@property
def is_using_proxy(self) -> bool:
return CoreConfig.get_config_field(
self.__config, "core", "server", "is_using_proxy", default=False
)
@property
def threading(self) -> bool:
return CoreConfig.get_config_field(

View File

@ -7,15 +7,15 @@ from datetime import datetime
from Crypto.Cipher import Blowfish
import pytz
from core import CoreConfig
from core.utils import Utils
from .config import CoreConfig
from .utils import Utils
from .title import TitleServlet
class MuchaServlet:
mucha_registry: List[str] = []
def __init__(self, cfg: CoreConfig, cfg_dir: str) -> None:
self.config = cfg
self.config_dir = cfg_dir
self.mucha_registry: List[str] = []
self.logger = logging.getLogger("mucha")
log_fmt_str = "[%(asctime)s] Mucha | %(levelname)s | %(message)s"
@ -37,9 +37,7 @@ class MuchaServlet:
self.logger.setLevel(cfg.mucha.loglevel)
coloredlogs.install(level=cfg.mucha.loglevel, logger=self.logger, fmt=log_fmt_str)
all_titles = Utils.get_all_titles()
for _, mod in all_titles.items():
for _, mod in TitleServlet.title_registry.items():
if hasattr(mod, "index") and hasattr(mod.index, "get_mucha_info"):
enabled, game_cd = mod.index.get_mucha_info(
self.config, self.config_dir

View File

@ -1,4 +1,4 @@
from typing import Dict, Any
from typing import Dict, List, Tuple
import logging, coloredlogs
from logging.handlers import TimedRotatingFileHandler
from twisted.web.http import Request
@ -7,14 +7,88 @@ from core.config import CoreConfig
from core.data import Data
from core.utils import Utils
class BaseServlet:
def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None:
self.core_cfg = core_cfg
self.game_cfg = None
self.logger = logging.getLogger("title")
@classmethod
def is_game_enabled(cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str) -> bool:
"""Called during boot to check if a specific game code should load.
Args:
game_code (str): 4 character game code
core_cfg (CoreConfig): CoreConfig class
cfg_dir (str): Config directory
Returns:
bool: True if the game is enabled and set to run, False otherwise
"""
return False
def get_endpoint_matchers(self) -> Tuple[List[Tuple[str, str, Dict]], List[Tuple[str, str, Dict]]]:
"""Called during boot to get all matcher endpoints this title servlet handles
Returns:
Tuple[List[Tuple[str, str, Dict]], List[Tuple[str, str, Dict]]]: A 2-length tuple where offset 0 is GET and offset 1 is POST,
containing a list of 3-length tuples where offset 0 is the name of the function in the handler that should be called, offset 1
is the matching string, and offset 2 is a dict containing rules for the matcher.
"""
return (
[("render_GET", "/{game}/{version}/{endpoint:.*?}", {'game': R'S...'})],
[("render_POST", "/{game}/{version}/{endpoint:.*?}", {'game': R'S...'})]
)
def setup(self) -> None:
"""Called once during boot, should contain any additional setup the handler must do, such as starting any sub-services
"""
pass
def get_allnet_info(self, game_code: str, game_ver: int, keychip: str) -> Tuple[str, str]:
"""Called any time a request to PowerOn is made to retrieve the url/host strings to be sent back to the game
Args:
game_code (str): 4 character game code
game_ver (int): version, expressed as an integer by multiplying by 100 (1.10 -> 110)
keychip (str): Keychip serial of the requesting machine, can be used to deliver specific URIs to different machines
Returns:
Tuple[str, str]: A tuple where offset 0 is the allnet uri field, and offset 1 is the allnet host field
"""
if not self.core_cfg.server.is_using_proxy:
return (f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/{game_code}/{game_ver}/", "")
return (f"http://{self.core_cfg.title.hostname}/{game_code}/{game_ver}/", "")
def get_mucha_info(self, core_cfg: CoreConfig, cfg_dir: str) -> Tuple[bool, str]:
"""Called once during boot to check if this game is a mucha game
Args:
core_cfg (CoreConfig): CoreConfig class
cfg_dir (str): Config directory
Returns:
Tuple[bool, str]: Tuple where offset 0 is true if the game is enabled, false otherwise, and offset 1 is the game CD
"""
return (False, "")
def render_POST(self, request: Request, game_code: int, matchers: Dict) -> bytes:
self.logger.warn(f"{game_code} Does not dispatch POST")
return None
def render_GET(self, request: Request, game_code: int, matchers: Dict) -> bytes:
self.logger.warn(f"{game_code} Does not dispatch GET")
return None
class TitleServlet:
title_registry: Dict[str, BaseServlet] = {}
def __init__(self, core_cfg: CoreConfig, cfg_folder: str):
super().__init__()
self.config = core_cfg
self.config_folder = cfg_folder
self.data = Data(core_cfg)
self.title_registry: Dict[str, Any] = {}
self.logger = logging.getLogger("title")
if not hasattr(self.logger, "initialized"):
@ -43,62 +117,62 @@ class TitleServlet:
plugins = Utils.get_all_titles()
for folder, mod in plugins.items():
if hasattr(mod, "game_codes") and hasattr(mod, "index"):
if hasattr(mod, "game_codes") and hasattr(mod, "index") and hasattr(mod.index, "is_game_enabled"):
should_call_setup = True
game_servlet: BaseServlet = mod.index
game_codes: List[str] = mod.game_codes
for code in game_codes:
if game_servlet.is_game_enabled(code, self.config, self.config_folder):
handler_cls = game_servlet(self.config, self.config_folder)
if hasattr(mod.index, "get_allnet_info"):
for code in mod.game_codes:
enabled, _, _ = mod.index.get_allnet_info(
code, self.config, self.config_folder
)
if hasattr(handler_cls, "setup") and should_call_setup:
handler_cls.setup()
should_call_setup = False
if enabled:
handler_cls = mod.index(self.config, self.config_folder)
if hasattr(handler_cls, "setup") and should_call_setup:
handler_cls.setup()
should_call_setup = False
self.title_registry[code] = handler_cls
else:
self.logger.warning(f"Game {folder} has no get_allnet_info")
self.title_registry[code] = handler_cls
else:
self.logger.error(f"{folder} missing game_code or index in __init__.py")
self.logger.error(f"{folder} missing game_code or index in __init__.py, or is_game_enabled in index")
self.logger.info(
f"Serving {len(self.title_registry)} game codes {'on port ' + str(core_cfg.title.port) if core_cfg.title.port > 0 else ''}"
)
def render_GET(self, request: Request, endpoints: dict) -> bytes:
code = endpoints["game"]
code = endpoints["title"]
subaction = endpoints['subaction']
if code not in self.title_registry:
self.logger.warning(f"Unknown game code {code}")
request.setResponseCode(404)
return b""
index = self.title_registry[code]
if not hasattr(index, "render_GET"):
self.logger.warning(f"{code} does not dispatch GET")
request.setResponseCode(405)
handler = getattr(index, f"{subaction}", None)
if handler is None:
self.logger.error(f"{code} does not have handler for GET subaction {subaction}")
request.setResponseCode(500)
return b""
return index.render_GET(request, int(endpoints["version"]), endpoints["endpoint"])
return handler(request, code, endpoints)
def render_POST(self, request: Request, endpoints: dict) -> bytes:
code = endpoints["game"]
code = endpoints["title"]
subaction = endpoints['subaction']
if code not in self.title_registry:
self.logger.warning(f"Unknown game code {code}")
request.setResponseCode(404)
return b""
index = self.title_registry[code]
if not hasattr(index, "render_POST"):
self.logger.warning(f"{code} does not dispatch POST")
request.setResponseCode(405)
handler = getattr(index, f"{subaction}", None)
if handler is None:
self.logger.error(f"{code} does not have handler for POST subaction {subaction}")
request.setResponseCode(500)
return b""
return index.render_POST(
request, int(endpoints["version"]), endpoints["endpoint"]
)
endpoints.pop("title")
endpoints.pop("subaction")
return handler(request, code, endpoints)

View File

@ -4,6 +4,7 @@ server:
allow_unregistered_serials: True
name: "ARTEMiS"
is_develop: True
is_using_proxy: False
threading: False
log_dir: "logs"
check_arcade_ip: False

View File

@ -22,8 +22,8 @@ class HttpDispatcher(resource.Resource):
self.map_post = Mapper()
self.logger = logging.getLogger("core")
self.allnet = AllnetServlet(cfg, config_dir)
self.title = TitleServlet(cfg, config_dir)
self.allnet = AllnetServlet(cfg, config_dir)
self.mucha = MuchaServlet(cfg, config_dir)
self.map_get.connect(
@ -144,25 +144,36 @@ class HttpDispatcher(resource.Resource):
conditions=dict(method=["POST"]),
)
self.map_get.connect(
"title_get",
"/{game}/{version}/{endpoint:.*?}",
controller="title",
action="render_GET",
conditions=dict(method=["GET"]),
requirements=dict(game=R"S..."),
)
self.map_post.connect(
"title_post",
"/{game}/{version}/{endpoint:.*?}",
controller="title",
action="render_POST",
conditions=dict(method=["POST"]),
requirements=dict(game=R"S..."),
)
for code, game in self.title.title_registry.items():
get_matchers, post_matchers = game.get_endpoint_matchers()
for m in get_matchers:
self.map_get.connect(
"title_get",
m[1],
controller="title",
action="render_GET",
title=code,
subaction=m[0],
conditions=dict(method=["GET"]),
requirements=m[2],
)
for m in post_matchers:
self.map_post.connect(
"title_post",
m[1],
controller="title",
action="render_POST",
title=code,
subaction=m[0],
conditions=dict(method=["POST"]),
requirements=m[2],
)
def render_GET(self, request: Request) -> bytes:
test = self.map_get.match(request.uri.decode())
print(test)
client_ip = Utils.get_ip_addr(request)
if test is None:

View File

@ -14,27 +14,28 @@ from os import path
from typing import Tuple, Dict
from core import CoreConfig, Utils
from titles.chuni.config import ChuniConfig
from titles.chuni.const import ChuniConstants
from titles.chuni.base import ChuniBase
from titles.chuni.plus import ChuniPlus
from titles.chuni.air import ChuniAir
from titles.chuni.airplus import ChuniAirPlus
from titles.chuni.star import ChuniStar
from titles.chuni.starplus import ChuniStarPlus
from titles.chuni.amazon import ChuniAmazon
from titles.chuni.amazonplus import ChuniAmazonPlus
from titles.chuni.crystal import ChuniCrystal
from titles.chuni.crystalplus import ChuniCrystalPlus
from titles.chuni.paradise import ChuniParadise
from titles.chuni.new import ChuniNew
from titles.chuni.newplus import ChuniNewPlus
from titles.chuni.sun import ChuniSun
from core.title import BaseServlet
from .config import ChuniConfig
from .const import ChuniConstants
from .base import ChuniBase
from .plus import ChuniPlus
from .air import ChuniAir
from .airplus import ChuniAirPlus
from .star import ChuniStar
from .starplus import ChuniStarPlus
from .amazon import ChuniAmazon
from .amazonplus import ChuniAmazonPlus
from .crystal import ChuniCrystal
from .crystalplus import ChuniCrystalPlus
from .paradise import ChuniParadise
from .new import ChuniNew
from .newplus import ChuniNewPlus
from .sun import ChuniSun
class ChuniServlet:
class ChuniServlet(BaseServlet):
def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None:
self.core_cfg = core_cfg
super().__init__(core_cfg, cfg_dir)
self.game_cfg = ChuniConfig()
self.hash_table: Dict[Dict[str, str]] = {}
if path.exists(f"{cfg_dir}/{ChuniConstants.CONFIG_NAME}"):
@ -116,9 +117,9 @@ class ChuniServlet:
)
@classmethod
def get_allnet_info(
def is_game_enabled(
cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str
) -> Tuple[bool, str, str]:
) -> bool:
game_cfg = ChuniConfig()
if path.exists(f"{cfg_dir}/{ChuniConstants.CONFIG_NAME}"):
game_cfg.update(
@ -126,18 +127,15 @@ class ChuniServlet:
)
if not game_cfg.server.enable:
return (False, "", "")
return False
if core_cfg.server.is_develop:
return (
True,
f"http://{core_cfg.title.hostname}:{core_cfg.title.port}/{game_code}/$v/",
"",
)
return True
return (True, f"http://{core_cfg.title.hostname}/{game_code}/$v/", "")
def render_POST(self, request: Request, version: int, url_path: str) -> bytes:
def render_POST(self, request: Request, game_code: int, matchers: Dict) -> bytes:
url_path = matchers['endpoint']
version = int(matchers['version'])
if url_path.lower() == "ping":
return zlib.compress(b'{"returnCode": "1"}')

View File

@ -7,21 +7,22 @@ import coloredlogs
import zlib
from os import path
from typing import Tuple
from typing import Tuple, List, Dict
from twisted.web.http import Request
from logging.handlers import TimedRotatingFileHandler
from core.config import CoreConfig
from core.utils import Utils
from titles.cm.config import CardMakerConfig
from titles.cm.const import CardMakerConstants
from titles.cm.base import CardMakerBase
from titles.cm.cm135 import CardMaker135
from core.title import BaseServlet
from .config import CardMakerConfig
from .const import CardMakerConstants
from .base import CardMakerBase
from .cm135 import CardMaker135
class CardMakerServlet:
class CardMakerServlet(BaseServlet):
def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None:
self.core_cfg = core_cfg
super().__init__(core_cfg, cfg_dir)
self.game_cfg = CardMakerConfig()
if path.exists(f"{cfg_dir}/{CardMakerConstants.CONFIG_NAME}"):
self.game_cfg.update(
@ -55,11 +56,10 @@ class CardMakerServlet:
coloredlogs.install(
level=self.game_cfg.server.loglevel, logger=self.logger, fmt=log_fmt_str
)
@classmethod
def get_allnet_info(
def is_game_enabled(
cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str
) -> Tuple[bool, str, str]:
) -> bool:
game_cfg = CardMakerConfig()
if path.exists(f"{cfg_dir}/{CardMakerConstants.CONFIG_NAME}"):
game_cfg.update(
@ -67,20 +67,14 @@ class CardMakerServlet:
)
if not game_cfg.server.enable:
return (False, "", "")
return False
if core_cfg.server.is_develop:
return (
True,
f"http://{core_cfg.title.hostname}:{core_cfg.title.port}/{game_code}/$v/",
"",
)
return True
return (True, f"http://{core_cfg.title.hostname}/{game_code}/$v/", "")
def render_POST(self, request: Request, version: int, url_path: str) -> bytes:
def render_POST(self, request: Request, game_code: int, matchers: Dict) -> bytes:
version = int(matchers['version'])
req_raw = request.content.getvalue()
url_split = url_path.split("/")
url_split: List[str] = matchers['endpoint'].split("/")
internal_ver = 0
endpoint = url_split[len(url_split) - 1]
client_ip = Utils.get_ip_addr(request)

View File

@ -7,17 +7,18 @@ import json
import urllib.parse
import base64
from os import path
from typing import Tuple
from typing import Tuple, Dict
from core.config import CoreConfig
from titles.diva.config import DivaConfig
from titles.diva.const import DivaConstants
from titles.diva.base import DivaBase
from core.title import BaseServlet
from .config import DivaConfig
from .const import DivaConstants
from .base import DivaBase
class DivaServlet:
class DivaServlet(BaseServlet):
def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None:
self.core_cfg = core_cfg
super().__init__(core_cfg, cfg_dir)
self.game_cfg = DivaConfig()
if path.exists(f"{cfg_dir}/{DivaConstants.CONFIG_NAME}"):
self.game_cfg.update(
@ -50,9 +51,9 @@ class DivaServlet:
)
@classmethod
def get_allnet_info(
def is_game_enabled(
cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str
) -> Tuple[bool, str, str]:
) -> bool:
game_cfg = DivaConfig()
if path.exists(f"{cfg_dir}/{DivaConstants.CONFIG_NAME}"):
game_cfg.update(
@ -60,20 +61,13 @@ class DivaServlet:
)
if not game_cfg.server.enable:
return (False, "", "")
return False
if core_cfg.server.is_develop:
return (
True,
f"http://{core_cfg.title.hostname}:{core_cfg.title.port}/{game_code}/$v/",
"",
)
return True
return (True, f"http://{core_cfg.title.hostname}/{game_code}/$v/", "")
def render_POST(self, req: Request, version: int, url_path: str) -> bytes:
req_raw = req.content.getvalue()
url_header = req.getAllHeaders()
def render_POST(self, request: Request, game_code: int, matchers: Dict) -> bytes:
req_raw = request.content.getvalue()
url_header = request.getAllHeaders()
# Ping Dispatch
if "THIS_STRING_SEPARATES" in str(url_header):
@ -148,7 +142,7 @@ class DivaServlet:
"utf-8"
)
req.responseHeaders.addRawHeader(b"content-type", b"text/plain")
request.responseHeaders.addRawHeader(b"content-type", b"text/plain")
self.logger.debug(
f"Response cmd={req_data['cmd']}&req_id={req_data['req_id']}&stat=ok{resp}"
)

View File

@ -4,22 +4,22 @@ import logging
import coloredlogs
from logging.handlers import TimedRotatingFileHandler
from os import path
from typing import Tuple, List
from typing import Tuple, List, Dict
from twisted.internet import reactor, endpoints
from twisted.web import server, resource
import importlib
from core.config import CoreConfig
from core.title import BaseServlet
from .config import IDZConfig
from .const import IDZConstants
from .userdb import IDZUserDBFactory, IDZUserDBWeb, IDZKey
from .userdb import IDZUserDBFactory, IDZKey
from .echo import IDZEcho
from .handlers import IDZHandlerLoadConfigB
class IDZServlet:
class IDZServlet(BaseServlet):
def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None:
self.core_cfg = core_cfg
super().__init__(core_cfg, cfg_dir)
self.game_cfg = IDZConfig()
if path.exists(f"{cfg_dir}/{IDZConstants.CONFIG_NAME}"):
self.game_cfg.update(
@ -65,9 +65,9 @@ class IDZServlet:
return hash_
@classmethod
def get_allnet_info(
def is_game_enabled(
cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str
) -> Tuple[bool, str, str]:
) -> bool:
game_cfg = IDZConfig()
if path.exists(f"{cfg_dir}/{IDZConstants.CONFIG_NAME}"):
game_cfg.update(
@ -75,21 +75,29 @@ class IDZServlet:
)
if not game_cfg.server.enable:
return (False, "", "")
return False
if len(game_cfg.rsa_keys) <= 0 or not game_cfg.server.aes_key:
logging.getLogger("idz").error("IDZ: No RSA/AES keys! IDZ cannot start")
return (False, "", "")
return False
return True
def get_endpoint_matchers(self) -> Tuple[List[Tuple[str, str, Dict]], List[Tuple[str, str, Dict]]]:
return[
[("render_GET", "/{game}/{version}/{endpoint:.*?}", {'game': R'S...'})], # TODO: Slim this down to only the news stuff
[]
]
def get_allnet_info(self, game_code: str, game_ver: int, keychip: str) -> Tuple[str, str]:
hostname = (
core_cfg.title.hostname
if not game_cfg.server.hostname
else game_cfg.server.hostname
self.core_cfg.title.hostname
if not self.game_cfg.server.hostname
else self.game_cfg.server.hostname
)
return (
True,
f"",
f"{hostname}:{game_cfg.ports.userdb}",
f"{hostname}:{self.game_cfg.ports.userdb}",
)
def setup(self):
@ -149,12 +157,8 @@ class IDZServlet:
self.logger.info(f"UserDB Listening on port {self.game_cfg.ports.userdb}")
def render_POST(self, request: Request, version: int, url_path: str) -> bytes:
req_raw = request.content.getvalue()
self.logger.info(f"IDZ POST request: {url_path} - {req_raw}")
return b""
def render_GET(self, request: Request, version: int, url_path: str) -> bytes:
def render_GET(self, request: Request, game_code: int, matchers: Dict) -> bytes:
url_path = matchers['endpoint']
self.logger.info(f"IDZ GET request: {url_path}")
request.responseHeaders.setRawHeaders(
"Content-Type", [b"text/plain; charset=utf-8"]