Compare commits

...

19 Commits

26 changed files with 538 additions and 407 deletions

View File

@ -16,10 +16,11 @@ from os import path
import urllib.parse
import math
from core.config import CoreConfig
from core.utils import Utils
from core.data import Data
from core.const import *
from .config import CoreConfig
from .utils import Utils
from .data import Data
from .const import *
from .title import TitleServlet
BILLING_DT_FORMAT: Final[str] = "%Y%m%d%H%M%S"
@ -39,7 +40,6 @@ class AllnetServlet:
self.config = core_cfg
self.config_folder = cfg_folder
self.data = Data(core_cfg)
self.uri_registry: Dict[str, Tuple[str, str]] = {}
self.logger = logging.getLogger("allnet")
if not hasattr(self.logger, "initialized"):
@ -70,18 +70,8 @@ class AllnetServlet:
if len(plugins) == 0:
self.logger.error("No games detected!")
for _, mod in plugins.items():
if hasattr(mod, "index") and hasattr(mod.index, "get_allnet_info"):
for code in mod.game_codes:
enabled, uri, host = mod.index.get_allnet_info(
code, self.config, self.config_folder
)
if enabled:
self.uri_registry[code] = (uri, host)
self.logger.info(
f"Serving {len(self.uri_registry)} game codes port {core_cfg.allnet.port}"
f"Serving {len(TitleServlet.title_registry)} game codes port {core_cfg.allnet.port}"
)
def handle_poweron(self, request: Request, _: Dict):
@ -190,7 +180,7 @@ class AllnetServlet:
arcade["timezone"] if arcade["timezone"] is not None else "+0900" if req.format_ver == 3 else "+09:00"
)
if req.game_id not in self.uri_registry:
if req.game_id not in TitleServlet.title_registry:
if not self.config.server.is_develop:
msg = f"Unrecognised game {req.game_id} attempted allnet auth from {request_ip}."
self.data.base.log_event(
@ -215,11 +205,9 @@ class AllnetServlet:
self.logger.debug(f"Allnet response: {resp_str}")
return (resp_str + "\n").encode("utf-8")
resp.uri, resp.host = self.uri_registry[req.game_id]
int_ver = req.ver.replace(".", "")
resp.uri = resp.uri.replace("$v", int_ver)
resp.host = resp.host.replace("$v", int_ver)
resp.uri, resp.host = TitleServlet.title_registry[req.game_id].get_allnet_info(req.game_id, int(int_ver), req.serial)
msg = f"{req.serial} authenticated from {request_ip}: {req.game_id} v{req.ver}"
self.data.base.log_event("allnet", "ALLNET_AUTH_SUCCESS", logging.INFO, msg)

View File

@ -36,6 +36,12 @@ class ServerConfig:
self.__config, "core", "server", "is_develop", default=True
)
@property
def is_using_proxy(self) -> bool:
return CoreConfig.get_config_field(
self.__config, "core", "server", "is_using_proxy", default=False
)
@property
def threading(self) -> bool:
return CoreConfig.get_config_field(

View File

@ -7,15 +7,15 @@ from datetime import datetime
from Crypto.Cipher import Blowfish
import pytz
from core import CoreConfig
from core.utils import Utils
from .config import CoreConfig
from .utils import Utils
from .title import TitleServlet
class MuchaServlet:
mucha_registry: List[str] = []
def __init__(self, cfg: CoreConfig, cfg_dir: str) -> None:
self.config = cfg
self.config_dir = cfg_dir
self.mucha_registry: List[str] = []
self.logger = logging.getLogger("mucha")
log_fmt_str = "[%(asctime)s] Mucha | %(levelname)s | %(message)s"
@ -37,11 +37,9 @@ class MuchaServlet:
self.logger.setLevel(cfg.mucha.loglevel)
coloredlogs.install(level=cfg.mucha.loglevel, logger=self.logger, fmt=log_fmt_str)
all_titles = Utils.get_all_titles()
for _, mod in all_titles.items():
if hasattr(mod, "index") and hasattr(mod.index, "get_mucha_info"):
enabled, game_cd = mod.index.get_mucha_info(
for _, mod in TitleServlet.title_registry.items():
if hasattr(mod, "get_mucha_info"):
enabled, game_cd = mod.get_mucha_info(
self.config, self.config_dir
)
if enabled:

View File

@ -1,4 +1,4 @@
from typing import Dict, Any
from typing import Dict, List, Tuple
import logging, coloredlogs
from logging.handlers import TimedRotatingFileHandler
from twisted.web.http import Request
@ -7,14 +7,88 @@ from core.config import CoreConfig
from core.data import Data
from core.utils import Utils
class BaseServlet:
def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None:
self.core_cfg = core_cfg
self.game_cfg = None
self.logger = logging.getLogger("title")
@classmethod
def is_game_enabled(cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str) -> bool:
"""Called during boot to check if a specific game code should load.
Args:
game_code (str): 4 character game code
core_cfg (CoreConfig): CoreConfig class
cfg_dir (str): Config directory
Returns:
bool: True if the game is enabled and set to run, False otherwise
"""
return False
def get_endpoint_matchers(self) -> Tuple[List[Tuple[str, str, Dict]], List[Tuple[str, str, Dict]]]:
"""Called during boot to get all matcher endpoints this title servlet handles
Returns:
Tuple[List[Tuple[str, str, Dict]], List[Tuple[str, str, Dict]]]: A 2-length tuple where offset 0 is GET and offset 1 is POST,
containing a list of 3-length tuples where offset 0 is the name of the function in the handler that should be called, offset 1
is the matching string, and offset 2 is a dict containing rules for the matcher.
"""
return (
[("render_GET", "/{game}/{version}/{endpoint}", {'game': R'S...'})],
[("render_POST", "/{game}/{version}/{endpoint}", {'game': R'S...'})]
)
def setup(self) -> None:
"""Called once during boot, should contain any additional setup the handler must do, such as starting any sub-services
"""
pass
def get_allnet_info(self, game_code: str, game_ver: int, keychip: str) -> Tuple[str, str]:
"""Called any time a request to PowerOn is made to retrieve the url/host strings to be sent back to the game
Args:
game_code (str): 4 character game code
game_ver (int): version, expressed as an integer by multiplying by 100 (1.10 -> 110)
keychip (str): Keychip serial of the requesting machine, can be used to deliver specific URIs to different machines
Returns:
Tuple[str, str]: A tuple where offset 0 is the allnet uri field, and offset 1 is the allnet host field
"""
if not self.core_cfg.server.is_using_proxy and Utils.get_title_port(self.core_cfg) != 80:
return (f"http://{self.core_cfg.title.hostname}:{Utils.get_title_port(self.core_cfg)}/{game_code}/{game_ver}/", "")
return (f"http://{self.core_cfg.title.hostname}/{game_code}/{game_ver}/", "")
def get_mucha_info(self, core_cfg: CoreConfig, cfg_dir: str) -> Tuple[bool, str]:
"""Called once during boot to check if this game is a mucha game
Args:
core_cfg (CoreConfig): CoreConfig class
cfg_dir (str): Config directory
Returns:
Tuple[bool, str]: Tuple where offset 0 is true if the game is enabled, false otherwise, and offset 1 is the game CD
"""
return (False, "")
def render_POST(self, request: Request, game_code: int, matchers: Dict) -> bytes:
self.logger.warn(f"{game_code} Does not dispatch POST")
return None
def render_GET(self, request: Request, game_code: int, matchers: Dict) -> bytes:
self.logger.warn(f"{game_code} Does not dispatch GET")
return None
class TitleServlet:
title_registry: Dict[str, BaseServlet] = {}
def __init__(self, core_cfg: CoreConfig, cfg_folder: str):
super().__init__()
self.config = core_cfg
self.config_folder = cfg_folder
self.data = Data(core_cfg)
self.title_registry: Dict[str, Any] = {}
self.logger = logging.getLogger("title")
if not hasattr(self.logger, "initialized"):
@ -43,62 +117,62 @@ class TitleServlet:
plugins = Utils.get_all_titles()
for folder, mod in plugins.items():
if hasattr(mod, "game_codes") and hasattr(mod, "index"):
if hasattr(mod, "game_codes") and hasattr(mod, "index") and hasattr(mod.index, "is_game_enabled"):
should_call_setup = True
game_servlet: BaseServlet = mod.index
game_codes: List[str] = mod.game_codes
for code in game_codes:
if game_servlet.is_game_enabled(code, self.config, self.config_folder):
handler_cls = game_servlet(self.config, self.config_folder)
if hasattr(mod.index, "get_allnet_info"):
for code in mod.game_codes:
enabled, _, _ = mod.index.get_allnet_info(
code, self.config, self.config_folder
)
if hasattr(handler_cls, "setup") and should_call_setup:
handler_cls.setup()
should_call_setup = False
if enabled:
handler_cls = mod.index(self.config, self.config_folder)
if hasattr(handler_cls, "setup") and should_call_setup:
handler_cls.setup()
should_call_setup = False
self.title_registry[code] = handler_cls
else:
self.logger.warning(f"Game {folder} has no get_allnet_info")
self.title_registry[code] = handler_cls
else:
self.logger.error(f"{folder} missing game_code or index in __init__.py")
self.logger.error(f"{folder} missing game_code or index in __init__.py, or is_game_enabled in index")
self.logger.info(
f"Serving {len(self.title_registry)} game codes {'on port ' + str(core_cfg.title.port) if core_cfg.title.port > 0 else ''}"
)
def render_GET(self, request: Request, endpoints: dict) -> bytes:
code = endpoints["game"]
code = endpoints["title"]
subaction = endpoints['subaction']
if code not in self.title_registry:
self.logger.warning(f"Unknown game code {code}")
request.setResponseCode(404)
return b""
index = self.title_registry[code]
if not hasattr(index, "render_GET"):
self.logger.warning(f"{code} does not dispatch GET")
request.setResponseCode(405)
handler = getattr(index, f"{subaction}", None)
if handler is None:
self.logger.error(f"{code} does not have handler for GET subaction {subaction}")
request.setResponseCode(500)
return b""
return index.render_GET(request, int(endpoints["version"]), endpoints["endpoint"])
return handler(request, code, endpoints)
def render_POST(self, request: Request, endpoints: dict) -> bytes:
code = endpoints["game"]
code = endpoints["title"]
subaction = endpoints['subaction']
if code not in self.title_registry:
self.logger.warning(f"Unknown game code {code}")
request.setResponseCode(404)
return b""
index = self.title_registry[code]
if not hasattr(index, "render_POST"):
self.logger.warning(f"{code} does not dispatch POST")
request.setResponseCode(405)
handler = getattr(index, f"{subaction}", None)
if handler is None:
self.logger.error(f"{code} does not have handler for POST subaction {subaction}")
request.setResponseCode(500)
return b""
return index.render_POST(
request, int(endpoints["version"]), endpoints["endpoint"]
)
endpoints.pop("title")
endpoints.pop("subaction")
return handler(request, code, endpoints)

View File

@ -5,8 +5,11 @@ import logging
import importlib
from os import walk
from .config import CoreConfig
class Utils:
real_title_port = None
real_title_port_ssl = None
@classmethod
def get_all_titles(cls) -> Dict[str, ModuleType]:
ret: Dict[str, Any] = {}
@ -33,3 +36,27 @@ class Utils:
if b"x-forwarded-for" in req.getAllHeaders()
else req.getClientAddress().host
)
@classmethod
def get_title_port(cls, cfg: CoreConfig):
if cls.real_title_port is not None: return cls.real_title_port
if cfg.title.port == 0:
cls.real_title_port = cfg.allnet.port
else:
cls.real_title_port = cfg.title.port
return cls.real_title_port
@classmethod
def get_title_port_ssl(cls, cfg: CoreConfig):
if cls.real_title_port_ssl is not None: return cls.real_title_port_ssl
if cfg.title.port_ssl == 0:
cls.real_title_port_ssl = 443
else:
cls.real_title_port_ssl = cfg.title.port_ssl
return cls.real_title_port_ssl

View File

@ -4,6 +4,7 @@ server:
allow_unregistered_serials: True
name: "ARTEMiS"
is_develop: True
is_using_proxy: False
threading: False
log_dir: "logs"
check_arcade_ip: False

View File

@ -22,8 +22,8 @@ class HttpDispatcher(resource.Resource):
self.map_post = Mapper()
self.logger = logging.getLogger("core")
self.allnet = AllnetServlet(cfg, config_dir)
self.title = TitleServlet(cfg, config_dir)
self.allnet = AllnetServlet(cfg, config_dir)
self.mucha = MuchaServlet(cfg, config_dir)
self.map_get.connect(
@ -144,25 +144,36 @@ class HttpDispatcher(resource.Resource):
conditions=dict(method=["POST"]),
)
self.map_get.connect(
"title_get",
"/{game}/{version}/{endpoint:.*?}",
controller="title",
action="render_GET",
conditions=dict(method=["GET"]),
requirements=dict(game=R"S..."),
)
self.map_post.connect(
"title_post",
"/{game}/{version}/{endpoint:.*?}",
controller="title",
action="render_POST",
conditions=dict(method=["POST"]),
requirements=dict(game=R"S..."),
)
for code, game in self.title.title_registry.items():
get_matchers, post_matchers = game.get_endpoint_matchers()
for m in get_matchers:
self.map_get.connect(
"title_get",
m[1],
controller="title",
action="render_GET",
title=code,
subaction=m[0],
conditions=dict(method=["GET"]),
requirements=m[2],
)
for m in post_matchers:
self.map_post.connect(
"title_post",
m[1],
controller="title",
action="render_POST",
title=code,
subaction=m[0],
conditions=dict(method=["POST"]),
requirements=m[2],
)
def render_GET(self, request: Request) -> bytes:
test = self.map_get.match(request.uri.decode())
print(test)
client_ip = Utils.get_ip_addr(request)
if test is None:

View File

@ -11,30 +11,31 @@ from Crypto.Util.Padding import pad
from Crypto.Protocol.KDF import PBKDF2
from Crypto.Hash import SHA1
from os import path
from typing import Tuple, Dict
from typing import Tuple, Dict, List
from core import CoreConfig, Utils
from titles.chuni.config import ChuniConfig
from titles.chuni.const import ChuniConstants
from titles.chuni.base import ChuniBase
from titles.chuni.plus import ChuniPlus
from titles.chuni.air import ChuniAir
from titles.chuni.airplus import ChuniAirPlus
from titles.chuni.star import ChuniStar
from titles.chuni.starplus import ChuniStarPlus
from titles.chuni.amazon import ChuniAmazon
from titles.chuni.amazonplus import ChuniAmazonPlus
from titles.chuni.crystal import ChuniCrystal
from titles.chuni.crystalplus import ChuniCrystalPlus
from titles.chuni.paradise import ChuniParadise
from titles.chuni.new import ChuniNew
from titles.chuni.newplus import ChuniNewPlus
from titles.chuni.sun import ChuniSun
from core.title import BaseServlet
from .config import ChuniConfig
from .const import ChuniConstants
from .base import ChuniBase
from .plus import ChuniPlus
from .air import ChuniAir
from .airplus import ChuniAirPlus
from .star import ChuniStar
from .starplus import ChuniStarPlus
from .amazon import ChuniAmazon
from .amazonplus import ChuniAmazonPlus
from .crystal import ChuniCrystal
from .crystalplus import ChuniCrystalPlus
from .paradise import ChuniParadise
from .new import ChuniNew
from .newplus import ChuniNewPlus
from .sun import ChuniSun
class ChuniServlet:
class ChuniServlet(BaseServlet):
def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None:
self.core_cfg = core_cfg
super().__init__(core_cfg, cfg_dir)
self.game_cfg = ChuniConfig()
self.hash_table: Dict[Dict[str, str]] = {}
if path.exists(f"{cfg_dir}/{ChuniConstants.CONFIG_NAME}"):
@ -115,10 +116,19 @@ class ChuniServlet:
f"Hashed v{version} method {method_fixed} with {bytes.fromhex(keys[2])} to get {hash.hex()}"
)
def get_endpoint_matchers(self) -> Tuple[List[Tuple[str, str, Dict]], List[Tuple[str, str, Dict]]]:
return (
[],
[
("render_POST", "/{version}/ChuniServlet/{endpoint}", {}),
("render_POST", "/{version}/ChuniServlet/MatchingServer/{endpoint}", {})
]
)
@classmethod
def get_allnet_info(
def is_game_enabled(
cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str
) -> Tuple[bool, str, str]:
) -> bool:
game_cfg = ChuniConfig()
if path.exists(f"{cfg_dir}/{ChuniConstants.CONFIG_NAME}"):
game_cfg.update(
@ -126,26 +136,26 @@ class ChuniServlet:
)
if not game_cfg.server.enable:
return (False, "", "")
return False
if core_cfg.server.is_develop:
return (
True,
f"http://{core_cfg.title.hostname}:{core_cfg.title.port}/{game_code}/$v/",
"",
)
return True
def get_allnet_info(self, game_code: str, game_ver: int, keychip: str) -> Tuple[str, str]:
if not self.core_cfg.server.is_using_proxy and Utils.get_title_port(self.core_cfg) != 80:
return (f"http://{self.core_cfg.title.hostname}:{Utils.get_title_port(self.core_cfg)}/{game_ver}/", "")
return (True, f"http://{core_cfg.title.hostname}/{game_code}/$v/", "")
return (f"http://{self.core_cfg.title.hostname}/{game_ver}/", "")
def render_POST(self, request: Request, version: int, url_path: str) -> bytes:
if url_path.lower() == "ping":
def render_POST(self, request: Request, game_code: int, matchers: Dict) -> bytes:
endpoint = matchers['endpoint']
version = int(matchers['version'])
if endpoint.lower() == "ping":
return zlib.compress(b'{"returnCode": "1"}')
req_raw = request.content.getvalue()
url_split = url_path.split("/")
encrtped = False
internal_ver = 0
endpoint = url_split[len(url_split) - 1]
client_ip = Utils.get_ip_addr(request)
if version < 105: # 1.0

View File

@ -71,11 +71,11 @@ class ChuniNew(ChuniBase):
"matchErrorLimit": 9999,
"romVersion": self.game_cfg.version.version(self.version)["rom"],
"dataVersion": self.game_cfg.version.version(self.version)["data"],
"matchingUri": f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/200/ChuniServlet/",
"matchingUriX": f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/200/ChuniServlet/",
"matchingUri": f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/200/ChuniServlet/",
"matchingUriX": f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/200/ChuniServlet/",
# might be really important for online battle to connect the cabs via UDP port 50201
"udpHolePunchUri": f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/200/ChuniServlet/",
"reflectorUri": f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/200/ChuniServlet/",
"udpHolePunchUri": f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/200/ChuniServlet/",
"reflectorUri": f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/200/ChuniServlet/",
},
"isDumpUpload": False,
"isAou": False,

View File

@ -21,16 +21,16 @@ class ChuniNewPlus(ChuniNew):
]
ret["gameSetting"][
"matchingUri"
] = f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/205/ChuniServlet/"
] = f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/205/ChuniServlet/"
ret["gameSetting"][
"matchingUriX"
] = f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/205/ChuniServlet/"
] = f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/205/ChuniServlet/"
ret["gameSetting"][
"udpHolePunchUri"
] = f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/205/ChuniServlet/"
] = f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/205/ChuniServlet/"
ret["gameSetting"][
"reflectorUri"
] = f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/205/ChuniServlet/"
] = f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/205/ChuniServlet/"
return ret
def handle_cm_get_user_preview_api_request(self, data: Dict) -> Dict:

View File

@ -17,16 +17,16 @@ class ChuniSun(ChuniNewPlus):
ret["gameSetting"]["dataVersion"] = self.game_cfg.version.version(self.version)["data"]
ret["gameSetting"][
"matchingUri"
] = f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/210/ChuniServlet/"
] = f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/210/ChuniServlet/"
ret["gameSetting"][
"matchingUriX"
] = f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/210/ChuniServlet/"
] = f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/210/ChuniServlet/"
ret["gameSetting"][
"udpHolePunchUri"
] = f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/210/ChuniServlet/"
] = f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/210/ChuniServlet/"
ret["gameSetting"][
"reflectorUri"
] = f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/210/ChuniServlet/"
] = f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/210/ChuniServlet/"
return ret
def handle_cm_get_user_preview_api_request(self, data: Dict) -> Dict:

View File

@ -6,6 +6,7 @@ from enum import Enum
import pytz
from core.config import CoreConfig
from core.utils import Utils
from core.data.cache import cached
from titles.cm.const import CardMakerConstants
from titles.cm.config import CardMakerConfig
@ -29,8 +30,8 @@ class CardMakerBase:
return version.replace(".", "")[:3]
def handle_get_game_connect_api_request(self, data: Dict) -> Dict:
if self.core_cfg.server.is_develop:
uri = f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}"
if not self.core_cfg.server.is_using_proxy and Utils.get_title_port(self.core_cfg) != 80:
uri = f"http://{self.core_cfg.title.hostname}:{Utils.get_title_port(self.core_cfg)}"
else:
uri = f"http://{self.core_cfg.title.hostname}"
@ -44,13 +45,13 @@ class CardMakerBase:
{
"modelKind": 0,
"type": 1,
"titleUri": f"{uri}/SDHD/{self._parse_int_ver(games_ver['chuni'])}/",
"titleUri": f"{uri}/{self._parse_int_ver(games_ver['chuni'])}/",
},
# maimai DX
{
"modelKind": 1,
"type": 1,
"titleUri": f"{uri}/SDEZ/{self._parse_int_ver(games_ver['maimai'])}/",
"titleUri": f"{uri}/{self._parse_int_ver(games_ver['maimai'])}/",
},
# ONGEKI
{

View File

@ -7,21 +7,22 @@ import coloredlogs
import zlib
from os import path
from typing import Tuple
from typing import Tuple, List, Dict
from twisted.web.http import Request
from logging.handlers import TimedRotatingFileHandler
from core.config import CoreConfig
from core.utils import Utils
from titles.cm.config import CardMakerConfig
from titles.cm.const import CardMakerConstants
from titles.cm.base import CardMakerBase
from titles.cm.cm135 import CardMaker135
from core.title import BaseServlet
from .config import CardMakerConfig
from .const import CardMakerConstants
from .base import CardMakerBase
from .cm135 import CardMaker135
class CardMakerServlet:
class CardMakerServlet(BaseServlet):
def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None:
self.core_cfg = core_cfg
super().__init__(core_cfg, cfg_dir)
self.game_cfg = CardMakerConfig()
if path.exists(f"{cfg_dir}/{CardMakerConstants.CONFIG_NAME}"):
self.game_cfg.update(
@ -55,11 +56,11 @@ class CardMakerServlet:
coloredlogs.install(
level=self.game_cfg.server.loglevel, logger=self.logger, fmt=log_fmt_str
)
@classmethod
def get_allnet_info(
def is_game_enabled(
cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str
) -> Tuple[bool, str, str]:
) -> bool:
game_cfg = CardMakerConfig()
if path.exists(f"{cfg_dir}/{CardMakerConstants.CONFIG_NAME}"):
game_cfg.update(
@ -67,22 +68,15 @@ class CardMakerServlet:
)
if not game_cfg.server.enable:
return (False, "", "")
return False
if core_cfg.server.is_develop:
return (
True,
f"http://{core_cfg.title.hostname}:{core_cfg.title.port}/{game_code}/$v/",
"",
)
return True
return (True, f"http://{core_cfg.title.hostname}/{game_code}/$v/", "")
def render_POST(self, request: Request, version: int, url_path: str) -> bytes:
def render_POST(self, request: Request, game_code: int, matchers: Dict) -> bytes:
version = int(matchers['version'])
endpoint = matchers['endpoint']
req_raw = request.content.getvalue()
url_split = url_path.split("/")
internal_ver = 0
endpoint = url_split[len(url_split) - 1]
client_ip = Utils.get_ip_addr(request)
if version >= 130 and version < 135: # Card Maker

View File

@ -7,9 +7,9 @@ from hashlib import md5
from datetime import datetime
from core.config import CoreConfig
from titles.cxb.config import CxbConfig
from titles.cxb.const import CxbConstants
from titles.cxb.database import CxbData
from .config import CxbConfig
from .const import CxbConstants
from .database import CxbData
from threading import Thread
@ -192,7 +192,7 @@ class CxbBase:
).decode("utf-8")
)
def task_generateIndexData(versionindex):
def task_generateIndexData(self, versionindex: List[str], uid: int):
try:
v_profile = self.data.profile.get_profile_index(0, uid, self.version)
v_profile_data = v_profile["data"]
@ -274,7 +274,7 @@ class CxbBase:
thread_ScoreData.start()
for v in index:
thread_IndexData = Thread(target=CxbBase.task_generateIndexData(versionindex))
thread_IndexData = Thread(target=CxbBase.task_generateIndexData(self, versionindex, uid))
thread_IndexData.start()
return {"index": index, "data": data1, "version": versionindex}

View File

@ -7,18 +7,20 @@ import re
import inflection
import logging, coloredlogs
from logging.handlers import TimedRotatingFileHandler
from typing import Dict, Tuple
from typing import Dict, Tuple, List
from os import path
from core.config import CoreConfig
from titles.cxb.config import CxbConfig
from titles.cxb.const import CxbConstants
from titles.cxb.rev import CxbRev
from titles.cxb.rss1 import CxbRevSunriseS1
from titles.cxb.rss2 import CxbRevSunriseS2
from core.title import BaseServlet
from core.utils import Utils
from .config import CxbConfig
from .const import CxbConstants
from .rev import CxbRev
from .rss1 import CxbRevSunriseS1
from .rss2 import CxbRevSunriseS2
class CxbServlet(resource.Resource):
class CxbServlet(BaseServlet):
def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None:
self.isLeaf = True
self.cfg_dir = cfg_dir
@ -61,9 +63,7 @@ class CxbServlet(resource.Resource):
]
@classmethod
def get_allnet_info(
cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str
) -> Tuple[bool, str, str]:
def is_game_enabled(cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str) -> bool:
game_cfg = CxbConfig()
if path.exists(f"{cfg_dir}/{CxbConstants.CONFIG_NAME}"):
game_cfg.update(
@ -71,37 +71,28 @@ class CxbServlet(resource.Resource):
)
if not game_cfg.server.enable:
return (False, "", "")
if core_cfg.server.is_develop:
return False
return True
def get_allnet_info(self, game_code: str, game_ver: int, keychip: str) -> Tuple[str, str]:
if not self.core_cfg.server.is_using_proxy and Utils.get_title_port_ssl(self.core_cfg) != 443:
return (
True,
f"http://{core_cfg.title.hostname}:{core_cfg.title.port}/{game_code}/$v/",
f"https://{self.core_cfg.title.hostname}:{self.core_cfg.title.port_ssl}",
"",
)
return (True, f"http://{core_cfg.title.hostname}/{game_code}/$v/", "")
def setup(self):
if self.game_cfg.server.enable:
endpoints.serverFromString(
reactor,
f"tcp:{self.game_cfg.server.port}:interface={self.core_cfg.server.listen_address}",
).listen(server.Site(CxbServlet(self.core_cfg, self.cfg_dir)))
if self.core_cfg.server.is_develop and self.game_cfg.server.ssl_enable:
endpoints.serverFromString(
reactor,
f"ssl:{self.game_cfg.server.port_secure}"
f":interface={self.core_cfg.server.listen_address}:privateKey={self.game_cfg.server.ssl_key}:"
f"certKey={self.game_cfg.server.ssl_cert}",
).listen(server.Site(CxbServlet(self.core_cfg, self.cfg_dir)))
self.logger.info(
f"Ready on ports {self.game_cfg.server.port} & {self.game_cfg.server.port_secure}"
)
else:
self.logger.info(f"Ready on port {self.game_cfg.server.port}")
return (f"https://{self.core_cfg.title.hostname}", "")
def get_endpoint_matchers(self) -> Tuple[List[Tuple[str, str, Dict]], List[Tuple[str, str, Dict]]]:
return (
[],
[
("handle_data", "/data", {}),
("handle_action", "/action/{endpoint}", {}),
("handle_auth", "/auth/{endpoint}", {}),
]
)
def render_POST(self, request: Request, version: int, endpoint: str):
version = 0

View File

@ -7,9 +7,9 @@ from datetime import datetime
from core.config import CoreConfig
from core.data import Data, cached
from titles.cxb.config import CxbConfig
from titles.cxb.base import CxbBase
from titles.cxb.const import CxbConstants
from .config import CxbConfig
from .base import CxbBase
from .const import CxbConstants
class CxbRev(CxbBase):

View File

@ -7,9 +7,9 @@ from datetime import datetime
from core.config import CoreConfig
from core.data import Data, cached
from titles.cxb.config import CxbConfig
from titles.cxb.base import CxbBase
from titles.cxb.const import CxbConstants
from .config import CxbConfig
from .base import CxbBase
from .const import CxbConstants
class CxbRevSunriseS1(CxbBase):

View File

@ -7,9 +7,9 @@ from datetime import datetime
from core.config import CoreConfig
from core.data import Data, cached
from titles.cxb.config import CxbConfig
from titles.cxb.base import CxbBase
from titles.cxb.const import CxbConstants
from .config import CxbConfig
from .base import CxbBase
from .const import CxbConstants
class CxbRevSunriseS2(CxbBase):

View File

@ -7,17 +7,19 @@ import json
import urllib.parse
import base64
from os import path
from typing import Tuple
from typing import Tuple, Dict, List
from core.config import CoreConfig
from titles.diva.config import DivaConfig
from titles.diva.const import DivaConstants
from titles.diva.base import DivaBase
from core.title import BaseServlet
from core.utils import Utils
from .config import DivaConfig
from .const import DivaConstants
from .base import DivaBase
class DivaServlet:
class DivaServlet(BaseServlet):
def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None:
self.core_cfg = core_cfg
super().__init__(core_cfg, cfg_dir)
self.game_cfg = DivaConfig()
if path.exists(f"{cfg_dir}/{DivaConstants.CONFIG_NAME}"):
self.game_cfg.update(
@ -49,10 +51,22 @@ class DivaServlet:
level=self.game_cfg.server.loglevel, logger=self.logger, fmt=log_fmt_str
)
def get_endpoint_matchers(self) -> Tuple[List[Tuple[str, str, Dict]], List[Tuple[str, str, Dict]]]:
return (
[],
[("render_POST", "/DivaServlet/", {})]
)
def get_allnet_info(self, game_code: str, game_ver: int, keychip: str) -> Tuple[str, str]:
if not self.core_cfg.server.is_using_proxy and Utils.get_title_port(self.core_cfg) != 80:
return (f"http://{self.core_cfg.title.hostname}:{Utils.get_title_port(self.core_cfg)}/DivaServlet/", "")
return (f"http://{self.core_cfg.title.hostname}/DivaServlet/", "")
@classmethod
def get_allnet_info(
def is_game_enabled(
cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str
) -> Tuple[bool, str, str]:
) -> bool:
game_cfg = DivaConfig()
if path.exists(f"{cfg_dir}/{DivaConstants.CONFIG_NAME}"):
game_cfg.update(
@ -60,20 +74,13 @@ class DivaServlet:
)
if not game_cfg.server.enable:
return (False, "", "")
return False
if core_cfg.server.is_develop:
return (
True,
f"http://{core_cfg.title.hostname}:{core_cfg.title.port}/{game_code}/$v/",
"",
)
return True
return (True, f"http://{core_cfg.title.hostname}/{game_code}/$v/", "")
def render_POST(self, req: Request, version: int, url_path: str) -> bytes:
req_raw = req.content.getvalue()
url_header = req.getAllHeaders()
def render_POST(self, request: Request, game_code: int, matchers: Dict) -> bytes:
req_raw = request.content.getvalue()
url_header = request.getAllHeaders()
# Ping Dispatch
if "THIS_STRING_SEPARATES" in str(url_header):
@ -148,7 +155,7 @@ class DivaServlet:
"utf-8"
)
req.responseHeaders.addRawHeader(b"content-type", b"text/plain")
request.responseHeaders.addRawHeader(b"content-type", b"text/plain")
self.logger.debug(
f"Response cmd={req_data['cmd']}&req_id={req_data['req_id']}&stat=ok{resp}"
)

View File

@ -4,22 +4,22 @@ import logging
import coloredlogs
from logging.handlers import TimedRotatingFileHandler
from os import path
from typing import Tuple, List
from typing import Tuple, List, Dict
from twisted.internet import reactor, endpoints
from twisted.web import server, resource
import importlib
from core.config import CoreConfig
from core.title import BaseServlet
from .config import IDZConfig
from .const import IDZConstants
from .userdb import IDZUserDBFactory, IDZUserDBWeb, IDZKey
from .userdb import IDZUserDBFactory, IDZKey
from .echo import IDZEcho
from .handlers import IDZHandlerLoadConfigB
class IDZServlet:
class IDZServlet(BaseServlet):
def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None:
self.core_cfg = core_cfg
super().__init__(core_cfg, cfg_dir)
self.game_cfg = IDZConfig()
if path.exists(f"{cfg_dir}/{IDZConstants.CONFIG_NAME}"):
self.game_cfg.update(
@ -65,9 +65,9 @@ class IDZServlet:
return hash_
@classmethod
def get_allnet_info(
def is_game_enabled(
cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str
) -> Tuple[bool, str, str]:
) -> bool:
game_cfg = IDZConfig()
if path.exists(f"{cfg_dir}/{IDZConstants.CONFIG_NAME}"):
game_cfg.update(
@ -75,21 +75,29 @@ class IDZServlet:
)
if not game_cfg.server.enable:
return (False, "", "")
return False
if len(game_cfg.rsa_keys) <= 0 or not game_cfg.server.aes_key:
logging.getLogger("idz").error("IDZ: No RSA/AES keys! IDZ cannot start")
return (False, "", "")
return False
return True
def get_endpoint_matchers(self) -> Tuple[List[Tuple[str, str, Dict]], List[Tuple[str, str, Dict]]]:
return[
[("render_GET", "/{game}/{version}/{endpoint:.*?}", {'game': R'S...'})], # TODO: Slim this down to only the news stuff
[]
]
def get_allnet_info(self, game_code: str, game_ver: int, keychip: str) -> Tuple[str, str]:
hostname = (
core_cfg.title.hostname
if not game_cfg.server.hostname
else game_cfg.server.hostname
self.core_cfg.title.hostname
if not self.game_cfg.server.hostname
else self.game_cfg.server.hostname
)
return (
True,
f"",
f"{hostname}:{game_cfg.ports.userdb}",
f"{hostname}:{self.game_cfg.ports.userdb}",
)
def setup(self):
@ -149,12 +157,8 @@ class IDZServlet:
self.logger.info(f"UserDB Listening on port {self.game_cfg.ports.userdb}")
def render_POST(self, request: Request, version: int, url_path: str) -> bytes:
req_raw = request.content.getvalue()
self.logger.info(f"IDZ POST request: {url_path} - {req_raw}")
return b""
def render_GET(self, request: Request, version: int, url_path: str) -> bytes:
def render_GET(self, request: Request, game_code: int, matchers: Dict) -> bytes:
url_path = matchers['endpoint']
self.logger.info(f"IDZ GET request: {url_path}")
request.responseHeaders.setRawHeaders(
"Content-Type", [b"text/plain; charset=utf-8"]

View File

@ -1,4 +1,4 @@
from datetime import datetime
from datetime import datetime, timedelta
from typing import Any, Dict, List
import logging
from base64 import b64decode
@ -7,9 +7,10 @@ from PIL import ImageFile
import pytz
from core.config import CoreConfig
from titles.mai2.const import Mai2Constants
from titles.mai2.config import Mai2Config
from titles.mai2.database import Mai2Data
from core.utils import Utils
from .const import Mai2Constants
from .config import Mai2Config
from .database import Mai2Data
class Mai2Base:
@ -22,16 +23,17 @@ class Mai2Base:
self.can_deliver = False
self.can_usbdl = False
self.old_server = ""
self.date_time_format = "%Y-%m-%d %H:%M:%S"
if self.core_config.server.is_develop and self.core_config.title.port > 0:
self.old_server = f"http://{self.core_config.title.hostname}:{self.core_config.title.port}/SDEY/197/"
if not self.core_config.server.is_using_proxy and Utils.get_title_port(self.core_config) != 80:
self.old_server = f"http://{self.core_config.title.hostname}:{Utils.get_title_port(cfg)}/197/MaimaiServlet/"
else:
self.old_server = f"http://{self.core_config.title.hostname}/SDEY/197/"
self.old_server = f"http://{self.core_config.title.hostname}/197/MaimaiServlet/"
def handle_get_game_setting_api_request(self, data: Dict):
# if reboot start/end time is not defined use the default behavior of being a few hours ago
if self.core_cfg.title.reboot_start_time == "" or self.core_cfg.title.reboot_end_time == "":
if self.core_config.title.reboot_start_time == "" or self.core_config.title.reboot_end_time == "":
reboot_start = datetime.strftime(
datetime.utcnow() + timedelta(hours=6), self.date_time_format
)
@ -43,8 +45,8 @@ class Mai2Base:
current_jst = datetime.now(pytz.timezone('Asia/Tokyo')).date()
# parse config start/end times into datetime
reboot_start_time = datetime.strptime(self.core_cfg.title.reboot_start_time, "%H:%M")
reboot_end_time = datetime.strptime(self.core_cfg.title.reboot_end_time, "%H:%M")
reboot_start_time = datetime.strptime(self.core_config.title.reboot_start_time, "%H:%M")
reboot_end_time = datetime.strptime(self.core_config.title.reboot_end_time, "%H:%M")
# offset datetimes with current date/time
reboot_start_time = reboot_start_time.replace(year=current_jst.year, month=current_jst.month, day=current_jst.day, tzinfo=pytz.timezone('Asia/Tokyo'))

View File

@ -8,27 +8,28 @@ import logging, coloredlogs
import zlib
from logging.handlers import TimedRotatingFileHandler
from os import path, mkdir
from typing import Tuple
from typing import Tuple, List, Dict
from core.config import CoreConfig
from core.utils import Utils
from titles.mai2.config import Mai2Config
from titles.mai2.const import Mai2Constants
from titles.mai2.base import Mai2Base
from titles.mai2.finale import Mai2Finale
from titles.mai2.dx import Mai2DX
from titles.mai2.dxplus import Mai2DXPlus
from titles.mai2.splash import Mai2Splash
from titles.mai2.splashplus import Mai2SplashPlus
from titles.mai2.universe import Mai2Universe
from titles.mai2.universeplus import Mai2UniversePlus
from titles.mai2.festival import Mai2Festival
from titles.mai2.festivalplus import Mai2FestivalPlus
from core.title import BaseServlet
from .config import Mai2Config
from .const import Mai2Constants
from .base import Mai2Base
from .finale import Mai2Finale
from .dx import Mai2DX
from .dxplus import Mai2DXPlus
from .splash import Mai2Splash
from .splashplus import Mai2SplashPlus
from .universe import Mai2Universe
from .universeplus import Mai2UniversePlus
from .festival import Mai2Festival
from .festivalplus import Mai2FestivalPlus
class Mai2Servlet:
class Mai2Servlet(BaseServlet):
def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None:
self.core_cfg = core_cfg
super().__init__(core_cfg, cfg_dir)
self.game_cfg = Mai2Config()
if path.exists(f"{cfg_dir}/{Mai2Constants.CONFIG_NAME}"):
self.game_cfg.update(
@ -85,9 +86,9 @@ class Mai2Servlet:
self.logger.initted = True
@classmethod
def get_allnet_info(
def is_game_enabled(
cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str
) -> Tuple[bool, str, str]:
) -> bool:
game_cfg = Mai2Config()
if path.exists(f"{cfg_dir}/{Mai2Constants.CONFIG_NAME}"):
@ -96,19 +97,37 @@ class Mai2Servlet:
)
if not game_cfg.server.enable:
return (False, "", "")
if core_cfg.server.is_develop:
return False
return True
def get_endpoint_matchers(self) -> Tuple[List[Tuple[str, str, Dict]], List[Tuple[str, str, Dict]]]:
return (
[
("handle_movie", "/{version}/MaimaiServlet/api/movie/{endpoint:..?}", {}),
("handle_old_srv", "/{version}/MaimaiServlet/old/{endpoint:..?}", {}),
("handle_usbdl", "/{version}/MaimaiServlet/usbdl/{endpoint:..?}", {}),
("handle_deliver", "/{version}/MaimaiServlet/deliver/{endpoint:..?}", {}),
],
[
("handle_movie", "/{version}/MaimaiServlet/api/movie/{endpoint:..?}", {}),
("handle_mai", "/{version}/MaimaiServlet/{endpoint}", {}),
("handle_mai2", "/{version}/Maimai2Servlet/{endpoint}", {}),
]
)
def get_allnet_info(self, game_code: str, game_ver: int, keychip: str) -> Tuple[str, str]:
servlet_name = "" if game_code == Mai2Constants.GAME_CODE_DX else "MaimaiServlet/"
if not self.core_cfg.server.is_using_proxy and Utils.get_title_port(self.core_cfg) != 80:
return (
True,
f"http://{core_cfg.title.hostname}:{core_cfg.title.port}/{game_code}/$v/",
f"{core_cfg.title.hostname}",
f"http://{self.core_cfg.title.hostname}:{Utils.get_title_port(self.core_cfg)}/{game_ver}/{servlet_name}",
f"{self.core_cfg.title.hostname}",
)
return (
True,
f"http://{core_cfg.title.hostname}/{game_code}/$v/",
f"{core_cfg.title.hostname}",
f"http://{self.core_cfg.title.hostname}/{game_code}/{game_ver}/{servlet_name}",
f"{self.core_cfg.title.hostname}",
)
def setup(self):
@ -136,22 +155,17 @@ class Mai2Servlet:
f"Failed to make movie upload directory at {self.game_cfg.uploads.movies_dir}"
)
def render_POST(self, request: Request, version: int, url_path: str) -> bytes:
if url_path.lower() == "ping":
def handle_mai2(self, request: Request, game_code: int, matchers: Dict) -> bytes:
endpoint = matchers['endpoint']
version = int(matchers['version'])
if endpoint.lower() == "ping":
return zlib.compress(b'{"returnCode": "1"}')
elif url_path.startswith("api/movie/"):
self.logger.info(f"Movie data: {url_path} - {request.content.getvalue()}")
return b""
req_raw = request.content.getvalue()
url = request.uri.decode()
url_split = url_path.split("/")
internal_ver = 0
endpoint = url_split[len(url_split) - 1]
client_ip = Utils.get_ip_addr(request)
if request.uri.startswith(b"/SDEZ"):
if game_code == "SDEZ":
if version < 105: # 1.0
internal_ver = Mai2Constants.VER_MAIMAI_DX
elif version >= 105 and version < 110: # PLUS

View File

@ -12,25 +12,26 @@ from Crypto.Util.Padding import pad
from Crypto.Protocol.KDF import PBKDF2
from Crypto.Hash import SHA1
from os import path
from typing import Tuple
from typing import Tuple, Dict, List
from core.config import CoreConfig
from core.utils import Utils
from titles.ongeki.config import OngekiConfig
from titles.ongeki.const import OngekiConstants
from titles.ongeki.base import OngekiBase
from titles.ongeki.plus import OngekiPlus
from titles.ongeki.summer import OngekiSummer
from titles.ongeki.summerplus import OngekiSummerPlus
from titles.ongeki.red import OngekiRed
from titles.ongeki.redplus import OngekiRedPlus
from titles.ongeki.bright import OngekiBright
from titles.ongeki.brightmemory import OngekiBrightMemory
from core.title import BaseServlet
from .config import OngekiConfig
from .const import OngekiConstants
from .base import OngekiBase
from .plus import OngekiPlus
from .summer import OngekiSummer
from .summerplus import OngekiSummerPlus
from .red import OngekiRed
from .redplus import OngekiRedPlus
from .bright import OngekiBright
from .brightmemory import OngekiBrightMemory
class OngekiServlet:
class OngekiServlet(BaseServlet):
def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None:
self.core_cfg = core_cfg
super().__init__(core_cfg, cfg_dir)
self.game_cfg = OngekiConfig()
self.hash_table: Dict[Dict[str, str]] = {}
if path.exists(f"{cfg_dir}/{OngekiConstants.CONFIG_NAME}"):
@ -106,9 +107,7 @@ class OngekiServlet:
)
@classmethod
def get_allnet_info(
cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str
) -> Tuple[bool, str, str]:
def is_game_enabled(cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str) -> bool:
game_cfg = OngekiConfig()
if path.exists(f"{cfg_dir}/{OngekiConstants.CONFIG_NAME}"):
@ -117,30 +116,31 @@ class OngekiServlet:
)
if not game_cfg.server.enable:
return (False, "", "")
if core_cfg.server.is_develop:
return False
return True
def get_allnet_info(self, game_code: str, game_ver: int, keychip: str) -> Tuple[str, str]:
if not self.core_cfg.server.is_using_proxy and Utils.get_title_port(self.core_cfg) != 80:
return (
True,
f"http://{core_cfg.title.hostname}:{core_cfg.title.port}/{game_code}/$v/",
f"{core_cfg.title.hostname}:{core_cfg.title.port}/",
f"http://{self.core_cfg.title.hostname}:{Utils.get_title_port(self.core_cfg)}/{game_code}/$v/",
f"{self.core_cfg.title.hostname}:{Utils.get_title_port(self.core_cfg)}/",
)
return (
True,
f"http://{core_cfg.title.hostname}/{game_code}/$v/",
f"{core_cfg.title.hostname}/",
f"http://{self.core_cfg.title.hostname}/{game_code}/$v/",
f"{self.core_cfg.title.hostname}/",
)
def render_POST(self, request: Request, version: int, url_path: str) -> bytes:
if url_path.lower() == "ping":
def render_POST(self, request: Request, game_code: int, matchers: Dict) -> bytes:
endpoint = matchers['endpoint']
version = matchers['version']
if endpoint.lower() == "ping":
return zlib.compress(b'{"returnCode": 1}')
req_raw = request.content.getvalue()
url_split = url_path.split("/")
encrtped = False
internal_ver = 0
endpoint = url_split[len(url_split) - 1]
client_ip = Utils.get_ip_addr(request)
if version < 105: # 1.0

View File

@ -1,4 +1,4 @@
from typing import Tuple
from typing import Tuple, List, Dict
from twisted.web.http import Request
from twisted.web import resource
from twisted.internet import reactor
@ -12,6 +12,7 @@ from os import path
from google.protobuf.message import DecodeError
from core import CoreConfig, Utils
from core.title import BaseServlet
from .config import PokkenConfig
from .base import PokkenBase
from .const import PokkenConstants
@ -19,10 +20,9 @@ from .proto import jackal_pb2
from .services import PokkenAdmissionFactory
class PokkenServlet(resource.Resource):
class PokkenServlet(BaseServlet):
def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None:
self.isLeaf = True
self.core_cfg = core_cfg
super().__init__(core_cfg, cfg_dir)
self.config_dir = cfg_dir
self.game_cfg = PokkenConfig()
if path.exists(f"{cfg_dir}/pokken.yaml"):
@ -56,9 +56,7 @@ class PokkenServlet(resource.Resource):
self.base = PokkenBase(core_cfg, self.game_cfg)
@classmethod
def get_allnet_info(
cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str
) -> Tuple[bool, str, str]:
def is_game_enabled(cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str) -> bool:
game_cfg = PokkenConfig()
if path.exists(f"{cfg_dir}/{PokkenConstants.CONFIG_NAME}"):
@ -67,18 +65,31 @@ class PokkenServlet(resource.Resource):
)
if not game_cfg.server.enable:
return (False, "", "")
return False
return True
def get_endpoint_matchers(self) -> Tuple[List[Tuple[str, str, Dict]], List[Tuple[str, str, Dict]]]:
return (
True,
f"https://{game_cfg.server.hostname}:{game_cfg.ports.game}/{game_code}/$v/",
f"{game_cfg.server.hostname}/SDAK/$v/",
[],
[
("render_POST", "/pokken/", {}),
("handle_matching", "/pokken/matching", {}),
]
)
def get_allnet_info(self, game_code: str, game_ver: int, keychip: str) -> Tuple[str, str]:
if self.game_cfg.ports.game != 443:
return (
f"https://{self.game_cfg.server.hostname}:{self.game_cfg.ports.game}/pokken/",
f"{self.game_cfg.server.hostname}/SDAK/$v/",
)
return (
f"https://{self.game_cfg.server.hostname}/pokken/",
f"{self.game_cfg.server.hostname}/pokken/",
)
@classmethod
def get_mucha_info(
cls, core_cfg: CoreConfig, cfg_dir: str
) -> Tuple[bool, str, str]:
def get_mucha_info(self, core_cfg: CoreConfig, cfg_dir: str) -> Tuple[bool, str]:
game_cfg = PokkenConfig()
if path.exists(f"{cfg_dir}/{PokkenConstants.CONFIG_NAME}"):
@ -97,12 +108,7 @@ class PokkenServlet(resource.Resource):
self.game_cfg.ports.admission, PokkenAdmissionFactory(self.core_cfg, self.game_cfg)
)
def render_POST(
self, request: Request, version: int = 0, endpoints: str = ""
) -> bytes:
if endpoints == "matching":
return self.handle_matching(request)
def render_POST(self, request: Request, game_code: int, matchers: Dict) -> bytes:
content = request.content.getvalue()
if content == b"":
self.logger.info("Empty request")
@ -131,7 +137,7 @@ class PokkenServlet(resource.Resource):
ret = handler(pokken_request)
return ret
def handle_matching(self, request: Request) -> bytes:
def handle_matching(self, request: Request, game_code: int, matchers: Dict) -> bytes:
if not self.game_cfg.server.enable_matching:
return b""

View File

@ -1,25 +1,21 @@
from typing import Tuple
from typing import Tuple, Dict, List
from twisted.web.http import Request
from twisted.web import resource
import json, ast
from datetime import datetime
import yaml
import logging, coloredlogs
from logging.handlers import TimedRotatingFileHandler
import inflection
from os import path
from core import CoreConfig, Utils
from core.title import BaseServlet
from titles.sao.config import SaoConfig
from titles.sao.const import SaoConstants
from titles.sao.base import SaoBase
from titles.sao.handlers.base import *
class SaoServlet(resource.Resource):
class SaoServlet(BaseServlet):
def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None:
self.isLeaf = True
self.core_cfg = core_cfg
super().__init__(core_cfg, cfg_dir)
self.config_dir = cfg_dir
self.game_cfg = SaoConfig()
if path.exists(f"{cfg_dir}/sao.yaml"):
@ -51,49 +47,41 @@ class SaoServlet(resource.Resource):
self.logger.inited = True
self.base = SaoBase(core_cfg, self.game_cfg)
def get_endpoint_matchers(self) -> Tuple[List[Tuple[str, str, Dict]], List[Tuple[str, str, Dict]]]:
return (
[],
[("render_POST", "/SaoServlet/{datecode}/proto/if/{endpoint}", {})]
)
@classmethod
def get_allnet_info(
cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str
) -> Tuple[bool, str, str]:
def is_game_enabled(cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str) -> bool:
game_cfg = SaoConfig()
if path.exists(f"{cfg_dir}/{SaoConstants.CONFIG_NAME}"):
game_cfg.update(
yaml.safe_load(open(f"{cfg_dir}/{SaoConstants.CONFIG_NAME}"))
)
if not game_cfg.server.enable:
return (False, "", "")
return False
return True
def get_allnet_info(self, game_code: str, game_ver: int, keychip: str) -> Tuple[str, str]:
return (
True,
f"http://{game_cfg.server.hostname}:{game_cfg.server.port}/{game_code}/$v/",
f"{game_cfg.server.hostname}/SDEW/$v/",
f"http://{self.game_cfg.server.hostname}:{self.game_cfg.server.port}/SaoServlet/",
f"{self.game_cfg.server.hostname}/SaoServlet/",
)
@classmethod
def get_mucha_info(
cls, core_cfg: CoreConfig, cfg_dir: str
) -> Tuple[bool, str, str]:
game_cfg = SaoConfig()
if path.exists(f"{cfg_dir}/{SaoConstants.CONFIG_NAME}"):
game_cfg.update(
yaml.safe_load(open(f"{cfg_dir}/{SaoConstants.CONFIG_NAME}"))
)
if not game_cfg.server.enable:
def get_mucha_info(self, core_cfg: CoreConfig, cfg_dir: str) -> Tuple[bool, str]:
if not self.game_cfg.server.enable:
return (False, "")
return (True, "SAO1")
def setup(self) -> None:
pass
def render_POST(
self, request: Request, version: int = 0, endpoints: str = ""
) -> bytes:
def render_POST(self, request: Request, game_code: int, matchers: Dict) -> bytes:
req_url = request.uri.decode()
if req_url == "/matching":
self.logger.info("Matching request")

View File

@ -5,20 +5,20 @@ import logging
import json
from hashlib import md5
from twisted.web.http import Request
from typing import Dict, Tuple
from typing import Dict, Tuple, List
from os import path
from core import CoreConfig, Utils
from titles.wacca.config import WaccaConfig
from titles.wacca.config import WaccaConfig
from titles.wacca.const import WaccaConstants
from titles.wacca.reverse import WaccaReverse
from titles.wacca.lilyr import WaccaLilyR
from titles.wacca.lily import WaccaLily
from titles.wacca.s import WaccaS
from titles.wacca.base import WaccaBase
from titles.wacca.handlers.base import BaseResponse
from titles.wacca.handlers.helpers import Version
from .config import WaccaConfig
from .config import WaccaConfig
from .const import WaccaConstants
from .reverse import WaccaReverse
from .lilyr import WaccaLilyR
from .lily import WaccaLily
from .s import WaccaS
from .base import WaccaBase
from .handlers.base import BaseResponse
from .handlers.helpers import Version
class WaccaServlet:
@ -61,10 +61,15 @@ class WaccaServlet:
level=self.game_cfg.server.loglevel, logger=self.logger, fmt=log_fmt_str
)
def get_endpoint_matchers(self) -> Tuple[List[Tuple[str, str, Dict]], List[Tuple[str, str, Dict]]]:
return (
[],
[("render_POST", "/WaccaServlet/api/{api}/{endpoint}", {}),
("render_POST", "/WaccaServlet/api/{api}/{branch}/{endpoint}", {}),]
)
@classmethod
def get_allnet_info(
cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str
) -> Tuple[bool, str, str]:
def is_game_enabled(cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str) -> bool:
game_cfg = WaccaConfig()
if path.exists(f"{cfg_dir}/{WaccaConstants.CONFIG_NAME}"):
game_cfg.update(
@ -72,27 +77,43 @@ class WaccaServlet:
)
if not game_cfg.server.enable:
return (False, "", "")
return False
if core_cfg.server.is_develop:
return True
def get_allnet_info(self, game_code: str, game_ver: int, keychip: str) -> Tuple[str, str]:
if not self.core_cfg.server.is_using_proxy and Utils.get_title_port(self.core_cfg) != 80:
return (
True,
f"http://{core_cfg.title.hostname}:{core_cfg.title.port}/{game_code}/$v",
f"http://{self.core_cfg.title.hostname}:{Utils.get_title_port(self.core_cfg)}/{game_code}/$v",
"",
)
return (True, f"http://{core_cfg.title.hostname}/{game_code}/$v", "")
def render_POST(self, request: Request, version: int, url_path: str) -> bytes:
return (True, f"http://{self.core_cfg.title.hostname}/{game_code}/$v", "")
def render_POST(self, request: Request, game_code: int, matchers: Dict) -> bytes:
def end(resp: Dict) -> bytes:
hash = md5(json.dumps(resp, ensure_ascii=False).encode()).digest()
request.responseHeaders.addRawHeader(b"X-Wacca-Hash", hash.hex().encode())
return json.dumps(resp).encode()
api = matchers['api']
branch = matchers.get('branch', '')
endpoint = matchers['endpoint']
client_ip = Utils.get_ip_addr(request)
if branch:
url_path = f"{api}/{branch}/{endpoint}"
func_to_find = f"handle_{api}_{branch}_{endpoint}_request"
else:
url_path = f"{api}/{endpoint}"
func_to_find = f"handle_{api}_{endpoint}_request"
try:
req_json = json.loads(request.content.getvalue())
version_full = Version(req_json["appVersion"])
except Exception:
self.logger.error(
f"Failed to parse request to {url_path} -> {request.content.getvalue()}"
@ -102,18 +123,6 @@ class WaccaServlet:
resp.message = "不正なリクエスト エラーです"
return end(resp.make())
if "api/" in url_path:
func_to_find = (
"handle_" + url_path.partition("api/")[2].replace("/", "_") + "_request"
)
else:
self.logger.error(f"Malformed url {url_path}")
resp = BaseResponse()
resp.status = 1
resp.message = "Bad URL"
return end(resp.make())
ver_search = int(version_full)
if ver_search < 15000: