artemis/core/utils.py

77 lines
3.1 KiB
Python
Raw Normal View History

from typing import Dict, Any, Optional
2023-02-16 22:13:41 +00:00
from types import ModuleType
2024-01-09 08:07:04 +00:00
from starlette.requests import Request
2023-02-24 19:13:31 +00:00
import logging
2023-02-16 22:13:41 +00:00
import importlib
from os import walk
import jwt
from base64 import b64decode
from datetime import datetime, timezone
2023-02-16 22:13:41 +00:00
from .config import CoreConfig
2023-03-09 16:38:58 +00:00
2023-02-16 22:13:41 +00:00
class Utils:
real_title_port = None
real_title_port_ssl = None
2023-02-16 22:13:41 +00:00
@classmethod
def get_all_titles(cls) -> Dict[str, ModuleType]:
ret: Dict[str, Any] = {}
for root, dirs, files in walk("titles"):
2023-03-09 16:38:58 +00:00
for dir in dirs:
2023-02-16 22:13:41 +00:00
if not dir.startswith("__"):
try:
mod = importlib.import_module(f"titles.{dir}")
if hasattr(mod, "game_codes") and hasattr(
mod, "index"
): # Minimum required to function
2023-04-15 04:12:45 +00:00
ret[dir] = mod
2023-02-16 22:13:41 +00:00
except ImportError as e:
2023-02-24 19:13:31 +00:00
logging.getLogger("core").error(f"get_all_titles: {dir} - {e}")
raise
2023-02-16 22:13:41 +00:00
return ret
@classmethod
def get_ip_addr(cls, req: Request) -> str:
2024-01-09 08:07:04 +00:00
return req.headers.get("x-forwarded-for", req.client.host)
@classmethod
def get_title_port(cls, cfg: CoreConfig):
if cls.real_title_port is not None: return cls.real_title_port
2024-01-09 08:07:04 +00:00
cls.real_title_port = cfg.server.proxy_port if cfg.server.is_using_proxy else cfg.server.port
return cls.real_title_port
2023-11-30 23:22:01 +00:00
def create_sega_auth_key(aime_id: int, game: str, place_id: int, keychip_id: str, b64_secret: str, exp_seconds: int = 86400, err_logger: str = 'aimedb') -> Optional[str]:
logger = logging.getLogger(err_logger)
try:
2023-11-30 23:22:01 +00:00
return jwt.encode({ "aime_id": aime_id, "game": game, "place_id": place_id, "keychip_id": keychip_id, "exp": int(datetime.now(tz=timezone.utc).timestamp()) + exp_seconds }, b64decode(b64_secret), algorithm="HS256")
except jwt.InvalidKeyError:
logger.error("Failed to encode Sega Auth Key because the secret is invalid!")
return None
except Exception as e:
logger.error(f"Unknown exception occoured when encoding Sega Auth Key! {e}")
return None
def decode_sega_auth_key(token: str, b64_secret: str, err_logger: str = 'aimedb') -> Optional[Dict]:
logger = logging.getLogger(err_logger)
try:
return jwt.decode(token, "secret", b64decode(b64_secret), algorithms=["HS256"], options={"verify_signature": True})
except jwt.ExpiredSignatureError:
logger.error("Sega Auth Key failed to validate due to an expired signature!")
return None
except jwt.InvalidSignatureError:
logger.error("Sega Auth Key failed to validate due to an invalid signature!")
return None
except jwt.DecodeError as e:
logger.error(f"Sega Auth Key failed to decode! {e}")
return None
except jwt.InvalidTokenError as e:
logger.error(f"Sega Auth Key is invalid! {e}")
return None
except Exception as e:
logger.error(f"Unknown exception occoured when decoding Sega Auth Key! {e}")
return None