forked from Hay1tsme/artemis
sao: add decoder helpers, hash checks
This commit is contained in:
parent
95234a421c
commit
3c06f46644
@ -7,3 +7,7 @@ crypt:
|
|||||||
enable: False
|
enable: False
|
||||||
key: ""
|
key: ""
|
||||||
iv: ""
|
iv: ""
|
||||||
|
|
||||||
|
hash:
|
||||||
|
verify_hash: False
|
||||||
|
hash_base: ""
|
@ -51,8 +51,25 @@ class SaoCryptConfig:
|
|||||||
self.__config, "sao", "crypt", "iv", default=""
|
self.__config, "sao", "crypt", "iv", default=""
|
||||||
)
|
)
|
||||||
|
|
||||||
|
class SaoHashConfig:
|
||||||
|
def __init__(self, parent_config: "SaoConfig"):
|
||||||
|
self.__config = parent_config
|
||||||
|
|
||||||
|
@property
|
||||||
|
def verify_hash(self) -> bool:
|
||||||
|
return CoreConfig.get_config_field(
|
||||||
|
self.__config, "sao", "hash", "verify_hash", default=False
|
||||||
|
)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def hash_base(self) -> str:
|
||||||
|
return CoreConfig.get_config_field(
|
||||||
|
self.__config, "sao", "hash", "hash_base", default=""
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class SaoConfig(dict):
|
class SaoConfig(dict):
|
||||||
def __init__(self) -> None:
|
def __init__(self) -> None:
|
||||||
self.server = SaoServerConfig(self)
|
self.server = SaoServerConfig(self)
|
||||||
self.crypt = SaoCryptConfig(self)
|
self.crypt = SaoCryptConfig(self)
|
||||||
|
self.hash = SaoHashConfig(self)
|
||||||
|
@ -1 +1,2 @@
|
|||||||
from titles.sao.handlers.base import *
|
from .base import *
|
||||||
|
from .helpers import *
|
@ -1,22 +1,26 @@
|
|||||||
import struct
|
import struct
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from construct import *
|
from construct import *
|
||||||
import sys
|
from .helpers import *
|
||||||
import csv
|
import csv
|
||||||
from csv import *
|
from csv import *
|
||||||
|
|
||||||
class SaoRequestHeader:
|
class SaoRequestHeader:
|
||||||
def __init__(self, data: bytes) -> None:
|
def __init__(self, data: bytes) -> None:
|
||||||
collection = struct.unpack_from("!H6xIII16s", data)
|
collection = struct.unpack_from("!H6xIII16sI", data)
|
||||||
self.cmd: int = collection[0]
|
self.cmd: int = collection[0]
|
||||||
self.vendor_id: int = collection[1]
|
self.vendor_id: int = collection[1]
|
||||||
self.game_id: int = collection[2]
|
self.game_id: int = collection[2]
|
||||||
self.version_id: int = collection[3]
|
self.version_id: int = collection[3]
|
||||||
self.checksum: str = collection[4]
|
self.hash: str = collection[4]
|
||||||
|
self.data_len: str = collection[5]
|
||||||
|
|
||||||
class SaoBaseRequest:
|
class SaoBaseRequest:
|
||||||
def __init__(self, header: SaoRequestHeader, data: bytes) -> None:
|
def __init__(self, header: SaoRequestHeader, data: bytes) -> None:
|
||||||
self.header = header
|
self.header = header
|
||||||
|
if self.header.data_len != len(data):
|
||||||
|
logging.getLogger('sao').error(f"Expected {self.header.data_len} data bytes byt got {len(data)}!")
|
||||||
|
# TODO: Raise an error here
|
||||||
|
|
||||||
class SaoBaseResponse:
|
class SaoBaseResponse:
|
||||||
def __init__(self, cmd_id: int) -> None:
|
def __init__(self, cmd_id: int) -> None:
|
||||||
@ -2750,3 +2754,25 @@ class SaoScanQrQuestProfileCardResponse(SaoBaseResponse):
|
|||||||
|
|
||||||
self.length = len(resp_data)
|
self.length = len(resp_data)
|
||||||
return super().make() + resp_data
|
return super().make() + resp_data
|
||||||
|
|
||||||
|
class SaoConsumeCreditGuestRequest(SaoBaseRequest):
|
||||||
|
def __init__(self, header: SaoRequestHeader, data: bytes) -> None:
|
||||||
|
super().__init__(header, data)
|
||||||
|
off = 0
|
||||||
|
shop_id = decode_str(data, off)
|
||||||
|
self.shop_id = shop_id[0]
|
||||||
|
off += shop_id[1]
|
||||||
|
|
||||||
|
serial_num = decode_str(data, off)
|
||||||
|
self.serial_num = serial_num[0]
|
||||||
|
off += serial_num[1]
|
||||||
|
|
||||||
|
self.cab_type = decode_byte(data, off)
|
||||||
|
off += BYTE_OFF
|
||||||
|
|
||||||
|
self.act_type = decode_byte(data, off)
|
||||||
|
off += BYTE_OFF
|
||||||
|
|
||||||
|
self.consume_num = decode_byte(data, off)
|
||||||
|
off += BYTE_OFF
|
||||||
|
|
50
titles/sao/handlers/helpers.py
Normal file
50
titles/sao/handlers/helpers.py
Normal file
@ -0,0 +1,50 @@
|
|||||||
|
from typing import Tuple
|
||||||
|
import struct
|
||||||
|
import logging
|
||||||
|
|
||||||
|
BIGINT_OFF = 16
|
||||||
|
LONG_OFF = 8
|
||||||
|
INT_OFF = 4
|
||||||
|
SHORT_OFF = 2
|
||||||
|
BYTE_OFF = 1
|
||||||
|
|
||||||
|
def decode_num(data: bytes, offset: int, size: int) -> int:
|
||||||
|
try:
|
||||||
|
return int.from_bytes(data[offset:offset + size], 'big')
|
||||||
|
except:
|
||||||
|
logging.getLogger('sao').error(f"Failed to parse {data[offset:offset + size]} as BE number of width {size}")
|
||||||
|
return 0
|
||||||
|
|
||||||
|
def decode_byte(data: bytes, offset: int) -> int:
|
||||||
|
return decode_num(data, offset, BYTE_OFF)
|
||||||
|
|
||||||
|
def decode_short(data: bytes, offset: int) -> int:
|
||||||
|
return decode_num(data, offset, SHORT_OFF)
|
||||||
|
|
||||||
|
def decode_int(data: bytes, offset: int) -> int:
|
||||||
|
return decode_num(data, offset, INT_OFF)
|
||||||
|
|
||||||
|
def decode_long(data: bytes, offset: int) -> int:
|
||||||
|
return decode_num(data, offset, LONG_OFF)
|
||||||
|
|
||||||
|
def decode_bigint(data: bytes, offset: int) -> int:
|
||||||
|
return decode_num(data, offset, BIGINT_OFF)
|
||||||
|
|
||||||
|
def decode_str(data: bytes, offset: int) -> Tuple[str, int]:
|
||||||
|
try:
|
||||||
|
str_len = decode_int(data, offset)
|
||||||
|
num_bytes_decoded = INT_OFF + str_len
|
||||||
|
str_out = data[offset + INT_OFF:offset + num_bytes_decoded].decode("utf-16-le", errors="replace")
|
||||||
|
return (str_out, num_bytes_decoded)
|
||||||
|
except:
|
||||||
|
logging.getLogger('sao').error(f"Failed to parse {data[offset:]} as string!")
|
||||||
|
return ("", 0)
|
||||||
|
|
||||||
|
def encode_str(s: str) -> bytes:
|
||||||
|
try:
|
||||||
|
str_bytes = s.encode("utf-16-le", errors="replace")
|
||||||
|
str_len_bytes = struct.pack("!I", len(str_bytes))
|
||||||
|
return str_len_bytes + str_bytes
|
||||||
|
except:
|
||||||
|
logging.getLogger('sao').error(f"Failed to encode {s} as bytes!")
|
||||||
|
return b""
|
@ -5,6 +5,7 @@ import logging, coloredlogs
|
|||||||
from logging.handlers import TimedRotatingFileHandler
|
from logging.handlers import TimedRotatingFileHandler
|
||||||
from os import path
|
from os import path
|
||||||
from Crypto.Cipher import Blowfish
|
from Crypto.Cipher import Blowfish
|
||||||
|
from hashlib import md5
|
||||||
|
|
||||||
from core import CoreConfig, Utils
|
from core import CoreConfig, Utils
|
||||||
from core.title import BaseServlet
|
from core.title import BaseServlet
|
||||||
@ -48,6 +49,10 @@ class SaoServlet(BaseServlet):
|
|||||||
self.logger.inited = True
|
self.logger.inited = True
|
||||||
|
|
||||||
self.base = SaoBase(core_cfg, self.game_cfg)
|
self.base = SaoBase(core_cfg, self.game_cfg)
|
||||||
|
self.static_hash = None
|
||||||
|
|
||||||
|
if self.game_cfg.hash.verify_hash:
|
||||||
|
self.static_hash = md5(self.game_cfg.hash.hash_base) # Greate hashing guys, really validates the data
|
||||||
|
|
||||||
def get_endpoint_matchers(self) -> Tuple[List[Tuple[str, str, Dict]], List[Tuple[str, str, Dict]]]:
|
def get_endpoint_matchers(self) -> Tuple[List[Tuple[str, str, Dict]], List[Tuple[str, str, Dict]]]:
|
||||||
return (
|
return (
|
||||||
@ -93,14 +98,20 @@ class SaoServlet(BaseServlet):
|
|||||||
|
|
||||||
cmd_str = f"{req_header.cmd:04x}"
|
cmd_str = f"{req_header.cmd:04x}"
|
||||||
|
|
||||||
|
if self.game_cfg.hash.verify_hash and self.static_hash != req_header.hash:
|
||||||
|
self.logger.error(f"Hash mismatch! Expecting {self.static_hash} but recieved {req_header.hash}")
|
||||||
|
return b""
|
||||||
|
|
||||||
if self.game_cfg.crypt.enable:
|
if self.game_cfg.crypt.enable:
|
||||||
iv = req_raw[30:38]
|
|
||||||
|
iv = req_raw[40:48]
|
||||||
cipher = Blowfish.new(self.game_cfg.crypt.key, Blowfish.MODE_CBC, iv)
|
cipher = Blowfish.new(self.game_cfg.crypt.key, Blowfish.MODE_CBC, iv)
|
||||||
crypt_data = req_raw[38:]
|
crypt_data = req_raw[48:]
|
||||||
req_data = cipher.decrypt(crypt_data)
|
req_data = cipher.decrypt(crypt_data)
|
||||||
|
self.logger.debug(f"Decrypted {req_data.hex()} with IV {iv.hex()}")
|
||||||
|
|
||||||
else:
|
else:
|
||||||
req_data = req_raw[30:]
|
req_data = req_raw[40:]
|
||||||
|
|
||||||
handler = getattr(self.base, f"handle_{cmd_str}", None)
|
handler = getattr(self.base, f"handle_{cmd_str}", None)
|
||||||
if handler is None:
|
if handler is None:
|
||||||
|
Loading…
Reference in New Issue
Block a user