diff --git a/titles/mai2/base.py b/titles/mai2/base.py index 974c94c..4d31213 100644 --- a/titles/mai2/base.py +++ b/titles/mai2/base.py @@ -929,278 +929,3 @@ class Mai2Base: userRecommendSelectionMusicIdList: list[int] """ return {"userId": data["userId"], "userRecommendSelectionMusicIdList": []} - - async def handle_get_user_new_item_api_request(self, data: Dict) -> Dict: - # TODO: Added in 1.41, implement this? - user_id = data["userId"] - version = data.get("version", 1041000) - user_playlog_list = data.get("userPlaylogList", []) - - return { - "userId": user_id, - "itemKind": -1, - "itemId": -1, - } - - async def handle_get_user_rival_data_api_request(self, data: Dict) -> Dict: - user_id = data.get("userId", 0) - rival_id = data.get("rivalId", 0) - - if not user_id or not rival_id: return {} - - rival_pf = await self.data.profile.get_profile_detail(rival_id) - if not rival_pf: return {} - - return { - "userId": user_id, - "userRivalData": { - "rivalId": rival_id, - "rivalName": rival_pf['userName'] - } - } - - async def handle_get_user_rival_music_api_request(self, data: Dict) -> Dict: - user_id = data.get("userId", 0) - rival_id = data.get("rivalId", 0) - next_index = data.get("nextIndex", 0) - max_ct = 100 - upper_lim = next_index + max_ct - rival_music_list: Dict[int, List] = {} - - songs = await self.data.score.get_best_scores(rival_id) - if songs is None: - self.logger.debug("handle_get_user_rival_music_api_request: get_best_scores returned None!") - return { - "userId": user_id, - "rivalId": rival_id, - "nextIndex": 0, - "userRivalMusicList": [] # musicId userRivalMusicDetailList -> level achievement deluxscoreMax - } - - num_user_songs = len(songs) - - for x in range(next_index, upper_lim): - - tmp = songs[x]._asdict() - if tmp['musicId'] in rival_music_list: - rival_music_list[tmp['musicId']].append([{"level": tmp['level'], 'achievement': tmp['achievement'], 'deluxscoreMax': tmp['deluxscoreMax']}]) - - else: - if len(rival_music_list) >= max_ct: - break - rival_music_list[tmp['musicId']] = [{"level": tmp['level'], 'achievement': tmp['achievement'], 'deluxscoreMax': tmp['deluxscoreMax']}] - - next_index = 0 if len(rival_music_list) < max_ct or num_user_songs == upper_lim else upper_lim - self.logger.info(f"Send rival {rival_id} songs {next_index}-{upper_lim} ({len(rival_music_list)}) out of {num_user_songs} for user {user_id} (next idx {next_index})") - - return { - "userId": user_id, - "rivalId": rival_id, - "nextIndex": next_index, - "userRivalMusicList": [{"musicId": x, "userRivalMusicDetailList": y} for x, y in rival_music_list.items()] - } - - # CardMaker support added in Universe - async def handle_cm_get_user_preview_api_request(self, data: Dict) -> Dict: - p = await self.data.profile.get_profile_detail(data["userId"], self.version) - if p is None: - return {} - - return { - "userName": p["userName"], - "rating": p["playerRating"], - # hardcode lastDataVersion for CardMaker - "lastDataVersion": "1.20.00", # Future versiohs should replace this with the correct version - # checks if the user is still logged in - "isLogin": False, - "isExistSellingCard": True, - } - - async def handle_cm_get_user_data_api_request(self, data: Dict) -> Dict: - # user already exists, because the preview checks that already - p = await self.data.profile.get_profile_detail(data["userId"], self.version) - - cards = await self.data.card.get_user_cards(data["userId"]) - if cards is None or len(cards) == 0: - # This should never happen - self.logger.error( - f"handle_get_user_data_api_request: Internal error - No cards found for user id {data['userId']}" - ) - return {} - - # get the dict representation of the row so we can modify values - user_data = p._asdict() - - # remove the values the game doesn't want - user_data.pop("id") - user_data.pop("user") - user_data.pop("version") - - return {"userId": data["userId"], "userData": user_data} - - async def handle_cm_login_api_request(self, data: Dict) -> Dict: - return {"returnCode": 1} - - async def handle_cm_logout_api_request(self, data: Dict) -> Dict: - return {"returnCode": 1} - - async def handle_cm_get_selling_card_api_request(self, data: Dict) -> Dict: - selling_cards = await self.data.static.get_enabled_cards(self.version) - if selling_cards is None: - return {"length": 0, "sellingCardList": []} - - selling_card_list = [] - for card in selling_cards: - tmp = card._asdict() - tmp.pop("id") - tmp.pop("version") - tmp.pop("cardName") - tmp.pop("enabled") - - tmp["startDate"] = datetime.strftime( - tmp["startDate"], Mai2Constants.DATE_TIME_FORMAT - ) - tmp["endDate"] = datetime.strftime( - tmp["endDate"], Mai2Constants.DATE_TIME_FORMAT - ) - tmp["noticeStartDate"] = datetime.strftime( - tmp["noticeStartDate"], Mai2Constants.DATE_TIME_FORMAT - ) - tmp["noticeEndDate"] = datetime.strftime( - tmp["noticeEndDate"], Mai2Constants.DATE_TIME_FORMAT - ) - - selling_card_list.append(tmp) - - return {"length": len(selling_card_list), "sellingCardList": selling_card_list} - - async def handle_cm_get_user_card_api_request(self, data: Dict) -> Dict: - user_cards = await self.data.item.get_cards(data["userId"]) - if user_cards is None: - return {"returnCode": 1, "length": 0, "nextIndex": 0, "userCardList": []} - - max_ct = data["maxCount"] - next_idx = data["nextIndex"] - start_idx = next_idx - end_idx = max_ct + start_idx - - if len(user_cards[start_idx:]) > max_ct: - next_idx += max_ct - else: - next_idx = 0 - - card_list = [] - for card in user_cards: - tmp = card._asdict() - tmp.pop("id") - tmp.pop("user") - - tmp["startDate"] = datetime.strftime( - tmp["startDate"], Mai2Constants.DATE_TIME_FORMAT - ) - tmp["endDate"] = datetime.strftime( - tmp["endDate"], Mai2Constants.DATE_TIME_FORMAT - ) - card_list.append(tmp) - - return { - "returnCode": 1, - "length": len(card_list[start_idx:end_idx]), - "nextIndex": next_idx, - "userCardList": card_list[start_idx:end_idx], - } - - async def handle_cm_get_user_item_api_request(self, data: Dict) -> Dict: - await self.handle_get_user_item_api_request(data) - - async def handle_cm_get_user_character_api_request(self, data: Dict) -> Dict: - characters = await self.data.item.get_characters(data["userId"]) - - chara_list = [] - for chara in characters: - chara_list.append( - { - "characterId": chara["characterId"], - # no clue why those values are even needed - "point": 0, - "count": 0, - "level": chara["level"], - "nextAwake": 0, - "nextAwakePercent": 0, - "favorite": False, - "awakening": chara["awakening"], - "useCount": chara["useCount"], - } - ) - - return { - "returnCode": 1, - "length": len(chara_list), - "userCharacterList": chara_list, - } - - async def handle_cm_get_user_card_print_error_api_request(self, data: Dict) -> Dict: - return {"length": 0, "userPrintDetailList": []} - - async def handle_cm_upsert_user_print_api_request(self, data: Dict) -> Dict: - user_id = data["userId"] - upsert = data["userPrintDetail"] - - # set a random card serial number - serial_id = "".join([str(randint(0, 9)) for _ in range(20)]) - - # calculate start and end date of the card - start_date = datetime.utcnow() - end_date = datetime.utcnow() + timedelta(days=15) - - user_card = upsert["userCard"] - await self.data.item.put_card( - user_id, - user_card["cardId"], - user_card["cardTypeId"], - user_card["charaId"], - user_card["mapId"], - # add the correct start date and also the end date in 15 days - start_date, - end_date, - ) - - # get the profile extend to save the new bought card - extend = await self.data.profile.get_profile_extend(user_id, self.version) - if extend: - extend = extend._asdict() - # parse the selectedCardList - # 6 = Freedom Pass, 4 = Gold Pass (cardTypeId) - selected_cards: List = extend["selectedCardList"] - - # if no pass is already added, add the corresponding pass - if not user_card["cardTypeId"] in selected_cards: - selected_cards.insert(0, user_card["cardTypeId"]) - - extend["selectedCardList"] = selected_cards - await self.data.profile.put_profile_extend(user_id, self.version, extend) - - # properly format userPrintDetail for the database - upsert.pop("userCard") - upsert.pop("serialId") - upsert["printDate"] = datetime.strptime(upsert["printDate"], "%Y-%m-%d") - - await self.data.item.put_user_print_detail(user_id, serial_id, upsert) - - return { - "returnCode": 1, - "orderId": 0, - "serialId": serial_id, - "startDate": datetime.strftime(start_date, Mai2Constants.DATE_TIME_FORMAT), - "endDate": datetime.strftime(end_date, Mai2Constants.DATE_TIME_FORMAT), - } - - async def handle_cm_upsert_user_printlog_api_request(self, data: Dict) -> Dict: - return { - "returnCode": 1, - "orderId": 0, - "serialId": data["userPrintlog"]["serialId"], - } - - async def handle_cm_upsert_buy_card_api_request(self, data: Dict) -> Dict: - return {"returnCode": 1} diff --git a/titles/mai2/dx.py b/titles/mai2/dx.py index 4423824..2c7d960 100644 --- a/titles/mai2/dx.py +++ b/titles/mai2/dx.py @@ -563,33 +563,74 @@ class Mai2DX(Mai2Base): return {"userId": data["userId"], "length": 0, "userRegionList": []} async def handle_get_user_rival_data_api_request(self, data: Dict) -> Dict: - user_id = data["userId"] - rival_id = data["rivalId"] + user_id = data.get("userId", 0) + rival_id = data.get("rivalId", 0) - """ - class UserRivalData: - rivalId: int - rivalName: str - """ - return {"userId": user_id, "userRivalData": {}} + if not user_id or not rival_id: return {} + + rival_pf = await self.data.profile.get_profile_detail(rival_id) + if not rival_pf: return {} + + return { + "userId": user_id, + "userRivalData": { + "rivalId": rival_id, + "rivalName": rival_pf['userName'] + } + } async def handle_get_user_rival_music_api_request(self, data: Dict) -> Dict: + user_id = data.get("userId", 0) + rival_id = data.get("rivalId", 0) + next_index = data.get("nextIndex", 0) + max_ct = 100 + upper_lim = next_index + max_ct + rival_music_list: Dict[int, List] = {} + + songs = await self.data.score.get_best_scores(rival_id) + if songs is None: + self.logger.debug("handle_get_user_rival_music_api_request: get_best_scores returned None!") + return { + "userId": user_id, + "rivalId": rival_id, + "nextIndex": 0, + "userRivalMusicList": [] # musicId userRivalMusicDetailList -> level achievement deluxscoreMax + } + + num_user_songs = len(songs) + + for x in range(next_index, upper_lim): + + tmp = songs[x]._asdict() + if tmp['musicId'] in rival_music_list: + rival_music_list[tmp['musicId']].append([{"level": tmp['level'], 'achievement': tmp['achievement'], 'deluxscoreMax': tmp['deluxscoreMax']}]) + + else: + if len(rival_music_list) >= max_ct: + break + rival_music_list[tmp['musicId']] = [{"level": tmp['level'], 'achievement': tmp['achievement'], 'deluxscoreMax': tmp['deluxscoreMax']}] + + next_index = 0 if len(rival_music_list) < max_ct or num_user_songs == upper_lim else upper_lim + self.logger.info(f"Send rival {rival_id} songs {next_index}-{upper_lim} ({len(rival_music_list)}) out of {num_user_songs} for user {user_id} (next idx {next_index})") + + return { + "userId": user_id, + "rivalId": rival_id, + "nextIndex": next_index, + "userRivalMusicList": [{"musicId": x, "userRivalMusicDetailList": y} for x, y in rival_music_list.items()] + } + + async def handle_get_user_new_item_api_request(self, data: Dict) -> Dict: + # TODO: Added in 1.41, implement this? user_id = data["userId"] - rival_id = data["rivalId"] - next_idx = data["nextIndex"] - rival_music_levels = data["userRivalMusicLevelList"] - - """ - class UserRivalMusicList: - class UserRivalMusicDetailList: - level: int - achievement: int - deluxscoreMax: int - - musicId: int - userRivalMusicDetailList: list[UserRivalMusicDetailList] - """ - return {"userId": user_id, "nextIndex": 0, "userRivalMusicList": []} + version = data.get("version", 1041000) + user_playlog_list = data.get("userPlaylogList", []) + + return { + "userId": user_id, + "itemKind": -1, + "itemId": -1, + } async def handle_get_user_music_api_request(self, data: Dict) -> Dict: user_id = data.get("userId", 0) @@ -636,3 +677,208 @@ class Mai2DX(Mai2Base): return ret ret['loginId'] = ret.get('loginCount', 0) return ret + + # CardMaker support added in Universe + async def handle_cm_get_user_preview_api_request(self, data: Dict) -> Dict: + p = await self.data.profile.get_profile_detail(data["userId"], self.version) + if p is None: + return {} + + return { + "userName": p["userName"], + "rating": p["playerRating"], + # hardcode lastDataVersion for CardMaker + "lastDataVersion": "1.20.00", # Future versiohs should replace this with the correct version + # checks if the user is still logged in + "isLogin": False, + "isExistSellingCard": True, + } + + async def handle_cm_get_user_data_api_request(self, data: Dict) -> Dict: + # user already exists, because the preview checks that already + p = await self.data.profile.get_profile_detail(data["userId"], self.version) + + cards = await self.data.card.get_user_cards(data["userId"]) + if cards is None or len(cards) == 0: + # This should never happen + self.logger.error( + f"handle_get_user_data_api_request: Internal error - No cards found for user id {data['userId']}" + ) + return {} + + # get the dict representation of the row so we can modify values + user_data = p._asdict() + + # remove the values the game doesn't want + user_data.pop("id") + user_data.pop("user") + user_data.pop("version") + + return {"userId": data["userId"], "userData": user_data} + + async def handle_cm_login_api_request(self, data: Dict) -> Dict: + return {"returnCode": 1} + + async def handle_cm_logout_api_request(self, data: Dict) -> Dict: + return {"returnCode": 1} + + async def handle_cm_get_selling_card_api_request(self, data: Dict) -> Dict: + selling_cards = await self.data.static.get_enabled_cards(self.version) + if selling_cards is None: + return {"length": 0, "sellingCardList": []} + + selling_card_list = [] + for card in selling_cards: + tmp = card._asdict() + tmp.pop("id") + tmp.pop("version") + tmp.pop("cardName") + tmp.pop("enabled") + + tmp["startDate"] = datetime.strftime( + tmp["startDate"], Mai2Constants.DATE_TIME_FORMAT + ) + tmp["endDate"] = datetime.strftime( + tmp["endDate"], Mai2Constants.DATE_TIME_FORMAT + ) + tmp["noticeStartDate"] = datetime.strftime( + tmp["noticeStartDate"], Mai2Constants.DATE_TIME_FORMAT + ) + tmp["noticeEndDate"] = datetime.strftime( + tmp["noticeEndDate"], Mai2Constants.DATE_TIME_FORMAT + ) + + selling_card_list.append(tmp) + + return {"length": len(selling_card_list), "sellingCardList": selling_card_list} + + async def handle_cm_get_user_card_api_request(self, data: Dict) -> Dict: + user_cards = await self.data.item.get_cards(data["userId"]) + if user_cards is None: + return {"returnCode": 1, "length": 0, "nextIndex": 0, "userCardList": []} + + max_ct = data["maxCount"] + next_idx = data["nextIndex"] + start_idx = next_idx + end_idx = max_ct + start_idx + + if len(user_cards[start_idx:]) > max_ct: + next_idx += max_ct + else: + next_idx = 0 + + card_list = [] + for card in user_cards: + tmp = card._asdict() + tmp.pop("id") + tmp.pop("user") + + tmp["startDate"] = datetime.strftime( + tmp["startDate"], Mai2Constants.DATE_TIME_FORMAT + ) + tmp["endDate"] = datetime.strftime( + tmp["endDate"], Mai2Constants.DATE_TIME_FORMAT + ) + card_list.append(tmp) + + return { + "returnCode": 1, + "length": len(card_list[start_idx:end_idx]), + "nextIndex": next_idx, + "userCardList": card_list[start_idx:end_idx], + } + + async def handle_cm_get_user_item_api_request(self, data: Dict) -> Dict: + await self.handle_get_user_item_api_request(data) + + async def handle_cm_get_user_character_api_request(self, data: Dict) -> Dict: + characters = await self.data.item.get_characters(data["userId"]) + + chara_list = [] + for chara in characters: + chara_list.append( + { + "characterId": chara["characterId"], + # no clue why those values are even needed + "point": 0, + "count": 0, + "level": chara["level"], + "nextAwake": 0, + "nextAwakePercent": 0, + "favorite": False, + "awakening": chara["awakening"], + "useCount": chara["useCount"], + } + ) + + return { + "returnCode": 1, + "length": len(chara_list), + "userCharacterList": chara_list, + } + + async def handle_cm_get_user_card_print_error_api_request(self, data: Dict) -> Dict: + return {"length": 0, "userPrintDetailList": []} + + async def handle_cm_upsert_user_print_api_request(self, data: Dict) -> Dict: + user_id = data["userId"] + upsert = data["userPrintDetail"] + + # set a random card serial number + serial_id = "".join([str(randint(0, 9)) for _ in range(20)]) + + # calculate start and end date of the card + start_date = datetime.utcnow() + end_date = datetime.utcnow() + timedelta(days=15) + + user_card = upsert["userCard"] + await self.data.item.put_card( + user_id, + user_card["cardId"], + user_card["cardTypeId"], + user_card["charaId"], + user_card["mapId"], + # add the correct start date and also the end date in 15 days + start_date, + end_date, + ) + + # get the profile extend to save the new bought card + extend = await self.data.profile.get_profile_extend(user_id, self.version) + if extend: + extend = extend._asdict() + # parse the selectedCardList + # 6 = Freedom Pass, 4 = Gold Pass (cardTypeId) + selected_cards: List = extend["selectedCardList"] + + # if no pass is already added, add the corresponding pass + if not user_card["cardTypeId"] in selected_cards: + selected_cards.insert(0, user_card["cardTypeId"]) + + extend["selectedCardList"] = selected_cards + await self.data.profile.put_profile_extend(user_id, self.version, extend) + + # properly format userPrintDetail for the database + upsert.pop("userCard") + upsert.pop("serialId") + upsert["printDate"] = datetime.strptime(upsert["printDate"], "%Y-%m-%d") + + await self.data.item.put_user_print_detail(user_id, serial_id, upsert) + + return { + "returnCode": 1, + "orderId": 0, + "serialId": serial_id, + "startDate": datetime.strftime(start_date, Mai2Constants.DATE_TIME_FORMAT), + "endDate": datetime.strftime(end_date, Mai2Constants.DATE_TIME_FORMAT), + } + + async def handle_cm_upsert_user_printlog_api_request(self, data: Dict) -> Dict: + return { + "returnCode": 1, + "orderId": 0, + "serialId": data["userPrintlog"]["serialId"], + } + + async def handle_cm_upsert_buy_card_api_request(self, data: Dict) -> Dict: + return {"returnCode": 1}