idac: added version based stamp and time trial

This commit is contained in:
Dniel97 2024-06-15 23:56:09 +02:00
parent c086b3f8cf
commit d2175f5b0e
Signed by: Dniel97
GPG Key ID: 6180B3C768FB2E08
4 changed files with 179 additions and 110 deletions

View File

@ -734,7 +734,7 @@ python dbutils.py upgrade
### TimeRelease Chapter:
1. Story: 1, 2, 3, 4, 5, 6, 7, 8, 9, 19 (Chapter 10), 29 (Chapter 11)
2. MF Ghost: 10, 11, 12, 13, 14, 15
2. MF Ghost: 10, 11, 12, 13, 14
3. Bunta: 15, 16, 17, 18, 20, 21, 21, 22
4. Touhou Project Special Event: 23, 24, 25, 26, 27, 28
5. Hatsune Miku Special Event: 36, 37, 38

View File

@ -17,13 +17,15 @@ timerelease:
stamp:
enable: True
enabled_stamps: # max 3 play stamps
- "touhou_remilia_scarlet"
- "touhou_flandre_scarlet"
- "touhou_sakuya_izayoi"
150:
- "touhou_remilia_scarlet"
- "touhou_flandre_scarlet"
- "touhou_sakuya_izayoi"
timetrial:
enable: True
enabled_timetrial: "touhou_remilia_scarlet"
enabled_timetrial:
150: "touhou_remilia_scarlet"
battle_event:
enabled: True

View File

@ -1,3 +1,4 @@
from typing import Dict
from core.config import CoreConfig
@ -95,17 +96,28 @@ class IDACStampConfig:
)
@property
def enabled_stamps(self) -> list:
def enabled_stamps(self) -> Dict:
"""
In the form of:
<version as int>:
- <stamp name 1>
- <stamp name 2>
- <stamp name 3>
max 3 stamps per version
f.e.:
150:
- "touhou_remilia_scarlet"
- "touhou_flandre_scarlet"
- "touhou_sakuya_izayoi"
"""
return CoreConfig.get_config_field(
self.__config,
"idac",
"stamp",
"enabled_stamps",
default=[
"touhou_remilia_scarlet",
"touhou_flandre_scarlet",
"touhou_sakuya_izayoi",
],
default={},
)
@ -120,13 +132,20 @@ class IDACTimetrialConfig:
)
@property
def enabled_timetrial(self) -> str:
def enabled_timetrial(self) -> Dict:
"""
In the form of:
<version as int>: <timetrial name>
f.e.:
150: "touhou_remilia_scarlet"
"""
return CoreConfig.get_config_field(
self.__config,
"idac",
"timetrial",
"enabled_timetrial",
default="touhou_remilia_scarlet",
default={},
)
class IDACTBattleGiftConfig:

View File

@ -1,3 +1,4 @@
from collections import defaultdict
from datetime import datetime, timedelta
import os
from random import choice, randint
@ -24,42 +25,52 @@ class IDACSeason2(IDACBase):
self.game_config.timerelease.timerelease_avatar_gacha_no
)
# load the user configured play stamps (up to 3)
self.stamp_info = []
# load the user configured play stamps (up to 3) for each game version
self.stamp_info = defaultdict(list)
if self.game_config.stamp.enable:
for stamp in self.game_config.stamp.enabled_stamps:
if not os.path.exists(f"./titles/idac/data/stamp/{stamp}.json"):
self.logger.warning(f"Stamp {stamp} is enabled but does not exist!")
continue
enabled_stamps = self.game_config.stamp.enabled_stamps
for game_ver in enabled_stamps.keys():
for stamp in enabled_stamps[game_ver][:3]:
if not os.path.exists(f"./titles/idac/data/stamp/{stamp}.json"):
self.logger.warning(
f"Stamp {stamp} is enabled but does not exist!"
)
continue
with open(
f"./titles/idac/data/stamp/{stamp}.json", encoding="UTF-8"
) as f:
self.logger.debug(f"Loading stamp {stamp}")
self.stamp_info.append(self._fix_dates(json.load(f)))
# load the user configured time trials (only one)
self.timetrial_event = {}
self.timetrial_event_id = None
if self.game_config.timetrial.enable:
timetrial = self.game_config.timetrial.enabled_timetrial
if timetrial is not None:
if not os.path.exists(f"./titles/idac/data/timetrial/{timetrial}.json"):
self.logger.warning(
f"Timetrial {timetrial} is enabled but does not exist!"
)
else:
self.logger.debug(f"Loading timetrial {timetrial}")
with open(
f"./titles/idac/data/timetrial/{timetrial}.json",
encoding="UTF-8",
f"./titles/idac/data/stamp/{stamp}.json", encoding="UTF-8"
) as f:
self.timetrial_event = self._fix_dates(json.load(f))
self.logger.debug(f"Loading stamp {stamp}")
self.stamp_info[game_ver].append(self._fix_dates(json.load(f)))
# required for saving
self.timetrial_event_id = self.timetrial_event.get(
"timetrial_event_id"
)
# load the user configured time trial (only one) for each game version
self.timetrial_event = {}
self.timetrial_event_id = {}
if self.game_config.timetrial.enable:
enabled_timetrial = self.game_config.timetrial.enabled_timetrial
for game_ver in enabled_timetrial.keys():
timetrial = enabled_timetrial[game_ver]
if timetrial is not None:
if not os.path.exists(
f"./titles/idac/data/timetrial/{timetrial}.json"
):
self.logger.warning(
f"Timetrial {timetrial} is enabled but does not exist!"
)
else:
self.logger.debug(f"Loading timetrial {timetrial}")
with open(
f"./titles/idac/data/timetrial/{timetrial}.json",
encoding="UTF-8",
) as f:
self.timetrial_event[game_ver] = self._fix_dates(
json.load(f)
)
# required for saving
self.timetrial_event_id[game_ver] = self.timetrial_event.get(
game_ver
).get("timetrial_event_id")
# load the user configured round event (only one)
asyncio.create_task(self._load_round_event())
@ -95,37 +106,55 @@ class IDACSeason2(IDACBase):
if round is not None:
path = f"./titles/idac/data/rounds/season{self.version+1}/{round}.json"
if not os.path.exists(path):
self.logger.warning(f"Round info {round} is enabled but json file does not exist!")
self.logger.warning(
f"Round info {round} is enabled but json file does not exist!"
)
else:
with open(path, encoding="UTF-8") as f:
self.logger.debug(f"Loading round info {round}...")
tmp = json.load(f)
self.round_event_id = await self.data.rounds._try_load_round_event(tmp["round_event_id"], self.version, tmp)
self.round_event_id = (
await self.data.rounds._try_load_round_event(
tmp["round_event_id"], self.version, tmp
)
)
self.round_event.append(self._fix_dates(tmp))
self.logger.debug(f"Loaded round id for database: {self.round_event_id}...")
self.logger.debug(
f"Loaded round id for database: {self.round_event_id}..."
)
# Load last round event
round = self.game_config.round_event.last_round
if round is not None:
path = f"./titles/idac/data/rounds/season{self.version+1}/{round}.json"
if not os.path.exists(path):
self.logger.warning(f"Round info {round} is enabled but json file does not exist!")
self.logger.warning(
f"Round info {round} is enabled but json file does not exist!"
)
else:
with open(path, encoding="UTF-8") as f:
self.logger.debug(f"Loading round info {round}...")
tmp = json.load(f)
self.last_round_event_id = await self.data.rounds._try_load_round_event(tmp["round_event_id"], self.version, tmp)
self.last_round_event_id = (
await self.data.rounds._try_load_round_event(
tmp["round_event_id"], self.version, tmp
)
)
self.last_round_event.append(tmp)
self.logger.debug(f"Loaded round id for database: {self.last_round_event_id}...")
self.logger.debug(
f"Loaded round id for database: {self.last_round_event_id}..."
)
# Load top five of last round event
# class LastRoundEventRanking(BaseModel):
# round_rank: int
# round_point: int
# round_play_count: int
# round_rank: int
# round_point: int
# round_play_count: int
ranking = await self.data.rounds.get_round_top_five(self.last_round_event_id)
ranking = await self.data.rounds.get_round_top_five(
self.last_round_event_id
)
if ranking is not None:
rank_profile = {
"round_rank": 0,
@ -138,16 +167,19 @@ class IDACSeason2(IDACBase):
"mytitle_id": 0,
"mytitle_effect_id": 0,
"car_data": [],
"user_avatar": []
"user_avatar": [],
}
for user in ranking:
# get the user's profile
p = await self.data.profile.get_profile(user["user"], self.version)
user_id = user["user"]
p = await self.data.profile.get_profile(user_id, self.version)
user_data = p._asdict()
arcade = await self.data.arcade.get_arcade(user_data["store"])
user_data["store_name"] = (
self.core_cfg.server.name if arcade is None else arcade["name"]
)
rank_profile["username"] = user_data["username"]
rank_profile["country"] = user_data["country"]
rank_profile["store"] = user_data["store_name"]
@ -155,7 +187,7 @@ class IDACSeason2(IDACBase):
rank_profile["mytitle_effect_id"] = user_data["mytitle_effect_id"]
# get the user's avatar
a = await self.data.profile.get_profile_avatar(user["user"])
a = await self.data.profile.get_profile_avatar(user_id)
avatar_data = a._asdict()
del avatar_data["id"]
del avatar_data["user"]
@ -170,7 +202,10 @@ class IDACSeason2(IDACBase):
rank_profile["online_battle_rank"] = rank_data["online_battle_rank"]
# get the user's car
cars = await self.data.item.get_cars(self.version, user_id, only_pickup=True)
cars = await self.data.item.get_cars(
self.version, user_id, only_pickup=True
)
fulltune_count = 0
total_car_parts_count = 0
car_data = []
@ -183,7 +218,9 @@ class IDACSeason2(IDACBase):
rank_profile["car_data"] = car_data
# get round info
ri = await self.data.rounds.get_round_info_by_id(user["user"], self.last_round_event_id)
ri = await self.data.rounds.get_round_info_by_id(
user_id, self.last_round_event_id
)
if ri is not None:
ri = ri._asdict()
rank_profile["round_rank"] = user["find_in_set_1"]
@ -214,16 +251,16 @@ class IDACSeason2(IDACBase):
output[key] = value
return output
def _headers_to_version(self, headers: Dict) -> int:
version = headers.get("device_version", "1.00.00")
return int(version.replace(".", "")[:-2])
def _special_story_type(self, headers: Dict):
version = headers["device_version"]
ver_str = version.replace(".", "")[:3]
ver = self._headers_to_version(headers)
# 4 = touhou project, 5 = hatsune miku
return {
"150": 4,
"170": 5
}.get(ver_str, 0)
return {150: 4, 170: 5}.get(ver, 0)
async def handle_boot_getconfigdata_request(self, data: Dict, headers: Dict):
"""
@ -255,8 +292,7 @@ class IDACSeason2(IDACBase):
34 = FullTune Ticket Fragment
35 = Underneon Lights
"""
version = headers["device_version"]
ver_str = version.replace(".", "")[:3]
ver_str = self._headers_to_version(headers)
if self.core_cfg.server.is_using_proxy:
domain_api_game = f"http://{self.core_cfg.server.hostname}/{ver_str}/"
@ -272,7 +308,7 @@ class IDACSeason2(IDACBase):
datetime.strptime("2023-01-01", "%Y-%m-%d").timestamp()
),
"end_dt": int(datetime.strptime("2029-01-01", "%Y-%m-%d").timestamp()),
"story_type": special_story_type
"story_type": special_story_type,
}
return {
@ -309,7 +345,7 @@ class IDACSeason2(IDACBase):
"last_round_event": self.last_round_event,
"last_round_event_ranking": self.last_round_event_ranking,
"round_event_exp": [],
"stamp_info": self.stamp_info,
"stamp_info": self.stamp_info[ver_str],
# 0 = use default data, 1+ = server version of timereleasedata response
"timerelease_no": self.timerelease_no,
# 0 = use default data, 1+ = server version of gachadata response
@ -477,7 +513,7 @@ class IDACSeason2(IDACBase):
"theory_close_version": "9.99.99",
# unlocks the version specific special mode
"special_mode_data": special_mode_data,
"timetrial_event_data": self.timetrial_event,
"timetrial_event_data": self.timetrial_event[ver_str],
"number_lottery_data": [
{
"m_number_lottery_win_number_no": 10,
@ -619,9 +655,9 @@ class IDACSeason2(IDACBase):
"""
timerelease chapter:
1 = Story: 1, 2, 3, 4, 5, 6, 7, 8, 9, 19 (Chapter 10), 29 (Chapter 11)
2 = MF Ghost: 10, 11, 12, 13, 14, 15
2 = MF Ghost: 10, 11, 12, 13, 14
3 = Bunta: 15, 16, 17, 18, 20, 21, 21, 22
4 = Touch Project Special Event: 23, 24, 25, 26, 27, 28
4 = Touhou Project Special Event: 23, 24, 25, 26, 27, 28
5 = Hatsune Miku Special Event: 36, 37, 38
"""
path = "./titles/idac/data/"
@ -709,11 +745,11 @@ class IDACSeason2(IDACBase):
"shop_best_data": best_data,
"rank_management_flag": 0,
}
def _is_valid_version(self, db_version: str, cl_version: str) -> bool:
if len(db_version) < 7 or len(cl_version) < 7:
return False
# convert the trings to int and compare
db_version = int(db_version.replace(".", "")[:7])
cl_version = int(cl_version.replace(".", "")[:7])
@ -975,15 +1011,17 @@ class IDACSeason2(IDACBase):
vs_info["course_select_priority"] = data.get("course_select_priority")
return vs_info
async def _update_round_info(self, user_id: int, round_event_id: int, point: int, win: int) -> Dict:
async def _update_round_info(
self, user_id: int, round_event_id: int, point: int, win: int
) -> Dict:
# get the round info data from database
round_info = []
ri = await self.data.rounds.get_round_info_by_id(user_id, round_event_id)
if ri is not None:
tmp = ri._asdict()
del tmp["id"]
# calculate new round points and info
tmp["point"] = 0 if tmp["point"] + point < 1 else tmp["point"] + point
tmp["count"] += 1
@ -994,7 +1032,7 @@ class IDACSeason2(IDACBase):
await self.data.rounds.put_round_event(user_id, round_event_id, tmp)
del tmp["play_dt"]
# get players new round ranking
r = await self.data.rounds.get_round_rank_by_id(user_id, round_event_id)
round_ranking = r._asdict()
@ -1002,7 +1040,7 @@ class IDACSeason2(IDACBase):
# TODO: get players historical earned points
tmp["total_round_point"] = 0
round_info.append(tmp)
else:
# new player of now-going round event
@ -1021,11 +1059,10 @@ class IDACSeason2(IDACBase):
# TODO: get players historical earned points
tmp["total_round_point"] = 0
round_info.append(tmp)
return round_info
def _choose_gift_id(self) -> Dict:
gift_data = self.battle_gift_event["gift_data"]
# calculate the total_rate based on the first_distribution_rate
@ -1108,19 +1145,21 @@ class IDACSeason2(IDACBase):
}
)
# get the users'd round data
# get the users'd round data
round_info = []
ri = await self.data.rounds.get_round_info_by_id(user_id, self.round_event_id)
if ri is not None:
r = await self.data.rounds.get_round_rank_by_id(user_id, self.round_event_id)
r = await self.data.rounds.get_round_rank_by_id(
user_id, self.round_event_id
)
round_ranking = r._asdict()
tmp = ri._asdict()
del tmp["id"]
del tmp["user"]
del tmp["round_id"]
del tmp["play_dt"]
tmp["rank"] = round_ranking["find_in_set_1"]
# TODO: calculate this
tmp["total_round_point"] = 0
@ -1282,10 +1321,12 @@ class IDACSeason2(IDACBase):
del tmp["create_date_weekly"]
stamp_event_data.append(tmp)
ver_str = self._headers_to_version(headers)
# get the user's timetrial event data
timetrial_event_data = {}
timetrial = await self.data.item.get_timetrial_event(
user_id, self.timetrial_event_id
user_id, self.timetrial_event_id[ver_str]
)
if timetrial is not None:
timetrial_event_data = {
@ -2031,9 +2072,11 @@ class IDACSeason2(IDACBase):
data["play_dt"] = datetime.now()
await self.data.item.put_time_trial(self.version, user_id, data)
ver_str = self._headers_to_version(headers)
# update the timetrial event points
await self.data.item.put_timetrial_event(
user_id, self.timetrial_event_id, event_point
user_id, self.timetrial_event_id[ver_str], event_point
)
# update the tips play count
@ -2044,6 +2087,8 @@ class IDACSeason2(IDACBase):
{"timetrial_play_count": tips["timetrial_play_count"] + 1},
)
ver_str = self._headers_to_version(headers)
return {
"status_code": "0",
"course_rank": course_rank,
@ -2052,7 +2097,7 @@ class IDACSeason2(IDACBase):
"car_use_count": [],
"maker_use_count": [],
"timetrial_event_data": {
"timetrial_event_id": self.timetrial_event_id,
"timetrial_event_id": self.timetrial_event_id[ver_str],
"point": event_point,
},
}
@ -3060,7 +3105,7 @@ class IDACSeason2(IDACBase):
}
async def handle_user_updateonlinebattle_request(self, data: Dict, headers: Dict):
#TODO: voiding cheaters' result. which will need to analysis historical battle results returned by the game.
# TODO: voiding cheaters' result. which will need to analysis historical battle results returned by the game.
return {
"status_code": "0",
"bothwin_penalty": 1,
@ -3136,7 +3181,9 @@ class IDACSeason2(IDACBase):
{"online_battle_play_count": tips["online_battle_play_count"] + 1},
)
round_info = await self._update_round_info(user_id, self.round_event_id, data.pop("round_point"), data.pop("win_flg"))
round_info = await self._update_round_info(
user_id, self.round_event_id, data.pop("round_point"), data.pop("win_flg")
)
return {
"status_code": "0",
@ -3237,7 +3284,9 @@ class IDACSeason2(IDACBase):
"maker_use_count": [],
}
async def handle_factory_numberlotterybefore_request(self, data: Dict, headers: Dict):
async def handle_factory_numberlotterybefore_request(
self, data: Dict, headers: Dict
):
user_id = headers["session"]
win_list = []
lottery_count = 0
@ -3254,23 +3303,21 @@ class IDACSeason2(IDACBase):
for number in range(10):
if saved_value & 1:
# if the least significant bit is 1, add to the win_list
win_list.append({
"m_number_lottery_schedule_no": 1,
"win_number": number * 1111
})
win_list.append(
{"m_number_lottery_schedule_no": 1, "win_number": number * 1111}
)
# right shift saved_value to check the next bit
saved_value >>= 1
return {
"status_code": "0",
"lottery_info": {
"lottery_count": lottery_count,
"win_list": win_list
}
"lottery_info": {"lottery_count": lottery_count, "win_list": win_list},
}
async def handle_factory_numberlotteryresult_request(self, data: Dict, headers: Dict):
async def handle_factory_numberlotteryresult_request(
self, data: Dict, headers: Dict
):
user_id = headers["session"]
win_number = data.pop("win_number")
lottery_count = data.pop("lottery_count")
@ -3289,20 +3336,23 @@ class IDACSeason2(IDACBase):
# calculate the bit position to set based on the win_number
shifted = win_number // 1111
saved_value += 1 << shifted
# update the create_date timestamp when the last create_date is older than 24 hours
if l and datetime.now() - l["create_date"] > timedelta(days=1):
create_date = datetime.now()
# update the lottery data with the new saved_value and lottery_count
await self.data.factory.put_lottery(user_id, self.version, saved_value, lottery_count, create_date)
await self.data.factory.put_lottery(
user_id, self.version, saved_value, lottery_count, create_date
)
if license_no != 10000 and is_end == 1:
# ithe lottery is ended, save car data
await self.data.item.put_car(user_id, self.version, {
"style_car_id": style_car_id,
"l_no": license_no
})
await self.data.item.put_car(
user_id,
self.version,
{"style_car_id": style_car_id, "l_no": license_no},
)
# save each ticket data
for ticket in ticket_data:
@ -3311,6 +3361,4 @@ class IDACSeason2(IDACBase):
# save remaining profile data
await self.data.profile.put_profile(user_id, self.version, data)
return {
"status_code": "0"
}
return {"status_code": "0"}