zaphkito
91f49f52cc
Reviewed-on: #105 Co-authored-by: zaphkito <zaphkito@noreply.gitea.tendokyu.moe> Co-committed-by: zaphkito <zaphkito@noreply.gitea.tendokyu.moe>
332 lines
14 KiB
Python
332 lines
14 KiB
Python
from decimal import Decimal
|
|
import logging
|
|
import os
|
|
import re
|
|
import csv
|
|
import xml.etree.ElementTree as ET
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
from read import BaseReader
|
|
from core.config import CoreConfig
|
|
from titles.ongeki.database import OngekiData
|
|
from titles.cm.const import CardMakerConstants
|
|
from titles.ongeki.const import OngekiConstants
|
|
from titles.ongeki.config import OngekiConfig
|
|
from titles.mai2.database import Mai2Data
|
|
from titles.mai2.const import Mai2Constants
|
|
from titles.chuni.database import ChuniData
|
|
from titles.chuni.const import ChuniConstants
|
|
|
|
|
|
class CardMakerReader(BaseReader):
|
|
def __init__(
|
|
self,
|
|
config: CoreConfig,
|
|
version: int,
|
|
bin_dir: Optional[str],
|
|
opt_dir: Optional[str],
|
|
extra: Optional[str],
|
|
) -> None:
|
|
super().__init__(config, version, bin_dir, opt_dir, extra)
|
|
self.ongeki_data = OngekiData(config)
|
|
self.mai2_data = Mai2Data(config)
|
|
self.chuni_data = ChuniData(config)
|
|
|
|
try:
|
|
self.logger.info(
|
|
f"Start importer for {CardMakerConstants.game_ver_to_string(version)}"
|
|
)
|
|
except IndexError:
|
|
self.logger.error(f"Invalid Card Maker version {version}")
|
|
exit(1)
|
|
|
|
def _get_card_maker_directory(self, directory: str) -> str:
|
|
for root, dirs, files in os.walk(directory):
|
|
for dir in dirs:
|
|
if (
|
|
os.path.exists(f"{root}/{dir}/MU3")
|
|
and os.path.exists(f"{root}/{dir}/MAI")
|
|
and os.path.exists(f"{root}/{dir}/CHU")
|
|
):
|
|
return f"{root}/{dir}"
|
|
|
|
async def read(self) -> None:
|
|
static_datas = {
|
|
"static_gachas.csv": "read_ongeki_gacha_csv",
|
|
"static_gacha_cards.csv": "read_ongeki_gacha_card_csv",
|
|
}
|
|
|
|
if self.bin_dir is not None:
|
|
data_dir = self._get_card_maker_directory(self.bin_dir)
|
|
|
|
await self.read_chuni_card(f"{data_dir}/CHU/Data/A000/card")
|
|
await self.read_chuni_gacha(f"{data_dir}/CHU/Data/A000/gacha")
|
|
|
|
await self.read_mai2_card(f"{data_dir}/MAI/Data/A000/card")
|
|
for file, func in static_datas.items():
|
|
if os.path.exists(f"{self.bin_dir}/MU3/{file}"):
|
|
read_csv = getattr(CardMakerReader, func)
|
|
await read_csv(self, f"{self.bin_dir}/MU3/{file}")
|
|
else:
|
|
self.logger.warning(
|
|
f"Couldn't find {file} file in {self.bin_dir}, skipping"
|
|
)
|
|
|
|
if self.opt_dir is not None:
|
|
data_dirs = self.get_data_directories(self.opt_dir)
|
|
|
|
# ONGEKI (MU3) cnnot easily access the bin data(A000.pac)
|
|
# so only opt_dir will work for now
|
|
for dir in data_dirs:
|
|
await self.read_chuni_card(f"{dir}/CHU/card")
|
|
await self.read_chuni_gacha(f"{dir}/CHU/gacha")
|
|
await self.read_mai2_card(f"{dir}/MAI/card")
|
|
await self.read_ongeki_gacha(f"{dir}/MU3/gacha")
|
|
|
|
async def read_chuni_card(self, base_dir: str) -> None:
|
|
self.logger.info(f"Reading cards from {base_dir}...")
|
|
|
|
version_ids = {
|
|
"v2_00": ChuniConstants.VER_CHUNITHM_NEW,
|
|
"v2_05": ChuniConstants.VER_CHUNITHM_NEW_PLUS,
|
|
"v2_10": ChuniConstants.VER_CHUNITHM_SUN,
|
|
}
|
|
|
|
for root, dirs, files in os.walk(base_dir):
|
|
for dir in dirs:
|
|
if os.path.exists(f"{root}/{dir}/Card.xml"):
|
|
with open(f"{root}/{dir}/Card.xml", "r", encoding="utf-8") as f:
|
|
troot = ET.fromstring(f.read())
|
|
|
|
card_id = int(troot.find("name").find("id").text)
|
|
|
|
chara_name = troot.find("chuniCharaName").find("str").text
|
|
chara_id = troot.find("chuniCharaName").find("id").text
|
|
version = version_ids[
|
|
troot.find("netOpenName").find("str").text[:5]
|
|
]
|
|
present_name = troot.find("chuniPresentName").find("str").text
|
|
rarity = int(troot.find("rareType").text)
|
|
label = int(troot.find("labelType").text)
|
|
dif = int(troot.find("difType").text)
|
|
miss = int(troot.find("miss").text)
|
|
combo = int(troot.find("combo").text)
|
|
chain = int(troot.find("chain").text)
|
|
skill_name = troot.find("skillName").text
|
|
|
|
await self.chuni_data.static.put_card(
|
|
version,
|
|
card_id,
|
|
charaName=chara_name,
|
|
charaId=chara_id,
|
|
presentName=present_name,
|
|
rarity=rarity,
|
|
labelType=label,
|
|
difType=dif,
|
|
miss=miss,
|
|
combo=combo,
|
|
chain=chain,
|
|
skillName=skill_name,
|
|
)
|
|
|
|
self.logger.info(f"Added chuni card {card_id}")
|
|
|
|
async def read_chuni_gacha(self, base_dir: str) -> None:
|
|
self.logger.info(f"Reading gachas from {base_dir}...")
|
|
|
|
version_ids = {
|
|
"v2_00": ChuniConstants.VER_CHUNITHM_NEW,
|
|
"v2_05": ChuniConstants.VER_CHUNITHM_NEW_PLUS,
|
|
"v2_10": ChuniConstants.VER_CHUNITHM_SUN,
|
|
}
|
|
|
|
for root, dirs, files in os.walk(base_dir):
|
|
for dir in dirs:
|
|
if os.path.exists(f"{root}/{dir}/Gacha.xml"):
|
|
with open(f"{root}/{dir}/Gacha.xml", "r", encoding="utf-8") as f:
|
|
troot = ET.fromstring(f.read())
|
|
|
|
name = troot.find("gachaName").text
|
|
gacha_id = int(troot.find("name").find("id").text)
|
|
|
|
version = version_ids[
|
|
troot.find("netOpenName").find("str").text[:5]
|
|
]
|
|
ceiling_cnt = int(troot.find("ceilingNum").text)
|
|
gacha_type = int(troot.find("gachaType").text)
|
|
is_ceiling = (
|
|
True if troot.find("ceilingType").text == "1" else False
|
|
)
|
|
|
|
await self.chuni_data.static.put_gacha(
|
|
version,
|
|
gacha_id,
|
|
name,
|
|
type=gacha_type,
|
|
isCeiling=is_ceiling,
|
|
ceilingCnt=ceiling_cnt,
|
|
)
|
|
|
|
self.logger.info(f"Added chuni gacha {gacha_id}")
|
|
|
|
for gacha_card in troot.find("infos").iter("GachaCardDataInfo"):
|
|
# get the card ID from the id element
|
|
card_id = gacha_card.find("cardName").find("id").text
|
|
|
|
# get the weight from the weight element
|
|
weight = int(gacha_card.find("weight").text)
|
|
|
|
# get the pickup flag from the pickup element
|
|
is_pickup = (
|
|
True if gacha_card.find("pickup").text == "1" else False
|
|
)
|
|
|
|
await self.chuni_data.static.put_gacha_card(
|
|
gacha_id,
|
|
card_id,
|
|
weight=weight,
|
|
rarity=2,
|
|
isPickup=is_pickup,
|
|
)
|
|
|
|
self.logger.info(
|
|
f"Added chuni card {card_id} to gacha {gacha_id}"
|
|
)
|
|
|
|
async def read_mai2_card(self, base_dir: str) -> None:
|
|
self.logger.info(f"Reading cards from {base_dir}...")
|
|
|
|
version_ids = {
|
|
"1.00": Mai2Constants.VER_MAIMAI_DX,
|
|
"1.05": Mai2Constants.VER_MAIMAI_DX_PLUS,
|
|
"1.09": Mai2Constants.VER_MAIMAI_DX_PLUS,
|
|
"1.10": Mai2Constants.VER_MAIMAI_DX_SPLASH,
|
|
"1.15": Mai2Constants.VER_MAIMAI_DX_SPLASH_PLUS,
|
|
"1.20": Mai2Constants.VER_MAIMAI_DX_UNIVERSE,
|
|
"1.25": Mai2Constants.VER_MAIMAI_DX_UNIVERSE_PLUS,
|
|
"1.30": Mai2Constants.VER_MAIMAI_DX_FESTIVAL,
|
|
"1.35": Mai2Constants.VER_MAIMAI_DX_FESTIVAL_PLUS,
|
|
}
|
|
|
|
for root, dirs, files in os.walk(base_dir):
|
|
for dir in dirs:
|
|
if os.path.exists(f"{root}/{dir}/Card.xml"):
|
|
with open(f"{root}/{dir}/Card.xml", "r", encoding="utf-8") as f:
|
|
troot = ET.fromstring(f.read())
|
|
|
|
name = troot.find("name").find("str").text
|
|
card_id = int(troot.find("name").find("id").text)
|
|
|
|
version = version_ids[
|
|
troot.find("enableVersion").find("str").text
|
|
]
|
|
|
|
enabled = (
|
|
True if troot.find("disable").text == "false" else False
|
|
)
|
|
|
|
# check if a date is part of the name and disable the
|
|
# card if it is
|
|
enabled = (
|
|
False if re.search(r"\d{2}/\d{2}/\d{2}", name) else enabled
|
|
)
|
|
|
|
await self.mai2_data.static.put_card(
|
|
version, card_id, name, enabled=enabled
|
|
)
|
|
self.logger.info(f"Added mai2 card {card_id}")
|
|
|
|
async def read_ongeki_gacha_csv(self, file_path: str) -> None:
|
|
self.logger.info(f"Reading gachas from {file_path}...")
|
|
|
|
with open(file_path, encoding="utf-8") as f:
|
|
reader = csv.DictReader(f)
|
|
for row in reader:
|
|
await self.ongeki_data.static.put_gacha(
|
|
row["version"],
|
|
row["gachaId"],
|
|
row["gachaName"],
|
|
row["kind"],
|
|
type=row["type"],
|
|
isCeiling=True if row["isCeiling"] == "1" else False,
|
|
maxSelectPoint=row["maxSelectPoint"],
|
|
)
|
|
|
|
self.logger.info(f"Added ongeki gacha {row['gachaId']}")
|
|
|
|
async def read_ongeki_gacha_card_csv(self, file_path: str) -> None:
|
|
self.logger.info(f"Reading gacha cards from {file_path}...")
|
|
|
|
with open(file_path, encoding="utf-8") as f:
|
|
reader = csv.DictReader(f)
|
|
for row in reader:
|
|
await self.ongeki_data.static.put_gacha_card(
|
|
row["gachaId"],
|
|
row["cardId"],
|
|
rarity=row["rarity"],
|
|
weight=row["weight"],
|
|
isPickup=True if row["isPickup"] == "1" else False,
|
|
isSelect=True if row["isSelect"] == "1" else False,
|
|
)
|
|
|
|
self.logger.info(f"Added ongeki card {row['cardId']} to gacha")
|
|
|
|
async def read_ongeki_gacha(self, base_dir: str) -> None:
|
|
self.logger.info(f"Reading gachas from {base_dir}...")
|
|
|
|
# assuming some GachaKinds based on the GachaType
|
|
type_to_kind = {
|
|
"Normal": "Normal",
|
|
"Pickup": "Pickup",
|
|
"RecoverFiveShotFlag": "BonusRestored",
|
|
"Free": "Free",
|
|
"FreeSR": "Free",
|
|
}
|
|
|
|
for root, dirs, files in os.walk(base_dir):
|
|
for dir in dirs:
|
|
if os.path.exists(f"{root}/{dir}/Gacha.xml"):
|
|
with open(f"{root}/{dir}/Gacha.xml", "r", encoding="utf-8") as f:
|
|
troot = ET.fromstring(f.read())
|
|
|
|
name = troot.find("Name").find("str").text
|
|
gacha_id = int(troot.find("Name").find("id").text)
|
|
|
|
# skip already existing gachas
|
|
if (
|
|
await self.ongeki_data.static.get_gacha(
|
|
OngekiConstants.VER_ONGEKI_BRIGHT_MEMORY, gacha_id
|
|
)
|
|
is not None
|
|
):
|
|
self.logger.info(
|
|
f"Gacha {gacha_id} already added, skipping"
|
|
)
|
|
continue
|
|
|
|
# 1140 is the first bright memory gacha
|
|
if gacha_id < 1140:
|
|
version = OngekiConstants.VER_ONGEKI_BRIGHT
|
|
else:
|
|
version = OngekiConstants.VER_ONGEKI_BRIGHT_MEMORY
|
|
|
|
gacha_kind = OngekiConstants.CM_GACHA_KINDS[
|
|
type_to_kind[troot.find("Type").text]
|
|
].value
|
|
|
|
# hardcode which gachas get "Select Gacha" with 33 points
|
|
is_ceiling, max_select_point = 0, 0
|
|
if gacha_id in {1163, 1164, 1165, 1166, 1167, 1168}:
|
|
is_ceiling = 1
|
|
max_select_point = 33
|
|
|
|
await self.ongeki_data.static.put_gacha(
|
|
version,
|
|
gacha_id,
|
|
name,
|
|
gacha_kind,
|
|
isCeiling=is_ceiling,
|
|
maxSelectPoint=max_select_point,
|
|
)
|
|
self.logger.info(f"Added ongeki gacha {gacha_id}")
|