3
2
forked from Dniel97/artemis

database: add create-owner, migrate-card, and cleanup commands

This commit is contained in:
Hay1tsme 2023-03-04 00:04:47 -05:00
parent 3181e1f4f8
commit f5d4f519d3
4 changed files with 127 additions and 20 deletions

View File

@ -1,12 +1,12 @@
import logging, coloredlogs import logging, coloredlogs
from typing import Any, Dict, List from typing import Optional
from sqlalchemy.orm import scoped_session, sessionmaker from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy import create_engine from sqlalchemy import create_engine
from logging.handlers import TimedRotatingFileHandler from logging.handlers import TimedRotatingFileHandler
from datetime import datetime import importlib, os
import importlib, os, json import secrets, string
import bcrypt
from hashlib import sha256 from hashlib import sha256
from core.config import CoreConfig from core.config import CoreConfig
@ -138,3 +138,61 @@ class Data:
return None return None
self.logger.info(f"Successfully migrated {game} to schema version {version}") self.logger.info(f"Successfully migrated {game} to schema version {version}")
def create_owner(self, email: Optional[str] = None) -> None:
pw = ''.join(secrets.choice(string.ascii_letters + string.digits) for i in range(20))
hash = bcrypt.hashpw(pw.encode(), bcrypt.gensalt())
user_id = self.user.create_user(email=email, permission=255, password=hash)
if user_id is None:
self.logger.error(f"Failed to create owner with email {email}")
return
card_id = self.card.create_card(user_id, "00000000000000000000")
if card_id is None:
self.logger.error(f"Failed to create card for owner with id {user_id}")
return
self.logger.warn(f"Successfully created owner with email {email}, access code 00000000000000000000, and password {pw} Make sure to change this password and assign a real card ASAP!")
def migrate_card(self, old_ac: str, new_ac: str, should_force: bool) -> None:
if old_ac == new_ac:
self.logger.error("Both access codes are the same!")
return
new_card = self.card.get_card_by_access_code(new_ac)
if new_card is None:
self.card.update_access_code(old_ac, new_ac)
return
if not should_force:
self.logger.warn(f"Card already exists for access code {new_ac} (id {new_card['id']}). If you wish to continue, rerun with the '--force' flag."\
f" All exiting data on the target card {new_ac} will be perminently erased and replaced with data from card {old_ac}.")
return
self.logger.info(f"All exiting data on the target card {new_ac} will be perminently erased and replaced with data from card {old_ac}.")
self.card.delete_card(new_card["id"])
self.card.update_access_code(old_ac, new_ac)
hanging_user = self.user.get_user(new_card["user"])
if hanging_user["password"] is None:
self.logger.info(f"Delete hanging user {hanging_user['id']}")
self.user.delete_user(hanging_user['id'])
def delete_hanging_users(self) -> None:
"""
Finds and deletes users that have not registered for the webui that have no cards assocated with them.
"""
unreg_users = self.user.get_unregistered_users()
if unreg_users is None:
self.logger.error("Error occoured finding unregistered users")
for user in unreg_users:
cards = self.card.get_user_cards(user['id'])
if cards is None:
self.logger.error(f"Error getting cards for user {user['id']}")
continue
if not cards:
self.logger.info(f"Delete hanging user {user['id']}")
self.user.delete_user(user['id'])

View File

@ -3,6 +3,7 @@ from sqlalchemy import Table, Column, UniqueConstraint
from sqlalchemy.types import Integer, String, Boolean, TIMESTAMP from sqlalchemy.types import Integer, String, Boolean, TIMESTAMP
from sqlalchemy.sql.schema import ForeignKey from sqlalchemy.sql.schema import ForeignKey
from sqlalchemy.sql import func from sqlalchemy.sql import func
from sqlalchemy.engine import Row
from core.data.schema.base import BaseData, metadata from core.data.schema.base import BaseData, metadata
@ -21,21 +22,44 @@ aime_card = Table(
) )
class CardData(BaseData): class CardData(BaseData):
def get_user_id_from_card(self, access_code: str) -> Optional[int]: def get_card_by_access_code(self, access_code: str) -> Optional[Row]:
"""
Given a 20 digit access code as a string, get the user id associated with that card
"""
sql = aime_card.select(aime_card.c.access_code == access_code) sql = aime_card.select(aime_card.c.access_code == access_code)
result = self.execute(sql) result = self.execute(sql)
if result is None: return None if result is None: return None
return result.fetchone()
card = result.fetchone() def get_card_by_id(self, card_id: int) -> Optional[Row]:
sql = aime_card.select(aime_card.c.id == card_id)
result = self.execute(sql)
if result is None: return None
return result.fetchone()
def update_access_code(self, old_ac: str, new_ac: str) -> None:
sql = aime_card.update(aime_card.c.access_code == old_ac).values(access_code = new_ac)
result = self.execute(sql)
if result is None:
self.logger.error(f"Failed to change card access code from {old_ac} to {new_ac}")
def get_user_id_from_card(self, access_code: str) -> Optional[int]:
"""
Given a 20 digit access code as a string, get the user id associated with that card
"""
card = self.get_card_by_access_code(access_code)
if card is None: return None if card is None: return None
return int(card["user"]) return int(card["user"])
def get_user_cards(self, aime_id: int) -> Optional[List[Dict]]: def delete_card(self, card_id: int) -> None:
sql = aime_card.delete(aime_card.c.id == card_id)
result = self.execute(sql)
if result is None:
self.logger.error(f"Failed to delete card with id {card_id}")
def get_user_cards(self, aime_id: int) -> Optional[List[Row]]:
""" """
Returns all cards owned by a user Returns all cards owned by a user
""" """

View File

@ -1,13 +1,10 @@
from enum import Enum from enum import Enum
from typing import Dict, Optional from typing import Optional, List
from sqlalchemy import Table, Column, and_ from sqlalchemy import Table, Column
from sqlalchemy.types import Integer, String, TIMESTAMP from sqlalchemy.types import Integer, String, TIMESTAMP
from sqlalchemy.sql.schema import ForeignKey
from sqlalchemy.sql import func from sqlalchemy.sql import func
from sqlalchemy.dialects.mysql import insert from sqlalchemy.dialects.mysql import insert
from sqlalchemy.sql import func, select, Delete from sqlalchemy.sql import func, select
from uuid import uuid4
from datetime import datetime, timedelta
from sqlalchemy.engine import Row from sqlalchemy.engine import Row
import bcrypt import bcrypt
@ -34,9 +31,6 @@ class PermissionBits(Enum):
class UserData(BaseData): class UserData(BaseData):
def create_user(self, id: int = None, username: str = None, email: str = None, password: str = None, permission: int = 1) -> Optional[int]: def create_user(self, id: int = None, username: str = None, email: str = None, password: str = None, permission: int = 1) -> Optional[int]:
if email is None:
permission = 1
if id is None: if id is None:
sql = insert(aime_user).values( sql = insert(aime_user).values(
username=username, username=username,
@ -83,3 +77,21 @@ class UserData(BaseData):
# ALTER TABLE isn't in sqlalchemy so we do this the ugly way # ALTER TABLE isn't in sqlalchemy so we do this the ugly way
sql = f"ALTER TABLE aime_user AUTO_INCREMENT={ai_value}" sql = f"ALTER TABLE aime_user AUTO_INCREMENT={ai_value}"
self.execute(sql) self.execute(sql)
def delete_user(self, user_id: int) -> None:
sql = aime_user.delete(aime_user.c.id == user_id)
result = self.execute(sql)
if result is None:
self.logger.error(f"Failed to delete user with id {user_id}")
def get_unregistered_users(self) -> List[Row]:
"""
Returns a list of users who have not registered with the webui. They may or may not have cards.
"""
sql = select(aime_user).where(aime_user.c.password == None)
result = self.execute(sql)
if result is None:
return None
return result.fetchall()

View File

@ -8,6 +8,10 @@ if __name__=='__main__':
parser.add_argument("--config", "-c", type=str, help="Config folder to use", default="config") parser.add_argument("--config", "-c", type=str, help="Config folder to use", default="config")
parser.add_argument("--version", "-v", type=str, help="Version of the database to upgrade/rollback to") parser.add_argument("--version", "-v", type=str, help="Version of the database to upgrade/rollback to")
parser.add_argument("--game", "-g", type=str, help="Game code of the game who's schema will be updated/rolled back. Ex. SDFE") parser.add_argument("--game", "-g", type=str, help="Game code of the game who's schema will be updated/rolled back. Ex. SDFE")
parser.add_argument("--email", "-e", type=str, help="Email for the new user")
parser.add_argument("--old_ac", "-o", type=str, help="Access code to transfer from")
parser.add_argument("--new_ac", "-n", type=str, help="Access code to transfer to")
parser.add_argument("--force", "-f", type=bool, help="Force the action to happen")
parser.add_argument("action", type=str, help="DB Action, create, recreate, upgrade, or rollback") parser.add_argument("action", type=str, help="DB Action, create, recreate, upgrade, or rollback")
args = parser.parse_args() args = parser.parse_args()
@ -33,4 +37,13 @@ if __name__=='__main__':
else: else:
data.migrate_database(args.game, int(args.version), args.action) data.migrate_database(args.game, int(args.version), args.action)
elif args.action == "create-owner":
data.create_owner(args.email)
elif args.action == "migrate-card":
data.migrate_card(args.old_ac, args.new_ac, args.force)
elif args.action == "cleanup":
data.delete_hanging_users()
data.logger.info("Done") data.logger.info("Done")