import logging
import json
from datetime import datetime, timedelta
from time import strftime

import pytz
from typing import Dict, Any, List

from core.config import CoreConfig
from titles.chuni.const import ChuniConstants
from titles.chuni.database import ChuniData
from titles.chuni.config import ChuniConfig


class ChuniBase:
    """
    Request handlers for the ``ChuniConstants.VER_CHUNITHM`` game version.

    Every ``handle_*_api_request`` method takes the request ``data`` dict and
    returns the response dict for that endpoint.
    """

    def __init__(self, core_cfg: CoreConfig, game_cfg: ChuniConfig) -> None:
        """Store the configs and set up database access, logger and version."""
        self.core_cfg = core_cfg
        self.game_cfg = game_cfg
        self.data = ChuniData(core_cfg)
        self.date_time_format = "%Y-%m-%d %H:%M:%S"
        self.logger = logging.getLogger("chuni")
        self.game = ChuniConstants.GAME_CODE
        self.version = ChuniConstants.VER_CHUNITHM

    @staticmethod
    def _row_to_dict(row: Any, *drop: str) -> Dict[str, Any]:
        """
        Convert a database row (anything with ``_asdict()``) into a dict and
        remove the ``drop`` columns. Raises KeyError if a column is missing.
        """
        result = row._asdict()
        for column in drop:
            result.pop(column)
        return result

    def _paginate_rows(self, rows: Any, next_idx: int, max_ct: int) -> Any:
        """
        Return ``(page, next_index)`` for an index-based listing.

        ``page`` holds up to ``max_ct`` rows starting at ``next_idx`` with the
        ``user`` and ``id`` columns removed; ``next_index`` is the offset of
        the following page or -1 when there is none.
        """
        page: List[Dict[str, Any]] = []
        for x in range(next_idx, len(rows)):
            page.append(self._row_to_dict(rows[x], "user", "id"))
            if len(page) >= max_ct:
                break

        if len(rows) >= next_idx + max_ct:
            return page, next_idx + max_ct
        return page, -1

    def handle_game_login_api_request(self, data: Dict) -> Dict:
        """
        Handles the login bonus logic, required for the game because
        getUserLoginBonus gets called after getUserItem and therefore the
        items needs to be inserted in the database before they get requested.

        Adds a bonusCount after a user logged in after 24 hours, makes sure
        loginBonus 30 gets looped, only show the login banner every 24 hours,
        adds the bonus to items (itemKind 6)
        """
        # ignore the login bonus if disabled in config
        if not self.game_cfg.mods.use_login_bonus:
            return {"returnCode": 1}

        user_id = data["userId"]
        login_bonus_presets = self.data.static.get_login_bonus_presets(self.version)

        for preset in login_bonus_presets:
            preset_id = preset["presetId"]

            # check if a user already has some progress and if not add the
            # login bonus entry
            user_login_bonus = self.data.item.get_login_bonus(
                user_id, self.version, preset_id
            )
            if user_login_bonus is None:
                self.data.item.put_login_bonus(user_id, self.version, preset_id)
                # yeah i'm lazy
                user_login_bonus = self.data.item.get_login_bonus(
                    user_id, self.version, preset_id
                )

            # skip the login bonus entirely if its already finished
            if user_login_bonus["isFinished"]:
                continue

            # make sure the last login is more than 24 hours ago
            now = datetime.now()
            if user_login_bonus["lastUpdateDate"] >= now - timedelta(hours=24):
                continue

            # increase the login day counter and update the last login date
            bonus_count = user_login_bonus["bonusCount"] + 1
            last_update_date = now

            all_login_boni = self.data.static.get_login_bonus(
                self.version, preset_id
            )

            # skip the current bonus preset if no boni were found
            if all_login_boni is None or len(all_login_boni) < 1:
                self.logger.warning(
                    f"No bonus entries found for bonus preset {preset['presetId']}"
                )
                continue

            max_needed_days = all_login_boni[0]["needLoginDayCount"]

            # make sure to not show login boni after all days got redeemed
            is_finished = False
            if bonus_count > max_needed_days:
                # assume that all login preset ids under 3000 needs to be
                # looped, like 30 and 40 are looped, 40 does not work?
                if preset_id < 3000:
                    bonus_count = 1
                else:
                    is_finished = True

            # grab the item for the corresponding day
            login_item = self.data.static.get_login_bonus_by_required_days(
                self.version, preset_id, bonus_count
            )
            if login_item is not None:
                # now add the present to the database so the
                # handle_get_user_item_api_request can grab them
                self.data.item.put_item(
                    user_id,
                    {
                        "itemId": login_item["presentId"],
                        "itemKind": 6,
                        "stock": login_item["itemNum"],
                        "isValid": True,
                    },
                )

            self.data.item.put_login_bonus(
                user_id,
                self.version,
                preset_id,
                bonusCount=bonus_count,
                lastUpdateDate=last_update_date,
                isWatched=False,
                isFinished=is_finished,
            )

        return {"returnCode": 1}

    def handle_game_logout_api_request(self, data: Dict) -> Dict:
        """Acknowledge a logout; nothing is stored."""
        # self.data.base.log_event("chuni", "logout", logging.INFO, {"version": self.version, "user": data["userId"]})
        return {"returnCode": 1}

    def handle_get_game_charge_api_request(self, data: Dict) -> Dict:
        """Return the enabled charges, numbered by their position in the list."""
        game_charge_list = self.data.static.get_enabled_charges(self.version)

        if game_charge_list is None or len(game_charge_list) == 0:
            return {"length": 0, "gameChargeList": []}

        charges = [
            {
                "orderId": order_id,
                "chargeId": charge["chargeId"],
                "price": 1,
                "startDate": "2017-12-05 07:00:00.0",
                "endDate": "2099-12-31 00:00:00.0",
                "salePrice": 1,
                "saleStartDate": "2017-12-05 07:00:00.0",
                "saleEndDate": "2099-12-31 00:00:00.0",
            }
            for order_id, charge in enumerate(game_charge_list)
        ]
        return {"length": len(charges), "gameChargeList": charges}

    def handle_get_game_event_api_request(self, data: Dict) -> Dict:
        """Return the enabled events, echoing the requested ``type``."""
        game_events = self.data.static.get_enabled_events(self.version)

        if game_events is None or len(game_events) == 0:
            self.logger.warning("No enabled events, did you run the reader?")
            return {
                "type": data["type"],
                "length": 0,
                "gameEventList": [],
            }

        event_list = [
            {
                "id": evt_row["eventId"],
                "type": evt_row["type"],
                # actually use the startDate from the import so it
                # properly shows all the events when new ones are imported
                "startDate": datetime.strftime(
                    evt_row["startDate"], "%Y-%m-%d %H:%M:%S"
                ),
                "endDate": "2099-12-31 00:00:00",
            }
            for evt_row in game_events
        ]

        return {
            "type": data["type"],
            "length": len(event_list),
            "gameEventList": event_list,
        }

    def handle_get_game_idlist_api_request(self, data: Dict) -> Dict:
        """Return an empty id list for the requested ``type``."""
        return {"type": data["type"], "length": 0, "gameIdlistList": []}

    def handle_get_game_message_api_request(self, data: Dict) -> Dict:
        """Return an empty message list for the requested ``type``."""
        # NOTE(review): "length" is a string here unlike the sibling
        # handlers; kept as-is because the client-side expectation is unknown
        return {"type": data["type"], "length": "0", "gameMessageList": []}

    def handle_get_game_ranking_api_request(self, data: Dict) -> Dict:
        """Return an empty ranking list for the requested ``type``."""
        return {"type": data["type"], "gameRankingList": []}

    def handle_get_game_sale_api_request(self, data: Dict) -> Dict:
        """Return an empty sale list for the requested ``type``."""
        return {"type": data["type"], "length": 0, "gameSaleList": []}

    def handle_get_game_setting_api_request(self, data: Dict) -> Dict:
        """
        Return the game settings. The reboot window is reported as the hour
        between four and three hours before the current time.
        """
        now = datetime.now()
        reboot_start = datetime.strftime(
            now - timedelta(hours=4), self.date_time_format
        )
        reboot_end = datetime.strftime(
            now - timedelta(hours=3), self.date_time_format
        )
        return {
            "gameSetting": {
                "dataVersion": "1.00.00",
                "isMaintenance": "false",
                "requestInterval": 10,
                "rebootStartTime": reboot_start,
                "rebootEndTime": reboot_end,
                "isBackgroundDistribute": "false",
                "maxCountCharacter": 300,
                "maxCountItem": 300,
                "maxCountMusic": 300,
            },
            "isDumpUpload": "false",
            "isAou": "false",
        }

    def handle_get_user_activity_api_request(self, data: Dict) -> Dict:
        """Return the user's activities of ``kind``; ``activityId`` is sent as ``id``."""
        user_activity_list = self.data.profile.get_profile_activity(
            data["userId"], data["kind"]
        )

        activity_list = []
        for activity in user_activity_list:
            tmp = self._row_to_dict(activity, "user")
            tmp["id"] = tmp.pop("activityId")
            activity_list.append(tmp)

        return {
            "userId": data["userId"],
            "length": len(activity_list),
            "kind": int(data["kind"]),
            "userActivityList": activity_list,
        }

    def handle_get_user_character_api_request(self, data: Dict) -> Dict:
        """Return one page of the user's characters (``nextIndex``/``maxCount``)."""
        characters = self.data.item.get_characters(data["userId"])
        if characters is None:
            return {
                "userId": data["userId"],
                "length": 0,
                "nextIndex": -1,
                "userCharacterList": [],
            }

        character_list, next_idx = self._paginate_rows(
            characters, int(data["nextIndex"]), int(data["maxCount"])
        )

        return {
            "userId": data["userId"],
            "length": len(character_list),
            "nextIndex": next_idx,
            "userCharacterList": character_list,
        }

    def handle_get_user_charge_api_request(self, data: Dict) -> Dict:
        """Return all charges stored for the user."""
        user_charge_list = self.data.profile.get_profile_charge(data["userId"])

        charge_list = [
            self._row_to_dict(charge, "id", "user") for charge in user_charge_list
        ]

        return {
            "userId": data["userId"],
            "length": len(charge_list),
            "userChargeList": charge_list,
        }

    def handle_get_user_course_api_request(self, data: Dict) -> Dict:
        """Return one page of the user's courses (``nextIndex``/``maxCount``)."""
        user_course_list = self.data.score.get_courses(data["userId"])
        if user_course_list is None:
            return {
                "userId": data["userId"],
                "length": 0,
                "nextIndex": -1,
                "userCourseList": [],
            }

        # the page size is checked against the rows collected for this page;
        # checking the total row count instead ends the page after one row
        # whenever the user has at least maxCount courses
        course_list, next_idx = self._paginate_rows(
            user_course_list, int(data["nextIndex"]), int(data["maxCount"])
        )

        return {
            "userId": data["userId"],
            "length": len(course_list),
            "nextIndex": next_idx,
            "userCourseList": course_list,
        }

    def handle_get_user_data_api_request(self, data: Dict) -> Dict:
        """Return the user's profile data, or ``{}`` when there is none."""
        p = self.data.profile.get_profile_data(data["userId"], self.version)
        if p is None:
            return {}

        profile = self._row_to_dict(p, "id", "user", "version")

        return {"userId": data["userId"], "userData": profile}

    def handle_get_user_data_ex_api_request(self, data: Dict) -> Dict:
        """Return the user's extended profile data, or ``{}`` when there is none."""
        p = self.data.profile.get_profile_data_ex(data["userId"], self.version)
        if p is None:
            return {}

        profile = self._row_to_dict(p, "id", "user", "version")

        return {"userId": data["userId"], "userDataEx": profile}

    def handle_get_user_duel_api_request(self, data: Dict) -> Dict:
        """Return all duels stored for the user, or ``{}`` when the lookup yields None."""
        user_duel_list = self.data.item.get_duels(data["userId"])
        if user_duel_list is None:
            return {}

        duel_list = [self._row_to_dict(duel, "id", "user") for duel in user_duel_list]

        return {
            "userId": data["userId"],
            "length": len(duel_list),
            "userDuelList": duel_list,
        }

    def handle_get_user_favorite_item_api_request(self, data: Dict) -> Dict:
        """Return the ids of the user's favorites of the requested ``kind``."""
        user_fav_item_list = []

        # still needs to be implemented on WebUI
        # 1: Music, 3: Character
        fav_list = self.data.item.get_all_favorites(
            data["userId"], self.version, fav_kind=int(data["kind"])
        )
        if fav_list is not None:
            user_fav_item_list = [{"id": fav["favId"]} for fav in fav_list]

        return {
            "userId": data["userId"],
            "length": len(user_fav_item_list),
            "kind": data["kind"],
            "nextIndex": -1,
            "userFavoriteItemList": user_fav_item_list,
        }

    def handle_get_user_favorite_music_api_request(self, data: Dict) -> Dict:
        """
        This is handled via the webui, which we don't have right now
        """
        return {"userId": data["userId"], "length": 0, "userFavoriteMusicList": []}

    def handle_get_user_item_api_request(self, data: Dict) -> Dict:
        """
        Return one page of the user's items.

        ``nextIndex`` packs two values: ``itemKind * 10000000000 + offset``.
        """
        # exact integer split instead of float division
        kind, next_idx = divmod(int(data["nextIndex"]), 10000000000)
        user_item_list = self.data.item.get_items(data["userId"], kind)

        if user_item_list is None or len(user_item_list) == 0:
            return {
                "userId": data["userId"],
                "nextIndex": -1,
                "itemKind": kind,
                "userItemList": [],
            }

        max_ct = int(data["maxCount"])
        items: List[Dict[str, Any]] = []
        for i in range(next_idx, len(user_item_list)):
            items.append(self._row_to_dict(user_item_list[i], "user", "id"))
            if len(items) >= max_ct:
                break

        if len(items) < max_ct:
            # a short page means there is nothing left for this kind
            next_idx = 0
        else:
            next_idx = kind * 10000000000 + next_idx + len(items)

        return {
            "userId": data["userId"],
            "nextIndex": next_idx,
            "itemKind": kind,
            "length": len(items),
            "userItemList": items,
        }

    def handle_get_user_login_bonus_api_request(self, data: Dict) -> Dict:
        """Return the user's login bonus progress, or an empty list when disabled."""
        user_id = data["userId"]
        user_login_bonus = self.data.item.get_all_login_bonus(user_id, self.version)
        # ignore the loginBonus request if its disabled in config
        if user_login_bonus is None or not self.game_cfg.mods.use_login_bonus:
            return {"userId": user_id, "length": 0, "userLoginBonusList": []}

        user_login_list = [
            {
                "presetId": bonus["presetId"],
                "bonusCount": bonus["bonusCount"],
                "lastUpdateDate": datetime.strftime(
                    bonus["lastUpdateDate"], "%Y-%m-%d %H:%M:%S"
                ),
                "isWatched": bonus["isWatched"],
            }
            for bonus in user_login_bonus
        ]

        return {
            "userId": user_id,
            "length": len(user_login_list),
            "userLoginBonusList": user_login_list,
        }

    def handle_get_user_map_api_request(self, data: Dict) -> Dict:
        """Return all maps stored for the user, or ``{}`` when the lookup yields None."""
        user_map_list = self.data.item.get_maps(data["userId"])
        if user_map_list is None:
            return {}

        map_list = [
            self._row_to_dict(user_map, "id", "user") for user_map in user_map_list
        ]

        return {
            "userId": data["userId"],
            "length": len(map_list),
            "userMapList": map_list,
        }

    def handle_get_user_music_api_request(self, data: Dict) -> Dict:
        """
        Return one page of the user's scores grouped by ``musicId``.

        ``nextIndex`` is an offset into the score rows and ``maxCount`` limits
        the number of songs per page. The returned ``nextIndex`` is the offset
        of the first row that was not sent, or -1 when all rows were sent.
        """
        music_detail = self.data.score.get_scores(data["userId"])
        if music_detail is None:
            return {
                "userId": data["userId"],
                "length": 0,
                "nextIndex": -1,
                "userMusicList": [],  # 240
            }

        next_idx = int(data["nextIndex"])
        # always allow one song per page so every request makes progress
        max_ct = max(int(data["maxCount"]), 1)

        songs_by_id: Dict[Any, Dict[str, Any]] = {}
        rows_used = 0
        for x in range(next_idx, len(music_detail)):
            tmp = self._row_to_dict(music_detail[x], "user", "id")

            song = songs_by_id.get(tmp["musicId"])
            if song is not None:
                # another row of a song that is already on this page
                song["userMusicDetailList"].append(tmp)
                song["length"] = len(song["userMusicDetailList"])
            elif len(songs_by_id) >= max_ct:
                # page is full; this row opens the next page
                break
            else:
                songs_by_id[tmp["musicId"]] = {
                    "length": 1,
                    "userMusicDetailList": [tmp],
                }
            rows_used += 1

        song_list = list(songs_by_id.values())

        # advance by the rows consumed, not by the number of songs, because
        # the offset indexes rows and one song can span several rows
        next_idx += rows_used
        if next_idx >= len(music_detail):
            next_idx = -1

        return {
            "userId": data["userId"],
            "length": len(song_list),
            "nextIndex": next_idx,
            "userMusicList": song_list,  # 240
        }

    def handle_get_user_option_api_request(self, data: Dict) -> Dict:
        """Return the user's game options."""
        p = self.data.profile.get_profile_option(data["userId"])

        option = self._row_to_dict(p, "id", "user")

        return {"userId": data["userId"], "userGameOption": option}

    def handle_get_user_option_ex_api_request(self, data: Dict) -> Dict:
        """Return the user's extended game options."""
        p = self.data.profile.get_profile_option_ex(data["userId"])

        option = self._row_to_dict(p, "id", "user")

        return {"userId": data["userId"], "userGameOptionEx": option}

    def read_wtf8(self, src):
        """
        Re-decode ``src`` as UTF-8, treating each character's code point as
        one byte.

        Raises ValueError when a code point is above 255 or the resulting
        bytes are not valid UTF-8 (UnicodeDecodeError is a ValueError).
        """
        return bytes([ord(c) for c in src]).decode("utf-8")

    def handle_get_user_preview_api_request(self, data: Dict) -> Dict:
        """
        Return the profile preview together with the selected character.

        Returns None (not a dict) when the user has no profile preview.
        """
        profile = self.data.profile.get_profile_preview(data["userId"], self.version)
        if profile is None:
            return None
        profile_character = self.data.item.get_character(
            data["userId"], profile["characterId"]
        )

        if profile_character is None:
            chara = {}
        else:
            chara = self._row_to_dict(profile_character, "id", "user")

        return {
            "userId": data["userId"],
            # Current Login State
            "isLogin": False,
            "lastLoginDate": profile["lastPlayDate"],
            # User Profile
            "userName": profile["userName"],
            "reincarnationNum": profile["reincarnationNum"],
            "level": profile["level"],
            "exp": profile["exp"],
            "playerRating": profile["playerRating"],
            "lastGameId": profile["lastGameId"],
            "lastRomVersion": profile["lastRomVersion"],
            "lastDataVersion": profile["lastDataVersion"],
            "lastPlayDate": profile["lastPlayDate"],
            "trophyId": profile["trophyId"],
            "nameplateId": profile["nameplateId"],
            # Current Selected Character
            "userCharacter": chara,
            # User Game Options
            "playerLevel": profile["playerLevel"],
            "rating": profile["rating"],
            "headphone": profile["headphone"],
            "chargeState": 1,
            "userNameEx": profile["userName"],
        }

    def handle_get_user_recent_rating_api_request(self, data: Dict) -> Dict:
        """Return the stored ``recentRating`` list of the user."""
        recent_rating_list = self.data.profile.get_profile_recent_rating(data["userId"])
        if recent_rating_list is None:
            return {
                "userId": data["userId"],
                "length": 0,
                "userRecentRatingList": [],
            }

        return {
            "userId": data["userId"],
            "length": len(recent_rating_list["recentRating"]),
            "userRecentRatingList": recent_rating_list["recentRating"],
        }

    def handle_get_user_region_api_request(self, data: Dict) -> Dict:
        """Return an empty region list."""
        # TODO: Region
        return {
            "userId": data["userId"],
            "length": 0,
            "userRegionList": [],
        }

    def handle_get_user_team_api_request(self, data: Dict) -> Dict:
        """
        Return the team configured in the game config, or ``teamId`` 0 when
        the configured team name is empty.
        """
        # TODO: use the database "chuni_profile_team" with a GUI
        team_name = self.game_cfg.team.team_name
        if team_name == "":
            return {"userId": data["userId"], "teamId": 0}

        return {
            "userId": data["userId"],
            "teamId": 1,
            "teamRank": 1,
            "teamName": team_name,
            "userTeamPoint": {
                "userId": data["userId"],
                "teamId": 1,
                "orderId": 1,
                "teamPoint": 1,
                "aggrDate": data["playDate"],
            },
        }

    def handle_get_team_course_setting_api_request(self, data: Dict) -> Dict:
        """Return an empty team course setting list."""
        return {
            "userId": data["userId"],
            "length": 0,
            "nextIndex": 0,
            "teamCourseSettingList": [],
        }

    def handle_get_team_course_rule_api_request(self, data: Dict) -> Dict:
        """Return an empty team course rule list."""
        return {
            "userId": data["userId"],
            "length": 0,
            "nextIndex": 0,
            "teamCourseRuleList": [],
        }

    def handle_upsert_user_all_api_request(self, data: Dict) -> Dict:
        """
        Store every section present in ``data["upsertUserAll"]`` for the user.

        Sections that are absent from the payload are skipped.
        """
        upsert = data["upsertUserAll"]
        user_id = data["userId"]

        if "userData" in upsert:
            user_data = upsert["userData"][0]
            try:
                user_data["userName"] = self.read_wtf8(user_data["userName"])
            except (KeyError, TypeError, ValueError):
                # best effort: keep the name exactly as it was sent
                self.logger.debug(
                    "Could not re-decode userName for user %s, storing it as-is",
                    user_id,
                )

            self.data.profile.put_profile_data(user_id, self.version, user_data)
        if "userDataEx" in upsert:
            self.data.profile.put_profile_data_ex(
                user_id, self.version, upsert["userDataEx"][0]
            )
        if "userGameOption" in upsert:
            self.data.profile.put_profile_option(user_id, upsert["userGameOption"][0])
        if "userGameOptionEx" in upsert:
            self.data.profile.put_profile_option_ex(
                user_id, upsert["userGameOptionEx"][0]
            )
        if "userRecentRatingList" in upsert:
            self.data.profile.put_profile_recent_rating(
                user_id, upsert["userRecentRatingList"]
            )

        if "userCharacterList" in upsert:
            for character in upsert["userCharacterList"]:
                self.data.item.put_character(user_id, character)

        if "userMapList" in upsert:
            for user_map in upsert["userMapList"]:
                self.data.item.put_map(user_id, user_map)

        if "userCourseList" in upsert:
            for course in upsert["userCourseList"]:
                self.data.score.put_course(user_id, course)

        if "userDuelList" in upsert:
            for duel in upsert["userDuelList"]:
                self.data.item.put_duel(user_id, duel)

        if "userItemList" in upsert:
            for item in upsert["userItemList"]:
                self.data.item.put_item(user_id, item)

        if "userActivityList" in upsert:
            for activity in upsert["userActivityList"]:
                self.data.profile.put_profile_activity(user_id, activity)

        if "userChargeList" in upsert:
            for charge in upsert["userChargeList"]:
                self.data.profile.put_profile_charge(user_id, charge)

        if "userMusicDetailList" in upsert:
            for song in upsert["userMusicDetailList"]:
                self.data.score.put_score(user_id, song)

        if "userPlaylogList" in upsert:
            name_keys = ("playedUserName1", "playedUserName2", "playedUserName3")
            for playlog in upsert["userPlaylogList"]:
                # convert the player names to utf-8, a missing (None) name
                # is stored unchanged
                for name_key in name_keys:
                    if playlog[name_key] is not None:
                        playlog[name_key] = self.read_wtf8(playlog[name_key])
                self.data.score.put_playlog(user_id, playlog)

        if "userTeamPoint" in upsert:
            # TODO: team stuff
            pass

        if "userMapAreaList" in upsert:
            for map_area in upsert["userMapAreaList"]:
                self.data.item.put_map_area(user_id, map_area)

        if "userOverPowerList" in upsert:
            for overpower in upsert["userOverPowerList"]:
                self.data.profile.put_profile_overpower(user_id, overpower)

        if "userEmoneyList" in upsert:
            for emoney in upsert["userEmoneyList"]:
                self.data.profile.put_profile_emoney(user_id, emoney)

        if "userLoginBonusList" in upsert:
            for login in upsert["userLoginBonusList"]:
                self.data.item.put_login_bonus(
                    user_id, self.version, login["presetId"], isWatched=True
                )

        return {"returnCode": "1"}

    def handle_upsert_user_chargelog_api_request(self, data: Dict) -> Dict:
        """Store the charge contained in ``data["userCharge"]`` for the user."""
        # add tickets after they got bought, this makes sure the tickets are
        # still valid after an unsuccessful logout
        self.data.profile.put_profile_charge(data["userId"], data["userCharge"])
        return {"returnCode": "1"}

    def handle_upsert_client_bookkeeping_api_request(self, data: Dict) -> Dict:
        """Acknowledge the request; nothing is stored."""
        return {"returnCode": "1"}

    def handle_upsert_client_develop_api_request(self, data: Dict) -> Dict:
        """Acknowledge the request; nothing is stored."""
        return {"returnCode": "1"}

    def handle_upsert_client_error_api_request(self, data: Dict) -> Dict:
        """Acknowledge the request; nothing is stored."""
        return {"returnCode": "1"}

    def handle_upsert_client_setting_api_request(self, data: Dict) -> Dict:
        """Acknowledge the request; nothing is stored."""
        return {"returnCode": "1"}

    def handle_upsert_client_testmode_api_request(self, data: Dict) -> Dict:
        """Acknowledge the request; nothing is stored."""
        return {"returnCode": "1"}

    def handle_get_user_net_battle_data_api_request(self, data: Dict) -> Dict:
        """Return net battle data with an empty recent music selection."""
        return {
            "userId": data["userId"],
            "userNetBattleData": {"recentNBSelectMusicList": []},
        }