From 3c06f46644c4749d84459f88b7679f9a8ba5361a Mon Sep 17 00:00:00 2001 From: Hay1tsme Date: Thu, 9 Nov 2023 18:11:58 -0500 Subject: [PATCH] sao: add decoder helpers, hash checks --- example_config/sao.yaml | 4 +++ titles/sao/config.py | 17 +++++++++++ titles/sao/handlers/__init__.py | 3 +- titles/sao/handlers/base.py | 34 +++++++++++++++++++--- titles/sao/handlers/helpers.py | 50 +++++++++++++++++++++++++++++++++ titles/sao/index.py | 17 +++++++++-- 6 files changed, 117 insertions(+), 8 deletions(-) create mode 100644 titles/sao/handlers/helpers.py diff --git a/example_config/sao.yaml b/example_config/sao.yaml index bfb1c08..0209ffe 100644 --- a/example_config/sao.yaml +++ b/example_config/sao.yaml @@ -7,3 +7,7 @@ crypt: enable: False key: "" iv: "" + +hash: + verify_hash: False + hash_base: "" \ No newline at end of file diff --git a/titles/sao/config.py b/titles/sao/config.py index 1ef9991..33d319b 100644 --- a/titles/sao/config.py +++ b/titles/sao/config.py @@ -51,8 +51,25 @@ class SaoCryptConfig: self.__config, "sao", "crypt", "iv", default="" ) +class SaoHashConfig: + def __init__(self, parent_config: "SaoConfig"): + self.__config = parent_config + + @property + def verify_hash(self) -> bool: + return CoreConfig.get_config_field( + self.__config, "sao", "hash", "verify_hash", default=False + ) + + @property + def hash_base(self) -> str: + return CoreConfig.get_config_field( + self.__config, "sao", "hash", "hash_base", default="" + ) + class SaoConfig(dict): def __init__(self) -> None: self.server = SaoServerConfig(self) self.crypt = SaoCryptConfig(self) + self.hash = SaoHashConfig(self) diff --git a/titles/sao/handlers/__init__.py b/titles/sao/handlers/__init__.py index 90a6b4e..4d38136 100644 --- a/titles/sao/handlers/__init__.py +++ b/titles/sao/handlers/__init__.py @@ -1 +1,2 @@ -from titles.sao.handlers.base import * \ No newline at end of file +from .base import * +from .helpers import * \ No newline at end of file diff --git a/titles/sao/handlers/base.py b/titles/sao/handlers/base.py index 08b7937..579e2c8 100644 --- a/titles/sao/handlers/base.py +++ b/titles/sao/handlers/base.py @@ -1,22 +1,26 @@ import struct from datetime import datetime from construct import * -import sys +from .helpers import * import csv from csv import * class SaoRequestHeader: def __init__(self, data: bytes) -> None: - collection = struct.unpack_from("!H6xIII16s", data) + collection = struct.unpack_from("!H6xIII16sI", data) self.cmd: int = collection[0] self.vendor_id: int = collection[1] self.game_id: int = collection[2] self.version_id: int = collection[3] - self.checksum: str = collection[4] + self.hash: str = collection[4] + self.data_len: str = collection[5] class SaoBaseRequest: def __init__(self, header: SaoRequestHeader, data: bytes) -> None: self.header = header + if self.header.data_len != len(data): + logging.getLogger('sao').error(f"Expected {self.header.data_len} data bytes byt got {len(data)}!") + # TODO: Raise an error here class SaoBaseResponse: def __init__(self, cmd_id: int) -> None: @@ -2749,4 +2753,26 @@ class SaoScanQrQuestProfileCardResponse(SaoBaseResponse): resp_data = resp_struct.build(resp_data) self.length = len(resp_data) - return super().make() + resp_data \ No newline at end of file + return super().make() + resp_data + +class SaoConsumeCreditGuestRequest(SaoBaseRequest): + def __init__(self, header: SaoRequestHeader, data: bytes) -> None: + super().__init__(header, data) + off = 0 + shop_id = decode_str(data, off) + self.shop_id = shop_id[0] + off += shop_id[1] + + serial_num = decode_str(data, off) + self.serial_num = serial_num[0] + off += serial_num[1] + + self.cab_type = decode_byte(data, off) + off += BYTE_OFF + + self.act_type = decode_byte(data, off) + off += BYTE_OFF + + self.consume_num = decode_byte(data, off) + off += BYTE_OFF + \ No newline at end of file diff --git a/titles/sao/handlers/helpers.py b/titles/sao/handlers/helpers.py new file mode 100644 index 0000000..c4d4214 --- /dev/null +++ b/titles/sao/handlers/helpers.py @@ -0,0 +1,50 @@ +from typing import Tuple +import struct +import logging + +BIGINT_OFF = 16 +LONG_OFF = 8 +INT_OFF = 4 +SHORT_OFF = 2 +BYTE_OFF = 1 + +def decode_num(data: bytes, offset: int, size: int) -> int: + try: + return int.from_bytes(data[offset:offset + size], 'big') + except: + logging.getLogger('sao').error(f"Failed to parse {data[offset:offset + size]} as BE number of width {size}") + return 0 + +def decode_byte(data: bytes, offset: int) -> int: + return decode_num(data, offset, BYTE_OFF) + +def decode_short(data: bytes, offset: int) -> int: + return decode_num(data, offset, SHORT_OFF) + +def decode_int(data: bytes, offset: int) -> int: + return decode_num(data, offset, INT_OFF) + +def decode_long(data: bytes, offset: int) -> int: + return decode_num(data, offset, LONG_OFF) + +def decode_bigint(data: bytes, offset: int) -> int: + return decode_num(data, offset, BIGINT_OFF) + +def decode_str(data: bytes, offset: int) -> Tuple[str, int]: + try: + str_len = decode_int(data, offset) + num_bytes_decoded = INT_OFF + str_len + str_out = data[offset + INT_OFF:offset + num_bytes_decoded].decode("utf-16-le", errors="replace") + return (str_out, num_bytes_decoded) + except: + logging.getLogger('sao').error(f"Failed to parse {data[offset:]} as string!") + return ("", 0) + +def encode_str(s: str) -> bytes: + try: + str_bytes = s.encode("utf-16-le", errors="replace") + str_len_bytes = struct.pack("!I", len(str_bytes)) + return str_len_bytes + str_bytes + except: + logging.getLogger('sao').error(f"Failed to encode {s} as bytes!") + return b"" \ No newline at end of file diff --git a/titles/sao/index.py b/titles/sao/index.py index 509599e..7f49e64 100644 --- a/titles/sao/index.py +++ b/titles/sao/index.py @@ -5,6 +5,7 @@ import logging, coloredlogs from logging.handlers import TimedRotatingFileHandler from os import path from Crypto.Cipher import Blowfish +from hashlib import md5 from core import CoreConfig, Utils from core.title import BaseServlet @@ -48,6 +49,10 @@ class SaoServlet(BaseServlet): self.logger.inited = True self.base = SaoBase(core_cfg, self.game_cfg) + self.static_hash = None + + if self.game_cfg.hash.verify_hash: + self.static_hash = md5(self.game_cfg.hash.hash_base) # Greate hashing guys, really validates the data def get_endpoint_matchers(self) -> Tuple[List[Tuple[str, str, Dict]], List[Tuple[str, str, Dict]]]: return ( @@ -93,14 +98,20 @@ class SaoServlet(BaseServlet): cmd_str = f"{req_header.cmd:04x}" + if self.game_cfg.hash.verify_hash and self.static_hash != req_header.hash: + self.logger.error(f"Hash mismatch! Expecting {self.static_hash} but recieved {req_header.hash}") + return b"" + if self.game_cfg.crypt.enable: - iv = req_raw[30:38] + + iv = req_raw[40:48] cipher = Blowfish.new(self.game_cfg.crypt.key, Blowfish.MODE_CBC, iv) - crypt_data = req_raw[38:] + crypt_data = req_raw[48:] req_data = cipher.decrypt(crypt_data) + self.logger.debug(f"Decrypted {req_data.hex()} with IV {iv.hex()}") else: - req_data = req_raw[30:] + req_data = req_raw[40:] handler = getattr(self.base, f"handle_{cmd_str}", None) if handler is None: