272 lines
		
	
	
		
			8.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			272 lines
		
	
	
		
			8.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from typing import Optional, Dict, List
 | |
| from sqlalchemy import Table, Column, and_, or_
 | |
| from sqlalchemy.sql.schema import ForeignKey, PrimaryKeyConstraint
 | |
| from sqlalchemy.types import Integer, String, Boolean, JSON
 | |
| from sqlalchemy.sql import func, select
 | |
| from sqlalchemy.dialects.mysql import insert
 | |
| from sqlalchemy.engine import Row
 | |
| import re
 | |
| 
 | |
| from core.data.schema.base import BaseData, metadata
 | |
| from core.const import *
 | |
| 
 | |
| arcade = Table(
 | |
|     "arcade",
 | |
|     metadata,
 | |
|     Column("id", Integer, primary_key=True, nullable=False),
 | |
|     Column("name", String(255)),
 | |
|     Column("nickname", String(255)),
 | |
|     Column("country", String(3)),
 | |
|     Column("country_id", Integer),
 | |
|     Column("state", String(255)),
 | |
|     Column("city", String(255)),
 | |
|     Column("region_id", Integer),
 | |
|     Column("timezone", String(255)),
 | |
|     Column("ip", String(39)),
 | |
|     mysql_charset="utf8mb4",
 | |
| )
 | |
| 
 | |
| machine = Table(
 | |
|     "machine",
 | |
|     metadata,
 | |
|     Column("id", Integer, primary_key=True, nullable=False),
 | |
|     Column(
 | |
|         "arcade",
 | |
|         ForeignKey("arcade.id", ondelete="cascade", onupdate="cascade"),
 | |
|         nullable=False,
 | |
|     ),
 | |
|     Column("serial", String(15), nullable=False),
 | |
|     Column("board", String(15)),
 | |
|     Column("game", String(4)),
 | |
|     Column("country", String(3)),  # overwrites if not null
 | |
|     Column("timezone", String(255)),
 | |
|     Column("ota_enable", Boolean),
 | |
|     Column("memo", String(255)),
 | |
|     Column("is_cab", Boolean),
 | |
|     Column("data", JSON),
 | |
|     mysql_charset="utf8mb4",
 | |
| )
 | |
| 
 | |
| arcade_owner = Table(
 | |
|     "arcade_owner",
 | |
|     metadata,
 | |
|     Column(
 | |
|         "user",
 | |
|         Integer,
 | |
|         ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"),
 | |
|         nullable=False,
 | |
|     ),
 | |
|     Column(
 | |
|         "arcade",
 | |
|         Integer,
 | |
|         ForeignKey("arcade.id", ondelete="cascade", onupdate="cascade"),
 | |
|         nullable=False,
 | |
|     ),
 | |
|     Column("permissions", Integer, nullable=False),
 | |
|     PrimaryKeyConstraint("user", "arcade", name="arcade_owner_pk"),
 | |
|     mysql_charset="utf8mb4",
 | |
| )
 | |
| 
 | |
| 
 | |
| class ArcadeData(BaseData):
 | |
|     async def get_machine(self, serial: str = None, id: int = None) -> Optional[Row]:
 | |
|         if serial is not None:
 | |
|             serial = serial.replace("-", "")
 | |
|             if len(serial) == 11:
 | |
|                 sql = machine.select(machine.c.serial.like(f"{serial}%"))
 | |
| 
 | |
|             elif len(serial) == 15:
 | |
|                 sql = machine.select(machine.c.serial == serial)
 | |
| 
 | |
|             else:
 | |
|                 self.logger.error(f"{__name__ }: Malformed serial {serial}")
 | |
|                 return None
 | |
| 
 | |
|         elif id is not None:
 | |
|             sql = machine.select(machine.c.id == id)
 | |
| 
 | |
|         else:
 | |
|             self.logger.error(f"{__name__ }: Need either serial or ID to look up!")
 | |
|             return None
 | |
| 
 | |
|         result = await self.execute(sql)
 | |
|         if result is None:
 | |
|             return None
 | |
|         return result.fetchone()
 | |
| 
 | |
|     async def create_machine(
 | |
|         self,
 | |
|         arcade_id: int,
 | |
|         serial: str = "",
 | |
|         board: str = None,
 | |
|         game: str = None,
 | |
|         is_cab: bool = False,
 | |
|     ) -> Optional[int]:
 | |
|         if not arcade_id:
 | |
|             self.logger.error(f"{__name__ }: Need arcade id!")
 | |
|             return None
 | |
| 
 | |
|         sql = machine.insert().values(
 | |
|             arcade=arcade_id, serial=serial, board=board, game=game, is_cab=is_cab
 | |
|         )
 | |
| 
 | |
|         result = await self.execute(sql)
 | |
|         if result is None:
 | |
|             return None
 | |
|         return result.lastrowid
 | |
| 
 | |
|     async def set_machine_serial(self, machine_id: int, serial: str) -> None:
 | |
|         result = await self.execute(
 | |
|             machine.update(machine.c.id == machine_id).values(keychip=serial)
 | |
|         )
 | |
|         if result is None:
 | |
|             self.logger.error(
 | |
|                 f"Failed to update serial for machine {machine_id} -> {serial}"
 | |
|             )
 | |
|         return result.lastrowid
 | |
| 
 | |
|     async def set_machine_boardid(self, machine_id: int, boardid: str) -> None:
 | |
|         result = await self.execute(
 | |
|             machine.update(machine.c.id == machine_id).values(board=boardid)
 | |
|         )
 | |
|         if result is None:
 | |
|             self.logger.error(
 | |
|                 f"Failed to update board id for machine {machine_id} -> {boardid}"
 | |
|             )
 | |
| 
 | |
|     async def get_arcade(self, id: int) -> Optional[Row]:
 | |
|         sql = arcade.select(arcade.c.id == id)
 | |
|         result = await self.execute(sql)
 | |
|         if result is None:
 | |
|             return None
 | |
|         return result.fetchone()
 | |
|     
 | |
|     async def get_arcade_machines(self, id: int) -> Optional[List[Row]]:
 | |
|         sql = machine.select(machine.c.arcade == id)
 | |
|         result = await self.execute(sql)
 | |
|         if result is None:
 | |
|             return None
 | |
|         return result.fetchall()
 | |
| 
 | |
|     async def create_arcade(
 | |
|         self,
 | |
|         name: str = None,
 | |
|         nickname: str = None,
 | |
|         country: str = "JPN",
 | |
|         country_id: int = 1,
 | |
|         state: str = "",
 | |
|         city: str = "",
 | |
|         region_id: int = 1,
 | |
|     ) -> Optional[int]:
 | |
|         if nickname is None:
 | |
|             nickname = name
 | |
| 
 | |
|         sql = arcade.insert().values(
 | |
|             name=name,
 | |
|             nickname=nickname,
 | |
|             country=country,
 | |
|             country_id=country_id,
 | |
|             state=state,
 | |
|             city=city,
 | |
|             region_id=region_id,
 | |
|         )
 | |
| 
 | |
|         result = await self.execute(sql)
 | |
|         if result is None:
 | |
|             return None
 | |
|         return result.lastrowid
 | |
| 
 | |
|     async def get_arcades_managed_by_user(self, user_id: int) -> Optional[List[Row]]:
 | |
|         sql = select(arcade).join(arcade_owner, arcade_owner.c.arcade == arcade.c.id).where(arcade_owner.c.user == user_id)
 | |
|         result = await self.execute(sql)
 | |
|         if result is None:
 | |
|             return False
 | |
|         return result.fetchall()
 | |
|     
 | |
|     async def get_manager_permissions(self, user_id: int, arcade_id: int) -> Optional[int]:
 | |
|         sql = select(arcade_owner.c.permissions).where(and_(arcade_owner.c.user == user_id, arcade_owner.c.arcade == arcade_id))
 | |
|         result = await self.execute(sql)
 | |
|         if result is None:
 | |
|             return False
 | |
|         return result.fetchone()
 | |
| 
 | |
|     async def get_arcade_owners(self, arcade_id: int) -> Optional[Row]:
 | |
|         sql = select(arcade_owner).where(arcade_owner.c.arcade == arcade_id)
 | |
| 
 | |
|         result = await self.execute(sql)
 | |
|         if result is None:
 | |
|             return None
 | |
|         return result.fetchall()
 | |
| 
 | |
|     async def add_arcade_owner(self, arcade_id: int, user_id: int) -> None:
 | |
|         sql = insert(arcade_owner).values(arcade=arcade_id, user=user_id)
 | |
| 
 | |
|         result = await self.execute(sql)
 | |
|         if result is None:
 | |
|             return None
 | |
|         return result.lastrowid
 | |
| 
 | |
|     async def get_arcade_by_name(self, name: str) -> Optional[List[Row]]:
 | |
|         sql = arcade.select(or_(arcade.c.name.like(f"%{name}%"), arcade.c.nickname.like(f"%{name}%")))
 | |
|         result = await self.execute(sql)
 | |
|         if result is None:
 | |
|             return None
 | |
|         return result.fetchall()
 | |
| 
 | |
|     async def get_arcades_by_ip(self, ip: str) -> Optional[List[Row]]:
 | |
|         sql = arcade.select().where(arcade.c.ip == ip)
 | |
|         result = await self.execute(sql)
 | |
|         if result is None:
 | |
|             return None
 | |
|         return result.fetchall()
 | |
|     
 | |
|     async def get_num_generated_keychips(self) -> Optional[int]:
 | |
|         result = await self.execute(select(func.count("serial LIKE 'A69A%'")).select_from(machine))
 | |
|         if result:
 | |
|             return result.fetchone()['count_1']
 | |
|         self.logger.error("Failed to count machine serials that start with A69A!")
 | |
| 
 | |
|     def format_serial(
 | |
|         self, platform_code: str, platform_rev: int, serial_letter: str, serial_num: int, append: int, dash: bool = False
 | |
|     ) -> str:
 | |
|         return f"{platform_code}{'-' if dash else ''}{platform_rev:02d}{serial_letter}{serial_num:04d}{append:04d}"
 | |
| 
 | |
|     def validate_keychip_format(self, serial: str) -> bool:
 | |
|         # For the 2nd letter, E and X are the only "real" values that have been observed
 | |
|         if re.fullmatch(r"^A[0-9]{2}[A-Z][-]?[0-9]{2}[A-HJ-NP-Z][0-9]{4}([0-9]{4})?$", serial) is None:
 | |
|             return False
 | |
|         
 | |
|         return True
 | |
|     
 | |
|     # Thanks bottersnike!
 | |
|     def get_keychip_suffix(self, year: int, month: int) -> str:
 | |
|         assert year > 1957
 | |
|         assert 1 <= month <= 12
 | |
| 
 | |
|         year -= 1957
 | |
|         # Jan/Feb/Mar are from the previous tax year
 | |
|         if month < 4:
 | |
|             year -= 1
 | |
|         assert year >= 1 and year <= 99
 | |
| 
 | |
|         month = ((month - 1) + 9) % 12  # Offset so April=0
 | |
|         return f"{year:02}{month // 6:01}{month % 6 + 1:01}"
 | |
| 
 | |
| 
 | |
|     def parse_keychip_suffix(self, suffix: str) -> tuple[int, int]:
 | |
|         year = int(suffix[0:2])
 | |
|         half = int(suffix[2])
 | |
|         assert half in (0, 1)
 | |
|         period = int(suffix[3])
 | |
|         assert period in (1, 2, 3, 4, 5, 6)
 | |
| 
 | |
|         month = half * 6 + (period - 1)
 | |
|         month = ((month + 3) % 12) + 1  # Offset so Jan=1
 | |
| 
 | |
|         # Jan/Feb/Mar are from the previous tax year
 | |
|         if month < 4:
 | |
|             year += 1
 | |
|         year += 1957
 | |
| 
 | |
|         return (year, month)
 |