3
2
forked from Dniel97/artemis
artemis/titles/wacca/schema/profile.py
2024-01-09 14:42:17 -05:00

523 lines
18 KiB
Python

from typing import Optional, Dict, List
from sqlalchemy import Table, Column, UniqueConstraint, PrimaryKeyConstraint, and_
from sqlalchemy.types import Integer, String, TIMESTAMP, Boolean, JSON
from sqlalchemy.schema import ForeignKey
from sqlalchemy.sql import func, select
from sqlalchemy.engine import Row
from sqlalchemy.dialects.mysql import insert
from core.data.schema import BaseData, metadata
from ..handlers.helpers import PlayType
profile = Table(
"wacca_profile",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column(
"user",
ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"),
nullable=False,
),
Column("version", Integer),
Column("username", String(8), nullable=False),
Column("xp", Integer, server_default="0"),
Column("wp", Integer, server_default="0"),
Column("wp_total", Integer, server_default="0"),
Column("wp_spent", Integer, server_default="0"),
Column("dan_type", Integer, server_default="0"),
Column("dan_level", Integer, server_default="0"),
Column("title_0", Integer, server_default="0"),
Column("title_1", Integer, server_default="0"),
Column("title_2", Integer, server_default="0"),
Column("rating", Integer, server_default="0"),
Column("vip_expire_time", TIMESTAMP),
Column("always_vip", Boolean, server_default="0"),
Column("login_count", Integer, server_default="0"),
Column("login_count_consec", Integer, server_default="0"),
Column("login_count_days", Integer, server_default="0"),
Column("login_count_days_consec", Integer, server_default="0"),
Column("login_count_today", Integer, server_default="0"),
Column("playcount_single", Integer, server_default="0"),
Column("playcount_multi_vs", Integer, server_default="0"),
Column("playcount_multi_coop", Integer, server_default="0"),
Column("playcount_stageup", Integer, server_default="0"),
Column("playcount_time_free", Integer, server_default="0"),
Column("friend_view_1", Integer),
Column("friend_view_2", Integer),
Column("friend_view_3", Integer),
Column("last_game_ver", String(50)),
Column("last_song_id", Integer, server_default="0"),
Column("last_song_difficulty", Integer, server_default="0"),
Column("last_folder_order", Integer, server_default="0"),
Column("last_folder_id", Integer, server_default="0"),
Column("last_song_order", Integer, server_default="0"),
Column("last_login_date", TIMESTAMP, server_default=func.now()),
Column("gate_tutorial_flags", JSON),
UniqueConstraint("user", "version", name="wacca_profile_uk"),
mysql_charset="utf8mb4",
)
option = Table(
"wacca_option",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column(
"user",
ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"),
nullable=False,
),
Column("opt_id", Integer, nullable=False),
Column("value", Integer, nullable=False),
UniqueConstraint("user", "opt_id", name="wacca_option_uk"),
)
bingo = Table(
"wacca_bingo",
metadata,
Column(
"user",
ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"),
primary_key=True,
nullable=False,
),
Column("page_number", Integer, nullable=False),
Column("page_progress", JSON, nullable=False),
UniqueConstraint("user", "page_number", name="wacca_bingo_uk"),
mysql_charset="utf8mb4",
)
friend = Table(
"wacca_friend",
metadata,
Column(
"profile_sender",
ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"),
nullable=False,
),
Column(
"profile_reciever",
ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"),
nullable=False,
),
Column("is_accepted", Boolean, server_default="0"),
PrimaryKeyConstraint("profile_sender", "profile_reciever", name="arcade_owner_pk"),
mysql_charset="utf8mb4",
)
favorite = Table(
"wacca_favorite_song",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column(
"user",
ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"),
nullable=False,
),
Column("song_id", Integer, nullable=False),
UniqueConstraint("user", "song_id", name="wacca_favorite_song_uk"),
mysql_charset="utf8mb4",
)
gate = Table(
"wacca_gate",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column(
"user",
ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"),
nullable=False,
),
Column("gate_id", Integer, nullable=False),
Column("page", Integer, nullable=False, server_default="0"),
Column("progress", Integer, nullable=False, server_default="0"),
Column("loops", Integer, nullable=False, server_default="0"),
Column("last_used", TIMESTAMP, nullable=False, server_default=func.now()),
Column("mission_flag", Integer, nullable=False, server_default="0"),
Column("total_points", Integer, nullable=False, server_default="0"),
UniqueConstraint("user", "gate_id", name="wacca_gate_uk"),
)
class WaccaProfileData(BaseData):
async def create_profile(
self, aime_id: int, username: str, version: int
) -> Optional[int]:
"""
Given a game version, aime id, and username, create a profile and return it's ID
"""
sql = insert(profile).values(user=aime_id, username=username, version=version)
conflict = sql.on_duplicate_key_update(username=sql.inserted.username)
result = await self.execute(conflict)
if result is None:
self.logger.error(
f"{__name__} Failed to insert wacca profile! aime id: {aime_id} username: {username}"
)
return None
return result.lastrowid
async def update_profile_playtype(
self, profile_id: int, play_type: int, game_version: str
) -> None:
sql = profile.update(profile.c.id == profile_id).values(
playcount_single=profile.c.playcount_single + 1
if play_type == PlayType.PlayTypeSingle.value
else profile.c.playcount_single,
playcount_multi_vs=profile.c.playcount_multi_vs + 1
if play_type == PlayType.PlayTypeVs.value
else profile.c.playcount_multi_vs,
playcount_multi_coop=profile.c.playcount_multi_coop + 1
if play_type == PlayType.PlayTypeCoop.value
else profile.c.playcount_multi_coop,
playcount_stageup=profile.c.playcount_stageup + 1
if play_type == PlayType.PlayTypeStageup.value
else profile.c.playcount_stageup,
playcount_time_free=profile.c.playcount_time_free + 1
if play_type == PlayType.PlayTypeTimeFree.value
else profile.c.playcount_time_free,
last_game_ver=game_version,
)
result = await self.execute(sql)
if result is None:
self.logger.error(
f"update_profile: failed to update profile! profile: {profile_id}"
)
return None
async def update_profile_lastplayed(
self,
profile_id: int,
last_song_id: int,
last_song_difficulty: int,
last_folder_order: int,
last_folder_id: int,
last_song_order: int,
) -> None:
sql = profile.update(profile.c.id == profile_id).values(
last_song_id=last_song_id,
last_song_difficulty=last_song_difficulty,
last_folder_order=last_folder_order,
last_folder_id=last_folder_id,
last_song_order=last_song_order,
)
result = await self.execute(sql)
if result is None:
self.logger.error(
f"update_profile_lastplayed: failed to update profile! profile: {profile_id}"
)
return None
async def update_profile_dan(
self, profile_id: int, dan_level: int, dan_type: int
) -> Optional[int]:
sql = profile.update(profile.c.id == profile_id).values(
dan_level=dan_level, dan_type=dan_type
)
result = await self.execute(sql)
if result is None:
self.logger.warning(
f"update_profile_dan: Failed to update! profile {profile_id}"
)
return None
return result.lastrowid
async def get_profile(self, profile_id: int = 0, aime_id: int = None) -> Optional[Row]:
"""
Given a game version and either a profile or aime id, return the profile
"""
if aime_id is not None:
sql = profile.select(profile.c.user == aime_id)
elif profile_id > 0:
sql = profile.select(profile.c.id == profile_id)
else:
self.logger.error(
f"get_profile: Bad arguments!! profile_id {profile_id} aime_id {aime_id}"
)
return None
result = await self.execute(sql)
if result is None:
return None
return result.fetchone()
async def get_options(self, user_id: int, option_id: int = None) -> Optional[List[Row]]:
"""
Get a specific user option for a profile, or all of them if none specified
"""
sql = option.select(
and_(
option.c.user == user_id,
option.c.opt_id == option_id if option_id is not None else True,
)
)
result = await self.execute(sql)
if result is None:
return None
if option_id is not None:
return result.fetchone()
else:
return result.fetchall()
async def update_option(self, user_id: int, option_id: int, value: int) -> Optional[int]:
sql = insert(option).values(user=user_id, opt_id=option_id, value=value)
conflict = sql.on_duplicate_key_update(value=value)
result = await self.execute(conflict)
if result is None:
self.logger.error(
f"{__name__} failed to insert option! profile: {user_id}, option: {option_id}, value: {value}"
)
return None
return result.lastrowid
async def add_favorite_song(self, user_id: int, song_id: int) -> Optional[int]:
sql = favorite.insert().values(user=user_id, song_id=song_id)
result = await self.execute(sql)
if result is None:
self.logger.error(
f"{__name__} failed to insert favorite! profile: {user_id}, song_id: {song_id}"
)
return None
return result.lastrowid
async def remove_favorite_song(self, user_id: int, song_id: int) -> None:
sql = favorite.delete(
and_(favorite.c.user == user_id, favorite.c.song_id == song_id)
)
result = await self.execute(sql)
if result is None:
self.logger.error(
f"{__name__} failed to remove favorite! profile: {user_id}, song_id: {song_id}"
)
return None
async def get_favorite_songs(self, user_id: int) -> Optional[List[Row]]:
sql = favorite.select(favorite.c.user == user_id)
result = await self.execute(sql)
if result is None:
return None
return result.fetchall()
async def get_gates(self, user_id: int) -> Optional[List[Row]]:
sql = select(gate).where(gate.c.user == user_id)
result = await self.execute(sql)
if result is None:
return None
return result.fetchall()
async def update_gate(
self,
user_id: int,
gate_id: int,
page: int,
progress: int,
loop: int,
mission_flag: int,
total_points: int,
) -> Optional[int]:
sql = insert(gate).values(
user=user_id,
gate_id=gate_id,
page=page,
progress=progress,
loops=loop,
mission_flag=mission_flag,
total_points=total_points,
)
conflict = sql.on_duplicate_key_update(
page=sql.inserted.page,
progress=sql.inserted.progress,
loops=sql.inserted.loops,
mission_flag=sql.inserted.mission_flag,
total_points=sql.inserted.total_points,
)
result = await self.execute(conflict)
if result is None:
self.logger.error(
f"{__name__} failed to update gate! user: {user_id}, gate_id: {gate_id}"
)
return None
return result.lastrowid
async def get_friends(self, user_id: int) -> Optional[List[Row]]:
sql = friend.select(friend.c.profile_sender == user_id)
result = await self.execute(sql)
if result is None:
return None
return result.fetchall()
async def profile_to_aime_user(self, profile_id: int) -> Optional[int]:
sql = select(profile.c.user).where(profile.c.id == profile_id)
result = await self.execute(sql)
if result is None:
self.logger.info(
f"profile_to_aime_user: No user found for profile {profile_id}"
)
return None
this_profile = result.fetchone()
if this_profile is None:
self.logger.info(
f"profile_to_aime_user: No user found for profile {profile_id}"
)
return None
return this_profile["user"]
async def session_login(
self, profile_id: int, is_new_day: bool, is_consec_day: bool
) -> None:
# TODO: Reset consec days counter
sql = profile.update(profile.c.id == profile_id).values(
login_count=profile.c.login_count + 1,
login_count_consec=profile.c.login_count_consec + 1,
login_count_days=profile.c.login_count_days + 1
if is_new_day
else profile.c.login_count_days,
login_count_days_consec=profile.c.login_count_days_consec + 1
if is_new_day and is_consec_day
else profile.c.login_count_days_consec,
login_count_today=1 if is_new_day else profile.c.login_count_today + 1,
last_login_date=func.now(),
)
result = await self.execute(sql)
if result is None:
self.logger.error(
f"session_login: failed to update profile! profile: {profile_id}"
)
return None
async def session_logout(self, profile_id: int) -> None:
sql = profile.update(profile.c.id == id).values(login_count_consec=0)
result = await self.execute(sql)
if result is None:
self.logger.error(
f"{__name__} failed to update profile! profile: {profile_id}"
)
return None
async def add_xp(self, profile_id: int, xp: int) -> None:
sql = profile.update(profile.c.id == profile_id).values(xp=profile.c.xp + xp)
result = await self.execute(sql)
if result is None:
self.logger.error(
f"add_xp: Failed to update profile! profile_id {profile_id} xp {xp}"
)
return None
async def add_wp(self, profile_id: int, wp: int) -> None:
sql = profile.update(profile.c.id == profile_id).values(
wp=profile.c.wp + wp,
wp_total=profile.c.wp_total + wp,
)
result = await self.execute(sql)
if result is None:
self.logger.error(
f"add_wp: Failed to update profile! profile_id {profile_id} wp {wp}"
)
return None
async def spend_wp(self, profile_id: int, wp: int) -> None:
sql = profile.update(profile.c.id == profile_id).values(
wp=profile.c.wp - wp,
wp_spent=profile.c.wp_spent + wp,
)
result = await self.execute(sql)
if result is None:
self.logger.error(
f"spend_wp: Failed to update profile! profile_id {profile_id} wp {wp}"
)
return None
async def activate_vip(self, profile_id: int, expire_time) -> None:
sql = profile.update(profile.c.id == profile_id).values(
vip_expire_time=expire_time
)
result = await self.execute(sql)
if result is None:
self.logger.error(
f"activate_vip: Failed to update profile! profile_id {profile_id} expire_time {expire_time}"
)
return None
async def update_user_rating(self, profile_id: int, new_rating: int) -> None:
sql = profile.update(profile.c.id == profile_id).values(rating=new_rating)
result = await self.execute(sql)
if result is None:
self.logger.error(
f"update_user_rating: Failed to update profile! profile_id {profile_id} new_rating {new_rating}"
)
return None
async def update_bingo(self, aime_id: int, page: int, progress: int) -> Optional[int]:
sql = insert(bingo).values(
user=aime_id, page_number=page, page_progress=progress
)
conflict = sql.on_duplicate_key_update(page_number=page, page_progress=progress)
result = await self.execute(conflict)
if result is None:
self.logger.error(f"put_bingo: failed to update! aime_id: {aime_id}")
return None
return result.lastrowid
async def get_bingo(self, aime_id: int) -> Optional[List[Row]]:
sql = select(bingo).where(bingo.c.user == aime_id)
result = await self.execute(sql)
if result is None:
return None
return result.fetchone()
async def get_bingo_page(self, aime_id: int, page: Dict) -> Optional[List[Row]]:
sql = select(bingo).where(
and_(bingo.c.user == aime_id, bingo.c.page_number == page)
)
result = await self.execute(sql)
if result is None:
return None
return result.fetchone()
async def update_vip_time(self, profile_id: int, time_left) -> None:
sql = profile.update(profile.c.id == profile_id).values(
vip_expire_time=time_left
)
result = await self.execute(sql)
if result is None:
self.logger.error(f"Failed to update VIP time for profile {profile_id}")
async def update_tutorial_flags(self, profile_id: int, flags: Dict) -> None:
sql = profile.update(profile.c.id == profile_id).values(
gate_tutorial_flags=flags
)
result = await self.execute(sql)
if result is None:
self.logger.error(
f"Failed to update tutorial flags for profile {profile_id}"
)