From e8ccee4a104962d2b8294ffb322b0ae33c6691a9 Mon Sep 17 00:00:00 2001 From: Bottersnike Date: Thu, 3 Feb 2022 20:55:07 +0000 Subject: [PATCH] Initial commit --- .gitignore | 9 ++ eaapi/__init__.py | 12 +++ eaapi/cardconv.py | 85 ++++++++++++++++ eaapi/const.py | 151 ++++++++++++++++++++++++++++ eaapi/crypt.py | 49 +++++++++ eaapi/decoder.py | 216 ++++++++++++++++++++++++++++++++++++++++ eaapi/encoder.py | 151 ++++++++++++++++++++++++++++ eaapi/exception.py | 18 ++++ eaapi/keys.template.py | 5 + eaapi/lz77.py | 135 +++++++++++++++++++++++++ eaapi/misc.py | 67 +++++++++++++ eaapi/node.py | 147 +++++++++++++++++++++++++++ eaapi/packer.py | 41 ++++++++ eaapi/wrapper.py | 31 ++++++ requirements.txt | 2 + setup.py | 6 ++ tests/__init__.py | 0 tests/test_cardconv.py | 15 +++ tests/test_decoder.py | 23 +++++ tests/test_encoder.py | 35 +++++++ tests/test_misc.py | 41 ++++++++ tests/test_roundtrip.py | 137 +++++++++++++++++++++++++ 22 files changed, 1376 insertions(+) create mode 100644 .gitignore create mode 100644 eaapi/__init__.py create mode 100644 eaapi/cardconv.py create mode 100644 eaapi/const.py create mode 100644 eaapi/crypt.py create mode 100644 eaapi/decoder.py create mode 100644 eaapi/encoder.py create mode 100644 eaapi/exception.py create mode 100644 eaapi/keys.template.py create mode 100644 eaapi/lz77.py create mode 100644 eaapi/misc.py create mode 100644 eaapi/node.py create mode 100644 eaapi/packer.py create mode 100644 eaapi/wrapper.py create mode 100644 requirements.txt create mode 100644 setup.py create mode 100644 tests/__init__.py create mode 100644 tests/test_cardconv.py create mode 100644 tests/test_decoder.py create mode 100644 tests/test_encoder.py create mode 100644 tests/test_misc.py create mode 100644 tests/test_roundtrip.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..dd060ac --- /dev/null +++ b/.gitignore @@ -0,0 +1,9 @@ +*.pyc +__pycache__/ +*.log +*.db +misc/ +*.egg-info + +# Sorry +eaapi/keys.py diff --git a/eaapi/__init__.py b/eaapi/__init__.py new file mode 100644 index 0000000..9f84a7f --- /dev/null +++ b/eaapi/__init__.py @@ -0,0 +1,12 @@ +from .const import Type +from .node import XMLNode +from .encoder import Encoder +from .decoder import Decoder +from .wrapper import wrap, unwrap +from .misc import parse_model + +__all__ = ( + "Type", "XMLNode", "Encoder", "Decoder", + "wrap", "unwrap", + "parse_model", +) diff --git a/eaapi/cardconv.py b/eaapi/cardconv.py new file mode 100644 index 0000000..f0031aa --- /dev/null +++ b/eaapi/cardconv.py @@ -0,0 +1,85 @@ +import binascii + +from Crypto.Cipher import DES3 + +from .misc import assert_true, pack, unpack +from .exception import InvalidCard +from .keys import CARDCONV_KEY +from .const import CARD_ALPHABET + + +def enc_des(uid): + cipher = DES3.new(CARDCONV_KEY, DES3.MODE_CBC, iv=b'\0' * 8) + return cipher.encrypt(uid) + + +def dec_des(uid): + cipher = DES3.new(CARDCONV_KEY, DES3.MODE_CBC, iv=b'\0' * 8) + return cipher.decrypt(uid) + + +def checksum(data): + chk = sum(data[i] * (i % 3 + 1) for i in range(15)) + + while chk > 31: + chk = (chk >> 5) + (chk & 31) + + return chk + + +def uid_to_konami(uid): + assert_true(len(uid) == 16, "UID must be 16 bytes", InvalidCard) + + if uid.upper().startswith("E004"): + card_type = 1 + elif uid.upper().startswith("0"): + card_type = 2 + else: + raise InvalidCard("Invalid UID prefix") + + kid = binascii.unhexlify(uid) + assert_true(len(kid) == 8, "ID must be 8 bytes", InvalidCard) + + out = bytearray(unpack(enc_des(kid[::-1]), 5)[:13]) + b'\0\0\0' + + out[0] ^= card_type + out[13] = 1 + for i in range(1, 14): + out[i] ^= out[i - 1] + out[14] = card_type + out[15] = checksum(out) + + return "".join(CARD_ALPHABET[i] for i in out) + + +def konami_to_uid(konami_id): + if konami_id[14] == "1": + card_type = 1 + elif konami_id[14] == "2": + card_type = 2 + else: + raise InvalidCard("Invalid ID") + + assert_true(len(konami_id) == 16, "ID must be 16 characters", InvalidCard) + assert_true(all(i in CARD_ALPHABET for i in konami_id), "ID contains invalid characters", InvalidCard) + card = [CARD_ALPHABET.index(i) for i in konami_id] + assert_true(card[11] % 2 == card[12] % 2, "Parity check failed", InvalidCard) + assert_true(card[13] == card[12] ^ 1, "Card invalid", InvalidCard) + assert_true(card[15] == checksum(card), "Checksum failed", InvalidCard) + + for i in range(13, 0, -1): + card[i] ^= card[i - 1] + + card[0] ^= card_type + + card_id = dec_des(pack(card[:13], 5)[:8])[::-1] + card_id = binascii.hexlify(card_id).decode().upper() + + if card_type == 1: + assert_true(card_id[:4] == "E004", "Invalid card type", InvalidCard) + elif card_type == 2: + assert_true(card_id[0] == "0", "Invalid card type", InvalidCard) + return card_id + + +__all__ = ("konami_to_uid", "uid_to_konami") diff --git a/eaapi/const.py b/eaapi/const.py new file mode 100644 index 0000000..7ad2f0a --- /dev/null +++ b/eaapi/const.py @@ -0,0 +1,151 @@ +import enum + +from dataclasses import dataclass +from html import unescape +from typing import List, Callable + +from .misc import assert_true + + +CARD_ALPHABET = "0123456789ABCDEFGHJKLMNPRSTUWXYZ" + +NAME_MAX_COMPRESSED = 0x24 +NAME_MAX_DECOMPRESSED = 0x1000 + +ENCODING = { + 0x20: "ascii", + 0x40: "iso-8859-1", + 0x60: "euc-jp", + 0x80: "shift-jis", + 0xA0: "utf-8", +} +DEFAULT_ENCODING = ENCODING[0x80] # Shift-JIS +ENCODING[0x00] = DEFAULT_ENCODING +XML_ENCODING = { + "ASCII": "ascii", + "ISO-8859-1": "iso-8859-1", + "EUC-JP": "euc-jp", + "SHIFT_JIS": "shift-jis", + "SHIFT-JIS": "shift-jis", + "UTF-8": "utf-8", +} +ENCODING_BACK = {v: k for k, v in ENCODING.items()} +XML_ENCODING_BACK = {v: k for k, v in XML_ENCODING.items()} +ATTR = 0x2E +END_NODE = 0xFE +END_DOC = 0xFF +PACK_ALPHABET = "0123456789:ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz" + +CONTENT_COMP_FULL = 0x42 +CONTENT_COMP_SCHEMA = 0x43 +CONTENT_FINGERPRINT = 0x44 # TODO: Identify how exactly this differs from the others +CONTENT_ASCII_FULL = 0x45 +CONTENT_ASCII_SCHEMA = 0x46 + +CONTENT_COMP = (CONTENT_COMP_FULL, CONTENT_COMP_SCHEMA) +CONTENT_FULL = (CONTENT_COMP_FULL, CONTENT_ASCII_FULL) +CONTENT = ( + CONTENT_COMP_FULL, CONTENT_COMP_SCHEMA, CONTENT_FINGERPRINT, + CONTENT_ASCII_FULL, CONTENT_ASCII_SCHEMA +) + +ARRAY_BIT = 0x40 +ARRAY_MASK = ARRAY_BIT - 1 + + +@dataclass +class _Type: + id: int + fmt: str + names: List[str] + c_name: str + convert: Callable + size: int = 1 + no_check: bool = False + + def _parse(self, value): + if self.convert is None: + return () + if self.size == 1: + if isinstance(value, (list, tuple)) and len(value) == 1: + value = value[0] + return self.convert(value) + if not self.no_check: + assert_true(len(value) == self.size, "Invalid node data") + return (*map(self.convert, value),) + + +def parse_ip(ip): + return (*map(int, ip.split(".")),) + + +class Type(enum.Enum): + Void = _Type(0x01, "", ["void"], "void", None) + S8 = _Type(0x02, "b", ["s8"], "int8", int) + U8 = _Type(0x03, "B", ["u8"], "uint8", int) + S16 = _Type(0x04, "h", ["s16"], "int16", int) + U16 = _Type(0x05, "H", ["u16"], "uint16", int) + S32 = _Type(0x06, "i", ["s32"], "int32", int) + U32 = _Type(0x07, "I", ["u32"], "uint32", int) + S64 = _Type(0x08, "q", ["s64"], "int64", int) + U64 = _Type(0x09, "Q", ["u64"], "uint64", int) + Blob = _Type(0x0a, "S", ["bin", "binary"], "char[]", bytes) + Str = _Type(0x0b, "s", ["str", "string"], "char[]", unescape) + IPv4 = _Type(0x0c, "4B", ["ip4"], "uint8[4]", parse_ip, 1, True) + Time = _Type(0x0d, "I", ["time"], "uint32", int) + Float = _Type(0x0e, "f", ["float", "f"], "float", float) + Double = _Type(0x0f, "d", ["double", "d"], "double", float) + + TwoS8 = _Type(0x10, "2b", ["2s8"], "int8[2]", int, 2) + TwoU8 = _Type(0x11, "2B", ["2u8"], "uint8[2]", int, 2) + TwoS16 = _Type(0x12, "2h", ["2s16"], "int16[2]", int, 2) + TwoU16 = _Type(0x13, "2H", ["2u16"], "uint16[2]", int, 2) + TwoS32 = _Type(0x14, "2i", ["2s32"], "int32[2]", int, 2) + TwoU32 = _Type(0x15, "2I", ["2u32"], "uint32[2]", int, 2) + TwoS64 = _Type(0x16, "2q", ["2s64", "vs64"], "int16[2]", int, 2) + TwoU64 = _Type(0x17, "2Q", ["2u64", "vu64"], "uint16[2]", int, 2) + TwoFloat = _Type(0x18, "2f", ["2f"], "float[2]", float, 2) + TwoDouble = _Type(0x19, "2d", ["2d", "vd"], "double[2]", float, 2) + + ThreeS8 = _Type(0x1a, "3b", ["3s8"], "int8[3]", int, 3) + ThreeU8 = _Type(0x1b, "3B", ["3u8"], "uint8[3]", int, 3) + ThreeS16 = _Type(0x1c, "3h", ["3s16"], "int16[3]", int, 3) + ThreeU16 = _Type(0x1d, "3H", ["3u16"], "uint16[3]", int, 3) + ThreeS32 = _Type(0x1e, "3i", ["3s32"], "int32[3]", int, 3) + ThreeU32 = _Type(0x1f, "3I", ["3u32"], "uint32[3]", int, 3) + ThreeS64 = _Type(0x20, "3q", ["3s64"], "int64[3]", int, 3) + ThreeU64 = _Type(0x21, "3Q", ["3u64"], "uint64[3]", int, 3) + ThreeFloat = _Type(0x22, "3f", ["3f"], "float[3]", float, 3) + ThreeDouble = _Type(0x23, "3d", ["3d"], "double[3]", float, 3) + + FourS8 = _Type(0x24, "4b", ["4s8"], "int8[4]", int, 4) + FourU8 = _Type(0x25, "4B", ["4u8"], "uint8[4]", int, 4) + FourS16 = _Type(0x26, "4h", ["4s16"], "int16[4]", int, 4) + FourU16 = _Type(0x27, "4H", ["4u16"], "uint8[4]", int, 4) + FourS32 = _Type(0x28, "4i", ["4s32", "vs32"], "int32[4]", int, 4) + FourU32 = _Type(0x29, "4I", ["4u32", "vs32"], "uint32[4]", int, 4) + FourS64 = _Type(0x2a, "4q", ["4s64"], "int64[4]", int, 4) + FourU64 = _Type(0x2b, "4Q", ["4u64"], "uint64[4]", int, 4) + FourFloat = _Type(0x2c, "4f", ["4f", "vf"], "float[4]", float, 4) + FourDouble = _Type(0x2d, "4d", ["4d"], "double[4]", float, 4) + + Attr = _Type(0x2e, "s", ["attr"], "char[]", None) + Array = _Type(0x2f, "", ["array"], "", None) + + VecS8 = _Type(0x30, "16b", ["vs8"], "int8[16]", int, 16) + VecU8 = _Type(0x31, "16B", ["vu8"], "uint8[16]", int, 16) + VecS16 = _Type(0x32, "8h", ["vs16"], "int8[8]", int, 8) + VecU16 = _Type(0x33, "8H", ["vu16"], "uint8[8]", int, 8) + + Bool = _Type(0x34, "b", ["bool", "b"], "bool", int) + TwoBool = _Type(0x35, "2b", ["2b"], "bool[2]", int, 2) + ThreeBool = _Type(0x36, "3b", ["3b"], "bool[3]", int, 3) + FourBool = _Type(0x37, "4b", ["4b"], "bool[4]", int, 4) + VecBool = _Type(0x38, "16b", ["vb"], "bool[16]", int, 16) + + @classmethod + def from_val(cls, value): + for i in cls: + if i.value.id == value & ARRAY_MASK: + return i + raise ValueError(f"Unknown node type {value}") diff --git a/eaapi/crypt.py b/eaapi/crypt.py new file mode 100644 index 0000000..92771b8 --- /dev/null +++ b/eaapi/crypt.py @@ -0,0 +1,49 @@ +import binascii +import hashlib +import time +import re + +from Crypto.Cipher import ARC4 + +from .misc import assert_true +from .keys import EA_KEY + + +def new_prng(): + state = 0x41c64e6d + + while True: + x = (state * 0x838c9cda) + 0x6072 + # state = (state * 0x41c64e6d + 0x3039) + # state = (state * 0x41c64e6d + 0x3039) + state = (state * 0xc2a29a69 + 0xd3dc167e) & 0xffffffff + yield (x & 0x7fff0000) | state >> 15 & 0xffff + + +prng = new_prng() + + +def validate_key(info): + match = re.match(r"^(\d)-([0-9a-f]{8})-([0-9a-f]{4})$", info) + assert_true(match, "Invalid eamuse info key") + version = match.group(1) + assert_true(version == "1", f"Unsupported encryption version ({version})") + + seconds = binascii.unhexlify(match.group(2)) # 4 bytes + rng = binascii.unhexlify(match.group(3)) # 2 bytes + return seconds, rng + + +def get_key(): + return f"1-{int(time.time()):08x}-{(next(prng) & 0xffff):04x}" + + +def ea_symmetric_crypt(data, info): + seconds, rng = validate_key(info) + + key = hashlib.md5(seconds + rng + EA_KEY).digest() + + return ARC4.new(key).encrypt(data) + + +__all__ = ("new_prng", "prng", "validate_key", "get_key", "ea_symmetric_crypt") diff --git a/eaapi/decoder.py b/eaapi/decoder.py new file mode 100644 index 0000000..6b0a88e --- /dev/null +++ b/eaapi/decoder.py @@ -0,0 +1,216 @@ +import math +import struct +import io + +from html import unescape + +try: + from lxml import etree +except ModuleNotFoundError: + print("W", "lxml not found, XML strings will not be supported") + etree = None + + +from .packer import Packer +from .const import ( + NAME_MAX_COMPRESSED, NAME_MAX_DECOMPRESSED, ATTR, PACK_ALPHABET, END_NODE, END_DOC, ARRAY_BIT, + ENCODING, CONTENT, CONTENT_COMP, CONTENT_FULL, XML_ENCODING, Type +) +from .misc import unpack, py_encoding, assert_true +from .node import XMLNode +from .exception import DecodeError + + +class Decoder: + def __init__(self, packet): + self.stream = io.BytesIO(packet) + self.is_xml_string = packet.startswith(b"<") + self.encoding = None + self.compressed = False + self.has_data = False + self.packer = None + + @classmethod + def decode(cls, packet): + return cls(packet).unpack() + + def read(self, s_format, single=True, align=True): + if s_format == "S": + length = self.read("L") + if self.packer: + self.packer.notify_skipped(length) + return self.stream.read(length) + if s_format == "s": + length = self.read("L") + if self.packer: + self.packer.notify_skipped(length) + raw = self.stream.read(length) + return raw.decode(py_encoding(self.encoding)).rstrip("\0") + + length = struct.calcsize("=" + s_format) + if self.packer and align: + self.stream.seek(self.packer.request_allocation(length)) + data = self.stream.read(length) + assert_true(len(data) == length, "EOF reached", DecodeError) + value = struct.unpack(">" + s_format, data) + return value[0] if single else value + + def _read_node_value(self, node): + fmt = node.type.value.fmt + count = 1 + if node.is_array: + length = struct.calcsize("=" + fmt) + count = self.read("I") // length + values = [] + for _ in range(count): + values.append(self.read(fmt, single=len(fmt) == 1, align=False)) + self.packer.notify_skipped(count * length) + return values + + node.value = self.read(fmt, single=len(fmt) == 1) + + def _read_metadata_name(self): + length = self.read("B") + + if not self.compressed: + if length < 0x80: + assert_true(length >= 0x40, "Invalid name length", DecodeError) + # i.e. length = (length & ~0x40) + 1 + length -= 0x3f + else: + length = (length << 8) | self.read("B") + # i.e. length = (length & ~0x8000) + 0x41 + length -= 0x7fbf + assert_true(length <= NAME_MAX_DECOMPRESSED, "Name length too long", DecodeError) + + name = self.stream.read(length) + assert_true(len(name) == length, "Not enough bytes to read name", DecodeError) + return name.decode(self.encoding) + + out = "" + if length == 0: + return out + + assert_true(length <= NAME_MAX_COMPRESSED, "Name length too long", DecodeError) + + no_bytes = math.ceil((length * 6) / 8) + unpacked = unpack(self.stream.read(no_bytes), 6)[:length] + return "".join(PACK_ALPHABET[i] for i in unpacked) + + def _read_metadata(self, type_): + name = self._read_metadata_name() + node = XMLNode(name, type_, None, encoding=self.encoding) + + while (child := self.read("B")) != END_NODE: + if child == ATTR: + attr = self._read_metadata_name() + assert_true(not attr.startswith("__"), "Invalid binary node name", DecodeError) + # Abuse the array here to maintain order + node.children.append(attr) + else: + node.children.append(self._read_metadata(child)) + is_array = not not (type_ & ARRAY_BIT) + if is_array: + node.value = [] + return node + + def _read_databody(self, node: XMLNode): + self._read_node_value(node) + + children = list(node.children) + node.children = [] + for i in children: + if isinstance(i, XMLNode): + node.children.append(self._read_databody(i)) + else: + node[i] = self.read("s") + + return node + + def _read_magic(self): + magic, contents, enc, enc_comp = struct.unpack(">BBBB", self.stream.read(4)) + + assert_true(magic == 0xA0, "Not a packet", DecodeError) + assert_true(~enc & 0xFF == enc_comp, "Malformed packet header", DecodeError) + assert_true(enc in ENCODING, "Unknown packet encoding", DecodeError) + assert_true(contents in CONTENT, "Invalid packet contents", DecodeError) + self.compressed = contents in CONTENT_COMP + self.has_data = contents in CONTENT_FULL or contents == 0x44 + self.encoding = ENCODING[enc] + + def _read_xml_string(self): + assert_true(etree is not None, "lxml missing", DecodeError) + parser = etree.XMLParser(remove_comments=True) + tree = etree.XML(self.stream.read(), parser) + self.encoding = XML_ENCODING[tree.getroottree().docinfo.encoding.upper()] + self.compressed = False + self.has_data = True + + def walk(node): + attrib = {**node.attrib} + type_str = attrib.pop("__type", "void") + for i in Type: + if type_str in i.value.names: + type_ = i + break + else: + raise ValueError("Invalid node type") + attrib.pop("__size", None) + count = attrib.pop("__count", None) + + is_array = count is not None + count = 1 if count is None else int(count) + + d_type = type_.value + + if d_type.size == 1 and not is_array: + value = d_type._parse(node.text or "") + else: + data = node.text.split(" ") + + value = [] + for i in range(0, len(data), d_type.size): + value.append(d_type._parse(data[i:i+d_type.size])) + if not is_array: + value = value[0] + + xml_node = XMLNode(node.tag, type_, value, encoding=self.encoding) + for i in node.getchildren(): + xml_node.children.append(walk(i)) + + for i in attrib: + xml_node[i] = unescape(attrib[i]) + + return xml_node + + return walk(tree) + + def unpack(self): + if self.is_xml_string: + return self._read_xml_string() + + self._read_magic() + + header_len = self.read("I") + start = self.stream.tell() + schema = self._read_metadata(self.read("B")) + assert_true(self.read("B") == END_DOC, "Unterminated schema", DecodeError) + padding = header_len - (self.stream.tell() - start) + assert_true(padding >= 0, "Invalid schema definition", DecodeError) + assert_true(all(i == 0 for i in self.stream.read(padding)), "Invalid schema padding", DecodeError) + + body_len = self.read("I") + start = self.stream.tell() + self.packer = Packer(start) + data = self._read_databody(schema) + self.stream.seek(self.packer.request_allocation(0)) + padding = body_len - (self.stream.tell() - start) + assert_true(padding >= 0, "Data shape not match schema", DecodeError) + assert_true(all(i == 0 for i in self.stream.read(padding)), "Invalid data padding", DecodeError) + + assert_true(self.stream.read(1) == b"", "Trailing data unconsumed", DecodeError) + + return data + + +__all__ = ("Decoder", ) diff --git a/eaapi/encoder.py b/eaapi/encoder.py new file mode 100644 index 0000000..4a930aa --- /dev/null +++ b/eaapi/encoder.py @@ -0,0 +1,151 @@ +import struct +import io + +from .packer import Packer +from .misc import pack, py_encoding, assert_true +from .const import ( + PACK_ALPHABET, DEFAULT_ENCODING, ENCODING, ENCODING_BACK, NAME_MAX_DECOMPRESSED, ARRAY_BIT, + ATTR, END_NODE, END_DOC, CONTENT_COMP_FULL, CONTENT_COMP_SCHEMA, CONTENT_ASCII_FULL, + CONTENT_ASCII_SCHEMA +) +from .exception import EncodeError + + +class Encoder: + def __init__(self, encoding=DEFAULT_ENCODING): + self.stream = io.BytesIO() + assert_true(encoding in ENCODING_BACK, f"Unknown encoding {encoding}", EncodeError) + self.encoding = ENCODING_BACK[encoding] + self.packer = None + self._compressed = False + + @classmethod + def encode(cls, tree, xml_string=False): + if xml_string: + return tree.to_str(pretty=False).encode(tree.encoding) + encoder = cls(tree.encoding) + encoder.pack(tree) + return encoder.stream.getvalue() + + def align(self, to=4, pad_char=b"\0"): + if to < 2: + return + if (dist := self.stream.tell() % to) == 0: + return + self.stream.write(pad_char * (to - dist)) + + def write(self, s_format, value, single=True): + if s_format == "S": + self.write("L", len(value)) + self.stream.write(value) + self.packer.notify_skipped(len(value)) + return + if s_format == "s": + value = value.encode(py_encoding(ENCODING[self.encoding])) + b"\0" + self.write("L", len(value)) + self.stream.write(value) + self.packer.notify_skipped(len(value)) + return + + length = struct.calcsize("=" + s_format) + + if not isinstance(value, list): + value = [value] + count = len(value) + if count != 1: + self.write("L", count * length) + self.packer.notify_skipped(count * length) + + for x in value: + if self.packer and count == 1: + self.stream.seek(self.packer.request_allocation(length)) + + try: + if single: + self.stream.write(struct.pack(f">{s_format}", x)) + else: + self.stream.write(struct.pack(f">{s_format}", *x)) + except struct.error: + raise ValueError(f"Failed to pack {s_format}: {repr(x)}") + + def _write_node_value(self, type_, value): + fmt = type_.value.fmt + if fmt == "s": + self.write("s", value) + else: + self.write(fmt, value, single=len(fmt) == 1) + + def _write_metadata_name(self, name): + if not self._compressed: + assert_true(len(name) <= NAME_MAX_DECOMPRESSED, "Name length too long", EncodeError) + if len(name) > 64: + self.write("H", len(name) + 0x7fbf) + else: + self.write("B", len(name) + 0x3f) + self.stream.write(name.encode(py_encoding(ENCODING[self.encoding]))) + return + + assert_true(all(i in PACK_ALPHABET for i in name), f"Invalid schema name {name} (invalid chars)", EncodeError) + assert_true(len(name) < 256, f"Invalid schema name {name} (too long)", EncodeError) + self.write("B", len(name)) + if len(name) == 0: + return + + name = bytearray(PACK_ALPHABET.index(i) for i in name) + self.stream.write(pack(name, 6)) + + def _write_metadata(self, node): + self.write("B", node.type.value.id | (ARRAY_BIT if node.is_array else 0x00)) + self._write_metadata_name(node.name) + + for attr in node.attributes: + self.write("B", ATTR) + self._write_metadata_name(attr) + for child in node: + self._write_metadata(child) + self.write("B", END_NODE) + + def _write_databody(self, data): + self._write_node_value(data.type, data.value) + + for attr in data.attributes: + self.align() + self.write("s", data[attr]) + for child in data: + self._write_databody(child) + + def _write_magic(self, has_data=True): + if has_data: + contents = CONTENT_COMP_FULL if self._compressed else CONTENT_ASCII_FULL + else: + contents = CONTENT_COMP_SCHEMA if self._compressed else CONTENT_ASCII_SCHEMA + + enc_comp = ~self.encoding & 0xFF + self.stream.write(struct.pack(">BBBB", 0xA0, contents, self.encoding, enc_comp)) + + def pack(self, node): + self._compressed = node.can_compress # Opportunically compress if we can + self._write_magic() + + schema_start = self.stream.tell() + self.write("I", 0) + self._write_metadata(node) + self.write("B", END_DOC) + self.align() + schema_end = self.stream.tell() + self.stream.seek(schema_start) + self.write("I", schema_end - schema_start - 4) + + self.stream.seek(schema_end) + self.write("I", 0) + self.packer = Packer(self.stream.tell()) + self._write_databody(node) + self.stream.seek(0, io.SEEK_END) + self.align() + node_end = self.stream.tell() + self.stream.seek(schema_end) + self.packer = None + self.write("I", node_end - schema_end - 4) + + +__all__ = ("Encoder", ) diff --git a/eaapi/exception.py b/eaapi/exception.py new file mode 100644 index 0000000..c8cb8c2 --- /dev/null +++ b/eaapi/exception.py @@ -0,0 +1,18 @@ +class EAAPIException(Exception): + pass + + +class CheckFailed(EAAPIException): + pass + + +class InvalidCard(CheckFailed): + pass + + +class DecodeError(CheckFailed): + pass + + +class EncodeError(CheckFailed): + pass diff --git a/eaapi/keys.template.py b/eaapi/keys.template.py new file mode 100644 index 0000000..3932d9b --- /dev/null +++ b/eaapi/keys.template.py @@ -0,0 +1,5 @@ +CARDCONV_KEY = b"" +EA_KEY = b"" + +# Perhaps my [flag collection](https://bsnk.me/eamuse/flags.html) could be of interest +raise NotImplementedError diff --git a/eaapi/lz77.py b/eaapi/lz77.py new file mode 100644 index 0000000..d6a3561 --- /dev/null +++ b/eaapi/lz77.py @@ -0,0 +1,135 @@ +from .misc import assert_true + + +WINDOW_SIZE = 0x1000 +WINDOW_MASK = WINDOW_SIZE - 1 +THRESHOLD = 3 +INPLACE_THRESHOLD = 0xA +LOOK_RANGE = 0x200 +MAX_LEN = 0xF + THRESHOLD +MAX_BUFFER = 0x10 + 1 + + +def match_current(window, pos, max_len, data, dpos): + length = 0 + while ( + dpos + length < len(data) + and length < max_len + and window[(pos + length) & WINDOW_MASK] == data[dpos + length] + and length < MAX_LEN + ): + length += 1 + return length + + +def match_window(window, pos, data, d_pos): + max_pos = 0 + max_len = 0 + for i in range(THRESHOLD, LOOK_RANGE): + length = match_current(window, (pos - i) & WINDOW_MASK, i, data, d_pos) + if length >= INPLACE_THRESHOLD: + return (i, length) + if length >= THRESHOLD: + max_pos = i + max_len = length + if max_len >= THRESHOLD: + return (max_pos, max_len) + return None + + +def lz77_compress(data): + output = bytearray() + window = [0] * WINDOW_SIZE + current_pos = 0 + current_window = 0 + current_buffer = 0 + flag_byte = 0 + bit = 0 + buffer = [0] * MAX_BUFFER + pad = 3 + while current_pos < len(data): + flag_byte = 0 + current_buffer = 0 + for bit_pos in range(8): + if current_pos >= len(data): + pad = 0 + flag_byte = flag_byte >> (8 - bit_pos) + buffer[current_buffer] = 0 + buffer[current_buffer + 1] = 0 + current_buffer += 2 + break + else: + found = match_window(window, current_window, data, current_pos) + if found is not None and found[1] >= THRESHOLD: + pos, length = found + + byte1 = pos >> 4 + byte2 = (((pos & 0x0F) << 4) | ((length - THRESHOLD) & 0x0F)) + buffer[current_buffer] = byte1 + buffer[current_buffer + 1] = byte2 + current_buffer += 2 + bit = 0 + for _ in range(length): + window[current_window & WINDOW_MASK] = data[current_pos] + current_pos += 1 + current_window += 1 + else: + buffer[current_buffer] = data[current_pos] + window[current_window] = data[current_pos] + current_pos += 1 + current_window += 1 + current_buffer += 1 + bit = 1 + + flag_byte = (flag_byte >> 1) | ((bit & 1) << 7) + current_window = current_window & WINDOW_MASK + + assert_true(current_buffer < MAX_BUFFER, f"current buffer {current_buffer} > max buffer {MAX_BUFFER}") + + output.append(flag_byte) + for i in range(current_buffer): + output.append(buffer[i]) + for _ in range(pad): + output.append(0) + + return bytes(output) + + +def lz77_decompress(data): + output = bytearray() + cur_byte = 0 + window = [0] * WINDOW_SIZE + window_cursor = 0 + + while cur_byte < len(data): + flag = data[cur_byte] + cur_byte += 1 + + for i in range(8): + if (flag >> i) & 1 == 1: + output.append(data[cur_byte]) + window[window_cursor] = data[cur_byte] + window_cursor = (window_cursor + 1) & WINDOW_MASK + cur_byte += 1 + else: + w = ((data[cur_byte]) << 8) | (data[cur_byte + 1]) + if w == 0: + return bytes(output) + + cur_byte += 2 + position = ((window_cursor - (w >> 4)) & WINDOW_MASK) + length = (w & 0x0F) + THRESHOLD + + for _ in range(length): + b = window[position & WINDOW_MASK] + output.append(b) + window[window_cursor] = b + window_cursor = (window_cursor + 1) & WINDOW_MASK + position += 1 + + return bytes(output) + + +__all__ = ( + "lz77_compress", "lz77_decompress" +) diff --git a/eaapi/misc.py b/eaapi/misc.py new file mode 100644 index 0000000..05221e8 --- /dev/null +++ b/eaapi/misc.py @@ -0,0 +1,67 @@ +import inspect +import re + +from .exception import CheckFailed + + +def assert_true(check, reason, exc=CheckFailed): + if not check: + line = inspect.stack()[1].code_context + print() + print("\n".join(line)) + raise exc(reason) + + +def py_encoding(name): + if name.startswith("shift-jis"): + return "shift-jis" + return name + + +def parse_model(model): + # e.g. KFC:J:A:A:2019020600 + gamecode, dest, spec, rev, version = re.match(r"([A-Z0-9]{3}):([A-Z]):([A-Z]):([A-Z])(?::(\d{8}))?", model).groups() + return gamecode, dest, spec, rev, version + + +def pack(data, width): + assert_true(1 <= width <= 8, "Invalid pack size") + assert_true(all(i < (1 << width) for i in data), "Data too large for packing") + bit_buf = in_buf = 0 + output = bytearray() + for i in data: + bit_buf |= i << (8 - width) + shift = min(8 - in_buf, width) + bit_buf <<= shift + in_buf += shift + if in_buf == 8: + output.append(bit_buf >> 8) + in_buf = width - shift + bit_buf = (bit_buf & 0xff) << in_buf + + if in_buf: + output.append(bit_buf >> in_buf) + + return bytes(output) + + +def unpack(data, width): + assert_true(1 <= width <= 8, "Invalid pack size") + bit_buf = in_buf = 0 + output = bytearray() + for i in data: + bit_buf |= i + bit_buf <<= width - in_buf + in_buf += 8 + while in_buf >= width: + output.append(bit_buf >> 8) + in_buf -= width + bit_buf = (bit_buf & 0xff) << min(width, in_buf) + + if in_buf: + output.append(bit_buf >> (8 + in_buf - width)) + + return bytes(output) + + +__all__ = ("assert_true", "py_encoding", "parse_model", "pack", "unpack") diff --git a/eaapi/node.py b/eaapi/node.py new file mode 100644 index 0000000..71b6713 --- /dev/null +++ b/eaapi/node.py @@ -0,0 +1,147 @@ +import binascii +import re + +from html import escape + +from .misc import assert_true +from .const import DEFAULT_ENCODING, NAME_MAX_COMPRESSED, XML_ENCODING_BACK, Type + + +class XMLNode: + def __init__(self, name, type_, value, attributes=None, encoding=DEFAULT_ENCODING): + self.name = name + self.type = type_ if isinstance(type_, Type) else Type.from_val(type_) + self.value = value + self.children = [] + self.attributes = {} + if attributes is not None: + for i in attributes: + self.attributes[i] = attributes[i] + self.encoding = encoding or DEFAULT_ENCODING + assert_true(encoding in XML_ENCODING_BACK, "Invalid encoding") + + @classmethod + def void(cls, __name, **attributes): + return cls(__name, Type.Void, (), attributes) + + @property + def is_array(self): + return isinstance(self.value, list) + + @property + def can_compress(self): + return ( + (len(self.name) <= NAME_MAX_COMPRESSED) + and all(i.can_compress for i in self.children) + ) + + def _xpath(self, attr, path): + if path: + child = path.pop(0) + for i in self.children: + if i.name == child: + return i._xpath(attr, path) + raise IndexError + if not attr: + return self + if attr in self.attributes: + return self.attributes[attr] + raise IndexError + + def xpath(self, path): + match = re.match(r"^(?:@([\w:]+)/)?((?:[\w:]+(?:/|$))+)", path) + if match is None: + raise ValueError + attr = match.group(1) + path = match.group(2).split("/") + return self._xpath(attr, path) + + def append(self, __name, __type=Type.Void, __value=(), **attributes): + child = XMLNode(__name, __type, __value, attributes) + self.children.append(child) + return child + + def __len__(self): + return len(self.children) + + def __iter__(self): + for i in self.children: + yield i + + def __getitem__(self, name): + if isinstance(name, int): + return self.children[name] + return self.attributes[name] + + def __setitem__(self, name, value): + self.attributes[name] = value + + def to_str(self, pretty=False): + return ( + f'' + + ("\n" if pretty else "") + + self._to_str(pretty) + ) + + def __str__(self): + return self.to_str(pretty=True) + + def _value_str(self, value): + if isinstance(value, list): + return " ".join(map(self._value_str, value)) + if self.type == Type.Blob: + return binascii.hexlify(value).decode() + if self.type == Type.IPv4: + return f"{value[0]}.{value[1]}.{value[2]}.{value[3]}" + if self.type in (Type.Float, Type.TwoFloat, Type.ThreeFloat): + return f"{value:.6f}" + if self.type == Type.Str: + return escape(str(value)) + + return str(value) + + def _to_str(self, pretty, indent=0): + if not pretty: + indent = 0 + nl = "\n" if pretty else "" + tag = f"{' ' * indent}<{self.name}" + + if self.type != Type.Void: + tag += f" __type=\"{self.type.value.names[0]}\"" + if self.type == Type.Blob: + tag += f" __size=\"{len(self.value)}\"" + if self.is_array: + tag += f" __count=\"{len(self.value)}\"" + + attributes = " ".join(f"{i}=\"{escape(j)}\"" for i, j in self.attributes.items()) + if attributes: + tag += " " + attributes + tag += ">" + if self.value is not None and self.type != Type.Void: + if self.is_array: + tag += " ".join(map(self._value_str, self.value)) + else: + tag += self._value_str(self.value) + elif not self.children: + return tag[:-1] + (" " if pretty else "") + "/>" + + for i in self.children: + if isinstance(i, XMLNode): + tag += nl + i._to_str(pretty, indent + 4) + if self.children: + tag += nl + " " * indent + tag += f"" + return tag + + def __eq__(self, other): + return ( + isinstance(other, XMLNode) + and self.name == other.name + and self.type == other.type + and self.value == other.value + and len(self.children) == len(other.children) + and all(i == j for i, j in zip(self.children, other.children)) + ) + + +__all__ = ("XMLNode", ) diff --git a/eaapi/packer.py b/eaapi/packer.py new file mode 100644 index 0000000..a8b3733 --- /dev/null +++ b/eaapi/packer.py @@ -0,0 +1,41 @@ +import math + + +class Packer: + def __init__(self, offset=0): + self._word_cursor = offset + self._short_cursor = offset + self._byte_cursor = offset + self._boundary = offset % 4 + + def _next_block(self): + self._word_cursor += 4 + return self._word_cursor - 4 + + def request_allocation(self, size): + if size == 0: + return self._word_cursor + elif size == 1: + if self._byte_cursor % 4 == self._boundary: + self._byte_cursor = self._next_block() + 1 + else: + self._byte_cursor += 1 + return self._byte_cursor - 1 + elif size == 2: + if self._short_cursor % 4 == self._boundary: + self._short_cursor = self._next_block() + 2 + else: + self._short_cursor += 2 + return self._short_cursor - 2 + else: + old_cursor = self._word_cursor + for _ in range(math.ceil(size / 4)): + self._word_cursor += 4 + return old_cursor + + def notify_skipped(self, no_bytes): + for _ in range(math.ceil(no_bytes / 4)): + self.request_allocation(4) + + +__all__ = ("Packer", ) diff --git a/eaapi/wrapper.py b/eaapi/wrapper.py new file mode 100644 index 0000000..ac920df --- /dev/null +++ b/eaapi/wrapper.py @@ -0,0 +1,31 @@ +from .crypt import ea_symmetric_crypt +from .lz77 import lz77_compress, lz77_decompress + + +def wrap(packet, info=None, compressed=True): + if compressed: + packet = lz77_compress(packet) + if info is None: + return packet + return ea_symmetric_crypt(packet, info) + + +def unwrap(packet, info=None, compressed=True): + if info is None: + decrypted = packet + else: + decrypted = ea_symmetric_crypt(packet, info) + + if compressed is None: + try: + decompressed = lz77_decompress(decrypted) + except IndexError: + return decrypted + if decompressed == b"\0\0\0\0\0\0": + # Decompression almost certainly failed + return decrypted + return decompressed + return lz77_decompress(decrypted) if compressed else decrypted + + +__all__ = ("wrap", "unwrap") diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..5bbabf1 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +pycryptodome +lxml \ No newline at end of file diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..ee61d9b --- /dev/null +++ b/setup.py @@ -0,0 +1,6 @@ +from distutils.core import setup + +setup( + name="eaapi", + packages=["eaapi"], +) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_cardconv.py b/tests/test_cardconv.py new file mode 100644 index 0000000..9c818f2 --- /dev/null +++ b/tests/test_cardconv.py @@ -0,0 +1,15 @@ +import unittest + +from eaapi.cardconv import konami_to_uid, uid_to_konami + + +class TestCardconv(unittest.TestCase): + def test_uid_to_konami(self): + # return + self.assertEqual(uid_to_konami("E004000000000000"), "YYBXDXH1U015BA1D") + self.assertEqual(uid_to_konami("0000000000000000"), "007TUT8XJNSSPN2P") + + def test_konami_to_uid(self): + # return + self.assertEqual(konami_to_uid("YYBXDXH1U015BA1D"), "E004000000000000") + self.assertEqual(konami_to_uid("007TUT8XJNSSPN2P"), "0000000000000000") diff --git a/tests/test_decoder.py b/tests/test_decoder.py new file mode 100644 index 0000000..fd8090d --- /dev/null +++ b/tests/test_decoder.py @@ -0,0 +1,23 @@ +import unittest + +from eaapi.decoder import Decoder + + +class TestDecoder(unittest.TestCase): + def test_packed_names(self): + decoder = Decoder(b"\x13\xe6\xda\xa5\xdb\xab\xa8\xc2\x59\xf7\xd3\xcc\xe5\xaf\x4f\x40") + decoder.compressed = True + + self.assertEqual(decoder._read_metadata_name(), "the_quick_brown_fox") + + def test_short_ascii_names(self): + decoder = Decoder(b"\x52the_quick_brown_fox") + decoder.encoding = "shift-jis" + + self.assertEqual(decoder._read_metadata_name(), "the_quick_brown_fox") + + def test_long_ascii_names(self): + decoder = Decoder(b"\x80\x87" + b"the_quick_brown_fox_" * 10) + decoder.encoding = "shift-jis" + + self.assertEqual(decoder._read_metadata_name(), "the_quick_brown_fox_" * 10) diff --git a/tests/test_encoder.py b/tests/test_encoder.py new file mode 100644 index 0000000..671ae17 --- /dev/null +++ b/tests/test_encoder.py @@ -0,0 +1,35 @@ +import unittest + +from eaapi.encoder import Encoder + + +class TestEncoder(unittest.TestCase): + def test_packed_names(self): + encoder = Encoder() + encoder._compressed = True + + encoder._write_metadata_name("the_quick_brown_fox") + + self.assertEqual( + encoder.stream.getvalue(), + b"\x13\xe6\xda\xa5\xdb\xab\xa8\xc2\x59\xf7\xd3\xcc\xe5\xaf\x4f\x40" + ) + + def test_short_ascii_names(self): + encoder = Encoder() + + encoder._write_metadata_name("the_quick_brown_fox") + + self.assertEqual( + encoder.stream.getvalue(), + b"\x52the_quick_brown_fox" + ) + + def test_long_ascii_names(self): + encoder = Encoder() + + encoder._write_metadata_name("the_quick_brown_fox_" * 10) + + value = encoder.stream.getvalue() + self.assertEqual(value[:2], b"\x80\x87") + self.assertEqual(value[2:], b"the_quick_brown_fox_" * 10) diff --git a/tests/test_misc.py b/tests/test_misc.py new file mode 100644 index 0000000..31c0396 --- /dev/null +++ b/tests/test_misc.py @@ -0,0 +1,41 @@ +import unittest + +from eaapi.misc import pack, unpack + + +class TestPackUnpack(unittest.TestCase): + def test_pack_2(self): + self.assertEqual( + pack(b"\0\1\2\3\3\2\1\0", 2), + b"\x1b\xe4" + ) + + def test_pack_4(self): + self.assertEqual( + pack(b"\0\1\2\3\4\5\6\7", 4), + b"\x01\x23\x45\x67" + ) + + def test_pack_5(self): + self.assertEqual( + pack(b"\0\1\2\3\4\5\6\7", 5), + b"\x00\x44\x32\x14\xc7" + ) + + def test_unpack_2(self): + self.assertEqual( + unpack(b"\x1b\xe4", 2), + b"\0\1\2\3\3\2\1\0" + ) + + def test_unpack_4(self): + self.assertEqual( + unpack(b"\x01\x23\x45\x67", 4), + b"\0\1\2\3\4\5\6\7" + ) + + def test_unpack_5(self): + self.assertEqual( + unpack(b"\x00\x44\x32\x14\xc7", 5), + b"\0\1\2\3\4\5\6\7" + ) diff --git a/tests/test_roundtrip.py b/tests/test_roundtrip.py new file mode 100644 index 0000000..02353c8 --- /dev/null +++ b/tests/test_roundtrip.py @@ -0,0 +1,137 @@ +import binascii +import unittest + +from eaapi import unwrap, wrap, Encoder, Decoder + + +TEST_CASES = [ + ( + binascii.unhexlify( + "13c5e3a34517cdaa9e41aa052f2edbc1f36df913fd5e512bb4a33f21e52b9474a2ab579126654757cd31bd326c0957246c69be6d77" + "4719a2a2c4f37ff56429bc7bfa02519ddc5079306f383f8d5664c8250a9bd181363a637cb76acff65c877984c58e1fad2834d1cbfc" + "2bd25d19f07bce3f3fb4886beca3079439a2a38e9a2cc9081d4a364617e27ef4d6981f33d9cf157190bd8da6ebad6dcb9c26523c58" + "d0ae6092e93432f43aac2b61e1fc96a538ac2f77de8fd2b52b2a2c882b208ebe8922dbdb9148703a5e03be1ff2e45cfc55dd83964f" + "d7e1a443227dfdb13917872919f904d14f2ff5cfc3eb8613db2da01d9940f44d48e2" + ), + "1-61bb5f0d-5866" + ), + ( + binascii.unhexlify( + "7e7ec6e960ab49372b054cb56a6378f65ac954aa010e61e67f313694435e3154ab522324ae0d709f156630c65f97e6a1c26f55afdc" + "51ab3e651c7c1b13624729bfc6e9f1642850fbf74de1454e167ecc82cee6b6318101de3d18e21b0f157a1226d28eff15f9dc31649c" + "b7e59689f01b5a32ced864976077d7b65dcb0c09370cfe646dc0594d26b52e9426bd0f7bf9b17c35f65c02886b907dfbe57db4a525" + "786d83bef400414fe24e02a6b830e731430b6584c7751f096e" + ), + "1-61bb6908-16ae" + ), + ( + binascii.unhexlify( + "f444818c52c84d99329099bf06c98ff02b8784bd9ac939c371646c3b918a262a55cdfb98e1f378d689c2e3afafe067066e986fcf85" + "6b870925e1ac0252daec1db455baff064240060b2cb36d35d841bb0049af858ed513385cab964770fbc63d3c29313e83c9aaf696ea" + "b5a0da4a8b06a1f41062624d0ff66e99b533618ac48dda47f4028a88888f667c81669c9e2c69f01c8725965475faec3347f9770e13" + "3c61eb4dbf9a02378f9a2e44e616470338cf59ff004eae170f3a6cfe14ba777d8d0b7638f1164101854d5f29d4b85aa3020621c798" + ), + "1-61bb6ab6-840c" + ), + ( + binascii.unhexlify( + "e638577a7b262667f9dca0a137bf7767627d86695a2a13dbdf7338a9076c4e3b7e85f145fa7b2a9106062f3490ed160e4882a943ab" + "f4e3b26e66daee00fc7b3dfe7fc0624251c44582214c7532a90d2a9afe1a1dfe17369f803522a574aaf4fd4a484417ae5b57aab315" + "50e398282f06f859f40e" + ), + "1-8d578a33-ca84" + ), + ( + binascii.unhexlify( + "9c593d03ae5979b150067743d3f29aaeb31888b6ad5aee21f654ea9274c09cdedc727a6d34052a5f2eed4b91ffb5cb7750bbed1bd4" + "ebe91c781fcd4c172680fb4343c25862e57efe52c2363c0bdb83282534541dfd4cb39ab6a3a74c80e31b31537e0f407af70c90fe9e" + "f8b4cb05c2e0d247666847669744cda5881f4c3658507d5806cb16520211689a64d6dfda72a5503f159633c1eb7eba68553d3b2e7c" + "008c137feffcae5923d8aa1b21137fd538651bad6a4de47d2021bcf59c7d9364b9ddf9330d730c34e86e273b414b4b26c1b468d06c" + "5de95d64f848bfdc1de75f3e79ddf74872459a690605675dee206c47d44f46dfbd56b95b8c670357f3a426e1a2aded8107b0dddb79" + "d33b2eb4608419ec84a004c17f7d289991922d3fbab8f5a57dfd6fba7ca9c33ca93b62aca8a0f4f83cfd30ef993284f0213d2d9f29" + "e7e839a7eb97314ee37b59dcab39c00b7351579833e7cf5cad52f9cd9d1cd0ca979a0b8c0770a72d14fb616ea2c8de8aa01d3e44e0" + "5bcfd6555026931b8f72329ed53e3a0d21021485a0ecdae791a8885cb7bd021e3b4d8412f2991350f1ad7f1c225754c17ce7c29238" + "1046aca3967d80e67e58d0ba22f25799a2257eac0759c0f7ba3e000d140ab415b58177d3f8bc343d913ddd7aba2f5f954947e7b51d" + "1b0aad61114936eed8f58c48e9345f81d4001c634b31efe4f7a6661fa1c79b384dcbeeca0ca74533a3dd8db8980b3e5dfe39248de2" + "56bd16ff0a5eb143fb977fafee7908ee7735b281c8fe986fb853bfe29f22e27cda5d3d173fff83d00a17b837d83355a16ffe87cc1d" + "4bf04d77c2d6addc22f3c461ef0192010469cfa6d3b23f414e38bd93a23b30b9c3b4b6db0c95352e48c3a547ac97803c7fb8f22e10" + "ab26e2f76d8c94321dd9779f0cce5ba7b7c960c7c4c230835e8782703a32ed108b990865e616709aab4ad67518ee255efaee20ee91" + "8a894fc4a9d98ef662c442ed9ac9db318f264633a7ec5dc88ffe22d18de9fee316c4037a4be6b99540ce2bbdb3fcdfabb0da04" + ), + "1-61bb68ff-156b" + ), + ( + binascii.unhexlify( + "f193f0970472de5897182faecd8536cc8dbfada5657e788263b57a91e6890302f4235942268cd8b9599ccc0a8638769229ed69856f" + "adfb15cc0e4828f035814ce41f5a419d969a6dba94cf0b4af831f0674ed56516ccf2c0720ce232a9dced53dfbeba3b0cb5aa933632" + "7ad8a9c2ea8c45c4b7233d1d60d48b82a78183f48b813ec3fa798f01c167d2ec1bc554f96c6f1064602c37a4445933ae06f7fd0c81" + "35036e5399682af75f7f35a75d83b6a1da836245af084fd1dd779abda0334eefdef7e442d421" + ), + "1-61ccf054-a8b3" + ), + ( + binascii.unhexlify( + "2b06d34e48babb9cf4a015f44e3e079d6ca547e54a2e0f5a2e2e5572b1587e9f886697f2654757647c654549c420c39984fc6c3d08" + "92a5ec88dd7a5645123629ec769db2505de90b380dff6da3249af1bc273dcf9f4d3cbdb596d0a08cd41369d763f28a4241ee567e91" + "511b692e9ebe07206b97a3086dd197074a01025d47b315a7997a0b695f6513aaefb9d2a281e1f7d621f01c9cd0ed1f26547807a85b" + "45838302319579deab1cd3ebaee5fb9c53694ca6817ba0fa655a2c3415127ceeff4f357850b4f4af979628829a3b8ab5ee4443e796" + "f043f759b9956fb4638789629cf5d3372382cbeddebf5320c5990b18aa551ba3fbda0bb44d09828cb2d794e6e6c77891872fb68c7e" + "5fe5135fa9cb90125989d699a88a9c9dc2b408f0794cd16269cab0708e36921471e6eba9a2edb59700a95724a663a739e889be6008" + "b9f724ebab6efef406841375a6ea784c2dbe0068c3a3af516ee4ec8161287a4cec3fbf0974dbfa207cf1bcd0e8eac1f5c772fd08b8" + "0fe02920e27941d2370840d3809cf12ec5618372dc0fd4d9346daef55b4fe60356d8896281a971a763e126e8a8dd73bd2b3ac27c4a" + "aaeffd2fbaae5659c0496e036cf1c46fd57b16400b0b819f16ba05e9d77b73e225daf21c6990554094381177b3bd0936f3af85ba7c" + "9b7995f752f0432992dec0ce04e339e77c78710c5b27abb46ffe7c0e10ecbea350cdad276dce01891e7f05a6da9bf2f4c0a07571ca" + "795904138888f87600e301022ba05bd1cbae6294dab611266170eabf02892c299913349002974cd2b15c0449a45f663fde8bca90bf" + "2116ca5706b7024110bcd63531dc13effacfe98b203a3aac063fc6005a7897c10b66ffe66b9a2433e7babb0f2aad3d92ae9e3a7bc9" + "cb94898f86301410aad4b6178f20b325237115a4bc0f4523808e20b9cdd3ac70a98d8373caabecb0b50aadf9874099bb808a73c58b" + "b37badd74989780f7ab4d962b5989accb75e05e3f14c24efbc81e9422aaf1ff1d7ea2974e0da0abddb0a4a3ce373e1806f088f23e8" + "779790d52887d048435fc94f0bcae56a43c29af0d996c99261e926da682b6cf86039e1f11541c272a1e828a083a3cd23b805f328ac" + "b86c051369fbef6629c1447940e89f06205f06ef2bf0fc5a6846b97d72c75611b6f578e9cbc5dc12cb696b4fa6b5c50047d2d4bdf6" + "f7d3b7a5347010eab70dc4c4460e8e0c21a7392dfa485b33908742a47ce8ba1435ff8351b7fb8c6a8a5f4e7a3138f1f70d683ef869" + "f23b10372db713071b843466365e55e1f3fd136042cb3cb8a05f7069a4597d38d29a50998c8f20fe6a0a46bfaf0cd30eb257c355c6" + "ff6e0da906052dd998196fa3a86ef2da5d8b6a1ec14209d690db56ebd759fc7b5bfda006be15533eb119885e34d08f251556c95f1e" + "42f800665ab255a28a9f8a17496001ecfc108258bbc3833c0c684c9d304b45a9c8b71c17cedbbaf496277e83f48ec4ddad87b265c8" + "4e0d706a6baa43d6e4fd608069b6b96ccb94a27c07093f869bc08d0181ddff4619b17e80b90dfab9d843047eb83e192ded7eb168db" + "c21f5daecb124ab11267d06846852507acb1b8c3d8873ae5961ffb34820e3ca822e12754f27f165d17ba7854fa8deed778bd58b61d" + "4930a2c66aca2b198861417acb64f981a9acc996ccedc764e1b0d36c7e003626d9a365c277b41508f0b47623fc5b59239dde5b148d" + "e35495ec7e3e757eebb4bf7ea337d63afeac93f475b2ef8e82d7a0a37e1eb8c565b4ba3a6977130232a3d6ff4e1f8d032d50dc9256" + "bf8924d2446fb97f12b46b2220e63b0393575fa89ed715a0a2c7d1861a86d28ee4fd845edbe2b93375cb59a7da42003309838b960b" + "f63ffa72b6c6ec2bfeae8e880ca360ef1e605e254634d6c006cb818798203c375a1b3a9e0f7edf6a47cc1ce2d8a8c54491bf983afe" + "4d50ae3bcabdc05668e39dfad321b17d1acdf9fa06d69910f97a4fd6e8cfb443b3b7c46dce6e2bb827c9b2bd8cbb49aa3483526094" + "4e4bb0c9dcd3014d3d6466a6" + ), + "1-b0468aca-dd10" + ) +] + + +class TestRoundtrip(unittest.TestCase): + def _test_case(self, case): + data, info = TEST_CASES[case] + + if info is None: + packet = data + else: + packet = unwrap(data, info, None) + + s_data = Decoder.decode(packet) + + encoder = Encoder(s_data.encoding) + encoder.pack(s_data) + encoder.stream.seek(0) + packed = encoder.stream.read() + # We no longer guarantee byte-for-byte matching during a roundtrip + # self.assertEqual(packet, packed, "Binary packet not equal") + + s_data_2 = Decoder.decode(packed) + self.assertEqual(s_data, s_data_2) + + if info is not None: + # data_new is compressed with a different algo, so won't be == data + data_new = wrap(packed, info) + s_data_new = Decoder.decode(unwrap(data_new, info)) + self.assertEqual(s_data_new, s_data) + + +for i in range(len(TEST_CASES)): + setattr(TestRoundtrip, f"test_{i}", lambda self: self._test_case(i))