272 lines
8.8 KiB
Python
272 lines
8.8 KiB
Python
from typing import Optional, Dict, List
|
|
from sqlalchemy import Table, Column, and_, or_
|
|
from sqlalchemy.sql.schema import ForeignKey, PrimaryKeyConstraint
|
|
from sqlalchemy.types import Integer, String, Boolean, JSON
|
|
from sqlalchemy.sql import func, select
|
|
from sqlalchemy.dialects.mysql import insert
|
|
from sqlalchemy.engine import Row
|
|
import re
|
|
|
|
from core.data.schema.base import BaseData, metadata
|
|
from core.const import *
|
|
|
|
arcade = Table(
|
|
"arcade",
|
|
metadata,
|
|
Column("id", Integer, primary_key=True, nullable=False),
|
|
Column("name", String(255)),
|
|
Column("nickname", String(255)),
|
|
Column("country", String(3)),
|
|
Column("country_id", Integer),
|
|
Column("state", String(255)),
|
|
Column("city", String(255)),
|
|
Column("region_id", Integer),
|
|
Column("timezone", String(255)),
|
|
Column("ip", String(39)),
|
|
mysql_charset="utf8mb4",
|
|
)
|
|
|
|
machine = Table(
|
|
"machine",
|
|
metadata,
|
|
Column("id", Integer, primary_key=True, nullable=False),
|
|
Column(
|
|
"arcade",
|
|
ForeignKey("arcade.id", ondelete="cascade", onupdate="cascade"),
|
|
nullable=False,
|
|
),
|
|
Column("serial", String(15), nullable=False),
|
|
Column("board", String(15)),
|
|
Column("game", String(4)),
|
|
Column("country", String(3)), # overwrites if not null
|
|
Column("timezone", String(255)),
|
|
Column("ota_enable", Boolean),
|
|
Column("memo", String(255)),
|
|
Column("is_cab", Boolean),
|
|
Column("data", JSON),
|
|
mysql_charset="utf8mb4",
|
|
)
|
|
|
|
arcade_owner = Table(
|
|
"arcade_owner",
|
|
metadata,
|
|
Column(
|
|
"user",
|
|
Integer,
|
|
ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"),
|
|
nullable=False,
|
|
),
|
|
Column(
|
|
"arcade",
|
|
Integer,
|
|
ForeignKey("arcade.id", ondelete="cascade", onupdate="cascade"),
|
|
nullable=False,
|
|
),
|
|
Column("permissions", Integer, nullable=False),
|
|
PrimaryKeyConstraint("user", "arcade", name="arcade_owner_pk"),
|
|
mysql_charset="utf8mb4",
|
|
)
|
|
|
|
|
|
class ArcadeData(BaseData):
|
|
async def get_machine(self, serial: str = None, id: int = None) -> Optional[Row]:
|
|
if serial is not None:
|
|
serial = serial.replace("-", "")
|
|
if len(serial) == 11:
|
|
sql = machine.select(machine.c.serial.like(f"{serial}%"))
|
|
|
|
elif len(serial) == 15:
|
|
sql = machine.select(machine.c.serial == serial)
|
|
|
|
else:
|
|
self.logger.error(f"{__name__ }: Malformed serial {serial}")
|
|
return None
|
|
|
|
elif id is not None:
|
|
sql = machine.select(machine.c.id == id)
|
|
|
|
else:
|
|
self.logger.error(f"{__name__ }: Need either serial or ID to look up!")
|
|
return None
|
|
|
|
result = await self.execute(sql)
|
|
if result is None:
|
|
return None
|
|
return result.fetchone()
|
|
|
|
async def create_machine(
|
|
self,
|
|
arcade_id: int,
|
|
serial: str = "",
|
|
board: str = None,
|
|
game: str = None,
|
|
is_cab: bool = False,
|
|
) -> Optional[int]:
|
|
if not arcade_id:
|
|
self.logger.error(f"{__name__ }: Need arcade id!")
|
|
return None
|
|
|
|
sql = machine.insert().values(
|
|
arcade=arcade_id, serial=serial, board=board, game=game, is_cab=is_cab
|
|
)
|
|
|
|
result = await self.execute(sql)
|
|
if result is None:
|
|
return None
|
|
return result.lastrowid
|
|
|
|
async def set_machine_serial(self, machine_id: int, serial: str) -> None:
|
|
result = await self.execute(
|
|
machine.update(machine.c.id == machine_id).values(keychip=serial)
|
|
)
|
|
if result is None:
|
|
self.logger.error(
|
|
f"Failed to update serial for machine {machine_id} -> {serial}"
|
|
)
|
|
return result.lastrowid
|
|
|
|
async def set_machine_boardid(self, machine_id: int, boardid: str) -> None:
|
|
result = await self.execute(
|
|
machine.update(machine.c.id == machine_id).values(board=boardid)
|
|
)
|
|
if result is None:
|
|
self.logger.error(
|
|
f"Failed to update board id for machine {machine_id} -> {boardid}"
|
|
)
|
|
|
|
async def get_arcade(self, id: int) -> Optional[Row]:
|
|
sql = arcade.select(arcade.c.id == id)
|
|
result = await self.execute(sql)
|
|
if result is None:
|
|
return None
|
|
return result.fetchone()
|
|
|
|
async def get_arcade_machines(self, id: int) -> Optional[List[Row]]:
|
|
sql = machine.select(machine.c.arcade == id)
|
|
result = await self.execute(sql)
|
|
if result is None:
|
|
return None
|
|
return result.fetchall()
|
|
|
|
async def create_arcade(
|
|
self,
|
|
name: str = None,
|
|
nickname: str = None,
|
|
country: str = "JPN",
|
|
country_id: int = 1,
|
|
state: str = "",
|
|
city: str = "",
|
|
region_id: int = 1,
|
|
) -> Optional[int]:
|
|
if nickname is None:
|
|
nickname = name
|
|
|
|
sql = arcade.insert().values(
|
|
name=name,
|
|
nickname=nickname,
|
|
country=country,
|
|
country_id=country_id,
|
|
state=state,
|
|
city=city,
|
|
region_id=region_id,
|
|
)
|
|
|
|
result = await self.execute(sql)
|
|
if result is None:
|
|
return None
|
|
return result.lastrowid
|
|
|
|
async def get_arcades_managed_by_user(self, user_id: int) -> Optional[List[Row]]:
|
|
sql = select(arcade).join(arcade_owner, arcade_owner.c.arcade == arcade.c.id).where(arcade_owner.c.user == user_id)
|
|
result = await self.execute(sql)
|
|
if result is None:
|
|
return False
|
|
return result.fetchall()
|
|
|
|
async def get_manager_permissions(self, user_id: int, arcade_id: int) -> Optional[int]:
|
|
sql = select(arcade_owner.c.permissions).where(and_(arcade_owner.c.user == user_id, arcade_owner.c.arcade == arcade_id))
|
|
result = await self.execute(sql)
|
|
if result is None:
|
|
return False
|
|
return result.fetchone()
|
|
|
|
async def get_arcade_owners(self, arcade_id: int) -> Optional[Row]:
|
|
sql = select(arcade_owner).where(arcade_owner.c.arcade == arcade_id)
|
|
|
|
result = await self.execute(sql)
|
|
if result is None:
|
|
return None
|
|
return result.fetchall()
|
|
|
|
async def add_arcade_owner(self, arcade_id: int, user_id: int) -> None:
|
|
sql = insert(arcade_owner).values(arcade=arcade_id, user=user_id)
|
|
|
|
result = await self.execute(sql)
|
|
if result is None:
|
|
return None
|
|
return result.lastrowid
|
|
|
|
async def get_arcade_by_name(self, name: str) -> Optional[List[Row]]:
|
|
sql = arcade.select(or_(arcade.c.name.like(f"%{name}%"), arcade.c.nickname.like(f"%{name}%")))
|
|
result = await self.execute(sql)
|
|
if result is None:
|
|
return None
|
|
return result.fetchall()
|
|
|
|
async def get_arcades_by_ip(self, ip: str) -> Optional[List[Row]]:
|
|
sql = arcade.select().where(arcade.c.ip == ip)
|
|
result = await self.execute(sql)
|
|
if result is None:
|
|
return None
|
|
return result.fetchall()
|
|
|
|
async def get_num_generated_keychips(self) -> Optional[int]:
|
|
result = await self.execute(select(func.count("serial LIKE 'A69A%'")).select_from(machine))
|
|
if result:
|
|
return result.fetchone()['count_1']
|
|
self.logger.error("Failed to count machine serials that start with A69A!")
|
|
|
|
def format_serial(
|
|
self, platform_code: str, platform_rev: int, serial_letter: str, serial_num: int, append: int, dash: bool = False
|
|
) -> str:
|
|
return f"{platform_code}{'-' if dash else ''}{platform_rev:02d}{serial_letter}{serial_num:04d}{append:04d}"
|
|
|
|
def validate_keychip_format(self, serial: str) -> bool:
|
|
# For the 2nd letter, E and X are the only "real" values that have been observed
|
|
if re.fullmatch(r"^A[0-9]{2}[A-Z][-]?[0-9]{2}[A-HJ-NP-Z][0-9]{4}([0-9]{4})?$", serial) is None:
|
|
return False
|
|
|
|
return True
|
|
|
|
# Thanks bottersnike!
|
|
def get_keychip_suffix(self, year: int, month: int) -> str:
|
|
assert year > 1957
|
|
assert 1 <= month <= 12
|
|
|
|
year -= 1957
|
|
# Jan/Feb/Mar are from the previous tax year
|
|
if month < 4:
|
|
year -= 1
|
|
assert year >= 1 and year <= 99
|
|
|
|
month = ((month - 1) + 9) % 12 # Offset so April=0
|
|
return f"{year:02}{month // 6:01}{month % 6 + 1:01}"
|
|
|
|
|
|
def parse_keychip_suffix(self, suffix: str) -> tuple[int, int]:
|
|
year = int(suffix[0:2])
|
|
half = int(suffix[2])
|
|
assert half in (0, 1)
|
|
period = int(suffix[3])
|
|
assert period in (1, 2, 3, 4, 5, 6)
|
|
|
|
month = half * 6 + (period - 1)
|
|
month = ((month + 3) % 12) + 1 # Offset so Jan=1
|
|
|
|
# Jan/Feb/Mar are from the previous tax year
|
|
if month < 4:
|
|
year += 1
|
|
year += 1957
|
|
|
|
return (year, month)
|