forked from Hay1tsme/artemis
wip: use SQL's limit/offset pagination for nextIndex/maxCount requests
This commit is contained in:
@ -1,16 +1,16 @@
|
|||||||
import logging
|
import itertools
|
||||||
import json
|
import json
|
||||||
|
import logging
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
from time import strftime
|
from typing import Any, Dict, List
|
||||||
|
|
||||||
import pytz
|
import pytz
|
||||||
from typing import Dict, Any, List
|
|
||||||
|
|
||||||
from core.config import CoreConfig
|
from core.config import CoreConfig
|
||||||
|
from titles.chuni.config import ChuniConfig
|
||||||
from titles.chuni.const import ChuniConstants, ItemKind
|
from titles.chuni.const import ChuniConstants, ItemKind
|
||||||
from titles.chuni.database import ChuniData
|
from titles.chuni.database import ChuniData
|
||||||
from titles.chuni.config import ChuniConfig
|
|
||||||
SCORE_BUFFER = {}
|
|
||||||
|
|
||||||
class ChuniBase:
|
class ChuniBase:
|
||||||
def __init__(self, core_cfg: CoreConfig, game_cfg: ChuniConfig) -> None:
|
def __init__(self, core_cfg: CoreConfig, game_cfg: ChuniConfig) -> None:
|
||||||
@ -277,35 +277,39 @@ class ChuniBase:
|
|||||||
}
|
}
|
||||||
|
|
||||||
async def handle_get_user_character_api_request(self, data: Dict) -> Dict:
|
async def handle_get_user_character_api_request(self, data: Dict) -> Dict:
|
||||||
characters = await self.data.item.get_characters(data["userId"])
|
user_id = int(data["userId"])
|
||||||
|
next_idx = int(data["nextIndex"])
|
||||||
|
max_ct = int(data["maxCount"])
|
||||||
|
|
||||||
|
# add one to the limit so we know if there's a next page of items
|
||||||
|
characters = await self.data.item.get_characters(
|
||||||
|
user_id, limit=max_ct + 1, offset=next_idx
|
||||||
|
)
|
||||||
|
|
||||||
if characters is None:
|
if characters is None:
|
||||||
return {
|
return {
|
||||||
"userId": data["userId"],
|
"userId": user_id,
|
||||||
"length": 0,
|
"length": 0,
|
||||||
"nextIndex": -1,
|
"nextIndex": -1,
|
||||||
"userCharacterList": [],
|
"userCharacterList": [],
|
||||||
}
|
}
|
||||||
|
|
||||||
|
character_count = len(characters)
|
||||||
character_list = []
|
character_list = []
|
||||||
next_idx = int(data["nextIndex"])
|
|
||||||
max_ct = int(data["maxCount"])
|
|
||||||
|
|
||||||
for x in range(next_idx, len(characters)):
|
for x in range(min(character_count, max_ct)):
|
||||||
tmp = characters[x]._asdict()
|
tmp = characters[x]._asdict()
|
||||||
tmp.pop("user")
|
tmp.pop("user")
|
||||||
tmp.pop("id")
|
tmp.pop("id")
|
||||||
character_list.append(tmp)
|
character_list.append(tmp)
|
||||||
|
|
||||||
if len(character_list) >= max_ct:
|
if character_count > max_ct:
|
||||||
break
|
|
||||||
|
|
||||||
if len(characters) >= next_idx + max_ct:
|
|
||||||
next_idx += max_ct
|
next_idx += max_ct
|
||||||
else:
|
else:
|
||||||
next_idx = -1
|
next_idx = -1
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"userId": data["userId"],
|
"userId": user_id,
|
||||||
"length": len(character_list),
|
"length": len(character_list),
|
||||||
"nextIndex": next_idx,
|
"nextIndex": next_idx,
|
||||||
"userCharacterList": character_list,
|
"userCharacterList": character_list,
|
||||||
@ -335,29 +339,31 @@ class ChuniBase:
|
|||||||
}
|
}
|
||||||
|
|
||||||
async def handle_get_user_course_api_request(self, data: Dict) -> Dict:
|
async def handle_get_user_course_api_request(self, data: Dict) -> Dict:
|
||||||
user_course_list = await self.data.score.get_courses(data["userId"])
|
user_id = int(data["userId"])
|
||||||
if user_course_list is None:
|
next_idx = int(data["nextIndex"])
|
||||||
|
max_ct = int(data["maxCount"])
|
||||||
|
|
||||||
|
rows = await self.data.score.get_courses(
|
||||||
|
user_id, limit=max_ct + 1, offset=next_idx
|
||||||
|
)
|
||||||
|
|
||||||
|
if rows is None or len(rows) == 0:
|
||||||
return {
|
return {
|
||||||
"userId": data["userId"],
|
"userId": user_id,
|
||||||
"length": 0,
|
"length": 0,
|
||||||
"nextIndex": -1,
|
"nextIndex": -1,
|
||||||
"userCourseList": [],
|
"userCourseList": [],
|
||||||
}
|
}
|
||||||
|
|
||||||
course_list = []
|
course_list = []
|
||||||
next_idx = int(data.get("nextIndex", 0))
|
|
||||||
max_ct = int(data.get("maxCount", 300))
|
|
||||||
|
|
||||||
for x in range(next_idx, len(user_course_list)):
|
for x in range(min(len(rows), max_ct)):
|
||||||
tmp = user_course_list[x]._asdict()
|
tmp = rows[x]._asdict()
|
||||||
tmp.pop("user")
|
tmp.pop("user")
|
||||||
tmp.pop("id")
|
tmp.pop("id")
|
||||||
course_list.append(tmp)
|
course_list.append(tmp)
|
||||||
|
|
||||||
if len(user_course_list) >= max_ct:
|
if len(rows) > max_ct:
|
||||||
break
|
|
||||||
|
|
||||||
if len(user_course_list) >= next_idx + max_ct:
|
|
||||||
next_idx += max_ct
|
next_idx += max_ct
|
||||||
else:
|
else:
|
||||||
next_idx = -1
|
next_idx = -1
|
||||||
@ -425,75 +431,94 @@ class ChuniBase:
|
|||||||
}
|
}
|
||||||
|
|
||||||
async def handle_get_user_rival_music_api_request(self, data: Dict) -> Dict:
|
async def handle_get_user_rival_music_api_request(self, data: Dict) -> Dict:
|
||||||
rival_id = data["rivalId"]
|
user_id = int(data["userId"])
|
||||||
next_index = int(data["nextIndex"])
|
rival_id = int(data["rivalId"])
|
||||||
max_count = int(data["maxCount"])
|
next_idx = int(data["nextIndex"])
|
||||||
user_rival_music_list = []
|
max_ct = int(data["maxCount"])
|
||||||
|
rival_levels = [int(x["level"]) for x in data["userRivalMusicLevelList"]]
|
||||||
|
|
||||||
# Fetch all the rival music entries for the user
|
# Fetch all the rival music entries for the user
|
||||||
all_entries = await self.data.score.get_rival_music(rival_id)
|
music_detail_rows = await self.data.score.get_scores(
|
||||||
|
rival_id,
|
||||||
|
levels=rival_levels,
|
||||||
|
limit=max_ct + 1,
|
||||||
|
offset=next_idx,
|
||||||
|
)
|
||||||
|
|
||||||
# Process the entries based on max_count and nextIndex
|
if music_detail_rows is None or len(music_detail_rows) == 0:
|
||||||
for music in all_entries:
|
return {
|
||||||
music_id = music["musicId"]
|
"userId": user_id,
|
||||||
level = music["level"]
|
"rivalId": rival_id,
|
||||||
score = music["scoreMax"]
|
"nextIndex": -1,
|
||||||
rank = music["scoreRank"]
|
"userRivalMusicList": [],
|
||||||
|
}
|
||||||
|
|
||||||
# Create a music entry for the current music_id if it's unique
|
music_details = [x._asdict() for x in music_detail_rows]
|
||||||
music_entry = next((entry for entry in user_rival_music_list if entry["musicId"] == music_id), None)
|
returned_music_details_count = 0
|
||||||
if music_entry is None:
|
music_list = []
|
||||||
music_entry = {
|
|
||||||
"musicId": music_id,
|
|
||||||
"length": 0,
|
|
||||||
"userRivalMusicDetailList": []
|
|
||||||
}
|
|
||||||
user_rival_music_list.append(music_entry)
|
|
||||||
|
|
||||||
# Create a level entry for the current level if it's unique or has a higher score
|
# note that itertools.groupby will only work on sorted keys, which is already sorted by
|
||||||
level_entry = next((entry for entry in music_entry["userRivalMusicDetailList"] if entry["level"] == level), None)
|
# the query in get_scores
|
||||||
if level_entry is None:
|
for music_id, details_iter in itertools.groupby(music_details, key=lambda x: x["musicId"]):
|
||||||
level_entry = {
|
details: list[dict[Any, Any]] = [
|
||||||
"level": level,
|
{"level": d["level"], "scoreMax": d["scoreMax"]}
|
||||||
"scoreMax": score,
|
for d in details_iter
|
||||||
"scoreRank": rank
|
]
|
||||||
}
|
|
||||||
music_entry["userRivalMusicDetailList"].append(level_entry)
|
|
||||||
elif score > level_entry["scoreMax"]:
|
|
||||||
level_entry["scoreMax"] = score
|
|
||||||
level_entry["scoreRank"] = rank
|
|
||||||
|
|
||||||
# Calculate the length for each "musicId" by counting the unique levels
|
music_list.append({"musicId": music_id, "length": len(details), "userMusicDetailList": details})
|
||||||
for music_entry in user_rival_music_list:
|
returned_music_details_count += len(details)
|
||||||
music_entry["length"] = len(music_entry["userRivalMusicDetailList"])
|
|
||||||
|
|
||||||
# Prepare the result dictionary with user rival music data
|
if len(music_list) >= max_ct:
|
||||||
result = {
|
break
|
||||||
"userId": data["userId"],
|
|
||||||
"rivalId": data["rivalId"],
|
# if we returned fewer PBs than we originally asked for from the database, that means
|
||||||
"nextIndex": str(next_index + len(user_rival_music_list[next_index: next_index + max_count]) if max_count <= len(user_rival_music_list[next_index: next_index + max_count]) else -1),
|
# we queried for the PBs of max_ct + 1 songs.
|
||||||
"userRivalMusicList": user_rival_music_list[next_index: next_index + max_count]
|
if returned_music_details_count < len(music_detail_rows):
|
||||||
|
next_idx += max_ct
|
||||||
|
else:
|
||||||
|
next_idx = -1
|
||||||
|
|
||||||
|
return {
|
||||||
|
"userId": user_id,
|
||||||
|
"rivalId": rival_id,
|
||||||
|
"length": len(music_list),
|
||||||
|
"nextIndex": next_idx,
|
||||||
|
"userRivalMusicList": music_list,
|
||||||
}
|
}
|
||||||
return result
|
|
||||||
|
|
||||||
|
|
||||||
async def handle_get_user_favorite_item_api_request(self, data: Dict) -> Dict:
|
async def handle_get_user_favorite_item_api_request(self, data: Dict) -> Dict:
|
||||||
|
user_id = int(data["userId"])
|
||||||
|
next_idx = int(data["nextIndex"])
|
||||||
|
max_ct = int(data["maxCount"])
|
||||||
|
kind = int(data["kind"])
|
||||||
|
is_all_favorite_item = str(data["isAllFavoriteItem"]) == "true"
|
||||||
|
|
||||||
user_fav_item_list = []
|
user_fav_item_list = []
|
||||||
|
|
||||||
# still needs to be implemented on WebUI
|
# still needs to be implemented on WebUI
|
||||||
# 1: Music, 2: User, 3: Character
|
# 1: Music, 2: User, 3: Character
|
||||||
fav_list = await self.data.item.get_all_favorites(
|
fav_list = await self.data.item.get_all_favorites(
|
||||||
data["userId"], self.version, fav_kind=int(data["kind"])
|
user_id,
|
||||||
|
self.version,
|
||||||
|
fav_kind=kind,
|
||||||
|
limit=max_ct + 1,
|
||||||
|
offset=next_idx,
|
||||||
)
|
)
|
||||||
|
|
||||||
if fav_list is not None:
|
if fav_list is not None:
|
||||||
for fav in fav_list:
|
for fav in fav_list:
|
||||||
user_fav_item_list.append({"id": fav["favId"]})
|
user_fav_item_list.append({"id": fav["favId"]})
|
||||||
|
|
||||||
|
if fav_list is None or len(fav_list) <= max_ct:
|
||||||
|
next_idx = -1
|
||||||
|
else:
|
||||||
|
next_idx += max_ct
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"userId": data["userId"],
|
"userId": user_id,
|
||||||
"length": len(user_fav_item_list),
|
"length": len(user_fav_item_list),
|
||||||
"kind": data["kind"],
|
"kind": kind,
|
||||||
"nextIndex": -1,
|
"nextIndex": next_idx,
|
||||||
"userFavoriteItemList": user_fav_item_list,
|
"userFavoriteItemList": user_fav_item_list,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -505,36 +530,39 @@ class ChuniBase:
|
|||||||
return {"userId": data["userId"], "length": 0, "userFavoriteMusicList": []}
|
return {"userId": data["userId"], "length": 0, "userFavoriteMusicList": []}
|
||||||
|
|
||||||
async def handle_get_user_item_api_request(self, data: Dict) -> Dict:
|
async def handle_get_user_item_api_request(self, data: Dict) -> Dict:
|
||||||
kind = int(int(data["nextIndex"]) / 10000000000)
|
user_id = int(data["userId"])
|
||||||
next_idx = int(int(data["nextIndex"]) % 10000000000)
|
next_idx = int(data["nextIndex"])
|
||||||
user_item_list = await self.data.item.get_items(data["userId"], kind)
|
max_ct = int(data["maxCount"])
|
||||||
|
|
||||||
if user_item_list is None or len(user_item_list) == 0:
|
kind = next_idx // 10000000000
|
||||||
|
next_idx = next_idx % 10000000000
|
||||||
|
rows = await self.data.item.get_items(
|
||||||
|
user_id, kind, limit=max_ct + 1, offset=next_idx
|
||||||
|
)
|
||||||
|
|
||||||
|
if rows is None or len(rows) == 0:
|
||||||
return {
|
return {
|
||||||
"userId": data["userId"],
|
"userId": user_id,
|
||||||
"nextIndex": -1,
|
"nextIndex": -1,
|
||||||
"itemKind": kind,
|
"itemKind": kind,
|
||||||
"userItemList": [],
|
"userItemList": [],
|
||||||
}
|
}
|
||||||
|
|
||||||
items: List[Dict[str, Any]] = []
|
items: List[Dict[str, Any]] = []
|
||||||
for i in range(next_idx, len(user_item_list)):
|
|
||||||
tmp = user_item_list[i]._asdict()
|
for i in range(min(len(rows), max_ct)):
|
||||||
|
tmp = rows[i]._asdict()
|
||||||
tmp.pop("user")
|
tmp.pop("user")
|
||||||
tmp.pop("id")
|
tmp.pop("id")
|
||||||
items.append(tmp)
|
items.append(tmp)
|
||||||
if len(items) >= int(data["maxCount"]):
|
|
||||||
break
|
|
||||||
|
|
||||||
xout = kind * 10000000000 + next_idx + len(items)
|
if len(rows) > max_ct:
|
||||||
|
next_idx = kind * 10000000000 + next_idx + max_ct
|
||||||
if len(items) < int(data["maxCount"]):
|
|
||||||
next_idx = 0
|
|
||||||
else:
|
else:
|
||||||
next_idx = xout
|
next_idx = -1
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"userId": data["userId"],
|
"userId": user_id,
|
||||||
"nextIndex": next_idx,
|
"nextIndex": next_idx,
|
||||||
"itemKind": kind,
|
"itemKind": kind,
|
||||||
"length": len(items),
|
"length": len(items),
|
||||||
@ -586,62 +614,55 @@ class ChuniBase:
|
|||||||
}
|
}
|
||||||
|
|
||||||
async def handle_get_user_music_api_request(self, data: Dict) -> Dict:
|
async def handle_get_user_music_api_request(self, data: Dict) -> Dict:
|
||||||
music_detail = await self.data.score.get_scores(data["userId"])
|
user_id = int(data["userId"])
|
||||||
if music_detail is None:
|
next_idx = int(data["nextIndex"])
|
||||||
|
max_ct = int(data["maxCount"])
|
||||||
|
|
||||||
|
rows = await self.data.score.get_scores(
|
||||||
|
user_id, limit=max_ct + 1, offset=next_idx
|
||||||
|
)
|
||||||
|
|
||||||
|
if rows is None or len(rows) == 0:
|
||||||
return {
|
return {
|
||||||
"userId": data["userId"],
|
"userId": user_id,
|
||||||
"length": 0,
|
"length": 0,
|
||||||
"nextIndex": -1,
|
"nextIndex": -1,
|
||||||
"userMusicList": [], # 240
|
"userMusicList": [], # 240
|
||||||
}
|
}
|
||||||
|
|
||||||
song_list = []
|
music_details = [x._asdict() for x in rows]
|
||||||
next_idx = int(data["nextIndex"])
|
returned_music_details_count = 0
|
||||||
max_ct = int(data["maxCount"])
|
music_list = []
|
||||||
|
|
||||||
for x in range(next_idx, len(music_detail)):
|
# note that itertools.groupby will only work on sorted keys, which is already sorted by
|
||||||
found = False
|
# the query in get_scores
|
||||||
tmp = music_detail[x]._asdict()
|
for _music_id, details_iter in itertools.groupby(music_details, key=lambda x: x["musicId"]):
|
||||||
tmp.pop("user")
|
details: list[dict[Any, Any]] = []
|
||||||
tmp.pop("id")
|
|
||||||
|
|
||||||
for song in song_list:
|
for d in details_iter:
|
||||||
score_buf = SCORE_BUFFER.get(str(data["userId"])) or []
|
d.pop("id")
|
||||||
if song["userMusicDetailList"][0]["musicId"] == tmp["musicId"]:
|
d.pop("user")
|
||||||
found = True
|
|
||||||
song["userMusicDetailList"].append(tmp)
|
|
||||||
song["length"] = len(song["userMusicDetailList"])
|
|
||||||
score_buf.append(tmp["musicId"])
|
|
||||||
SCORE_BUFFER[str(data["userId"])] = score_buf
|
|
||||||
|
|
||||||
score_buf = SCORE_BUFFER.get(str(data["userId"])) or []
|
details.append(d)
|
||||||
if not found and tmp["musicId"] not in score_buf:
|
|
||||||
song_list.append({"length": 1, "userMusicDetailList": [tmp]})
|
|
||||||
score_buf.append(tmp["musicId"])
|
|
||||||
SCORE_BUFFER[str(data["userId"])] = score_buf
|
|
||||||
|
|
||||||
if len(song_list) >= max_ct:
|
music_list.append({"length": len(details), "userMusicDetailList": details})
|
||||||
|
returned_music_details_count += len(details)
|
||||||
|
|
||||||
|
if len(music_list) >= max_ct:
|
||||||
break
|
break
|
||||||
|
|
||||||
for songIdx in range(len(song_list)):
|
# if we returned fewer PBs than we originally asked for from the database, that means
|
||||||
for recordIdx in range(x+1, len(music_detail)):
|
# we queried for the PBs of max_ct + 1 songs.
|
||||||
if song_list[songIdx]["userMusicDetailList"][0]["musicId"] == music_detail[recordIdx]["musicId"]:
|
if returned_music_details_count < len(rows):
|
||||||
music = music_detail[recordIdx]._asdict()
|
next_idx += max_ct
|
||||||
music.pop("user")
|
|
||||||
music.pop("id")
|
|
||||||
song_list[songIdx]["userMusicDetailList"].append(music)
|
|
||||||
song_list[songIdx]["length"] += 1
|
|
||||||
|
|
||||||
if len(song_list) >= max_ct:
|
|
||||||
next_idx += len(song_list)
|
|
||||||
else:
|
else:
|
||||||
next_idx = -1
|
next_idx = -1
|
||||||
SCORE_BUFFER[str(data["userId"])] = []
|
|
||||||
return {
|
return {
|
||||||
"userId": data["userId"],
|
"userId": user_id,
|
||||||
"length": len(song_list),
|
"length": len(music_list),
|
||||||
"nextIndex": next_idx,
|
"nextIndex": next_idx,
|
||||||
"userMusicList": song_list, # 240
|
"userMusicList": music_list,
|
||||||
}
|
}
|
||||||
|
|
||||||
async def handle_get_user_option_api_request(self, data: Dict) -> Dict:
|
async def handle_get_user_option_api_request(self, data: Dict) -> Dict:
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
from enum import Enum
|
from enum import Enum, IntEnum
|
||||||
|
|
||||||
|
|
||||||
class ChuniConstants:
|
class ChuniConstants:
|
||||||
@ -81,12 +81,31 @@ class ChuniConstants:
|
|||||||
return cls.VERSION_NAMES[ver]
|
return cls.VERSION_NAMES[ver]
|
||||||
|
|
||||||
|
|
||||||
class MapAreaConditionType(Enum):
|
class MapAreaConditionType(IntEnum):
|
||||||
UNLOCKED = 0
|
"""Condition types for the GetGameMapAreaConditionApi endpoint. Incomplete.
|
||||||
|
|
||||||
|
For the MAP_CLEARED/MAP_AREA_CLEARED/TROPHY_OBTAINED conditions, the conditionId
|
||||||
|
is the map/map area/trophy.
|
||||||
|
|
||||||
|
For the RANK_*/ALL_JUSTICE conditions, the conditionId is songId * 100 + difficultyId.
|
||||||
|
For example, Halcyon [ULTIMA] would be 173 * 100 + 4 = 17304.
|
||||||
|
"""
|
||||||
|
|
||||||
|
ALWAYS_UNLOCKED = 0
|
||||||
|
|
||||||
MAP_CLEARED = 1
|
MAP_CLEARED = 1
|
||||||
MAP_AREA_CLEARED = 2
|
MAP_AREA_CLEARED = 2
|
||||||
|
|
||||||
TROPHY_OBTAINED = 3
|
TROPHY_OBTAINED = 3
|
||||||
|
|
||||||
|
RANK_SSS = 19
|
||||||
|
RANK_SSP = 20
|
||||||
|
RANK_SS = 21
|
||||||
|
RANK_SP = 22
|
||||||
|
RANK_S = 23
|
||||||
|
|
||||||
|
ALL_JUSTICE = 28
|
||||||
|
|
||||||
|
|
||||||
class MapAreaConditionLogicalOperator(Enum):
|
class MapAreaConditionLogicalOperator(Enum):
|
||||||
AND = 1
|
AND = 1
|
||||||
@ -102,11 +121,36 @@ class AvatarCategory(Enum):
|
|||||||
FRONT = 6
|
FRONT = 6
|
||||||
BACK = 7
|
BACK = 7
|
||||||
|
|
||||||
class ItemKind(Enum):
|
class ItemKind(IntEnum):
|
||||||
NAMEPLATE = 1
|
NAMEPLATE = 1
|
||||||
|
|
||||||
|
FRAME = 2
|
||||||
|
"""
|
||||||
|
"Frame" is the background for the gauge/score/max combo display
|
||||||
|
shown during gameplay. This item cannot be equipped (as of LUMINOUS)
|
||||||
|
and is hardcoded to the current game's version.
|
||||||
|
"""
|
||||||
|
|
||||||
TROPHY = 3
|
TROPHY = 3
|
||||||
|
SKILL = 4
|
||||||
|
|
||||||
TICKET = 5
|
TICKET = 5
|
||||||
|
"""A statue is also a ticket."""
|
||||||
|
|
||||||
PRESENT = 6
|
PRESENT = 6
|
||||||
|
MUSIC_UNLOCK = 7
|
||||||
MAP_ICON = 8
|
MAP_ICON = 8
|
||||||
SYSTEM_VOICE = 9
|
SYSTEM_VOICE = 9
|
||||||
|
SYMBOL_CHAT = 10
|
||||||
AVATAR_ACCESSORY = 11
|
AVATAR_ACCESSORY = 11
|
||||||
|
|
||||||
|
ULTIMA_UNLOCK = 12
|
||||||
|
"""This only applies to ULTIMA difficulties that are *not* unlocked by
|
||||||
|
SS-ing EXPERT+MASTER.
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
class FavoriteItemKind(IntEnum):
|
||||||
|
MUSIC = 1
|
||||||
|
RIVAL = 2
|
||||||
|
CHARACTER = 3
|
||||||
|
@ -4,12 +4,14 @@ from random import randint
|
|||||||
from typing import Dict
|
from typing import Dict
|
||||||
|
|
||||||
import pytz
|
import pytz
|
||||||
|
|
||||||
from core.config import CoreConfig
|
from core.config import CoreConfig
|
||||||
from core.utils import Utils
|
from core.utils import Utils
|
||||||
from titles.chuni.const import ChuniConstants
|
|
||||||
from titles.chuni.database import ChuniData
|
|
||||||
from titles.chuni.base import ChuniBase
|
from titles.chuni.base import ChuniBase
|
||||||
from titles.chuni.config import ChuniConfig
|
from titles.chuni.config import ChuniConfig
|
||||||
|
from titles.chuni.const import ChuniConstants
|
||||||
|
from titles.chuni.database import ChuniData
|
||||||
|
|
||||||
|
|
||||||
class ChuniNew(ChuniBase):
|
class ChuniNew(ChuniBase):
|
||||||
ITEM_TYPE = {"character": 20, "story": 21, "card": 22}
|
ITEM_TYPE = {"character": 20, "story": 21, "card": 22}
|
||||||
@ -285,30 +287,32 @@ class ChuniNew(ChuniBase):
|
|||||||
}
|
}
|
||||||
|
|
||||||
async def handle_get_user_printed_card_api_request(self, data: Dict) -> Dict:
|
async def handle_get_user_printed_card_api_request(self, data: Dict) -> Dict:
|
||||||
user_print_list = await self.data.item.get_user_print_states(
|
user_id = int(data["userId"])
|
||||||
data["userId"], has_completed=True
|
next_idx = int(data["nextIndex"])
|
||||||
|
max_ct = int(data["maxCount"])
|
||||||
|
|
||||||
|
rows = await self.data.item.get_user_print_states(
|
||||||
|
user_id,
|
||||||
|
has_completed=True,
|
||||||
|
limit=max_ct + 1,
|
||||||
|
offset=next_idx,
|
||||||
)
|
)
|
||||||
if user_print_list is None:
|
if rows is None or len(rows) == 0:
|
||||||
return {
|
return {
|
||||||
"userId": data["userId"],
|
"userId": user_id,
|
||||||
"length": 0,
|
"length": 0,
|
||||||
"nextIndex": -1,
|
"nextIndex": -1,
|
||||||
"userPrintedCardList": [],
|
"userPrintedCardList": [],
|
||||||
}
|
}
|
||||||
|
|
||||||
print_list = []
|
print_list = []
|
||||||
next_idx = int(data["nextIndex"])
|
|
||||||
max_ct = int(data["maxCount"])
|
|
||||||
|
|
||||||
for x in range(next_idx, len(user_print_list)):
|
for i in range(min(max_ct, len(rows))):
|
||||||
tmp = user_print_list[x]._asdict()
|
tmp = rows[i]._asdict()
|
||||||
print_list.append(tmp["cardId"])
|
print_list.append(tmp["cardId"])
|
||||||
|
|
||||||
if len(print_list) >= max_ct:
|
if len(rows) > max_ct:
|
||||||
break
|
next_idx += max_ct
|
||||||
|
|
||||||
if len(print_list) >= max_ct:
|
|
||||||
next_idx = next_idx + max_ct
|
|
||||||
else:
|
else:
|
||||||
next_idx = -1
|
next_idx = -1
|
||||||
|
|
||||||
|
@ -1,22 +1,22 @@
|
|||||||
from typing import Dict, List, Optional
|
from typing import Dict, List, Optional
|
||||||
|
|
||||||
from sqlalchemy import (
|
from sqlalchemy import (
|
||||||
Table,
|
|
||||||
Column,
|
Column,
|
||||||
UniqueConstraint,
|
|
||||||
PrimaryKeyConstraint,
|
PrimaryKeyConstraint,
|
||||||
|
Table,
|
||||||
|
UniqueConstraint,
|
||||||
and_,
|
and_,
|
||||||
delete,
|
delete,
|
||||||
)
|
)
|
||||||
from sqlalchemy.types import Integer, String, TIMESTAMP, Boolean, JSON
|
|
||||||
from sqlalchemy.engine.base import Connection
|
|
||||||
from sqlalchemy.schema import ForeignKey
|
|
||||||
from sqlalchemy.sql import func, select
|
|
||||||
from sqlalchemy.dialects.mysql import insert
|
from sqlalchemy.dialects.mysql import insert
|
||||||
from sqlalchemy.engine import Row
|
from sqlalchemy.engine import Row
|
||||||
|
from sqlalchemy.schema import ForeignKey
|
||||||
|
from sqlalchemy.sql import func, select
|
||||||
|
from sqlalchemy.types import JSON, TIMESTAMP, Boolean, Integer, String
|
||||||
|
|
||||||
from core.data.schema import BaseData, metadata
|
from core.data.schema import BaseData, metadata
|
||||||
|
|
||||||
character = Table(
|
character: Table = Table(
|
||||||
"chuni_item_character",
|
"chuni_item_character",
|
||||||
metadata,
|
metadata,
|
||||||
Column("id", Integer, primary_key=True, nullable=False),
|
Column("id", Integer, primary_key=True, nullable=False),
|
||||||
@ -40,7 +40,7 @@ character = Table(
|
|||||||
mysql_charset="utf8mb4",
|
mysql_charset="utf8mb4",
|
||||||
)
|
)
|
||||||
|
|
||||||
item = Table(
|
item: Table = Table(
|
||||||
"chuni_item_item",
|
"chuni_item_item",
|
||||||
metadata,
|
metadata,
|
||||||
Column("id", Integer, primary_key=True, nullable=False),
|
Column("id", Integer, primary_key=True, nullable=False),
|
||||||
@ -141,7 +141,7 @@ gacha = Table(
|
|||||||
mysql_charset="utf8mb4",
|
mysql_charset="utf8mb4",
|
||||||
)
|
)
|
||||||
|
|
||||||
print_state = Table(
|
print_state: Table = Table(
|
||||||
"chuni_item_print_state",
|
"chuni_item_print_state",
|
||||||
metadata,
|
metadata,
|
||||||
Column("id", Integer, primary_key=True, nullable=False),
|
Column("id", Integer, primary_key=True, nullable=False),
|
||||||
@ -210,7 +210,7 @@ login_bonus = Table(
|
|||||||
mysql_charset="utf8mb4",
|
mysql_charset="utf8mb4",
|
||||||
)
|
)
|
||||||
|
|
||||||
favorite = Table(
|
favorite: Table = Table(
|
||||||
"chuni_item_favorite",
|
"chuni_item_favorite",
|
||||||
metadata,
|
metadata,
|
||||||
Column("id", Integer, primary_key=True, nullable=False),
|
Column("id", Integer, primary_key=True, nullable=False),
|
||||||
@ -379,9 +379,14 @@ class ChuniItemData(BaseData):
|
|||||||
return True if len(result.all()) else False
|
return True if len(result.all()) else False
|
||||||
|
|
||||||
async def get_all_favorites(
|
async def get_all_favorites(
|
||||||
self, user_id: int, version: int, fav_kind: int = 1
|
self,
|
||||||
|
user_id: int,
|
||||||
|
version: int,
|
||||||
|
fav_kind: int = 1,
|
||||||
|
limit: Optional[int] = None,
|
||||||
|
offset: int = 0,
|
||||||
) -> Optional[List[Row]]:
|
) -> Optional[List[Row]]:
|
||||||
sql = favorite.select(
|
sql = select(favorite).where(
|
||||||
and_(
|
and_(
|
||||||
favorite.c.version == version,
|
favorite.c.version == version,
|
||||||
favorite.c.user == user_id,
|
favorite.c.user == user_id,
|
||||||
@ -389,6 +394,9 @@ class ChuniItemData(BaseData):
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if limit is not None:
|
||||||
|
sql = sql.order_by(favorite.c.id.asc()).limit(limit).offset(offset)
|
||||||
|
|
||||||
result = await self.execute(sql)
|
result = await self.execute(sql)
|
||||||
if result is None:
|
if result is None:
|
||||||
return None
|
return None
|
||||||
@ -488,9 +496,14 @@ class ChuniItemData(BaseData):
|
|||||||
return None
|
return None
|
||||||
return result.fetchone()
|
return result.fetchone()
|
||||||
|
|
||||||
async def get_characters(self, user_id: int) -> Optional[List[Row]]:
|
async def get_characters(
|
||||||
|
self, user_id: int, limit: Optional[int] = None, offset: int = 0
|
||||||
|
) -> Optional[List[Row]]:
|
||||||
sql = select(character).where(character.c.user == user_id)
|
sql = select(character).where(character.c.user == user_id)
|
||||||
|
|
||||||
|
if limit is not None:
|
||||||
|
sql = sql.limit(limit).offset(offset).order_by(character.c.id.asc())
|
||||||
|
|
||||||
result = await self.execute(sql)
|
result = await self.execute(sql)
|
||||||
if result is None:
|
if result is None:
|
||||||
return None
|
return None
|
||||||
@ -509,7 +522,13 @@ class ChuniItemData(BaseData):
|
|||||||
return None
|
return None
|
||||||
return result.lastrowid
|
return result.lastrowid
|
||||||
|
|
||||||
async def get_items(self, user_id: int, kind: int = None) -> Optional[List[Row]]:
|
async def get_items(
|
||||||
|
self,
|
||||||
|
user_id: int,
|
||||||
|
kind: Optional[int] = None,
|
||||||
|
limit: Optional[int] = None,
|
||||||
|
offset: int = 0
|
||||||
|
) -> Optional[List[Row]]:
|
||||||
if kind is None:
|
if kind is None:
|
||||||
sql = select(item).where(item.c.user == user_id)
|
sql = select(item).where(item.c.user == user_id)
|
||||||
else:
|
else:
|
||||||
@ -517,6 +536,9 @@ class ChuniItemData(BaseData):
|
|||||||
and_(item.c.user == user_id, item.c.itemKind == kind)
|
and_(item.c.user == user_id, item.c.itemKind == kind)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if limit is not None:
|
||||||
|
sql = sql.order_by(item.c.id.asc()).limit(limit).offset(offset)
|
||||||
|
|
||||||
result = await self.execute(sql)
|
result = await self.execute(sql)
|
||||||
if result is None:
|
if result is None:
|
||||||
return None
|
return None
|
||||||
@ -609,15 +631,22 @@ class ChuniItemData(BaseData):
|
|||||||
return result.lastrowid
|
return result.lastrowid
|
||||||
|
|
||||||
async def get_user_print_states(
|
async def get_user_print_states(
|
||||||
self, aime_id: int, has_completed: bool = False
|
self,
|
||||||
|
aime_id: int,
|
||||||
|
has_completed: bool = False,
|
||||||
|
limit: Optional[int] = None,
|
||||||
|
offset: int = 0,
|
||||||
) -> Optional[List[Row]]:
|
) -> Optional[List[Row]]:
|
||||||
sql = print_state.select(
|
sql = select(print_state).where(
|
||||||
and_(
|
and_(
|
||||||
print_state.c.user == aime_id,
|
print_state.c.user == aime_id,
|
||||||
print_state.c.hasCompleted == has_completed,
|
print_state.c.hasCompleted == has_completed,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if limit is not None:
|
||||||
|
sql = sql.order_by(print_state.c.id.asc()).limit(limit).offset(offset)
|
||||||
|
|
||||||
result = await self.execute(sql)
|
result = await self.execute(sql)
|
||||||
if result is None:
|
if result is None:
|
||||||
return None
|
return None
|
||||||
|
@ -1,16 +1,19 @@
|
|||||||
from typing import Dict, List, Optional
|
from typing import Dict, List, Optional, Union
|
||||||
from sqlalchemy import Table, Column, UniqueConstraint, PrimaryKeyConstraint, and_
|
|
||||||
from sqlalchemy.types import Integer, String, TIMESTAMP, Boolean, JSON, BigInteger
|
from sqlalchemy import Column, PrimaryKeyConstraint, Table, UniqueConstraint, and_
|
||||||
|
from sqlalchemy.dialects.mysql import insert
|
||||||
|
from sqlalchemy.engine import Row
|
||||||
from sqlalchemy.engine.base import Connection
|
from sqlalchemy.engine.base import Connection
|
||||||
from sqlalchemy.schema import ForeignKey
|
from sqlalchemy.schema import ForeignKey
|
||||||
from sqlalchemy.engine import Row
|
|
||||||
from sqlalchemy.sql import func, select
|
from sqlalchemy.sql import func, select
|
||||||
from sqlalchemy.dialects.mysql import insert
|
|
||||||
from sqlalchemy.sql.expression import exists
|
from sqlalchemy.sql.expression import exists
|
||||||
|
from sqlalchemy.types import JSON, TIMESTAMP, BigInteger, Boolean, Integer, String
|
||||||
|
|
||||||
from core.data.schema import BaseData, metadata
|
from core.data.schema import BaseData, metadata
|
||||||
|
|
||||||
from ..config import ChuniConfig
|
from ..config import ChuniConfig
|
||||||
|
|
||||||
course = Table(
|
course: Table = Table(
|
||||||
"chuni_score_course",
|
"chuni_score_course",
|
||||||
metadata,
|
metadata,
|
||||||
Column("id", Integer, primary_key=True, nullable=False),
|
Column("id", Integer, primary_key=True, nullable=False),
|
||||||
@ -41,7 +44,7 @@ course = Table(
|
|||||||
mysql_charset="utf8mb4",
|
mysql_charset="utf8mb4",
|
||||||
)
|
)
|
||||||
|
|
||||||
best_score = Table(
|
best_score: Table = Table(
|
||||||
"chuni_score_best",
|
"chuni_score_best",
|
||||||
metadata,
|
metadata,
|
||||||
Column("id", Integer, primary_key=True, nullable=False),
|
Column("id", Integer, primary_key=True, nullable=False),
|
||||||
@ -229,9 +232,12 @@ class ChuniRomVersion():
|
|||||||
return -1
|
return -1
|
||||||
|
|
||||||
class ChuniScoreData(BaseData):
|
class ChuniScoreData(BaseData):
|
||||||
async def get_courses(self, aime_id: int) -> Optional[Row]:
|
async def get_courses(self, aime_id: int, limit: Optional[int] = None, offset: int = 0) -> Optional[List[Row]]:
|
||||||
sql = select(course).where(course.c.user == aime_id)
|
sql = select(course).where(course.c.user == aime_id)
|
||||||
|
|
||||||
|
if limit is not None:
|
||||||
|
sql = sql.order_by(course.c.id.asc()).limit(limit).offset(offset)
|
||||||
|
|
||||||
result = await self.execute(sql)
|
result = await self.execute(sql)
|
||||||
if result is None:
|
if result is None:
|
||||||
return None
|
return None
|
||||||
@ -249,8 +255,40 @@ class ChuniScoreData(BaseData):
|
|||||||
return None
|
return None
|
||||||
return result.lastrowid
|
return result.lastrowid
|
||||||
|
|
||||||
async def get_scores(self, aime_id: int) -> Optional[Row]:
|
async def get_scores(
|
||||||
sql = select(best_score).where(best_score.c.user == aime_id)
|
self,
|
||||||
|
aime_id: int,
|
||||||
|
levels: Optional[list[int]] = None,
|
||||||
|
limit: Optional[int] = None,
|
||||||
|
offset: int = 0,
|
||||||
|
) -> Optional[List[Row]]:
|
||||||
|
condition = best_score.c.user == aime_id
|
||||||
|
|
||||||
|
if levels is not None:
|
||||||
|
condition &= best_score.c.level.in_(levels)
|
||||||
|
|
||||||
|
if limit is None:
|
||||||
|
sql = (
|
||||||
|
select(best_score)
|
||||||
|
.where(condition)
|
||||||
|
.order_by(best_score.c.musicId.asc(), best_score.c.level.asc())
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
subq = (
|
||||||
|
select(best_score.c.musicId)
|
||||||
|
.distinct()
|
||||||
|
.where(condition)
|
||||||
|
.order_by(best_score.c.musicId.asc())
|
||||||
|
.limit(limit)
|
||||||
|
.offset(offset)
|
||||||
|
.subquery()
|
||||||
|
)
|
||||||
|
sql = (
|
||||||
|
select(best_score)
|
||||||
|
.join(subq, best_score.c.musicId == subq.c.musicId)
|
||||||
|
.where(condition)
|
||||||
|
.order_by(best_score.c.musicId.asc(), best_score.c.level.asc())
|
||||||
|
)
|
||||||
|
|
||||||
result = await self.execute(sql)
|
result = await self.execute(sql)
|
||||||
if result is None:
|
if result is None:
|
||||||
@ -360,11 +398,3 @@ class ChuniScoreData(BaseData):
|
|||||||
|
|
||||||
rows = result.fetchall()
|
rows = result.fetchall()
|
||||||
return [dict(row) for row in rows]
|
return [dict(row) for row in rows]
|
||||||
|
|
||||||
async def get_rival_music(self, rival_id: int) -> Optional[List[Dict]]:
|
|
||||||
sql = select(best_score).where(best_score.c.user == rival_id)
|
|
||||||
|
|
||||||
result = await self.execute(sql)
|
|
||||||
if result is None:
|
|
||||||
return None
|
|
||||||
return result.fetchall()
|
|
||||||
|
Reference in New Issue
Block a user