from typing import Any, List, Dict
import logging
from math import floor

from datetime import datetime, timedelta

from core.config import CoreConfig
from titles.wacca.config import WaccaConfig
from titles.wacca.const import WaccaConstants
from titles.wacca.database import WaccaData

from titles.wacca.handlers import *


class WaccaBase:
    """Request handlers for the base WACCA version.

    Each ``handle_*_request`` method takes the decoded request ``dict``,
    wraps it in the matching request class from ``titles.wacca.handlers``
    and returns the ``dict`` produced by the response object's ``make()``.
    """

    def __init__(self, cfg: CoreConfig, game_cfg: WaccaConfig) -> None:
        self.config = cfg  # Config file
        self.game_config = game_cfg  # Game Config file
        self.game = WaccaConstants.GAME_CODE  # Game code
        self.version = WaccaConstants.VER_WACCA  # Game version
        self.data = WaccaData(cfg)  # Database
        self.logger = logging.getLogger("wacca")
        # NOTE(review): captured once at construction; every "now + N days"
        # computation below is relative to this moment, not to the request
        # time. Confirm how long an instance lives.
        self.srvtime = datetime.now()
        self.season = 1

        # Fallback option values used when a profile has no stored value.
        self.OPTIONS_DEFAULTS: Dict[str, Any] = {
            "note_speed": 5,
            "field_mask": 0,
            "note_sound": 105001,
            "note_color": 203001,
            "bgm_volume": 10,
            "bg_video": 0,
            "mirror": 0,
            "judge_display_pos": 0,
            "judge_detail_display": 0,
            "measure_guidelines": 1,
            "guideline_mask": 1,
            "judge_line_timing_adjust": 10,
            "note_design": 3,
            "bonus_effect": 1,
            "chara_voice": 1,
            "score_display_method": 0,
            "give_up": 0,
            "guideline_spacing": 1,
            "center_display": 1,
            "ranking_display": 1,
            "stage_up_icon_display": 1,
            "rating_display": 1,
            "player_level_display": 1,
            "touch_effect": 1,
            "guide_sound_vol": 3,
            "touch_note_vol": 8,
            "hold_note_vol": 8,
            "slide_note_vol": 8,
            "snap_note_vol": 8,
            "chain_note_vol": 8,
            "bonus_note_vol": 8,
            "gate_skip": 0,
            "key_beam_display": 1,
            "left_slide_note_color": 4,
            "right_slide_note_color": 3,
            "forward_slide_note_color": 1,
            "back_slide_note_color": 2,
            "master_vol": 3,
            "set_title_id": 104001,
            "set_icon_id": 102001,
            "set_nav_id": 210001,
            "set_plate_id": 211001,
        }
        # Iterated as (d[0], d[1]) pairs passed to StageInfo; empty here.
        self.allowed_stages = []

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _parse_version(version: str) -> List[int]:
        """Return the first three dot-separated components of *version* as ints.

        Missing components are padded with 0 so the result always has three
        entries and two results can be compared lexicographically.

        Raises:
            ValueError: if one of the first three components is not an integer.
        """
        parts = version.split(".")[:3]
        return [int(part) for part in parts] + [0] * (3 - len(parts))

    def _get_option_value(self, user_id: int, option_name: str) -> Any:
        """Return the stored value of *option_name*, or its default if unset."""
        row = self.data.profile.get_options(user_id, WaccaConstants.OPTIONS[option_name])
        if row is None:
            return self.OPTIONS_DEFAULTS[option_name]
        return row["value"]

    def _put_equipped_icon_and_nav(self, user_id: int) -> None:
        """Store the currently selected icon and navigator as items of *user_id*."""
        current_icon = self._get_option_value(user_id, "set_icon_id")
        current_nav = self._get_option_value(user_id, "set_nav_id")
        self.data.item.put_item(user_id, WaccaConstants.ITEM_TYPES["icon"], current_icon)
        self.data.item.put_item(user_id, WaccaConstants.ITEM_TYPES["navigator"], current_nav)

    @staticmethod
    def _fill_user_status(status: Any, profile: Any) -> None:
        """Copy the profile fields shared by the status responses onto *status*."""
        status.userId = profile["id"]
        status.username = profile["username"]
        status.xp = profile["xp"]
        status.danLevel = profile["dan_level"]
        status.danType = profile["dan_type"]
        status.wp = profile["wp"]
        status.useCount = profile["login_count"]

    # ------------------------------------------------------------------
    # Housing / advertise
    # ------------------------------------------------------------------

    def handle_housing_get_request(self, data: Dict) -> Dict:
        """Reply with a fixed housing id and log which chip asked for it."""
        req = BaseRequest(data)
        housing_id = 1337
        self.logger.info(f"{req.chipId} -> {housing_id}")
        resp = HousingGetResponse(housing_id)
        return resp.make()

    def handle_advertise_GetRanking_request(self, data: Dict) -> Dict:
        """Return a default ranking response; the request is parsed but unused."""
        req = AdvertiseGetRankingRequest(data)
        return AdvertiseGetRankingResponse().make()

    def handle_housing_start_request(self, data: Dict) -> Dict:
        """Return the housing start response carrying a fixed song id list."""
        # The request is parsed but none of its fields are used here.
        req = HousingStartRequestV1(data)

        resp = HousingStartResponseV1(
            1,
            [  # Recommended songs
                1269, 1007, 1270, 1002, 1020, 1003, 1008, 1211, 1018, 1092, 1056, 32,
                1260, 1230, 1258, 1251, 2212, 1264, 1125, 1037, 2001, 1272, 1126, 1119,
                1104, 1070, 1047, 1044, 1027, 1004, 1001, 24, 2068, 2062, 2021, 1275,
                1249, 1207, 1203, 1107, 1021, 1009, 9, 4, 3, 23, 22, 2014, 13, 1276, 1247,
                1240, 1237, 1128, 1114, 1110, 1109, 1102, 1045, 1043, 1036, 1035, 1030,
                1023, 1015,
            ],
        )
        return resp.make()

    def handle_advertise_GetNews_request(self, data: Dict) -> Dict:
        """Return a default news response."""
        resp = GetNewsResponseV1()
        return resp.make()

    # ------------------------------------------------------------------
    # User status
    # ------------------------------------------------------------------

    def handle_user_status_logout_request(self, data: Dict) -> Dict:
        """Log the logout and return an empty base response."""
        req = UserStatusLogoutRequest(data)
        self.logger.info(f"Log out user {req.userId} from {req.chipId}")
        return BaseResponse().make()

    def handle_user_status_get_request(self, data: Dict) -> Dict:
        """Return the profile preview for an aime id.

        When no profile exists the response only carries
        ``ProfileStatus.ProfileRegister``. Otherwise the user status, the
        selected title/icon and a version status are filled in. The version
        status is derived by comparing the client's ``appVersion`` with the
        profile's ``last_game_ver``.
        """
        req = UserStatusGetRequest(data)
        resp = UserStatusGetV1Response()

        profile = self.data.profile.get_profile(aime_id=req.aimeId)
        if profile is None:
            self.logger.info(f"No user exists for aime id {req.aimeId}")
            resp.profileStatus = ProfileStatus.ProfileRegister
            return resp.make()

        self.logger.info(f"User preview for {req.aimeId} from {req.chipId}")
        # A profile with no recorded version is treated as being on the
        # client's own version.
        if profile["last_game_ver"] is None:
            resp.lastGameVersion = req.appVersion
        else:
            resp.lastGameVersion = profile["last_game_ver"]

        self._fill_user_status(resp.userStatus, profile)

        # Same (user_id, option_id) argument order and "value" lookup as the
        # other get_options call sites in this class.
        resp.setTitleId = self._get_option_value(profile["user"], "set_title_id")
        resp.setIconId = self._get_option_value(profile["user"], "set_icon_id")

        client_ver = self._parse_version(req.appVersion)
        profile_ver = self._parse_version(resp.lastGameVersion)
        if client_ver > profile_ver:
            resp.versionStatus = PlayVersionStatus.VersionUpgrade
        elif client_ver < profile_ver:
            resp.versionStatus = PlayVersionStatus.VersionTooNew
        # Equal versions leave versionStatus at the response's initial value.

        return resp.make()

    def handle_user_status_login_request(self, data: Dict) -> Dict:
        """Record a login and report whether it is the first one today.

        A ``userId`` of 0 is treated as a guest: nothing is recorded and
        ``lastLoginDate`` is 0. An unknown user id returns the untouched
        response.
        """
        req = UserStatusLoginRequest(data)
        resp = UserStatusLoginResponseV1()

        if req.userId == 0:
            self.logger.info(f"Guest login on {req.chipId}")
            resp.lastLoginDate = 0
            return resp.make()

        profile = self.data.profile.get_profile(req.userId)
        if profile is None:
            self.logger.warning(f"Unknown user id {req.userId} attempted login from {req.chipId}")
            return resp.make()

        self.logger.info(f"User {req.userId} login on {req.chipId}")
        last_login_time = int(profile["last_login_date"].timestamp())
        resp.lastLoginDate = last_login_time

        midnight_today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
        midnight_tomorrow = midnight_today + timedelta(days=1)

        # A last login before today's midnight means this is the first login
        # of the day.
        is_new_day = last_login_time < int(midnight_today.timestamp())

        # NOTE(review): the streak is only flagged as broken when the last
        # login lies after tomorrow's midnight, i.e. in the future. A check
        # against yesterday's midnight was probably intended; left unchanged
        # because session_login's handling of the flag is not visible here.
        is_consec_day = True
        if not is_new_day and last_login_time > int(midnight_tomorrow.timestamp()):
            is_consec_day = False

        self.data.profile.session_login(req.userId, is_new_day, is_consec_day)
        resp.firstLoginDaily = int(is_new_day)

        return resp.make()

    def handle_user_status_create_request(self, data: Dict) -> Dict:
        """Create a profile and grant the starting items.

        Returns an empty base response if profile creation yields ``None``.
        """
        req = UserStatusCreateRequest(data)

        profileId = self.data.profile.create_profile(req.aimeId, req.username, self.version)
        if profileId is None:
            return BaseResponse().make()

        # Insert starting items, in the same order as before.
        starting_items = (
            ("title", 104001),
            ("title", 104002),
            ("title", 104003),
            ("title", 104005),
            ("icon", 102001),
            ("icon", 102002),
            ("note_color", 103001),
            ("note_color", 203001),
            ("note_sound", 105001),
            ("note_sound", 205005),  # Added lily
            ("navigator", 210001),
            ("user_plate", 211001),  # Added lily
            ("touch_effect", 312000),  # Added reverse
            ("touch_effect", 312001),  # Added reverse
            ("touch_effect", 312002),  # Added reverse
        )
        for type_name, item_id in starting_items:
            self.data.item.put_item(req.aimeId, WaccaConstants.ITEM_TYPES[type_name], item_id)

        return UserStatusCreateResponseV2(profileId, req.username).make()

    def handle_user_status_getDetail_request(self, data: Dict) -> Dict:
        """Return the full profile detail: status, options, unlocks, scores and items.

        An unknown profile id returns the untouched response.
        """
        req = UserStatusGetDetailRequest(data)
        resp = UserStatusGetDetailResponseV1()

        profile = self.data.profile.get_profile(req.userId)
        if profile is None:
            self.logger.warning(f"Unknown profile {req.userId}")
            return resp.make()

        self.logger.info(f"Get detail for profile {req.userId}")
        user_id = profile["user"]

        # "or []" keeps the loops below safe should a lookup yield None.
        profile_scores = self.data.score.get_best_scores(user_id) or []
        profile_items = self.data.item.get_items(user_id) or []
        profile_song_unlocks = self.data.item.get_song_unlocks(user_id) or []
        profile_options = self.data.profile.get_options(user_id) or []
        profile_trophies = self.data.item.get_trophies(user_id) or []
        profile_tickets = self.data.item.get_tickets(user_id) or []

        resp.songUpdateTime = int(profile["last_login_date"].timestamp())
        resp.songPlayStatus = [profile["last_song_id"], 1]

        self._fill_user_status(resp.userStatus, profile)

        if self.game_config.mods.infinite_wp:
            resp.userStatus.wp = 999999

        # TODO: friend views are read but not used yet.
        for friend_slot in ("friend_view_1", "friend_view_2", "friend_view_3"):
            if profile[friend_slot] is not None:
                pass

        play_mode_columns = (
            (1, "playcount_single"),
            (2, "playcount_multi_vs"),
            (3, "playcount_multi_coop"),
            (4, "playcount_stageup"),
        )
        for mode_id, column in play_mode_columns:
            resp.seasonalPlayModeCounts.append(PlayModeCounts(self.season, mode_id, profile[column]))

        for opt in profile_options:
            resp.options.append(UserOption(opt["opt_id"], opt["value"]))

        # One unlock entry per difficulty from 1 up to the highest unlocked.
        for unlock in profile_song_unlocks:
            acquired = int(unlock["acquire_date"].timestamp())
            for difficulty in range(1, unlock["highest_difficulty"] + 1):
                resp.userItems.songUnlocks.append(SongUnlock(unlock["song_id"], difficulty, 0, acquired))

        for song in profile_scores:
            resp.seasonInfo.cumulativeScore += song["score"]

            clear_cts = SongDetailClearCounts(
                song["play_ct"],
                song["clear_ct"],
                song["missless_ct"],
                song["fullcombo_ct"],
                song["allmarv_ct"],
            )

            grade_cts = SongDetailGradeCountsV1(
                song["grade_d_ct"],
                song["grade_c_ct"],
                song["grade_b_ct"],
                song["grade_a_ct"],
                song["grade_aa_ct"],
                song["grade_aaa_ct"],
                song["grade_s_ct"],
                song["grade_ss_ct"],
                song["grade_sss_ct"],
                song["grade_master_ct"],
            )

            deets = BestScoreDetailV1(song["song_id"], song["chart_id"])
            deets.clearCounts = clear_cts
            deets.clearCountsSeason = clear_cts
            deets.gradeCounts = grade_cts
            deets.score = song["score"]
            deets.bestCombo = song["best_combo"]
            deets.lowestMissCtMaybe = song["lowest_miss_ct"]
            deets.rating = song["rating"]

            resp.scores.append(deets)

        for trophy in profile_trophies:
            resp.userItems.trophies.append(
                TrophyItem(trophy["trophy_id"], trophy["season"], trophy["progress"], trophy["badge_type"])
            )

        if self.game_config.mods.infinite_tickets:
            for x in range(5):
                resp.userItems.tickets.append(TicketItem(x, 106002, 0))
        else:
            for ticket in profile_tickets:
                # Tickets without an expiry are reported as expiring 30 days
                # after self.srvtime.
                if ticket["expire_date"] is None:
                    expire = int((self.srvtime + timedelta(days=30)).timestamp())
                else:
                    expire = int(ticket["expire_date"].timestamp())

                resp.userItems.tickets.append(TicketItem(ticket["id"], ticket["ticket_id"], expire))

        for item in profile_items:
            # Best effort: one malformed item is logged and skipped rather
            # than failing the whole response.
            try:
                if item["type"] == WaccaConstants.ITEM_TYPES["icon"]:
                    resp.userItems.icons.append(
                        IconItem(item["item_id"], 1, item["use_count"], int(item["acquire_date"].timestamp()))
                    )
                else:
                    itm_send = GenericItemSend(item["item_id"], 1, int(item["acquire_date"].timestamp()))

                    if item["type"] == WaccaConstants.ITEM_TYPES["title"]:
                        resp.userItems.titles.append(itm_send)
                    elif item["type"] == WaccaConstants.ITEM_TYPES["note_color"]:
                        resp.userItems.noteColors.append(itm_send)
                    elif item["type"] == WaccaConstants.ITEM_TYPES["note_sound"]:
                        resp.userItems.noteSounds.append(itm_send)
            except Exception:
                self.logger.error(f"{__name__} Failed to load item {item['item_id']} for user {profile['user']}")

        resp.seasonInfo.level = profile["xp"]
        resp.seasonInfo.wpObtained = profile["wp_total"]
        resp.seasonInfo.wpSpent = profile["wp_spent"]
        resp.seasonInfo.titlesObtained = len(resp.userItems.titles)
        resp.seasonInfo.iconsObtained = len(resp.userItems.icons)
        resp.seasonInfo.noteColorsObtained = len(resp.userItems.noteColors)
        resp.seasonInfo.noteSoundsObtained = len(resp.userItems.noteSounds)

        return resp.make()

    # ------------------------------------------------------------------
    # Stage up (trial)
    # ------------------------------------------------------------------

    def handle_user_trial_get_request(self, data: Dict) -> Dict:
        """Return the stage list for a profile, merged with its stored results."""
        req = UserTrialGetRequest(data)
        resp = UserTrialGetResponse()

        user_id = self.data.profile.profile_to_aime_user(req.profileId)
        if user_id is None:
            self.logger.error(f"handle_user_trial_get_request: No profile with id {req.profileId}")
            return resp.make()

        self.logger.info(f"Get trial info for user {req.profileId}")
        stages = self.data.score.get_stageup(user_id, self.version)
        if stages is None:
            stages = []

        add_next = True
        for d in self.allowed_stages:
            stage_info = StageInfo(d[0], d[1])

            for score in stages:
                if score["stage_id"] == stage_info.danId:
                    stage_info.clearStatus = score["clear_status"]
                    stage_info.numSongCleared = score["clear_song_ct"]
                    stage_info.song1BestScore = score["song1_score"]
                    stage_info.song2BestScore = score["song2_score"]
                    stage_info.song3BestScore = score["song3_score"]
                    break

            # Stages below level 9 are always listed. From level 9 upward,
            # listing stops after the first stage with clearStatus < 1.
            if add_next or stage_info.danLevel < 9:
                resp.stageList.append(stage_info)

            if stage_info.danLevel >= 9 and stage_info.clearStatus < 1:
                add_next = False

        return resp.make()

    def handle_user_trial_update_request(self, data: Dict) -> Dict:
        """Store a stage-up result and apply its rewards.

        The three stored song scores are replaced only when the submitted
        total beats the stored total. An unknown profile id is logged and
        answered with an empty base response.
        """
        req = UserTrialUpdateRequest(data)

        total_score = sum(req.songScores)

        # Pad to three entries so every song slot can be stored.
        song_scores = list(req.songScores) + [0] * (3 - len(req.songScores))

        profile = self.data.profile.get_profile(req.profileId)
        if profile is None:
            self.logger.error(f"handle_user_trial_update_request: No profile with ID {req.profileId}")
            return BaseResponse().make()

        user_id = profile["user"]
        old_stage = self.data.score.get_stageup_stage(user_id, self.version, req.stageId)

        if old_stage is None:
            best_scores = song_scores[:3]
        else:
            old_scores = [old_stage["song1_score"], old_stage["song2_score"], old_stage["song3_score"]]
            # We only care about total score for best of, even if one score
            # happens to be lower (I think)
            best_scores = song_scores[:3] if total_score > sum(old_scores) else old_scores

        self.data.score.put_stageup(
            user_id, self.version, req.stageId, req.clearType.value, req.numSongsCleared, *best_scores
        )

        # For some reason, special stages send dan level 1001...
        if 0 < req.stageLevel <= 14 and req.clearType.value > 0:
            if req.stageLevel > profile["dan_level"] or (
                req.stageLevel == profile["dan_level"] and req.clearType.value > profile["dan_type"]
            ):
                self.data.profile.update_profile_dan(req.profileId, req.stageLevel, req.clearType.value)

        self.util_put_items(req.profileId, user_id, req.itemsObtained)

        # user/status/update isn't called after stageup so we have to do some things now
        self._put_equipped_icon_and_nav(user_id)
        self.data.profile.update_profile_playtype(req.profileId, 4, data["appVersion"][:7])
        return BaseResponse().make()

    # ------------------------------------------------------------------
    # Gates, my room, unlocks, ranking
    # ------------------------------------------------------------------

    def handle_user_sugoroku_update_request(self, data: Dict) -> Dict:
        """Store gate progress and grant the items obtained.

        Versions before 2.53 use the V1 request with a mission flag of 0;
        2.53 and later use the V2 request and its mission flag.
        """
        resp = BaseResponse()

        # Lexicographic comparison; the earlier
        # "major <= 2 and minor < 53" test misclassified 1.x builds whose
        # minor is 53 or higher.
        if self._parse_version(data["appVersion"])[:2] < [2, 53]:
            req = UserSugarokuUpdateRequestV1(data)
            mission_flg = 0
        else:
            req = UserSugarokuUpdateRequestV2(data)
            mission_flg = req.mission_flag

        user_id = self.data.profile.profile_to_aime_user(req.profileId)
        if user_id is None:
            self.logger.info(f"handle_user_sugoroku_update_request unknwon profile ID {req.profileId}")
            return resp.make()

        self.util_put_items(req.profileId, user_id, req.itemsObtainted)

        self.data.profile.update_gate(
            user_id, req.gateId, req.page, req.progress, req.loops, mission_flg, req.totalPts
        )
        return resp.make()

    def handle_user_info_getMyroom_request(self, data: Dict) -> Dict:
        """Return a default my-room response."""
        return UserInfogetMyroomResponseV1().make()

    def handle_user_music_unlock_request(self, data: Dict) -> Dict:
        """Spend WP and/or tickets to unlock a song difficulty.

        Returns an empty base response when the profile is unknown or a WP
        item costs more than the remaining WP. Otherwise the response carries
        the remaining WP and tickets.
        """
        req = UserMusicUnlockRequest(data)

        profile = self.data.profile.get_profile(req.profileId)
        if profile is None:
            return BaseResponse().make()
        user_id = profile["user"]
        current_wp = profile["wp"]

        tickets = self.data.item.get_tickets(user_id) or []
        new_tickets = [[ticket["id"], ticket["ticket_id"], 9999999999] for ticket in tickets]

        for item in req.itemsUsed:
            if item.itemType == WaccaConstants.ITEM_TYPES["wp"]:
                if current_wp < item.quantity:
                    return BaseResponse().make()
                current_wp -= item.quantity
                self.data.profile.spend_wp(req.profileId, item.quantity)

            elif item.itemType == WaccaConstants.ITEM_TYPES["ticket"]:
                # Spend the first held ticket of the requested kind.
                for idx, held in enumerate(new_tickets):
                    if held[1] == item.itemId:
                        self.data.item.spend_ticket(held[0])
                        new_tickets.pop(idx)
                        break

        hard = WaccaConstants.Difficulty.HARD.value

        # wp, ticket info
        if req.difficulty > hard:
            # Create an empty best-score row for charts above HARD if none exists.
            old_score = self.data.score.get_best_score(user_id, req.songId, req.difficulty)
            if not old_score:
                self.data.score.put_best_score(user_id, req.songId, req.difficulty, 0, [0] * 5, [0] * 13, 0, 0)

        self.data.item.unlock_song(user_id, req.songId, max(req.difficulty, hard))

        if self.game_config.mods.infinite_tickets:
            new_tickets = [[x, 106002, 0] for x in range(5)]

        if self.game_config.mods.infinite_wp:
            current_wp = 999999

        return UserMusicUnlockResponse(current_wp, new_tickets).make()

    def handle_user_info_getRanking_request(self, data: Dict) -> Dict:
        """Return a default ranking response."""
        # total score, high score by song, cumulative score, stage up score, other score, WP ranking
        # This likely requires calculating standings at regular intervals and caching the results
        return UserInfogetRankingResponse().make()

    # ------------------------------------------------------------------
    # Score submission
    # ------------------------------------------------------------------

    def handle_user_music_update_request(self, data: Dict) -> Dict:
        """Record a play and update the profile's best score for the chart.

        The request/response classes are picked from the major component of
        ``appVersion``. Guest plays (``profileId`` 0) and unknown profiles
        get a response carrying only the song id and difficulty.
        """
        major_ver = int(data["appVersion"].split(".")[0])
        if major_ver >= 3:
            resp = UserMusicUpdateResponseV3()
            req = UserMusicUpdateRequestV2(data)
        elif major_ver >= 2:
            resp = UserMusicUpdateResponseV2()
            req = UserMusicUpdateRequestV2(data)
        else:
            resp = UserMusicUpdateResponseV1()
            req = UserMusicUpdateRequestV1(data)

        detail = req.songDetail
        resp.songDetail.songId = detail.songId
        resp.songDetail.difficulty = detail.difficulty

        if req.profileId == 0:
            self.logger.info(f"Guest score for song {detail.songId} difficulty {detail.difficulty}")
            return resp.make()

        profile = self.data.profile.get_profile(req.profileId)
        if profile is None:
            self.logger.warning(f"handle_user_music_update_request: No profile for game_id {req.profileId}")
            return resp.make()

        user_id = profile["user"]
        self.util_put_items(req.profileId, user_id, req.itemsObtained)

        # The playlog stores the sum of the four clear flags.
        playlog_clear_status = (
            detail.flagCleared + detail.flagMissless + detail.flagFullcombo + detail.flagAllMarvelous
        )

        self.data.score.put_playlog(
            user_id,
            detail.songId,
            detail.difficulty,
            detail.score,
            playlog_clear_status,
            detail.grade.value,
            detail.maxCombo,
            detail.judgements.marvCt,
            detail.judgements.greatCt,
            detail.judgements.goodCt,
            detail.judgements.missCt,
            detail.fastCt,
            detail.slowCt,
            self.season,
        )

        old_score = self.data.score.get_best_score(user_id, detail.songId, detail.difficulty)
        first_play = not old_score

        if first_play:
            grades = [0] * 13
            clears = [0] * 5
        else:
            grades = [
                old_score["grade_d_ct"],
                old_score["grade_c_ct"],
                old_score["grade_b_ct"],
                old_score["grade_a_ct"],
                old_score["grade_aa_ct"],
                old_score["grade_aaa_ct"],
                old_score["grade_s_ct"],
                old_score["grade_ss_ct"],
                old_score["grade_sss_ct"],
                old_score["grade_master_ct"],
                old_score["grade_sp_ct"],
                old_score["grade_ssp_ct"],
                old_score["grade_sssp_ct"],
            ]
            clears = [
                old_score["play_ct"],
                old_score["clear_ct"],
                old_score["missless_ct"],
                old_score["fullcombo_ct"],
                old_score["allmarv_ct"],
            ]

        # clears = [plays, clears, missless, full combo, all marvelous]
        clears[0] += 1
        clears[1] += 1 if detail.flagCleared else 0
        clears[2] += 1 if detail.flagMissless else 0
        clears[3] += 1 if detail.flagFullcombo else 0
        clears[4] += 1 if detail.flagAllMarvelous else 0

        # Grade values are 1-based indices into the grade counters.
        grades[detail.grade.value - 1] += 1

        if first_play:
            best_score = detail.score
            best_max_combo = detail.maxCombo
            lowest_miss_ct = detail.judgements.missCt
        else:
            best_score = max(detail.score, old_score["score"])
            best_max_combo = max(detail.maxCombo, old_score["best_combo"])
            lowest_miss_ct = min(detail.judgements.missCt, old_score["lowest_miss_ct"])

        self.data.score.put_best_score(
            user_id,
            detail.songId,
            detail.difficulty,
            best_score,
            clears,
            grades,
            best_max_combo,
            lowest_miss_ct,
        )

        resp.songDetail.score = best_score
        resp.songDetail.lowestMissCount = lowest_miss_ct
        if not first_play:
            # The rating is only reported when a previous best exists.
            resp.songDetail.rating = max(
                self.util_calc_song_rating(detail.score, detail.level), old_score["rating"]
            )

        resp.songDetail.clearCounts = SongDetailClearCounts(counts=clears)
        resp.songDetail.clearCountsSeason = SongDetailClearCounts(counts=clears)

        if major_ver >= 3:
            resp.songDetail.grades = SongDetailGradeCountsV2(counts=grades)
        else:
            resp.songDetail.grades = SongDetailGradeCountsV1(counts=grades)

        return resp.make()

    # TODO: Coop and vs data
    def handle_user_music_updateCoop_request(self, data: Dict) -> Dict:
        """Handle a co-op result as a normal score update (co-op data is ignored)."""
        return self.handle_user_music_update_request(data)

    def handle_user_music_updateVersus_request(self, data: Dict) -> Dict:
        """Handle a versus result as a normal score update (versus data is ignored)."""
        return self.handle_user_music_update_request(data)

    def handle_user_music_updateTrial_request(self, data: Dict) -> Dict:
        """Handle a trial song result as a normal score update."""
        return self.handle_user_music_update_request(data)

    # ------------------------------------------------------------------
    # Missions, purchases, competition, rating
    # ------------------------------------------------------------------

    def handle_user_mission_update_request(self, data: Dict) -> Dict:
        """Store bingo page status and tutorial flags, granting any obtained items."""
        req = UserMissionUpdateRequest(data)
        page_status = req.params[1][1]

        profile = self.data.profile.get_profile(req.profileId)
        if profile is None:
            return BaseResponse().make()

        if req.itemsObtained:
            self.util_put_items(req.profileId, profile["user"], req.itemsObtained)

        self.data.profile.update_bingo(profile["user"], req.bingoDetail.pageNumber, page_status)
        self.data.profile.update_tutorial_flags(req.profileId, req.params[3])

        return BaseResponse().make()

    def handle_user_goods_purchase_request(self, data: Dict) -> Dict:
        """Process a shop purchase and return the remaining WP and tickets.

        WP purchases deduct ``req.cost``; credit purchases are only logged.
        The purchased item is granted in both cases.
        """
        req = UserGoodsPurchaseRequest(data)
        resp = UserGoodsPurchaseResponse()

        profile = self.data.profile.get_profile(req.profileId)
        if profile is None:
            return BaseResponse().make()

        user_id = profile["user"]
        resp.currentWp = profile["wp"]

        if req.purchaseType == PurchaseType.PurchaseTypeWP:
            resp.currentWp -= req.cost
            self.data.profile.spend_wp(req.profileId, req.cost)

        elif req.purchaseType == PurchaseType.PurchaseTypeCredit:
            self.logger.info(f"User {req.profileId} Purchased item {req.itemObtained.itemType} id {req.itemObtained.itemId} for {req.cost} credits on machine {req.chipId}")

        self.util_put_items(req.profileId, user_id, [req.itemObtained])

        if self.game_config.mods.infinite_tickets:
            for x in range(5):
                resp.tickets.append(TicketItem(x, 106002, 0))
        else:
            # Every ticket is reported as expiring 30 days after self.srvtime.
            expire = int((self.srvtime + timedelta(days=30)).timestamp())
            for ticket in self.data.item.get_tickets(user_id) or []:
                resp.tickets.append(TicketItem(ticket["id"], ticket["ticket_id"], expire))

        if self.game_config.mods.infinite_wp:
            resp.currentWp = 999999

        return resp.make()

    def handle_competition_status_login_request(self, data: Dict) -> Dict:
        """Return an empty base response."""
        return BaseResponse().make()

    def handle_competition_status_update_request(self, data: Dict) -> Dict:
        """Return an empty base response."""
        return BaseResponse().make()

    def handle_user_rating_update_request(self, data: Dict) -> Dict:
        """Store the per-song ratings and the total rating sent by the client."""
        req = UserRatingUpdateRequest(data)

        user_id = self.data.profile.profile_to_aime_user(req.profileId)
        if user_id is None:
            self.logger.error(f"handle_user_rating_update_request: No profild with ID {req.profileId}")
            return BaseResponse().make()

        for song in req.songs:
            self.data.score.update_song_rating(user_id, song.songId, song.difficulty, song.rating)

        self.data.profile.update_user_rating(req.profileId, req.totalRating)

        return BaseResponse().make()

    def handle_user_status_update_request(self, data: Dict) -> Dict:
        """Grant received items, record the play type and refresh equipped items."""
        req = UserStatusUpdateRequestV1(data)

        user_id = self.data.profile.profile_to_aime_user(req.profileId)
        if user_id is None:
            self.logger.info(f"handle_user_status_update_request: No profile with ID {req.profileId}")
            return BaseResponse().make()

        self.util_put_items(req.profileId, user_id, req.itemsRecieved)
        self.data.profile.update_profile_playtype(req.profileId, req.playType.value, data["appVersion"][:7])

        self._put_equipped_icon_and_nav(user_id)
        return BaseResponse().make()

    def handle_user_info_update_request(self, data: Dict) -> Dict:
        """Store updated options and favorite-song changes for a profile.

        An unknown profile id is logged and answered with an empty base
        response, matching the other handlers.
        """
        req = UserInfoUpdateRequest(data)

        user_id = self.data.profile.profile_to_aime_user(req.profileId)
        if user_id is None:
            self.logger.info(f"handle_user_info_update_request: No profile with ID {req.profileId}")
            return BaseResponse().make()

        for opt in req.optsUpdated:
            self.data.profile.update_option(user_id, opt.id, opt.val)

        # TODO: req.datesUpdated is not handled yet.

        for fav in req.favoritesAdded:
            self.data.profile.add_favorite_song(user_id, fav)

        for unfav in req.favoritesRemoved:
            self.data.profile.remove_favorite_song(user_id, unfav)

        return BaseResponse().make()

    # ------------------------------------------------------------------
    # VIP
    # ------------------------------------------------------------------

    def handle_user_vip_get_request(self, data: Dict) -> Dict:
        """Return the VIP days reported for a profile and the login bonus list."""
        req = UserVipGetRequest(data)
        resp = UserVipGetResponse()

        profile = self.data.profile.get_profile(req.profileId)
        if profile is None:
            self.logger.warning(f"handle_user_vip_get_request no profile with ID {req.profileId}")
            return BaseResponse().make()

        now_ts = int(self.srvtime.timestamp())
        if (
            "vip_expire_time" in profile
            and profile["vip_expire_time"] is not None
            and profile["vip_expire_time"].timestamp() > now_ts
        ):
            # Whole days left until expiry, measured from self.srvtime.
            resp.vipDays = int((profile["vip_expire_time"].timestamp() - now_ts) / 86400)

        # 30 days are always added on top of whatever was computed above.
        resp.vipDays += 30

        resp.presents.append(VipLoginBonus(1, 0, 16, 211025, 1))
        resp.presents.append(VipLoginBonus(2, 0, 6, 202086, 1))
        resp.presents.append(VipLoginBonus(3, 0, 11, 205008, 1))
        resp.presents.append(VipLoginBonus(4, 0, 10, 203009, 1))
        resp.presents.append(VipLoginBonus(5, 0, 16, 211026, 1))
        resp.presents.append(VipLoginBonus(6, 0, 9, 206001, 1))

        return resp.make()

    def handle_user_vip_start_request(self, data: Dict) -> Dict:
        """Start a VIP period of ``req.days`` days counted from ``self.srvtime``.

        The expiry is stored unless the profile or the game config is flagged
        as always-VIP. The expiry timestamp is returned in both cases.
        """
        req = UserVipStartRequest(data)

        profile = self.data.profile.get_profile(req.profileId)
        if profile is None:
            return BaseResponse().make()

        # Locals are used rather than writing into the fetched profile record.
        expire_time = self.srvtime + timedelta(days=req.days)
        expire_ts = int(expire_time.timestamp())

        # This should never happen because wacca stops you from buying VIP
        # if you have more than 10 days remaining, but this IS wacca we're dealing with...
        always_vip = ("always_vip" in profile and profile["always_vip"]) or self.game_config.mods.always_vip
        if not always_vip:
            self.data.profile.update_vip_time(req.profileId, expire_time)

        return UserVipStartResponse(expire_ts).make()

    # ------------------------------------------------------------------
    # Utilities
    # ------------------------------------------------------------------

    def util_put_items(self, profile_id: int, user_id: int, items_obtained: List[GenericItemRecv]) -> None:
        """Apply a list of obtained items to a profile.

        XP and WP are added to the profile. Music unlock items unlock the
        song, with ``quantity`` carrying the difficulty. Tickets and trophies
        go to their own tables, and anything else is stored as a generic
        item. Nothing happens when ``user_id`` is ``None``, ``profile_id`` is
        not positive, or the list is empty/``None``.
        """
        if user_id is None or profile_id <= 0:
            return None

        if not items_obtained:
            return None

        item_types = WaccaConstants.ITEM_TYPES
        hard = WaccaConstants.Difficulty.HARD.value

        for item in items_obtained:
            if item.itemType == item_types["xp"]:
                self.data.profile.add_xp(profile_id, item.quantity)

            elif item.itemType == item_types["wp"]:
                self.data.profile.add_wp(profile_id, item.quantity)

            elif item.itemType in (item_types["music_difficulty_unlock"], item_types["music_unlock"]):
                # A quantity of 0 is unlocked as HARD. A local is used so the
                # caller's item is not modified.
                difficulty = hard if item.quantity == 0 else item.quantity

                if difficulty > hard:
                    # Create an empty best-score row for charts above HARD.
                    old_score = self.data.score.get_best_score(user_id, item.itemId, difficulty)
                    if not old_score:
                        self.data.score.put_best_score(
                            user_id, item.itemId, difficulty, 0, [0] * 5, [0] * 13, 0, 0
                        )

                self.data.item.unlock_song(user_id, item.itemId, difficulty)

            elif item.itemType == item_types["ticket"]:
                self.data.item.add_ticket(user_id, item.itemId)

            elif item.itemType == item_types["trophy"]:
                self.data.item.update_trophy(user_id, item.itemId, self.season, item.quantity, 0)

            else:
                self.data.item.put_item(user_id, item.itemType, item.itemId)

    def util_calc_song_rating(self, score: int, difficulty: float) -> int:
        """Return ``floor(difficulty * multiplier * 10)`` for a score.

        The multiplier is taken from the first tier whose threshold the score
        reaches; scores below 900000 use a multiplier of 0.
        """
        rating_tiers = (
            (990000, 4.00),
            (980000, 3.75),
            (970000, 3.50),
            (960000, 3.25),
            (950000, 3.00),
            (940000, 2.75),
            (930000, 2.50),
            (920000, 2.25),
            (910000, 2.00),
            (900000, 1.00),
        )

        const = 0.00
        for threshold, multiplier in rating_tiers:
            if score >= threshold:
                const = multiplier
                break

        return floor((difficulty * const) * 10)