from enum import Enum from typing import Dict, Optional from sqlalchemy import Table, Column, and_ from sqlalchemy.types import Integer, String, TIMESTAMP from sqlalchemy.sql.schema import ForeignKey from sqlalchemy.sql import func from sqlalchemy.dialects.mysql import insert from sqlalchemy.sql import func, select, Delete from uuid import uuid4 from datetime import datetime, timedelta from sqlalchemy.engine import Row import bcrypt from core.data.schema.base import BaseData, metadata aime_user = Table( "aime_user", metadata, Column("id", Integer, nullable=False, primary_key=True, autoincrement=True), Column("username", String(25), unique=True), Column("email", String(255), unique=True), Column("password", String(255)), Column("permissions", Integer), Column("created_date", TIMESTAMP, server_default=func.now()), Column("last_login_date", TIMESTAMP, onupdate=func.now()), Column("suspend_expire_time", TIMESTAMP), mysql_charset='utf8mb4' ) class PermissionBits(Enum): PermUser = 1 PermMod = 2 PermSysAdmin = 4 class UserData(BaseData): def create_user(self, id: int = None, username: str = None, email: str = None, password: str = None, permission: int = 1) -> Optional[int]: if email is None: permission = 1 if id is None: sql = insert(aime_user).values( username=username, email=email, password=password, permissions=permission ) else: sql = insert(aime_user).values( id=id, username=username, email=email, password=password, permissions=permission ) conflict = sql.on_duplicate_key_update( username=username, email=email, password=password, permissions=permission ) result = self.execute(conflict) if result is None: return None return result.lastrowid def get_user(self, user_id: int) -> Optional[Row]: sql = select(aime_user).where(aime_user.c.id == user_id) result = self.execute(sql) if result is None: return False return result.fetchone() def check_password(self, user_id: int, passwd: bytes = None) -> bool: usr = self.get_user(user_id) if usr is None: return False if usr['password'] is None: return False return bcrypt.checkpw(passwd, usr['password'].encode()) def reset_autoincrement(self, ai_value: int) -> None: # ALTER TABLE isn't in sqlalchemy so we do this the ugly way sql = f"ALTER TABLE aime_user AUTO_INCREMENT={ai_value}" self.execute(sql)