344 lines
13 KiB
Python
344 lines
13 KiB
Python
from decimal import Decimal
|
|
import logging
|
|
import os
|
|
import re
|
|
import xml.etree.ElementTree as ET
|
|
from typing import Any, Dict, List, Optional
|
|
from Crypto.Cipher import AES
|
|
import zlib
|
|
import codecs
|
|
|
|
from core.config import CoreConfig
|
|
from core.data import Data
|
|
from read import BaseReader
|
|
from titles.mai2.const import Mai2Constants
|
|
from titles.mai2.database import Mai2Data
|
|
|
|
|
|
class Mai2Reader(BaseReader):
|
|
def __init__(
|
|
self,
|
|
config: CoreConfig,
|
|
version: int,
|
|
bin_dir: Optional[str],
|
|
opt_dir: Optional[str],
|
|
extra: Optional[str],
|
|
) -> None:
|
|
super().__init__(config, version, bin_dir, opt_dir, extra)
|
|
self.data = Mai2Data(config)
|
|
|
|
try:
|
|
self.logger.info(
|
|
f"Start importer for {Mai2Constants.game_ver_to_string(version)}"
|
|
)
|
|
except IndexError:
|
|
self.logger.error(f"Invalid maimai DX version {version}")
|
|
exit(1)
|
|
|
|
async def read(self) -> None:
|
|
data_dirs = []
|
|
if self.version >= Mai2Constants.VER_MAIMAI_DX:
|
|
if self.bin_dir is not None:
|
|
data_dirs += self.get_data_directories(self.bin_dir)
|
|
|
|
if self.opt_dir is not None:
|
|
data_dirs += self.get_data_directories(self.opt_dir)
|
|
|
|
for dir in data_dirs:
|
|
self.logger.info(f"Read from {dir}")
|
|
await self.get_events(f"{dir}/event")
|
|
await self.disable_events(f"{dir}/information", f"{dir}/scoreRanking")
|
|
await self.read_music(f"{dir}/music")
|
|
await self.read_tickets(f"{dir}/ticket")
|
|
|
|
else:
|
|
if not os.path.exists(f"{self.bin_dir}/tables"):
|
|
self.logger.error(f"tables directory not found in {self.bin_dir}")
|
|
return
|
|
|
|
if self.version >= Mai2Constants.VER_MAIMAI_MILK:
|
|
if self.extra is None:
|
|
self.logger.error("Milk - Finale requre an AES key via a hex string send as the --extra flag")
|
|
return
|
|
|
|
key = bytes.fromhex(self.extra)
|
|
|
|
else:
|
|
key = None
|
|
|
|
evt_table = self.load_table_raw(f"{self.bin_dir}/tables", "mmEvent.bin", key)
|
|
txt_table = self.load_table_raw(f"{self.bin_dir}/tables", "mmtextout_jp.bin", key)
|
|
score_table = self.load_table_raw(f"{self.bin_dir}/tables", "mmScore.bin", key)
|
|
|
|
await self.read_old_events(evt_table)
|
|
await self.read_old_music(score_table, txt_table)
|
|
|
|
if self.opt_dir is not None:
|
|
evt_table = self.load_table_raw(f"{self.opt_dir}/tables", "mmEvent.bin", key)
|
|
txt_table = self.load_table_raw(f"{self.opt_dir}/tables", "mmtextout_jp.bin", key)
|
|
score_table = self.load_table_raw(f"{self.opt_dir}/tables", "mmScore.bin", key)
|
|
|
|
await self.read_old_events(evt_table)
|
|
await self.read_old_music(score_table, txt_table)
|
|
|
|
return
|
|
|
|
def load_table_raw(self, dir: str, file: str, key: Optional[bytes]) -> Optional[List[Dict[str, str]]]:
|
|
if not os.path.exists(f"{dir}/{file}"):
|
|
self.logger.warning(f"file {file} does not exist in directory {dir}, skipping")
|
|
return
|
|
|
|
self.logger.info(f"Load table {file} from {dir}")
|
|
if key is not None:
|
|
cipher = AES.new(key, AES.MODE_CBC)
|
|
with open(f"{dir}/{file}", "rb") as f:
|
|
f_encrypted = f.read()
|
|
f_data = cipher.decrypt(f_encrypted)[0x10:]
|
|
|
|
else:
|
|
with open(f"{dir}/{file}", "rb") as f:
|
|
f_data = f.read()[0x10:]
|
|
|
|
if f_data is None or not f_data:
|
|
self.logger.warning(f"file {dir} could not be read, skipping")
|
|
return
|
|
|
|
f_data_deflate = zlib.decompress(f_data, wbits = zlib.MAX_WBITS | 16)[0x12:] # lop off the junk at the beginning
|
|
f_decoded = codecs.utf_16_le_decode(f_data_deflate)[0]
|
|
f_split = f_decoded.splitlines()
|
|
|
|
has_struct_def = "struct " in f_decoded
|
|
is_struct = False
|
|
struct_def = []
|
|
tbl_content = []
|
|
|
|
if has_struct_def:
|
|
for x in f_split:
|
|
if x.startswith("struct "):
|
|
is_struct = True
|
|
struct_name = x[7:-1]
|
|
continue
|
|
|
|
if x.startswith("};"):
|
|
is_struct = False
|
|
break
|
|
|
|
if is_struct:
|
|
try:
|
|
struct_def.append(x[x.rindex(" ") + 2: -1])
|
|
except ValueError:
|
|
self.logger.warning(f"rindex failed on line {x}")
|
|
|
|
if is_struct:
|
|
self.logger.warning("Struct not formatted properly")
|
|
|
|
if not struct_def:
|
|
self.logger.warning("Struct def not found")
|
|
|
|
name = file[:file.index(".")]
|
|
if "_" in name:
|
|
name = name[:file.index("_")]
|
|
|
|
for x in f_split:
|
|
if not x.startswith(name.upper()):
|
|
continue
|
|
|
|
line_match = re.match(r"(\w+)\((.*?)\)([ ]+\/{3}<[ ]+(.*))?", x)
|
|
if line_match is None:
|
|
continue
|
|
|
|
if not line_match.group(1) == name.upper():
|
|
self.logger.warning(f"Strange regex match for line {x} -> {line_match}")
|
|
continue
|
|
|
|
vals = line_match.group(2)
|
|
comment = line_match.group(4)
|
|
line_dict = {}
|
|
|
|
vals_split = vals.split(",")
|
|
for y in range(len(vals_split)):
|
|
stripped = vals_split[y].strip().lstrip("L\"").lstrip("\"").rstrip("\"")
|
|
if not stripped or stripped is None:
|
|
continue
|
|
|
|
if has_struct_def and len(struct_def) > y:
|
|
line_dict[struct_def[y]] = stripped
|
|
|
|
else:
|
|
line_dict[f'item_{y}'] = stripped
|
|
|
|
if comment:
|
|
line_dict['comment'] = comment
|
|
|
|
tbl_content.append(line_dict)
|
|
|
|
if tbl_content:
|
|
return tbl_content
|
|
|
|
else:
|
|
self.logger.warning("Failed load table content, skipping")
|
|
return
|
|
|
|
async def get_events(self, base_dir: str) -> None:
|
|
self.logger.info(f"Reading events from {base_dir}...")
|
|
|
|
for root, dirs, files in os.walk(base_dir):
|
|
for dir in dirs:
|
|
if os.path.exists(f"{root}/{dir}/Event.xml"):
|
|
with open(f"{root}/{dir}/Event.xml", encoding="utf-8") as f:
|
|
troot = ET.fromstring(f.read())
|
|
|
|
name = troot.find("name").find("str").text
|
|
id = int(troot.find("name").find("id").text)
|
|
event_type = int(troot.find("infoType").text)
|
|
|
|
await self.data.static.put_game_event(
|
|
self.version, event_type, id, name
|
|
)
|
|
self.logger.info(f"Added event {id}...")
|
|
|
|
async def disable_events(
|
|
self, base_information_dir: str, base_score_ranking_dir: str
|
|
) -> None:
|
|
self.logger.info(f"Reading disabled events from {base_information_dir}...")
|
|
|
|
for root, dirs, files in os.walk(base_information_dir):
|
|
for dir in dirs:
|
|
if os.path.exists(f"{root}/{dir}/Information.xml"):
|
|
with open(f"{root}/{dir}/Information.xml", encoding="utf-8") as f:
|
|
troot = ET.fromstring(f.read())
|
|
|
|
event_id = int(troot.find("name").find("id").text)
|
|
|
|
await self.data.static.toggle_game_event(
|
|
self.version, event_id, toggle=False
|
|
)
|
|
self.logger.info(f"Disabled event {event_id}...")
|
|
|
|
for root, dirs, files in os.walk(base_score_ranking_dir):
|
|
for dir in dirs:
|
|
if os.path.exists(f"{root}/{dir}/ScoreRanking.xml"):
|
|
with open(f"{root}/{dir}/ScoreRanking.xml", encoding="utf-8") as f:
|
|
troot = ET.fromstring(f.read())
|
|
|
|
event_id = int(troot.find("eventName").find("id").text)
|
|
|
|
await self.data.static.toggle_game_event(
|
|
self.version, event_id, toggle=False
|
|
)
|
|
self.logger.info(f"Disabled event {event_id}...")
|
|
|
|
# manually disable events wich are known to be problematic
|
|
for event_id in [
|
|
1,
|
|
10,
|
|
220311,
|
|
220312,
|
|
220313,
|
|
220314,
|
|
220315,
|
|
220316,
|
|
220317,
|
|
220318,
|
|
20121821,
|
|
21121651,
|
|
22091511,
|
|
22091512,
|
|
22091513,
|
|
22091514,
|
|
22091515,
|
|
22091516,
|
|
22091517,
|
|
22091518,
|
|
22091519,
|
|
]:
|
|
await self.data.static.toggle_game_event(self.version, event_id, toggle=False)
|
|
self.logger.info(f"Disabled event {event_id}...")
|
|
|
|
async def read_music(self, base_dir: str) -> None:
|
|
self.logger.info(f"Reading music from {base_dir}...")
|
|
|
|
for root, dirs, files in os.walk(base_dir):
|
|
for dir in dirs:
|
|
if os.path.exists(f"{root}/{dir}/Music.xml"):
|
|
with open(f"{root}/{dir}/Music.xml", encoding="utf-8") as f:
|
|
troot = ET.fromstring(f.read())
|
|
|
|
song_id = int(troot.find("name").find("id").text)
|
|
title = troot.find("name").find("str").text
|
|
artist = troot.find("artistName").find("str").text
|
|
genre = troot.find("genreName").find("str").text
|
|
bpm = int(troot.find("bpm").text)
|
|
added_ver = troot.find("AddVersion").find("str").text
|
|
|
|
note_data = troot.find("notesData").findall("Notes")
|
|
|
|
for dif in note_data:
|
|
path = dif.find("file").find("path").text
|
|
if path is not None:
|
|
if os.path.exists(f"{root}/{dir}/{path}"):
|
|
chart_id = int(path.split(".")[0].split("_")[1])
|
|
diff_num = float(
|
|
f"{dif.find('level').text}.{dif.find('levelDecimal').text}"
|
|
)
|
|
note_designer = (
|
|
dif.find("notesDesigner").find("str").text
|
|
)
|
|
|
|
await self.data.static.put_game_music(
|
|
self.version,
|
|
song_id,
|
|
chart_id,
|
|
title,
|
|
artist,
|
|
genre,
|
|
bpm,
|
|
added_ver,
|
|
diff_num,
|
|
note_designer,
|
|
)
|
|
|
|
self.logger.info(
|
|
f"Added music id {song_id} chart {chart_id}"
|
|
)
|
|
|
|
async def read_tickets(self, base_dir: str) -> None:
|
|
self.logger.info(f"Reading tickets from {base_dir}...")
|
|
|
|
for root, dirs, files in os.walk(base_dir):
|
|
for dir in dirs:
|
|
if os.path.exists(f"{root}/{dir}/Ticket.xml"):
|
|
with open(f"{root}/{dir}/Ticket.xml", encoding="utf-8") as f:
|
|
troot = ET.fromstring(f.read())
|
|
|
|
name = troot.find("name").find("str").text
|
|
id = int(troot.find("name").find("id").text)
|
|
ticket_type = int(troot.find("ticketKind").find("id").text)
|
|
price = int(troot.find("creditNum").text)
|
|
|
|
await self.data.static.put_game_ticket(
|
|
self.version, id, ticket_type, price, name
|
|
)
|
|
self.logger.info(f"Added ticket {id}...")
|
|
|
|
async def read_old_events(self, events: Optional[List[Dict[str, str]]]) -> None:
|
|
if events is None:
|
|
return
|
|
|
|
for event in events:
|
|
evt_id = int(event.get('イベントID', '0'))
|
|
evt_expire_time = float(event.get('オフ時強制時期', '0.0'))
|
|
is_exp = bool(int(event.get('海外許可', '0')))
|
|
is_aou = bool(int(event.get('AOU許可', '0')))
|
|
name = event.get('comment', f'evt_{evt_id}')
|
|
|
|
await self.data.static.put_game_event(self.version, 0, evt_id, name)
|
|
|
|
if not (is_exp or is_aou):
|
|
await self.data.static.toggle_game_event(self.version, evt_id, False)
|
|
|
|
async def read_old_music(self, scores: Optional[List[Dict[str, str]]], text: Optional[List[Dict[str, str]]]) -> None:
|
|
if scores is None or text is None:
|
|
return
|
|
# TODO
|