core: add columns to machine table, bump to v5

This commit is contained in:
Hay1tsme 2023-07-23 22:21:41 -04:00
parent f417be671b
commit b943807904
5 changed files with 59 additions and 7 deletions

View File

@ -15,7 +15,7 @@ from core.utils import Utils
class Data: class Data:
current_schema_version = 4 current_schema_version = 5
engine = None engine = None
session = None session = None
user = None user = None

View File

@ -1,9 +1,10 @@
from typing import Optional, Dict from typing import Optional, Dict, List
from sqlalchemy import Table, Column from sqlalchemy import Table, Column, and_, or_
from sqlalchemy.sql.schema import ForeignKey, PrimaryKeyConstraint from sqlalchemy.sql.schema import ForeignKey, PrimaryKeyConstraint
from sqlalchemy.types import Integer, String, Boolean from sqlalchemy.types import Integer, String, Boolean, JSON
from sqlalchemy.sql import func, select from sqlalchemy.sql import func, select
from sqlalchemy.dialects.mysql import insert from sqlalchemy.dialects.mysql import insert
from sqlalchemy.engine import Row
import re import re
from core.data.schema.base import BaseData, metadata from core.data.schema.base import BaseData, metadata
@ -40,6 +41,9 @@ machine = Table(
Column("timezone", String(255)), Column("timezone", String(255)),
Column("ota_enable", Boolean), Column("ota_enable", Boolean),
Column("is_cab", Boolean), Column("is_cab", Boolean),
Column("memo", String(255)),
Column("is_cab", Boolean),
Column("data", JSON),
mysql_charset="utf8mb4", mysql_charset="utf8mb4",
) )
@ -65,7 +69,7 @@ arcade_owner = Table(
class ArcadeData(BaseData): class ArcadeData(BaseData):
def get_machine(self, serial: str = None, id: int = None) -> Optional[Dict]: def get_machine(self, serial: str = None, id: int = None) -> Optional[Row]:
if serial is not None: if serial is not None:
serial = serial.replace("-", "") serial = serial.replace("-", "")
if len(serial) == 11: if len(serial) == 11:
@ -130,13 +134,20 @@ class ArcadeData(BaseData):
f"Failed to update board id for machine {machine_id} -> {boardid}" f"Failed to update board id for machine {machine_id} -> {boardid}"
) )
def get_arcade(self, id: int) -> Optional[Dict]: def get_arcade(self, id: int) -> Optional[Row]:
sql = arcade.select(arcade.c.id == id) sql = arcade.select(arcade.c.id == id)
result = self.execute(sql) result = self.execute(sql)
if result is None: if result is None:
return None return None
return result.fetchone() return result.fetchone()
def get_arcade_machines(self, id: int) -> Optional[List[Row]]:
sql = machine.select(machine.c.arcade == id)
result = self.execute(sql)
if result is None:
return None
return result.fetchall()
def put_arcade( def put_arcade(
self, self,
name: str, name: str,
@ -165,7 +176,21 @@ class ArcadeData(BaseData):
return None return None
return result.lastrowid return result.lastrowid
def get_arcade_owners(self, arcade_id: int) -> Optional[Dict]: def get_arcades_managed_by_user(self, user_id: int) -> Optional[List[Row]]:
sql = select(arcade).join(arcade_owner, arcade_owner.c.arcade == arcade.c.id).where(arcade_owner.c.user == user_id)
result = self.execute(sql)
if result is None:
return False
return result.fetchall()
def get_manager_permissions(self, user_id: int, arcade_id: int) -> Optional[int]:
sql = select(arcade_owner.c.permissions).where(and_(arcade_owner.c.user == user_id, arcade_owner.c.arcade == arcade_id))
result = self.execute(sql)
if result is None:
return False
return result.fetchone()
def get_arcade_owners(self, arcade_id: int) -> Optional[Row]:
sql = select(arcade_owner).where(arcade_owner.c.arcade == arcade_id) sql = select(arcade_owner).where(arcade_owner.c.arcade == arcade_id)
result = self.execute(sql) result = self.execute(sql)
@ -217,3 +242,10 @@ class ArcadeData(BaseData):
return False return False
return True return True
def find_arcade_by_name(self, name: str) -> List[Row]:
sql = arcade.select(or_(arcade.c.name.like(f"%{name}%"), arcade.c.nickname.like(f"%{name}%")))
result = self.execute(sql)
if result is None:
return False
return result.fetchall()

View File

@ -107,3 +107,17 @@ class UserData(BaseData):
if result is None: if result is None:
return None return None
return result.fetchall() return result.fetchall()
def find_user_by_email(self, email: str) -> Row:
sql = select(aime_user).where(aime_user.c.email == email)
result = self.execute(sql)
if result is None:
return False
return result.fetchone()
def find_user_by_username(self, username: str) -> List[Row]:
sql = aime_user.select(aime_user.c.username.like(f"%{username}%"))
result = self.execute(sql)
if result is None:
return False
return result.fetchall()

View File

@ -0,0 +1,3 @@
ALTER TABLE machine DROP COLUMN memo;
ALTER TABLE machine DROP COLUMN is_blacklisted;
ALTER TABLE machine DROP COLUMN `data`;

View File

@ -0,0 +1,3 @@
ALTER TABLE machine ADD memo varchar(255) NULL;
ALTER TABLE machine ADD is_blacklisted tinyint(1) NULL;
ALTER TABLE machine ADD `data` longtext NULL;