From 84a87f5a7faa296673a8cb960130bc1d6ec32ab5 Mon Sep 17 00:00:00 2001 From: Bottersnike Date: Thu, 17 Nov 2022 01:59:17 +0000 Subject: [PATCH] Server stuff --- .gitignore | 3 --- .gitmodules | 3 --- eaapi/__init__.py | 5 ++++ eaapi/cardconv.py | 12 +++++----- eaapi/const.py | 12 ++++++++-- eaapi/crypt.py | 3 ++- eaapi/decoder.py | 53 ++++++++++++++++++++++++++++-------------- eaapi/encoder.py | 33 ++++++++++++++++---------- eaapi/exception.py | 12 ++++++++++ eaapi/keys.py | 4 ++++ eaapi/keys.template.py | 5 ---- eaapi/lz77.py | 12 +++++----- eaapi/misc.py | 17 ++++++++------ eaapi/node.py | 36 +++++++++++++++++++--------- eaapi/packer.py | 8 +++---- eaapi/wrapper.py | 4 ++-- 16 files changed, 143 insertions(+), 79 deletions(-) delete mode 100644 .gitmodules create mode 100644 eaapi/keys.py delete mode 100644 eaapi/keys.template.py diff --git a/.gitignore b/.gitignore index dd060ac..0d9ee4b 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,3 @@ __pycache__/ *.db misc/ *.egg-info - -# Sorry -eaapi/keys.py diff --git a/.gitmodules b/.gitmodules deleted file mode 100644 index 330db1f..0000000 --- a/.gitmodules +++ /dev/null @@ -1,3 +0,0 @@ -[submodule "server"] - path = eaapi/server - url = https://gitea.tendokyu.moe/eamuse/server.git diff --git a/eaapi/__init__.py b/eaapi/__init__.py index 6c732e8..4ee6ff3 100644 --- a/eaapi/__init__.py +++ b/eaapi/__init__.py @@ -5,9 +5,14 @@ from .decoder import Decoder from .wrapper import wrap, unwrap from .misc import parse_model +from .exception import EAAPIException +from . import crypt + __all__ = ( "Type", "ServicesMode", "Compression", "XMLNode", "Encoder", "Decoder", "wrap", "unwrap", "parse_model", + "EAAPIException", + "crypt", ) diff --git a/eaapi/cardconv.py b/eaapi/cardconv.py index f0031aa..78821ee 100644 --- a/eaapi/cardconv.py +++ b/eaapi/cardconv.py @@ -8,17 +8,17 @@ from .keys import CARDCONV_KEY from .const import CARD_ALPHABET -def enc_des(uid): +def enc_des(uid: bytes) -> bytes: cipher = DES3.new(CARDCONV_KEY, DES3.MODE_CBC, iv=b'\0' * 8) return cipher.encrypt(uid) -def dec_des(uid): +def dec_des(uid: bytes) -> bytes: cipher = DES3.new(CARDCONV_KEY, DES3.MODE_CBC, iv=b'\0' * 8) return cipher.decrypt(uid) -def checksum(data): +def checksum(data: bytes) -> int: chk = sum(data[i] * (i % 3 + 1) for i in range(15)) while chk > 31: @@ -27,7 +27,7 @@ def checksum(data): return chk -def uid_to_konami(uid): +def uid_to_konami(uid: str) -> str: assert_true(len(uid) == 16, "UID must be 16 bytes", InvalidCard) if uid.upper().startswith("E004"): @@ -52,7 +52,7 @@ def uid_to_konami(uid): return "".join(CARD_ALPHABET[i] for i in out) -def konami_to_uid(konami_id): +def konami_to_uid(konami_id: str) -> str: if konami_id[14] == "1": card_type = 1 elif konami_id[14] == "2": @@ -62,7 +62,7 @@ def konami_to_uid(konami_id): assert_true(len(konami_id) == 16, "ID must be 16 characters", InvalidCard) assert_true(all(i in CARD_ALPHABET for i in konami_id), "ID contains invalid characters", InvalidCard) - card = [CARD_ALPHABET.index(i) for i in konami_id] + card = bytearray([CARD_ALPHABET.index(i) for i in konami_id]) assert_true(card[11] % 2 == card[12] % 2, "Parity check failed", InvalidCard) assert_true(card[13] == card[12] ^ 1, "Card invalid", InvalidCard) assert_true(card[15] == checksum(card), "Checksum failed", InvalidCard) diff --git a/eaapi/const.py b/eaapi/const.py index 7f86292..00c257a 100644 --- a/eaapi/const.py +++ b/eaapi/const.py @@ -59,7 +59,7 @@ class _Type: fmt: str names: List[str] c_name: str - convert: Callable + convert: Callable | None size: int = 1 no_check: bool = False @@ -75,7 +75,14 @@ class _Type: return (*map(self.convert, value),) -def parse_ip(ip): +def parse_ip(ip: int | str) -> tuple[int, int, int, int]: + if isinstance(ip, int): + return ( + (ip >> 24) & 0xff, + (ip >> 16) & 0xff, + (ip >> 8) & 0xff, + (ip >> 0) & 0xff, + ) return (*map(int, ip.split(".")),) @@ -92,6 +99,7 @@ class Type(enum.Enum): Blob = _Type(0x0a, "S", ["bin", "binary"], "char[]", bytes) Str = _Type(0x0b, "s", ["str", "string"], "char[]", unescape) IPv4 = _Type(0x0c, "4B", ["ip4"], "uint8[4]", parse_ip, 1, True) + IPv4_Int = _Type(0x0c, "I", ["ip4"], "uint8[4]", parse_ip, 1, True) Time = _Type(0x0d, "I", ["time"], "uint32", int) Float = _Type(0x0e, "f", ["float", "f"], "float", float) Double = _Type(0x0f, "d", ["double", "d"], "double", float) diff --git a/eaapi/crypt.py b/eaapi/crypt.py index e92fac1..6e9d7fc 100644 --- a/eaapi/crypt.py +++ b/eaapi/crypt.py @@ -25,7 +25,8 @@ prng = new_prng() def validate_key(info): match = re.match(r"^(\d)-([0-9a-f]{8})-([0-9a-f]{4})$", info) - assert_true(match, "Invalid eamuse info key") + assert_true(match is not None, "Invalid eamuse info key") + assert match is not None version = match.group(1) assert_true(version == "1", f"Unsupported encryption version ({version})") diff --git a/eaapi/decoder.py b/eaapi/decoder.py index 6ce0a52..18120e2 100644 --- a/eaapi/decoder.py +++ b/eaapi/decoder.py @@ -14,7 +14,7 @@ except ModuleNotFoundError: from .packer import Packer from .const import ( NAME_MAX_COMPRESSED, NAME_MAX_DECOMPRESSED, ATTR, PACK_ALPHABET, END_NODE, END_DOC, ARRAY_BIT, - ENCODING, CONTENT, CONTENT_COMP, CONTENT_FULL, XML_ENCODING, Type + ENCODING, CONTENT, CONTENT_COMP, CONTENT_FULL, XML_ENCODING, DEFAULT_ENCODING, Type ) from .misc import unpack, py_encoding, assert_true from .node import XMLNode @@ -45,7 +45,7 @@ class Decoder: if self.packer: self.packer.notify_skipped(length) raw = self.stream.read(length) - return raw.decode(py_encoding(self.encoding)).rstrip("\0") + return raw.decode(py_encoding(self.encoding or DEFAULT_ENCODING)).rstrip("\0") length = struct.calcsize("=" + s_format) if self.packer and align: @@ -55,22 +55,27 @@ class Decoder: value = struct.unpack(">" + s_format, data) return value[0] if single else value - def _read_node_value(self, node): + def _read_node_value(self, node: XMLNode) -> None: fmt = node.type.value.fmt count = 1 if node.is_array: length = struct.calcsize("=" + fmt) - count = self.read("I") // length + nbytes = self.read("I") + assert isinstance(nbytes, int) + count = nbytes // length values = [] for _ in range(count): values.append(self.read(fmt, single=len(fmt) == 1, align=False)) + + assert self.packer is not None self.packer.notify_skipped(count * length) - return values + node.value = values + else: + node.value = self.read(fmt, single=len(fmt) == 1) - node.value = self.read(fmt, single=len(fmt) == 1) - - def _read_metadata_name(self): + def _read_metadata_name(self) -> str: length = self.read("B") + assert isinstance(length, int) if not self.compressed: if length < 0x80: @@ -78,14 +83,16 @@ class Decoder: # i.e. length = (length & ~0x40) + 1 length -= 0x3f else: - length = (length << 8) | self.read("B") + extra = self.read("B") + assert isinstance(extra, int) + length = (length << 8) | extra # i.e. length = (length & ~0x8000) + 0x41 length -= 0x7fbf assert_true(length <= NAME_MAX_DECOMPRESSED, "Name length too long", DecodeError) name = self.stream.read(length) assert_true(len(name) == length, "Not enough bytes to read name", DecodeError) - return name.decode(self.encoding) + return name.decode(self.encoding or "") out = "" if length == 0: @@ -99,7 +106,7 @@ class Decoder: def _read_metadata(self, type_): name = self._read_metadata_name() - node = XMLNode(name, type_, None, encoding=self.encoding) + node = XMLNode(name, type_, None, encoding=self.encoding or DEFAULT_ENCODING) while (child := self.read("B")) != END_NODE: if child == ATTR: @@ -109,8 +116,8 @@ class Decoder: node.children.append(attr) else: node.children.append(self._read_metadata(child)) - is_array = not not (type_ & ARRAY_BIT) - if is_array: + + if type_ & ARRAY_BIT: node.value = [] return node @@ -140,6 +147,8 @@ class Decoder: def _read_xml_string(self): assert_true(etree is not None, "lxml missing", DecodeError) + assert etree is not None + parser = etree.XMLParser(remove_comments=True) tree = etree.XML(self.stream.read(), parser) self.encoding = XML_ENCODING[tree.getroottree().docinfo.encoding.upper()] @@ -164,7 +173,11 @@ class Decoder: d_type = type_.value if d_type.size == 1 and not is_array: - value = d_type._parse(node.text or "") + try: + value = d_type._parse(node.text or "") + except ValueError: + print(f"Failed to parse {node.tag} ({d_type.names[0]}): {repr(node.text)}") + raise else: data = node.text.split(" ") @@ -174,7 +187,7 @@ class Decoder: if not is_array: value = value[0] - xml_node = XMLNode(node.tag, type_, value, encoding=self.encoding) + xml_node = XMLNode(node.tag, type_, value, encoding=self.encoding or DEFAULT_ENCODING) for i in node.getchildren(): xml_node.children.append(walk(i)) @@ -198,21 +211,27 @@ class Decoder: self._read_magic() header_len = self.read("I") + assert isinstance(header_len, int) start = self.stream.tell() schema = self._read_metadata(self.read("B")) assert_true(self.read("B") == END_DOC, "Unterminated schema", DecodeError) padding = header_len - (self.stream.tell() - start) assert_true(padding >= 0, "Invalid schema definition", DecodeError) - assert_true(all(i == 0 for i in self.stream.read(padding)), "Invalid schema padding", DecodeError) + assert_true( + all(i == 0 for i in self.stream.read(padding)), "Invalid schema padding", DecodeError + ) body_len = self.read("I") + assert isinstance(body_len, int) start = self.stream.tell() self.packer = Packer(start) data = self._read_databody(schema) self.stream.seek(self.packer.request_allocation(0)) padding = body_len - (self.stream.tell() - start) assert_true(padding >= 0, "Data shape not match schema", DecodeError) - assert_true(all(i == 0 for i in self.stream.read(padding)), "Invalid data padding", DecodeError) + assert_true( + all(i == 0 for i in self.stream.read(padding)), "Invalid data padding", DecodeError + ) assert_true(self.stream.read(1) == b"", "Trailing data unconsumed", DecodeError) diff --git a/eaapi/encoder.py b/eaapi/encoder.py index 6a79af6..38ad581 100644 --- a/eaapi/encoder.py +++ b/eaapi/encoder.py @@ -9,6 +9,7 @@ from .const import ( CONTENT_ASCII_SCHEMA ) from .exception import EncodeError +from .node import XMLNode class Encoder: @@ -36,11 +37,13 @@ class Encoder: def write(self, s_format, value, single=True): if s_format == "S": + assert self.packer is not None self.write("L", len(value)) self.stream.write(value) self.packer.notify_skipped(len(value)) return if s_format == "s": + assert self.packer is not None value = value.encode(py_encoding(ENCODING[self.encoding])) + b"\0" self.write("L", len(value)) self.stream.write(value) @@ -49,24 +52,30 @@ class Encoder: length = struct.calcsize("=" + s_format) - if not isinstance(value, list): - value = [value] - count = len(value) - if count != 1: - self.write("L", count * length) - self.packer.notify_skipped(count * length) + if isinstance(value, list): + assert self.packer is not None + self.write("L", len(value) * length) + self.packer.notify_skipped(len(value) * length) - for x in value: - if self.packer and count == 1: + for x in value: + try: + if single: + self.stream.write(struct.pack(f">{s_format}", x)) + else: + self.stream.write(struct.pack(f">{s_format}", *x)) + except struct.error: + raise ValueError(f"Failed to pack {s_format}: {repr(x)}") + else: + if self.packer: self.stream.seek(self.packer.request_allocation(length)) try: if single: - self.stream.write(struct.pack(f">{s_format}", x)) + self.stream.write(struct.pack(f">{s_format}", value)) else: - self.stream.write(struct.pack(f">{s_format}", *x)) + self.stream.write(struct.pack(f">{s_format}", *value)) except struct.error: - raise ValueError(f"Failed to pack {s_format}: {repr(x)}") + raise ValueError(f"Failed to pack {s_format}: {repr(value)}") def _write_node_value(self, type_, value): fmt = type_.value.fmt @@ -105,7 +114,7 @@ class Encoder: self._write_metadata(child) self.write("B", END_NODE) - def _write_databody(self, data): + def _write_databody(self, data: XMLNode): self._write_node_value(data.type, data.value) for attr in data.attributes: diff --git a/eaapi/exception.py b/eaapi/exception.py index d544a17..3089704 100644 --- a/eaapi/exception.py +++ b/eaapi/exception.py @@ -20,3 +20,15 @@ class EncodeError(CheckFailed): class InvalidModel(EAAPIException): pass + + +class XMLStrutureError(EAAPIException): + pass + + +class NodeNotFound(XMLStrutureError, IndexError): + pass + + +class AttributeNotFound(XMLStrutureError, KeyError): + pass diff --git a/eaapi/keys.py b/eaapi/keys.py new file mode 100644 index 0000000..9325df2 --- /dev/null +++ b/eaapi/keys.py @@ -0,0 +1,4 @@ +CARDCONV_KEY = b"?I'llB2c.YouXXXeMeHaYpy!" +CARDCONV_KEY = bytes(i * 2 for i in CARDCONV_KEY) # Preprocess the key + +EA_KEY = b"\x69\xD7\x46\x27\xD9\x85\xEE\x21\x87\x16\x15\x70\xD0\x8D\x93\xB1\x24\x55\x03\x5B\x6D\xF0\xD8\x20\x5D\xF5" diff --git a/eaapi/keys.template.py b/eaapi/keys.template.py deleted file mode 100644 index 3932d9b..0000000 --- a/eaapi/keys.template.py +++ /dev/null @@ -1,5 +0,0 @@ -CARDCONV_KEY = b"" -EA_KEY = b"" - -# Perhaps my [flag collection](https://bsnk.me/eamuse/flags.html) could be of interest -raise NotImplementedError diff --git a/eaapi/lz77.py b/eaapi/lz77.py index d6a3561..c5d4353 100644 --- a/eaapi/lz77.py +++ b/eaapi/lz77.py @@ -10,7 +10,7 @@ MAX_LEN = 0xF + THRESHOLD MAX_BUFFER = 0x10 + 1 -def match_current(window, pos, max_len, data, dpos): +def match_current(window: bytes, pos: int, max_len: int, data: bytes, dpos: int) -> int: length = 0 while ( dpos + length < len(data) @@ -22,7 +22,7 @@ def match_current(window, pos, max_len, data, dpos): return length -def match_window(window, pos, data, d_pos): +def match_window(window: bytes, pos: int, data: bytes, d_pos: int) -> None | tuple[int, int]: max_pos = 0 max_len = 0 for i in range(THRESHOLD, LOOK_RANGE): @@ -37,9 +37,9 @@ def match_window(window, pos, data, d_pos): return None -def lz77_compress(data): +def lz77_compress(data: bytes) -> bytes: output = bytearray() - window = [0] * WINDOW_SIZE + window = bytearray(WINDOW_SIZE) current_pos = 0 current_window = 0 current_buffer = 0 @@ -95,10 +95,10 @@ def lz77_compress(data): return bytes(output) -def lz77_decompress(data): +def lz77_decompress(data: bytes) -> bytes: output = bytearray() cur_byte = 0 - window = [0] * WINDOW_SIZE + window = bytearray(WINDOW_SIZE) window_cursor = 0 while cur_byte < len(data): diff --git a/eaapi/misc.py b/eaapi/misc.py index 66382b0..4d8331d 100644 --- a/eaapi/misc.py +++ b/eaapi/misc.py @@ -1,24 +1,27 @@ import inspect import re +from typing import Type + from .exception import CheckFailed, InvalidModel -def assert_true(check, reason, exc=CheckFailed): +def assert_true(check: bool, reason: str, exc: Type[Exception] = CheckFailed): if not check: line = inspect.stack()[1].code_context - print() - print("\n".join(line)) + if line: + print() + print("\n".join(line)) raise exc(reason) -def py_encoding(name): +def py_encoding(name: str) -> str: if name.startswith("shift-jis"): return "shift-jis" return name -def parse_model(model): +def parse_model(model: str) -> tuple[str, str, str, str, str]: # e.g. KFC:J:A:A:2019020600 match = re.match(r"^([A-Z0-9]{3}):([A-Z]):([A-Z]):([A-Z])(?::(\d{10}))?$", model) if match is None: @@ -27,7 +30,7 @@ def parse_model(model): return gamecode, dest, spec, rev, datecode -def pack(data, width): +def pack(data, width: int) -> bytes: assert_true(1 <= width <= 8, "Invalid pack size") assert_true(all(i < (1 << width) for i in data), "Data too large for packing") bit_buf = in_buf = 0 @@ -48,7 +51,7 @@ def pack(data, width): return bytes(output) -def unpack(data, width): +def unpack(data, width: int) -> bytes: assert_true(1 <= width <= 8, "Invalid pack size") bit_buf = in_buf = 0 output = bytearray() diff --git a/eaapi/node.py b/eaapi/node.py index 7c7d9e3..9983b85 100644 --- a/eaapi/node.py +++ b/eaapi/node.py @@ -1,17 +1,20 @@ import binascii import re +from typing import Generator, Any + from html import escape from .misc import assert_true from .const import DEFAULT_ENCODING, NAME_MAX_COMPRESSED, XML_ENCODING_BACK, Type +from .exception import XMLStrutureError, NodeNotFound, AttributeNotFound class XMLNode: def __init__(self, name, type_, value, attributes=None, encoding=DEFAULT_ENCODING): self.name = name self.type = type_ if isinstance(type_, Type) else Type.from_val(type_) - self.value = value + self.value: Any = value # TODO: A stricter way to do this. Subclassing? self.children = [] self.attributes = {} if attributes is not None: @@ -41,12 +44,12 @@ class XMLNode: for i in self.children: if i.name == child: return i._xpath(attr, path) - raise IndexError + raise NodeNotFound if not attr: return self if attr in self.attributes: return self.attributes[attr] - raise IndexError + raise AttributeNotFound def xpath(self, path): match = re.match(r"^(?:@([\w:]+)/)?((?:[\w:]+(?:/|$))+)", path) @@ -64,22 +67,26 @@ class XMLNode: def __len__(self): return len(self.children) - def __iter__(self): + def __iter__(self) -> Generator["XMLNode", None, None]: for i in self.children: yield i def get(self, name, default=None): try: return self[name] - except IndexError: - return default - except KeyError: + except XMLStrutureError: return default def __getitem__(self, name): if isinstance(name, int): - return self.children[name] - return self.attributes[name] + try: + return self.children[name] + except IndexError: + raise NodeNotFound + try: + return self.attributes[name] + except KeyError: + raise AttributeNotFound def __setitem__(self, name, value): self.attributes[name] = value @@ -94,12 +101,19 @@ class XMLNode: def __str__(self): return self.to_str(pretty=True) - def _value_str(self, value): + def _value_str(self, value: Any) -> str: if isinstance(value, list): return " ".join(map(self._value_str, value)) if self.type == Type.Blob: return binascii.hexlify(value).decode() - if self.type == Type.IPv4: + if self.type == Type.IPv4 or self.type == Type.IPv4_Int: + if isinstance(value, int): + value = ( + (value >> 24) & 0xff, + (value >> 16) & 0xff, + (value >> 8) & 0xff, + (value >> 0) & 0xff, + ) return f"{value[0]}.{value[1]}.{value[2]}.{value[3]}" if self.type in (Type.Float, Type.TwoFloat, Type.ThreeFloat): return f"{value:.6f}" diff --git a/eaapi/packer.py b/eaapi/packer.py index a8b3733..9bda77c 100644 --- a/eaapi/packer.py +++ b/eaapi/packer.py @@ -2,17 +2,17 @@ import math class Packer: - def __init__(self, offset=0): + def __init__(self, offset: int = 0): self._word_cursor = offset self._short_cursor = offset self._byte_cursor = offset self._boundary = offset % 4 - def _next_block(self): + def _next_block(self) -> int: self._word_cursor += 4 return self._word_cursor - 4 - def request_allocation(self, size): + def request_allocation(self, size: int) -> int: if size == 0: return self._word_cursor elif size == 1: @@ -33,7 +33,7 @@ class Packer: self._word_cursor += 4 return old_cursor - def notify_skipped(self, no_bytes): + def notify_skipped(self, no_bytes: int) -> None: for _ in range(math.ceil(no_bytes / 4)): self.request_allocation(4) diff --git a/eaapi/wrapper.py b/eaapi/wrapper.py index ac920df..0f88bb3 100644 --- a/eaapi/wrapper.py +++ b/eaapi/wrapper.py @@ -2,7 +2,7 @@ from .crypt import ea_symmetric_crypt from .lz77 import lz77_compress, lz77_decompress -def wrap(packet, info=None, compressed=True): +def wrap(packet: bytes, info: str | None = None, compressed: bool = True) -> bytes: if compressed: packet = lz77_compress(packet) if info is None: @@ -10,7 +10,7 @@ def wrap(packet, info=None, compressed=True): return ea_symmetric_crypt(packet, info) -def unwrap(packet, info=None, compressed=True): +def unwrap(packet: bytes, info: str | None = None, compressed: bool = True) -> bytes: if info is None: decrypted = packet else: