# Origin: artemis/titles/mai2/dx.py (repository forked from Hay1tsme/artemis)

import json
from datetime import datetime, timedelta
from random import randint
from typing import Any, Dict, List, Optional

import pytz

from core.config import CoreConfig
from titles.mai2.base import Mai2Base
from titles.mai2.config import Mai2Config
from titles.mai2.const import Mai2Constants
# ``nextIndex`` of the user-item request packs the item kind and the row
# offset into a single integer: ``kind * _ITEM_INDEX_BASE + offset``.
_ITEM_INDEX_BASE = 10000000000


def _row_to_dict(row: Any, *dropped: str) -> Dict[str, Any]:
    """Return ``row._asdict()`` with the columns named in ``dropped`` removed.

    A column that is not present raises ``KeyError``.
    """
    row_dict = row._asdict()
    for column in dropped:
        row_dict.pop(column)
    return row_dict


def _parse_client_datetime(value: str, fmt: str) -> datetime:
    """Parse a timestamp string with ``fmt``, tolerating a trailing ``.0``.

    Some requests carry a ``.0`` suffix after the seconds; it is stripped
    before parsing so the same ``fmt`` works with and without it.
    """
    if value.endswith(".0"):
        value = value[:-2]
    return datetime.strptime(value, fmt)


def _take_page(rows: Any, start: int, count: int) -> tuple:
    """Return ``(page, next_index)`` for ``rows[start:start + count]``.

    ``next_index`` is ``start + count`` while rows remain beyond the page and
    ``0`` once the last row has been handed out.
    """
    end = start + count
    next_index = end if len(rows) > end else 0
    return rows[start:end], next_index


def _activity_entries(rows: Any) -> List[Dict[str, Any]]:
    """Convert activity rows to dicts keyed by ``id`` instead of ``activityId``."""
    entries = []
    for row in rows:
        entry = _row_to_dict(row, "user")
        entry["id"] = entry.pop("activityId")
        entries.append(entry)
    return entries


class Mai2DX(Mai2Base):
    """Request handlers for ``Mai2Constants.VER_MAIMAI_DX``.

    Each ``handle_*_api_request`` method receives the request payload as a
    dict and returns the response payload. Several getters return ``None``
    when the requested row does not exist; how the caller treats ``None`` is
    decided outside this class.
    """

    def __init__(self, cfg: CoreConfig, game_cfg: Mai2Config) -> None:
        """Set the version and build the base URI handed out to clients."""
        super().__init__(cfg, game_cfg)
        self.version = Mai2Constants.VER_MAIMAI_DX

        title = self.core_config.title
        # The port is only put into the URI on development servers that
        # configure one; otherwise the bare hostname is used.
        if self.core_config.server.is_develop and title.port > 0:
            self.old_server = f"http://{title.hostname}:{title.port}/SDEZ/100/"
        else:
            self.old_server = f"http://{title.hostname}/SDEZ/100/"

    def handle_get_game_setting_api_request(self, data: Dict):
        """Return the game settings, including the server URIs.

        The deliver and USB download URIs are empty strings unless
        ``self.can_deliver`` and the matching config switch are both set.
        """
        deliver_uri = (
            self.old_server + "deliver/"
            if self.can_deliver and self.game_config.deliver.enable
            else ""
        )
        usb_dl_uri = (
            self.old_server + "usbdl/"
            if self.can_deliver and self.game_config.deliver.udbdl_enable
            else ""
        )

        return {
            "gameSetting": {
                "isMaintenance": False,
                "requestInterval": 1800,
                "rebootStartTime": "2020-01-01 07:00:00.0",
                "rebootEndTime": "2020-01-01 07:59:59.0",
                "movieUploadLimit": 100,
                "movieStatus": 1,
                "movieServerUri": self.old_server + "movie/",
                "deliverServerUri": deliver_uri,
                "oldServerUri": self.old_server + "old",
                "usbDlServerUri": usb_dl_uri,
                "rebootInterval": 0,
            },
            "isAouAccession": False,
        }

    def handle_get_user_preview_api_request(self, data: Dict) -> Dict:
        """Return the profile preview, or ``{}`` when no profile exists yet."""
        user_id = data["userId"]
        detail_row = self.data.profile.get_profile_detail(user_id, self.version)
        option_row = self.data.profile.get_profile_option(user_id, self.version)

        if detail_row is None or option_row is None:
            return {}  # Register

        profile = detail_row._asdict()
        option = option_row._asdict()
        ban_state = profile["banState"]

        return {
            "userId": user_id,
            "userName": profile["userName"],
            "isLogin": False,
            "lastGameId": profile["lastGameId"],
            "lastDataVersion": profile["lastDataVersion"],
            "lastRomVersion": profile["lastRomVersion"],
            "lastLoginDate": profile["lastLoginDate"],
            "lastPlayDate": profile["lastPlayDate"],
            "playerRating": profile["playerRating"],
            "nameplateId": 0,  # Unused
            "iconId": profile["iconId"],
            "trophyId": 0,  # Unused
            "partnerId": profile["partnerId"],
            "frameId": profile["frameId"],
            # 0: all/begin, 1: disprate, 2: dispDan, 3: hide, 4: end
            "dispRate": option["dispRate"],
            "totalAwake": profile["totalAwake"],
            "isNetMember": profile["isNetMember"],
            "dailyBonusDate": profile["dailyBonusDate"],
            "headPhoneVolume": option["headPhoneVolume"],
            "isInherit": False,  # Not sure what this is or does??
            # New with uni+
            "banState": ban_state if ban_state is not None else 0,
        }

    def handle_upload_user_playlog_api_request(self, data: Dict) -> Dict:
        """Store the uploaded playlog for the user."""
        self.data.score.put_playlog(data["userId"], data["userPlaylog"])
        return {"returnCode": 1, "apiName": "UploadUserPlaylogApi"}

    def handle_upsert_user_chargelog_api_request(self, data: Dict) -> Dict:
        """Store a single charge record for the user."""
        user_id = data["userId"]
        charge = data["userCharge"]
        date_fmt = Mai2Constants.DATE_TIME_FORMAT

        self.data.item.put_charge(
            user_id,
            charge["chargeId"],
            charge["stock"],
            # The purchase date may arrive with a trailing ".0"
            # (festival only?); the parser strips it.
            _parse_client_datetime(charge["purchaseDate"], date_fmt),
            _parse_client_datetime(charge["validDate"], date_fmt),
        )

        return {"returnCode": 1, "apiName": "UpsertUserChargelogApi"}

    def handle_upsert_user_all_api_request(self, data: Dict) -> Dict:
        """Persist every section present in ``data["upsertUserAll"]``.

        Sections that are missing, ``None`` or empty are skipped. For the
        single-record sections (``userData``, ``userExtend``, ``userOption``,
        ``userRatingList``, ``userActivityList``) only the first list element
        is used.
        """
        user_id = data["userId"]
        upsert = data["upsertUserAll"]
        profile = self.data.profile
        items = self.data.item
        scores = self.data.score
        date_fmt = Mai2Constants.DATE_TIME_FORMAT

        user_data = upsert.get("userData")
        if user_data:
            detail = user_data[0]
            detail["isNetMember"] = 1
            # The access code is dropped before the profile is stored; a
            # payload without the key is accepted as well.
            detail.pop("accessCode", None)
            profile.put_profile_detail(user_id, self.version, detail)

        user_extend = upsert.get("userExtend")
        if user_extend:
            profile.put_profile_extend(user_id, self.version, user_extend[0])

        for ghost in upsert.get("userGhost") or []:
            profile.put_profile_ghost(user_id, self.version, ghost)

        user_option = upsert.get("userOption")
        if user_option:
            profile.put_profile_option(user_id, self.version, user_option[0])

        user_rating = upsert.get("userRatingList")
        if user_rating:
            profile.put_profile_rating(user_id, self.version, user_rating[0])

        user_activity = upsert.get("userActivityList")
        if user_activity:
            # The first element maps a list name to a list of activities;
            # only the activities themselves are stored.
            for activities in user_activity[0].values():
                for activity in activities:
                    profile.put_profile_activity(user_id, activity)

        for charge in upsert.get("userChargeList") or []:
            items.put_charge(
                user_id,
                charge["chargeId"],
                charge["stock"],
                _parse_client_datetime(charge["purchaseDate"], date_fmt),
                _parse_client_datetime(charge["validDate"], date_fmt),
            )

        for character in upsert.get("userCharacterList") or []:
            items.put_character(
                user_id,
                character["characterId"],
                character["level"],
                character["awakening"],
                character["useCount"],
            )

        for item in upsert.get("userItemList") or []:
            items.put_item(
                user_id,
                int(item["itemKind"]),
                item["itemId"],
                item["stock"],
                item["isValid"],
            )

        for login_bonus in upsert.get("userLoginBonusList") or []:
            items.put_login_bonus(
                user_id,
                login_bonus["bonusId"],
                login_bonus["point"],
                login_bonus["isCurrent"],
                login_bonus["isComplete"],
            )

        for user_map in upsert.get("userMapList") or []:
            items.put_map(
                user_id,
                user_map["mapId"],
                user_map["distance"],
                user_map["isLock"],
                user_map["isClear"],
                user_map["isComplete"],
            )

        for music in upsert.get("userMusicDetailList") or []:
            scores.put_best_score(user_id, music)

        for course in upsert.get("userCourseList") or []:
            scores.put_course(user_id, course)

        for favorite in upsert.get("userFavoriteList") or []:
            items.put_favorite(user_id, favorite["kind"], favorite["itemIdList"])

        for ranking in upsert.get("userFriendSeasonRankingList") or []:
            # Store the record date as a datetime. It used to be wrapped in
            # a one-element tuple by a stray trailing comma.
            record = dict(ranking)
            record["recordDate"] = _parse_client_datetime(
                ranking["recordDate"], date_fmt
            )
            items.put_friend_season_ranking(user_id, record)

        return {"returnCode": 1, "apiName": "UpsertUserAllApi"}

    def handle_get_user_data_api_request(self, data: Dict) -> Optional[Dict]:
        """Return the profile detail row, or ``None`` if there is none."""
        profile = self.data.profile.get_profile_detail(data["userId"], self.version)
        if profile is None:
            return None

        return {
            "userId": data["userId"],
            "userData": _row_to_dict(profile, "id", "user", "version"),
        }

    def handle_get_user_extend_api_request(self, data: Dict) -> Optional[Dict]:
        """Return the profile extend row, or ``None`` if there is none."""
        extend = self.data.profile.get_profile_extend(data["userId"], self.version)
        if extend is None:
            return None

        return {
            "userId": data["userId"],
            "userExtend": _row_to_dict(extend, "id", "user", "version"),
        }

    def handle_get_user_option_api_request(self, data: Dict) -> Optional[Dict]:
        """Return the profile option row, or ``None`` if there is none."""
        options = self.data.profile.get_profile_option(data["userId"], self.version)
        if options is None:
            return None

        return {
            "userId": data["userId"],
            "userOption": _row_to_dict(options, "id", "user", "version"),
        }

    def handle_get_user_card_api_request(self, data: Dict) -> Dict:
        """Return one page of the user's cards with dates as strings."""
        user_cards = self.data.item.get_cards(data["userId"])
        if user_cards is None:
            return {"userId": data["userId"], "nextIndex": 0, "userCardList": []}

        page, next_idx = _take_page(
            user_cards, int(data["nextIndex"]), int(data["maxCount"])
        )
        date_fmt = Mai2Constants.DATE_TIME_FORMAT

        card_list = []
        for card in page:
            entry = _row_to_dict(card, "id", "user")
            entry["startDate"] = entry["startDate"].strftime(date_fmt)
            entry["endDate"] = entry["endDate"].strftime(date_fmt)
            card_list.append(entry)

        return {
            "userId": data["userId"],
            "nextIndex": next_idx,
            "userCardList": card_list,
        }

    def handle_get_user_charge_api_request(self, data: Dict) -> Dict:
        """Return all of the user's charges with dates as strings."""
        user_charges = self.data.item.get_charges(data["userId"])
        if user_charges is None:
            return {"userId": data["userId"], "length": 0, "userChargeList": []}

        date_fmt = Mai2Constants.DATE_TIME_FORMAT
        user_charge_list = []
        for charge in user_charges:
            entry = _row_to_dict(charge, "id", "user")
            entry["purchaseDate"] = entry["purchaseDate"].strftime(date_fmt)
            entry["validDate"] = entry["validDate"].strftime(date_fmt)
            user_charge_list.append(entry)

        return {
            "userId": data["userId"],
            "length": len(user_charge_list),
            "userChargeList": user_charge_list,
        }

    def handle_get_user_item_api_request(self, data: Dict) -> Dict:
        """Return one page of the user's items of a single kind.

        ``data["nextIndex"]`` encodes ``kind * 10000000000 + offset``; the
        returned ``nextIndex`` uses the same encoding, or ``0`` when no rows
        remain.
        """
        kind, offset = divmod(int(data["nextIndex"]), _ITEM_INDEX_BASE)

        user_item_list = self.data.item.get_items(data["userId"], kind)
        if user_item_list is None:
            user_item_list = []

        page, next_offset = _take_page(user_item_list, offset, int(data["maxCount"]))
        items: List[Dict[str, Any]] = [
            _row_to_dict(row, "user", "id") for row in page
        ]
        next_idx = kind * _ITEM_INDEX_BASE + next_offset if next_offset else 0

        return {
            "userId": data["userId"],
            "nextIndex": next_idx,
            "itemKind": kind,
            "userItemList": items,
        }

    def handle_get_user_character_api_request(self, data: Dict) -> Dict:
        """Return all of the user's characters (empty list when none)."""
        characters = self.data.item.get_characters(data["userId"])
        if characters is None:
            characters = []

        chara_list = [_row_to_dict(chara, "id", "user") for chara in characters]
        return {"userId": data["userId"], "userCharacterList": chara_list}

    def handle_get_user_favorite_api_request(self, data: Dict) -> Optional[Dict]:
        """Return the user's favorites of ``data["itemKind"]``, or ``None``."""
        favorites = self.data.item.get_favorites(data["userId"], data["itemKind"])
        if favorites is None:
            return None

        user_favs = [
            {
                "userId": data["userId"],
                "itemKind": fav["itemKind"],
                "itemIdList": fav["itemIdList"],
            }
            for fav in favorites
        ]
        return {"userId": data["userId"], "userFavoriteData": user_favs}

    def handle_get_user_ghost_api_request(self, data: Dict) -> Optional[Dict]:
        """Return the profile ghost row, or ``None`` if there is none."""
        ghost = self.data.profile.get_profile_ghost(data["userId"], self.version)
        if ghost is None:
            return None

        return {
            "userId": data["userId"],
            "userGhost": _row_to_dict(ghost, "user", "id", "version_int"),
        }

    def handle_get_user_rating_api_request(self, data: Dict) -> Optional[Dict]:
        """Return the profile rating row, or ``None`` if there is none."""
        rating = self.data.profile.get_profile_rating(data["userId"], self.version)
        if rating is None:
            return None

        return {
            "userId": data["userId"],
            "userRating": _row_to_dict(rating, "user", "id", "version"),
        }

    def handle_get_user_activity_api_request(self, data: Dict) -> Optional[Dict]:
        """
        kind 1 is playlist, kind 2 is music list

        Returns ``None`` when either list could not be fetched.
        """
        playlist = self.data.profile.get_profile_activity(data["userId"], 1)
        musiclist = self.data.profile.get_profile_activity(data["userId"], 2)
        if playlist is None or musiclist is None:
            return None

        return {
            "userActivity": {
                "playList": _activity_entries(playlist),
                "musicList": _activity_entries(musiclist),
            }
        }

    def handle_get_user_course_api_request(self, data: Dict) -> Dict:
        """Return all of the user's courses in a single, unpaginated response."""
        user_courses = self.data.score.get_courses(data["userId"])
        if user_courses is None:
            return {"userId": data["userId"], "nextIndex": 0, "userCourseList": []}

        course_list = [_row_to_dict(course, "user", "id") for course in user_courses]
        return {"userId": data["userId"], "nextIndex": 0, "userCourseList": course_list}

    def handle_get_user_portrait_api_request(self, data: Dict) -> Dict:
        """Return an empty portrait list."""
        # No support for custom pfps
        return {"length": 0, "userPortraitList": []}

    def handle_get_user_friend_season_ranking_api_request(self, data: Dict) -> Dict:
        """Return one page of friend season rankings with dates as strings."""
        friend_season_ranking = self.data.item.get_friend_season_ranking(data["userId"])
        if friend_season_ranking is None:
            return {
                "userId": data["userId"],
                "nextIndex": 0,
                "userFriendSeasonRankingList": [],
            }

        page, next_idx = _take_page(
            friend_season_ranking, int(data["nextIndex"]), int(data["maxCount"])
        )
        # The record date is sent with a literal ".0" after the seconds.
        record_fmt = f"{Mai2Constants.DATE_TIME_FORMAT}.0"

        friend_season_ranking_list = []
        for row in page:
            entry = _row_to_dict(row, "user", "id")
            entry["recordDate"] = entry["recordDate"].strftime(record_fmt)
            friend_season_ranking_list.append(entry)

        return {
            "userId": data["userId"],
            "nextIndex": next_idx,
            "userFriendSeasonRankingList": friend_season_ranking_list,
        }

    def handle_get_user_map_api_request(self, data: Dict) -> Dict:
        """Return one page of the user's maps."""
        maps = self.data.item.get_maps(data["userId"])
        if maps is None:
            return {
                "userId": data["userId"],
                "nextIndex": 0,
                "userMapList": [],
            }

        page, next_idx = _take_page(maps, int(data["nextIndex"]), int(data["maxCount"]))
        map_list = [_row_to_dict(row, "user", "id") for row in page]

        return {
            "userId": data["userId"],
            "nextIndex": next_idx,
            "userMapList": map_list,
        }

    def handle_get_user_login_bonus_api_request(self, data: Dict) -> Dict:
        """Return one page of the user's login bonuses."""
        login_bonuses = self.data.item.get_login_bonuses(data["userId"])
        if login_bonuses is None:
            return {
                "userId": data["userId"],
                "nextIndex": 0,
                "userLoginBonusList": [],
            }

        page, next_idx = _take_page(
            login_bonuses, int(data["nextIndex"]), int(data["maxCount"])
        )
        login_bonus_list = [_row_to_dict(row, "user", "id") for row in page]

        return {
            "userId": data["userId"],
            "nextIndex": next_idx,
            "userLoginBonusList": login_bonus_list,
        }

    def handle_get_user_region_api_request(self, data: Dict) -> Dict:
        """Return an empty region list."""
        return {"userId": data["userId"], "length": 0, "userRegionList": []}

    def handle_get_user_music_api_request(self, data: Dict) -> Dict:
        """Return one page of the user's best scores.

        Returns ``{}`` when ``userId`` is missing or not positive. ``nextIndex``
        defaults to 0 and ``maxCount`` to 50 when absent from the request.
        """
        user_id = data.get("userId", 0)
        start_index = data.get("nextIndex", 0)
        max_ct = data.get("maxCount", 50)
        upper_lim = start_index + max_ct

        if user_id <= 0:
            self.logger.warning("handle_get_user_music_api_request: Could not find userid in data, or userId is 0")
            return {}

        songs = self.data.score.get_best_scores(user_id)
        if songs is None:
            self.logger.debug("handle_get_user_music_api_request: get_best_scores returned None!")
            return {
                "userId": data["userId"],
                "nextIndex": 0,
                "userMusicList": [],
            }

        page, next_index = _take_page(songs, start_index, max_ct)
        music_detail_list = [_row_to_dict(song, "id", "user") for song in page]

        # Log the start of the range that was sent, not the index of the
        # following page.
        self.logger.info(f"Send songs {start_index}-{upper_lim} ({len(music_detail_list)}) out of {len(songs)} for user {user_id} (next idx {next_index})")
        return {
            "userId": data["userId"],
            "nextIndex": next_index,
            "userMusicList": [{"userMusicDetailList": music_detail_list}],
        }

    def handle_user_login_api_request(self, data: Dict) -> Dict:
        """Extend the parent login response with ``loginId``.

        ``loginId`` is copied from ``loginCount`` (0 when absent). A ``None``
        or empty parent response is passed through unchanged.
        """
        ret = super().handle_user_login_api_request(data)
        if not ret:
            return ret

        ret["loginId"] = ret.get("loginCount", 0)
        return ret