262 lines
7.6 KiB
Python
262 lines
7.6 KiB
Python
from sqlalchemy import Table, Column, UniqueConstraint, PrimaryKeyConstraint, and_
|
|
from sqlalchemy.types import Integer, String, TIMESTAMP, JSON, Boolean
|
|
from sqlalchemy.schema import ForeignKey
|
|
from sqlalchemy.sql import func, select
|
|
from sqlalchemy.dialects.mysql import insert
|
|
from sqlalchemy.engine import Row
|
|
from typing import Optional, List, Dict, Any
|
|
|
|
from core.data.schema import BaseData, metadata
|
|
from core.data import cached
|
|
|
|
# Best-score table. The diva_score_uk constraint allows at most one row per
# (user, pv_id, difficulty, edition) combination.
score = Table(
    "diva_score",
    metadata,
    Column("id", Integer, primary_key=True, nullable=False),
    Column("user", ForeignKey("aime_user.id", ondelete="cascade"), nullable=False),
    # Every remaining column is a plain Integer column with no extra options,
    # so they are generated from their names, in declaration order.
    *(
        Column(int_column, Integer)
        for int_column in (
            "version",
            "pv_id",
            "difficulty",
            "edition",
            "score",
            "atn_pnt",
            "clr_kind",
            "sort_kind",
            "cool",
            "fine",
            "safe",
            "sad",
            "worst",
            "max_combo",
        )
    ),
    UniqueConstraint("user", "pv_id", "difficulty", "edition", name="diva_score_uk"),
    mysql_charset="utf8mb4",
)
|
|
|
|
# Play-log table. Unlike diva_score it declares no unique constraint, and each
# row gets a date_scored timestamp defaulted by the database server.
playlog = Table(
    "diva_playlog",
    metadata,
    Column("id", Integer, primary_key=True, nullable=False),
    Column("user", ForeignKey("aime_user.id", ondelete="cascade"), nullable=False),
    # Plain Integer columns with no extra options, generated from their names
    # in declaration order.
    *(
        Column(int_column, Integer)
        for int_column in (
            "version",
            "pv_id",
            "difficulty",
            "edition",
            "score",
            "atn_pnt",
            "clr_kind",
            "sort_kind",
            "cool",
            "fine",
            "safe",
            "sad",
            "worst",
            "max_combo",
        )
    ),
    Column("date_scored", TIMESTAMP, server_default=func.now()),
    mysql_charset="utf8mb4",
)
|
|
|
|
|
|
class DivaScoreData(BaseData):
    """Query helpers for the ``diva_score`` and ``diva_playlog`` tables.

    Every method builds a SQLAlchemy statement and runs it through
    ``self.execute``. When ``self.execute`` yields ``None`` the method
    returns ``None`` as well (the insert helpers additionally log an error).
    """

    async def put_best_score(
        self,
        user_id: int,
        game_version: int,
        song_id: int,
        difficulty: int,
        edition: int,
        song_score: int,
        atn_pnt: int,
        clr_kind: int,
        sort_kind: int,
        cool: int,
        fine: int,
        safe: int,
        sad: int,
        worst: int,
        max_combo: int,
    ) -> Optional[int]:
        """
        Update the user's best score for a chart

        Inserts a row into ``diva_score``. If the insert collides with an
        existing key, the result columns of the existing row are overwritten
        with the given values instead; ``version`` is left as stored.

        No comparison against the stored score happens here, so callers are
        presumably expected to decide beforehand whether the new result
        should replace the old one.

        Returns:
            ``lastrowid`` of the executed statement, or ``None`` when
            ``self.execute`` returned ``None``.
        """
        # Values written both on a fresh insert and on a duplicate-key update.
        result_columns = {
            "score": song_score,
            "atn_pnt": atn_pnt,
            "clr_kind": clr_kind,
            "sort_kind": sort_kind,
            "cool": cool,
            "fine": fine,
            "safe": safe,
            "sad": sad,
            "worst": worst,
            "max_combo": max_combo,
        }

        sql = insert(score).values(
            user=user_id,
            version=game_version,
            pv_id=song_id,
            difficulty=difficulty,
            edition=edition,
            **result_columns,
        )
        conflict = sql.on_duplicate_key_update(**result_columns)

        result = await self.execute(conflict)
        if result is None:
            self.logger.error(
                f"{__name__} failed to insert best score! profile: {user_id}, song: {song_id}"
            )
            return None

        return result.lastrowid

    async def put_playlog(
        self,
        user_id: int,
        game_version: int,
        song_id: int,
        difficulty: int,
        edition: int,
        song_score: int,
        atn_pnt: int,
        clr_kind: int,
        sort_kind: int,
        cool: int,
        fine: int,
        safe: int,
        sad: int,
        worst: int,
        max_combo: int,
    ) -> Optional[int]:
        """
        Add an entry to the user's play log

        Always inserts a new ``diva_playlog`` row; ``date_scored`` is not set
        here and falls back to the column's server default.

        Returns:
            ``lastrowid`` of the inserted row, or ``None`` when
            ``self.execute`` returned ``None``.
        """
        sql = playlog.insert().values(
            user=user_id,
            version=game_version,
            pv_id=song_id,
            difficulty=difficulty,
            edition=edition,
            score=song_score,
            atn_pnt=atn_pnt,
            clr_kind=clr_kind,
            sort_kind=sort_kind,
            cool=cool,
            fine=fine,
            safe=safe,
            sad=sad,
            worst=worst,
            max_combo=max_combo,
        )

        result = await self.execute(sql)
        if result is None:
            self.logger.error(
                f"{__name__} failed to insert playlog! profile: {user_id}, song: {song_id}, chart: {difficulty}"
            )
            return None

        return result.lastrowid

    async def get_best_user_score(
        self, user_id: int, pv_id: int, difficulty: int, edition: int
    ) -> Optional[Row]:
        """
        Fetch one user's ``diva_score`` row for a chart

        Returns:
            The first matching row, or ``None`` when there is no match or
            ``self.execute`` returned ``None``.
        """
        # Table.select(whereclause) is the legacy calling form; chaining
        # .where() works on SQLAlchemy 1.4 and 2.0 alike.
        sql = score.select().where(
            and_(
                score.c.user == user_id,
                score.c.pv_id == pv_id,
                score.c.difficulty == difficulty,
                score.c.edition == edition,
            )
        )

        result = await self.execute(sql)
        if result is None:
            return None
        return result.fetchone()

    async def get_top3_scores(
        self, pv_id: int, difficulty: int, edition: int
    ) -> Optional[List[Row]]:
        """
        Fetch the three highest-scoring ``diva_score`` rows for a chart

        Rows are ordered by ``score`` descending; no tie-breaker is applied.

        Returns:
            Up to three rows, or ``None`` when ``self.execute`` returned
            ``None``.
        """
        sql = (
            score.select()
            .where(
                and_(
                    score.c.pv_id == pv_id,
                    score.c.difficulty == difficulty,
                    score.c.edition == edition,
                )
            )
            .order_by(score.c.score.desc())
            .limit(3)
        )

        result = await self.execute(sql)
        if result is None:
            return None
        return result.fetchall()

    async def get_global_ranking(
        self, user_id: int, pv_id: int, difficulty: int, edition: int
    ) -> Optional[Row]:
        """
        Compute a user's rank on a chart

        The rank is the number of ``diva_score`` rows for the chart whose
        score is greater than or equal to the user's own stored score, so
        the user's own row and any ties are included in the count.

        Returns:
            A single row exposing the count as ``ranking``, or ``None`` when
            ``self.execute`` returned ``None``.
        """
        # Scalar subquery: the user's stored score for this pv_id,
        # difficulty and edition.
        # NOTE(review): if the user has no row the subquery is NULL, which
        # presumably makes the comparison match nothing (ranking 0) - confirm.
        sql_sub = (
            select(score.c.score)
            .where(
                and_(
                    score.c.user == user_id,
                    score.c.pv_id == pv_id,
                    score.c.difficulty == difficulty,
                    score.c.edition == edition,
                )
            )
            .scalar_subquery()
        )

        # Main query: count qualifying rows and expose the count as "ranking".
        sql = select(func.count(score.c.id).label("ranking")).where(
            and_(
                score.c.score >= sql_sub,
                score.c.pv_id == pv_id,
                score.c.difficulty == difficulty,
                score.c.edition == edition,
            )
        )

        result = await self.execute(sql)
        if result is None:
            return None
        return result.fetchone()

    async def get_best_scores(self, user_id: int) -> Optional[List[Row]]:
        """
        Fetch every ``diva_score`` row belonging to a user

        Returns:
            All matching rows, or ``None`` when ``self.execute`` returned
            ``None``.
        """
        sql = score.select().where(score.c.user == user_id)

        result = await self.execute(sql)
        if result is None:
            return None
        return result.fetchall()

    async def get_playlogs(
        self, aime_id: int, idx: int = 0, limit: int = 0
    ) -> Optional[List[Row]]:
        """
        Fetch a user's play log, newest first

        Args:
            aime_id: Value matched against the ``user`` column.
            idx: Number of rows to skip; ``0`` applies no offset.
            limit: Maximum number of rows; ``0`` applies no limit.

        Returns:
            The matching rows ordered by ``date_scored`` descending, or
            ``None`` when ``self.execute`` returned ``None``.
        """
        sql = (
            select(playlog)
            .where(playlog.c.user == aime_id)
            .order_by(playlog.c.date_scored.desc())
        )

        if limit:
            sql = sql.limit(limit)
        if idx:
            sql = sql.offset(idx)

        result = await self.execute(sql)
        if result is None:
            return None
        return result.fetchall()

    async def get_user_playlogs_count(self, aime_id: int) -> Optional[int]:
        """
        Count the ``diva_playlog`` rows belonging to a user

        Returns:
            The row count, or ``None`` (after logging a warning) when
            ``self.execute`` returned ``None``.
        """
        sql = select(func.count()).where(playlog.c.user == aime_id)

        result = await self.execute(sql)
        if result is None:
            self.logger.warning(f"aime_id {aime_id} has no scores ")
            return None
        return result.scalar()
|