forked from Hay1tsme/artemis
		
	
		
			
				
	
	
		
			304 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			304 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from typing import Optional, Dict, List
 | |
| from os import walk, path
 | |
| import urllib
 | |
| 
 | |
| from read import BaseReader
 | |
| from core.config import CoreConfig
 | |
| from titles.diva.database import DivaData
 | |
| from titles.diva.const import DivaConstants
 | |
| 
 | |
| 
 | |
| class DivaReader(BaseReader):
 | |
|     def __init__(
 | |
|         self,
 | |
|         config: CoreConfig,
 | |
|         version: int,
 | |
|         bin_dir: Optional[str],
 | |
|         opt_dir: Optional[str],
 | |
|         extra: Optional[str],
 | |
|     ) -> None:
 | |
|         super().__init__(config, version, bin_dir, opt_dir, extra)
 | |
|         self.data = DivaData(config)
 | |
| 
 | |
|         try:
 | |
|             self.logger.info(
 | |
|                 f"Start importer for {DivaConstants.game_ver_to_string(version)}"
 | |
|             )
 | |
|         except IndexError:
 | |
|             self.logger.error(f"Invalid project diva version {version}")
 | |
|             exit(1)
 | |
| 
 | |
|     async def read(self) -> None:
 | |
|         pull_bin_ram = True
 | |
|         pull_bin_rom = True
 | |
|         pull_opt_rom = True
 | |
| 
 | |
|         if not path.exists(f"{self.bin_dir}/ram"):
 | |
|             self.logger.warning(f"Couldn't find ram folder in {self.bin_dir}, skipping")
 | |
|             pull_bin_ram = False
 | |
| 
 | |
|         if not path.exists(f"{self.bin_dir}/rom"):
 | |
|             self.logger.warning(f"Couldn't find rom folder in {self.bin_dir}, skipping")
 | |
|             pull_bin_rom = False
 | |
| 
 | |
|         if self.opt_dir is not None:
 | |
|             opt_dirs = self.get_data_directories(self.opt_dir)
 | |
|         else:
 | |
|             pull_opt_rom = False
 | |
|             self.logger.warning("No option directory specified, skipping")
 | |
| 
 | |
|         if pull_bin_ram:
 | |
|             await self.read_ram(f"{self.bin_dir}/ram")
 | |
|         if pull_bin_rom:
 | |
|             await self.read_rom(f"{self.bin_dir}/rom")
 | |
|         if pull_opt_rom:
 | |
|             for dir in opt_dirs:
 | |
|                 await self.read_rom(f"{dir}/rom")
 | |
| 
 | |
|     async def read_ram(self, ram_root_dir: str) -> None:
 | |
|         self.logger.info(f"Read RAM from {ram_root_dir}")
 | |
| 
 | |
|         if path.exists(f"{ram_root_dir}/databank"):
 | |
|             for root, dirs, files in walk(f"{ram_root_dir}/databank"):
 | |
|                 for file in files:
 | |
|                     if (
 | |
|                         file.startswith("ShopCatalog_")
 | |
|                         or file.startswith("CustomizeItemCatalog_")
 | |
|                         or (
 | |
|                             file.startswith("QuestInfo")
 | |
|                             and not file.startswith("QuestInfoTm")
 | |
|                         )
 | |
|                     ):
 | |
|                         with open(f"{root}/{file}", "r") as f:
 | |
|                             file_data: str = urllib.parse.unquote(
 | |
|                                 urllib.parse.unquote(f.read())
 | |
|                             )
 | |
|                             if file_data == "***":
 | |
|                                 self.logger.info(f"{file} is empty, skipping")
 | |
|                                 continue
 | |
| 
 | |
|                             file_lines: List[str] = file_data.split("\n")
 | |
| 
 | |
|                             for line in file_lines:
 | |
|                                 split = line.split(",")
 | |
| 
 | |
|                                 if not split[0]:
 | |
|                                     split.pop(0)
 | |
| 
 | |
|                                 if file.startswith("ShopCatalog_"):
 | |
|                                     for x in range(0, len(split), 7):
 | |
|                                         self.logger.info(
 | |
|                                             f"Added shop item {split[x+0]}"
 | |
|                                         )
 | |
| 
 | |
|                                         await self.data.static.put_shop(
 | |
|                                             self.version,
 | |
|                                             split[x + 0],
 | |
|                                             split[x + 2],
 | |
|                                             split[x + 6],
 | |
|                                             split[x + 3],
 | |
|                                             split[x + 1],
 | |
|                                             split[x + 4],
 | |
|                                             split[x + 5],
 | |
|                                         )
 | |
| 
 | |
|                                 elif (
 | |
|                                     file.startswith("CustomizeItemCatalog_")
 | |
|                                     and len(split) >= 7
 | |
|                                 ):
 | |
|                                     for x in range(0, len(split), 7):
 | |
|                                         self.logger.info(f"Added item {split[x+0]}")
 | |
| 
 | |
|                                         await self.data.static.put_items(
 | |
|                                             self.version,
 | |
|                                             split[x + 0],
 | |
|                                             split[x + 2],
 | |
|                                             split[x + 6],
 | |
|                                             split[x + 3],
 | |
|                                             split[x + 1],
 | |
|                                             split[x + 4],
 | |
|                                             split[x + 5],
 | |
|                                         )
 | |
| 
 | |
|                                 elif file.startswith("QuestInfo") and len(split) >= 9:
 | |
|                                     self.logger.info(f"Added quest {split[0]}")
 | |
| 
 | |
|                                     await self.data.static.put_quests(
 | |
|                                         self.version,
 | |
|                                         split[0],
 | |
|                                         split[6],
 | |
|                                         split[2],
 | |
|                                         split[3],
 | |
|                                         split[7],
 | |
|                                         split[8],
 | |
|                                         split[1],
 | |
|                                         split[4],
 | |
|                                         split[5],
 | |
|                                     )
 | |
| 
 | |
|                                 else:
 | |
|                                     continue
 | |
|         else:
 | |
|             self.logger.warning(f"Databank folder not found in {ram_root_dir}, skipping")
 | |
| 
 | |
|     async def read_rom(self, rom_root_dir: str) -> None:
 | |
|         self.logger.info(f"Read ROM from {rom_root_dir}")
 | |
|         pv_list: Dict[str, Dict] = {}
 | |
| 
 | |
|         if path.exists(f"{rom_root_dir}/mdata_pv_db.txt"):
 | |
|             file_path = f"{rom_root_dir}/mdata_pv_db.txt"
 | |
|         elif path.exists(f"{rom_root_dir}/pv_db.txt"):
 | |
|             file_path = f"{rom_root_dir}/pv_db.txt"
 | |
|         else:
 | |
|             self.logger.warning(
 | |
|                 f"Cannot find pv_db.txt or mdata_pv_db.txt in {rom_root_dir}, skipping"
 | |
|             )
 | |
|             return
 | |
| 
 | |
|         with open(file_path, "r", encoding="utf-8") as f:
 | |
|             for line in f.readlines():
 | |
|                 if line.startswith("#") or not line:
 | |
|                     continue
 | |
| 
 | |
|                 line_split = line.split("=")
 | |
|                 if len(line_split) != 2:
 | |
|                     continue
 | |
| 
 | |
|                 key = line_split[0]
 | |
|                 val = line_split[1]
 | |
|                 if val.endswith("\n"):
 | |
|                     val = val[:-1]
 | |
| 
 | |
|                 key_split = key.split(".")
 | |
|                 pv_id = key_split[0]
 | |
|                 key_args = []
 | |
| 
 | |
|                 for x in range(1, len(key_split)):
 | |
|                     key_args.append(key_split[x])
 | |
| 
 | |
|                 try:
 | |
|                     pv_list[pv_id] = self.add_branch(pv_list[pv_id], key_args, val)
 | |
|                 except KeyError:
 | |
|                     pv_list[pv_id] = {}
 | |
|                     pv_list[pv_id] = self.add_branch(pv_list[pv_id], key_args, val)
 | |
| 
 | |
|         for pv_id, pv_data in pv_list.items():
 | |
|             song_id = int(pv_id.split("_")[1])
 | |
|             if "songinfo" not in pv_data:
 | |
|                 continue
 | |
|             if "illustrator" not in pv_data["songinfo"]:
 | |
|                 pv_data["songinfo"]["illustrator"] = "-"
 | |
|             if "arranger" not in pv_data["songinfo"]:
 | |
|                 pv_data["songinfo"]["arranger"] = "-"
 | |
|             if "lyrics" not in pv_data["songinfo"]:
 | |
|                 pv_data["songinfo"]["lyrics"] = "-"
 | |
|             if "music" not in pv_data["songinfo"]:
 | |
|                 pv_data["songinfo"]["music"] = "-"
 | |
| 
 | |
|             if "easy" in pv_data["difficulty"] and "0" in pv_data["difficulty"]["easy"]:
 | |
|                 diff = pv_data["difficulty"]["easy"]["0"]["level"].split("_")
 | |
|                 self.logger.info(f"Added song {song_id} chart 0")
 | |
| 
 | |
|                 await self.data.static.put_music(
 | |
|                     self.version,
 | |
|                     song_id,
 | |
|                     0,
 | |
|                     pv_data["song_name"],
 | |
|                     pv_data["songinfo"]["arranger"],
 | |
|                     pv_data["songinfo"]["illustrator"],
 | |
|                     pv_data["songinfo"]["lyrics"],
 | |
|                     pv_data["songinfo"]["music"],
 | |
|                     float(f"{diff[2]}.{diff[3]}"),
 | |
|                     pv_data["bpm"],
 | |
|                     pv_data["date"],
 | |
|                 )
 | |
| 
 | |
|             if (
 | |
|                 "normal" in pv_data["difficulty"]
 | |
|                 and "0" in pv_data["difficulty"]["normal"]
 | |
|             ):
 | |
|                 diff = pv_data["difficulty"]["normal"]["0"]["level"].split("_")
 | |
|                 self.logger.info(f"Added song {song_id} chart 1")
 | |
| 
 | |
|                 await self.data.static.put_music(
 | |
|                     self.version,
 | |
|                     song_id,
 | |
|                     1,
 | |
|                     pv_data["song_name"],
 | |
|                     pv_data["songinfo"]["arranger"],
 | |
|                     pv_data["songinfo"]["illustrator"],
 | |
|                     pv_data["songinfo"]["lyrics"],
 | |
|                     pv_data["songinfo"]["music"],
 | |
|                     float(f"{diff[2]}.{diff[3]}"),
 | |
|                     pv_data["bpm"],
 | |
|                     pv_data["date"],
 | |
|                 )
 | |
| 
 | |
|             if "hard" in pv_data["difficulty"] and "0" in pv_data["difficulty"]["hard"]:
 | |
|                 diff = pv_data["difficulty"]["hard"]["0"]["level"].split("_")
 | |
|                 self.logger.info(f"Added song {song_id} chart 2")
 | |
| 
 | |
|                 await self.data.static.put_music(
 | |
|                     self.version,
 | |
|                     song_id,
 | |
|                     2,
 | |
|                     pv_data["song_name"],
 | |
|                     pv_data["songinfo"]["arranger"],
 | |
|                     pv_data["songinfo"]["illustrator"],
 | |
|                     pv_data["songinfo"]["lyrics"],
 | |
|                     pv_data["songinfo"]["music"],
 | |
|                     float(f"{diff[2]}.{diff[3]}"),
 | |
|                     pv_data["bpm"],
 | |
|                     pv_data["date"],
 | |
|                 )
 | |
| 
 | |
|             if "extreme" in pv_data["difficulty"]:
 | |
|                 if "0" in pv_data["difficulty"]["extreme"]:
 | |
|                     diff = pv_data["difficulty"]["extreme"]["0"]["level"].split("_")
 | |
|                     self.logger.info(f"Added song {song_id} chart 3")
 | |
| 
 | |
|                     await self.data.static.put_music(
 | |
|                         self.version,
 | |
|                         song_id,
 | |
|                         3,
 | |
|                         pv_data["song_name"],
 | |
|                         pv_data["songinfo"]["arranger"],
 | |
|                         pv_data["songinfo"]["illustrator"],
 | |
|                         pv_data["songinfo"]["lyrics"],
 | |
|                         pv_data["songinfo"]["music"],
 | |
|                         float(f"{diff[2]}.{diff[3]}"),
 | |
|                         pv_data["bpm"],
 | |
|                         pv_data["date"],
 | |
|                     )
 | |
| 
 | |
|                 if "1" in pv_data["difficulty"]["extreme"]:
 | |
|                     diff = pv_data["difficulty"]["extreme"]["1"]["level"].split("_")
 | |
|                     self.logger.info(f"Added song {song_id} chart 4")
 | |
| 
 | |
|                     await self.data.static.put_music(
 | |
|                         self.version,
 | |
|                         song_id,
 | |
|                         4,
 | |
|                         pv_data["song_name"],
 | |
|                         pv_data["songinfo"]["arranger"],
 | |
|                         pv_data["songinfo"]["illustrator"],
 | |
|                         pv_data["songinfo"]["lyrics"],
 | |
|                         pv_data["songinfo"]["music"],
 | |
|                         float(f"{diff[2]}.{diff[3]}"),
 | |
|                         pv_data["bpm"],
 | |
|                         pv_data["date"],
 | |
|                     )
 | |
| 
 | |
|     def add_branch(self, tree: Dict, vector: List, value: str):
 | |
|         """
 | |
|         Recursivly adds nodes to a dictionary
 | |
|         Author: iJames on StackOverflow
 | |
|         """
 | |
|         key = vector[0]
 | |
|         tree[key] = (
 | |
|             value
 | |
|             if len(vector) == 1
 | |
|             else self.add_branch(tree.get(key, {}), vector[1:], value)
 | |
|         )
 | |
|         return tree
 |