forked from Dniel97/artemis
138 lines
5.2 KiB
Python
138 lines
5.2 KiB
Python
|
from typing import Dict, List, Optional
|
||
|
from sqlalchemy import Table, Column, UniqueConstraint, PrimaryKeyConstraint, and_, update
|
||
|
from sqlalchemy.types import Integer, String, TIMESTAMP, Boolean, JSON, BigInteger
|
||
|
from sqlalchemy.engine.base import Connection
|
||
|
from sqlalchemy.schema import ForeignKey
|
||
|
from sqlalchemy.sql import func, select
|
||
|
from sqlalchemy.engine import Row
|
||
|
from sqlalchemy.dialects.mysql import insert
|
||
|
|
||
|
from core.data.schema import BaseData, metadata
|
||
|
from core.config import CoreConfig
|
||
|
import datetime
|
||
|
|
||
|
round_details = Table(
|
||
|
"idac_round_info",
|
||
|
metadata,
|
||
|
Column("id", Integer, primary_key=True, nullable=False),
|
||
|
Column("round_id_in_json", Integer),
|
||
|
Column("name", String(64)),
|
||
|
Column("season", Integer),
|
||
|
Column("start_dt", TIMESTAMP, server_default=func.now()),
|
||
|
Column("end_dt", TIMESTAMP, server_default=func.now()),
|
||
|
mysql_charset="utf8mb4",
|
||
|
)
|
||
|
|
||
|
round_info = Table(
|
||
|
"idac_user_round_info",
|
||
|
metadata,
|
||
|
Column("id", Integer, primary_key=True, nullable=False),
|
||
|
Column("user", ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade")),
|
||
|
Column("round_id", ForeignKey("idac_round_info.id", ondelete="cascade", onupdate="cascade")),
|
||
|
Column("count", Integer),
|
||
|
Column("win", Integer),
|
||
|
Column("point", Integer),
|
||
|
Column("play_dt", TIMESTAMP, server_default=func.now()),
|
||
|
UniqueConstraint("user", "round_id", name="idac_user_round_info_uk"),
|
||
|
mysql_charset="utf8mb4",
|
||
|
)
|
||
|
|
||
|
class IDACOnlineRounds(BaseData):
|
||
|
# get player's ranking from a specified round event
|
||
|
async def get_round_rank_by_id(self, aime_id: int, round_event_id: int) -> Optional[Row]:
|
||
|
subquery = (
|
||
|
select([func.group_concat(func.concat(round_info.c.user, '|', round_info.c.point),order_by=[round_info.c.point.desc(), round_info.c.play_dt])])
|
||
|
.select_from(round_info)
|
||
|
.where(round_info.c.round_id == round_event_id)
|
||
|
.as_scalar()
|
||
|
)
|
||
|
|
||
|
sql = (
|
||
|
select([func.find_in_set(func.concat(round_info.c.user, '|', round_info.c.point), subquery.label('rank'))])
|
||
|
.select_from(round_info)
|
||
|
.where(
|
||
|
and_(
|
||
|
round_info.c.user == aime_id,
|
||
|
round_info.c.round_id == round_event_id
|
||
|
)
|
||
|
)
|
||
|
)
|
||
|
|
||
|
result = await self.execute(sql)
|
||
|
if result is None:
|
||
|
return None
|
||
|
return result.fetchone()
|
||
|
|
||
|
|
||
|
# get player's info from a specified round event
|
||
|
async def get_round_info_by_id(self, aime_id: int, round_event_id: int) -> Optional[Row]:
|
||
|
sql = select(round_info).where(
|
||
|
and_(
|
||
|
round_info.c.user == aime_id,
|
||
|
round_info.c.round_id == round_event_id
|
||
|
)
|
||
|
)
|
||
|
|
||
|
result = await self.execute(sql)
|
||
|
if result is None:
|
||
|
return None
|
||
|
return result.fetchone()
|
||
|
|
||
|
# get top 5 of a specified round event
|
||
|
async def get_round_top_five(self, round_event_id: int) -> Optional[Row]:
|
||
|
subquery = (
|
||
|
select([func.group_concat(func.concat(round_info.c.user, '|', round_info.c.point),order_by=[round_info.c.point.desc(), round_info.c.play_dt])])
|
||
|
.select_from(round_info)
|
||
|
.where(round_info.c.round_id == round_event_id)
|
||
|
.as_scalar()
|
||
|
)
|
||
|
|
||
|
sql = (
|
||
|
select([func.find_in_set(func.concat(round_info.c.user, '|', round_info.c.point), subquery.label('rank'))], round_info.c.user)
|
||
|
.select_from(round_info)
|
||
|
.where(round_info.c.round_id == round_event_id)
|
||
|
.limit(5)
|
||
|
)
|
||
|
|
||
|
result = await self.execute(sql)
|
||
|
if result is None:
|
||
|
return None
|
||
|
return result.fetchall()
|
||
|
|
||
|
# save players info to a specified round event
|
||
|
async def put_round_event(
|
||
|
self, aime_id: int, round_id: int, round_data: Dict
|
||
|
) -> Optional[int]:
|
||
|
round_data["user"] = aime_id
|
||
|
round_data["round_id"] = round_id
|
||
|
|
||
|
sql = insert(round_info).values(**round_data)
|
||
|
conflict = sql.on_duplicate_key_update(**round_data)
|
||
|
result = await self.execute(conflict)
|
||
|
|
||
|
if result is None:
|
||
|
self.logger.warn(f"putround: Failed to update! aime_id: {aime_id}")
|
||
|
return None
|
||
|
return result.lastrowid
|
||
|
|
||
|
# insert if the event does not exist in database
|
||
|
async def _try_load_round_event(
|
||
|
self, round_id: int, version: int, round_data: Dict
|
||
|
) -> Optional[int]:
|
||
|
sql = select(round_details).where(
|
||
|
round_details.c.round_id_in_json == round_id
|
||
|
)
|
||
|
result = await self.execute(sql)
|
||
|
rid = result.fetchone()
|
||
|
if rid is None:
|
||
|
tmp = {}
|
||
|
tmp["round_id_in_json"] = round_id
|
||
|
tmp["name"] = round_data["round_event_nm"]
|
||
|
tmp["season"] = version
|
||
|
tmp["start_dt"] = datetime.datetime.fromtimestamp(round_data["start_dt"])
|
||
|
tmp["end_dt"] = datetime.datetime.fromtimestamp(round_data["end_dt"])
|
||
|
sql = insert(round_details).values(**tmp)
|
||
|
result = await self.execute(sql)
|
||
|
return result.lastrowid
|
||
|
return rid["id"]
|
||
|
#TODO: get top five players of last round event for Boot/GetConfigData
|