471 lines
16 KiB
Python
471 lines
16 KiB
Python
import logging
|
|
from datetime import datetime, timedelta
|
|
from random import randint
|
|
from typing import Dict
|
|
|
|
from core.config import CoreConfig
|
|
from titles.chuni.const import ChuniConstants
|
|
from titles.chuni.database import ChuniData
|
|
from titles.chuni.base import ChuniBase
|
|
from titles.chuni.config import ChuniConfig
|
|
|
|
|
|
class ChuniNew(ChuniBase):
|
|
ITEM_TYPE = {"character": 20, "story": 21, "card": 22}
|
|
|
|
def __init__(self, core_cfg: CoreConfig, game_cfg: ChuniConfig) -> None:
|
|
self.core_cfg = core_cfg
|
|
self.game_cfg = game_cfg
|
|
self.data = ChuniData(core_cfg)
|
|
self.date_time_format = "%Y-%m-%d %H:%M:%S"
|
|
self.logger = logging.getLogger("chuni")
|
|
self.game = ChuniConstants.GAME_CODE
|
|
self.version = ChuniConstants.VER_CHUNITHM_NEW
|
|
|
|
def handle_get_game_setting_api_request(self, data: Dict) -> Dict:
|
|
match_start = datetime.strftime(
|
|
datetime.now() - timedelta(hours=10), self.date_time_format
|
|
)
|
|
match_end = datetime.strftime(
|
|
datetime.now() + timedelta(hours=10), self.date_time_format
|
|
)
|
|
reboot_start = datetime.strftime(
|
|
datetime.now() - timedelta(hours=11), self.date_time_format
|
|
)
|
|
reboot_end = datetime.strftime(
|
|
datetime.now() - timedelta(hours=10), self.date_time_format
|
|
)
|
|
return {
|
|
"gameSetting": {
|
|
"isMaintenance": "false",
|
|
"requestInterval": 10,
|
|
"rebootStartTime": reboot_start,
|
|
"rebootEndTime": reboot_end,
|
|
"isBackgroundDistribute": "false",
|
|
"maxCountCharacter": 300,
|
|
"maxCountItem": 300,
|
|
"maxCountMusic": 300,
|
|
"matchStartTime": match_start,
|
|
"matchEndTime": match_end,
|
|
"matchTimeLimit": 99,
|
|
"matchErrorLimit": 9999,
|
|
"romVersion": self.game_cfg.version.version_rom(self.version),
|
|
"dataVersion": self.game_cfg.version.version_data(self.version),
|
|
"matchingUri": f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/200/ChuniServlet/",
|
|
"matchingUriX": f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/200/ChuniServlet/",
|
|
"udpHolePunchUri": f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/200/ChuniServlet/",
|
|
"reflectorUri": f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/200/ChuniServlet/",
|
|
},
|
|
"isDumpUpload": "false",
|
|
"isAou": "false",
|
|
}
|
|
|
|
def handle_remove_token_api_request(self, data: Dict) -> Dict:
|
|
return {"returnCode": "1"}
|
|
|
|
def handle_delete_token_api_request(self, data: Dict) -> Dict:
|
|
return {"returnCode": "1"}
|
|
|
|
def handle_create_token_api_request(self, data: Dict) -> Dict:
|
|
return {"returnCode": "1"}
|
|
|
|
def handle_get_user_map_area_api_request(self, data: Dict) -> Dict:
|
|
user_map_areas = self.data.item.get_map_areas(data["userId"])
|
|
|
|
map_areas = []
|
|
for map_area in user_map_areas:
|
|
tmp = map_area._asdict()
|
|
tmp.pop("id")
|
|
tmp.pop("user")
|
|
map_areas.append(tmp)
|
|
|
|
return {"userId": data["userId"], "userMapAreaList": map_areas}
|
|
|
|
def handle_get_user_symbol_chat_setting_api_request(self, data: Dict) -> Dict:
|
|
return {"userId": data["userId"], "symbolCharInfoList": []}
|
|
|
|
def handle_get_user_preview_api_request(self, data: Dict) -> Dict:
|
|
profile = self.data.profile.get_profile_preview(data["userId"], self.version)
|
|
if profile is None:
|
|
return None
|
|
profile_character = self.data.item.get_character(
|
|
data["userId"], profile["characterId"]
|
|
)
|
|
|
|
if profile_character is None:
|
|
chara = {}
|
|
else:
|
|
chara = profile_character._asdict()
|
|
chara.pop("id")
|
|
chara.pop("user")
|
|
|
|
data1 = {
|
|
"userId": data["userId"],
|
|
# Current Login State
|
|
"isLogin": False,
|
|
"lastLoginDate": profile["lastPlayDate"],
|
|
# User Profile
|
|
"userName": profile["userName"],
|
|
"reincarnationNum": profile["reincarnationNum"],
|
|
"level": profile["level"],
|
|
"exp": profile["exp"],
|
|
"playerRating": profile["playerRating"],
|
|
"lastGameId": profile["lastGameId"],
|
|
"lastRomVersion": profile["lastRomVersion"],
|
|
"lastDataVersion": profile["lastDataVersion"],
|
|
"lastPlayDate": profile["lastPlayDate"],
|
|
"emoneyBrandId": 0,
|
|
"trophyId": profile["trophyId"],
|
|
# Current Selected Character
|
|
"userCharacter": chara,
|
|
# User Game Options
|
|
"playerLevel": profile["playerLevel"],
|
|
"rating": profile["rating"],
|
|
"headphone": profile["headphone"],
|
|
# Enables favorites and teams
|
|
"chargeState": 1,
|
|
"userNameEx": "",
|
|
"banState": 0,
|
|
"classEmblemMedal": profile["classEmblemMedal"],
|
|
"classEmblemBase": profile["classEmblemBase"],
|
|
"battleRankId": profile["battleRankId"],
|
|
}
|
|
return data1
|
|
|
|
def handle_cm_get_user_preview_api_request(self, data: Dict) -> Dict:
|
|
p = self.data.profile.get_profile_data(data["userId"], self.version)
|
|
if p is None:
|
|
return {}
|
|
|
|
return {
|
|
"userName": p["userName"],
|
|
"level": p["level"],
|
|
"medal": p["medal"],
|
|
"lastDataVersion": "2.00.00",
|
|
"isLogin": False,
|
|
}
|
|
|
|
def handle_printer_login_api_request(self, data: Dict) -> Dict:
|
|
return {"returnCode": 1}
|
|
|
|
def handle_printer_logout_api_request(self, data: Dict) -> Dict:
|
|
return {"returnCode": 1}
|
|
|
|
def handle_get_game_gacha_api_request(self, data: Dict) -> Dict:
|
|
"""
|
|
returns all current active banners (gachas)
|
|
"""
|
|
game_gachas = self.data.static.get_gachas(self.version)
|
|
|
|
# clean the database rows
|
|
game_gacha_list = []
|
|
for gacha in game_gachas:
|
|
tmp = gacha._asdict()
|
|
tmp.pop("id")
|
|
tmp.pop("version")
|
|
tmp["startDate"] = datetime.strftime(tmp["startDate"], "%Y-%m-%d %H:%M:%S")
|
|
tmp["endDate"] = datetime.strftime(tmp["endDate"], "%Y-%m-%d %H:%M:%S")
|
|
tmp["noticeStartDate"] = datetime.strftime(
|
|
tmp["noticeStartDate"], "%Y-%m-%d %H:%M:%S"
|
|
)
|
|
tmp["noticeEndDate"] = datetime.strftime(
|
|
tmp["noticeEndDate"], "%Y-%m-%d %H:%M:%S"
|
|
)
|
|
|
|
game_gacha_list.append(tmp)
|
|
|
|
return {
|
|
"length": len(game_gacha_list),
|
|
"gameGachaList": game_gacha_list,
|
|
# no clue
|
|
"registIdList": [],
|
|
}
|
|
|
|
def handle_get_game_gacha_card_by_id_api_request(self, data: Dict) -> Dict:
|
|
"""
|
|
returns all valid cards for a given gachaId
|
|
"""
|
|
game_gacha_cards = self.data.static.get_gacha_cards(data["gachaId"])
|
|
|
|
game_gacha_card_list = []
|
|
for gacha_card in game_gacha_cards:
|
|
tmp = gacha_card._asdict()
|
|
tmp.pop("id")
|
|
game_gacha_card_list.append(tmp)
|
|
|
|
return {
|
|
"gachaId": data["gachaId"],
|
|
"length": len(game_gacha_card_list),
|
|
# check isPickup from the chuni_static_gachas?
|
|
"isPickup": False,
|
|
"gameGachaCardList": game_gacha_card_list,
|
|
# again no clue
|
|
"emissionList": [],
|
|
"afterCalcList": [],
|
|
"ssrBookCalcList": [],
|
|
}
|
|
|
|
def handle_cm_get_user_data_api_request(self, data: Dict) -> Dict:
|
|
p = self.data.profile.get_profile_data(data["userId"], self.version)
|
|
if p is None:
|
|
return {}
|
|
|
|
profile = p._asdict()
|
|
profile.pop("id")
|
|
profile.pop("user")
|
|
profile.pop("version")
|
|
|
|
return {
|
|
"userId": data["userId"],
|
|
"userData": profile,
|
|
"userEmoney": [
|
|
{
|
|
"type": 0,
|
|
"emoneyCredit": 100,
|
|
"emoneyBrand": 1,
|
|
"ext1": 0,
|
|
"ext2": 0,
|
|
"ext3": 0,
|
|
}
|
|
],
|
|
}
|
|
|
|
def handle_get_user_gacha_api_request(self, data: Dict) -> Dict:
|
|
user_gachas = self.data.item.get_user_gachas(data["userId"])
|
|
if user_gachas is None:
|
|
return {"userId": data["userId"], "length": 0, "userGachaList": []}
|
|
|
|
user_gacha_list = []
|
|
for gacha in user_gachas:
|
|
tmp = gacha._asdict()
|
|
tmp.pop("id")
|
|
tmp.pop("user")
|
|
tmp["dailyGachaDate"] = datetime.strftime(tmp["dailyGachaDate"], "%Y-%m-%d")
|
|
user_gacha_list.append(tmp)
|
|
|
|
return {
|
|
"userId": data["userId"],
|
|
"length": len(user_gacha_list),
|
|
"userGachaList": user_gacha_list,
|
|
}
|
|
|
|
def handle_get_user_printed_card_api_request(self, data: Dict) -> Dict:
|
|
user_print_list = self.data.item.get_user_print_states(
|
|
data["userId"], has_completed=True
|
|
)
|
|
if user_print_list is None:
|
|
return {
|
|
"userId": data["userId"],
|
|
"length": 0,
|
|
"nextIndex": -1,
|
|
"userPrintedCardList": [],
|
|
}
|
|
|
|
print_list = []
|
|
next_idx = int(data["nextIndex"])
|
|
max_ct = int(data["maxCount"])
|
|
|
|
for x in range(next_idx, len(user_print_list)):
|
|
tmp = user_print_list[x]._asdict()
|
|
print_list.append(tmp["cardId"])
|
|
|
|
if len(user_print_list) >= max_ct:
|
|
break
|
|
|
|
if len(user_print_list) >= max_ct:
|
|
next_idx = next_idx + max_ct
|
|
else:
|
|
next_idx = -1
|
|
|
|
return {
|
|
"userId": data["userId"],
|
|
"length": len(print_list),
|
|
"nextIndex": next_idx,
|
|
"userPrintedCardList": print_list,
|
|
}
|
|
|
|
def handle_get_user_card_print_error_api_request(self, data: Dict) -> Dict:
|
|
user_id = data["userId"]
|
|
|
|
user_print_states = self.data.item.get_user_print_states(
|
|
user_id, has_completed=False
|
|
)
|
|
|
|
card_print_state_list = []
|
|
for card in user_print_states:
|
|
tmp = card._asdict()
|
|
tmp["orderId"] = tmp["id"]
|
|
tmp.pop("user")
|
|
tmp["limitDate"] = datetime.strftime(tmp["limitDate"], "%Y-%m-%d")
|
|
|
|
card_print_state_list.append(tmp)
|
|
|
|
return {
|
|
"userId": user_id,
|
|
"length": len(card_print_state_list),
|
|
"userCardPrintStateList": card_print_state_list,
|
|
}
|
|
|
|
def handle_cm_get_user_character_api_request(self, data: Dict) -> Dict:
|
|
return super().handle_get_user_character_api_request(data)
|
|
|
|
def handle_cm_get_user_item_api_request(self, data: Dict) -> Dict:
|
|
return super().handle_get_user_item_api_request(data)
|
|
|
|
def handle_roll_gacha_api_request(self, data: Dict) -> Dict:
|
|
"""
|
|
Handle a gacha roll API request, with:
|
|
gachaId: the gachaId where the cards should be pulled from
|
|
times: the number of gacha rolls
|
|
characterId: the character which the user wants
|
|
"""
|
|
gacha_id = data["gachaId"]
|
|
num_rolls = data["times"]
|
|
chara_id = data["characterId"]
|
|
|
|
rolled_cards = []
|
|
|
|
# characterId is set after 10 rolls, where the user can select a card
|
|
# from all gameGachaCards, therefore the correct cardId for a given
|
|
# characterId should be returned
|
|
if chara_id != -1:
|
|
# get the
|
|
card = self.data.static.get_gacha_card_by_character(gacha_id, chara_id)
|
|
|
|
tmp = card._asdict()
|
|
tmp.pop("id")
|
|
|
|
rolled_cards.append(tmp)
|
|
else:
|
|
gacha_cards = self.data.static.get_gacha_cards(gacha_id)
|
|
|
|
# get the card id for each roll
|
|
for _ in range(num_rolls):
|
|
# get the index from all possible cards
|
|
card_idx = randint(0, len(gacha_cards) - 1)
|
|
# remove the index from the cards so it wont get pulled again
|
|
card = gacha_cards.pop(card_idx)
|
|
|
|
# remove the "id" fronm the card
|
|
tmp = card._asdict()
|
|
tmp.pop("id")
|
|
|
|
rolled_cards.append(tmp)
|
|
|
|
return {"length": len(rolled_cards), "gameGachaCardList": rolled_cards}
|
|
|
|
def handle_cm_upsert_user_gacha_api_request(self, data: Dict) -> Dict:
|
|
upsert = data["cmUpsertUserGacha"]
|
|
user_id = data["userId"]
|
|
place_id = data["placeId"]
|
|
|
|
# save the user data
|
|
user_data = upsert["userData"]
|
|
user_data.pop("rankUpChallengeResults")
|
|
user_data.pop("userEmoney")
|
|
|
|
self.data.profile.put_profile_data(user_id, self.version, user_data)
|
|
|
|
# save the user gacha
|
|
user_gacha = upsert["userGacha"]
|
|
gacha_id = user_gacha["gachaId"]
|
|
user_gacha.pop("gachaId")
|
|
user_gacha.pop("dailyGachaDate")
|
|
|
|
self.data.item.put_user_gacha(user_id, gacha_id, user_gacha)
|
|
|
|
# save all user items
|
|
if "userItemList" in upsert:
|
|
for item in upsert["userItemList"]:
|
|
self.data.item.put_item(user_id, item)
|
|
|
|
# add every gamegachaCard to database
|
|
for card in upsert["gameGachaCardList"]:
|
|
self.data.item.put_user_print_state(
|
|
user_id,
|
|
hasCompleted=False,
|
|
placeId=place_id,
|
|
cardId=card["cardId"],
|
|
gachaId=card["gachaId"],
|
|
)
|
|
|
|
# retrieve every game gacha card which has been added in order to get
|
|
# the orderId for the next request
|
|
user_print_states = self.data.item.get_user_print_states_by_gacha(
|
|
user_id, gacha_id, has_completed=False
|
|
)
|
|
card_print_state_list = []
|
|
for card in user_print_states:
|
|
tmp = card._asdict()
|
|
tmp["orderId"] = tmp["id"]
|
|
tmp.pop("user")
|
|
tmp["limitDate"] = datetime.strftime(tmp["limitDate"], "%Y-%m-%d")
|
|
|
|
card_print_state_list.append(tmp)
|
|
|
|
return {
|
|
"returnCode": "1",
|
|
"apiName": "CMUpsertUserGachaApi",
|
|
"userCardPrintStateList": card_print_state_list,
|
|
}
|
|
|
|
def handle_cm_upsert_user_printlog_api_request(self, data: Dict) -> Dict:
|
|
return {
|
|
"returnCode": 1,
|
|
"orderId": 0,
|
|
"serialId": "11111111111111111111",
|
|
"apiName": "CMUpsertUserPrintlogApi",
|
|
}
|
|
|
|
def handle_cm_upsert_user_print_api_request(self, data: Dict) -> Dict:
|
|
user_print_detail = data["userPrintDetail"]
|
|
user_id = data["userId"]
|
|
|
|
# generate random serial id
|
|
serial_id = "".join([str(randint(0, 9)) for _ in range(20)])
|
|
|
|
# not needed because are either zero or unset
|
|
user_print_detail.pop("orderId")
|
|
user_print_detail.pop("printNumber")
|
|
user_print_detail.pop("serialId")
|
|
user_print_detail["printDate"] = datetime.strptime(
|
|
user_print_detail["printDate"], "%Y-%m-%d"
|
|
)
|
|
|
|
# add the entry to the user print table with the random serialId
|
|
self.data.item.put_user_print_detail(user_id, serial_id, user_print_detail)
|
|
|
|
return {
|
|
"returnCode": 1,
|
|
"orderId": 0,
|
|
"serialId": serial_id,
|
|
"apiName": "CMUpsertUserPrintApi",
|
|
}
|
|
|
|
def handle_cm_upsert_user_print_subtract_api_request(self, data: Dict) -> Dict:
|
|
upsert = data["userCardPrintState"]
|
|
user_id = data["userId"]
|
|
place_id = data["placeId"]
|
|
|
|
# save all user items
|
|
if "userItemList" in data:
|
|
for item in data["userItemList"]:
|
|
self.data.item.put_item(user_id, item)
|
|
|
|
# set the card print state to success and use the orderId as the key
|
|
self.data.item.put_user_print_state(
|
|
user_id, id=upsert["orderId"], hasCompleted=True
|
|
)
|
|
|
|
return {"returnCode": "1", "apiName": "CMUpsertUserPrintSubtractApi"}
|
|
|
|
def handle_cm_upsert_user_print_cancel_api_request(self, data: Dict) -> Dict:
|
|
order_ids = data["orderIdList"]
|
|
user_id = data["userId"]
|
|
|
|
# set the card print state to success and use the orderId as the key
|
|
for order_id in order_ids:
|
|
self.data.item.put_user_print_state(user_id, id=order_id, hasCompleted=True)
|
|
|
|
return {"returnCode": "1", "apiName": "CMUpsertUserPrintCancelApi"}
|