artemis/titles/mai2/read.py

344 lines
13 KiB
Python

from decimal import Decimal
import logging
import os
import re
import xml.etree.ElementTree as ET
from typing import Any, Dict, List, Optional
from Crypto.Cipher import AES
import zlib
import codecs
from core.config import CoreConfig
from core.data import Data
from read import BaseReader
from titles.mai2.const import Mai2Constants
from titles.mai2.database import Mai2Data
class Mai2Reader(BaseReader):
    """Importer that loads static maimai game data into the database.

    For versions at or above ``Mai2Constants.VER_MAIMAI_DX`` the data is read
    from XML files (events, music, tickets) found in the data directories
    reported by ``get_data_directories``. For older versions it is read from
    binary ``tables/*.bin`` files, which are AES-encrypted from
    ``Mai2Constants.VER_MAIMAI_MILK`` onwards.
    """

    # Events which are known to be problematic; they are always disabled
    # after the XML-driven pass in ``disable_events``.
    _FORCE_DISABLED_EVENTS = (
        1,
        10,
        220311,
        220312,
        220313,
        220314,
        220315,
        220316,
        220317,
        220318,
        20121821,
        21121651,
        22091511,
        22091512,
        22091513,
        22091514,
        22091515,
        22091516,
        22091517,
        22091518,
        22091519,
    )

    # One table row: NAME(values...) optionally followed by a "///< comment".
    _TABLE_ROW_PATTERN = re.compile(r"(\w+)\((.*?)\)([ ]+\/{3}<[ ]+(.*))?")

    def __init__(
        self,
        config: CoreConfig,
        version: int,
        bin_dir: Optional[str],
        opt_dir: Optional[str],
        extra: Optional[str],
    ) -> None:
        """Set up the reader and its database handle.

        Args:
            config: Core configuration, forwarded to ``BaseReader`` and ``Mai2Data``.
            version: Internal game version number.
            bin_dir: Base game data directory, if any.
            opt_dir: Option (update) data directory, if any.
            extra: Extra command line value; used as the hex AES key for
                encrypted legacy tables.

        Raises:
            SystemExit: With code 1 if ``version`` has no known version string.
        """
        super().__init__(config, version, bin_dir, opt_dir, extra)
        self.data = Mai2Data(config)

        try:
            version_name = Mai2Constants.game_ver_to_string(version)
        except IndexError:
            self.logger.error(f"Invalid maimai DX version {version}")
            # Same effect as the site-provided exit(1), without relying on it.
            raise SystemExit(1)

        self.logger.info(f"Start importer for {version_name}")

    async def read(self) -> None:
        """Import everything available for the configured version.

        DX and newer: walks every data directory under ``bin_dir`` and
        ``opt_dir`` and imports events, disabled events, music and tickets.
        Older versions: loads the event, text and score tables from
        ``<bin_dir>/tables`` and then, if set, from ``<opt_dir>/tables``.
        Problems with the inputs are logged and end the import early.
        """
        if self.version >= Mai2Constants.VER_MAIMAI_DX:
            data_dirs = []
            if self.bin_dir is not None:
                data_dirs += self.get_data_directories(self.bin_dir)
            if self.opt_dir is not None:
                data_dirs += self.get_data_directories(self.opt_dir)

            for data_dir in data_dirs:
                self.logger.info(f"Read from {data_dir}")
                await self.get_events(f"{data_dir}/event")
                await self.disable_events(
                    f"{data_dir}/information", f"{data_dir}/scoreRanking"
                )
                await self.read_music(f"{data_dir}/music")
                await self.read_tickets(f"{data_dir}/ticket")
            return

        if not os.path.exists(f"{self.bin_dir}/tables"):
            self.logger.error(f"tables directory not found in {self.bin_dir}")
            return

        key = None
        if self.version >= Mai2Constants.VER_MAIMAI_MILK:
            if self.extra is None:
                self.logger.error("Milk - Finale requre an AES key via a hex string send as the --extra flag")
                return
            try:
                key = bytes.fromhex(self.extra)
            except ValueError:
                # A malformed key would otherwise abort with a bare traceback.
                self.logger.error("The AES key passed via the --extra flag is not a valid hex string")
                return

        await self._import_old_tables(f"{self.bin_dir}/tables", key)
        if self.opt_dir is not None:
            await self._import_old_tables(f"{self.opt_dir}/tables", key)

    async def _import_old_tables(self, tables_dir: str, key: Optional[bytes]) -> None:
        """Load the legacy event/text/score tables from one directory and import them."""
        evt_table = self.load_table_raw(tables_dir, "mmEvent.bin", key)
        txt_table = self.load_table_raw(tables_dir, "mmtextout_jp.bin", key)
        score_table = self.load_table_raw(tables_dir, "mmScore.bin", key)

        await self.read_old_events(evt_table)
        await self.read_old_music(score_table, txt_table)

    def load_table_raw(self, dir: str, file: str, key: Optional[bytes]) -> Optional[List[Dict[str, str]]]:
        """Read one legacy ``.bin`` table and return its rows as dictionaries.

        The file is optionally AES-CBC decrypted, its first 0x10 bytes are
        dropped, the rest is gzip-decompressed, 0x12 more bytes are dropped
        and the remainder is decoded as UTF-16-LE text. Rows are the lines of
        the form ``NAME(v0, v1, ...)`` where ``NAME`` is the upper-cased file
        name up to the first ``.`` or ``_``.

        Args:
            dir: Directory containing the table.
            file: Table file name, e.g. ``mmEvent.bin``.
            key: AES key for encrypted tables, or None for plain ones.

        Returns:
            One dict per row. Values are keyed by the field names of the
            table's ``struct`` definition when one is present, otherwise by
            ``item_<index>``; empty values are omitted and a trailing
            ``///<`` comment is stored under ``comment``. Returns None (after
            logging) when the file is missing, empty, undecodable or contains
            no rows.
        """
        path = f"{dir}/{file}"
        if not os.path.exists(path):
            self.logger.warning(f"file {file} does not exist in directory {dir}, skipping")
            return None

        self.logger.info(f"Load table {file} from {dir}")
        with open(path, "rb") as f:
            raw = f.read()

        if key is not None:
            try:
                # No IV is supplied: in CBC mode only the first decrypted
                # block depends on the IV, and that block is discarded below.
                raw = AES.new(key, AES.MODE_CBC).decrypt(raw)
            except ValueError as e:
                # Raised for an invalid key length or unaligned ciphertext.
                self.logger.error(f"file {path} could not be decrypted ({e}), skipping")
                return None

        f_data = raw[0x10:]
        if not f_data:
            self.logger.warning(f"file {path} could not be read, skipping")
            return None

        try:
            # wbits | 16 selects the gzip container format.
            f_data_deflate = zlib.decompress(f_data, wbits=zlib.MAX_WBITS | 16)[0x12:]  # lop off the junk at the beginning
            f_decoded = codecs.utf_16_le_decode(f_data_deflate)[0]
        except (zlib.error, UnicodeDecodeError) as e:
            # Typically the result of a wrong AES key or a damaged file.
            self.logger.error(f"file {path} could not be decoded ({e}), skipping")
            return None

        f_split = f_decoded.splitlines()
        struct_def = self._parse_struct_fields(f_split) if "struct " in f_decoded else []

        # "mmtextout_jp.bin" -> "MMTEXTOUT": rows are named after the file stem.
        row_name = file.split(".", 1)[0].split("_", 1)[0].upper()

        tbl_content = []
        for line in f_split:
            if not line.startswith(row_name):
                continue

            line_match = self._TABLE_ROW_PATTERN.match(line)
            if line_match is None:
                continue

            if line_match.group(1) != row_name:
                self.logger.warning(f"Strange regex match for line {line} -> {line_match}")
                continue

            line_dict = {}
            # NOTE(review): a plain comma split also splits quoted values that
            # themselves contain commas; confirm whether real tables have any.
            for idx, raw_val in enumerate(line_match.group(2).split(",")):
                value = self._clean_table_value(raw_val)
                if not value:
                    continue

                if idx < len(struct_def):
                    line_dict[struct_def[idx]] = value
                else:
                    line_dict[f'item_{idx}'] = value

            comment = line_match.group(4)
            if comment:
                line_dict['comment'] = comment

            tbl_content.append(line_dict)

        if not tbl_content:
            self.logger.warning("Failed load table content, skipping")
            return None

        return tbl_content

    def _parse_struct_fields(self, lines: List[str]) -> List[str]:
        """Collect the field names of the first ``struct`` definition in a table.

        Scanning starts at the first line beginning with ``struct `` and
        stops at the first line beginning with ``};``.
        """
        fields = []
        in_struct = False

        for line in lines:
            if line.startswith("struct "):
                in_struct = True
                continue

            if line.startswith("};"):
                in_struct = False
                break

            if in_struct:
                try:
                    # Keeps the text after the last space, minus one leading
                    # and one trailing character - presumably a prefix
                    # character and the closing ';' (TODO confirm).
                    fields.append(line[line.rindex(" ") + 2: -1])
                except ValueError:
                    self.logger.warning(f"rindex failed on line {line}")

        if in_struct:
            # The closing "};" was never seen.
            self.logger.warning("Struct not formatted properly")

        if not fields:
            self.logger.warning("Struct def not found")

        return fields

    @staticmethod
    def _clean_table_value(raw: str) -> str:
        """Strip whitespace, an ``L"`` wide-string prefix and surrounding quotes."""
        value = raw.strip()
        if value.startswith('L"'):
            # Drop only the prefix letter; str.lstrip('L"') would also eat
            # leading 'L' characters that belong to the value itself.
            value = value[1:]
        return value.strip('"')

    @staticmethod
    def _find_xml_dirs(base_dir: str, filename: str) -> List[str]:
        """Return every sub-directory below ``base_dir`` that contains ``filename``.

        ``base_dir`` itself is never a candidate; directories are returned in
        ``os.walk`` order.
        """
        found = []
        for root, subdirs, _ in os.walk(base_dir):
            for subdir in subdirs:
                candidate = f"{root}/{subdir}"
                if os.path.exists(f"{candidate}/{filename}"):
                    found.append(candidate)
        return found

    @staticmethod
    def _parse_xml(path: str) -> ET.Element:
        """Read the UTF-8 XML file at ``path`` and return its root element."""
        with open(path, encoding="utf-8") as f:
            return ET.fromstring(f.read())

    async def get_events(self, base_dir: str) -> None:
        """Store every event described by an ``Event.xml`` below ``base_dir``."""
        self.logger.info(f"Reading events from {base_dir}...")

        for event_dir in self._find_xml_dirs(base_dir, "Event.xml"):
            troot = self._parse_xml(f"{event_dir}/Event.xml")

            name = troot.find("name").find("str").text
            event_id = int(troot.find("name").find("id").text)
            event_type = int(troot.find("infoType").text)

            await self.data.static.put_game_event(
                self.version, event_type, event_id, name
            )
            self.logger.info(f"Added event {event_id}...")

    async def disable_events(
        self, base_information_dir: str, base_score_ranking_dir: str
    ) -> None:
        """Disable the events referenced by information and score ranking data.

        Every event id found in an ``Information.xml`` or ``ScoreRanking.xml``
        is toggled off, followed by a fixed list of known-problematic ids.

        Args:
            base_information_dir: Directory searched for ``Information.xml``.
            base_score_ranking_dir: Directory searched for ``ScoreRanking.xml``.
        """
        self.logger.info(f"Reading disabled events from {base_information_dir}...")

        for info_dir in self._find_xml_dirs(base_information_dir, "Information.xml"):
            troot = self._parse_xml(f"{info_dir}/Information.xml")
            event_id = int(troot.find("name").find("id").text)

            await self.data.static.toggle_game_event(
                self.version, event_id, toggle=False
            )
            self.logger.info(f"Disabled event {event_id}...")

        for ranking_dir in self._find_xml_dirs(base_score_ranking_dir, "ScoreRanking.xml"):
            troot = self._parse_xml(f"{ranking_dir}/ScoreRanking.xml")
            event_id = int(troot.find("eventName").find("id").text)

            await self.data.static.toggle_game_event(
                self.version, event_id, toggle=False
            )
            self.logger.info(f"Disabled event {event_id}...")

        # manually disable events which are known to be problematic
        for event_id in self._FORCE_DISABLED_EVENTS:
            await self.data.static.toggle_game_event(self.version, event_id, toggle=False)
            self.logger.info(f"Disabled event {event_id}...")

    async def read_music(self, base_dir: str) -> None:
        """Store every chart of every ``Music.xml`` below ``base_dir``.

        A chart is only stored when the chart file named in its ``Notes``
        entry exists next to the ``Music.xml``.
        """
        self.logger.info(f"Reading music from {base_dir}...")

        for music_dir in self._find_xml_dirs(base_dir, "Music.xml"):
            troot = self._parse_xml(f"{music_dir}/Music.xml")

            song_id = int(troot.find("name").find("id").text)
            title = troot.find("name").find("str").text
            artist = troot.find("artistName").find("str").text
            genre = troot.find("genreName").find("str").text
            bpm = int(troot.find("bpm").text)
            added_ver = troot.find("AddVersion").find("str").text

            for dif in troot.find("notesData").findall("Notes"):
                path = dif.find("file").find("path").text
                if path is None or not os.path.exists(f"{music_dir}/{path}"):
                    continue

                # The chart id is the number after the first "_" of the file stem.
                chart_id = int(path.split(".")[0].split("_")[1])
                # level and levelDecimal form the integer and fractional parts.
                diff_num = float(
                    f"{dif.find('level').text}.{dif.find('levelDecimal').text}"
                )
                note_designer = dif.find("notesDesigner").find("str").text

                await self.data.static.put_game_music(
                    self.version,
                    song_id,
                    chart_id,
                    title,
                    artist,
                    genre,
                    bpm,
                    added_ver,
                    diff_num,
                    note_designer,
                )
                self.logger.info(f"Added music id {song_id} chart {chart_id}")

    async def read_tickets(self, base_dir: str) -> None:
        """Store every ticket described by a ``Ticket.xml`` below ``base_dir``."""
        self.logger.info(f"Reading tickets from {base_dir}...")

        for ticket_dir in self._find_xml_dirs(base_dir, "Ticket.xml"):
            troot = self._parse_xml(f"{ticket_dir}/Ticket.xml")

            name = troot.find("name").find("str").text
            ticket_id = int(troot.find("name").find("id").text)
            ticket_type = int(troot.find("ticketKind").find("id").text)
            price = int(troot.find("creditNum").text)

            await self.data.static.put_game_ticket(
                self.version, ticket_id, ticket_type, price, name
            )
            self.logger.info(f"Added ticket {ticket_id}...")

    async def read_old_events(self, events: Optional[List[Dict[str, str]]]) -> None:
        """Store the events of a legacy event table.

        Every row is stored as an event of type 0, named after the row's
        comment (or ``evt_<id>`` when there is none). The event is then
        disabled unless one of the row's two permission flags is non-zero.

        Args:
            events: Rows as returned by ``load_table_raw``; None is a no-op.
        """
        if events is None:
            return

        for event in events:
            evt_id = int(event.get('イベントID', '0'))
            # Column names presumably mean "overseas allowed" / "AOU allowed"
            # (TODO confirm). Other columns are currently not stored.
            is_exp = bool(int(event.get('海外許可', '0')))
            is_aou = bool(int(event.get('AOU許可', '0')))
            name = event.get('comment', f'evt_{evt_id}')

            await self.data.static.put_game_event(self.version, 0, evt_id, name)

            if not (is_exp or is_aou):
                await self.data.static.toggle_game_event(self.version, evt_id, toggle=False)

    async def read_old_music(self, scores: Optional[List[Dict[str, str]]], text: Optional[List[Dict[str, str]]]) -> None:
        """Import music from the legacy score and text tables (not implemented yet).

        Args:
            scores: Rows of the score table, or None if it could not be loaded.
            text: Rows of the text table, or None if it could not be loaded.
        """
        if scores is None or text is None:
            return

        # TODO