artemis/core/data/schema/user.py

125 lines
4.1 KiB
Python
Raw Normal View History

from enum import Enum
from typing import Dict, Optional
2023-02-19 20:40:25 +00:00
from sqlalchemy import Table, Column, and_
from sqlalchemy.types import Integer, String, TIMESTAMP
from sqlalchemy.sql.schema import ForeignKey
from sqlalchemy.sql import func
2023-02-19 20:40:25 +00:00
from sqlalchemy.dialects.mysql import insert
from sqlalchemy.sql import func, select, Delete
from uuid import uuid4
from datetime import datetime, timedelta
from sqlalchemy.engine import Row
from core.data.schema.base import BaseData, metadata
# Account table: one row per user, keyed by an auto-incrementing integer id.
aime_user = Table(
    "aime_user",
    metadata,
    Column(
        "id",
        Integer,
        primary_key=True,
        autoincrement=True,
        nullable=False,
    ),
    # username and email are each unique across the table.
    Column("username", String(25), unique=True),
    Column("email", String(255), unique=True),
    Column("password", String(255)),
    Column("permissions", Integer),
    # Filled in by the database server with the current time on insert.
    Column("created_date", TIMESTAMP, server_default=func.now()),
    # Set to the current time whenever the row is updated.
    Column("last_login_date", TIMESTAMP, onupdate=func.now()),
    Column("suspend_expire_time", TIMESTAMP),
    mysql_charset="utf8mb4",
)
# Session table: one row per issued session cookie.
frontend_session = Table(
    "frontend_session",
    metadata,
    Column("id", Integer, primary_key=True, unique=True),
    # Owning user; the row follows deletes/updates of the referenced aime_user.id.
    Column(
        "user",
        ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"),
        nullable=False,
    ),
    Column("ip", String(15)),
    Column("session_cookie", String(32), nullable=False, unique=True),
    Column("expires", TIMESTAMP, nullable=False),
    mysql_charset="utf8mb4",
)
class PermissionBits(Enum):
    """Permission levels; each member's value is a distinct power of two."""

    PermUser = 1 << 0  # 1
    PermMod = 1 << 1  # 2
    PermSysAdmin = 1 << 2  # 4
class UserData(BaseData):
    """Data-access helpers for the ``aime_user`` and ``frontend_session`` tables.

    Every statement is run through ``self.execute`` (inherited from
    ``BaseData``); a ``None`` result from it is treated as failure.
    """

    def create_user(
        self,
        id: Optional[int] = None,
        username: Optional[str] = None,
        email: Optional[str] = None,
        password: Optional[str] = None,
        permission: int = 1,
    ) -> Optional[int]:
        """Insert a user row, updating the existing row on a duplicate key.

        Args:
            id: Explicit primary key. Left out of the INSERT when ``None``.
            username: Value for the ``username`` column.
            email: Value for the ``email`` column.
            password: Value for the ``password`` column, stored as given.
            permission: Value for the ``permissions`` column. Forced to 1
                when ``email`` is ``None``.

        Returns:
            ``lastrowid`` of the executed statement, or ``None`` when
            ``self.execute`` returned ``None``.
        """
        if email is None:
            # Accounts without an email never get more than permission 1.
            permission = 1

        # The same column values are used for the INSERT and for the
        # ON DUPLICATE KEY UPDATE clause; only the INSERT may carry an id.
        fields = {
            "username": username,
            "email": email,
            "password": password,
            "permissions": permission,
        }
        row = fields if id is None else {"id": id, **fields}

        sql = insert(aime_user).values(**row)
        conflict = sql.on_duplicate_key_update(**fields)

        result = self.execute(conflict)
        if result is None:
            return None
        return result.lastrowid

    def login(
        self,
        user_id: int,
        passwd: Optional[bytes] = None,
        ip: str = "0.0.0.0",
    ) -> Optional[str]:
        """Open a session for ``user_id`` if ``passwd`` matches the stored value.

        The password is compared for equality against the ``password``
        column inside the query.

        NOTE(review): with ``passwd=None`` SQLAlchemy renders the comparison
        as ``IS NULL``, so users without a stored password match - confirm
        that callers rely on this before changing it.

        Returns:
            The new session cookie from ``create_session``, or ``None`` when
            the query failed or no row matched.
        """
        sql = select(aime_user).where(
            and_(aime_user.c.id == user_id, aime_user.c.password == passwd)
        )

        result = self.execute(sql)
        if result is None:
            return None

        usr = result.fetchone()
        if usr is None:
            return None

        return self.create_session(user_id, ip)

    def check_session(self, cookie: str, ip: str = "0.0.0.0") -> Optional[Row]:
        """Return the session row matching both ``cookie`` and ``ip``.

        NOTE(review): the ``expires`` column is not consulted here; callers
        have to check it on the returned row themselves.

        Returns:
            The first matching row, or ``None`` when the query failed or
            nothing matched.
        """
        sql = select(frontend_session).where(
            and_(
                frontend_session.c.session_cookie == cookie,
                frontend_session.c.ip == ip,
            )
        )

        result = self.execute(sql)
        if result is None:
            return None
        return result.fetchone()

    def delete_session(self, session_id: int) -> bool:
        """Delete the session row with primary key ``session_id``.

        Returns:
            ``False`` when ``self.execute`` returned ``None``, ``True``
            otherwise (whether or not a row was actually removed).
        """
        sql = Delete(frontend_session).where(frontend_session.c.id == session_id)

        result = self.execute(sql)
        return result is not None

    def create_session(
        self,
        user_id: int,
        ip: str = "0.0.0.0",
        expires: Optional[datetime] = None,
    ) -> Optional[str]:
        """Insert a new session row and return its cookie.

        Args:
            user_id: Value for the ``user`` column.
            ip: Value for the ``ip`` column.
            expires: Expiry timestamp. Defaults to one day after the moment
                of the call.

        Returns:
            The generated 32-character hex cookie, or ``None`` when
            ``self.execute`` returned ``None``.
        """
        if expires is None:
            # Must be computed per call: a default written in the signature
            # is evaluated once at definition time and would hand every
            # session the same, eventually stale, expiry.
            expires = datetime.now() + timedelta(days=1)

        cookie = uuid4().hex

        sql = insert(frontend_session).values(
            user=user_id,
            ip=ip,
            session_cookie=cookie,
            expires=expires,
        )

        result = self.execute(sql)
        if result is None:
            return None
        return cookie

    def reset_autoincrement(self, ai_value: int) -> None:
        """Set the AUTO_INCREMENT counter of ``aime_user`` to ``ai_value``."""
        # ALTER TABLE isn't in sqlalchemy so we do this the ugly way.
        # int() ensures nothing but a number is interpolated into the SQL.
        sql = f"ALTER TABLE aime_user AUTO_INCREMENT={int(ai_value)}"
        self.execute(sql)