from datetime import datetime, date, timedelta from typing import Dict import logging from core.config import CoreConfig from titles.mai2.const import Mai2Constants from titles.mai2.config import Mai2Config from titles.mai2.database import Mai2Data class Mai2Base(): def __init__(self, cfg: CoreConfig, game_cfg: Mai2Config) -> None: self.core_config = cfg self.game_config = game_cfg self.game = Mai2Constants.GAME_CODE self.version = Mai2Constants.VER_MAIMAI_DX self.data = Mai2Data(cfg) self.logger = logging.getLogger("mai2") def handle_get_game_setting_api_request(self, data: Dict): reboot_start = date.strftime(datetime.now() + timedelta(hours=3), Mai2Constants.DATE_TIME_FORMAT) reboot_end = date.strftime(datetime.now() + timedelta(hours=4), Mai2Constants.DATE_TIME_FORMAT) return { "gameSetting": { "isMaintenance": "false", "requestInterval": 10, "rebootStartTime": reboot_start, "rebootEndTime": reboot_end, "movieUploadLimit": 10000, "movieStatus": 0, "movieServerUri": "", "deliverServerUri": "", "oldServerUri": "", "usbDlServerUri": "", "rebootInterval": 0 }, "isAouAccession": "true", } def handle_get_game_ranking_api_request(self, data: Dict) -> Dict: return {"length": 0, "gameRankingList": []} def handle_get_game_tournament_info_api_request(self, data: Dict) -> Dict: # TODO: Tournament support return {"length": 0, "gameTournamentInfoList": []} def handle_get_game_event_api_request(self, data: Dict) -> Dict: events = self.data.static.get_enabled_events(self.version) events_lst = [] if events is None: return {"type": data["type"], "length": 0, "gameEventList": []} for event in events: events_lst.append({ "type": event["type"], "id": event["eventId"], "startDate": "2017-12-05 07:00:00.0", "endDate": "2099-12-31 00:00:00.0" }) return {"type": data["type"], "length": len(events_lst), "gameEventList": events_lst} def handle_get_game_ng_music_id_api_request(self, data: Dict) -> Dict: return {"length": 0, "musicIdList": []} def handle_get_game_charge_api_request(self, data: Dict) -> Dict: game_charge_list = self.data.static.get_enabled_tickets(self.version, 1) if game_charge_list is None: return {"length": 0, "gameChargeList": []} charge_list = [] for x in range(len(game_charge_list)): charge_list.append({ "orderId": x, "chargeId": game_charge_list[x]["ticketId"], "price": game_charge_list[x]["price"], "startDate": "2017-12-05 07:00:00.0", "endDate": "2099-12-31 00:00:00.0" }) return {"length": len(charge_list), "gameChargeList": charge_list} def handle_upsert_client_setting_api_request(self, data: Dict) -> Dict: pass def handle_upsert_client_upload_api_request(self, data: Dict) -> Dict: pass def handle_upsert_client_bookkeeping_api_request(self, data: Dict) -> Dict: pass def handle_upsert_client_testmode_api_request(self, data: Dict) -> Dict: pass def handle_get_user_preview_api_request(self, data: Dict) -> Dict: p = self.data.profile.get_profile_detail(data["userId"], self.version) o = self.data.profile.get_profile_option(data["userId"], self.version) if p is None or o is None: return {} # Register profile = p._asdict() option = o._asdict() return { "userId": data["userId"], "userName": profile["userName"], "isLogin": False, "lastGameId": profile["lastGameId"], "lastDataVersion": profile["lastDataVersion"], "lastRomVersion": profile["lastRomVersion"], "lastLoginDate": profile["lastLoginDate"], "lastPlayDate": profile["lastPlayDate"], "playerRating": profile["playerRating"], "nameplateId": 0, # Unused "iconId": profile["iconId"], "trophyId": 0, # Unused "partnerId": profile["partnerId"], "frameId": profile["frameId"], "dispRate": option["dispRate"], # 0: all/begin, 1: disprate, 2: dispDan, 3: hide, 4: end "totalAwake": profile["totalAwake"], "isNetMember": profile["isNetMember"], "dailyBonusDate": profile["dailyBonusDate"], "headPhoneVolume": option["headPhoneVolume"], "isInherit": False, # Not sure what this is or does?? "banState": profile["banState"] if profile["banState"] is not None else 0 # New with uni+ } def handle_user_login_api_request(self, data: Dict) -> Dict: profile = self.data.profile.get_profile_detail(data["userId"], self.version) if profile is not None: lastLoginDate = profile["lastLoginDate"] loginCt = profile["playCount"] if "regionId" in data: self.data.profile.put_profile_region(data["userId"], data["regionId"]) else: loginCt = 0 lastLoginDate = "2017-12-05 07:00:00.0" return { "returnCode": 1, "lastLoginDate": lastLoginDate, "loginCount": loginCt, "consecutiveLoginCount": 0, # We don't really have a way to track this... "loginId": loginCt # Used with the playlog! } def handle_upload_user_playlog_api_request(self, data: Dict) -> Dict: user_id = data["userId"] playlog = data["userPlaylog"] self.data.score.put_playlog(user_id, playlog) def handle_upsert_user_all_api_request(self, data: Dict) -> Dict: user_id = data["userId"] upsert = data["upsertUserAll"] if "userData" in upsert and len(upsert["userData"]) > 0: upsert["userData"][0]["isNetMember"] = 1 upsert["userData"][0].pop("accessCode") self.data.profile.put_profile_detail(user_id, self.version, upsert["userData"][0]) if "userExtend" in upsert and len(upsert["userExtend"]) > 0: self.data.profile.put_profile_extend(user_id, self.version, upsert["userExtend"][0]) if "userGhost" in upsert: for ghost in upsert["userGhost"]: self.data.profile.put_profile_extend(user_id, self.version, ghost) if "userOption" in upsert and len(upsert["userOption"]) > 0: self.data.profile.put_profile_option(user_id, self.version, upsert["userOption"][0]) if "userRatingList" in upsert and len(upsert["userRatingList"]) > 0: self.data.profile.put_profile_rating(user_id, self.version, upsert["userRatingList"][0]) if "userActivityList" in upsert and len(upsert["userActivityList"]) > 0: for k,v in upsert["userActivityList"][0].items(): for act in v: self.data.profile.put_profile_activity(user_id, act) if upsert["isNewCharacterList"] and int(upsert["isNewCharacterList"]) > 0: for char in upsert["userCharacterList"]: self.data.item.put_character(user_id, char["characterId"], char["level"], char["awakening"], char["useCount"]) if upsert["isNewItemList"] and int(upsert["isNewItemList"]) > 0: for item in upsert["userItemList"]: self.data.item.put_item(user_id, int(item["itemKind"]), item["itemId"], item["stock"], item["isValid"]) if upsert["isNewLoginBonusList"] and int(upsert["isNewLoginBonusList"]) > 0: for login_bonus in upsert["userLoginBonusList"]: self.data.item.put_login_bonus(user_id, login_bonus["bonusId"], login_bonus["point"], login_bonus["isCurrent"], login_bonus["isComplete"]) if upsert["isNewMapList"] and int(upsert["isNewMapList"]) > 0: for map in upsert["userMapList"]: self.data.item.put_map(user_id, map["mapId"], map["distance"], map["isLock"], map["isClear"], map["isComplete"]) if upsert["isNewMusicDetailList"] and int(upsert["isNewMusicDetailList"]) > 0: for music in upsert["userMusicDetailList"]: self.data.score.put_best_score(user_id, music) if upsert["isNewCourseList"] and int(upsert["isNewCourseList"]) > 0: for course in upsert["userCourseList"]: self.data.score.put_course(user_id, course) if upsert["isNewFavoriteList"] and int(upsert["isNewFavoriteList"]) > 0: for fav in upsert["userFavoriteList"]: self.data.item.put_favorite(user_id, fav["kind"], fav["itemIdList"]) # if "isNewFriendSeasonRankingList" in upsert and int(upsert["isNewFriendSeasonRankingList"]) > 0: # for fsr in upsert["userFriendSeasonRankingList"]: # pass def handle_user_logout_api_request(self, data: Dict) -> Dict: pass def handle_get_user_data_api_request(self, data: Dict) -> Dict: profile = self.data.profile.get_profile_detail(data["userId"], self.version) if profile is None: return profile_dict = profile._asdict() profile_dict.pop("id") profile_dict.pop("user") profile_dict.pop("version") return { "userId": data["userId"], "userData": profile_dict } def handle_get_user_extend_api_request(self, data: Dict) -> Dict: extend = self.data.profile.get_profile_extend(data["userId"], self.version) if extend is None: return extend_dict = extend._asdict() extend_dict.pop("id") extend_dict.pop("user") extend_dict.pop("version") return { "userId": data["userId"], "userExtend": extend_dict } def handle_get_user_option_api_request(self, data: Dict) -> Dict: options = self.data.profile.get_profile_option(data["userId"], self.version) if options is None: return options_dict = options._asdict() options_dict.pop("id") options_dict.pop("user") options_dict.pop("version") return { "userId": data["userId"], "userOption": options_dict } def handle_get_user_card_api_request(self, data: Dict) -> Dict: return {"userId": data["userId"], "nextIndex": 0, "userCardList": []} def handle_get_user_charge_api_request(self, data: Dict) -> Dict: return {"userId": data["userId"], "length": 0, "userChargeList": []} def handle_get_user_item_api_request(self, data: Dict) -> Dict: kind = int(data["nextIndex"] / 10000000000) next_idx = int(data["nextIndex"] % 10000000000) user_items = self.data.item.get_items(data["userId"], kind) user_item_list = [] next_idx = 0 for x in range(next_idx, data["maxCount"]): try: user_item_list.append({"item_kind": user_items[x]["item_kind"], "item_id": user_items[x]["item_id"], "stock": user_items[x]["stock"], "isValid": user_items[x]["is_valid"]}) except: break if len(user_item_list) == data["maxCount"]: next_idx = data["nextIndex"] + data["maxCount"] + 1 break return {"userId": data["userId"], "nextIndex": next_idx, "itemKind": kind, "userItemList": user_item_list} def handle_get_user_character_api_request(self, data: Dict) -> Dict: characters = self.data.item.get_characters(data["userId"]) chara_list = [] for chara in characters: chara_list.append({ "characterId": chara["character_id"], "level": chara["level"], "awakening": chara["awakening"], "useCount": chara["use_count"], }) return {"userId": data["userId"], "userCharacterList": chara_list} def handle_get_user_favorite_api_request(self, data: Dict) -> Dict: favorites = self.data.item.get_favorites(data["userId"], data["itemKind"]) if favorites is None: return userFavs = [] for fav in favorites: userFavs.append({ "userId": data["userId"], "itemKind": fav["itemKind"], "itemIdList": fav["itemIdList"] }) return { "userId": data["userId"], "userFavoriteData": userFavs } def handle_get_user_ghost_api_request(self, data: Dict) -> Dict: ghost = self.data.profile.get_profile_ghost(data["userId"], self.version) if ghost is None: return ghost_dict = ghost._asdict() ghost_dict.pop("user") ghost_dict.pop("id") ghost_dict.pop("version_int") return { "userId": data["userId"], "userGhost": ghost_dict } def handle_get_user_rating_api_request(self, data: Dict) -> Dict: rating = self.data.profile.get_profile_rating(data["userId"], self.version) if rating is None: return rating_dict = rating._asdict() rating_dict.pop("user") rating_dict.pop("id") rating_dict.pop("version") return { "userId": data["userId"], "userRating": rating_dict } def handle_get_user_activity_api_request(self, data: Dict) -> Dict: """ kind 1 is playlist, kind 2 is music list """ playlist = self.data.profile.get_profile_activity(data["userId"], 1) musiclist = self.data.profile.get_profile_activity(data["userId"], 2) if playlist is None or musiclist is None: return plst = [] mlst = [] for play in playlist: tmp = play._asdict() tmp["id"] = tmp["activityId"] tmp.pop("activityId") tmp.pop("user") plst.append(tmp) for music in musiclist: tmp = music._asdict() tmp["id"] = tmp["activityId"] tmp.pop("activityId") tmp.pop("user") mlst.append(tmp) return { "userActivity": { "playList": plst, "musicList": mlst } } def handle_get_user_course_api_request(self, data: Dict) -> Dict: user_courses = self.data.score.get_courses(data["userId"]) course_list = [] for course in user_courses: tmp = course._asdict() tmp.pop("user") tmp.pop("id") course_list.append(tmp) return {"userId": data["userId"], "nextIndex": 0, "userCourseList": course_list} def handle_get_user_portrait_api_request(self, data: Dict) -> Dict: # No support for custom pfps return {"length": 0, "userPortraitList": []} def handle_get_user_friend_season_ranking_api_request(self, data: Dict) -> Dict: friend_season_ranking = self.data.item.get_friend_season_ranking(data["userId"]) friend_season_ranking_list = [] next_index = 0 for x in range(data["nextIndex"], data["maxCount"] + data["nextIndex"]): try: friend_season_ranking_list.append({ "mapId": friend_season_ranking_list[x]["map_id"], "distance": friend_season_ranking_list[x]["distance"], "isLock": friend_season_ranking_list[x]["is_lock"], "isClear": friend_season_ranking_list[x]["is_clear"], "isComplete": friend_season_ranking_list[x]["is_complete"], }) except: break # We're capped and still have some left to go if len(friend_season_ranking_list) == data["maxCount"] and len(friend_season_ranking) > data["maxCount"] + data["nextIndex"]: next_index = data["maxCount"] + data["nextIndex"] return {"userId": data["userId"], "nextIndex": next_index, "userFriendSeasonRankingList": friend_season_ranking_list} def handle_get_user_map_api_request(self, data: Dict) -> Dict: maps = self.data.item.get_maps(data["userId"]) map_list = [] next_index = 0 for x in range(data["nextIndex"], data["maxCount"] + data["nextIndex"]): try: map_list.append({ "mapId": maps[x]["map_id"], "distance": maps[x]["distance"], "isLock": maps[x]["is_lock"], "isClear": maps[x]["is_clear"], "isComplete": maps[x]["is_complete"], }) except: break # We're capped and still have some left to go if len(map_list) == data["maxCount"] and len(maps) > data["maxCount"] + data["nextIndex"]: next_index = data["maxCount"] + data["nextIndex"] return {"userId": data["userId"], "nextIndex": next_index, "userMapList": map_list} def handle_get_user_login_bonus_api_request(self, data: Dict) -> Dict: login_bonuses = self.data.item.get_login_bonuses(data["userId"]) login_bonus_list = [] next_index = 0 for x in range(data["nextIndex"], data["maxCount"] + data["nextIndex"]): try: login_bonus_list.append({ "bonusId": login_bonuses[x]["bonus_id"], "point": login_bonuses[x]["point"], "isCurrent": login_bonuses[x]["is_current"], "isComplete": login_bonuses[x]["is_complete"], }) except: break # We're capped and still have some left to go if len(login_bonus_list) == data["maxCount"] and len(login_bonuses) > data["maxCount"] + data["nextIndex"]: next_index = data["maxCount"] + data["nextIndex"] return {"userId": data["userId"], "nextIndex": next_index, "userLoginBonusList": login_bonus_list} def handle_get_user_region_api_request(self, data: Dict) -> Dict: return {"userId": data["userId"], "length": 0, "userRegionList": []} def handle_get_user_music_api_request(self, data: Dict) -> Dict: songs = self.data.score.get_best_scores(data["userId"]) music_detail_list = [] next_index = 0 if songs is not None: for song in songs: music_detail_list.append({ "musicId": song["song_id"], "level": song["chart_id"], "playCount": song["play_count"], "achievement": song["achievement"], "comboStatus": song["combo_status"], "syncStatus": song["sync_status"], "deluxscoreMax": song["dx_score"], "scoreRank": song["score_rank"], }) if len(music_detail_list) == data["maxCount"]: next_index = data["maxCount"] + data["nextIndex"] break return {"userId": data["userId"], "nextIndex": next_index, "userMusicList": [{"userMusicDetailList": music_detail_list}]}