artemis/titles/mai2/base.py

887 lines
34 KiB
Python
Raw Permalink Normal View History

from datetime import datetime, timedelta
from typing import Any, Dict, List
import logging
from base64 import b64decode
2023-07-16 20:58:18 +00:00
from os import path, stat, remove
from PIL import ImageFile
2023-10-16 13:21:07 +00:00
import pytz
from core.config import CoreConfig
from core.utils import Utils
from .const import Mai2Constants
from .config import Mai2Config
from .database import Mai2Data
2023-03-09 16:38:58 +00:00
class Mai2Base:
def __init__(self, cfg: CoreConfig, game_cfg: Mai2Config) -> None:
self.core_config = cfg
self.game_config = game_cfg
2023-05-01 02:19:31 +00:00
self.version = Mai2Constants.VER_MAIMAI
self.data = Mai2Data(cfg)
self.logger = logging.getLogger("mai2")
2023-05-10 06:31:30 +00:00
self.can_deliver = False
self.can_usbdl = False
self.old_server = ""
self.date_time_format = "%Y-%m-%d %H:%M:%S"
2023-10-16 13:21:07 +00:00
if not self.core_config.server.is_using_proxy and Utils.get_title_port(self.core_config) != 80:
self.old_server = f"http://{self.core_config.title.hostname}:{Utils.get_title_port(cfg)}/197/MaimaiServlet/"
2023-10-16 13:21:07 +00:00
2023-05-04 02:29:08 +00:00
else:
self.old_server = f"http://{self.core_config.title.hostname}/197/MaimaiServlet/"
def handle_get_game_setting_api_request(self, data: Dict):
2023-10-16 13:21:07 +00:00
# if reboot start/end time is not defined use the default behavior of being a few hours ago
if self.core_config.title.reboot_start_time == "" or self.core_config.title.reboot_end_time == "":
2023-10-16 13:21:07 +00:00
reboot_start = datetime.strftime(
datetime.utcnow() + timedelta(hours=6), self.date_time_format
)
reboot_end = datetime.strftime(
datetime.utcnow() + timedelta(hours=7), self.date_time_format
)
else:
# get current datetime in JST
current_jst = datetime.now(pytz.timezone('Asia/Tokyo')).date()
# parse config start/end times into datetime
reboot_start_time = datetime.strptime(self.core_config.title.reboot_start_time, "%H:%M")
reboot_end_time = datetime.strptime(self.core_config.title.reboot_end_time, "%H:%M")
2023-10-16 13:21:07 +00:00
# offset datetimes with current date/time
reboot_start_time = reboot_start_time.replace(year=current_jst.year, month=current_jst.month, day=current_jst.day, tzinfo=pytz.timezone('Asia/Tokyo'))
reboot_end_time = reboot_end_time.replace(year=current_jst.year, month=current_jst.month, day=current_jst.day, tzinfo=pytz.timezone('Asia/Tokyo'))
# create strings for use in gameSetting
reboot_start = reboot_start_time.strftime(self.date_time_format)
reboot_end = reboot_end_time.strftime(self.date_time_format)
return {
"isDevelop": False,
"isAouAccession": False,
2023-03-09 16:38:58 +00:00
"gameSetting": {
"isMaintenance": False,
"requestInterval": 1800,
2023-10-16 13:21:07 +00:00
"rebootStartTime": reboot_start,
"rebootEndTime": reboot_end,
"movieUploadLimit": 100,
"movieStatus": 1,
2023-06-25 23:10:34 +00:00
"movieServerUri": self.old_server + "api/movie" if self.game_config.uploads.movies else "movie",
2023-05-10 06:31:30 +00:00
"deliverServerUri": self.old_server + "deliver/" if self.can_deliver and self.game_config.deliver.enable else "",
2023-05-06 23:04:10 +00:00
"oldServerUri": self.old_server + "old",
2023-05-10 06:31:30 +00:00
"usbDlServerUri": self.old_server + "usbdl/" if self.can_deliver and self.game_config.deliver.udbdl_enable else "",
},
}
def handle_get_game_ranking_api_request(self, data: Dict) -> Dict:
return {"length": 0, "gameRankingList": []}
def handle_get_game_tournament_info_api_request(self, data: Dict) -> Dict:
# TODO: Tournament support
return {"length": 0, "gameTournamentInfoList": []}
def handle_get_game_event_api_request(self, data: Dict) -> Dict:
events = self.data.static.get_enabled_events(self.version)
events_lst = []
2023-05-06 23:04:10 +00:00
if events is None or not events:
2023-08-08 14:17:56 +00:00
self.logger.warning("No enabled events, did you run the reader?")
2023-03-09 16:38:58 +00:00
return {"type": data["type"], "length": 0, "gameEventList": []}
for event in events:
2023-03-09 16:38:58 +00:00
events_lst.append(
{
"type": event["type"],
"id": event["eventId"],
2023-04-10 16:58:19 +00:00
# actually use the startDate from the import so it
# properly shows all the events when new ones are imported
"startDate": datetime.strftime(
event["startDate"], f"{Mai2Constants.DATE_TIME_FORMAT}.0"
),
2023-03-09 16:38:58 +00:00
"endDate": "2099-12-31 00:00:00.0",
}
)
2023-03-09 16:38:58 +00:00
return {
"type": data["type"],
"length": len(events_lst),
"gameEventList": events_lst,
}
def handle_get_game_ng_music_id_api_request(self, data: Dict) -> Dict:
return {"length": 0, "musicIdList": []}
def handle_get_game_charge_api_request(self, data: Dict) -> Dict:
game_charge_list = self.data.static.get_enabled_tickets(self.version, 1)
2023-03-09 16:38:58 +00:00
if game_charge_list is None:
return {"length": 0, "gameChargeList": []}
charge_list = []
2023-04-10 16:58:19 +00:00
for i, charge in enumerate(game_charge_list):
2023-03-09 16:38:58 +00:00
charge_list.append(
{
2023-07-16 20:58:18 +00:00
"orderId": i + 1,
2023-04-10 16:58:19 +00:00
"chargeId": charge["ticketId"],
"price": charge["price"],
2023-03-09 16:38:58 +00:00
"startDate": "2017-12-05 07:00:00.0",
"endDate": "2099-12-31 00:00:00.0",
}
)
return {"length": len(charge_list), "gameChargeList": charge_list}
def handle_upsert_client_setting_api_request(self, data: Dict) -> Dict:
return {"returnCode": 1, "apiName": "UpsertClientSettingApi"}
def handle_upsert_client_upload_api_request(self, data: Dict) -> Dict:
return {"returnCode": 1, "apiName": "UpsertClientUploadApi"}
def handle_upsert_client_bookkeeping_api_request(self, data: Dict) -> Dict:
return {"returnCode": 1, "apiName": "UpsertClientBookkeepingApi"}
def handle_upsert_client_testmode_api_request(self, data: Dict) -> Dict:
return {"returnCode": 1, "apiName": "UpsertClientTestmodeApi"}
def handle_get_user_preview_api_request(self, data: Dict) -> Dict:
p = self.data.profile.get_profile_detail(data["userId"], self.version, False)
2023-07-01 05:56:52 +00:00
w = self.data.profile.get_web_option(data["userId"], self.version)
if p is None or w is None:
2023-03-09 16:38:58 +00:00
return {} # Register
profile = p._asdict()
web_opt = w._asdict()
return {
"userId": data["userId"],
"userName": profile["userName"],
"isLogin": False,
"lastDataVersion": profile["lastDataVersion"],
"lastLoginDate": profile["lastPlayDate"],
"lastPlayDate": profile["lastPlayDate"],
"playerRating": profile["playerRating"],
"nameplateId": profile["nameplateId"],
2023-05-06 23:04:10 +00:00
"frameId": profile["frameId"],
"iconId": profile["iconId"],
"trophyId": profile["trophyId"],
"dispRate": web_opt["dispRate"], # 0: all, 1: dispRate, 2: dispDan, 3: hide
"dispRank": web_opt["dispRank"],
"dispHomeRanker": web_opt["dispHomeRanker"],
"dispTotalLv": web_opt["dispTotalLv"],
"totalLv": profile["totalLv"],
}
2023-03-09 16:38:58 +00:00
def handle_user_login_api_request(self, data: Dict) -> Dict:
profile = self.data.profile.get_profile_detail(data["userId"], self.version)
consec = self.data.profile.get_consec_login(data["userId"], self.version)
if profile is not None:
lastLoginDate = profile["lastLoginDate"]
loginCt = profile["playCount"]
if "regionId" in data:
self.data.profile.put_profile_region(data["userId"], data["regionId"])
else:
loginCt = 0
lastLoginDate = "2017-12-05 07:00:00.0"
if consec is None or not consec:
consec_ct = 1
else:
lastlogindate_ = datetime.strptime(profile["lastLoginDate"], "%Y-%m-%d %H:%M:%S.%f").timestamp()
today_midnight = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0).timestamp()
yesterday_midnight = today_midnight - 86400
if lastlogindate_ < today_midnight:
consec_ct = consec['logins'] + 1
self.data.profile.add_consec_login(data["userId"], self.version)
elif lastlogindate_ < yesterday_midnight:
consec_ct = 1
self.data.profile.reset_consec_login(data["userId"], self.version)
else:
consec_ct = consec['logins']
return {
"returnCode": 1,
"lastLoginDate": lastLoginDate,
"loginCount": loginCt,
"consecutiveLoginCount": consec_ct, # Number of consecutive days we've logged in.
}
2023-03-09 16:38:58 +00:00
def handle_upload_user_playlog_api_request(self, data: Dict) -> Dict:
user_id = data["userId"]
playlog = data["userPlaylog"]
self.data.score.put_playlog(user_id, playlog)
return {"returnCode": 1, "apiName": "UploadUserPlaylogApi"}
2023-04-10 16:58:19 +00:00
def handle_upsert_user_chargelog_api_request(self, data: Dict) -> Dict:
user_id = data["userId"]
charge = data["userCharge"]
# remove the ".0" from the date string, festival only?
charge["purchaseDate"] = charge["purchaseDate"].replace(".0", "")
self.data.item.put_charge(
user_id,
charge["chargeId"],
charge["stock"],
charge["purchaseDate"], # Ideally these should be datetimes, but db was
charge["validDate"] # set up with them being str, so str it is for now
2023-04-10 16:58:19 +00:00
)
return {"returnCode": 1, "apiName": "UpsertUserChargelogApi"}
def handle_upsert_user_all_api_request(self, data: Dict) -> Dict:
user_id = data["userId"]
upsert = data["upsertUserAll"]
2023-12-10 22:36:29 +00:00
if int(user_id) & 1000000000001 == 1000000000001:
self.logger.info("Guest play, ignoring.")
return {"returnCode": 1, "apiName": "UpsertUserAllApi"}
if "userData" in upsert and len(upsert["userData"]) > 0:
upsert["userData"][0].pop("accessCode")
upsert["userData"][0].pop("userId")
2023-03-09 16:38:58 +00:00
self.data.profile.put_profile_detail(
user_id, self.version, upsert["userData"][0], False
)
2023-07-01 06:19:19 +00:00
if "userWebOption" in upsert and len(upsert["userWebOption"]) > 0:
upsert["userWebOption"][0]["isNetMember"] = True
self.data.profile.put_web_option(
2023-07-01 06:19:19 +00:00
user_id, self.version, upsert["userWebOption"][0]
)
if "userGradeStatusList" in upsert and len(upsert["userGradeStatusList"]) > 0:
2023-07-01 05:41:34 +00:00
self.data.profile.put_grade_status(
user_id, upsert["userGradeStatusList"][0]
2023-03-09 16:38:58 +00:00
)
if "userBossList" in upsert and len(upsert["userBossList"]) > 0:
self.data.profile.put_boss_list(
2023-07-01 05:41:34 +00:00
user_id, upsert["userBossList"][0]
)
if "userPlaylogList" in upsert and len(upsert["userPlaylogList"]) > 0:
for playlog in upsert["userPlaylogList"]:
self.data.score.put_playlog(
2023-07-01 05:59:19 +00:00
user_id, playlog, False
)
if "userExtend" in upsert and len(upsert["userExtend"]) > 0:
2023-03-09 16:38:58 +00:00
self.data.profile.put_profile_extend(
user_id, self.version, upsert["userExtend"][0]
)
if "userGhost" in upsert:
for ghost in upsert["userGhost"]:
self.data.profile.put_profile_ghost(user_id, self.version, ghost)
if "userRecentRatingList" in upsert:
2023-07-01 05:42:38 +00:00
self.data.profile.put_recent_rating(user_id, upsert["userRecentRatingList"])
2023-03-09 16:38:58 +00:00
if "userOption" in upsert and len(upsert["userOption"]) > 0:
2023-07-01 05:56:52 +00:00
upsert["userOption"][0].pop("userId")
2023-03-09 16:38:58 +00:00
self.data.profile.put_profile_option(
user_id, self.version, upsert["userOption"][0], False
2023-03-09 16:38:58 +00:00
)
if "userRatingList" in upsert and len(upsert["userRatingList"]) > 0:
2023-03-09 16:38:58 +00:00
self.data.profile.put_profile_rating(
user_id, self.version, upsert["userRatingList"][0]
)
if "userActivityList" in upsert and len(upsert["userActivityList"]) > 0:
2023-07-01 05:56:52 +00:00
for act in upsert["userActivityList"]:
self.data.profile.put_profile_activity(user_id, act)
if "userChargeList" in upsert and len(upsert["userChargeList"]) > 0:
for charge in upsert["userChargeList"]:
2023-04-10 16:58:19 +00:00
# remove the ".0" from the date string, festival only?
charge["purchaseDate"] = charge["purchaseDate"].replace(".0", "")
self.data.item.put_charge(
user_id,
charge["chargeId"],
charge["stock"],
charge["purchaseDate"],
charge["validDate"]
)
2023-04-10 16:58:19 +00:00
if "userCharacterList" in upsert and len(upsert["userCharacterList"]) > 0:
for char in upsert["userCharacterList"]:
2023-07-01 06:04:30 +00:00
self.data.item.put_character_(
2023-03-09 16:38:58 +00:00
user_id,
2023-07-01 06:04:30 +00:00
char
2023-03-09 16:38:58 +00:00
)
2023-04-10 16:58:19 +00:00
if "userItemList" in upsert and len(upsert["userItemList"]) > 0:
for item in upsert["userItemList"]:
2023-03-09 16:38:58 +00:00
self.data.item.put_item(
user_id,
int(item["itemKind"]),
item["itemId"],
item["stock"],
2023-07-01 06:04:30 +00:00
True
2023-03-09 16:38:58 +00:00
)
2023-04-10 16:58:19 +00:00
if "userLoginBonusList" in upsert and len(upsert["userLoginBonusList"]) > 0:
for login_bonus in upsert["userLoginBonusList"]:
2023-03-09 16:38:58 +00:00
self.data.item.put_login_bonus(
user_id,
login_bonus["bonusId"],
login_bonus["point"],
login_bonus["isCurrent"],
login_bonus["isComplete"],
)
2023-04-10 16:58:19 +00:00
if "userMapList" in upsert and len(upsert["userMapList"]) > 0:
for map in upsert["userMapList"]:
2023-03-09 16:38:58 +00:00
self.data.item.put_map(
user_id,
map["mapId"],
map["distance"],
map["isLock"],
map["isClear"],
map["isComplete"],
)
2023-04-10 16:58:19 +00:00
if "userMusicDetailList" in upsert and len(upsert["userMusicDetailList"]) > 0:
for music in upsert["userMusicDetailList"]:
2023-07-01 05:56:52 +00:00
self.data.score.put_best_score(user_id, music, False)
2023-03-09 16:38:58 +00:00
2023-04-10 16:58:19 +00:00
if "userCourseList" in upsert and len(upsert["userCourseList"]) > 0:
for course in upsert["userCourseList"]:
self.data.score.put_course(user_id, course)
2023-03-09 16:38:58 +00:00
2023-04-10 16:58:19 +00:00
if "userFavoriteList" in upsert and len(upsert["userFavoriteList"]) > 0:
for fav in upsert["userFavoriteList"]:
self.data.item.put_favorite(user_id, fav["kind"], fav["itemIdList"])
2023-04-10 16:58:19 +00:00
if (
"userFriendSeasonRankingList" in upsert
and len(upsert["userFriendSeasonRankingList"]) > 0
):
for fsr in upsert["userFriendSeasonRankingList"]:
fsr["recordDate"] = (
datetime.strptime(
fsr["recordDate"], f"{Mai2Constants.DATE_TIME_FORMAT}.0"
),
2023-04-10 16:58:19 +00:00
)
self.data.item.put_friend_season_ranking(user_id, fsr)
return {"returnCode": 1, "apiName": "UpsertUserAllApi"}
def handle_user_logout_api_request(self, data: Dict) -> Dict:
return {"returnCode": 1}
def handle_get_user_data_api_request(self, data: Dict) -> Dict:
profile = self.data.profile.get_profile_detail(data["userId"], self.version, False)
2023-03-09 16:38:58 +00:00
if profile is None:
return
profile_dict = profile._asdict()
profile_dict.pop("id")
profile_dict.pop("user")
profile_dict.pop("version")
2023-03-09 16:38:58 +00:00
return {"userId": data["userId"], "userData": profile_dict}
def handle_get_user_extend_api_request(self, data: Dict) -> Dict:
extend = self.data.profile.get_profile_extend(data["userId"], self.version)
2023-03-09 16:38:58 +00:00
if extend is None:
return
extend_dict = extend._asdict()
extend_dict.pop("id")
extend_dict.pop("user")
extend_dict.pop("version")
2023-03-09 16:38:58 +00:00
return {"userId": data["userId"], "userExtend": extend_dict}
def handle_get_user_option_api_request(self, data: Dict) -> Dict:
options = self.data.profile.get_profile_option(data["userId"], self.version, False)
2023-03-09 16:38:58 +00:00
if options is None:
return
options_dict = options._asdict()
options_dict.pop("id")
options_dict.pop("user")
options_dict.pop("version")
2023-03-09 16:38:58 +00:00
return {"userId": data["userId"], "userOption": options_dict}
def handle_get_user_card_api_request(self, data: Dict) -> Dict:
user_cards = self.data.item.get_cards(data["userId"])
if user_cards is None:
2023-04-10 16:58:19 +00:00
return {"userId": data["userId"], "nextIndex": 0, "userCardList": []}
max_ct = data["maxCount"]
next_idx = data["nextIndex"]
start_idx = next_idx
end_idx = max_ct + start_idx
if len(user_cards[start_idx:]) > max_ct:
next_idx += max_ct
else:
next_idx = 0
card_list = []
for card in user_cards:
tmp = card._asdict()
tmp.pop("id")
tmp.pop("user")
tmp["startDate"] = datetime.strftime(
2023-04-10 16:58:19 +00:00
tmp["startDate"], Mai2Constants.DATE_TIME_FORMAT
)
tmp["endDate"] = datetime.strftime(
2023-04-10 16:58:19 +00:00
tmp["endDate"], Mai2Constants.DATE_TIME_FORMAT
)
card_list.append(tmp)
return {
"userId": data["userId"],
"nextIndex": next_idx,
2023-04-10 16:58:19 +00:00
"userCardList": card_list[start_idx:end_idx],
}
def handle_get_user_charge_api_request(self, data: Dict) -> Dict:
user_charges = self.data.item.get_charges(data["userId"])
if user_charges is None:
2023-04-10 16:58:19 +00:00
return {"userId": data["userId"], "length": 0, "userChargeList": []}
user_charge_list = []
for charge in user_charges:
tmp = charge._asdict()
tmp.pop("id")
tmp.pop("user")
user_charge_list.append(tmp)
return {
"userId": data["userId"],
"length": len(user_charge_list),
2023-04-10 16:58:19 +00:00
"userChargeList": user_charge_list,
}
def handle_get_user_present_api_request(self, data: Dict) -> Dict:
return { "userId": data.get("userId", 0), "length": 0, "userPresentList": []}
def handle_get_transfer_friend_api_request(self, data: Dict) -> Dict:
return {}
def handle_get_user_present_event_api_request(self, data: Dict) -> Dict:
return { "userId": data.get("userId", 0), "length": 0, "userPresentEventList": []}
def handle_get_user_boss_api_request(self, data: Dict) -> Dict:
b = self.data.profile.get_boss_list(data["userId"])
if b is None:
return { "userId": data.get("userId", 0), "userBossData": {}}
boss_lst = b._asdict()
boss_lst.pop("id")
boss_lst.pop("user")
return { "userId": data.get("userId", 0), "userBossData": boss_lst}
def handle_get_user_item_api_request(self, data: Dict) -> Dict:
kind = int(data["nextIndex"] / 10000000000)
next_idx = int(data["nextIndex"] % 10000000000)
2023-04-10 16:58:19 +00:00
user_item_list = self.data.item.get_items(data["userId"], kind)
2023-03-09 16:38:58 +00:00
items: List[Dict[str, Any]] = []
2023-04-10 16:58:19 +00:00
for i in range(next_idx, len(user_item_list)):
tmp = user_item_list[i]._asdict()
tmp.pop("user")
tmp.pop("id")
items.append(tmp)
if len(items) >= int(data["maxCount"]):
break
2023-04-10 16:58:19 +00:00
xout = kind * 10000000000 + next_idx + len(items)
if len(items) < int(data["maxCount"]):
next_idx = 0
else:
next_idx = xout
2023-03-09 16:38:58 +00:00
return {
"userId": data["userId"],
"nextIndex": next_idx,
"itemKind": kind,
2023-04-10 16:58:19 +00:00
"userItemList": items,
2023-03-09 16:38:58 +00:00
}
def handle_get_user_character_api_request(self, data: Dict) -> Dict:
characters = self.data.item.get_characters(data["userId"])
chara_list = []
for chara in characters:
tmp = chara._asdict()
tmp.pop("id")
tmp.pop("user")
tmp.pop("awakening")
tmp.pop("useCount")
chara_list.append(tmp)
return {"userId": data["userId"], "userCharacterList": chara_list}
2023-03-09 16:38:58 +00:00
def handle_get_user_favorite_api_request(self, data: Dict) -> Dict:
favorites = self.data.item.get_favorites(data["userId"], data["itemKind"])
2023-03-09 16:38:58 +00:00
if favorites is None:
return
userFavs = []
for fav in favorites:
2023-03-09 16:38:58 +00:00
userFavs.append(
{
"userId": data["userId"],
"itemKind": fav["itemKind"],
"itemIdList": fav["itemIdList"],
}
)
2023-03-09 16:38:58 +00:00
return {"userId": data["userId"], "userFavoriteData": userFavs}
def handle_get_user_ghost_api_request(self, data: Dict) -> Dict:
ghost = self.data.profile.get_profile_ghost(data["userId"], self.version)
2023-03-09 16:38:58 +00:00
if ghost is None:
return
ghost_dict = ghost._asdict()
ghost_dict.pop("user")
ghost_dict.pop("id")
ghost_dict.pop("version_int")
2023-03-09 16:38:58 +00:00
return {"userId": data["userId"], "userGhost": ghost_dict}
def handle_get_user_recent_rating_api_request(self, data: Dict) -> Dict:
rating = self.data.profile.get_recent_rating(data["userId"])
if rating is None:
return
r = rating._asdict()
lst = r.get("userRecentRatingList", [])
return {"userId": data["userId"], "length": len(lst), "userRecentRatingList": lst}
def handle_get_user_rating_api_request(self, data: Dict) -> Dict:
rating = self.data.profile.get_profile_rating(data["userId"], self.version)
2023-03-09 16:38:58 +00:00
if rating is None:
return
rating_dict = rating._asdict()
rating_dict.pop("user")
rating_dict.pop("id")
rating_dict.pop("version")
2023-03-09 16:38:58 +00:00
return {"userId": data["userId"], "userRating": rating_dict}
def handle_get_user_activity_api_request(self, data: Dict) -> Dict:
"""
kind 1 is playlist, kind 2 is music list
"""
playlist = self.data.profile.get_profile_activity(data["userId"], 1)
musiclist = self.data.profile.get_profile_activity(data["userId"], 2)
2023-03-09 16:38:58 +00:00
if playlist is None or musiclist is None:
return
plst = []
mlst = []
for play in playlist:
2023-03-09 16:38:58 +00:00
tmp = play._asdict()
tmp["id"] = tmp["activityId"]
tmp.pop("activityId")
tmp.pop("user")
plst.append(tmp)
for music in musiclist:
2023-03-09 16:38:58 +00:00
tmp = music._asdict()
tmp["id"] = tmp["activityId"]
tmp.pop("activityId")
tmp.pop("user")
mlst.append(tmp)
2023-04-10 16:58:19 +00:00
return {"userActivity": {"playList": plst, "musicList": mlst}}
def handle_get_user_course_api_request(self, data: Dict) -> Dict:
user_courses = self.data.score.get_courses(data["userId"])
if user_courses is None:
2023-04-10 16:58:19 +00:00
return {"userId": data["userId"], "nextIndex": 0, "userCourseList": []}
course_list = []
for course in user_courses:
tmp = course._asdict()
tmp.pop("user")
tmp.pop("id")
course_list.append(tmp)
2023-04-10 16:58:19 +00:00
return {"userId": data["userId"], "nextIndex": 0, "userCourseList": course_list}
def handle_get_user_portrait_api_request(self, data: Dict) -> Dict:
# No support for custom pfps
return {"length": 0, "userPortraitList": []}
def handle_get_user_friend_season_ranking_api_request(self, data: Dict) -> Dict:
friend_season_ranking = self.data.item.get_friend_season_ranking(data["userId"])
2023-04-10 16:58:19 +00:00
if friend_season_ranking is None:
return {
"userId": data["userId"],
"nextIndex": 0,
"userFriendSeasonRankingList": [],
}
friend_season_ranking_list = []
2023-04-10 16:58:19 +00:00
next_idx = int(data["nextIndex"])
max_ct = int(data["maxCount"])
2023-04-10 16:58:19 +00:00
for x in range(next_idx, len(friend_season_ranking)):
tmp = friend_season_ranking[x]._asdict()
tmp.pop("user")
tmp.pop("id")
tmp["recordDate"] = datetime.strftime(
tmp["recordDate"], f"{Mai2Constants.DATE_TIME_FORMAT}.0"
)
friend_season_ranking_list.append(tmp)
if len(friend_season_ranking_list) >= max_ct:
break
2023-04-10 16:58:19 +00:00
if len(friend_season_ranking) >= next_idx + max_ct:
next_idx += max_ct
else:
next_idx = 0
2023-03-09 16:38:58 +00:00
return {
"userId": data["userId"],
2023-04-10 16:58:19 +00:00
"nextIndex": next_idx,
2023-03-09 16:38:58 +00:00
"userFriendSeasonRankingList": friend_season_ranking_list,
}
def handle_get_user_map_api_request(self, data: Dict) -> Dict:
maps = self.data.item.get_maps(data["userId"])
2023-04-10 16:58:19 +00:00
if maps is None:
return {
"userId": data["userId"],
"nextIndex": 0,
"userMapList": [],
}
map_list = []
2023-04-10 16:58:19 +00:00
next_idx = int(data["nextIndex"])
max_ct = int(data["maxCount"])
2023-04-10 16:58:19 +00:00
for x in range(next_idx, len(maps)):
tmp = maps[x]._asdict()
tmp.pop("user")
tmp.pop("id")
map_list.append(tmp)
if len(map_list) >= max_ct:
break
2023-04-10 16:58:19 +00:00
if len(maps) >= next_idx + max_ct:
next_idx += max_ct
else:
next_idx = 0
2023-03-09 16:38:58 +00:00
return {
"userId": data["userId"],
2023-04-10 16:58:19 +00:00
"nextIndex": next_idx,
2023-03-09 16:38:58 +00:00
"userMapList": map_list,
}
def handle_get_user_login_bonus_api_request(self, data: Dict) -> Dict:
login_bonuses = self.data.item.get_login_bonuses(data["userId"])
2023-04-10 16:58:19 +00:00
if login_bonuses is None:
return {
"userId": data["userId"],
"nextIndex": 0,
"userLoginBonusList": [],
}
login_bonus_list = []
2023-04-10 16:58:19 +00:00
next_idx = int(data["nextIndex"])
max_ct = int(data["maxCount"])
2023-04-10 16:58:19 +00:00
for x in range(next_idx, len(login_bonuses)):
tmp = login_bonuses[x]._asdict()
tmp.pop("user")
tmp.pop("id")
login_bonus_list.append(tmp)
if len(login_bonus_list) >= max_ct:
break
2023-04-10 16:58:19 +00:00
if len(login_bonuses) >= next_idx + max_ct:
next_idx += max_ct
else:
next_idx = 0
2023-03-09 16:38:58 +00:00
return {
"userId": data["userId"],
2023-04-10 16:58:19 +00:00
"nextIndex": next_idx,
2023-03-09 16:38:58 +00:00
"userLoginBonusList": login_bonus_list,
}
def handle_get_user_region_api_request(self, data: Dict) -> Dict:
return {"userId": data["userId"], "length": 0, "userRegionList": []}
2023-07-01 06:40:07 +00:00
def handle_get_user_web_option_api_request(self, data: Dict) -> Dict:
w = self.data.profile.get_web_option(data["userId"], self.version)
if w is None:
return {"userId": data["userId"], "userWebOption": {}}
web_opt = w._asdict()
web_opt.pop("id")
web_opt.pop("user")
web_opt.pop("version")
return {"userId": data["userId"], "userWebOption": web_opt}
def handle_get_user_survival_api_request(self, data: Dict) -> Dict:
return {"userId": data["userId"], "length": 0, "userSurvivalList": []}
def handle_get_user_grade_api_request(self, data: Dict) -> Dict:
g = self.data.profile.get_grade_status(data["userId"])
if g is None:
return {"userId": data["userId"], "userGradeStatus": {}, "length": 0, "userGradeList": []}
grade_stat = g._asdict()
grade_stat.pop("id")
grade_stat.pop("user")
return {"userId": data["userId"], "userGradeStatus": grade_stat, "length": 0, "userGradeList": []}
def handle_get_user_music_api_request(self, data: Dict) -> Dict:
user_id = data.get("userId", 0)
next_index = data.get("nextIndex", 0)
max_ct = data.get("maxCount", 50)
upper_lim = next_index + max_ct
music_detail_list = []
if user_id <= 0:
2023-08-08 14:17:56 +00:00
self.logger.warning("handle_get_user_music_api_request: Could not find userid in data, or userId is 0")
return {}
2023-07-01 06:40:07 +00:00
songs = self.data.score.get_best_scores(user_id, is_dx=False)
2023-06-14 02:07:48 +00:00
if songs is None:
self.logger.debug("handle_get_user_music_api_request: get_best_scores returned None!")
2023-06-14 02:07:48 +00:00
return {
"userId": data["userId"],
"nextIndex": 0,
"userMusicList": [],
}
num_user_songs = len(songs)
2023-06-14 02:07:48 +00:00
for x in range(next_index, upper_lim):
if num_user_songs <= x:
2023-06-14 02:07:48 +00:00
break
2023-06-14 02:07:48 +00:00
tmp = songs[x]._asdict()
tmp.pop("id")
tmp.pop("user")
music_detail_list.append(tmp)
2023-06-14 02:10:35 +00:00
next_index = 0 if len(music_detail_list) < max_ct or num_user_songs == upper_lim else upper_lim
self.logger.info(f"Send songs {next_index}-{upper_lim} ({len(music_detail_list)}) out of {num_user_songs} for user {user_id} (next idx {next_index})")
2023-03-09 16:38:58 +00:00
return {
"userId": data["userId"],
"nextIndex": next_index,
2023-04-10 16:58:19 +00:00
"userMusicList": [{"userMusicDetailList": music_detail_list}],
2023-03-09 16:38:58 +00:00
}
def handle_upload_user_portrait_api_request(self, data: Dict) -> Dict:
self.logger.debug(data)
def handle_upload_user_photo_api_request(self, data: Dict) -> Dict:
if not self.game_config.uploads.photos or not self.game_config.uploads.photos_dir:
return {'returnCode': 0, 'apiName': 'UploadUserPhotoApi'}
photo = data.get("userPhoto", {})
if photo is None or not photo:
return {'returnCode': 0, 'apiName': 'UploadUserPhotoApi'}
order_id = int(photo.get("orderId", -1))
user_id = int(photo.get("userId", -1))
div_num = int(photo.get("divNumber", -1))
div_len = int(photo.get("divLength", -1))
div_data = photo.get("divData", "")
playlog_id = int(photo.get("playlogId", -1))
track_num = int(photo.get("trackNo", -1))
upload_date = photo.get("uploadDate", "")
if order_id < 0 or user_id <= 0 or div_num < 0 or div_len <= 0 or not div_data or playlog_id < 0 or track_num <= 0 or not upload_date:
2023-08-08 14:17:56 +00:00
self.logger.warning(f"Malformed photo upload request")
return {'returnCode': 0, 'apiName': 'UploadUserPhotoApi'}
if order_id == 0 and div_num > 0:
2023-08-08 14:17:56 +00:00
self.logger.warning(f"Failed to set orderId properly (still 0 after first chunk)")
return {'returnCode': 0, 'apiName': 'UploadUserPhotoApi'}
if div_num == 0 and order_id > 0:
2023-08-08 14:17:56 +00:00
self.logger.warning(f"First chuck re-send, Ignore")
return {'returnCode': 0, 'apiName': 'UploadUserPhotoApi'}
if div_num >= div_len:
2023-08-08 14:17:56 +00:00
self.logger.warning(f"Sent extra chunks ({div_num} >= {div_len})")
return {'returnCode': 0, 'apiName': 'UploadUserPhotoApi'}
if div_len >= 100:
2023-08-08 14:17:56 +00:00
self.logger.warning(f"Photo too large ({div_len} * 10240 = {div_len * 10240} bytes)")
return {'returnCode': 0, 'apiName': 'UploadUserPhotoApi'}
2023-07-16 20:58:18 +00:00
ret_code = order_id + 1
photo_chunk = b64decode(div_data)
if len(photo_chunk) > 10240 or (len(photo_chunk) < 10240 and div_num + 1 != div_len):
2023-08-08 14:17:56 +00:00
self.logger.warning(f"Incorrect data size after decoding (Expected 10240, got {len(photo_chunk)})")
return {'returnCode': 0, 'apiName': 'UploadUserPhotoApi'}
2023-07-16 20:58:18 +00:00
out_name = f"{self.game_config.uploads.photos_dir}/{user_id}_{playlog_id}_{track_num}"
2023-07-16 20:58:18 +00:00
if not path.exists(f"{out_name}.bin") and div_num != 0:
2023-08-08 14:17:56 +00:00
self.logger.warning(f"Out of order photo upload (div_num {div_num})")
return {'returnCode': 0, 'apiName': 'UploadUserPhotoApi'}
2023-07-16 20:58:18 +00:00
if path.exists(f"{out_name}.bin") and div_num == 0:
2023-08-08 14:17:56 +00:00
self.logger.warning(f"Duplicate file upload")
return {'returnCode': 0, 'apiName': 'UploadUserPhotoApi'}
2023-07-16 20:58:18 +00:00
elif path.exists(f"{out_name}.bin"):
fstats = stat(f"{out_name}.bin")
if fstats.st_size != 10240 * div_num:
2023-08-08 14:17:56 +00:00
self.logger.warning(f"Out of order photo upload (trying to upload div {div_num}, expected div {fstats.st_size / 10240} for file sized {fstats.st_size} bytes)")
return {'returnCode': 0, 'apiName': 'UploadUserPhotoApi'}
2023-07-16 20:58:18 +00:00
try:
with open(f"{out_name}.bin", "ab") as f:
f.write(photo_chunk)
except Exception:
self.logger.error(f"Failed writing to {out_name}.bin")
return {'returnCode': 0, 'apiName': 'UploadUserPhotoApi'}
if div_num + 1 == div_len and path.exists(f"{out_name}.bin"):
try:
p = ImageFile.Parser()
with open(f"{out_name}.bin", "rb") as f:
p.feed(f.read())
im = p.close()
im.save(f"{out_name}.jpeg")
except Exception:
self.logger.error(f"File {out_name}.bin failed image validation")
try:
remove(f"{out_name}.bin")
2023-07-16 20:58:18 +00:00
except Exception:
self.logger.error(f"Failed to delete {out_name}.bin, please remove it manually")
2023-07-16 20:58:18 +00:00
return {'returnCode': ret_code, 'apiName': 'UploadUserPhotoApi'}