Files
artemis/titles/ongeki/read.py
2025-04-08 17:59:19 -04:00

260 lines
12 KiB
Python

import os
import xml.etree.ElementTree as ET
from typing import Optional
from read import BaseReader
from core.config import CoreConfig
from titles.ongeki.database import OngekiData
from titles.ongeki.const import OngekiConstants
class OngekiReader(BaseReader):
def __init__(
self,
config: CoreConfig,
version: int,
bin_dir: Optional[str],
opt_dir: Optional[str],
extra: Optional[str],
) -> None:
super().__init__(config, version, bin_dir, opt_dir, extra)
self.data = OngekiData(config)
try:
self.logger.info(
f"Start importer for {OngekiConstants.game_ver_to_string(version)}"
)
except IndexError:
self.logger.error(f"Invalid ongeki version {version}")
exit(1)
def parse_version(self, troot) -> int:
version_ids = {
"1000": OngekiConstants.VER_ONGEKI,
"1005": OngekiConstants.VER_ONGEKI_PLUS,
"1010": OngekiConstants.VER_ONGEKI_SUMMER,
"1015": OngekiConstants.VER_ONGEKI_SUMMER_PLUS,
"1020": OngekiConstants.VER_ONGEKI_RED,
"1025": OngekiConstants.VER_ONGEKI_RED_PLUS,
"1030": OngekiConstants.VER_ONGEKI_BRIGHT,
"1035": OngekiConstants.VER_ONGEKI_BRIGHT_MEMORY,
"1040": OngekiConstants.VER_ONGEKI_BRIGHT_MEMORY,
"1045": OngekiConstants.VER_ONGEKI_BRIGHT_MEMORY_ACT3,
}
node = troot.find("VersionID").find("id")
if node.text not in version_ids:
self.logger.warning(f"Unknown VersionID {node.text}")
return OngekiConstants.VER_ONGEKI_BRIGHT_MEMORY
return version_ids[node.text]
async def read(self) -> None:
data_dirs = []
if self.bin_dir is not None:
data_dirs += self.get_data_directories(self.bin_dir)
if self.opt_dir is not None:
data_dirs += self.get_data_directories(self.opt_dir)
for dir in data_dirs:
this_opt_id = await self.read_opt_info(dir)
await self.read_events(f"{dir}/event", this_opt_id)
await self.read_music(f"{dir}/music", this_opt_id)
await self.read_card(f"{dir}/card", this_opt_id)
await self.read_reward(f"{dir}/reward", this_opt_id)
async def read_card(self, base_dir: str, opt_id: int = None) -> None:
self.logger.info(f"Reading cards from {base_dir}...")
for root, dirs, files in os.walk(base_dir):
for dir in dirs:
if os.path.exists(f"{root}/{dir}/Card.xml"):
with open(f"{root}/{dir}/Card.xml", "r", encoding="utf-8") as f:
troot = ET.fromstring(f.read())
card_id = int(troot.find("Name").find("id").text)
# skip already existing cards
# Hay1tsme 2025/04/08: What is this for, and why does it only check for BM cards?
if (
await self.data.static.get_card(
OngekiConstants.VER_ONGEKI_BRIGHT_MEMORY, card_id
)
is not None
):
self.logger.info(f"Card {card_id} already added, skipping")
continue
name = troot.find("Name").find("str").text
chara_id = int(troot.find("CharaID").find("id").text)
nick_name = troot.find("NickName").text
school = troot.find("School").find("str").text
attribute = troot.find("Attribute").text
gakunen = troot.find("Gakunen").find("str").text
rarity = OngekiConstants.RARITY_TYPES[
troot.find("Rarity").text
].value
level_param = []
for lvl in troot.find("LevelParam").findall("int"):
level_param.append(lvl.text)
skill_id = int(troot.find("SkillID").find("id").text)
cho_kai_ka_skill_id = int(
troot.find("ChoKaikaSkillID").find("id").text
)
card_number = troot.find("CardNumberString").text
await self.data.static.put_card(
self.parse_version(troot),
card_id,
opt_id,
name=name,
charaId=chara_id,
nickName=nick_name,
school=school,
attribute=attribute,
gakunen=gakunen,
rarity=rarity,
levelParam=",".join(level_param),
skillId=skill_id,
choKaikaSkillId=cho_kai_ka_skill_id,
cardNumber=card_number,
)
self.logger.info(f"Added card {card_id}")
async def read_events(self, base_dir: str, opt_id: int = None) -> None:
self.logger.info(f"Reading events from {base_dir}...")
for root, dirs, files in os.walk(base_dir):
for dir in dirs:
if os.path.exists(f"{root}/{dir}/Event.xml"):
with open(f"{root}/{dir}/Event.xml", "r", encoding="utf-8") as f:
troot = ET.fromstring(f.read())
name = troot.find("Name").find("str").text
id = int(troot.find("Name").find("id").text)
event_type = OngekiConstants.EVT_TYPES[
troot.find("EventType").text
].value
if troot.find("EventType").text == "MissionEvent":
name = (troot.find("Event").find("MissionName").find("str").text)
await self.data.static.put_event(self.version, id, event_type, name, opt_id)
self.logger.info(f"Added event {id}")
async def read_music(self, base_dir: str, opt_id: int = None) -> None:
self.logger.info(f"Reading music from {base_dir}...")
for root, dirs, files in os.walk(base_dir):
for dir in dirs:
if os.path.exists(f"{root}/{dir}/Music.xml"):
strdata = ""
with open(f"{root}/{dir}/Music.xml", "r", encoding="utf-8") as f:
strdata = f.read()
troot = ET.fromstring(strdata)
if root is None:
continue
name = troot.find("Name")
song_id = name.find("id").text
title = name.find("str").text
artist = troot.find("ArtistName").find("str").text
genre = troot.find("Genre").find("str").text
version = self.parse_version(troot)
fumens = troot.find("FumenData")
for fumens_data in fumens.findall("FumenData"):
path = fumens_data.find("FumenFile").find("path").text
if path is None or not os.path.exists(f"{root}/{dir}/{path}"):
continue
chart_id = int(path.split(".")[0].split("_")[1])
level = float(
f"{fumens_data.find('FumenConstIntegerPart').text}.{fumens_data.find('FumenConstFractionalPart').text}"
)
await self.data.static.put_chart(
version, song_id, chart_id, title, artist, genre, level, opt_id
)
self.logger.info(f"Added song {song_id} chart {chart_id}")
async def read_reward(self, base_dir: str, opt_id: int = None) -> None:
self.logger.info(f"Reading rewards from {base_dir}...")
for root, dirs, files in os.walk(base_dir):
for dir in dirs:
if os.path.exists(f"{root}/{dir}/Reward.xml"):
strdata = ""
with open(f"{root}/{dir}/Reward.xml", "r", encoding="utf-8") as f:
strdata = f.read()
troot = ET.fromstring(strdata)
if root is None:
continue
name = troot.find("Name")
rewardId = name.find("id").text
rewardname = name.find("str").text
itemKind = OngekiConstants.REWARD_TYPES[troot.find("ItemType").text].value
itemId = troot.find("RewardItem").find("ItemName").find("id").text
await self.data.static.put_reward(self.version, rewardId, rewardname, itemKind, itemId, opt_id)
self.logger.info(f"Added reward {rewardId}")
async def read_opt_info(self, directory: str) -> Optional[int]:
datacfg_file = os.path.join(directory, "DataConfig.xml")
if not os.path.exists(datacfg_file):
self.logger.warning(f"{datacfg_file} does not contain DataConfig.xml, opt info will not be read")
return None
with open(datacfg_file, encoding="utf-8") as f:
troot = ET.fromstring(f.read())
if troot.find("DataConfig/version") is None:
self.logger.warning(f"{directory}/DataConfig.xml contains no Version section, opt info will not be read")
return None
ver_maj = troot.find("DataConfig/version/major")
ver_min = troot.find("DataConfig/version/minor")
ver_rel = troot.find("DataConfig/version/release")
cm_maj = troot.find("DataConfig/cardMakerVersion/major")
cm_min = troot.find("DataConfig/cardMakerVersion/minor")
cm_rel = troot.find("DataConfig/cardMakerVersion/release")
if ver_maj is None: # Probably not worth checking that the other sections exist
self.logger.warning(f"{datacfg_file} contains no major item in the Version section, opt info will not be read")
return None
if ver_min is None: # Probably not worth checking that the other sections exist
self.logger.warning(f"{datacfg_file} contains no minor item in the Version section, opt info will not be read")
return None
if ver_rel is None: # Probably not worth checking that the other sections exist
self.logger.warning(f"{datacfg_file} contains no release item in the Version section, opt info will not be read")
return None
opt_folder = os.path.basename(os.path.normpath(directory))
opt_id = await self.data.static.get_opt_by_version_folder(self.version, opt_folder)
if not opt_id:
opt_id = await self.data.static.put_opt(self.version, opt_folder, int(ver_rel.text), int(cm_rel.text) if cm_rel else None)
if not opt_id:
self.logger.error(f"Failed to put opt folder info for {opt_folder}")
return None
else:
opt_id = opt_id['id']
self.logger.info(
f"Opt folder {opt_folder} (Database ID {opt_id}) contains v{ver_maj.text}.{ver_min.text}.{ver_rel.text} (cm v{cm_maj.text if cm_maj else 'None'}.{cm_min.text if cm_min else 'None'}.{cm_rel.text if cm_rel else 'None'})"
)
return opt_id