forked from Dniel97/artemis
942 lines
41 KiB
Python
942 lines
41 KiB
Python
from typing import Any, List, Dict
|
|
import logging
|
|
from math import floor
|
|
|
|
from datetime import datetime, timedelta
|
|
|
|
from core.config import CoreConfig
|
|
from titles.wacca.config import WaccaConfig
|
|
from titles.wacca.const import WaccaConstants
|
|
from titles.wacca.database import WaccaData
|
|
|
|
from titles.wacca.handlers import *
|
|
|
|
class WaccaBase():
|
|
def __init__(self, cfg: CoreConfig, game_cfg: WaccaConfig) -> None:
|
|
self.config = cfg # Config file
|
|
self.game_config = game_cfg # Game Config file
|
|
self.game = WaccaConstants.GAME_CODE # Game code
|
|
self.version = WaccaConstants.VER_WACCA # Game version
|
|
self.data = WaccaData(cfg) # Database
|
|
self.logger = logging.getLogger("wacca")
|
|
self.srvtime = datetime.now()
|
|
self.season = 1
|
|
|
|
self.OPTIONS_DEFAULTS: Dict[str, Any] = {
|
|
"note_speed": 5,
|
|
"field_mask": 0,
|
|
"note_sound": 105001,
|
|
"note_color": 203001,
|
|
"bgm_volume": 10,
|
|
"bg_video": 0,
|
|
|
|
"mirror": 0,
|
|
"judge_display_pos": 0,
|
|
"judge_detail_display": 0,
|
|
"measure_guidelines": 1,
|
|
"guideline_mask": 1,
|
|
"judge_line_timing_adjust": 10,
|
|
"note_design": 3,
|
|
"bonus_effect": 1,
|
|
"chara_voice": 1,
|
|
"score_display_method": 0,
|
|
"give_up": 0,
|
|
"guideline_spacing": 1,
|
|
"center_display": 1,
|
|
"ranking_display": 1,
|
|
"stage_up_icon_display": 1,
|
|
"rating_display": 1,
|
|
"player_level_display": 1,
|
|
"touch_effect": 1,
|
|
"guide_sound_vol": 3,
|
|
"touch_note_vol": 8,
|
|
"hold_note_vol": 8,
|
|
"slide_note_vol": 8,
|
|
"snap_note_vol": 8,
|
|
"chain_note_vol": 8,
|
|
"bonus_note_vol": 8,
|
|
"gate_skip": 0,
|
|
"key_beam_display": 1,
|
|
|
|
"left_slide_note_color": 4,
|
|
"right_slide_note_color": 3,
|
|
"forward_slide_note_color": 1,
|
|
"back_slide_note_color": 2,
|
|
|
|
"master_vol": 3,
|
|
"set_title_id": 104001,
|
|
"set_icon_id": 102001,
|
|
"set_nav_id": 210001,
|
|
"set_plate_id": 211001
|
|
}
|
|
self.allowed_stages = []
|
|
|
|
def handle_housing_get_request(self, data: Dict) -> Dict:
|
|
req = BaseRequest(data)
|
|
housing_id = 1337
|
|
self.logger.info(f"{req.chipId} -> {housing_id}")
|
|
resp = HousingGetResponse(housing_id)
|
|
return resp.make()
|
|
|
|
def handle_housing_start_request(self, data: Dict) -> Dict:
|
|
req = HousingStartRequest(data)
|
|
|
|
resp = HousingStartResponseV1(
|
|
1,
|
|
[ # Recomended songs
|
|
1269,1007,1270,1002,1020,1003,1008,1211,1018,1092,1056,32,
|
|
1260,1230,1258,1251,2212,1264,1125,1037,2001,1272,1126,1119,
|
|
1104,1070,1047,1044,1027,1004,1001,24,2068,2062,2021,1275,
|
|
1249,1207,1203,1107,1021,1009,9,4,3,23,22,2014,13,1276,1247,
|
|
1240,1237,1128,1114,1110,1109,1102,1045,1043,1036,1035,1030,
|
|
1023,1015
|
|
]
|
|
)
|
|
return resp.make()
|
|
|
|
def handle_advertise_GetNews_request(self, data: Dict) -> Dict:
|
|
resp = GetNewsResponseV1()
|
|
return resp.make()
|
|
|
|
def handle_user_status_logout_request(self, data: Dict) -> Dict:
|
|
req = UserStatusLogoutRequest(data)
|
|
self.logger.info(f"Log out user {req.userId} from {req.chipId}")
|
|
return BaseResponse().make()
|
|
|
|
def handle_user_status_login_request(self, data: Dict) -> List[Any]:
|
|
req = UserStatusLoginRequest(data)
|
|
resp = UserStatusLoginResponseV1()
|
|
is_new_day = False
|
|
is_consec_day = False
|
|
is_consec_day = True
|
|
|
|
if req.userId == 0:
|
|
self.logger.info(f"Guest login on {req.chipId}")
|
|
resp.lastLoginDate = 0
|
|
|
|
else:
|
|
profile = self.data.profile.get_profile(req.userId)
|
|
if profile is None:
|
|
self.logger.warn(f"Unknown user id {req.userId} attempted login from {req.chipId}")
|
|
return resp.make()
|
|
|
|
self.logger.info(f"User {req.userId} login on {req.chipId}")
|
|
last_login_time = int(profile["last_login_date"].timestamp())
|
|
resp.lastLoginDate = last_login_time
|
|
|
|
# If somebodies login timestamp < midnight of current day, then they are logging in for the first time today
|
|
if last_login_time < int(datetime.now().replace(hour=0,minute=0,second=0,microsecond=0).timestamp()):
|
|
is_new_day = True
|
|
is_consec_day = True
|
|
|
|
# If somebodies login timestamp > midnight of current day + 1 day, then they broke their daily login streak
|
|
elif last_login_time > int((datetime.now().replace(hour=0,minute=0,second=0,microsecond=0) + timedelta(days=1)).timestamp()):
|
|
is_consec_day = False
|
|
# else, they are simply logging in again on the same day, and we don't need to do anything for that
|
|
|
|
self.data.profile.session_login(req.userId, is_new_day, is_consec_day)
|
|
|
|
resp.firstLoginDaily = int(is_new_day)
|
|
|
|
return resp.make()
|
|
|
|
def handle_user_status_get_request(self, data: Dict) -> List[Any]:
|
|
req = UserStatusGetRequest(data)
|
|
resp = UserStatusGetV1Response()
|
|
ver_split = req.appVersion.split(".")
|
|
|
|
profile = self.data.profile.get_profile(aime_id=req.aimeId)
|
|
if profile is None:
|
|
self.logger.info(f"No user exists for aime id {req.aimeId}")
|
|
return resp.make()
|
|
|
|
|
|
self.logger.info(f"User preview for {req.aimeId} from {req.chipId}")
|
|
if profile["last_game_ver"] is None:
|
|
profile_ver_split = ver_split
|
|
resp.lastGameVersion = req.appVersion
|
|
else:
|
|
profile_ver_split = profile["last_game_ver"].split(".")
|
|
resp.lastGameVersion = profile["last_game_ver"]
|
|
|
|
resp.userStatus.userId = profile["id"]
|
|
resp.userStatus.username = profile["username"]
|
|
resp.userStatus.xp = profile["xp"]
|
|
resp.userStatus.danLevel = profile["dan_level"]
|
|
resp.userStatus.danType = profile["dan_type"]
|
|
resp.userStatus.wp = profile["wp"]
|
|
resp.userStatus.useCount = profile["login_count"]
|
|
resp.userStatus.loginDays = profile["login_count_days"]
|
|
resp.userStatus.loginConsecutiveDays = profile["login_count_days_consec"]
|
|
|
|
set_title_id = self.data.profile.get_options(WaccaConstants.OPTIONS["set_title_id"], profile["user"])
|
|
if set_title_id is None:
|
|
set_title_id = self.OPTIONS_DEFAULTS["set_title_id"]
|
|
resp.setTitleId = set_title_id
|
|
|
|
set_icon_id = self.data.profile.get_options(WaccaConstants.OPTIONS["set_title_id"], profile["user"])
|
|
if set_icon_id is None:
|
|
set_icon_id = self.OPTIONS_DEFAULTS["set_icon_id"]
|
|
resp.setIconId = set_icon_id
|
|
|
|
if profile["last_login_date"].timestamp() < int((datetime.now().replace(hour=0,minute=0,second=0,microsecond=0) - timedelta(days=1)).timestamp()):
|
|
resp.userStatus.loginConsecutiveDays = 0
|
|
|
|
if int(ver_split[0]) > int(profile_ver_split[0]):
|
|
resp.versionStatus = PlayVersionStatus.VersionUpgrade
|
|
|
|
elif int(ver_split[0]) < int(profile_ver_split[0]):
|
|
resp.versionStatus = PlayVersionStatus.VersionTooNew
|
|
|
|
else:
|
|
if int(ver_split[1]) > int(profile_ver_split[1]):
|
|
resp.versionStatus = PlayVersionStatus.VersionUpgrade
|
|
|
|
elif int(ver_split[1]) < int(profile_ver_split[1]):
|
|
resp.versionStatus = PlayVersionStatus.VersionTooNew
|
|
|
|
else:
|
|
if int(ver_split[2]) > int(profile_ver_split[2]):
|
|
resp.versionStatus = PlayVersionStatus.VersionUpgrade
|
|
|
|
|
|
elif int(ver_split[2]) < int(profile_ver_split[2]):
|
|
resp.versionStatus = PlayVersionStatus.VersionTooNew
|
|
|
|
if profile["always_vip"]:
|
|
resp.userStatus.vipExpireTime = int((datetime.now() + timedelta(days=30)).timestamp())
|
|
|
|
elif profile["vip_expire_time"] is not None:
|
|
resp.userStatus.vipExpireTime = int(profile["vip_expire_time"].timestamp())
|
|
|
|
return resp.make()
|
|
|
|
def handle_user_status_login_request(self, data: Dict) -> List[Any]:
|
|
req = UserStatusLoginRequest(data)
|
|
resp = UserStatusLoginResponseV2()
|
|
is_new_day = False
|
|
is_consec_day = False
|
|
is_consec_day = True
|
|
|
|
if req.userId == 0:
|
|
self.logger.info(f"Guest login on {req.chipId}")
|
|
resp.lastLoginDate = 0
|
|
|
|
else:
|
|
profile = self.data.profile.get_profile(req.userId)
|
|
if profile is None:
|
|
self.logger.warn(f"Unknown user id {req.userId} attempted login from {req.chipId}")
|
|
return resp.make()
|
|
|
|
self.logger.info(f"User {req.userId} login on {req.chipId}")
|
|
last_login_time = int(profile["last_login_date"].timestamp())
|
|
resp.lastLoginDate = last_login_time
|
|
|
|
# If somebodies login timestamp < midnight of current day, then they are logging in for the first time today
|
|
if last_login_time < int(datetime.now().replace(hour=0,minute=0,second=0,microsecond=0).timestamp()):
|
|
is_new_day = True
|
|
is_consec_day = True
|
|
|
|
# If somebodies login timestamp > midnight of current day + 1 day, then they broke their daily login streak
|
|
elif last_login_time > int((datetime.now().replace(hour=0,minute=0,second=0,microsecond=0) + timedelta(days=1)).timestamp()):
|
|
is_consec_day = False
|
|
# else, they are simply logging in again on the same day, and we don't need to do anything for that
|
|
|
|
self.data.profile.session_login(req.userId, is_new_day, is_consec_day)
|
|
resp.vipInfo.pageYear = datetime.now().year
|
|
resp.vipInfo.pageMonth = datetime.now().month
|
|
resp.vipInfo.pageDay = datetime.now().day
|
|
resp.vipInfo.numItem = 1
|
|
|
|
resp.firstLoginDaily = int(is_new_day)
|
|
|
|
return resp.make()
|
|
|
|
def handle_user_status_create_request(self, data: Dict) -> List[Any]:
|
|
req = UserStatusCreateRequest(data)
|
|
|
|
profileId = self.data.profile.create_profile(req.aimeId, req.username, self.version)
|
|
|
|
if profileId is None: return BaseResponse().make()
|
|
|
|
# Insert starting items
|
|
self.data.item.put_item(req.aimeId, WaccaConstants.ITEM_TYPES["title"], 104001)
|
|
self.data.item.put_item(req.aimeId, WaccaConstants.ITEM_TYPES["title"], 104002)
|
|
self.data.item.put_item(req.aimeId, WaccaConstants.ITEM_TYPES["title"], 104003)
|
|
self.data.item.put_item(req.aimeId, WaccaConstants.ITEM_TYPES["title"], 104005)
|
|
|
|
self.data.item.put_item(req.aimeId, WaccaConstants.ITEM_TYPES["icon"], 102001)
|
|
self.data.item.put_item(req.aimeId, WaccaConstants.ITEM_TYPES["icon"], 102002)
|
|
|
|
self.data.item.put_item(req.aimeId, WaccaConstants.ITEM_TYPES["note_color"], 103001)
|
|
self.data.item.put_item(req.aimeId, WaccaConstants.ITEM_TYPES["note_color"], 203001)
|
|
|
|
self.data.item.put_item(req.aimeId, WaccaConstants.ITEM_TYPES["note_sound"], 105001)
|
|
self.data.item.put_item(req.aimeId, WaccaConstants.ITEM_TYPES["note_sound"], 205005) # Added lily
|
|
|
|
self.data.item.put_item(req.aimeId, WaccaConstants.ITEM_TYPES["navigator"], 210001)
|
|
|
|
self.data.item.put_item(req.aimeId, WaccaConstants.ITEM_TYPES["user_plate"], 211001) # Added lily
|
|
|
|
self.data.item.put_item(req.aimeId, WaccaConstants.ITEM_TYPES["touch_effect"], 312000) # Added reverse
|
|
self.data.item.put_item(req.aimeId, WaccaConstants.ITEM_TYPES["touch_effect"], 312001) # Added reverse
|
|
self.data.item.put_item(req.aimeId, WaccaConstants.ITEM_TYPES["touch_effect"], 312002) # Added reverse
|
|
|
|
return UserStatusCreateResponseV2(profileId, req.username).make()
|
|
|
|
def handle_user_status_getDetail_request(self, data: Dict) -> List[Any]:
|
|
req = UserStatusGetDetailRequest(data)
|
|
resp = UserStatusGetDetailResponseV1()
|
|
|
|
profile = self.data.profile.get_profile(req.userId)
|
|
if profile is None:
|
|
self.logger.warn(f"Unknown profile {req.userId}")
|
|
return resp.make()
|
|
|
|
self.logger.info(f"Get detail for profile {req.userId}")
|
|
user_id = profile["user"]
|
|
|
|
profile_scores = self.data.score.get_best_scores(user_id)
|
|
profile_items = self.data.item.get_items(user_id)
|
|
profile_song_unlocks = self.data.item.get_song_unlocks(user_id)
|
|
profile_options = self.data.profile.get_options(user_id)
|
|
profile_trophies = self.data.item.get_trophies(user_id)
|
|
profile_tickets = self.data.item.get_tickets(user_id)
|
|
|
|
if profile["vip_expire_time"] is None:
|
|
resp.userStatus.vipExpireTime = 0
|
|
|
|
else:
|
|
resp.userStatus.vipExpireTime = int(profile["vip_expire_time"].timestamp())
|
|
|
|
if profile["always_vip"] or self.game_config.mods.always_vip:
|
|
resp.userStatus.vipExpireTime = int((self.srvtime + timedelta(days=31)).timestamp())
|
|
|
|
resp.songUpdateTime = int(profile["last_login_date"].timestamp())
|
|
resp.songPlayStatus = [profile["last_song_id"], 1]
|
|
|
|
resp.userStatus.userId = profile["id"]
|
|
resp.userStatus.username = profile["username"]
|
|
resp.userStatus.xp = profile["xp"]
|
|
resp.userStatus.danLevel = profile["dan_level"]
|
|
resp.userStatus.danType = profile["dan_type"]
|
|
resp.userStatus.wp = profile["wp"]
|
|
resp.userStatus.useCount = profile["login_count"]
|
|
resp.userStatus.loginDays = profile["login_count_days"]
|
|
resp.userStatus.loginConsecutiveDays = profile["login_count_days_consec"]
|
|
|
|
if self.game_config.mods.infinite_wp:
|
|
resp.userStatus.wp = 999999
|
|
|
|
if profile["friend_view_1"] is not None:
|
|
pass
|
|
if profile["friend_view_2"] is not None:
|
|
pass
|
|
if profile["friend_view_3"] is not None:
|
|
pass
|
|
|
|
resp.seasonalPlayModeCounts.append(PlayModeCounts(self.season, 1, profile["playcount_single"]))
|
|
resp.seasonalPlayModeCounts.append(PlayModeCounts(self.season, 2, profile["playcount_multi_vs"]))
|
|
resp.seasonalPlayModeCounts.append(PlayModeCounts(self.season, 3, profile["playcount_multi_coop"]))
|
|
resp.seasonalPlayModeCounts.append(PlayModeCounts(self.season, 4, profile["playcount_stageup"]))
|
|
|
|
for opt in profile_options:
|
|
resp.options.append(UserOption(opt["opt_id"], opt["value"]))
|
|
|
|
for unlock in profile_song_unlocks:
|
|
for x in range(1, unlock["highest_difficulty"] + 1):
|
|
resp.userItems.songUnlocks.append(SongUnlock(unlock["song_id"], x, 0, int(unlock["acquire_date"].timestamp())))
|
|
if x > 2:
|
|
resp.scores.append(BestScoreDetailV1(unlock["song_id"], x))
|
|
|
|
empty_scores = len(resp.scores)
|
|
for song in profile_scores:
|
|
resp.seasonInfo.cumulativeScore += song["score"]
|
|
empty_score_idx = resp.find_score_idx(song["song_id"], song["chart_id"], 0, empty_scores)
|
|
|
|
clear_cts = SongDetailClearCounts(
|
|
song["play_ct"],
|
|
song["clear_ct"],
|
|
song["missless_ct"],
|
|
song["fullcombo_ct"],
|
|
song["allmarv_ct"],
|
|
)
|
|
|
|
grade_cts = SongDetailGradeCountsV1(
|
|
song["grade_d_ct"], song["grade_c_ct"], song["grade_b_ct"], song["grade_a_ct"], song["grade_aa_ct"],
|
|
song["grade_aaa_ct"], song["grade_s_ct"], song["grade_ss_ct"], song["grade_sss_ct"],
|
|
song["grade_master_ct"]
|
|
)
|
|
|
|
if empty_score_idx is not None:
|
|
resp.scores[empty_score_idx].clearCounts = clear_cts
|
|
resp.scores[empty_score_idx].clearCountsSeason = clear_cts
|
|
resp.scores[empty_score_idx].gradeCounts = grade_cts
|
|
resp.scores[empty_score_idx].score = song["score"]
|
|
resp.scores[empty_score_idx].bestCombo = song["best_combo"]
|
|
resp.scores[empty_score_idx].lowestMissCtMaybe = song["lowest_miss_ct"]
|
|
resp.scores[empty_score_idx].rating = song["rating"]
|
|
|
|
else:
|
|
deets = BestScoreDetailV1(song["song_id"], song["chart_id"])
|
|
deets.clearCounts = clear_cts
|
|
deets.clearCountsSeason = clear_cts
|
|
deets.gradeCounts = grade_cts
|
|
deets.score = song["score"]
|
|
deets.bestCombo = song["best_combo"]
|
|
deets.lowestMissCtMaybe = song["lowest_miss_ct"]
|
|
deets.rating = song["rating"]
|
|
|
|
for trophy in profile_trophies:
|
|
resp.userItems.trophies.append(TrophyItem(trophy["trophy_id"], trophy["season"], trophy["progress"], trophy["badge_type"]))
|
|
|
|
if self.game_config.mods.infinite_tickets:
|
|
for x in range(5):
|
|
resp.userItems.tickets.append(TicketItem(x, 106002, 0))
|
|
else:
|
|
for ticket in profile_tickets:
|
|
if ticket["expire_date"] is None:
|
|
expire = int((self.srvtime + timedelta(days=30)).timestamp())
|
|
else:
|
|
expire = int(ticket["expire_date"].timestamp())
|
|
|
|
resp.userItems.tickets.append(TicketItem(ticket["id"], ticket["ticket_id"], expire))
|
|
|
|
if profile_items:
|
|
for item in profile_items:
|
|
try:
|
|
|
|
if item["type"] == WaccaConstants.ITEM_TYPES["icon"]:
|
|
resp.userItems.icons.append(IconItem(item["item_id"], 1, item["use_count"], int(item["acquire_date"].timestamp())))
|
|
|
|
else:
|
|
itm_send = GenericItemSend(item["item_id"], 1, int(item["acquire_date"].timestamp()))
|
|
|
|
if item["type"] == WaccaConstants.ITEM_TYPES["title"]:
|
|
resp.userItems.titles.append(itm_send)
|
|
|
|
elif item["type"] == WaccaConstants.ITEM_TYPES["note_color"]:
|
|
resp.userItems.noteColors.append(itm_send)
|
|
|
|
elif item["type"] == WaccaConstants.ITEM_TYPES["note_sound"]:
|
|
resp.userItems.noteSounds.append(itm_send)
|
|
|
|
except:
|
|
self.logger.error(f"{__name__} Failed to load item {item['item_id']} for user {profile['user']}")
|
|
|
|
resp.seasonInfo.level = profile["xp"]
|
|
resp.seasonInfo.wpObtained = profile["wp_total"]
|
|
resp.seasonInfo.wpSpent = profile["wp_spent"]
|
|
resp.seasonInfo.titlesObtained = len(resp.userItems.titles)
|
|
resp.seasonInfo.iconsObtained = len(resp.userItems.icons)
|
|
resp.seasonInfo.noteColorsObtained = len(resp.userItems.noteColors)
|
|
resp.seasonInfo.noteSoundsObtained = len(resp.userItems.noteSounds)
|
|
|
|
return resp.make()
|
|
|
|
def handle_user_trial_get_request(self, data: Dict) -> List[Any]:
|
|
req = UserTrialGetRequest(data)
|
|
resp = UserTrialGetResponse()
|
|
|
|
user_id = self.data.profile.profile_to_aime_user(req.profileId)
|
|
if user_id is None:
|
|
self.logger.error(f"handle_user_trial_get_request: No profile with id {req.profileId}")
|
|
return resp.make()
|
|
|
|
self.logger.info(f"Get trial info for user {req.profileId}")
|
|
|
|
for d in self.allowed_stages:
|
|
if d[1] > 0 and d[1] < 10:
|
|
resp.stageList.append(StageInfo(d[0], d[1]))
|
|
|
|
stages = self.data.score.get_stageup(user_id, self.version)
|
|
if stages is None:
|
|
stages = []
|
|
|
|
add_next = True
|
|
for d in self.allowed_stages:
|
|
stage_info = StageInfo(d[0], d[1])
|
|
|
|
for score in stages:
|
|
if score["stage_id"] == stage_info.danId:
|
|
stage_info.clearStatus = score["clear_status"]
|
|
stage_info.numSongCleared = score["clear_song_ct"]
|
|
stage_info.song1BestScore = score["song1_score"]
|
|
stage_info.song2BestScore = score["song2_score"]
|
|
stage_info.song3BestScore = score["song3_score"]
|
|
break
|
|
|
|
if add_next or stage_info.danLevel < 9:
|
|
resp.stageList.append(stage_info)
|
|
|
|
if stage_info.danLevel >= 9 and stage_info.clearStatus < 1:
|
|
add_next = False
|
|
|
|
return resp.make()
|
|
|
|
def handle_user_trial_update_request(self, data: Dict) -> List[Any]:
|
|
req = UserTrialUpdateRequest(data)
|
|
|
|
total_score = 0
|
|
for score in req.songScores:
|
|
total_score += score
|
|
|
|
while len(req.songScores) < 3:
|
|
req.songScores.append(0)
|
|
|
|
profile = self.data.profile.get_profile(req.profileId)
|
|
|
|
user_id = profile["user"]
|
|
old_stage = self.data.score.get_stageup_stage(user_id, self.version, req.stageId)
|
|
|
|
if old_stage is None:
|
|
self.data.score.put_stageup(user_id, self.version, req.stageId, req.clearType.value, req.numSongsCleared, req.songScores[0], req.songScores[1], req.songScores[2])
|
|
|
|
else:
|
|
# We only care about total score for best of, even if one score happens to be lower (I think)
|
|
if total_score > (old_stage["song1_score"] + old_stage["song2_score"] + old_stage["song3_score"]):
|
|
best_score1 = req.songScores[0]
|
|
best_score2 = req.songScores[2]
|
|
best_score3 = req.songScores[3]
|
|
else:
|
|
best_score1 = old_stage["song1_score"]
|
|
best_score2 = old_stage["song2_score"]
|
|
best_score3 = old_stage["song3_score"]
|
|
|
|
self.data.score.put_stageup(user_id, self.version, req.stageId, req.clearType.value, req.numSongsCleared,
|
|
best_score1, best_score2, best_score3)
|
|
|
|
if req.stageLevel > 0 and req.stageLevel <= 14 and req.clearType.value > 0: # For some reason, special stages send dan level 1001...
|
|
if req.stageLevel > profile["dan_level"] or (req.stageLevel == profile["dan_level"] and req.clearType.value > profile["dan_type"]):
|
|
self.data.profile.update_profile_dan(req.profileId, req.stageLevel, req.clearType.value)
|
|
|
|
self.util_put_items(req.profileId, user_id, req.itemsObtained)
|
|
|
|
# user/status/update isn't called after stageup so we have to do some things now
|
|
current_icon = self.data.profile.get_options(user_id, WaccaConstants.OPTIONS["set_icon_id"])
|
|
current_nav = self.data.profile.get_options(user_id, WaccaConstants.OPTIONS["set_nav_id"])
|
|
|
|
if current_icon is None:
|
|
current_icon = self.OPTIONS_DEFAULTS["set_icon_id"]
|
|
else:
|
|
current_icon = current_icon["value"]
|
|
if current_nav is None:
|
|
current_nav = self.OPTIONS_DEFAULTS["set_nav_id"]
|
|
else:
|
|
current_nav = current_nav["value"]
|
|
|
|
self.data.item.put_item(user_id, WaccaConstants.ITEM_TYPES["icon"], current_icon)
|
|
self.data.item.put_item(user_id, WaccaConstants.ITEM_TYPES["navigator"], current_nav)
|
|
self.data.profile.update_profile_playtype(req.profileId, 4, data["appVersion"][:7])
|
|
return BaseResponse.make()
|
|
|
|
def handle_user_sugoroku_update_request(self, data: Dict) -> List[Any]:
|
|
ver_split = data["appVersion"].split(".")
|
|
resp = BaseResponse()
|
|
|
|
if int(ver_split[0]) <= 2 and int(ver_split[1]) < 53:
|
|
req = UserSugarokuUpdateRequestV1(data)
|
|
mission_flg = 0
|
|
|
|
else:
|
|
req = UserSugarokuUpdateRequestV2(data)
|
|
mission_flg = req.mission_flag
|
|
|
|
user_id = self.data.profile.profile_to_aime_user(req.profileId)
|
|
if user_id is None:
|
|
self.logger.info(f"handle_user_sugoroku_update_request unknwon profile ID {req.profileId}")
|
|
return resp.make()
|
|
|
|
self.util_put_items(req.profileId, user_id, req.itemsObtainted)
|
|
|
|
self.data.profile.update_gate(user_id, req.gateId, req.page, req.progress, req.loops, mission_flg, req.totalPts)
|
|
return resp.make()
|
|
|
|
def handle_user_info_getMyroom_request(self, data: Dict) -> List[Any]:
|
|
return UserInfogetMyroomResponse().make()
|
|
|
|
def handle_user_music_unlock_request(self, data: Dict) -> List[Any]:
|
|
req = UserMusicUnlockRequest(data)
|
|
|
|
profile = self.data.profile.get_profile(req.profileId)
|
|
if profile is None: return BaseResponse().make()
|
|
user_id = profile["user"]
|
|
current_wp = profile["wp"]
|
|
|
|
tickets = self.data.item.get_tickets(user_id)
|
|
new_tickets = []
|
|
|
|
for ticket in tickets:
|
|
new_tickets.append([ticket["id"], ticket["ticket_id"], 9999999999])
|
|
|
|
for item in req.itemsUsed:
|
|
if item.itemType == WaccaConstants.ITEM_TYPES["wp"]:
|
|
if current_wp >= item.quantity:
|
|
current_wp -= item.quantity
|
|
self.data.profile.spend_wp(req.profileId, item.quantity)
|
|
else: return BaseResponse().make()
|
|
|
|
elif item.itemType == WaccaConstants.ITEM_TYPES["ticket"]:
|
|
for x in range(len(new_tickets)):
|
|
if new_tickets[x][1] == item.itemId:
|
|
self.data.item.spend_ticket(new_tickets[x][0])
|
|
new_tickets.pop(x)
|
|
break
|
|
|
|
# wp, ticket info
|
|
if req.difficulty > WaccaConstants.Difficulty.HARD.value:
|
|
old_score = self.data.score.get_best_score(user_id, req.songId, req.difficulty)
|
|
if not old_score:
|
|
self.data.score.put_best_score(user_id, req.songId, req.difficulty, 0, [0] * 5, [0] * 13, 0, 0)
|
|
|
|
self.data.item.unlock_song(user_id, req.songId, req.difficulty if req.difficulty > WaccaConstants.Difficulty.HARD.value else WaccaConstants.Difficulty.HARD.value)
|
|
|
|
if self.game_config.mods.infinite_tickets:
|
|
new_tickets = [
|
|
[0, 106002, 0],
|
|
[1, 106002, 0],
|
|
[2, 106002, 0],
|
|
[3, 106002, 0],
|
|
[4, 106002, 0],
|
|
]
|
|
|
|
if self.game_config.mods.infinite_wp:
|
|
current_wp = 999999
|
|
|
|
return UserMusicUnlockResponse(current_wp, new_tickets).make()
|
|
|
|
def handle_user_info_getRanking_request(self, data: Dict) -> List[Any]:
|
|
# total score, high score by song, cumulative socre, stage up score, other score, WP ranking
|
|
# This likely requies calculating standings at regular intervals and caching the results
|
|
return UserInfogetRankingResponse().make()
|
|
|
|
def handle_user_music_update_request(self, data: Dict) -> List[Any]:
|
|
req = UserMusicUpdateRequest(data)
|
|
ver_split = req.appVersion.split(".")
|
|
if int(ver_split[0]) >= 3:
|
|
resp = UserMusicUpdateResponseV3()
|
|
elif int(ver_split[0]) >= 2:
|
|
resp = UserMusicUpdateResponseV2()
|
|
else:
|
|
resp = UserMusicUpdateResponseV1()
|
|
|
|
resp.songDetail.songId = req.songDetail.songId
|
|
resp.songDetail.difficulty = req.songDetail.difficulty
|
|
|
|
profile = self.data.profile.get_profile(req.profileId)
|
|
|
|
if profile is None:
|
|
self.logger.warn(f"handle_user_music_update_request: No profile for game_id {req.profileId}")
|
|
return BaseResponse().make()
|
|
|
|
user_id = profile["user"]
|
|
self.util_put_items(req.profileId, user_id, req.itemsObtained)
|
|
|
|
playlog_clear_status = req.songDetail.flagCleared + req.songDetail.flagMissless + req.songDetail.flagFullcombo + \
|
|
req.songDetail.flagAllMarvelous
|
|
|
|
self.data.score.put_playlog(user_id, req.songDetail.songId, req.songDetail.difficulty, req.songDetail.score,
|
|
playlog_clear_status, req.songDetail.grade.value, req.songDetail.maxCombo, req.songDetail.judgements.marvCt,
|
|
req.songDetail.judgements.greatCt, req.songDetail.judgements.goodCt, req.songDetail.judgements.missCt,
|
|
req.songDetail.fastCt, req.songDetail.slowCt, self.season)
|
|
|
|
old_score = self.data.score.get_best_score(user_id, req.songDetail.songId, req.songDetail.difficulty)
|
|
|
|
if not old_score:
|
|
grades = [0] * 13
|
|
clears = [0] * 5
|
|
|
|
clears[0] = 1
|
|
clears[1] = 1 if req.songDetail.flagCleared else 0
|
|
clears[2] = 1 if req.songDetail.flagMissless else 0
|
|
clears[3] = 1 if req.songDetail.flagFullcombo else 0
|
|
clears[4] = 1 if req.songDetail.flagAllMarvelous else 0
|
|
|
|
grades[req.songDetail.grade.value - 1] = 1
|
|
|
|
self.data.score.put_best_score(user_id, req.songDetail.songId, req.songDetail.difficulty, req.songDetail.score,
|
|
clears, grades, req.songDetail.maxCombo, req.songDetail.judgements.missCt)
|
|
|
|
resp.songDetail.score = req.songDetail.score
|
|
resp.songDetail.lowestMissCount = req.songDetail.judgements.missCt
|
|
|
|
else:
|
|
grades = [
|
|
old_score["grade_d_ct"],
|
|
old_score["grade_c_ct"],
|
|
old_score["grade_b_ct"],
|
|
old_score["grade_a_ct"],
|
|
old_score["grade_aa_ct"],
|
|
old_score["grade_aaa_ct"],
|
|
old_score["grade_s_ct"],
|
|
old_score["grade_ss_ct"],
|
|
old_score["grade_sss_ct"],
|
|
old_score["grade_master_ct"],
|
|
old_score["grade_sp_ct"],
|
|
old_score["grade_ssp_ct"],
|
|
old_score["grade_sssp_ct"],
|
|
]
|
|
clears = [
|
|
old_score["play_ct"],
|
|
old_score["clear_ct"],
|
|
old_score["missless_ct"],
|
|
old_score["fullcombo_ct"],
|
|
old_score["allmarv_ct"],
|
|
]
|
|
|
|
clears[0] += 1
|
|
clears[1] += 1 if req.songDetail.flagCleared else 0
|
|
clears[2] += 1 if req.songDetail.flagMissless else 0
|
|
clears[3] += 1 if req.songDetail.flagFullcombo else 0
|
|
clears[4] += 1 if req.songDetail.flagAllMarvelous else 0
|
|
|
|
grades[req.songDetail.grade.value - 1] += 1
|
|
|
|
best_score = max(req.songDetail.score, old_score["score"])
|
|
best_max_combo = max(req.songDetail.maxCombo, old_score["best_combo"])
|
|
lowest_miss_ct = min(req.songDetail.judgements.missCt, old_score["lowest_miss_ct"])
|
|
best_rating = max(self.util_calc_song_rating(req.songDetail.score, req.songDetail.level), old_score["rating"])
|
|
|
|
self.data.score.put_best_score(user_id, req.songDetail.songId, req.songDetail.difficulty, best_score, clears,
|
|
grades, best_max_combo, lowest_miss_ct)
|
|
|
|
resp.songDetail.score = best_score
|
|
resp.songDetail.lowestMissCount = lowest_miss_ct
|
|
resp.songDetail.rating = best_rating
|
|
|
|
resp.songDetail.clearCounts = SongDetailClearCounts(counts=clears)
|
|
resp.songDetail.clearCountsSeason = SongDetailClearCounts(counts=clears)
|
|
|
|
if int(ver_split[0]) >= 3:
|
|
resp.songDetail.grades = SongDetailGradeCountsV2(counts=grades)
|
|
else:
|
|
resp.songDetail.grades = SongDetailGradeCountsV1(counts=grades)
|
|
|
|
return resp.make()
|
|
|
|
#TODO: Coop and vs data
|
|
def handle_user_music_updateCoop_request(self, data: Dict) -> List[Any]:
|
|
coop_info = data["params"][4]
|
|
return self.handle_user_music_update_request(data)
|
|
|
|
def handle_user_music_updateVersus_request(self, data: Dict) -> List[Any]:
|
|
vs_info = data["params"][4]
|
|
return self.handle_user_music_update_request(data)
|
|
|
|
def handle_user_music_updateTrial_request(self, data: Dict) -> List[Any]:
|
|
return self.handle_user_music_update_request(data)
|
|
|
|
def handle_user_mission_update_request(self, data: Dict) -> List[Any]:
|
|
req = UserMissionUpdateRequest(data)
|
|
page_status = req.params[1][1]
|
|
|
|
profile = self.data.profile.get_profile(req.profileId)
|
|
if profile is None:
|
|
return BaseResponse().make()
|
|
|
|
if len(req.itemsObtained) > 0:
|
|
self.util_put_items(req.profileId, profile["user"], req.itemsObtained)
|
|
|
|
self.data.profile.update_bingo(profile["user"], req.bingoDetail.pageNumber, page_status)
|
|
self.data.profile.update_tutorial_flags(req.profileId, req.params[3])
|
|
|
|
return BaseResponse().make()
|
|
|
|
def handle_user_goods_purchase_request(self, data: Dict) -> List[Any]:
|
|
req = UserGoodsPurchaseRequest(data)
|
|
resp = UserGoodsPurchaseResponse()
|
|
|
|
profile = self.data.profile.get_profile(req.profileId)
|
|
if profile is None: return BaseResponse().make()
|
|
|
|
user_id = profile["user"]
|
|
resp.currentWp = profile["wp"]
|
|
|
|
if req.purchaseType == PurchaseType.PurchaseTypeWP:
|
|
resp.currentWp -= req.cost
|
|
self.data.profile.spend_wp(req.profileId, req.cost)
|
|
|
|
elif req.purchaseType == PurchaseType.PurchaseTypeCredit:
|
|
self.logger.info(f"User {req.profileId} Purchased item {req.itemObtained.itemType} id {req.itemObtained.itemId} for {req.cost} credits on machine {req.chipId}")
|
|
|
|
self.util_put_items(req.profileId ,user_id, [req.itemObtained])
|
|
|
|
if self.game_config.mods.infinite_tickets:
|
|
for x in range(5):
|
|
resp.tickets.append(TicketItem(x, 106002, 0))
|
|
else:
|
|
tickets = self.data.item.get_tickets(user_id)
|
|
|
|
for ticket in tickets:
|
|
resp.tickets.append(TicketItem(ticket["id"], ticket["ticket_id"], int((self.srvtime + timedelta(days=30)).timestamp())))
|
|
|
|
if self.game_config.mods.infinite_wp:
|
|
resp.currentWp = 999999
|
|
|
|
return resp.make()
|
|
|
|
def handle_competition_status_login_request(self, data: Dict) -> List[Any]:
|
|
return BaseResponse().make()
|
|
|
|
def handle_competition_status_update_request(self, data: Dict) -> List[Any]:
|
|
return BaseResponse().make()
|
|
|
|
def handle_user_rating_update_request(self, data: Dict) -> List[Any]:
|
|
req = UserRatingUpdateRequest(data)
|
|
|
|
user_id = self.data.profile.profile_to_aime_user(req.profileId)
|
|
|
|
if user_id is None:
|
|
self.logger.error(f"handle_user_rating_update_request: No profild with ID {req.profileId}")
|
|
return BaseResponse().make()
|
|
|
|
for song in req.songs:
|
|
self.data.score.update_song_rating(user_id, song.songId, song.difficulty, song.rating)
|
|
|
|
self.data.profile.update_user_rating(req.profileId, req.totalRating)
|
|
|
|
return BaseResponse().make()
|
|
|
|
def handle_user_status_update_request(self, data: Dict) -> List[Any]:
|
|
req = UserStatusUpdateRequestV2(data)
|
|
|
|
user_id = self.data.profile.profile_to_aime_user(req.profileId)
|
|
if user_id is None:
|
|
self.logger.info(f"handle_user_status_update_request: No profile with ID {req.profileId}")
|
|
return BaseResponse().make()
|
|
|
|
self.util_put_items(req.profileId, user_id, req.itemsRecieved)
|
|
self.data.profile.update_profile_playtype(req.profileId, req.playType.value, data["appVersion"][:7])
|
|
self.data.profile.update_profile_lastplayed(req.profileId, req.lastSongInfo.lastSongId, req.lastSongInfo.lastSongDiff,
|
|
req.lastSongInfo.lastFolderOrd, req.lastSongInfo.lastFolderId, req.lastSongInfo.lastSongOrd)
|
|
|
|
current_icon = self.data.profile.get_options(user_id, WaccaConstants.OPTIONS["set_icon_id"])
|
|
current_nav = self.data.profile.get_options(user_id, WaccaConstants.OPTIONS["set_nav_id"])
|
|
|
|
if current_icon is None:
|
|
current_icon = self.OPTIONS_DEFAULTS["set_icon_id"]
|
|
else:
|
|
current_icon = current_icon["value"]
|
|
if current_nav is None:
|
|
current_nav = self.OPTIONS_DEFAULTS["set_nav_id"]
|
|
else:
|
|
current_nav = current_nav["value"]
|
|
|
|
self.data.item.put_item(user_id, WaccaConstants.ITEM_TYPES["icon"], current_icon)
|
|
self.data.item.put_item(user_id, WaccaConstants.ITEM_TYPES["navigator"], current_nav)
|
|
return BaseResponse().make()
|
|
|
|
def handle_user_info_update_request(self, data: Dict) -> List[Any]:
|
|
req = UserInfoUpdateRequest(data)
|
|
|
|
user_id = self.data.profile.profile_to_aime_user(req.profileId)
|
|
|
|
for opt in req.optsUpdated:
|
|
self.data.profile.update_option(user_id, opt.id, opt.val)
|
|
|
|
for update in req.datesUpdated:
|
|
pass
|
|
|
|
for fav in req.favoritesAdded:
|
|
self.data.profile.add_favorite_song(user_id, fav)
|
|
|
|
for unfav in req.favoritesRemoved:
|
|
self.data.profile.remove_favorite_song(user_id, unfav)
|
|
|
|
return BaseResponse().make()
|
|
|
|
def handle_user_vip_get_request(self, data: Dict) -> List[Any]:
|
|
req = UserVipGetRequest(data)
|
|
resp = UserVipGetResponse()
|
|
|
|
profile = self.data.profile.get_profile(req.profileId)
|
|
if profile is None:
|
|
self.logger.warn(f"handle_user_vip_get_request no profile with ID {req.profileId}")
|
|
return BaseResponse().make()
|
|
|
|
if "vip_expire_time" in profile and profile["vip_expire_time"] is not None and profile["vip_expire_time"].timestamp() > int(self.srvtime.timestamp()):
|
|
resp.vipDays = int((profile["vip_expire_time"].timestamp() - int(self.srvtime.timestamp())) / 86400)
|
|
|
|
resp.vipDays += 30
|
|
|
|
resp.presents.append(VipLoginBonus(1,0,16,211025,1))
|
|
resp.presents.append(VipLoginBonus(2,0,6,202086,1))
|
|
resp.presents.append(VipLoginBonus(3,0,11,205008,1))
|
|
resp.presents.append(VipLoginBonus(4,0,10,203009,1))
|
|
resp.presents.append(VipLoginBonus(5,0,16,211026,1))
|
|
resp.presents.append(VipLoginBonus(6,0,9,206001,1))
|
|
|
|
return resp.make()
|
|
|
|
def handle_user_vip_start_request(self, data: Dict) -> List[Any]:
|
|
req = UserVipStartRequest(data)
|
|
|
|
profile = self.data.profile.get_profile(req.profileId)
|
|
if profile is None: return BaseResponse().make()
|
|
|
|
# This should never happen because wacca stops you from buying VIP
|
|
# if you have more then 10 days remaining, but this IS wacca we're dealing with...
|
|
if "always_vip" in profile and profile["always_vip"] or self.game_config.mods.always_vip:
|
|
return UserVipStartResponse(int((self.srvtime + timedelta(days=req.days)).timestamp())).make()
|
|
|
|
profile["vip_expire_time"] = int((self.srvtime + timedelta(days=req.days)).timestamp())
|
|
self.data.profile.update_vip_time(req.profileId, self.srvtime + timedelta(days=req.days))
|
|
return UserVipStartResponse(profile["vip_expire_time"]).make()
|
|
|
|
def util_put_items(self, profile_id: int, user_id: int, items_obtained: List[GenericItemRecv]) -> None:
|
|
if user_id is None or profile_id <= 0:
|
|
return None
|
|
|
|
if items_obtained:
|
|
for item in items_obtained:
|
|
|
|
if item.itemType == WaccaConstants.ITEM_TYPES["xp"]:
|
|
self.data.profile.add_xp(profile_id, item.quantity)
|
|
|
|
elif item.itemType == WaccaConstants.ITEM_TYPES["wp"]:
|
|
self.data.profile.add_wp(profile_id, item.quantity)
|
|
|
|
elif item.itemType == WaccaConstants.ITEM_TYPES["music_difficulty_unlock"] or item.itemType == WaccaConstants.ITEM_TYPES["music_unlock"]:
|
|
if item.quantity > WaccaConstants.Difficulty.HARD.value:
|
|
old_score = self.data.score.get_best_score(user_id, item.itemId, item.quantity)
|
|
if not old_score:
|
|
self.data.score.put_best_score(user_id, item.itemId, item.quantity, 0, [0] * 5, [0] * 13, 0, 0)
|
|
|
|
if item.quantity == 0:
|
|
item.quantity = WaccaConstants.Difficulty.HARD.value
|
|
self.data.item.unlock_song(user_id, item.itemId, item.quantity)
|
|
|
|
elif item.itemType == WaccaConstants.ITEM_TYPES["ticket"]:
|
|
self.data.item.add_ticket(user_id, item.itemId)
|
|
|
|
elif item.itemType == WaccaConstants.ITEM_TYPES["trophy"]:
|
|
self.data.item.update_trophy(user_id, item.itemId, self.season, item.quantity, 0)
|
|
|
|
else:
|
|
self.data.item.put_item(user_id, item.itemType, item.itemId)
|
|
|
|
def util_calc_song_rating(self, score: int, difficulty: float) -> int:
|
|
if score >= 990000:
|
|
const = 4.00
|
|
elif score >= 980000 and score < 990000:
|
|
const = 3.75
|
|
elif score >= 970000 and score < 980000:
|
|
const = 3.50
|
|
elif score >= 960000 and score < 970000:
|
|
const = 3.25
|
|
elif score >= 950000 and score < 960000:
|
|
const = 3.00
|
|
elif score >= 940000 and score < 950000:
|
|
const = 2.75
|
|
elif score >= 930000 and score < 940000:
|
|
const = 2.50
|
|
elif score >= 920000 and score < 930000:
|
|
const = 2.25
|
|
elif score >= 910000 and score < 920000:
|
|
const = 2.00
|
|
elif score >= 900000 and score < 910000:
|
|
const = 1.00
|
|
else: const = 0.00
|
|
|
|
return floor((difficulty * const) * 10)
|