Add support for initial d zero

This commit is contained in:
Hay1tsme 2023-04-23 04:38:28 -04:00
parent 58a088b9a4
commit d8c3ed5c01
31 changed files with 1125 additions and 0 deletions

11
example_config/idz.yaml Normal file
View File

@ -0,0 +1,11 @@
server:
enable: True
loglevel: "info"
hostname: ""
news: ""
aes_key: ""
ports:
userdb: 10000
match: 10010
echo: 10020

8
titles/idz/__init__.py Normal file
View File

@ -0,0 +1,8 @@
from titles.idz.index import IDZServlet
from titles.idz.const import IDZConstants
from titles.idz.database import IDZData
index = IDZServlet
database = IDZData
game_codes = [IDZConstants.GAME_CODE]
current_schema_version = 1

67
titles/idz/config.py Normal file
View File

@ -0,0 +1,67 @@
from typing import List, Dict
from core.config import CoreConfig
class IDZServerConfig:
def __init__(self, parent_config: "IDZConfig") -> None:
self.__config = parent_config
@property
def enable(self) -> bool:
return CoreConfig.get_config_field(
self.__config, "idz", "server", "enable", default=True
)
@property
def loglevel(self) -> int:
return CoreConfig.str_to_loglevel(
CoreConfig.get_config_field(
self.__config, "idz", "server", "loglevel", default="info"
)
)
@property
def hostname(self) -> str:
return CoreConfig.get_config_field(
self.__config, "idz", "server", "hostname", default=""
)
@property
def news(self) -> str:
return CoreConfig.get_config_field(
self.__config, "idz", "server", "news", default=""
)
@property
def aes_key(self) -> str:
return CoreConfig.get_config_field(
self.__config, "idz", "server", "aes_key", default=""
)
class IDZPortsConfig:
def __init__(self, parent_config: "IDZConfig") -> None:
self.__config = parent_config
@property
def userdb(self) -> int:
return CoreConfig.get_config_field(self.__config, "idz", "ports", "userdb", default=10000)
@property
def match(self) -> int:
return CoreConfig.get_config_field(self.__config, "idz", "ports", "match", default=10010)
@property
def echo(self) -> int:
return CoreConfig.get_config_field(self.__config, "idz", "ports", "echo", default=10020)
class IDZConfig(dict):
def __init__(self) -> None:
self.server = IDZServerConfig(self)
self.ports = IDZPortsConfig(self)
@property
def rsa_keys(self) -> List[Dict]:
return CoreConfig.get_config_field(self, "idz", "rsa_keys", default=[])

40
titles/idz/const.py Normal file
View File

@ -0,0 +1,40 @@
from enum import Enum
class IDZConstants:
GAME_CODE = "SDDF"
CONFIG_NAME = "idz.yaml"
VER_IDZ_110 = 0
VER_IDZ_130 = 1
VER_IDZ_210 = 2
VER_IDZ_230 = 3
NUM_VERS = 4
VERSION_NAMES = (
"Initial D Arcade Stage Zero v1.10",
"Initial D Arcade Stage Zero v1.30",
"Initial D Arcade Stage Zero v2.10",
"Initial D Arcade Stage Zero v2.30",
)
class PROFILE_STATUS(Enum):
LOCKED = 0
UNLOCKED = 1
OLD = 2
HASH_LUT = [
# No clue
0x9C82E674, 0x5A4738D9, 0x8B8D7AE0, 0x29EC9D81,
# These three are from AES TE0
0x1209091B, 0x1D83839E, 0x582C2C74, 0x341A1A2E,
0x361B1B2D, 0xDC6E6EB2, 0xB45A5AEE, 0x5BA0A0FB,
0xA45252F6, 0x763B3B4D, 0xB7D6D661, 0x7DB3B3CE,
]
HASH_NUM = 0
HASH_MUL = [5, 7, 11, 12][HASH_NUM]
HASH_XOR = [0xB3, 0x8C, 0x14, 0x50][HASH_NUM]
@classmethod
def game_ver_to_string(cls, ver: int):
return cls.VERSION_NAMES[ver]

6
titles/idz/database.py Normal file
View File

@ -0,0 +1,6 @@
from core.data import Data
from core.config import CoreConfig
class IDZData(Data):
def __init__(self, cfg: CoreConfig) -> None:
super().__init__(cfg)

16
titles/idz/echo.py Normal file
View File

@ -0,0 +1,16 @@
from twisted.internet.protocol import DatagramProtocol
import logging
from core.config import CoreConfig
from .config import IDZConfig
class IDZEcho(DatagramProtocol):
def __init__(self, cfg: CoreConfig, game_cfg: IDZConfig) -> None:
super().__init__()
self.core_config = cfg
self.game_config = game_cfg
self.logger = logging.getLogger("idz")
def datagramReceived(self, data, addr):
self.logger.debug(f"Echo from from {addr[0]}:{addr[1]} -> {self.transport.getHost().port} - {data.hex()}")
self.transport.write(data, addr)

View File

@ -0,0 +1,39 @@
from .base import IDZHandlerBase
from .load_server_info import IDZHandlerLoadServerInfo
from .load_ghost import IDZHandlerLoadGhost
from .load_config import IDZHandlerLoadConfigA, IDZHandlerLoadConfigB
from .load_top_ten import IDZHandlerLoadTopTen
from .update_story_clear_num import IDZHandlerUpdateStoryClearNum
from .save_expedition import IDZHandlerSaveExpedition
from .load_2on2 import IDZHandlerLoad2on2A, IDZHandlerLoad2on2B
from .load_team_ranking import IDZHandlerLoadTeamRankingA, IDZHandlerLoadTeamRankingB
from .discover_profile import IDZHandlerDiscoverProfile
from .lock_profile import IDZHandlerLockProfile
from .check_team_names import IDZHandlerCheckTeamName
from .unknown import IDZHandlerUnknown
from .create_profile import IDZHandlerCreateProfile
from .create_auto_team import IDZHandlerCreateAutoTeam
from .load_profile import IDZHandlerLoadProfile
from .save_profile import IDZHandlerSaveProfile
from .update_provisional_store_rank import IDZHandlerUpdateProvisionalStoreRank
from .load_reward_table import IDZHandlerLoadRewardTable
from .save_topic import IDZHandlerSaveTopic

View File

@ -0,0 +1,25 @@
import logging
import struct
from core.data import Data
from core.config import CoreConfig
from ..config import IDZConfig
from ..const import IDZConstants
class IDZHandlerBase():
name = "generic"
cmd_codes = [0x0000] * IDZConstants.NUM_VERS
rsp_codes = [0x0001] * IDZConstants.NUM_VERS
def __init__(self, core_cfg: CoreConfig, game_cfg: IDZConfig, version: int) -> None:
self.core_config = core_cfg
self.game_cfg = game_cfg
self.data = Data(core_cfg)
self.logger = logging.getLogger("idz")
self.game = IDZConstants.GAME_CODE
self.version = version
self.size = 0x30
def handle(self, data: bytes) -> bytearray:
ret = bytearray([0] * self.size)
struct.pack_into("<H", ret, 0x0, self.rsp_codes[self.version])
return ret

View File

@ -0,0 +1,19 @@
import struct
from .base import IDZHandlerBase
from core.config import CoreConfig
from ..config import IDZConfig
class IDZHandlerCheckTeamName(IDZHandlerBase):
cmd_codes = [0x00a2, 0x00a2, 0x0097, 0x0097]
rsp_codes = [0x00a3, 0x00a3, 0x0098, 0x0098]
name = "check_team_name"
def __init__(self, core_cfg: CoreConfig, game_cfg: IDZConfig, version: int) -> None:
super().__init__(core_cfg, game_cfg, version)
self.size = 0x0010
def handle(self, data: bytes) -> bytearray:
ret = super().handle(data)
struct.pack_into("<I", ret, 0x4, 0x1)
return data

View File

@ -0,0 +1,49 @@
from operator import indexOf
import struct
import json
from random import choice, randrange
from .base import IDZHandlerBase
from core.config import CoreConfig
from ..config import IDZConfig
AUTO_TEAM_NAMES = ["スピードスターズ", "レッドサンズ", "ナイトキッズ"]
FULL_WIDTH_NUMS = ["\uff10", "\uff11", "\uff12", "\uff13", "\uff14", "\uff15", "\uff16", "\uff17", "\uff18", "\uff19"]
class IDZHandlerCreateAutoTeam(IDZHandlerBase):
cmd_codes = [0x007b, 0x007b, 0x0077, 0x0077]
rsp_codes = [0x007c, 0x007c, 0x0078, 0x0078]
name = "create_auto_team"
def __init__(self, core_cfg: CoreConfig, game_cfg: IDZConfig, version: int) -> None:
super().__init__(core_cfg, game_cfg, version)
self.size = 0x0ca0
def handle(self, data: bytes) -> bytearray:
ret = super().handle(data)
aime_id = struct.unpack_from("<I", data, 0x04)[0]
name = choice(AUTO_TEAM_NAMES)
bg = indexOf(AUTO_TEAM_NAMES, name)
number = choice(FULL_WIDTH_NUMS) + choice(FULL_WIDTH_NUMS) + choice(FULL_WIDTH_NUMS)
tdata = {
"id": aime_id,
"bg": bg,
"fx": 0,
}
tdata = {
"id": aime_id,
"name": (name + number),
"bg": bg,
"fx": 0,
}
tname = tdata['name'].encode("shift-jis")
struct.pack_into("<I", ret, 0x0C, tdata["id"])
struct.pack_into(f"{len(tname)}s", ret, 0x24, tname)
struct.pack_into("<I", ret, 0x80, tdata["id"])
struct.pack_into(f"<I", ret, 0xD8, tdata["bg"])
struct.pack_into(f"<I", ret, 0xDC, tdata["fx"])
return ret

View File

@ -0,0 +1,91 @@
import json
import struct
from datetime import datetime
from .base import IDZHandlerBase
from core.config import CoreConfig
from ..config import IDZConfig
class IDZHandlerCreateProfile(IDZHandlerBase):
cmd_codes = [0x0066, 0x0066, 0x0064, 0x0064]
rsp_codes = [0x0067, 0x0065, 0x0065, 0x0065]
name = "create_profile"
def __init__(self, core_cfg: CoreConfig, game_cfg: IDZConfig, version: int) -> None:
super().__init__(core_cfg, game_cfg, version)
self.size = 0x0020
def handle(self, data: bytes) -> bytearray:
ret = super().handle(data)
aime_id = struct.unpack_from("<L", data, 0x04)[0]
name = data[0x1E:0x0034].decode("shift-jis").replace("\x00", "")
car = data[0x40:0xa0].hex()
chara = data[0xa8:0xbc].hex()
self.logger.info(f"Create profile for {name} (aime id {aime_id})")
auto_team = None
if not auto_team:
team = {
"bg": 0,
"id": 0,
"shop": ""
}
else:
tdata = json.loads(auto_team["data"])
team = {
"bg": tdata["bg"],
"id": tdata["fx"],
"shop": ""
}
profile_data={
"profile": {
"xp": 0,
"lv": 1,
"fame": 0,
"dpoint": 0,
"milage": 0,
"playstamps": 0,
"last_login": int(datetime.now().timestamp()),
"car_str": car, # These should probably be chaged to dicts
"chara_str": chara, # But this works for now...
},
"options": {
"music": 0,
"pack": 13640,
"aura": 0,
"paper_cup": 0,
"gauges": 5,
"driving_style": 0
},
"missions": {
"team": [],
"solo": []
},
"story": {
"x": 0,
"y": 0,
"rows": {}
},
"unlocks": {
"auras": 1,
"cup": 0,
"gauges": 32,
"music": 0,
"last_mileage_reward": 0,
},
"team": team
}
if self.version > 2:
struct.pack_into("<L", ret, 0x04, aime_id)
else:
struct.pack_into("<L", ret, 0x08, aime_id)
return ret

View File

View File

@ -0,0 +1,23 @@
import struct
from typing import Tuple, List, Dict
from .base import IDZHandlerBase
from core.config import CoreConfig
from ..config import IDZConfig
class IDZHandlerDiscoverProfile(IDZHandlerBase):
cmd_codes = [0x006b, 0x0067]
rsp_codes = [0x006c, 0x0068, 0x0068,0x0068]
name = "discover_profile"
def __init__(self, core_cfg: CoreConfig, game_cfg: IDZConfig, version: int) -> None:
super().__init__(core_cfg, game_cfg, version)
self.size = 0x0010
def handle(self, data: bytes) -> bytearray:
ret = super().handle(data)
user_id = struct.unpack_from("<I", data, 0x04)[0]
profile = None
struct.pack_into("<H", ret, 0x04, int(profile is not None))
return ret

View File

@ -0,0 +1,36 @@
import struct
from .base import IDZHandlerBase
from core.config import CoreConfig
from ..config import IDZConfig
from ..const import IDZConstants
class IDZHandlerLoad2on2A(IDZHandlerBase):
cmd_codes = [0x00b0, 0x00b0, 0x00a3, 0x00a3]
rsp_codes = [0x00b1, 0x00b1, 0x00a4, 0x00a4]
name = "load_2on2A"
def __init__(self, core_cfg: CoreConfig, game_cfg: IDZConfig, version: int) -> None:
super().__init__(core_cfg, game_cfg, version)
self.size = 0x04c0
if version >= IDZConstants.VER_IDZ_210:
self.size = 0x1290
def handle(self, data: bytes) -> bytearray:
return super().handle(data)
class IDZHandlerLoad2on2B(IDZHandlerBase):
cmd_codes = [0x0132] * 4
rsp_codes = [0x0133] * 4
name = "load_2on2B"
def __init__(self, core_cfg: CoreConfig, game_cfg: IDZConfig, version: int) -> None:
super().__init__(core_cfg, game_cfg, version)
self.size = 0x04c0
if version >= IDZConstants.VER_IDZ_210:
self.size = 0x0540
def handle(self, data: bytes) -> bytearray:
return super().handle(data)

View File

@ -0,0 +1,41 @@
import struct
from .base import IDZHandlerBase
from core.config import CoreConfig
from ..config import IDZConfig
from ..const import IDZConstants
class IDZHandlerLoadConfigA(IDZHandlerBase):
cmd_codes = [0x0004] * IDZConstants.NUM_VERS
rsp_codes = [0x0005] * IDZConstants.NUM_VERS
name = "load_config_a"
def __init__(self, core_cfg: CoreConfig, game_cfg: IDZConfig, version: int) -> None:
super().__init__(core_cfg, game_cfg, version)
self.size = 0x01a0
if self.version > 1:
self.size = 0x05e0
def handle(self, data: bytes) -> bytearray:
ret = super().handle(data)
struct.pack_into("<H", ret, 0x02, 1)
struct.pack_into("<I", ret, 0x16, 230)
return ret
class IDZHandlerLoadConfigB(IDZHandlerBase):
cmd_codes = [0x00a0] * IDZConstants.NUM_VERS
rsp_codes = [0x00ac, 0x00ac, 0x00a1, 0x00a1]
name = "load_config_b"
def __init__(self, core_cfg: CoreConfig, game_cfg: IDZConfig, version: int) -> None:
super().__init__(core_cfg, game_cfg, version)
self.size = 0x0230
if self.version > 1:
self.size = 0x0240
def handle(self, data: bytes) -> bytearray:
ret = super().handle(data)
struct.pack_into("<H", ret, 0x02, 1)
return ret

View File

@ -0,0 +1,39 @@
import struct
from .base import IDZHandlerBase
from core.config import CoreConfig
from ..config import IDZConfig
class IDZHandlerLoadGhost(IDZHandlerBase):
cmd_codes = [0x00a0, 0x00a0, 0x0095, 0x0095]
rsp_codes = [0x00a1, 0x00a1, 0x0096, 0x0096]
name = "load_ghost"
def __init__(self, core_cfg: CoreConfig, game_cfg: IDZConfig, version: int) -> None:
super().__init__(core_cfg, game_cfg, version)
self.size = 0x0070
def handle(self, data: bytes) -> bytearray:
ret = super().handle(data)
struct.pack_into("<I", ret, 0x02, 0x5)
struct.pack_into("<L", ret, 0x04, 0x0)
struct.pack_into("<l", ret, 0x08, -1)
struct.pack_into("<L", ret, 0x0C, 0x1D4C0)
struct.pack_into("<L", ret, 0x10, 0x1D4C0)
struct.pack_into("<L", ret, 0x14, 0x1D4C0)
struct.pack_into("<L", ret, 0x38, 0x0)
struct.pack_into("<l", ret, 0x3C, -1)
struct.pack_into("<L", ret, 0x40, 0x1D4C0)
struct.pack_into("<L", ret, 0x44, 0x1D4C0)
struct.pack_into("<L", ret, 0x48, 0x1D4C0)
struct.pack_into("<L", ret, 0x4C, 0x1D4C0)
struct.pack_into("<i", ret, 0x50, -1)
struct.pack_into("<H", ret, 0x52, 0)
struct.pack_into("<H", ret, 0x53, 0)
struct.pack_into("<H", ret, 0x54, 0)
struct.pack_into("<H", ret, 0x55, 0)
struct.pack_into("<H", ret, 0x58, 0)
return ret

View File

@ -0,0 +1,28 @@
import struct
from .base import IDZHandlerBase
from core.config import CoreConfig
from ..config import IDZConfig
from ..const import IDZConstants
class IDZHandlerLoadProfile(IDZHandlerBase):
cmd_codes = [0x0067, 0x012f, 0x012f, 0x0142]
rsp_codes = [0x0065, 0x012e, 0x012e, 0x0141]
name = "load_profile"
def __init__(self, core_cfg: CoreConfig, game_cfg: IDZConfig, version: int) -> None:
super().__init__(core_cfg, game_cfg, version)
if self.version == IDZConstants.VER_IDZ_110:
self.size = 0x0d30
elif self.version == IDZConstants.VER_IDZ_130:
self.size = 0x0ea0
elif self.version == IDZConstants.VER_IDZ_210:
self.size = 0x1360
elif self.version == IDZConstants.VER_IDZ_230:
self.size = 0x1640
def handle(self, data: bytes) -> bytearray:
ret = super().handle(data)
aime_id = struct.unpack_from("<L", data, 0x04)[0]
return ret

View File

@ -0,0 +1,17 @@
import struct
from .base import IDZHandlerBase
from core.config import CoreConfig
from ..config import IDZConfig
class IDZHandlerLoadRewardTable(IDZHandlerBase):
cmd_codes = [0x0086, 0x0086, 0x007F, 0x007F]
rsp_codes = [0x0087, 0x0087, 0x0080, 0x0080]
name = "load_reward_table"
def __init__(self, core_cfg: CoreConfig, game_cfg: IDZConfig, version: int) -> None:
super().__init__(core_cfg, game_cfg, version)
self.size = 0x01c0
def handle(self, data: bytes) -> bytearray:
return super().handle(data)

View File

@ -0,0 +1,60 @@
import struct
from .base import IDZHandlerBase
from core.config import CoreConfig
from ..config import IDZConfig
from ..const import IDZConstants
class IDZHandlerLoadServerInfo(IDZHandlerBase):
cmd_codes = [0x0006] * IDZConstants.NUM_VERS
rsp_codes = [0x0007] * IDZConstants.NUM_VERS
name = "load_server_info1"
def __init__(self, core_cfg: CoreConfig, game_cfg: IDZConfig, version: int) -> None:
super().__init__(core_cfg, game_cfg, version)
self.size = 0x04b0
def handle(self, data: bytes) -> bytearray:
ret = super().handle(data)
offset = 0
if self.version >= IDZConstants.VER_IDZ_210:
offset = 2
news_str = f"http://{self.core_config.title.hostname}:{self.core_config.title.port}/SDDF/230/news/news80**.txt"
err_str = f"http://{self.core_config.title.hostname}:{self.core_config.title.port}/SDDF/230/error"
len_hostname = len(self.core_config.title.hostname)
len_news = len(news_str)
len_error = len(err_str)
struct.pack_into("<I", ret, 0x2 + offset, 1) # Status
struct.pack_into(f"{len_hostname}s", ret, 0x4 + offset, self.core_config.title.hostname.encode())
struct.pack_into("<I", ret, 0x84 + offset, self.game_cfg.ports.userdb)
struct.pack_into("<I", ret, 0x86 + offset, self.game_cfg.ports.userdb + 1)
struct.pack_into(f"{len_hostname}s", ret, 0x88 + offset, self.core_config.title.hostname.encode())
struct.pack_into("<I", ret, 0x108 + offset, self.game_cfg.ports.match - 1)
struct.pack_into("<I", ret, 0x10a + offset, self.game_cfg.ports.match - 3)
struct.pack_into("<I", ret, 0x10c + offset, self.game_cfg.ports.match - 2)
struct.pack_into("<I", ret, 0x10e + offset, self.game_cfg.ports.match + 2)
struct.pack_into("<I", ret, 0x110 + offset, self.game_cfg.ports.match + 3)
struct.pack_into("<I", ret, 0x112 + offset, self.game_cfg.ports.match + 1)
struct.pack_into(f"{len_hostname}s", ret, 0x114 + offset, self.core_config.title.hostname.encode())
struct.pack_into("<I", ret, 0x194 + offset, self.game_cfg.ports.echo + 2)
struct.pack_into(f"{len_hostname}s", ret, 0x0199 + offset, self.core_config.title.hostname.encode())
struct.pack_into("<I", ret, 0x0219 + offset, self.game_cfg.ports.echo + 3)
struct.pack_into(f"{len_hostname}s", ret, 0x021c + offset, self.core_config.title.hostname.encode())
struct.pack_into(f"{len_hostname}s", ret, 0x029c + offset, self.core_config.title.hostname.encode())
struct.pack_into(f"{len_hostname}s", ret, 0x031c + offset, self.core_config.title.hostname.encode())
struct.pack_into("<I", ret, 0x39c + offset, self.game_cfg.ports.echo)
struct.pack_into("<I", ret, 0x39e + offset, self.game_cfg.ports.echo + 1)
struct.pack_into(f"{len_news}s", ret, 0x03a0 + offset, news_str.encode())
struct.pack_into(f"{len_error}s", ret, 0x0424 + offset, err_str.encode())
return ret

View File

@ -0,0 +1,29 @@
import struct
from .base import IDZHandlerBase
from core.config import CoreConfig
from ..config import IDZConfig
class IDZHandlerLoadTeamRankingA(IDZHandlerBase):
cmd_codes = [0x00b9, 0x00b9, 0x00a7, 0x00a7]
rsp_codes = [0x00b1] * 4
name = "load_team_ranking_a"
def __init__(self, core_cfg: CoreConfig, game_cfg: IDZConfig, version: int) -> None:
super().__init__(core_cfg, game_cfg, version)
self.size = 0x0ba0
def handle(self, data: bytes) -> bytearray:
return super().handle(data)
class IDZHandlerLoadTeamRankingB(IDZHandlerBase):
cmd_codes = [0x00bb, 0x00bb, 0x00a9, 0x00a9]
rsp_codes = [0x00a8] * 4
name = "load_team_ranking_b"
def __init__(self, core_cfg: CoreConfig, game_cfg: IDZConfig, version: int) -> None:
super().__init__(core_cfg, game_cfg, version)
self.size = 0x0ba0
def handle(self, data: bytes) -> bytearray:
return super().handle(data)

View File

@ -0,0 +1,30 @@
import struct
from typing import Tuple, List, Dict
from .base import IDZHandlerBase
from core.config import CoreConfig
from ..config import IDZConfig
class IDZHandlerLoadTopTen(IDZHandlerBase):
cmd_codes = [0x012c] * 4
rsp_codes = [0x00ce] * 4
name = "load_top_ten"
def __init__(self, core_cfg: CoreConfig, game_cfg: IDZConfig, version: int) -> None:
super().__init__(core_cfg, game_cfg, version)
self.size = 0x1720
def handle(self, data: bytes) -> bytearray:
ret = super().handle(data)
tracks_dates: List[Tuple[int, int]] = []
for i in range(32):
tracks_dates.append(
(struct.unpack_from("<H", data, 0x04 + (2 * i))[0], "little",
struct.unpack_from("<I", data, 0x44 + (4 * i))[0], "little")
)
# TODO: Best scores
for i in range (3):
offset = 0x16c0 + 0x1c * i
struct.pack_into("<B", ret, offset + 0x02, 0xff)
return ret

View File

@ -0,0 +1,39 @@
import struct
from typing import Dict
from datetime import datetime, timedelta
from .base import IDZHandlerBase
from core.config import CoreConfig
from ..config import IDZConfig
from ..const import IDZConstants
class IDZHandlerLockProfile(IDZHandlerBase):
cmd_codes = [0x0069, 0x0069, 0x0065, 0x0065]
rsp_codes = [0x006a, 0x006a, 0x0066, 0x0066]
name = "lock_profile"
def __init__(self, core_cfg: CoreConfig, game_cfg: IDZConfig, version: int) -> None:
super().__init__(core_cfg, game_cfg, version)
self.size = 0x0020
def handle(self, data: bytes) -> bytearray:
ret = super().handle(data)
profile_data = {
"status": IDZConstants.PROFILE_STATUS.UNLOCKED.value,
"expire_time": int((datetime.now() + timedelta(hours=1)).timestamp() / 1000)
}
user_id = struct.unpack_from("<I", data, 0x04)[0]
profile = None
if profile is None and self.version > 0:
old_profile = None
if old_profile is not None:
profile_data["status"] = IDZConstants.PROFILE_STATUS.OLD.value
return self.handle_common(profile_data, ret)
def handle_common(cls, data: Dict, ret: bytearray) -> bytearray:
struct.pack_into("<B", ret, 0x18, data["status"])
struct.pack_into("<h", ret, 0x1a, -1)
struct.pack_into("<I", ret, 0x1c, data["expire_time"])
return ret

View File

@ -0,0 +1,14 @@
from .base import IDZHandlerBase
from core.config import CoreConfig
from ..config import IDZConfig
from ..const import IDZConstants
class IDZHandlerSaveExpedition(IDZHandlerBase):
cmd_codes = [0x008c, 0x013f]
name = "save_expedition"
def __init__(self, core_cfg: CoreConfig, game_cfg: IDZConfig, version: int) -> None:
super().__init__(core_cfg, game_cfg, version)
def handle(self, data: bytes) -> bytearray:
return super().handle(data)

View File

@ -0,0 +1,15 @@
import struct
from .base import IDZHandlerBase
from core.config import CoreConfig
from ..config import IDZConfig
class IDZHandlerSaveProfile(IDZHandlerBase):
cmd_codes = [0x0068, 0x0138, 0x0138, 0x0143]
name = "save_profile"
def __init__(self, core_cfg: CoreConfig, game_cfg: IDZConfig, version: int) -> None:
super().__init__(core_cfg, game_cfg, version)
def handle(self, data: bytes) -> bytearray:
return super().handle(data)

View File

@ -0,0 +1,17 @@
import struct
from .base import IDZHandlerBase
from core.config import CoreConfig
from ..config import IDZConfig
class IDZHandlerSaveTopic(IDZHandlerBase):
cmd_codes = [0x009A, 0x009A, 0x0091, 0x0091]
rsp_codes = [0x009B, 0x009B, 0x0092, 0x0092]
name = "save_topic"
def __init__(self, core_cfg: CoreConfig, game_cfg: IDZConfig, version: int) -> None:
super().__init__(core_cfg, game_cfg, version)
self.size = 0x05d0
def handle(self, data: bytes) -> bytearray:
return super().handle(data)

View File

@ -0,0 +1,12 @@
import struct
from .base import IDZHandlerBase
from core.config import CoreConfig
from ..config import IDZConfig
class IDZHandlerUnknown(IDZHandlerBase):
cmd_codes = [0x00ad, 0x00ad, 0x00a2, 0x00a2]
name = "unknown"
def __init__(self, core_cfg: CoreConfig, game_cfg: IDZConfig, version: int) -> None:
super().__init__(core_cfg, game_cfg, version)

View File

@ -0,0 +1,20 @@
import struct
from .base import IDZHandlerBase
from core.config import CoreConfig
from ..config import IDZConfig
class IDZHandlerUpdateProvisionalStoreRank(IDZHandlerBase):
cmd_codes = [0x0082, 0x0082, 0x007C, 0x007C]
rsp_codes = [0x0083, 0x0083, 0x007D, 0x007D]
name = "update_provisional_store_ranking"
def __init__(self, core_cfg: CoreConfig, game_cfg: IDZConfig, version: int) -> None:
super().__init__(core_cfg, game_cfg, version)
self.size = 0x02b0
def handle(self, data: bytes) -> bytearray:
return super().handle(data)
def handle_common(cls, aime_id: int, ret: bytearray) -> bytearray:
pass

View File

@ -0,0 +1,26 @@
import struct
from .base import IDZHandlerBase
from core.config import CoreConfig
from ..config import IDZConfig
from ..const import IDZConstants
class IDZHandlerUpdateStoryClearNum(IDZHandlerBase):
cmd_codes = [0x007f, 0x097f, 0x013d, 0x0144]
rsp_codes = [0x0080, 0x013e, 0x013e, 0x0145]
name = "update_story_clear_num"
def __init__(self, core_cfg: CoreConfig, game_cfg: IDZConfig, version: int) -> None:
super().__init__(core_cfg, game_cfg, version)
if self.version == IDZConstants.VER_IDZ_110:
self.size = 0x0220
elif self.version == IDZConstants.VER_IDZ_130:
self.size = 0x04f0
elif self.version == IDZConstants.VER_IDZ_210:
self.size = 0x0510
elif self.version == IDZConstants.VER_IDZ_230:
self.size = 0x0800
def handle(self, data: bytes) -> bytearray:
return super().handle(data)

146
titles/idz/index.py Normal file
View File

@ -0,0 +1,146 @@
from twisted.web.http import Request
import yaml
import logging
import coloredlogs
from logging.handlers import TimedRotatingFileHandler
from os import path
from typing import Tuple, List
from twisted.internet import reactor, endpoints
from twisted.web import server, resource
import importlib
from core.config import CoreConfig
from .config import IDZConfig
from .const import IDZConstants
from .userdb import IDZUserDBFactory, IDZUserDBWeb, IDZKey
from .echo import IDZEcho
from .handlers import IDZHandlerLoadConfigB
class IDZServlet:
def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None:
self.core_cfg = core_cfg
self.game_cfg = IDZConfig()
if path.exists(f"{cfg_dir}/{IDZConstants.CONFIG_NAME}"):
self.game_cfg.update(
yaml.safe_load(open(f"{cfg_dir}/{IDZConstants.CONFIG_NAME}"))
)
self.logger = logging.getLogger("idz")
if not hasattr(self.logger, "inited"):
log_fmt_str = "[%(asctime)s] IDZ | %(levelname)s | %(message)s"
log_fmt = logging.Formatter(log_fmt_str)
fileHandler = TimedRotatingFileHandler(
"{0}/{1}.log".format(self.core_cfg.server.log_dir, "idz"),
encoding="utf8",
when="d",
backupCount=10,
)
self.rsa_keys: List[IDZKey] = []
fileHandler.setFormatter(log_fmt)
consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(log_fmt)
self.logger.addHandler(fileHandler)
self.logger.addHandler(consoleHandler)
self.logger.setLevel(self.game_cfg.server.loglevel)
coloredlogs.install(
level=self.game_cfg.server.loglevel, logger=self.logger, fmt=log_fmt_str
)
self.logger.inited = True
@classmethod
def rsaHashKeyN(cls, data):
hash_ = 0
for i in data:
hash_ = hash_ * IDZConstants.HASH_MUL + (i ^ IDZConstants.HASH_XOR) ^ IDZConstants.HASH_LUT[i & 0xf]
hash_ &= 0xffffffff
return hash_
@classmethod
def get_allnet_info(
cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str
) -> Tuple[bool, str, str]:
game_cfg = IDZConfig()
if path.exists(f"{cfg_dir}/{IDZConstants.CONFIG_NAME}"):
game_cfg.update(
yaml.safe_load(open(f"{cfg_dir}/{IDZConstants.CONFIG_NAME}"))
)
if not game_cfg.server.enable:
return (False, "", "")
if len(game_cfg.rsa_keys) <= 0 or not game_cfg.server.aes_key:
logging.getLogger("idz").error("IDZ: No RSA/AES keys! IDZ cannot start")
return (False, "", "")
hostname = core_cfg.title.hostname if not game_cfg.server.hostname else game_cfg.server.hostname
return (
True,
f"",
f"{hostname}:{game_cfg.ports.userdb}",
)
def setup(self):
for key in self.game_cfg.rsa_keys:
if "N" not in key or "d" not in key or "e" not in key:
self.logger.error(f"Invalid IDZ key {key}")
continue
hashN = self.rsaHashKeyN(str(key["N"]).encode())
self.rsa_keys.append(IDZKey(key["N"], key["d"], key["e"], hashN))
if len(self.rsa_keys) <= 0:
self.logger.error("No valid RSA keys provided! IDZ cannot start!")
return
handler_map = [{} for _ in range(IDZConstants.NUM_VERS)]
handler_mod = mod = importlib.import_module(f"titles.idz.handlers")
for cls_name in dir(handler_mod):
if cls_name.startswith("__"):
continue
try:
mod = getattr(handler_mod, cls_name)
mod_cmds: List = getattr(mod, "cmd_codes")
while len(mod_cmds) < IDZConstants.NUM_VERS:
mod_cmds.append(None)
for i in range(len(mod_cmds)):
if mod_cmds[i] is None:
mod_cmds[i] = mod_cmds[i - 1]
handler_map[i][mod_cmds[i]] = mod
except AttributeError as e:
continue
endpoints.serverFromString(reactor, f"tcp:{self.game_cfg.ports.userdb}:interface={self.core_cfg.server.listen_address}")\
.listen(IDZUserDBFactory(self.core_cfg, self.game_cfg, self.rsa_keys, handler_map))
reactor.listenUDP(self.game_cfg.ports.echo, IDZEcho(self.core_cfg, self.game_cfg))
reactor.listenUDP(self.game_cfg.ports.echo + 1, IDZEcho(self.core_cfg, self.game_cfg))
reactor.listenUDP(self.game_cfg.ports.match, IDZEcho(self.core_cfg, self.game_cfg))
reactor.listenUDP(self.game_cfg.ports.userdb + 1, IDZEcho(self.core_cfg, self.game_cfg))
self.logger.info(f"UserDB Listening on port {self.game_cfg.ports.userdb}")
def render_POST(self, request: Request, version: int, url_path: str) -> bytes:
req_raw = request.content.getvalue()
self.logger.info(f"IDZ POST request: {url_path} - {req_raw}")
return b""
def render_GET(self, request: Request, version: int, url_path: str) -> bytes:
self.logger.info(f"IDZ GET request: {url_path}")
request.responseHeaders.setRawHeaders('Content-Type', [b"text/plain; charset=utf-8"])
request.responseHeaders.setRawHeaders("Last-Modified", [b"Sun, 23 Apr 2023 05:33:20 GMT"])
news = self.game_cfg.server.news if self.game_cfg.server.news else f"Welcome to Initial D Arcade Stage Zero on {self.core_cfg.server.name}!"
news += "\r\n"
news = "1979/01/01 00:00:00 2099/12/31 23:59:59 " + news
return news.encode()

0
titles/idz/match.py Normal file
View File

162
titles/idz/userdb.py Normal file
View File

@ -0,0 +1,162 @@
from twisted.internet.protocol import Factory, Protocol
import logging, coloredlogs
from Crypto.Cipher import AES
import struct
from typing import Dict, Optional, List, Type
from twisted.web import server, resource
from twisted.internet import reactor, endpoints
from twisted.web.http import Request
from routes import Mapper
import random
from os import walk
import importlib
from core.config import CoreConfig
from .database import IDZData
from .config import IDZConfig
from .const import IDZConstants
from .handlers import IDZHandlerBase
HANDLER_MAP: List[Dict]
class IDZKey:
def __init__(self, n, d, e, hashN: int) -> None:
self.N = n
self.d = d
self.e = e
self.hashN = hashN
class IDZUserDBProtocol(Protocol):
def __init__(self, core_cfg: CoreConfig, game_cfg: IDZConfig, keys: List[IDZKey], handlers: List[Dict]) -> None:
self.logger = logging.getLogger('idz')
self.core_config = core_cfg
self.game_config = game_cfg
self.rsa_keys = keys
self.handlers = handlers
self.static_key = bytes.fromhex(self.game_config.server.aes_key)
self.version = None
self.version_internal = None
self.skip_next = False
def append_padding(self, data: bytes):
"""Appends 0s to the end of the data until it's at the correct size"""
length = struct.unpack_from("<H", data, 6)
padding_size = length[0] - len(data)
data += bytes(padding_size)
return data
def connectionMade(self) -> None:
self.logger.debug(f"{self.transport.getPeer().host} Connected")
base = 0
for i in range(len(self.static_key) - 1):
shift = 8 * i
byte = self.static_key[i]
base |= byte << shift
rsa_key = random.choice(self.rsa_keys)
key_enc: int = pow(base, rsa_key.e, rsa_key.N)
result = key_enc.to_bytes(0x40, "little") + struct.pack("<I", 0x01020304) + rsa_key.hashN.to_bytes(4, "little")
self.logger.debug(f"Send handshake {result.hex()}")
self.transport.write(result)
def connectionLost(self, reason) -> None:
self.logger.debug(
f"{self.transport.getPeer().host} Disconnected - {reason.value}"
)
def dataReceived(self, data: bytes) -> None:
self.logger.debug(f"Receive data {data.hex()}")
crypt = AES.new(self.static_key, AES.MODE_ECB)
data_dec = crypt.decrypt(data)
self.logger.debug(f"Decrypt data {data_dec.hex()}")
magic = struct.unpack_from("<I", data_dec, 0)[0]
if magic == 0xFE78571D:
# Ignore
self.logger.info(f"Userdb serverbox request {data_dec.hex()}")
self.skip_next = True
self.transport.write(b"\x00")
return
elif magic == 0x01020304:
self.version = int(data_dec[16:19].decode())
if self.version == 110:
self.version_internal = IDZConstants.VER_IDZ_110
elif self.version == 130:
self.version_internal = IDZConstants.VER_IDZ_130
elif self.version == 210:
self.version_internal = IDZConstants.VER_IDZ_210
elif self.version == 230:
self.version_internal = IDZConstants.VER_IDZ_230
else:
self.logger.warn(f"Bad version v{self.version}")
self.version = None
self.version_internal = None
self.logger.debug(f"Userdb v{self.version} handshake response from {self.transport.getPeer().host}")
return
elif self.skip_next:
self.skip_next = False
self.transport.write(b"\x00")
return
elif self.version is None:
# We didn't get a handshake before, and this isn't one now, so we're up the creek
self.logger.info(f"Bad UserDB request from from {self.transport.getPeer().host}")
self.transport.write(b"\x00")
return
cmd = struct.unpack_from("<H", data_dec, 0)[0]
handler_cls: Optional[Type[IDZHandlerBase]] = self.handlers[self.version_internal].get(cmd, None)
if handler_cls is None:
self.logger.warn(f"No handler for v{self.version} {hex(cmd)} cmd")
handler_cls = IDZHandlerBase
handler = handler_cls(self.core_config, self.game_config, self.version_internal)
self.logger.info(f"Userdb v{self.version} {handler.name} request from {self.transport.getPeer().host}")
response = handler.handle(data_dec)
self.logger.debug(f"Response: {response.hex()}")
crypt = AES.new(self.static_key, AES.MODE_ECB)
response_enc = crypt.encrypt(response)
self.transport.write(response_enc)
class IDZUserDBFactory(Factory):
protocol = IDZUserDBProtocol
def __init__(self, cfg: CoreConfig, game_cfg: IDZConfig, keys: List[IDZKey], handlers: List[Dict]) -> None:
self.core_config = cfg
self.game_config = game_cfg
self.keys = keys
self.handlers = handlers
def buildProtocol(self, addr):
return IDZUserDBProtocol(self.core_config, self.game_config, self.keys, self.handlers)
class IDZUserDBWeb(resource.Resource):
def __init__(self, core_cfg: CoreConfig, game_cfg: IDZConfig):
super().__init__()
self.isLeaf = True
self.core_config = core_cfg
self.game_config = game_cfg
self.logger = logging.getLogger('idz')
def render_POST(self, request: Request) -> bytes:
self.logger.info(f"IDZUserDBWeb POST from {request.getClientAddress().host} to {request.uri} with data {request.content.getvalue()}")
return b""
def render_GET(self, request: Request) -> bytes:
self.logger.info(f"IDZUserDBWeb GET from {request.getClientAddress().host} to {request.uri}")
return b""