feat(cm.read): Add support for reading packed files

2024-07-08 22:55:42 +07:00
parent 4dcaf93dc7
commit 5e690cb28c


@ -1,8 +1,11 @@
from io import IOBase
import os
import csv
import xml.etree.ElementTree as ET
from typing import Optional
from construct import Array, Int32sl, Int64sl, Struct, Int32ul, Const
from read import BaseReader
from core.config import CoreConfig
from titles.ongeki.database import OngekiData
@ -59,6 +62,9 @@ class CardMakerReader(BaseReader):
await self.read_chuni_gacha(f"{data_dir}/CHU/Data/A000/gacha")
await self.read_mai2_card(f"{data_dir}/MAI/Data/A000/card")
await self.read_ongeki_pack(f"{data_dir}/MU3/Data/A000.pac")
for file, func in static_datas.items():
if os.path.exists(f"{self.bin_dir}/MU3/{file}"):
read_csv = getattr(CardMakerReader, func)
@ -71,8 +77,6 @@ class CardMakerReader(BaseReader):
if self.opt_dir is not None:
data_dirs = self.get_data_directories(self.opt_dir)
# ONGEKI (MU3) cannot easily access the bin data (A000.pac)
# so only opt_dir will work for now
for dir in data_dirs:
await self.read_chuni_card(f"{dir}/CHU/card")
await self.read_chuni_gacha(f"{dir}/CHU/gacha")
@ -83,6 +87,7 @@ class CardMakerReader(BaseReader):
self.logger.info(f"Reading cards from {base_dir}...") self.logger.info(f"Reading cards from {base_dir}...")
version_ids = { version_ids = {
"v1_50": ChuniConstants.VER_CHUNITHM_PARADISE,
"v2_00": ChuniConstants.VER_CHUNITHM_NEW, "v2_00": ChuniConstants.VER_CHUNITHM_NEW,
"v2_05": ChuniConstants.VER_CHUNITHM_NEW_PLUS, "v2_05": ChuniConstants.VER_CHUNITHM_NEW_PLUS,
"v2_10": ChuniConstants.VER_CHUNITHM_SUN, "v2_10": ChuniConstants.VER_CHUNITHM_SUN,
@ -131,6 +136,7 @@ class CardMakerReader(BaseReader):
self.logger.info(f"Reading gachas from {base_dir}...") self.logger.info(f"Reading gachas from {base_dir}...")
version_ids = { version_ids = {
"v1_50": ChuniConstants.VER_CHUNITHM_PARADISE,
"v2_00": ChuniConstants.VER_CHUNITHM_NEW, "v2_00": ChuniConstants.VER_CHUNITHM_NEW,
"v2_05": ChuniConstants.VER_CHUNITHM_NEW_PLUS, "v2_05": ChuniConstants.VER_CHUNITHM_NEW_PLUS,
"v2_10": ChuniConstants.VER_CHUNITHM_SUN, "v2_10": ChuniConstants.VER_CHUNITHM_SUN,
@ -213,6 +219,11 @@ class CardMakerReader(BaseReader):
name = troot.find("name").find("str").text name = troot.find("name").find("str").text
card_id = int(troot.find("name").find("id").text) card_id = int(troot.find("name").find("id").text)
version = troot.find("enableVersion").find("str").text
if version not in version_ids:
self.logger.warning(f"Card {card_id} is on an unsupported version {version}.")
continue
version = version_ids[
troot.find("enableVersion").find("str").text
@ -262,9 +273,7 @@ class CardMakerReader(BaseReader):
self.logger.info(f"Added ongeki card {row['cardId']} to gacha") self.logger.info(f"Added ongeki card {row['cardId']} to gacha")
async def read_ongeki_gacha_xml(self, troot: ET.Element):
# assuming some GachaKinds based on the GachaType
type_to_kind = {
"Normal": "Normal",
@ -274,12 +283,6 @@ class CardMakerReader(BaseReader):
"FreeSR": "Free", "FreeSR": "Free",
} }
name = troot.find("Name").find("str").text name = troot.find("Name").find("str").text
gacha_id = int(troot.find("Name").find("id").text) gacha_id = int(troot.find("Name").find("id").text)
@ -290,10 +293,8 @@ class CardMakerReader(BaseReader):
)
is not None
):
self.logger.info(f"Gacha {gacha_id} already added, skipping")
return
# 1140 is the first bright memory gacha
if gacha_id < 1140:
@ -320,3 +321,123 @@ class CardMakerReader(BaseReader):
maxSelectPoint=max_select_point,
)
self.logger.info(f"Added ongeki gacha {gacha_id}")
async def read_ongeki_gacha(self, base_dir: str) -> None:
self.logger.info(f"Reading gachas from {base_dir}...")
for root, dirs, _ in os.walk(base_dir):
for dir in dirs:
if os.path.exists(f"{root}/{dir}/Gacha.xml"):
with open(f"{root}/{dir}/Gacha.xml", "r", encoding="utf-8") as f:
troot = ET.fromstring(f.read())
await self.read_ongeki_gacha_xml(troot)
async def read_ongeki_pack(self, packfile: str):
if not os.path.exists(packfile):
self.logger.warning(f"O.N.G.E.K.I. VFS file {packfile} not found.")
return
with open(packfile, "rb") as f:
# First, read the header to know how much data is metadata
header = VFSHeader.parse(f.read(VFSHeader.sizeof()))
meta_size = header.num_entries * VFSFileData.sizeof() + VFSHeader.sizeof()
# Now that we know, read the entire metadata in.
f.seek(0, os.SEEK_SET)
metadata = VFSMetadata.parse(f.read(meta_size))
# Read the string table, which contains the file names.
f.seek(header.offset_string, os.SEEK_SET)
string_table = f.read(header.offset_data - header.offset_string).decode(
"utf-16-le"
)
# Now that we have everything, time to build the VFS tree. The first item in the tree should be the root.
tree = VFSDirectory(metadata, string_table, 0)
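# Gacha definitions live at gacha/<gacha dir>/Gacha.xml inside the pack,
# matching the on-disk layout read_ongeki_gacha walks in the opt directories.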
gachas = tree.directories["gacha"]
for gacha in gachas.directories.values():
troot = ET.fromstring(
gacha.files["Gacha.xml"].read_from_file(f).decode("utf-8")
)
await self.read_ongeki_gacha_xml(troot)
# For whatever reason, CardMaker uses a packed filesystem for O.N.G.E.K.I. A000,
# whose data structures are the 3 structs below.
VFSHeader = Struct(
"magic" / Const(0x4B434150, Int32ul),
"reserved" / Int32ul,
"num_entries" / Int64sl,
"offset_string" / Int64sl * "offset for the string table which contains all filenamess",
"offset_data" / Int64sl * "offset of actual file data",
)
VFSFileData = Struct(
"type" / Int32sl * "0 for file, 1 for directory",
"flags" / Int32sl * "doesn't seem to be used",
"name_offset" / Int32sl * "offset of the filename in the string table",
"name_length" / Int32sl * "length of the filename",
"data_length" / Int32sl * "length of file data",
"data_offset" / Int64sl * "offset of file data starting from offset_data",
)
VFSMetadata = Struct(
"header" / VFSHeader,
"files" / Array(lambda ctx: ctx.header.num_entries, VFSFileData),
)
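# A minimal sketch, assuming a pack file named "A000.pac" in the working directory,
# of how the three structs above map onto the file: the header comes first, followed
# immediately by num_entries VFSFileData records, then the UTF-16-LE string table at
# offset_string, then the raw file data at offset_data.
#
#     with open("A000.pac", "rb") as f:
#         header = VFSHeader.parse(f.read(VFSHeader.sizeof()))
#         print(header.num_entries, header.offset_string, header.offset_data)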
class VFSFile:
# vfs is a parsed VFSMetadata container; construct returns dynamic Containers,
# so there is no precise type hint for it.
def __init__(self, vfs, string_table: str, file_id: int) -> None:
metadata = vfs.files[file_id]
if metadata.type != 0:
raise ValueError(
f"File ID {file_id} is not a file (found type {metadata.type})"
)
self.name = string_table[
metadata.name_offset : metadata.name_offset + metadata.name_length
]
self.offset = vfs.header.offset_data + metadata.data_offset
self.length = metadata.data_length
def read_from_file(self, f: IOBase):
if not f.seekable():
raise ValueError("Received an IO object that could not seek.")
f.seek(self.offset, os.SEEK_SET)
return f.read(self.length)
class VFSDirectory:
def __init__(self, vfs, string_table: str, file_id: int) -> None:
metadata = vfs.files[file_id]
if metadata.type != 1:
raise ValueError(
f"File ID {file_id} is not a directory (found type {metadata.type})"
)
self.name = string_table[
metadata.name_offset : metadata.name_offset + metadata.name_length
]
self.files: dict[str, VFSFile] = {}
self.directories: dict[str, VFSDirectory] = {}
for i in range(metadata.data_length):
child_id = metadata.data_offset + i
child_metadata = vfs.files[child_id]
if child_metadata.type == 0:
f = VFSFile(vfs, string_table, child_id)
self.files[f.name] = f
elif child_metadata.type == 1:
d = VFSDirectory(vfs, string_table, child_id)
self.directories[d.name] = d
else:
raise ValueError(
f"File ID {child_id} has invalid file type {child_metadata.type} (expected 0 or 1)"
)
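For reference, a minimal standalone sketch of how the pack structures above fit together. It assumes it runs alongside the VFSHeader, VFSFileData, VFSMetadata and VFSDirectory definitions from this commit; the list_tree helper and the command-line handling are purely illustrative. It opens an A000.pac and prints every file and directory in the VFS tree:

import os
import sys

def list_tree(directory: "VFSDirectory", indent: int = 0) -> None:
    # Recursively print every file and sub-directory of a parsed VFS directory.
    for vfs_file in directory.files.values():
        print(" " * indent + f"{vfs_file.name} ({vfs_file.length} bytes)")
    for sub_dir in directory.directories.values():
        print(" " * indent + f"{sub_dir.name}/")
        list_tree(sub_dir, indent + 2)

if __name__ == "__main__":
    packfile = sys.argv[1]  # e.g. <data_dir>/MU3/Data/A000.pac
    with open(packfile, "rb") as f:
        # Same parsing steps as CardMakerReader.read_ongeki_pack above.
        header = VFSHeader.parse(f.read(VFSHeader.sizeof()))
        meta_size = header.num_entries * VFSFileData.sizeof() + VFSHeader.sizeof()
        f.seek(0, os.SEEK_SET)
        metadata = VFSMetadata.parse(f.read(meta_size))
        f.seek(header.offset_string, os.SEEK_SET)
        string_table = f.read(header.offset_data - header.offset_string).decode("utf-16-le")
        list_tree(VFSDirectory(metadata, string_table, 0))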