forked from Hay1tsme/artemis
146 lines
5.3 KiB
Python
146 lines
5.3 KiB
Python
from typing import Dict, List, Optional
|
|
from sqlalchemy import Table, Column, UniqueConstraint, PrimaryKeyConstraint, and_, update
|
|
from sqlalchemy.types import Integer, String, TIMESTAMP, Boolean, JSON, BigInteger
|
|
from sqlalchemy.engine.base import Connection
|
|
from sqlalchemy.schema import ForeignKey
|
|
from sqlalchemy.sql import func, select
|
|
from sqlalchemy.engine import Row
|
|
from sqlalchemy.dialects.mysql import insert
|
|
from datetime import datetime
|
|
|
|
from core.data.schema import BaseData, metadata
|
|
from core.config import CoreConfig
|
|
|
|
round_details = Table(
|
|
"idac_round_info",
|
|
metadata,
|
|
Column("id", Integer, primary_key=True, nullable=False),
|
|
Column("round_id_in_json", Integer),
|
|
Column("name", String(64)),
|
|
Column("season", Integer),
|
|
Column("start_dt", TIMESTAMP, server_default=func.now()),
|
|
Column("end_dt", TIMESTAMP, server_default=func.now()),
|
|
mysql_charset="utf8mb4",
|
|
)
|
|
|
|
round_info = Table(
|
|
"idac_user_round_info",
|
|
metadata,
|
|
Column("id", Integer, primary_key=True, nullable=False),
|
|
Column("user", ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade")),
|
|
Column("round_id", ForeignKey("idac_round_info.id", ondelete="cascade", onupdate="cascade")),
|
|
Column("count", Integer),
|
|
Column("win", Integer),
|
|
Column("point", Integer),
|
|
Column("play_dt", TIMESTAMP, server_default=func.now()),
|
|
UniqueConstraint("user", "round_id", name="idac_user_round_info_uk"),
|
|
mysql_charset="utf8mb4",
|
|
)
|
|
|
|
class IDACOnlineRounds(BaseData):
|
|
# get player's ranking from a specified round event
|
|
async def get_round_rank_by_id(self, aime_id: int, round_event_id: int) -> Optional[Row]:
|
|
subquery = (
|
|
select([func.group_concat(func.concat(round_info.c.user, '|', round_info.c.point),order_by=[round_info.c.point.desc(), round_info.c.play_dt])])
|
|
.select_from(round_info)
|
|
.where(round_info.c.round_id == round_event_id)
|
|
.as_scalar()
|
|
)
|
|
|
|
sql = (
|
|
select([func.find_in_set(func.concat(round_info.c.user, '|', round_info.c.point), subquery.label('rank'))])
|
|
.select_from(round_info)
|
|
.where(
|
|
and_(
|
|
round_info.c.user == aime_id,
|
|
round_info.c.round_id == round_event_id
|
|
)
|
|
)
|
|
)
|
|
|
|
result = await self.execute(sql)
|
|
if result is None:
|
|
return None
|
|
return result.fetchone()
|
|
|
|
|
|
# get player's info from a specified round event
|
|
async def get_round_info_by_id(self, aime_id: int, round_event_id: int) -> Optional[Row]:
|
|
sql = select(round_info).where(
|
|
and_(
|
|
round_info.c.user == aime_id,
|
|
round_info.c.round_id == round_event_id
|
|
)
|
|
)
|
|
|
|
result = await self.execute(sql)
|
|
if result is None:
|
|
return None
|
|
return result.fetchone()
|
|
|
|
# get top 5 of a specified round event
|
|
async def get_round_top_five(self, round_event_id: int) -> Optional[Row]:
|
|
subquery = (
|
|
select([func.group_concat(func.concat(round_info.c.user, '|', round_info.c.point),order_by=[round_info.c.point.desc(), round_info.c.play_dt])])
|
|
.select_from(round_info)
|
|
.where(round_info.c.round_id == round_event_id)
|
|
.as_scalar()
|
|
)
|
|
|
|
sql = (
|
|
select([func.find_in_set(func.concat(round_info.c.user, '|', round_info.c.point), subquery.label('rank'))], round_info.c.user)
|
|
.select_from(round_info)
|
|
.where(round_info.c.round_id == round_event_id)
|
|
.limit(5)
|
|
)
|
|
|
|
result = await self.execute(sql)
|
|
if result is None:
|
|
return None
|
|
return result.fetchall()
|
|
|
|
# save players info to a specified round event
|
|
async def put_round_event(
|
|
self, aime_id: int, round_id: int, round_data: Dict
|
|
) -> Optional[int]:
|
|
round_data["user"] = aime_id
|
|
round_data["round_id"] = round_id
|
|
|
|
sql = insert(round_info).values(**round_data)
|
|
conflict = sql.on_duplicate_key_update(**round_data)
|
|
result = await self.execute(conflict)
|
|
|
|
if result is None:
|
|
self.logger.warn(f"putround: Failed to update! aime_id: {aime_id}")
|
|
return None
|
|
return result.lastrowid
|
|
|
|
# insert if the event does not exist in database
|
|
async def _try_load_round_event(
|
|
self, round_id: int, version: int, round_data: Dict
|
|
) -> Optional[int]:
|
|
|
|
sql = select(round_details).where(round_details.c.round_id_in_json == round_id)
|
|
result = await self.execute(sql)
|
|
|
|
# check if the round already exists
|
|
existing_round = result.fetchone() if result else None
|
|
|
|
if existing_round is None:
|
|
tmp = {
|
|
"round_id_in_json": round_id,
|
|
"name": round_data["round_event_nm"],
|
|
"season": version,
|
|
"start_dt": datetime.fromtimestamp(round_data["start_dt"]),
|
|
"end_dt": datetime.fromtimestamp(round_data["end_dt"]),
|
|
}
|
|
|
|
sql = insert(round_details).values(**tmp)
|
|
result = await self.execute(sql)
|
|
|
|
return result.lastrowid
|
|
|
|
return existing_round["id"]
|
|
|
|
# TODO: get top five players of last round event for Boot/GetConfigData
|