diff --git a/core/adb_handlers/felica.py b/core/adb_handlers/felica.py index e2f8c11..479e84d 100644 --- a/core/adb_handlers/felica.py +++ b/core/adb_handlers/felica.py @@ -52,6 +52,7 @@ class ADBFelicaLookup2Response(ADBBaseResponse): self.access_code = access_code if access_code is not None else "00000000000000000000" self.company = CompanyCodes.SEGA self.portal_status = PortalRegStatus.NO_REG + self.auth_key = [0] * 256 @classmethod def from_req(cls, req: ADBHeader, user_id: Union[int, None] = None, access_code: Union[str, None] = None) -> "ADBFelicaLookup2Response": @@ -76,7 +77,7 @@ class ADBFelicaLookup2Response(ADBBaseResponse): access_code = bytes.fromhex(self.access_code), portal_status = self.portal_status.value, company_code = self.company.value, - auth_key = [0] * 256 # Unsupported + auth_key = self.auth_key )) self.head.length = HEADER_SIZE + len(resp_struct) diff --git a/core/adb_handlers/lookup.py b/core/adb_handlers/lookup.py index 076fc0a..0640493 100644 --- a/core/adb_handlers/lookup.py +++ b/core/adb_handlers/lookup.py @@ -52,6 +52,7 @@ class ADBLookupExResponse(ADBBaseResponse): super().__init__(code, length, status, game_id, store_id, keychip_id) self.user_id = user_id if user_id is not None else -1 self.portal_reg = PortalRegStatus.NO_REG + self.auth_key = [0] * 256 @classmethod def from_req(cls, req: ADBHeader, user_id: Union[int, None]) -> "ADBLookupExResponse": @@ -72,7 +73,7 @@ class ADBLookupExResponse(ADBBaseResponse): body = resp_struct.build(dict( user_id = self.user_id, portal_reg = self.portal_reg.value, - auth_key = [0] * 256, + auth_key = self.auth_key, relation1 = -1, relation2 = -1 )) diff --git a/core/aimedb.py b/core/aimedb.py index 5388600..c376749 100644 --- a/core/aimedb.py +++ b/core/aimedb.py @@ -7,6 +7,7 @@ from typing_extensions import Final from logging.handlers import TimedRotatingFileHandler from core.config import CoreConfig +from core.utils import create_sega_auth_key from core.data import Data from .adb_handlers import * @@ -179,6 +180,14 @@ class AimedbProtocol(Protocol): self.logger.info( f"access_code {req.access_code} -> user_id {ret.user_id}" ) + + if user_id > 0 and self.config.aimedb.id_secret: + auth_key = create_sega_auth_key(user_id, req.head.game_id, req.head.store_id, self.config.aimedb.id_secret) + if auth_key is not None: + auth_key_extra_len = 256 - len(auth_key) + auth_key_full = auth_key.encode() + (b"\0" * auth_key_extra_len) # TODO: impl + self.logger.debug(f"Generated auth token {auth_key}") + return ret def handle_felica_lookup(self, data: bytes, resp_code: int) -> bytes: @@ -241,7 +250,16 @@ class AimedbProtocol(Protocol): f"idm {req.idm} ipm {req.pmm} -> access_code {access_code} user_id {user_id}" ) - return ADBFelicaLookup2Response.from_req(req.head, user_id, access_code) + resp = ADBFelicaLookup2Response.from_req(req.head, user_id, access_code) + + if user_id > 0 and self.config.aimedb.id_secret: + auth_key = create_sega_auth_key(user_id, req.head.game_id, req.head.store_id, self.config.aimedb.id_secret) + if auth_key is not None: + auth_key_extra_len = 256 - len(auth_key) + auth_key_full = auth_key.encode() + (b"\0" * auth_key_extra_len) # TODO: impl + self.logger.debug(f"Generated auth token {auth_key}") + + return resp def handle_campaign_clear(self, data: bytes, resp_code: int) -> ADBBaseResponse: req = ADBCampaignClearRequest(data) diff --git a/core/config.py b/core/config.py index 684ec15..44d83a8 100644 --- a/core/config.py +++ b/core/config.py @@ -314,6 +314,12 @@ class AimedbConfig: self.__config, "core", "aimedb", "key", default="" ) + @property + def id_secret(self) -> str: + return CoreConfig.get_config_field( + self.__config, "core", "aimedb", "id_secret", default="" + ) + class MuchaConfig: def __init__(self, parent_config: "CoreConfig") -> None: diff --git a/core/utils.py b/core/utils.py index 1f5ccf2..2d1854e 100644 --- a/core/utils.py +++ b/core/utils.py @@ -64,10 +64,10 @@ class Utils: return cls.real_title_port_ssl -def create_sega_auth_key(aime_id: int, game: str, ver: float, place_id: str, b64_secret: str, err_logger: str = 'aimedb') -> Optional[str]: +def create_sega_auth_key(aime_id: int, game: str, place_id: str, b64_secret: str, err_logger: str = 'aimedb') -> Optional[str]: logger = logging.getLogger(err_logger) try: - return jwt.encode({ "aime_id": aime_id, "game": game, "ver": ver, "place_id": place_id, "exp": int(datetime.now(tz=timezone.utc).timestamp()) + 86400 }, b64decode(b64_secret), algorithm="HS256") + return jwt.encode({ "aime_id": aime_id, "game": game, "place_id": place_id, "exp": int(datetime.now(tz=timezone.utc).timestamp()) + 86400 }, b64decode(b64_secret), algorithm="HS256") except jwt.InvalidKeyError: logger.error("Failed to encode Sega Auth Key because the secret is invalid!") return None diff --git a/example_config/core.yaml b/example_config/core.yaml index b0997d8..69234a5 100644 --- a/example_config/core.yaml +++ b/example_config/core.yaml @@ -56,6 +56,7 @@ aimedb: loglevel: "info" port: 22345 key: "" + id_secret: "" mucha: enable: False