Server stuff

Bottersnike 4 months ago
parent f3e9db8cb1
commit 4ca5e03f59

3
.gitmodules vendored

@ -1,3 +0,0 @@
[submodule "server"]
path = eaapi/server
url = https://gitea.tendokyu.moe/eamuse/server.git

@ -1,13 +1,18 @@
from .const import Type, ServicesMode, Compression
from .node import XMLNode
from .encoder import Encoder
from .decoder import Decoder
from .wrapper import wrap, unwrap
from .misc import parse_model
__all__ = (
"Type", "ServicesMode", "Compression",
"XMLNode", "Encoder", "Decoder",
"wrap", "unwrap",
"parse_model",
)
from .const import Type, ServicesMode, Compression
from .node import XMLNode
from .encoder import Encoder
from .decoder import Decoder
from .wrapper import wrap, unwrap
from .misc import parse_model
from .exception import EAAPIException
from . import crypt
__all__ = (
"Type", "ServicesMode", "Compression",
"XMLNode", "Encoder", "Decoder",
"wrap", "unwrap",
"parse_model",
"EAAPIException",
"crypt",
)

@ -1,85 +1,85 @@
import binascii
from Crypto.Cipher import DES3
from .misc import assert_true, pack, unpack
from .exception import InvalidCard
from .keys import CARDCONV_KEY
from .const import CARD_ALPHABET
def enc_des(uid):
cipher = DES3.new(CARDCONV_KEY, DES3.MODE_CBC, iv=b'\0' * 8)
return cipher.encrypt(uid)
def dec_des(uid):
cipher = DES3.new(CARDCONV_KEY, DES3.MODE_CBC, iv=b'\0' * 8)
return cipher.decrypt(uid)
def checksum(data):
chk = sum(data[i] * (i % 3 + 1) for i in range(15))
while chk > 31:
chk = (chk >> 5) + (chk & 31)
return chk
def uid_to_konami(uid):
assert_true(len(uid) == 16, "UID must be 16 bytes", InvalidCard)
if uid.upper().startswith("E004"):
card_type = 1
elif uid.upper().startswith("0"):
card_type = 2
else:
raise InvalidCard("Invalid UID prefix")
kid = binascii.unhexlify(uid)
assert_true(len(kid) == 8, "ID must be 8 bytes", InvalidCard)
out = bytearray(unpack(enc_des(kid[::-1]), 5)[:13]) + b'\0\0\0'
out[0] ^= card_type
out[13] = 1
for i in range(1, 14):
out[i] ^= out[i - 1]
out[14] = card_type
out[15] = checksum(out)
return "".join(CARD_ALPHABET[i] for i in out)
def konami_to_uid(konami_id):
if konami_id[14] == "1":
card_type = 1
elif konami_id[14] == "2":
card_type = 2
else:
raise InvalidCard("Invalid ID")
assert_true(len(konami_id) == 16, "ID must be 16 characters", InvalidCard)
assert_true(all(i in CARD_ALPHABET for i in konami_id), "ID contains invalid characters", InvalidCard)
card = [CARD_ALPHABET.index(i) for i in konami_id]
assert_true(card[11] % 2 == card[12] % 2, "Parity check failed", InvalidCard)
assert_true(card[13] == card[12] ^ 1, "Card invalid", InvalidCard)
assert_true(card[15] == checksum(card), "Checksum failed", InvalidCard)
for i in range(13, 0, -1):
card[i] ^= card[i - 1]
card[0] ^= card_type
card_id = dec_des(pack(card[:13], 5)[:8])[::-1]
card_id = binascii.hexlify(card_id).decode().upper()
if card_type == 1:
assert_true(card_id[:4] == "E004", "Invalid card type", InvalidCard)
elif card_type == 2:
assert_true(card_id[0] == "0", "Invalid card type", InvalidCard)
return card_id
__all__ = ("konami_to_uid", "uid_to_konami")
import binascii
from Crypto.Cipher import DES3
from .misc import assert_true, pack, unpack
from .exception import InvalidCard
from .keys import CARDCONV_KEY
from .const import CARD_ALPHABET
def enc_des(uid: bytes) -> bytes:
cipher = DES3.new(CARDCONV_KEY, DES3.MODE_CBC, iv=b'\0' * 8)
return cipher.encrypt(uid)
def dec_des(uid: bytes) -> bytes:
cipher = DES3.new(CARDCONV_KEY, DES3.MODE_CBC, iv=b'\0' * 8)
return cipher.decrypt(uid)
def checksum(data: bytes) -> int:
chk = sum(data[i] * (i % 3 + 1) for i in range(15))
while chk > 31:
chk = (chk >> 5) + (chk & 31)
return chk
def uid_to_konami(uid: str) -> str:
assert_true(len(uid) == 16, "UID must be 16 bytes", InvalidCard)
if uid.upper().startswith("E004"):
card_type = 1
elif uid.upper().startswith("0"):
card_type = 2
else:
raise InvalidCard("Invalid UID prefix")
kid = binascii.unhexlify(uid)
assert_true(len(kid) == 8, "ID must be 8 bytes", InvalidCard)
out = bytearray(unpack(enc_des(kid[::-1]), 5)[:13]) + b'\0\0\0'
out[0] ^= card_type
out[13] = 1
for i in range(1, 14):
out[i] ^= out[i - 1]
out[14] = card_type
out[15] = checksum(out)
return "".join(CARD_ALPHABET[i] for i in out)
def konami_to_uid(konami_id: str) -> str:
if konami_id[14] == "1":
card_type = 1
elif konami_id[14] == "2":
card_type = 2
else:
raise InvalidCard("Invalid ID")
assert_true(len(konami_id) == 16, "ID must be 16 characters", InvalidCard)
assert_true(all(i in CARD_ALPHABET for i in konami_id), "ID contains invalid characters", InvalidCard)
card = bytearray([CARD_ALPHABET.index(i) for i in konami_id])
assert_true(card[11] % 2 == card[12] % 2, "Parity check failed", InvalidCard)
assert_true(card[13] == card[12] ^ 1, "Card invalid", InvalidCard)
assert_true(card[15] == checksum(card), "Checksum failed", InvalidCard)
for i in range(13, 0, -1):
card[i] ^= card[i - 1]
card[0] ^= card_type
card_id = dec_des(pack(card[:13], 5)[:8])[::-1]
card_id = binascii.hexlify(card_id).decode().upper()
if card_type == 1:
assert_true(card_id[:4] == "E004", "Invalid card type", InvalidCard)
elif card_type == 2:
assert_true(card_id[0] == "0", "Invalid card type", InvalidCard)
return card_id
__all__ = ("konami_to_uid", "uid_to_konami")

@ -1,163 +1,171 @@
import enum
from dataclasses import dataclass
from html import unescape
from typing import List, Callable
from .misc import assert_true
CARD_ALPHABET = "0123456789ABCDEFGHJKLMNPRSTUWXYZ"
NAME_MAX_COMPRESSED = 0x24
NAME_MAX_DECOMPRESSED = 0x1000
ENCODING = {
0x20: "ascii",
0x40: "iso-8859-1",
0x60: "euc-jp",
0x80: "shift-jis",
0xA0: "utf-8",
}
DEFAULT_ENCODING = ENCODING[0x80] # Shift-JIS
ENCODING[0x00] = DEFAULT_ENCODING
XML_ENCODING = {
"ASCII": "ascii",
"ISO-8859-1": "iso-8859-1",
"EUC-JP": "euc-jp",
"SHIFT_JIS": "shift-jis",
"SHIFT-JIS": "shift-jis",
"UTF-8": "utf-8",
}
ENCODING_BACK = {v: k for k, v in ENCODING.items()}
XML_ENCODING_BACK = {v: k for k, v in XML_ENCODING.items()}
ATTR = 0x2E
END_NODE = 0xFE
END_DOC = 0xFF
PACK_ALPHABET = "0123456789:ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz"
CONTENT_COMP_FULL = 0x42
CONTENT_COMP_SCHEMA = 0x43
CONTENT_FINGERPRINT = 0x44 # TODO: Identify how exactly this differs from the others
CONTENT_ASCII_FULL = 0x45
CONTENT_ASCII_SCHEMA = 0x46
CONTENT_COMP = (CONTENT_COMP_FULL, CONTENT_COMP_SCHEMA)
CONTENT_FULL = (CONTENT_COMP_FULL, CONTENT_ASCII_FULL)
CONTENT = (
CONTENT_COMP_FULL, CONTENT_COMP_SCHEMA, CONTENT_FINGERPRINT,
CONTENT_ASCII_FULL, CONTENT_ASCII_SCHEMA
)
ARRAY_BIT = 0x40
ARRAY_MASK = ARRAY_BIT - 1
@dataclass
class _Type:
id: int
fmt: str
names: List[str]
c_name: str
convert: Callable
size: int = 1
no_check: bool = False
def _parse(self, value):
if self.convert is None:
return ()
if self.size == 1:
if isinstance(value, (list, tuple)) and len(value) == 1:
value = value[0]
return self.convert(value)
if not self.no_check:
assert_true(len(value) == self.size, "Invalid node data")
return (*map(self.convert, value),)
def parse_ip(ip):
return (*map(int, ip.split(".")),)
class Type(enum.Enum):
Void = _Type(0x01, "", ["void"], "void", None)
S8 = _Type(0x02, "b", ["s8"], "int8", int)
U8 = _Type(0x03, "B", ["u8"], "uint8", int)
S16 = _Type(0x04, "h", ["s16"], "int16", int)
U16 = _Type(0x05, "H", ["u16"], "uint16", int)
S32 = _Type(0x06, "i", ["s32"], "int32", int)
U32 = _Type(0x07, "I", ["u32"], "uint32", int)
S64 = _Type(0x08, "q", ["s64"], "int64", int)
U64 = _Type(0x09, "Q", ["u64"], "uint64", int)
Blob = _Type(0x0a, "S", ["bin", "binary"], "char[]", bytes)
Str = _Type(0x0b, "s", ["str", "string"], "char[]", unescape)
IPv4 = _Type(0x0c, "4B", ["ip4"], "uint8[4]", parse_ip, 1, True)
Time = _Type(0x0d, "I", ["time"], "uint32", int)
Float = _Type(0x0e, "f", ["float", "f"], "float", float)
Double = _Type(0x0f, "d", ["double", "d"], "double", float)
TwoS8 = _Type(0x10, "2b", ["2s8"], "int8[2]", int, 2)
TwoU8 = _Type(0x11, "2B", ["2u8"], "uint8[2]", int, 2)
TwoS16 = _Type(0x12, "2h", ["2s16"], "int16[2]", int, 2)
TwoU16 = _Type(0x13, "2H", ["2u16"], "uint16[2]", int, 2)
TwoS32 = _Type(0x14, "2i", ["2s32"], "int32[2]", int, 2)
TwoU32 = _Type(0x15, "2I", ["2u32"], "uint32[2]", int, 2)
TwoS64 = _Type(0x16, "2q", ["2s64", "vs64"], "int16[2]", int, 2)
TwoU64 = _Type(0x17, "2Q", ["2u64", "vu64"], "uint16[2]", int, 2)
TwoFloat = _Type(0x18, "2f", ["2f"], "float[2]", float, 2)
TwoDouble = _Type(0x19, "2d", ["2d", "vd"], "double[2]", float, 2)
ThreeS8 = _Type(0x1a, "3b", ["3s8"], "int8[3]", int, 3)
ThreeU8 = _Type(0x1b, "3B", ["3u8"], "uint8[3]", int, 3)
ThreeS16 = _Type(0x1c, "3h", ["3s16"], "int16[3]", int, 3)
ThreeU16 = _Type(0x1d, "3H", ["3u16"], "uint16[3]", int, 3)
ThreeS32 = _Type(0x1e, "3i", ["3s32"], "int32[3]", int, 3)
ThreeU32 = _Type(0x1f, "3I", ["3u32"], "uint32[3]", int, 3)
ThreeS64 = _Type(0x20, "3q", ["3s64"], "int64[3]", int, 3)
ThreeU64 = _Type(0x21, "3Q", ["3u64"], "uint64[3]", int, 3)
ThreeFloat = _Type(0x22, "3f", ["3f"], "float[3]", float, 3)
ThreeDouble = _Type(0x23, "3d", ["3d"], "double[3]", float, 3)
FourS8 = _Type(0x24, "4b", ["4s8"], "int8[4]", int, 4)
FourU8 = _Type(0x25, "4B", ["4u8"], "uint8[4]", int, 4)
FourS16 = _Type(0x26, "4h", ["4s16"], "int16[4]", int, 4)
FourU16 = _Type(0x27, "4H", ["4u16"], "uint8[4]", int, 4)
FourS32 = _Type(0x28, "4i", ["4s32", "vs32"], "int32[4]", int, 4)
FourU32 = _Type(0x29, "4I", ["4u32", "vs32"], "uint32[4]", int, 4)
FourS64 = _Type(0x2a, "4q", ["4s64"], "int64[4]", int, 4)
FourU64 = _Type(0x2b, "4Q", ["4u64"], "uint64[4]", int, 4)
FourFloat = _Type(0x2c, "4f", ["4f", "vf"], "float[4]", float, 4)
FourDouble = _Type(0x2d, "4d", ["4d"], "double[4]", float, 4)
Attr = _Type(0x2e, "s", ["attr"], "char[]", None)
Array = _Type(0x2f, "", ["array"], "", None)
VecS8 = _Type(0x30, "16b", ["vs8"], "int8[16]", int, 16)
VecU8 = _Type(0x31, "16B", ["vu8"], "uint8[16]", int, 16)
VecS16 = _Type(0x32, "8h", ["vs16"], "int8[8]", int, 8)
VecU16 = _Type(0x33, "8H", ["vu16"], "uint8[8]", int, 8)
Bool = _Type(0x34, "b", ["bool", "b"], "bool", int)
TwoBool = _Type(0x35, "2b", ["2b"], "bool[2]", int, 2)
ThreeBool = _Type(0x36, "3b", ["3b"], "bool[3]", int, 3)
FourBool = _Type(0x37, "4b", ["4b"], "bool[4]", int, 4)
VecBool = _Type(0x38, "16b", ["vb"], "bool[16]", int, 16)
@classmethod
def from_val(cls, value):
for i in cls:
if i.value.id == value & ARRAY_MASK:
return i
raise ValueError(f"Unknown node type {value}")
class ServicesMode(enum.Enum):
Operation = "operation"
Debug = "debug"
Test = "test"
Factory = "factory"
class Compression(enum.Enum):
Lz77 = "lz77"
None_ = "none"
import enum
from dataclasses import dataclass
from html import unescape
from typing import List, Callable
from .misc import assert_true
CARD_ALPHABET = "0123456789ABCDEFGHJKLMNPRSTUWXYZ"
NAME_MAX_COMPRESSED = 0x24
NAME_MAX_DECOMPRESSED = 0x1000
ENCODING = {
0x20: "ascii",
0x40: "iso-8859-1",
0x60: "euc-jp",
0x80: "shift-jis",
0xA0: "utf-8",
}
DEFAULT_ENCODING = ENCODING[0x80] # Shift-JIS
ENCODING[0x00] = DEFAULT_ENCODING
XML_ENCODING = {
"ASCII": "ascii",
"ISO-8859-1": "iso-8859-1",
"EUC-JP": "euc-jp",
"SHIFT_JIS": "shift-jis",
"SHIFT-JIS": "shift-jis",
"UTF-8": "utf-8",
}
ENCODING_BACK = {v: k for k, v in ENCODING.items()}
XML_ENCODING_BACK = {v: k for k, v in XML_ENCODING.items()}
ATTR = 0x2E
END_NODE = 0xFE
END_DOC = 0xFF
PACK_ALPHABET = "0123456789:ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz"
CONTENT_COMP_FULL = 0x42
CONTENT_COMP_SCHEMA = 0x43
CONTENT_FINGERPRINT = 0x44 # TODO: Identify how exactly this differs from the others
CONTENT_ASCII_FULL = 0x45
CONTENT_ASCII_SCHEMA = 0x46
CONTENT_COMP = (CONTENT_COMP_FULL, CONTENT_COMP_SCHEMA)
CONTENT_FULL = (CONTENT_COMP_FULL, CONTENT_ASCII_FULL)
CONTENT = (
CONTENT_COMP_FULL, CONTENT_COMP_SCHEMA, CONTENT_FINGERPRINT,
CONTENT_ASCII_FULL, CONTENT_ASCII_SCHEMA
)
ARRAY_BIT = 0x40
ARRAY_MASK = ARRAY_BIT - 1
@dataclass
class _Type:
id: int
fmt: str
names: List[str]
c_name: str
convert: Callable | None
size: int = 1
no_check: bool = False
def _parse(self, value):
if self.convert is None:
return ()
if self.size == 1:
if isinstance(value, (list, tuple)) and len(value) == 1:
value = value[0]
return self.convert(value)
if not self.no_check:
assert_true(len(value) == self.size, "Invalid node data")
return (*map(self.convert, value),)
def parse_ip(ip: int | str) -> tuple[int, int, int, int]:
if isinstance(ip, int):
return (
(ip >> 24) & 0xff,
(ip >> 16) & 0xff,
(ip >> 8) & 0xff,
(ip >> 0) & 0xff,
)
return (*map(int, ip.split(".")),)
class Type(enum.Enum):
Void = _Type(0x01, "", ["void"], "void", None)
S8 = _Type(0x02, "b", ["s8"], "int8", int)
U8 = _Type(0x03, "B", ["u8"], "uint8", int)
S16 = _Type(0x04, "h", ["s16"], "int16", int)
U16 = _Type(0x05, "H", ["u16"], "uint16", int)
S32 = _Type(0x06, "i", ["s32"], "int32", int)
U32 = _Type(0x07, "I", ["u32"], "uint32", int)
S64 = _Type(0x08, "q", ["s64"], "int64", int)
U64 = _Type(0x09, "Q", ["u64"], "uint64", int)
Blob = _Type(0x0a, "S", ["bin", "binary"], "char[]", bytes)
Str = _Type(0x0b, "s", ["str", "string"], "char[]", unescape)
IPv4 = _Type(0x0c, "4B", ["ip4"], "uint8[4]", parse_ip, 1, True)
IPv4_Int = _Type(0x0c, "I", ["ip4"], "uint8[4]", parse_ip, 1, True)
Time = _Type(0x0d, "I", ["time"], "uint32", int)
Float = _Type(0x0e, "f", ["float", "f"], "float", float)
Double = _Type(0x0f, "d", ["double", "d"], "double", float)
TwoS8 = _Type(0x10, "2b", ["2s8"], "int8[2]", int, 2)
TwoU8 = _Type(0x11, "2B", ["2u8"], "uint8[2]", int, 2)
TwoS16 = _Type(0x12, "2h", ["2s16"], "int16[2]", int, 2)
TwoU16 = _Type(0x13, "2H", ["2u16"], "uint16[2]", int, 2)
TwoS32 = _Type(0x14, "2i", ["2s32"], "int32[2]", int, 2)
TwoU32 = _Type(0x15, "2I", ["2u32"], "uint32[2]", int, 2)
TwoS64 = _Type(0x16, "2q", ["2s64", "vs64"], "int16[2]", int, 2)
TwoU64 = _Type(0x17, "2Q", ["2u64", "vu64"], "uint16[2]", int, 2)
TwoFloat = _Type(0x18, "2f", ["2f"], "float[2]", float, 2)
TwoDouble = _Type(0x19, "2d", ["2d", "vd"], "double[2]", float, 2)
ThreeS8 = _Type(0x1a, "3b", ["3s8"], "int8[3]", int, 3)
ThreeU8 = _Type(0x1b, "3B", ["3u8"], "uint8[3]", int, 3)
ThreeS16 = _Type(0x1c, "3h", ["3s16"], "int16[3]", int, 3)
ThreeU16 = _Type(0x1d, "3H", ["3u16"], "uint16[3]", int, 3)
ThreeS32 = _Type(0x1e, "3i", ["3s32"], "int32[3]", int, 3)
ThreeU32 = _Type(0x1f, "3I", ["3u32"], "uint32[3]", int, 3)
ThreeS64 = _Type(0x20, "3q", ["3s64"], "int64[3]", int, 3)
ThreeU64 = _Type(0x21, "3Q", ["3u64"], "uint64[3]", int, 3)
ThreeFloat = _Type(0x22, "3f", ["3f"], "float[3]", float, 3)
ThreeDouble = _Type(0x23, "3d", ["3d"], "double[3]", float, 3)
FourS8 = _Type(0x24, "4b", ["4s8"], "int8[4]", int, 4)
FourU8 = _Type(0x25, "4B", ["4u8"], "uint8[4]", int, 4)
FourS16 = _Type(0x26, "4h", ["4s16"], "int16[4]", int, 4)
FourU16 = _Type(0x27, "4H", ["4u16"], "uint8[4]", int, 4)
FourS32 = _Type(0x28, "4i", ["4s32", "vs32"], "int32[4]", int, 4)
FourU32 = _Type(0x29, "4I", ["4u32", "vs32"], "uint32[4]", int, 4)
FourS64 = _Type(0x2a, "4q", ["4s64"], "int64[4]", int, 4)
FourU64 = _Type(0x2b, "4Q", ["4u64"], "uint64[4]", int, 4)
FourFloat = _Type(0x2c, "4f", ["4f", "vf"], "float[4]", float, 4)
FourDouble = _Type(0x2d, "4d", ["4d"], "double[4]", float, 4)
Attr = _Type(0x2e, "s", ["attr"], "char[]", None)
Array = _Type(0x2f, "", ["array"], "", None)
VecS8 = _Type(0x30, "16b", ["vs8"], "int8[16]", int, 16)
VecU8 = _Type(0x31, "16B", ["vu8"], "uint8[16]", int, 16)
VecS16 = _Type(0x32, "8h", ["vs16"], "int8[8]", int, 8)
VecU16 = _Type(0x33, "8H", ["vu16"], "uint8[8]", int, 8)
Bool = _Type(0x34, "b", ["bool", "b"], "bool", int)
TwoBool = _Type(0x35, "2b", ["2b"], "bool[2]", int, 2)
ThreeBool = _Type(0x36, "3b", ["3b"], "bool[3]", int, 3)
FourBool = _Type(0x37, "4b", ["4b"], "bool[4]", int, 4)
VecBool = _Type(0x38, "16b", ["vb"], "bool[16]", int, 16)
@classmethod
def from_val(cls, value):
for i in cls:
if i.value.id == value & ARRAY_MASK:
return i
raise ValueError(f"Unknown node type {value}")
class ServicesMode(enum.Enum):
Operation = "operation"
Debug = "debug"
Test = "test"
Factory = "factory"
class Compression(enum.Enum):
Lz77 = "lz77"
None_ = "none"

@ -1,49 +1,50 @@
import binascii
import hashlib
import time
import re
from Crypto.Cipher import ARC4
from .misc import assert_true
from .keys import EA_KEY
def new_prng():
state = 0x41c64e6d
while True:
x = (state * 0x838c9cda) + 0x6072
# state = (state * 0x41c64e6d + 0x3039)
# state = (state * 0x41c64e6d + 0x3039)
state = (state * 0xc2a29a69 + 0xd3dc167e) & 0xffffffff
yield (x & 0x7fff0000) | state >> 15 & 0xffff
prng = new_prng()
def validate_key(info):
match = re.match(r"^(\d)-([0-9a-f]{8})-([0-9a-f]{4})$", info)
assert_true(match, "Invalid eamuse info key")
version = match.group(1)
assert_true(version == "1", f"Unsupported encryption version ({version})")
seconds = binascii.unhexlify(match.group(2)) # 4 bytes
rng = binascii.unhexlify(match.group(3)) # 2 bytes
return seconds, rng
def get_key(prng_=None):
return f"1-{int(time.time()):08x}-{(next(prng_ or prng) & 0xffff):04x}"
def ea_symmetric_crypt(data, info):
seconds, rng = validate_key(info)
key = hashlib.md5(seconds + rng + EA_KEY).digest()
return ARC4.new(key).encrypt(data)
__all__ = ("new_prng", "prng", "validate_key", "get_key", "ea_symmetric_crypt")
import binascii
import hashlib
import time
import re
from Crypto.Cipher import ARC4
from .misc import assert_true
from .keys import EA_KEY
def new_prng():
state = 0x41c64e6d
while True:
x = (state * 0x838c9cda) + 0x6072
# state = (state * 0x41c64e6d + 0x3039)
# state = (state * 0x41c64e6d + 0x3039)
state = (state * 0xc2a29a69 + 0xd3dc167e) & 0xffffffff
yield (x & 0x7fff0000) | state >> 15 & 0xffff
prng = new_prng()
def validate_key(info):
match = re.match(r"^(\d)-([0-9a-f]{8})-([0-9a-f]{4})$", info)
assert_true(match is not None, "Invalid eamuse info key")
assert match is not None
version = match.group(1)
assert_true(version == "1", f"Unsupported encryption version ({version})")
seconds = binascii.unhexlify(match.group(2)) # 4 bytes
rng = binascii.unhexlify(match.group(3)) # 2 bytes
return seconds, rng
def get_key(prng_=None):
return f"1-{int(time.time()):08x}-{(next(prng_ or prng) & 0xffff):04x}"
def ea_symmetric_crypt(data, info):
seconds, rng = validate_key(info)
key = hashlib.md5(seconds + rng + EA_KEY).digest()
return ARC4.new(key).encrypt(data)
__all__ = ("new_prng", "prng", "validate_key", "get_key", "ea_symmetric_crypt")

@ -1,222 +1,241 @@
import math
import struct
import io
from html import unescape
try:
from lxml import etree
except ModuleNotFoundError:
print("W", "lxml not found, XML strings will not be supported")
etree = None
from .packer import Packer
from .const import (
NAME_MAX_COMPRESSED, NAME_MAX_DECOMPRESSED, ATTR, PACK_ALPHABET, END_NODE, END_DOC, ARRAY_BIT,
ENCODING, CONTENT, CONTENT_COMP, CONTENT_FULL, XML_ENCODING, Type
)
from .misc import unpack, py_encoding, assert_true
from .node import XMLNode
from .exception import DecodeError
class Decoder:
def __init__(self, packet):
self.stream = io.BytesIO(packet)
self.is_xml_string = packet.startswith(b"<")
self.encoding = None
self.compressed = False
self.has_data = False
self.packer = None
@classmethod
def decode(cls, packet):
return cls(packet).unpack()
def read(self, s_format, single=True, align=True):
if s_format == "S":
length = self.read("L")
if self.packer:
self.packer.notify_skipped(length)
return self.stream.read(length)
if s_format == "s":
length = self.read("L")
if self.packer:
self.packer.notify_skipped(length)
raw = self.stream.read(length)
return raw.decode(py_encoding(self.encoding)).rstrip("\0")
length = struct.calcsize("=" + s_format)
if self.packer and align:
self.stream.seek(self.packer.request_allocation(length))
data = self.stream.read(length)
assert_true(len(data) == length, "EOF reached", DecodeError)
value = struct.unpack(">" + s_format, data)
return value[0] if single else value
def _read_node_value(self, node):
fmt = node.type.value.fmt
count = 1
if node.is_array:
length = struct.calcsize("=" + fmt)
count = self.read("I") // length
values = []
for _ in range(count):
values.append(self.read(fmt, single=len(fmt) == 1, align=False))
self.packer.notify_skipped(count * length)
return values
node.value = self.read(fmt, single=len(fmt) == 1)
def _read_metadata_name(self):
length = self.read("B")
if not self.compressed:
if length < 0x80:
assert_true(length >= 0x40, "Invalid name length", DecodeError)
# i.e. length = (length & ~0x40) + 1
length -= 0x3f
else:
length = (length << 8) | self.read("B")
# i.e. length = (length & ~0x8000) + 0x41
length -= 0x7fbf
assert_true(length <= NAME_MAX_DECOMPRESSED, "Name length too long", DecodeError)
name = self.stream.read(length)
assert_true(len(name) == length, "Not enough bytes to read name", DecodeError)
return name.decode(self.encoding)
out = ""
if length == 0:
return out
assert_true(length <= NAME_MAX_COMPRESSED, "Name length too long", DecodeError)
no_bytes = math.ceil((length * 6) / 8)
unpacked = unpack(self.stream.read(no_bytes), 6)[:length]
return "".join(PACK_ALPHABET[i] for i in unpacked)
def _read_metadata(self, type_):
name = self._read_metadata_name()
node = XMLNode(name, type_, None, encoding=self.encoding)
while (child := self.read("B")) != END_NODE:
if child == ATTR:
attr = self._read_metadata_name()
assert_true(not attr.startswith("__"), "Invalid binary node name", DecodeError)
# Abuse the array here to maintain order
node.children.append(attr)
else:
node.children.append(self._read_metadata(child))
is_array = not not (type_ & ARRAY_BIT)
if is_array:
node.value = []
return node
def _read_databody(self, node: XMLNode):
self._read_node_value(node)
children = list(node.children)
node.children = []
for i in children:
if isinstance(i, XMLNode):
node.children.append(self._read_databody(i))
else:
node[i] = self.read("s")
return node
def _read_magic(self):
magic, contents, enc, enc_comp = struct.unpack(">BBBB", self.stream.read(4))
assert_true(magic == 0xA0, "Not a packet", DecodeError)
assert_true(~enc & 0xFF == enc_comp, "Malformed packet header", DecodeError)
assert_true(enc in ENCODING, "Unknown packet encoding", DecodeError)
assert_true(contents in CONTENT, "Invalid packet contents", DecodeError)
self.compressed = contents in CONTENT_COMP
self.has_data = contents in CONTENT_FULL or contents == 0x44
self.encoding = ENCODING[enc]
def _read_xml_string(self):
assert_true(etree is not None, "lxml missing", DecodeError)
parser = etree.XMLParser(remove_comments=True)
tree = etree.XML(self.stream.read(), parser)
self.encoding = XML_ENCODING[tree.getroottree().docinfo.encoding.upper()]
self.compressed = False
self.has_data = True
def walk(node):
attrib = {**node.attrib}
type_str = attrib.pop("__type", "void")
for i in Type:
if type_str in i.value.names:
type_ = i
break
else:
raise ValueError("Invalid node type")
attrib.pop("__size", None)
count = attrib.pop("__count", None)
is_array = count is not None
count = 1 if count is None else int(count)
d_type = type_.value
if d_type.size == 1 and not is_array:
value = d_type._parse(node.text or "")
else:
data = node.text.split(" ")
value = []
for i in range(0, len(data), d_type.size):
value.append(d_type._parse(data[i:i+d_type.size]))
if not is_array:
value = value[0]
xml_node = XMLNode(node.tag, type_, value, encoding=self.encoding)
for i in node.getchildren():
xml_node.children.append(walk(i))
for i in attrib:
xml_node[i] = unescape(attrib[i])
return xml_node
return walk(tree)
def unpack(self):
try:
return self._unpack()
except struct.error as e:
raise DecodeError(e)
def _unpack(self):
if self.is_xml_string:
return self._read_xml_string()
self._read_magic()
header_len = self.read("I")
start = self.stream.tell()
schema = self._read_metadata(self.read("B"))
assert_true(self.read("B") == END_DOC, "Unterminated schema", DecodeError)
padding = header_len - (self.stream.tell() - start)
assert_true(padding >= 0, "Invalid schema definition", DecodeError)
assert_true(all(i == 0 for i in self.stream.read(padding)), "Invalid schema padding", DecodeError)
body_len = self.read("I")
start = self.stream.tell()
self.packer = Packer(start)
data = self._read_databody(schema)
self.stream.seek(self.packer.request_allocation(0))
padding = body_len - (self.stream.tell() - start)
assert_true(padding >= 0, "Data shape not match schema", DecodeError)
assert_true(all(i == 0 for i in self.stream.read(padding)), "Invalid data padding", DecodeError)
assert_true(self.stream.read(1) == b"", "Trailing data unconsumed", DecodeError)
return data
__all__ = ("Decoder", )
import math
import struct
import io
from html import unescape
try:
from lxml import etree
except ModuleNotFoundError:
print("W", "lxml not found, XML strings will not be supported")
etree = None
from .packer import Packer
from .const import (
NAME_MAX_COMPRESSED, NAME_MAX_DECOMPRESSED, ATTR, PACK_ALPHABET, END_NODE, END_DOC, ARRAY_BIT,
ENCODING, CONTENT, CONTENT_COMP, CONTENT_FULL, XML_ENCODING, DEFAULT_ENCODING, Type
)
from .misc import unpack, py_encoding, assert_true
from .node import XMLNode
from .exception import DecodeError
class Decoder:
def __init__(self, packet):
self.stream = io.BytesIO(packet)
self.is_xml_string = packet.startswith(b"<")
self.encoding = None
self.compressed = False
self.has_data = False
self.packer = None
@classmethod
def decode(cls, packet):
return cls(packet).unpack()
def read(self, s_format, single=True, align=True):
if s_format == "S":
length = self.read("L")
if self.packer:
self.packer.notify_skipped(length)
return self.stream.read(length)
if s_format == "s":
length = self.read("L")
if self.packer:
self.packer.notify_skipped(length)
raw = self.stream.read(length)
return raw.decode(py_encoding(self.encoding or DEFAULT_ENCODING)).rstrip("\0")
length = struct.calcsize("=" + s_format)
if self.packer and align:
self.stream.seek(self.packer.request_allocation(length))
data = self.stream.read(length)
assert_true(len(data) == length, "EOF reached", DecodeError)
value = struct.unpack(">" + s_format, data)
return value[0] if single else value
def _read_node_value(self, node: XMLNode) -> None:
fmt = node.type.value.fmt
count = 1
if node.is_array:
length = struct.calcsize("=" + fmt)
nbytes = self.read("I")
assert isinstance(nbytes, int)
count = nbytes // length
values = []
for _ in range(count):
values.append(self.read(fmt, single=len(fmt) == 1, align=False))
assert self.packer is not None
self.packer.notify_skipped(count * length)
node.value = values
else:
node.value = self.read(fmt, single=len(fmt) == 1)
def _read_metadata_name(self) -> str:
length = self.read("B")
assert isinstance(length, int)
if not self.compressed:
if length < 0x80:
assert_true(length >= 0x40, "Invalid name length", DecodeError)
# i.e. length = (length & ~0x40) + 1
length -= 0x3f
else:
extra = self.read("B")
assert isinstance(extra, int)
length = (length << 8) | extra
# i.e. length = (length & ~0x8000) + 0x41
length -= 0x7fbf
assert_true(length <= NAME_MAX_DECOMPRESSED, "Name length too long", DecodeError)
name = self.stream.read(length)
assert_true(len(name) == length, "Not enough bytes to read name", DecodeError)
return name.decode(self.encoding or "")
out = ""
if length == 0:
return out
assert_true(length <= NAME_MAX_COMPRESSED, "Name length too long", DecodeError)
no_bytes = math.ceil((length * 6) / 8)
unpacked = unpack(self.stream.read(no_bytes), 6)[:length]
return "".join(PACK_ALPHABET[i] for i in unpacked)
def _read_metadata(self, type_):
name = self._read_metadata_name()
node = XMLNode(name, type_, None, encoding=self.encoding or DEFAULT_ENCODING)
while (child := self.read("B")) != END_NODE:
if child == ATTR:
attr = self._read_metadata_name()
assert_true(not attr.startswith("__"), "Invalid binary node name", DecodeError)
# Abuse the array here to maintain order
node.children.append(attr)
else:
node.children.append(self._read_metadata(child))
if type_ & ARRAY_BIT:
node.value = []
return node
def _read_databody(self, node: XMLNode):
self._read_node_value(node)
children = list(node.children)
node.children = []
for i in children:
if isinstance(i, XMLNode):
node.children.append(self._read_databody(i))
else:
node[i] = self.read("s")
return node
def _read_magic(self):
magic, contents, enc, enc_comp = struct.unpack(">BBBB", self.stream.read(4))
assert_true(magic == 0xA0, "Not a packet", DecodeError)
assert_true(~enc & 0xFF == enc_comp, "Malformed packet header", DecodeError)
assert_true(enc in ENCODING, "Unknown packet encoding", DecodeError)
assert_true(contents in CONTENT, "Invalid packet contents", DecodeError)
self.compressed = contents in CONTENT_COMP
self.has_data = contents in CONTENT_FULL or contents == 0x44
self.encoding = ENCODING[enc]
def _read_xml_string(self):
assert_true(etree is not None, "lxml missing", DecodeError)
assert etree is not None
parser = etree.XMLParser(remove_comments=True)
tree = etree.XML(self.stream.read(), parser)
self.encoding = XML_ENCODING[tree.getroottree().docinfo.encoding.upper()]
self.compressed = False
self.has_data = True
def walk(node):
attrib = {**node.attrib}
type_str = attrib.pop("__type", "void")
for i in Type:
if type_str in i.value.names:
type_ = i
break
else:
raise ValueError("Invalid node type")
attrib.pop("__size", None)
count = attrib.pop("__count", None)
is_array = count is not None
count = 1 if count is None else int(count)
d_type = type_.value
if d_type.size == 1 and not is_array:
try:
value = d_type._parse(node.text or "")
except ValueError:
print(f"Failed to parse {node.tag} ({d_type.names[0]}): {repr(node.text)}")
raise
else:
data = node.text.split(" ")
value = []
for i in range(0, len(data), d_type.size):
value.append(d_type._parse(data[i:i+d_type.size]))
if not is_array:
value = value[0]
xml_node = XMLNode(node.tag, type_, value, encoding=self.encoding or DEFAULT_ENCODING)
for i in node.getchildren():
xml_node.children.append(walk(i))
for i in attrib:
xml_node[i] = unescape(attrib[i])
return xml_node
return walk(tree)
def unpack(self):
try:
return self._unpack()
except struct.error as e:
raise DecodeError(e)
def _unpack(self):
if self.is_xml_string:
return self._read_xml_string()
self._read_magic()
header_len = self.read("I")
assert isinstance(header_len, int)
start = self.stream.tell()
schema = self._read_metadata(self.read("B"))
assert_true(self.read("B") == END_DOC, "Unterminated schema", DecodeError)
padding = header_len - (self.stream.tell() - start)
assert_true(padding >= 0, "Invalid schema definition", DecodeError)
assert_true(
all(i == 0 for i in self.stream.read(padding)), "Invalid schema padding", DecodeError
)
body_len = self.read("I")
assert isinstance(body_len, int)
start = self.stream.tell()
self.packer = Packer(start)
data = self._read_databody(schema)
self.stream.seek(self.packer.request_allocation(0))
padding = body_len - (self.stream.tell() - start)
assert_true(padding >= 0, "Data shape not match schema", DecodeError)
assert_true(
all(i == 0 for i in self.stream.read(padding)), "Invalid data padding", DecodeError
)
assert_true(self.stream.read(1) == b"", "Trailing data unconsumed", DecodeError)
return data
__all__ = ("Decoder", )

@ -1,157 +1,166 @@
import struct
import io
from .packer import Packer
from .misc import pack, py_encoding, assert_true
from .const import (
PACK_ALPHABET, DEFAULT_ENCODING, ENCODING, ENCODING_BACK, NAME_MAX_DECOMPRESSED, ARRAY_BIT,
ATTR, END_NODE, END_DOC, CONTENT_COMP_FULL, CONTENT_COMP_SCHEMA, CONTENT_ASCII_FULL,
CONTENT_ASCII_SCHEMA
)
from .exception import EncodeError
class Encoder:
def __init__(self, encoding=DEFAULT_ENCODING):
self.stream = io.BytesIO()
assert_true(encoding in ENCODING_BACK, f"Unknown encoding {encoding}", EncodeError)
self.encoding = ENCODING_BACK[encoding]
self.packer = None
self._compressed = False
@classmethod
def encode(cls, tree, xml_string=False):
if xml_string:
return tree.to_str(pretty=False).encode(tree.encoding)
encoder = cls(tree.encoding)
encoder.pack(tree)
return encoder.stream.getvalue()
def align(self, to=4, pad_char=b"\0"):
if to < 2:
return
if (dist := self.stream.tell() % to) == 0:
return
self.stream.write(pad_char * (to - dist))
def write(self, s_format, value, single=True):
if s_format == "S":
self.write("L", len(value))
self.stream.write(value)
self.packer.notify_skipped(len(value))
return
if s_format == "s":
value = value.encode(py_encoding(ENCODING[self.encoding])) + b"\0"
self.write("L", len(value))
self.stream.write(value)
self.packer.notify_skipped(len(value))
return
length = struct.calcsize("=" + s_format)
if not isinstance(value, list):
value = [value]
count = len(value)
if count != 1:
self.write("L", count * length)
self.packer.notify_skipped(count * length)
for x in value:
if self.packer and count == 1:
self.stream.seek(self.packer.request_allocation(length))
try:
if single:
self.stream.write(struct.pack(f">{s_format}", x))
else:
self.stream.write(struct.pack(f">{s_format}", *x))
except struct.error:
raise ValueError(f"Failed to pack {s_format}: {repr(x)}")
def _write_node_value(self, type_, value):
fmt = type_.value.fmt
if fmt == "s":
self.write("s", value)
else:
self.write(fmt, value, single=len(fmt) == 1)
def _write_metadata_name(self, name):
if not self._compressed:
assert_true(len(name) <= NAME_MAX_DECOMPRESSED, "Name length too long", EncodeError)
if len(name) > 64:
self.write("H", len(name) + 0x7fbf)
else:
self.write("B", len(name) + 0x3f)
self.stream.write(name.encode(py_encoding(ENCODING[self.encoding])))
return
assert_true(all(i in PACK_ALPHABET for i in name), f"Invalid schema name {name} (invalid chars)", EncodeError)
assert_true(len(name) < 256, f"Invalid schema name {name} (too long)", EncodeError)
self.write("B", len(name))
if len(name) == 0:
return
name = bytearray(PACK_ALPHABET.index(i) for i in name)
self.stream.write(pack(name, 6))
def _write_metadata(self, node):
self.write("B", node.type.value.id | (ARRAY_BIT if node.is_array else 0x00))
self._write_metadata_name(node.name)
for attr in node.attributes:
self.write("B", ATTR)
self._write_metadata_name(attr)
for child in node:
self._write_metadata(child)
self.write("B", END_NODE)
def _write_databody(self, data):
self._write_node_value(data.type, data.value)
for attr in data.attributes:
self.align()
self.write("s", data[attr])
for child in data:
self._write_databody(child)
def _write_magic(self, has_data=True):
if has_data:
contents = CONTENT_COMP_FULL if self._compressed else CONTENT_ASCII_FULL
else:
contents = CONTENT_COMP_SCHEMA if self._compressed else CONTENT_ASCII_SCHEMA
enc_comp = ~self.encoding & 0xFF
self.stream.write(struct.pack(">BBBB", 0xA0, contents, self.encoding, enc_comp))
def pack(self, node):
try:
return self._pack(node)
except struct.error as e:
return EncodeError(e)
def _pack(self, node):
self._compressed = node.can_compress # Opportunically compress if we can
self._write_magic()
schema_start = self.stream.tell()
self.write("I", 0)
self._write_metadata(node)
self.write("B", END_DOC)
self.align()
schema_end = self.stream.tell()
self.stream.seek(schema_start)
self.write("I", schema_end - schema_start - 4)
self.stream.seek(schema_end)
self.write("I", 0)
self.packer = Packer(self.stream.tell())
self._write_databody(node)
self.stream.seek(0, io.SEEK_END)
self.align()
node_end = self.stream.tell()
self.stream.seek(schema_end)
self.packer = None
self.write("I", node_end - schema_end - 4)
__all__ = ("Encoder", )
import struct
import io
from .packer import Packer
from .misc import pack, py_encoding, assert_true
from .const import (
PACK_ALPHABET, DEFAULT_ENCODING, ENCODING, ENCODING_BACK, NAME_MAX_DECOMPRESSED, ARRAY_BIT,
ATTR, END_NODE, END_DOC, CONTENT_COMP_FULL, CONTENT_COMP_SCHEMA, CONTENT_ASCII_FULL,
CONTENT_ASCII_SCHEMA
)
from .exception import EncodeError
from .node import XMLNode
class Encoder:
def __init__(self, encoding=DEFAULT_ENCODING):
self.stream = io.BytesIO()
assert_true(encoding in ENCODING_BACK, f"Unknown encoding {encoding}", EncodeError)
self.encoding = ENCODING_BACK[encoding]
self.packer = None
self._compressed = False
@classmethod
def encode(cls, tree, xml_string=False):
if xml_string:
return tree.to_str(pretty=False).encode(tree.encoding)
encoder = cls(tree.encoding)
encoder.pack(tree)
return encoder.stream.getvalue()
def align(self, to=4, pad_char=b"\0"):
if to < 2:
return
if (dist := self.stream.tell() % to) == 0:
return
self.stream.write(pad_char * (to - dist))
def write(self, s_format, value, single=True):
if s_format == "S":
assert self.packer is not None
self.write("L", len(value))
self.stream.write(value)
self.packer.notify_skipped(len(value))
return
if s_format == "s":
assert self.packer is not None
value = value.encode(py_encoding(ENCODING[self.encoding])) + b"\0"
self.write("L", len(value))
self.stream.write(value)
self.packer.notify_skipped(len(value))
return
length = struct.calcsize("=" + s_format)
if isinstance(value, list):
assert self.packer is not None
self.write("L", len(value) * length)
self.packer.notify_skipped(len(value) * length)
for x in value:
try:
if single:
self.stream.write(struct.pack(f">{s_format}", x))
else:
self.stream.write(struct.pack(f">{s_format}", *x))
except struct.error:
raise ValueError(f"Failed to pack {s_format}: {repr(x)}")
else:
if self.packer:
self.stream.seek(self.packer.request_allocation(length))
try:
if single:
self.stream.write(struct.pack(f">{s_format}", value))
else:
self.stream.write(struct.pack(f">{s_format}", *value))
except struct.error:
raise ValueError(f"Failed to pack {s_format}: {repr(value)}")
def _write_node_value(self, type_, value):
fmt = type_.value.fmt
if fmt == "s":
self.write("s", value)
else:
self.write(fmt, value, single=len(fmt) == 1)
def _write_metadata_name(self, name):
if not self._compressed:
assert_true(len(name) <= NAME_MAX_DECOMPRESSED, "Name length too long", EncodeError)
if len(name) > 64:
self.write</