974 lines
		
	
	
		
			38 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			974 lines
		
	
	
		
			38 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import pymysql
 | |
| from datetime import datetime, timedelta
 | |
| from typing import Any, Dict, List
 | |
| import logging
 | |
| from base64 import b64decode
 | |
| from os import path, stat, remove, mkdir, access, W_OK
 | |
| from PIL import ImageFile
 | |
| from random import randint
 | |
| 
 | |
| import pytz
 | |
| from core.config import CoreConfig
 | |
| from core.utils import Utils
 | |
| from .const import Mai2Constants
 | |
| from .config import Mai2Config
 | |
| from .database import Mai2Data
 | |
| 
 | |
| 
 | |
| class Mai2Base:
 | |
|     def __init__(self, cfg: CoreConfig, game_cfg: Mai2Config) -> None:
 | |
|         self.core_config = cfg
 | |
|         self.game_config = game_cfg
 | |
|         self.version = Mai2Constants.VER_MAIMAI
 | |
|         self.data = Mai2Data(cfg)
 | |
|         self.logger = logging.getLogger("mai2")
 | |
|         self.can_deliver = False
 | |
|         self.can_usbdl = False
 | |
|         self.old_server = ""
 | |
|         self.date_time_format = "%Y-%m-%d %H:%M:%S"
 | |
| 
 | |
|         if not self.core_config.server.is_using_proxy and Utils.get_title_port(self.core_config) != 80:
 | |
|             self.old_server = f"http://{self.core_config.server.hostname}:{Utils.get_title_port(cfg)}/SDEY/197/MaimaiServlet/"
 | |
| 
 | |
|         else:
 | |
|             self.old_server = f"http://{self.core_config.server.hostname}/SDEY/197/MaimaiServlet/"
 | |
| 
 | |
|     async def handle_get_game_setting_api_request(self, data: Dict):
 | |
|         # if reboot start/end time is not defined use the default behavior of being a few hours ago
 | |
|         if self.core_config.title.reboot_start_time == "" or self.core_config.title.reboot_end_time == "":
 | |
|             reboot_start = datetime.strftime(
 | |
|                 datetime.utcnow() + timedelta(hours=6), self.date_time_format
 | |
|             )
 | |
|             reboot_end = datetime.strftime(
 | |
|                 datetime.utcnow() + timedelta(hours=7), self.date_time_format
 | |
|             )
 | |
|         else:
 | |
|             # get current datetime in JST
 | |
|             current_jst = datetime.now(pytz.timezone('Asia/Tokyo')).date()
 | |
| 
 | |
|             # parse config start/end times into datetime
 | |
|             reboot_start_time = datetime.strptime(self.core_config.title.reboot_start_time, "%H:%M")
 | |
|             reboot_end_time = datetime.strptime(self.core_config.title.reboot_end_time, "%H:%M")
 | |
| 
 | |
|             # offset datetimes with current date/time
 | |
|             reboot_start_time = reboot_start_time.replace(year=current_jst.year, month=current_jst.month, day=current_jst.day, tzinfo=pytz.timezone('Asia/Tokyo'))
 | |
|             reboot_end_time = reboot_end_time.replace(year=current_jst.year, month=current_jst.month, day=current_jst.day, tzinfo=pytz.timezone('Asia/Tokyo'))
 | |
| 
 | |
|             # create strings for use in gameSetting
 | |
|             reboot_start = reboot_start_time.strftime(self.date_time_format)
 | |
|             reboot_end = reboot_end_time.strftime(self.date_time_format)
 | |
| 
 | |
| 
 | |
|         return {
 | |
|             "isDevelop": False,
 | |
|             "isAouAccession": False,
 | |
|             "gameSetting": {
 | |
|                 "isMaintenance": False,
 | |
|                 "requestInterval": 1800,
 | |
|                 "rebootStartTime": reboot_start,
 | |
|                 "rebootEndTime": reboot_end,
 | |
|                 "movieUploadLimit": 100,
 | |
|                 "movieStatus": 1,
 | |
|                 "movieServerUri": self.old_server + "api/movie" if self.game_config.uploads.movies else "movie",
 | |
|                 "deliverServerUri": self.old_server + "deliver/" if self.can_deliver and self.game_config.deliver.enable else "",
 | |
|                 "oldServerUri": self.old_server + "old",
 | |
|                 "usbDlServerUri": self.old_server + "usbdl/" if self.can_deliver and self.game_config.deliver.udbdl_enable else "",
 | |
|             },
 | |
|         }
 | |
| 
 | |
|     async def handle_get_game_ranking_api_request(self, data: Dict) -> Dict:
 | |
|         conn = pymysql.connect(
 | |
|             host=self.core_config.database.host,
 | |
|             port=self.core_config.database.port,
 | |
|             user=self.core_config.database.username,
 | |
|             password=self.core_config.database.password,
 | |
|             database=self.core_config.database.name,
 | |
|             charset='utf8mb4'  
 | |
|         )
 | |
|         try:
 | |
|             cursor = conn.cursor()
 | |
| 
 | |
|             query = """
 | |
|             SELECT musicid AS id, COUNT(*) AS point
 | |
|             FROM mai2_playlog
 | |
|             GROUP BY musicid
 | |
|             ORDER BY point DESC
 | |
|             LIMIT 100
 | |
|             """
 | |
|             cursor.execute(query)
 | |
| 
 | |
|             results = cursor.fetchall()
 | |
|             ranking_list = [{"id": row[0], "point": row[1], "userName": ""} for row in results]
 | |
|             output = {
 | |
|                 "type": 1,
 | |
|                 "gameRankingList": ranking_list,
 | |
|                 "gameRankingInstantList": None
 | |
|             }
 | |
| 
 | |
|             cursor.close()
 | |
|             conn.close()
 | |
|             return output
 | |
| 
 | |
|         except Exception as e:
 | |
|             return {'length': 0, 'gameRankingList': []}
 | |
| 
 | |
|     async def handle_get_game_tournament_info_api_request(self, data: Dict) -> Dict:
 | |
|         # TODO: Tournament support
 | |
|         return {"length": 0, "gameTournamentInfoList": []}
 | |
| 
 | |
|     async def handle_get_game_event_api_request(self, data: Dict) -> Dict:
 | |
|         events = await self.data.static.get_enabled_events(self.version)
 | |
|         events_lst = []
 | |
|         if events is None or not events:
 | |
|             self.logger.warning("No enabled events, did you run the reader?")
 | |
|             return {"type": data["type"], "length": 0, "gameEventList": []}
 | |
| 
 | |
|         for event in events:
 | |
|             events_lst.append(
 | |
|                 {
 | |
|                     "type": event["type"],
 | |
|                     "id": event["eventId"],
 | |
|                     # actually use the startDate from the import so it
 | |
|                     # properly shows all the events when new ones are imported
 | |
|                     "startDate": datetime.strftime(
 | |
|                         event["startDate"], f"{Mai2Constants.DATE_TIME_FORMAT}.0"
 | |
|                     ),
 | |
|                     "endDate": "2099-12-31 00:00:00.0",
 | |
|                 }
 | |
|             )
 | |
| 
 | |
|         return {
 | |
|             "type": data["type"],
 | |
|             "length": len(events_lst),
 | |
|             "gameEventList": events_lst,
 | |
|         }
 | |
| 
 | |
|     async def handle_get_game_ng_music_id_api_request(self, data: Dict) -> Dict:
 | |
|         return {"length": 0, "musicIdList": []}
 | |
| 
 | |
|     async def handle_get_game_charge_api_request(self, data: Dict) -> Dict:
 | |
|         game_charge_list = await self.data.static.get_enabled_tickets(self.version, 1)
 | |
|         if game_charge_list is None:
 | |
|             return {"length": 0, "gameChargeList": []}
 | |
| 
 | |
|         charge_list = []
 | |
|         for i, charge in enumerate(game_charge_list):
 | |
|             charge_list.append(
 | |
|                 {
 | |
|                     "orderId": i + 1,
 | |
|                     "chargeId": charge["ticketId"],
 | |
|                     "price": charge["price"],
 | |
|                     "startDate": "2017-12-05 07:00:00.0",
 | |
|                     "endDate": "2099-12-31 00:00:00.0",
 | |
|                 }
 | |
|             )
 | |
| 
 | |
|         return {"length": len(charge_list), "gameChargeList": charge_list}
 | |
| 
 | |
|     async def handle_upsert_client_setting_api_request(self, data: Dict) -> Dict:
 | |
|         return {"returnCode": 1, "apiName": "UpsertClientSettingApi"}
 | |
| 
 | |
|     async def handle_upsert_client_upload_api_request(self, data: Dict) -> Dict:
 | |
|         return {"returnCode": 1, "apiName": "UpsertClientUploadApi"}
 | |
| 
 | |
|     async def handle_upsert_client_bookkeeping_api_request(self, data: Dict) -> Dict:
 | |
|         return {"returnCode": 1, "apiName": "UpsertClientBookkeepingApi"}
 | |
| 
 | |
|     async def handle_upsert_client_testmode_api_request(self, data: Dict) -> Dict:
 | |
|         return {"returnCode": 1, "apiName": "UpsertClientTestmodeApi"}
 | |
| 
 | |
|     async def handle_get_user_preview_api_request(self, data: Dict) -> Dict:
 | |
|         p = await self.data.profile.get_profile_detail(data["userId"], self.version, False)
 | |
|         w = await self.data.profile.get_web_option(data["userId"], self.version)
 | |
|         if p is None or w is None:
 | |
|             return {}  # Register
 | |
|         profile = p._asdict()
 | |
|         web_opt = w._asdict()
 | |
| 
 | |
|         return {
 | |
|             "userId": data["userId"],
 | |
|             "userName": profile["userName"],
 | |
|             "isLogin": False,
 | |
|             "lastDataVersion": profile["lastDataVersion"],
 | |
|             "lastLoginDate": profile["lastPlayDate"],
 | |
|             "lastPlayDate": profile["lastPlayDate"],
 | |
|             "playerRating": profile["playerRating"],
 | |
|             "nameplateId": profile["nameplateId"],
 | |
|             "frameId": profile["frameId"],
 | |
|             "iconId": profile["iconId"],
 | |
|             "trophyId": profile["trophyId"],
 | |
|             "dispRate": web_opt["dispRate"],  # 0: all, 1: dispRate, 2: dispDan, 3: hide
 | |
|             "dispRank": web_opt["dispRank"],
 | |
|             "dispHomeRanker": web_opt["dispHomeRanker"],
 | |
|             "dispTotalLv": web_opt["dispTotalLv"],
 | |
|             "totalLv": profile["totalLv"],
 | |
|         }
 | |
| 
 | |
|     async def handle_user_login_api_request(self, data: Dict) -> Dict:
 | |
|         profile = await self.data.profile.get_profile_detail(data["userId"], self.version)
 | |
|         consec = await self.data.profile.get_consec_login(data["userId"], self.version)
 | |
| 
 | |
|         if profile is not None:
 | |
|             lastLoginDate = profile["lastLoginDate"]
 | |
|             loginCt = profile["playCount"]
 | |
| 
 | |
|             if "regionId" in data:
 | |
|                 await self.data.profile.put_profile_region(data["userId"], data["regionId"])
 | |
|         else:
 | |
|             loginCt = 0
 | |
|             lastLoginDate = "2017-12-05 07:00:00.0"
 | |
|         
 | |
|         if consec is None or not consec:
 | |
|             consec_ct = 1
 | |
|         
 | |
|         else:
 | |
|             lastlogindate_ = datetime.strptime(profile["lastLoginDate"], "%Y-%m-%d %H:%M:%S.%f").timestamp()
 | |
|             today_midnight = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0).timestamp()
 | |
|             yesterday_midnight = today_midnight - 86400
 | |
| 
 | |
|             if lastlogindate_ < today_midnight:
 | |
|                 consec_ct = consec['logins'] + 1                
 | |
|                 await self.data.profile.add_consec_login(data["userId"], self.version)
 | |
|             
 | |
|             elif lastlogindate_ < yesterday_midnight:
 | |
|                 consec_ct = 1
 | |
|                 await self.data.profile.reset_consec_login(data["userId"], self.version)
 | |
| 
 | |
|             else:
 | |
|                 consec_ct = consec['logins']
 | |
|         
 | |
| 
 | |
|         return {
 | |
|             "returnCode": 1,
 | |
|             "lastLoginDate": lastLoginDate,
 | |
|             "loginCount": loginCt,
 | |
|             "consecutiveLoginCount": consec_ct, # Number of consecutive days we've logged in.
 | |
|         }
 | |
| 
 | |
|     async def handle_upload_user_playlog_api_request(self, data: Dict) -> Dict:
 | |
|         user_id = data["userId"]
 | |
|         playlog = data["userPlaylog"]
 | |
| 
 | |
|         await self.data.score.put_playlog(user_id, playlog)
 | |
| 
 | |
|         return {"returnCode": 1, "apiName": "UploadUserPlaylogApi"}
 | |
| 
 | |
|     async def handle_upsert_user_chargelog_api_request(self, data: Dict) -> Dict:
 | |
|         user_id = data["userId"]
 | |
|         charge = data["userCharge"]
 | |
| 
 | |
|         # remove the ".0" from the date string, festival only?
 | |
|         charge["purchaseDate"] = charge["purchaseDate"].replace(".0", "")
 | |
|         await self.data.item.put_charge(
 | |
|             user_id,
 | |
|             charge["chargeId"],
 | |
|             charge["stock"],
 | |
|             charge["purchaseDate"], # Ideally these should be datetimes, but db was
 | |
|             charge["validDate"] # set up with them being str, so str it is for now
 | |
|         )
 | |
| 
 | |
|         return {"returnCode": 1, "apiName": "UpsertUserChargelogApi"}
 | |
| 
 | |
|     async def handle_upsert_user_all_api_request(self, data: Dict) -> Dict:
 | |
|         user_id = data["userId"]
 | |
|         upsert = data["upsertUserAll"]
 | |
|         
 | |
|         if int(user_id) & 0x1000000000001 == 0x1000000000001:
 | |
|             place_id = int(user_id) & 0xFFFC00000000
 | |
|             
 | |
|             self.logger.info("Guest play from place ID %d, ignoring.", place_id)
 | |
|             return {"returnCode": 1, "apiName": "UpsertUserAllApi"}
 | |
| 
 | |
|         if "userData" in upsert and len(upsert["userData"]) > 0:
 | |
|             upsert["userData"][0].pop("accessCode")
 | |
|             upsert["userData"][0].pop("userId")
 | |
| 
 | |
|             await self.data.profile.put_profile_detail(
 | |
|                 user_id, self.version, upsert["userData"][0], False
 | |
|             )
 | |
|         
 | |
|         if "userWebOption" in upsert and len(upsert["userWebOption"]) > 0:            
 | |
|             upsert["userWebOption"][0]["isNetMember"] = True
 | |
|             await self.data.profile.put_web_option(
 | |
|                 user_id, self.version, upsert["userWebOption"][0]
 | |
|             )
 | |
| 
 | |
|         if "userGradeStatusList" in upsert and len(upsert["userGradeStatusList"]) > 0:
 | |
|             await self.data.profile.put_grade_status(
 | |
|                 user_id, upsert["userGradeStatusList"][0]
 | |
|             )
 | |
| 
 | |
|         if "userBossList" in upsert and len(upsert["userBossList"]) > 0:
 | |
|             await self.data.profile.put_boss_list(
 | |
|                 user_id, upsert["userBossList"][0]
 | |
|             )
 | |
| 
 | |
|         if "userPlaylogList" in upsert and len(upsert["userPlaylogList"]) > 0:
 | |
|             for playlog in upsert["userPlaylogList"]:
 | |
|                 await self.data.score.put_playlog(
 | |
|                     user_id, playlog, False
 | |
|                 )
 | |
| 
 | |
|         if "userExtend" in upsert and len(upsert["userExtend"]) > 0:
 | |
|             await self.data.profile.put_profile_extend(
 | |
|                 user_id, self.version, upsert["userExtend"][0]
 | |
|             )
 | |
| 
 | |
|         if "userGhost" in upsert:
 | |
|             for ghost in upsert["userGhost"]:
 | |
|                 await self.data.profile.put_profile_ghost(user_id, self.version, ghost)
 | |
| 
 | |
|         if "userRecentRatingList" in upsert:
 | |
|             await self.data.profile.put_recent_rating(user_id, upsert["userRecentRatingList"])
 | |
| 
 | |
|         if "userOption" in upsert and len(upsert["userOption"]) > 0:
 | |
|             upsert["userOption"][0].pop("userId")
 | |
|             await self.data.profile.put_profile_option(
 | |
|                 user_id, self.version, upsert["userOption"][0], False
 | |
|             )
 | |
| 
 | |
|         if "userRatingList" in upsert and len(upsert["userRatingList"]) > 0:
 | |
|             await self.data.profile.put_profile_rating(
 | |
|                 user_id, self.version, upsert["userRatingList"][0]
 | |
|             )
 | |
| 
 | |
|         if "userActivityList" in upsert and len(upsert["userActivityList"]) > 0:
 | |
|             for act in upsert["userActivityList"]:
 | |
|                 await self.data.profile.put_profile_activity(user_id, act)
 | |
| 
 | |
|         if "userChargeList" in upsert and len(upsert["userChargeList"]) > 0:
 | |
|             for charge in upsert["userChargeList"]:
 | |
|                 # remove the ".0" from the date string, festival only?
 | |
|                 charge["purchaseDate"] = charge["purchaseDate"].replace(".0", "")
 | |
|                 await self.data.item.put_charge(
 | |
|                     user_id,
 | |
|                     charge["chargeId"],
 | |
|                     charge["stock"],
 | |
|                     charge["purchaseDate"],
 | |
|                     charge["validDate"]
 | |
|                 )
 | |
| 
 | |
|         if "userCharacterList" in upsert and len(upsert["userCharacterList"]) > 0:
 | |
|             for char in upsert["userCharacterList"]:
 | |
|                 await self.data.item.put_character_(
 | |
|                     user_id,
 | |
|                     char
 | |
|                 )
 | |
| 
 | |
|         if "userItemList" in upsert and len(upsert["userItemList"]) > 0:
 | |
|             for item in upsert["userItemList"]:
 | |
|                 await self.data.item.put_item(
 | |
|                     user_id,
 | |
|                     int(item["itemKind"]),
 | |
|                     item["itemId"],
 | |
|                     item["stock"],
 | |
|                     True
 | |
|                 )
 | |
| 
 | |
|         if "userLoginBonusList" in upsert and len(upsert["userLoginBonusList"]) > 0:
 | |
|             for login_bonus in upsert["userLoginBonusList"]:
 | |
|                 await self.data.item.put_login_bonus(
 | |
|                     user_id,
 | |
|                     login_bonus["bonusId"],
 | |
|                     login_bonus["point"],
 | |
|                     login_bonus["isCurrent"],
 | |
|                     login_bonus["isComplete"],
 | |
|                 )
 | |
| 
 | |
|         if "userMapList" in upsert and len(upsert["userMapList"]) > 0:
 | |
|             for map in upsert["userMapList"]:
 | |
|                 await self.data.item.put_map(
 | |
|                     user_id,
 | |
|                     map["mapId"],
 | |
|                     map["distance"],
 | |
|                     map["isLock"],
 | |
|                     map["isClear"],
 | |
|                     map["isComplete"],
 | |
|                 )
 | |
| 
 | |
|         if "userMusicDetailList" in upsert and len(upsert["userMusicDetailList"]) > 0:
 | |
|             for music in upsert["userMusicDetailList"]:
 | |
|                 await self.data.score.put_best_score(user_id, music, False)
 | |
| 
 | |
|         if "userCourseList" in upsert and len(upsert["userCourseList"]) > 0:
 | |
|             for course in upsert["userCourseList"]:
 | |
|                 await self.data.score.put_course(user_id, course)
 | |
| 
 | |
|         if "userFavoriteList" in upsert and len(upsert["userFavoriteList"]) > 0:
 | |
|             for fav in upsert["userFavoriteList"]:
 | |
|                 await self.data.item.put_favorite(user_id, fav["kind"], fav["itemIdList"])
 | |
| 
 | |
|         if (
 | |
|             "userFriendSeasonRankingList" in upsert
 | |
|             and len(upsert["userFriendSeasonRankingList"]) > 0
 | |
|         ):
 | |
|             for fsr in upsert["userFriendSeasonRankingList"]:
 | |
|                 fsr["recordDate"] = (
 | |
|                     datetime.strptime(
 | |
|                         fsr["recordDate"], f"{Mai2Constants.DATE_TIME_FORMAT}.0"
 | |
|                     ),
 | |
|                 )
 | |
|                 await self.data.item.put_friend_season_ranking(user_id, fsr)
 | |
| 
 | |
|         return {"returnCode": 1, "apiName": "UpsertUserAllApi"}
 | |
| 
 | |
|     async def handle_user_logout_api_request(self, data: Dict) -> Dict:
 | |
|         return {"returnCode": 1}
 | |
| 
 | |
|     async def handle_get_user_data_api_request(self, data: Dict) -> Dict:
 | |
|         profile = await self.data.profile.get_profile_detail(data["userId"], self.version, False)
 | |
|         if profile is None:
 | |
|             return
 | |
| 
 | |
|         profile_dict = profile._asdict()
 | |
|         profile_dict.pop("id")
 | |
|         profile_dict.pop("user")
 | |
|         profile_dict.pop("version")
 | |
| 
 | |
|         return {"userId": data["userId"], "userData": profile_dict}
 | |
| 
 | |
|     async def handle_get_user_extend_api_request(self, data: Dict) -> Dict:
 | |
|         extend = await self.data.profile.get_profile_extend(data["userId"], self.version)
 | |
|         if extend is None:
 | |
|             return
 | |
| 
 | |
|         extend_dict = extend._asdict()
 | |
|         extend_dict.pop("id")
 | |
|         extend_dict.pop("user")
 | |
|         extend_dict.pop("version")
 | |
| 
 | |
|         return {"userId": data["userId"], "userExtend": extend_dict}
 | |
| 
 | |
|     async def handle_get_user_option_api_request(self, data: Dict) -> Dict:
 | |
|         options = await self.data.profile.get_profile_option(data["userId"], self.version, False)
 | |
|         if options is None:
 | |
|             return
 | |
| 
 | |
|         options_dict = options._asdict()
 | |
|         options_dict.pop("id")
 | |
|         options_dict.pop("user")
 | |
|         options_dict.pop("version")
 | |
| 
 | |
|         return {"userId": data["userId"], "userOption": options_dict}
 | |
| 
 | |
|     async def handle_get_user_card_api_request(self, data: Dict) -> Dict:
 | |
|         user_cards = await self.data.item.get_cards(data["userId"])
 | |
|         if user_cards is None:
 | |
|             return {"userId": data["userId"], "nextIndex": 0, "userCardList": []}
 | |
| 
 | |
|         max_ct = data["maxCount"]
 | |
|         next_idx = data["nextIndex"]
 | |
|         start_idx = next_idx
 | |
|         end_idx = max_ct + start_idx
 | |
| 
 | |
|         if len(user_cards[start_idx:]) > max_ct:
 | |
|             next_idx += max_ct
 | |
|         else:
 | |
|             next_idx = 0
 | |
| 
 | |
|         card_list = []
 | |
|         for card in user_cards:
 | |
|             tmp = card._asdict()
 | |
|             tmp.pop("id")
 | |
|             tmp.pop("user")
 | |
|             tmp["startDate"] = datetime.strftime(
 | |
|                 tmp["startDate"], Mai2Constants.DATE_TIME_FORMAT
 | |
|             )
 | |
|             tmp["endDate"] = datetime.strftime(
 | |
|                 tmp["endDate"], Mai2Constants.DATE_TIME_FORMAT
 | |
|             )
 | |
|             card_list.append(tmp)
 | |
| 
 | |
|         return {
 | |
|             "userId": data["userId"],
 | |
|             "nextIndex": next_idx,
 | |
|             "userCardList": card_list[start_idx:end_idx],
 | |
|         }
 | |
| 
 | |
|     async def handle_get_user_charge_api_request(self, data: Dict) -> Dict:
 | |
|         user_charges = await self.data.item.get_charges(data["userId"])
 | |
|         if user_charges is None:
 | |
|             return {"userId": data["userId"], "length": 0, "userChargeList": []}
 | |
| 
 | |
|         user_charge_list = []
 | |
|         for charge in user_charges:
 | |
|             tmp = charge._asdict()
 | |
|             tmp.pop("id")
 | |
|             tmp.pop("user")
 | |
| 
 | |
|             user_charge_list.append(tmp)
 | |
| 
 | |
|         return {
 | |
|             "userId": data["userId"],
 | |
|             "length": len(user_charge_list),
 | |
|             "userChargeList": user_charge_list,
 | |
|         }
 | |
| 
 | |
|     async def handle_get_user_present_api_request(self, data: Dict) -> Dict:
 | |
|         items: List[Dict[str, Any]] = []
 | |
|         user_pres_list = await self.data.item.get_presents_by_version_user(self.version, data["userId"])
 | |
|         if user_pres_list:
 | |
|             self.logger.debug(f"Found {len(user_pres_list)} possible presents")
 | |
|             for present in user_pres_list:
 | |
|                 if (present['startDate'] and present['startDate'].timestamp() > datetime.now().timestamp()):
 | |
|                     self.logger.debug(f"Present {present['id']} distribution hasn't started yet (begins {present['startDate']})")
 | |
|                     continue # present period hasn't started yet, move onto the next one
 | |
|                 
 | |
|                 if (present['endDate'] and present['endDate'].timestamp() < datetime.now().timestamp()):
 | |
|                     self.logger.warn(f"Present {present['id']} ended on {present['endDate']} and should be removed")
 | |
|                     continue # present period ended, move onto the next one
 | |
|                 
 | |
|                 test = await self.data.item.get_item(data["userId"], present['itemKind'], present['itemId'])
 | |
|                 if not test: # Don't send presents for items the user already has
 | |
|                     pres_id = present['itemKind'] * 1000000
 | |
|                     pres_id += present['itemId']
 | |
|                     items.append({"itemId": pres_id, "itemKind": 4, "stock": present['stock'], "isValid": True})
 | |
|                     self.logger.info(f"Give user {data['userId']} {present['stock']}x item {present['itemId']} (kind {present['itemKind']}) as present")
 | |
|         
 | |
|         return { "userId": data.get("userId", 0), "length": len(items), "userPresentList": items}
 | |
|     
 | |
|     async def handle_get_transfer_friend_api_request(self, data: Dict) -> Dict:
 | |
|         return {}
 | |
| 
 | |
|     async def handle_get_user_present_event_api_request(self, data: Dict) -> Dict:
 | |
|         return { "userId": data.get("userId", 0), "length": 0, "userPresentEventList": []}
 | |
|     
 | |
|     async def handle_get_user_boss_api_request(self, data: Dict) -> Dict:
 | |
|         b = await self.data.profile.get_boss_list(data["userId"])
 | |
|         if b is None:
 | |
|             return { "userId": data.get("userId", 0), "userBossData": {}}
 | |
|         boss_lst = b._asdict()
 | |
|         boss_lst.pop("id")
 | |
|         boss_lst.pop("user")
 | |
| 
 | |
|         return { "userId": data.get("userId", 0), "userBossData": boss_lst}
 | |
| 
 | |
|     async def handle_get_user_item_api_request(self, data: Dict) -> Dict:
 | |
|         kind = int(data["nextIndex"] / 10000000000)
 | |
|         next_idx = int(data["nextIndex"] % 10000000000)
 | |
|         user_item_list = await self.data.item.get_items(data["userId"], kind)
 | |
| 
 | |
|         items: List[Dict[str, Any]] = []
 | |
|         for i in range(next_idx, len(user_item_list)):
 | |
|             tmp = user_item_list[i]._asdict()
 | |
|             tmp.pop("user")
 | |
|             tmp.pop("id")
 | |
|             items.append(tmp)
 | |
|             if len(items) >= int(data["maxCount"]):
 | |
|                 break
 | |
| 
 | |
|         xout = kind * 10000000000 + next_idx + len(items)
 | |
| 
 | |
|         if len(items) < int(data["maxCount"]):
 | |
|             next_idx = 0
 | |
|         else:
 | |
|             next_idx = xout
 | |
| 
 | |
|         return {
 | |
|             "userId": data["userId"],
 | |
|             "nextIndex": next_idx,
 | |
|             "itemKind": kind,
 | |
|             "userItemList": items,
 | |
|         }
 | |
| 
 | |
|     async def handle_get_user_character_api_request(self, data: Dict) -> Dict:
 | |
|         characters = await self.data.item.get_characters(data["userId"])
 | |
| 
 | |
|         chara_list = []
 | |
|         for chara in characters:
 | |
|             tmp = chara._asdict()
 | |
|             tmp.pop("id")
 | |
|             tmp.pop("user")
 | |
|             tmp.pop("awakening")
 | |
|             tmp.pop("useCount")
 | |
|             chara_list.append(tmp)
 | |
| 
 | |
|         return {"userId": data["userId"], "userCharacterList": chara_list}
 | |
| 
 | |
|     async def handle_get_user_favorite_api_request(self, data: Dict) -> Dict:
 | |
|         favorites = await self.data.item.get_favorites(data["userId"], data["itemKind"])
 | |
|         if favorites is None:
 | |
|             return
 | |
| 
 | |
|         userFavs = []
 | |
|         for fav in favorites:
 | |
|             userFavs.append(
 | |
|                 {
 | |
|                     "userId": data["userId"],
 | |
|                     "itemKind": fav["itemKind"],
 | |
|                     "itemIdList": fav["itemIdList"],
 | |
|                 }
 | |
|             )
 | |
| 
 | |
|         return {"userId": data["userId"], "userFavoriteData": userFavs}
 | |
| 
 | |
|     async def handle_get_user_ghost_api_request(self, data: Dict) -> Dict:
 | |
|         ghost = await self.data.profile.get_profile_ghost(data["userId"], self.version)
 | |
|         if ghost is None:
 | |
|             return
 | |
| 
 | |
|         ghost_dict = ghost._asdict()
 | |
|         ghost_dict.pop("user")
 | |
|         ghost_dict.pop("id")
 | |
|         ghost_dict.pop("version_int")
 | |
| 
 | |
|         return {"userId": data["userId"], "userGhost": ghost_dict}
 | |
| 
 | |
|     async def handle_get_user_recent_rating_api_request(self, data: Dict) -> Dict:
 | |
|         rating = await self.data.profile.get_recent_rating(data["userId"])
 | |
|         if rating is None:
 | |
|             return
 | |
|         
 | |
|         r = rating._asdict()
 | |
|         lst = r.get("userRecentRatingList", [])
 | |
|         
 | |
|         return {"userId": data["userId"], "length": len(lst), "userRecentRatingList": lst}
 | |
| 
 | |
|     async def handle_get_user_rating_api_request(self, data: Dict) -> Dict:
 | |
|         rating = await self.data.profile.get_profile_rating(data["userId"], self.version)
 | |
|         if rating is None:
 | |
|             return
 | |
| 
 | |
|         rating_dict = rating._asdict()
 | |
|         rating_dict.pop("user")
 | |
|         rating_dict.pop("id")
 | |
|         rating_dict.pop("version")
 | |
| 
 | |
|         return {"userId": data["userId"], "userRating": rating_dict}
 | |
| 
 | |
|     async def handle_get_user_activity_api_request(self, data: Dict) -> Dict:
 | |
|         """
 | |
|         kind 1 is playlist, kind 2 is music list
 | |
|         """
 | |
|         playlist = await self.data.profile.get_profile_activity(data["userId"], 1)
 | |
|         musiclist = await self.data.profile.get_profile_activity(data["userId"], 2)
 | |
|         if playlist is None or musiclist is None:
 | |
|             return
 | |
| 
 | |
|         plst = []
 | |
|         mlst = []
 | |
| 
 | |
|         for play in playlist:
 | |
|             tmp = play._asdict()
 | |
|             tmp["id"] = tmp["activityId"]
 | |
|             tmp.pop("activityId")
 | |
|             tmp.pop("user")
 | |
|             plst.append(tmp)
 | |
| 
 | |
|         for music in musiclist:
 | |
|             tmp = music._asdict()
 | |
|             tmp["id"] = tmp["activityId"]
 | |
|             tmp.pop("activityId")
 | |
|             tmp.pop("user")
 | |
|             mlst.append(tmp)
 | |
| 
 | |
|         return {"userActivity": {"playList": plst, "musicList": mlst}}
 | |
| 
 | |
|     async def handle_get_user_course_api_request(self, data: Dict) -> Dict:
 | |
|         user_courses = await self.data.score.get_courses(data["userId"])
 | |
|         if user_courses is None:
 | |
|             return {"userId": data["userId"], "nextIndex": 0, "userCourseList": []}
 | |
| 
 | |
|         course_list = []
 | |
|         for course in user_courses:
 | |
|             tmp = course._asdict()
 | |
|             tmp.pop("user")
 | |
|             tmp.pop("id")
 | |
|             course_list.append(tmp)
 | |
| 
 | |
|         return {"userId": data["userId"], "nextIndex": 0, "userCourseList": course_list}
 | |
| 
 | |
|     async def handle_get_user_portrait_api_request(self, data: Dict) -> Dict:
 | |
|         # No support for custom pfps
 | |
|         return {"length": 0, "userPortraitList": []}
 | |
| 
 | |
|     async def handle_get_user_friend_season_ranking_api_request(self, data: Dict) -> Dict:
 | |
|         friend_season_ranking = await self.data.item.get_friend_season_ranking(data["userId"])
 | |
|         if friend_season_ranking is None:
 | |
|             return {
 | |
|                 "userId": data["userId"],
 | |
|                 "nextIndex": 0,
 | |
|                 "userFriendSeasonRankingList": [],
 | |
|             }
 | |
| 
 | |
|         friend_season_ranking_list = []
 | |
|         next_idx = int(data["nextIndex"])
 | |
|         max_ct = int(data["maxCount"])
 | |
| 
 | |
|         for x in range(next_idx, len(friend_season_ranking)):
 | |
|             tmp = friend_season_ranking[x]._asdict()
 | |
|             tmp.pop("user")
 | |
|             tmp.pop("id")
 | |
|             tmp["recordDate"] = datetime.strftime(
 | |
|                 tmp["recordDate"], f"{Mai2Constants.DATE_TIME_FORMAT}.0"
 | |
|             )
 | |
|             friend_season_ranking_list.append(tmp)
 | |
| 
 | |
|             if len(friend_season_ranking_list) >= max_ct:
 | |
|                 break
 | |
| 
 | |
|         if len(friend_season_ranking) >= next_idx + max_ct:
 | |
|             next_idx += max_ct
 | |
|         else:
 | |
|             next_idx = 0
 | |
| 
 | |
|         return {
 | |
|             "userId": data["userId"],
 | |
|             "nextIndex": next_idx,
 | |
|             "userFriendSeasonRankingList": friend_season_ranking_list,
 | |
|         }
 | |
| 
 | |
|     async def handle_get_user_map_api_request(self, data: Dict) -> Dict:
 | |
|         maps = await self.data.item.get_maps(data["userId"])
 | |
|         if maps is None:
 | |
|             return {
 | |
|                 "userId": data["userId"],
 | |
|                 "nextIndex": 0,
 | |
|                 "userMapList": [],
 | |
|             }
 | |
| 
 | |
|         map_list = []
 | |
|         next_idx = int(data["nextIndex"])
 | |
|         max_ct = int(data["maxCount"])
 | |
| 
 | |
|         for x in range(next_idx, len(maps)):
 | |
|             tmp = maps[x]._asdict()
 | |
|             tmp.pop("user")
 | |
|             tmp.pop("id")
 | |
|             map_list.append(tmp)
 | |
| 
 | |
|             if len(map_list) >= max_ct:
 | |
|                 break
 | |
| 
 | |
|         if len(maps) >= next_idx + max_ct:
 | |
|             next_idx += max_ct
 | |
|         else:
 | |
|             next_idx = 0
 | |
| 
 | |
|         return {
 | |
|             "userId": data["userId"],
 | |
|             "nextIndex": next_idx,
 | |
|             "userMapList": map_list,
 | |
|         }
 | |
| 
 | |
|     async def handle_get_user_login_bonus_api_request(self, data: Dict) -> Dict:
 | |
|         login_bonuses = await self.data.item.get_login_bonuses(data["userId"])
 | |
|         if login_bonuses is None:
 | |
|             return {
 | |
|                 "userId": data["userId"],
 | |
|                 "nextIndex": 0,
 | |
|                 "userLoginBonusList": [],
 | |
|             }
 | |
| 
 | |
|         login_bonus_list = []
 | |
|         next_idx = int(data["nextIndex"])
 | |
|         max_ct = int(data["maxCount"])
 | |
| 
 | |
|         for x in range(next_idx, len(login_bonuses)):
 | |
|             tmp = login_bonuses[x]._asdict()
 | |
|             tmp.pop("user")
 | |
|             tmp.pop("id")
 | |
|             login_bonus_list.append(tmp)
 | |
| 
 | |
|             if len(login_bonus_list) >= max_ct:
 | |
|                 break
 | |
| 
 | |
|         if len(login_bonuses) >= next_idx + max_ct:
 | |
|             next_idx += max_ct
 | |
|         else:
 | |
|             next_idx = 0
 | |
| 
 | |
|         return {
 | |
|             "userId": data["userId"],
 | |
|             "nextIndex": next_idx,
 | |
|             "userLoginBonusList": login_bonus_list,
 | |
|         }
 | |
| 
 | |
|     async def handle_get_user_region_api_request(self, data: Dict) -> Dict:
 | |
|         return {"userId": data["userId"], "length": 0, "userRegionList": []}
 | |
|     
 | |
|     async def handle_get_user_web_option_api_request(self, data: Dict) -> Dict:
 | |
|         w = await self.data.profile.get_web_option(data["userId"], self.version)
 | |
|         if  w is None:
 | |
|             return {"userId": data["userId"], "userWebOption": {}}
 | |
|         
 | |
|         web_opt = w._asdict()        
 | |
|         web_opt.pop("id")
 | |
|         web_opt.pop("user")
 | |
|         web_opt.pop("version")
 | |
| 
 | |
|         return {"userId": data["userId"], "userWebOption": web_opt}
 | |
| 
 | |
|     async def handle_get_user_survival_api_request(self, data: Dict) -> Dict:
 | |
|         return {"userId": data["userId"], "length": 0, "userSurvivalList": []}
 | |
| 
 | |
|     async def handle_get_user_grade_api_request(self, data: Dict) -> Dict:
 | |
|         g = await self.data.profile.get_grade_status(data["userId"])
 | |
|         if g is None:
 | |
|             return {"userId": data["userId"], "userGradeStatus": {}, "length": 0, "userGradeList": []}
 | |
|         grade_stat = g._asdict()
 | |
|         grade_stat.pop("id")
 | |
|         grade_stat.pop("user")
 | |
| 
 | |
|         return {"userId": data["userId"], "userGradeStatus": grade_stat, "length": 0, "userGradeList": []}
 | |
| 
 | |
|     async def handle_get_user_music_api_request(self, data: Dict) -> Dict:
 | |
|         user_id = data.get("userId", 0)        
 | |
|         next_index = data.get("nextIndex", 0)
 | |
|         max_ct = data.get("maxCount", 50)
 | |
|         upper_lim = next_index + max_ct
 | |
|         music_detail_list = []
 | |
| 
 | |
|         if user_id <= 0:
 | |
|             self.logger.warning("handle_get_user_music_api_request: Could not find userid in data, or userId is 0")
 | |
|             return {}
 | |
|         
 | |
|         songs = await self.data.score.get_best_scores(user_id, is_dx=False)
 | |
|         if songs is None:
 | |
|             self.logger.debug("handle_get_user_music_api_request: get_best_scores returned None!")
 | |
|             return {
 | |
|             "userId": data["userId"],
 | |
|             "nextIndex": 0,
 | |
|             "userMusicList": [],
 | |
|         }
 | |
| 
 | |
|         num_user_songs = len(songs)
 | |
| 
 | |
|         for x in range(next_index, upper_lim):
 | |
|             if num_user_songs <= x:
 | |
|                 break
 | |
| 
 | |
|             tmp = songs[x]._asdict()
 | |
|             tmp.pop("id")
 | |
|             tmp.pop("user")
 | |
|             music_detail_list.append(tmp)
 | |
| 
 | |
|         next_index = 0 if len(music_detail_list) < max_ct or num_user_songs == upper_lim else upper_lim
 | |
|         self.logger.info(f"Send songs {next_index}-{upper_lim} ({len(music_detail_list)}) out of {num_user_songs} for user {user_id} (next idx {next_index})")
 | |
|         return {
 | |
|             "userId": data["userId"],
 | |
|             "nextIndex": next_index,
 | |
|             "userMusicList": [{"userMusicDetailList": music_detail_list}],
 | |
|         }
 | |
| 
 | |
|     async def handle_upload_user_portrait_api_request(self, data: Dict) -> Dict:
 | |
|         self.logger.warning("Portrait uploading not supported at this time.")
 | |
|         return {'returnCode': 0, 'apiName': 'UploadUserPortraitApi'}
 | |
| 
 | |
|     async def handle_upload_user_photo_api_request(self, data: Dict) -> Dict:
 | |
|         if not self.game_config.uploads.photos or not self.game_config.uploads.photos_dir:
 | |
|             return {'returnCode': 0, 'apiName': 'UploadUserPhotoApi'}
 | |
| 
 | |
|         photo = data.get("userPhoto", {})
 | |
| 
 | |
|         if photo is None or not photo:
 | |
|             return {'returnCode': 0, 'apiName': 'UploadUserPhotoApi'}
 | |
|         
 | |
|         order_id = int(photo.get("orderId", -1))
 | |
|         user_id = int(photo.get("userId", -1))
 | |
|         div_num = int(photo.get("divNumber", -1))
 | |
|         div_len = int(photo.get("divLength", -1))
 | |
|         div_data = photo.get("divData", "")
 | |
|         playlog_id = int(photo.get("playlogId", -1))
 | |
|         track_num = int(photo.get("trackNo", -1))
 | |
|         upload_date = photo.get("uploadDate", "")
 | |
| 
 | |
|         if order_id < 0 or user_id <= 0 or div_num < 0 or div_len <= 0 or not div_data or playlog_id < 0 or track_num <= 0 or not upload_date:
 | |
|             self.logger.warning(f"Malformed photo upload request")
 | |
|             return {'returnCode': 0, 'apiName': 'UploadUserPhotoApi'}
 | |
|         
 | |
|         if order_id == 0 and div_num > 0:
 | |
|             self.logger.warning(f"Failed to set orderId properly (still 0 after first chunk)")
 | |
|             return {'returnCode': 0, 'apiName': 'UploadUserPhotoApi'}
 | |
| 
 | |
|         if div_num == 0 and order_id > 0:
 | |
|             self.logger.warning(f"First chuck re-send, Ignore")
 | |
|             return {'returnCode': 0, 'apiName': 'UploadUserPhotoApi'}
 | |
|         
 | |
|         if div_num >= div_len:
 | |
|             self.logger.warning(f"Sent extra chunks ({div_num} >= {div_len})")
 | |
|             return {'returnCode': 0, 'apiName': 'UploadUserPhotoApi'}
 | |
| 
 | |
|         if div_len >= 100:
 | |
|             self.logger.warning(f"Photo too large ({div_len} * 10240 = {div_len * 10240} bytes)")
 | |
|             return {'returnCode': 0, 'apiName': 'UploadUserPhotoApi'}
 | |
|         
 | |
|         ret_code = order_id + 1
 | |
|         photo_chunk = b64decode(div_data)
 | |
| 
 | |
|         if len(photo_chunk) > 10240 or (len(photo_chunk) < 10240 and div_num + 1 != div_len):
 | |
|             self.logger.warning(f"Incorrect data size after decoding (Expected 10240, got {len(photo_chunk)})")
 | |
|             return {'returnCode': 0, 'apiName': 'UploadUserPhotoApi'}
 | |
|         
 | |
|         photo_data = await self.data.profile.get_user_photo_by_user_playlog_track(user_id, playlog_id, track_num)
 | |
|         
 | |
|         if not photo_data:
 | |
|             photo_id = await self.data.profile.put_user_photo(user_id, playlog_id, track_num)
 | |
|         else:
 | |
|             photo_id = photo_data['id']
 | |
| 
 | |
|         out_folder = f"{self.game_config.uploads.photos_dir}/{photo_id}"
 | |
|         out_file = f"{out_folder}/{div_num}_{div_len - 1}.bin"
 | |
| 
 | |
|         if not path.exists(out_folder):
 | |
|             mkdir(out_folder)
 | |
|         
 | |
|         if not access(out_folder, W_OK):
 | |
|             self.logger.error(f"Cannot access {out_folder}")
 | |
|             return {'returnCode': 0, 'apiName': 'UploadUserPhotoApi'}
 | |
|         
 | |
|         if path.exists(out_file):
 | |
|             self.logger.warning(f"Photo chunk {out_file} already exists, skipping")
 | |
|         
 | |
|         else:
 | |
|             with open(out_file, "wb") as f:
 | |
|                 written = f.write(photo_chunk)
 | |
|             
 | |
|             if written != len(photo_chunk):
 | |
|                 self.logger.error(f"Writing {out_file} failed! Wrote {written} bytes, expected {photo_chunk} bytes")
 | |
|                 return {'returnCode': 0, 'apiName': 'UploadUserPhotoApi'}
 | |
| 
 | |
|         return {'returnCode': ret_code, 'apiName': 'UploadUserPhotoApi'}
 | |
| 
 | |
|     async def handle_get_user_favorite_item_api_request(self, data: Dict) -> Dict:
 | |
|         user_id = data.get("userId", 0)
 | |
|         kind = data.get("kind", 0) # 1 is fav music, 2 is rival user IDs
 | |
|         next_index = data.get("nextIndex", 0)
 | |
|         max_ct = data.get("maxCount", 100) # always 100
 | |
|         is_all = data.get("isAllFavoriteItem", False) # always false
 | |
|         id_list: List[Dict] = []
 | |
| 
 | |
|         if user_id:
 | |
|             if kind == 1:
 | |
|                 fav_music = await self.data.item.get_fav_music(user_id)
 | |
|                 if fav_music:
 | |
|                     for fav in fav_music:
 | |
|                         id_list.append({"orderId": fav["orderId"] or 0, "id": fav["musicId"]})
 | |
|                         if len(id_list) >= 100: # Lazy but whatever
 | |
|                             break
 | |
|             
 | |
|             elif kind == 2:
 | |
|                 rivals = await self.data.profile.get_rivals_game(user_id)
 | |
|                 if rivals:
 | |
|                     for rival in rivals:
 | |
|                         id_list.append({"orderId": 0, "id": rival["rival"]})
 | |
| 
 | |
|         return {
 | |
|             "userId": user_id,
 | |
|             "kind": kind,
 | |
|             "nextIndex": 0,
 | |
|             "userFavoriteItemList": id_list,
 | |
|         }
 | |
| 
 | |
|     async def handle_get_user_recommend_rate_music_api_request(self, data: Dict) -> Dict:
 | |
|         """
 | |
|         userRecommendRateMusicIdList: list[int]
 | |
|         """
 | |
|         return {"userId": data["userId"], "userRecommendRateMusicIdList": []}
 | |
| 
 | |
|     async def handle_get_user_recommend_select_music_api_request(self, data: Dict) -> Dict:
 | |
|         """
 | |
|         userRecommendSelectionMusicIdList: list[int]
 | |
|         """
 | |
|         return {"userId": data["userId"], "userRecommendSelectionMusicIdList": []}
 |