1
0
Fork 0

database: fix autoupdate

This commit is contained in:
Hay1tsme 2023-04-15 01:31:52 -04:00
parent 4419310086
commit 9895068125
2 changed files with 46 additions and 19 deletions

View File

@ -1,5 +1,5 @@
import logging, coloredlogs
from typing import Optional
from typing import Optional, Dict, List
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy import create_engine
@ -137,14 +137,13 @@ class Data:
titles = Utils.get_all_titles()
for folder, mod in titles.items():
for game_ in mod.game_codes:
if not game_ == game: continue
if hasattr(mod, "current_schema_version"):
version = mod.current_schema_version
else:
self.logger.warn(f"current_schema_version not found for {folder}")
if not mod.game_codes[0] == game: continue
if hasattr(mod, "current_schema_version"):
version = mod.current_schema_version
else:
self.logger.warn(f"current_schema_version not found for {folder}")
else:
version = self.current_schema_version
@ -282,17 +281,45 @@ class Data:
self.user.delete_user(user["id"])
def autoupgrade(self) -> None:
all_games = self.base.get_all_schema_vers()
if all_games is None:
all_game_versions = self.base.get_all_schema_vers()
if all_game_versions is None:
self.logger.warn("Failed to get schema versions")
return
for x in all_games:
print(all_game_versions)
all_games = Utils.get_all_titles()
all_games_list: Dict[str, int] = {}
for _, mod in all_games.items():
if hasattr(mod, "current_schema_version"):
all_games_list[mod.game_codes[0]] = mod.current_schema_version
for x in all_game_versions:
game = x["game"].upper()
update_ver = 1
for y in range(2, 100):
update_ver = int(x["version"])
latest_ver = all_games_list.get(game, 1)
if game == "CORE":
latest_ver = self.current_schema_version
if update_ver == latest_ver:
self.logger.info(f"{game} is already latest version")
continue
for y in range(update_ver + 1, latest_ver + 1):
if os.path.exists(f"core/data/schema/versions/{game}_{y}_upgrade.sql"):
update_ver = y
with open(
f"core/data/schema/versions/{game}_{y}_upgrade.sql",
"r",
encoding="utf-8",
) as f:
sql = f.read()
result = self.base.execute(sql)
if result is None:
self.logger.error(f"Error execuing sql script for game {game} v{y}!")
continue
else:
break
self.migrate_database(game, update_ver, "upgrade")
self.logger.warning(f"Could not find script {game}_{y}_upgrade.sql")
self.base.set_schema_ver(latest_ver, game)

View File

@ -47,7 +47,7 @@ class BaseData:
res = None
try:
self.logger.info(f"SQL Execute: {''.join(str(sql).splitlines())} || {opts}")
self.logger.info(f"SQL Execute: {''.join(str(sql).splitlines())}")
res = self.conn.execute(text(sql), opts)
except SQLAlchemyError as e: