Merge pull request 'Chunithm Improvements' (#16) from Dniel97/artemis:chunithm_improvements into develop

Reviewed-on: Hay1tsme/artemis#16
This commit is contained in:
Midorica 2023-03-30 21:03:37 +00:00
commit 979bd7d718
10 changed files with 506 additions and 68 deletions

View File

@ -2,5 +2,19 @@ server:
enable: True
loglevel: "info"
team:
name: ARTEMiS
mods:
use_login_bonus: True
version:
11:
rom: 2.00.00
data: 2.00.00
12:
rom: 2.05.00
data: 2.05.00
crypto:
encrypted_only: False

View File

@ -7,4 +7,4 @@ index = ChuniServlet
database = ChuniData
reader = ChuniReader
game_codes = [ChuniConstants.GAME_CODE, ChuniConstants.GAME_CODE_NEW]
current_schema_version = 1
current_schema_version = 3

View File

@ -23,7 +23,98 @@ class ChuniBase:
self.version = ChuniConstants.VER_CHUNITHM
def handle_game_login_api_request(self, data: Dict) -> Dict:
# self.data.base.log_event("chuni", "login", logging.INFO, {"version": self.version, "user": data["userId"]})
"""
Handles the login bonus logic, required for the game because
getUserLoginBonus gets called after getUserItem and therefore the
items needs to be inserted in the database before they get requested.
Adds a bonusCount after a user logged in after 24 hours, makes sure
loginBonus 30 gets looped, only show the login banner every 24 hours,
adds the bonus to items (itemKind 6)
"""
# ignore the login bonus if disabled in config
if not self.game_cfg.mods.use_login_bonus:
return {"returnCode": 1}
user_id = data["userId"]
login_bonus_presets = self.data.static.get_login_bonus_presets(self.version)
for preset in login_bonus_presets:
# check if a user already has some pogress and if not add the
# login bonus entry
user_login_bonus = self.data.item.get_login_bonus(
user_id, self.version, preset["id"]
)
if user_login_bonus is None:
self.data.item.put_login_bonus(user_id, self.version, preset["id"])
# yeah i'm lazy
user_login_bonus = self.data.item.get_login_bonus(
user_id, self.version, preset["id"]
)
# skip the login bonus entirely if its already finished
if user_login_bonus["isFinished"]:
continue
# make sure the last login is more than 24 hours ago
if user_login_bonus["lastUpdateDate"] < datetime.now() - timedelta(
hours=24
):
# increase the login day counter and update the last login date
bonus_count = user_login_bonus["bonusCount"] + 1
last_update_date = datetime.now()
all_login_boni = self.data.static.get_login_bonus(
self.version, preset["id"]
)
# skip the current bonus preset if no boni were found
if all_login_boni is None or len(all_login_boni) < 1:
self.logger.warn(
f"No bonus entries found for bonus preset {preset['id']}"
)
continue
max_needed_days = all_login_boni[0]["needLoginDayCount"]
# make sure to not show login boni after all days got redeemed
is_finished = False
if bonus_count > max_needed_days:
# assume that all login preset ids under 3000 needs to be
# looped, like 30 and 40 are looped, 40 does not work?
if preset["id"] < 3000:
bonus_count = 1
else:
is_finished = True
# grab the item for the corresponding day
login_item = self.data.static.get_login_bonus_by_required_days(
self.version, preset["id"], bonus_count
)
if login_item is not None:
# now add the present to the database so the
# handle_get_user_item_api_request can grab them
self.data.item.put_item(
user_id,
{
"itemId": login_item["presentId"],
"itemKind": 6,
"stock": login_item["itemNum"],
"isValid": True,
},
)
self.data.item.put_login_bonus(
user_id,
self.version,
preset["id"],
bonusCount=bonus_count,
lastUpdateDate=last_update_date,
isWatched=False,
isFinished=is_finished,
)
return {"returnCode": 1}
def handle_game_logout_api_request(self, data: Dict) -> Dict:
@ -130,7 +221,7 @@ class ChuniBase:
return {
"userId": data["userId"],
"length": len(activity_list),
"kind": data["kind"],
"kind": int(data["kind"]),
"userActivityList": activity_list,
}
@ -309,26 +400,29 @@ class ChuniBase:
}
def handle_get_user_login_bonus_api_request(self, data: Dict) -> Dict:
"""
Unsure how to get this to trigger...
"""
user_id = data["userId"]
user_login_bonus = self.data.item.get_all_login_bonus(user_id, self.version)
# ignore the loginBonus request if its disabled in config
if user_login_bonus is None or not self.game_cfg.mods.use_login_bonus:
return {"userId": user_id, "length": 0, "userLoginBonusList": []}
user_login_list = []
for bonus in user_login_bonus:
user_login_list.append(
{
"presetId": bonus["presetId"],
"bonusCount": bonus["bonusCount"],
"lastUpdateDate": datetime.strftime(
bonus["lastUpdateDate"], "%Y-%m-%d %H:%M:%S"
),
"isWatched": bonus["isWatched"],
}
)
return {
"userId": data["userId"],
"length": 2,
"userLoginBonusList": [
{
"presetId": "10",
"bonusCount": "0",
"lastUpdateDate": "1970-01-01 09:00:00",
"isWatched": "true",
},
{
"presetId": "20",
"bonusCount": "0",
"lastUpdateDate": "1970-01-01 09:00:00",
"isWatched": "true",
},
],
"userId": user_id,
"length": len(user_login_list),
"userLoginBonusList": user_login_list,
}
def handle_get_user_map_api_request(self, data: Dict) -> Dict:
@ -451,13 +545,13 @@ class ChuniBase:
"playerLevel": profile["playerLevel"],
"rating": profile["rating"],
"headphone": profile["headphone"],
"chargeState": "1",
"chargeState": 1,
"userNameEx": profile["userName"],
}
def handle_get_user_recent_rating_api_request(self, data: Dict) -> Dict:
recet_rating_list = self.data.profile.get_profile_recent_rating(data["userId"])
if recet_rating_list is None:
recent_rating_list = self.data.profile.get_profile_recent_rating(data["userId"])
if recent_rating_list is None:
return {
"userId": data["userId"],
"length": 0,
@ -466,8 +560,8 @@ class ChuniBase:
return {
"userId": data["userId"],
"length": len(recet_rating_list["recentRating"]),
"userRecentRatingList": recet_rating_list["recentRating"],
"length": len(recent_rating_list["recentRating"]),
"userRecentRatingList": recent_rating_list["recentRating"],
}
def handle_get_user_region_api_request(self, data: Dict) -> Dict:
@ -479,8 +573,24 @@ class ChuniBase:
}
def handle_get_user_team_api_request(self, data: Dict) -> Dict:
# TODO: Team
return {"userId": data["userId"], "teamId": 0}
# TODO: use the database "chuni_profile_team" with a GUI
team_name = self.game_cfg.team.team_name
if team_name == "":
return {"userId": data["userId"], "teamId": 0}
return {
"userId": data["userId"],
"teamId": 1,
"teamRank": 1,
"teamName": team_name,
"userTeamPoint": {
"userId": data["userId"],
"teamId": 1,
"orderId": 1,
"teamPoint": 1,
"aggrDate": data["playDate"],
},
}
def handle_get_team_course_setting_api_request(self, data: Dict) -> Dict:
return {
@ -580,9 +690,18 @@ class ChuniBase:
for emoney in upsert["userEmoneyList"]:
self.data.profile.put_profile_emoney(user_id, emoney)
if "userLoginBonusList" in upsert:
for login in upsert["userLoginBonusList"]:
self.data.item.put_login_bonus(
user_id, self.version, login["presetId"], isWatched=True
)
return {"returnCode": "1"}
def handle_upsert_user_chargelog_api_request(self, data: Dict) -> Dict:
# add tickets after they got bought, this makes sure the tickets are
# still valid after an unsuccessful logout
self.data.profile.put_profile_charge(data["userId"], data["userCharge"])
return {"returnCode": "1"}
def handle_upsert_client_bookkeeping_api_request(self, data: Dict) -> Dict:
@ -603,7 +722,5 @@ class ChuniBase:
def handle_get_user_net_battle_data_api_request(self, data: Dict) -> Dict:
return {
"userId": data["userId"],
"userNetBattleData": {
"recentNBSelectMusicList": []
}
"userNetBattleData": {"recentNBSelectMusicList": []},
}

View File

@ -21,6 +21,42 @@ class ChuniServerConfig:
)
class ChuniTeamConfig:
def __init__(self, parent_config: "ChuniConfig") -> None:
self.__config = parent_config
@property
def team_name(self) -> str:
return CoreConfig.get_config_field(
self.__config, "chuni", "team", "name", default=""
)
class ChuniModsConfig:
def __init__(self, parent_config: "ChuniConfig") -> None:
self.__config = parent_config
@property
def use_login_bonus(self) -> bool:
return CoreConfig.get_config_field(
self.__config, "chuni", "mods", "use_login_bonus", default=True
)
class ChuniVersionConfig:
def __init__(self, parent_config: "ChuniConfig") -> None:
self.__config = parent_config
def version(self, version: int) -> Dict:
"""
in the form of:
11: {"rom": 2.00.00, "data": 2.00.00}
"""
return CoreConfig.get_config_field(
self.__config, "chuni", "version", default={}
)[version]
class ChuniCryptoConfig:
def __init__(self, parent_config: "ChuniConfig") -> None:
self.__config = parent_config
@ -46,4 +82,7 @@ class ChuniCryptoConfig:
class ChuniConfig(dict):
def __init__(self) -> None:
self.server = ChuniServerConfig(self)
self.team = ChuniTeamConfig(self)
self.mods = ChuniModsConfig(self)
self.version = ChuniVersionConfig(self)
self.crypto = ChuniCryptoConfig(self)

View File

@ -49,8 +49,8 @@ class ChuniNew(ChuniBase):
"matchEndTime": match_end,
"matchTimeLimit": 99,
"matchErrorLimit": 9999,
"romVersion": "2.00.00",
"dataVersion": "2.00.00",
"romVersion": self.game_cfg.version.version(self.version)["rom"],
"dataVersion": self.game_cfg.version.version(self.version)["data"],
"matchingUri": f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/200/ChuniServlet/",
"matchingUriX": f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/200/ChuniServlet/",
"udpHolePunchUri": f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/200/ChuniServlet/",
@ -269,10 +269,10 @@ class ChuniNew(ChuniBase):
tmp = user_print_list[x]._asdict()
print_list.append(tmp["cardId"])
if len(user_print_list) >= max_ct:
if len(print_list) >= max_ct:
break
if len(user_print_list) >= max_ct:
if len(print_list) >= max_ct:
next_idx = next_idx + max_ct
else:
next_idx = -1
@ -454,9 +454,7 @@ class ChuniNew(ChuniBase):
# set the card print state to success and use the orderId as the key
self.data.item.put_user_print_state(
user_id,
id=upsert["orderId"],
hasCompleted=True
user_id, id=upsert["orderId"], hasCompleted=True
)
return {"returnCode": "1", "apiName": "CMUpsertUserPrintSubtractApi"}
@ -467,10 +465,6 @@ class ChuniNew(ChuniBase):
# set the card print state to success and use the orderId as the key
for order_id in order_ids:
self.data.item.put_user_print_state(
user_id,
id=order_id,
hasCompleted=True
)
self.data.item.put_user_print_state(user_id, id=order_id, hasCompleted=True)
return {"returnCode": "1", "apiName": "CMUpsertUserPrintCancelApi"}

View File

@ -1,6 +1,4 @@
from datetime import datetime, timedelta
from typing import Dict, Any
import pytz
from core.config import CoreConfig
from titles.chuni.new import ChuniNew
@ -15,8 +13,8 @@ class ChuniNewPlus(ChuniNew):
def handle_get_game_setting_api_request(self, data: Dict) -> Dict:
ret = super().handle_get_game_setting_api_request(data)
ret["gameSetting"]["romVersion"] = "2.05.00"
ret["gameSetting"]["dataVersion"] = "2.05.00"
ret["gameSetting"]["romVersion"] = self.game_cfg.version.version(self.version)["rom"]
ret["gameSetting"]["dataVersion"] = self.game_cfg.version.version(self.version)["data"]
ret["gameSetting"][
"matchingUri"
] = f"http://{self.core_cfg.title.hostname}:{self.core_cfg.title.port}/SDHD/205/ChuniServlet/"

View File

@ -42,6 +42,80 @@ class ChuniReader(BaseReader):
self.read_music(f"{dir}/music")
self.read_charges(f"{dir}/chargeItem")
self.read_avatar(f"{dir}/avatarAccessory")
self.read_login_bonus(f"{dir}/")
def read_login_bonus(self, root_dir: str) -> None:
for root, dirs, files in walk(f"{root_dir}loginBonusPreset"):
for dir in dirs:
if path.exists(f"{root}/{dir}/LoginBonusPreset.xml"):
with open(f"{root}/{dir}/LoginBonusPreset.xml", "rb") as fp:
bytedata = fp.read()
strdata = bytedata.decode("UTF-8")
xml_root = ET.fromstring(strdata)
for name in xml_root.findall("name"):
id = name.find("id").text
name = name.find("str").text
is_enabled = (
True if xml_root.find("disableFlag").text == "false" else False
)
result = self.data.static.put_login_bonus_preset(
self.version, id, name, is_enabled
)
if result is not None:
self.logger.info(f"Inserted login bonus preset {id}")
else:
self.logger.warn(f"Failed to insert login bonus preset {id}")
for bonus in xml_root.find("infos").findall("LoginBonusDataInfo"):
for name in bonus.findall("loginBonusName"):
bonus_id = name.find("id").text
bonus_name = name.find("str").text
if path.exists(
f"{root_dir}/loginBonus/loginBonus{bonus_id}/LoginBonus.xml"
):
with open(
f"{root_dir}/loginBonus/loginBonus{bonus_id}/LoginBonus.xml",
"rb",
) as fp:
bytedata = fp.read()
strdata = bytedata.decode("UTF-8")
bonus_root = ET.fromstring(strdata)
for present in bonus_root.findall("present"):
present_id = present.find("id").text
present_name = present.find("str").text
item_num = int(bonus_root.find("itemNum").text)
need_login_day_count = int(
bonus_root.find("needLoginDayCount").text
)
login_bonus_category_type = int(
bonus_root.find("loginBonusCategoryType").text
)
result = self.data.static.put_login_bonus(
self.version,
id,
bonus_id,
bonus_name,
present_id,
present_name,
item_num,
need_login_day_count,
login_bonus_category_type,
)
if result is not None:
self.logger.info(f"Inserted login bonus {bonus_id}")
else:
self.logger.warn(
f"Failed to insert login bonus {bonus_id}"
)
def read_events(self, evt_dir: str) -> None:
for root, dirs, files in walk(evt_dir):

View File

@ -184,8 +184,73 @@ print_detail = Table(
mysql_charset="utf8mb4",
)
login_bonus = Table(
"chuni_item_login_bonus",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column(
"user",
ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"),
nullable=False,
),
Column("version", Integer, nullable=False),
Column("presetId", Integer, nullable=False),
Column("bonusCount", Integer, nullable=False, server_default="0"),
Column("lastUpdateDate", TIMESTAMP, server_default="2018-01-01 00:00:00.0"),
Column("isWatched", Boolean, server_default="0"),
Column("isFinished", Boolean, server_default="0"),
UniqueConstraint("version", "user", "presetId", name="chuni_item_login_bonus_uk"),
mysql_charset="utf8mb4",
)
class ChuniItemData(BaseData):
def put_login_bonus(
self, user_id: int, version: int, preset_id: int, **login_bonus_data
) -> Optional[int]:
sql = insert(login_bonus).values(
version=version, user=user_id, presetId=preset_id, **login_bonus_data
)
conflict = sql.on_duplicate_key_update(presetId=preset_id, **login_bonus_data)
result = self.execute(conflict)
if result is None:
return None
return result.lastrowid
def get_all_login_bonus(
self, user_id: int, version: int, is_finished: bool = False
) -> Optional[List[Row]]:
sql = login_bonus.select(
and_(
login_bonus.c.version == version,
login_bonus.c.user == user_id,
login_bonus.c.isFinished == is_finished,
)
)
result = self.execute(sql)
if result is None:
return None
return result.fetchall()
def get_login_bonus(
self, user_id: int, version: int, preset_id: int
) -> Optional[Row]:
sql = login_bonus.select(
and_(
login_bonus.c.version == version,
login_bonus.c.user == user_id,
login_bonus.c.presetId == preset_id,
)
)
result = self.execute(sql)
if result is None:
return None
return result.fetchone()
def put_character(self, user_id: int, character_data: Dict) -> Optional[int]:
character_data["user"] = user_id
@ -335,7 +400,7 @@ class ChuniItemData(BaseData):
sql = print_state.select(
and_(
print_state.c.user == aime_id,
print_state.c.hasCompleted == has_completed
print_state.c.hasCompleted == has_completed,
)
)
@ -351,7 +416,7 @@ class ChuniItemData(BaseData):
and_(
print_state.c.user == aime_id,
print_state.c.gachaId == gacha_id,
print_state.c.hasCompleted == has_completed
print_state.c.hasCompleted == has_completed,
)
)
@ -380,9 +445,7 @@ class ChuniItemData(BaseData):
user=aime_id, serialId=serial_id, **user_print_data
)
conflict = sql.on_duplicate_key_update(
user=aime_id, **user_print_data
)
conflict = sql.on_duplicate_key_update(user=aime_id, **user_print_data)
result = self.execute(conflict)
if result is None:

View File

@ -558,8 +558,10 @@ class ChuniProfileData(BaseData):
return result.lastrowid
def get_profile_activity(self, aime_id: int, kind: int) -> Optional[List[Row]]:
sql = select(activity).where(
and_(activity.c.user == aime_id, activity.c.kind == kind)
sql = (
select(activity)
.where(and_(activity.c.user == aime_id, activity.c.kind == kind))
.order_by(activity.c.sortNumber.desc()) # to get the last played track
)
result = self.execute(sql)

View File

@ -122,8 +122,148 @@ gacha_cards = Table(
mysql_charset="utf8mb4",
)
login_bonus_preset = Table(
"chuni_static_login_bonus_preset",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("version", Integer, nullable=False),
Column("presetName", String(255), nullable=False),
Column("isEnabled", Boolean, server_default="1"),
UniqueConstraint("version", "id", name="chuni_static_login_bonus_preset_uk"),
mysql_charset="utf8mb4",
)
login_bonus = Table(
"chuni_static_login_bonus",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("version", Integer, nullable=False),
Column(
"presetId",
ForeignKey(
"chuni_static_login_bonus_preset.id",
ondelete="cascade",
onupdate="cascade",
),
nullable=False,
),
Column("loginBonusId", Integer, nullable=False),
Column("loginBonusName", String(255), nullable=False),
Column("presentId", Integer, nullable=False),
Column("presentName", String(255), nullable=False),
Column("itemNum", Integer, nullable=False),
Column("needLoginDayCount", Integer, nullable=False),
Column("loginBonusCategoryType", Integer, nullable=False),
UniqueConstraint(
"version", "presetId", "loginBonusId", name="chuni_static_login_bonus_uk"
),
mysql_charset="utf8mb4",
)
class ChuniStaticData(BaseData):
def put_login_bonus(
self,
version: int,
preset_id: int,
login_bonus_id: int,
login_bonus_name: str,
present_id: int,
present_ame: str,
item_num: int,
need_login_day_count: int,
login_bonus_category_type: int,
) -> Optional[int]:
sql = insert(login_bonus).values(
version=version,
presetId=preset_id,
loginBonusId=login_bonus_id,
loginBonusName=login_bonus_name,
presentId=present_id,
presentName=present_ame,
itemNum=item_num,
needLoginDayCount=need_login_day_count,
loginBonusCategoryType=login_bonus_category_type,
)
conflict = sql.on_duplicate_key_update(
loginBonusName=login_bonus_name,
presentName=present_ame,
itemNum=item_num,
needLoginDayCount=need_login_day_count,
loginBonusCategoryType=login_bonus_category_type,
)
result = self.execute(conflict)
if result is None:
return None
return result.lastrowid
def get_login_bonus(
self, version: int, preset_id: int,
) -> Optional[List[Row]]:
sql = login_bonus.select(
and_(
login_bonus.c.version == version,
login_bonus.c.presetId == preset_id,
)
).order_by(login_bonus.c.needLoginDayCount.desc())
result = self.execute(sql)
if result is None:
return None
return result.fetchall()
def get_login_bonus_by_required_days(
self, version: int, preset_id: int, need_login_day_count: int
) -> Optional[Row]:
sql = login_bonus.select(
and_(
login_bonus.c.version == version,
login_bonus.c.presetId == preset_id,
login_bonus.c.needLoginDayCount == need_login_day_count,
)
)
result = self.execute(sql)
if result is None:
return None
return result.fetchone()
def put_login_bonus_preset(
self, version: int, preset_id: int, preset_name: str, is_enabled: bool
) -> Optional[int]:
sql = insert(login_bonus_preset).values(
id=preset_id,
version=version,
presetName=preset_name,
isEnabled=is_enabled,
)
conflict = sql.on_duplicate_key_update(
presetName=preset_name, isEnabled=is_enabled
)
result = self.execute(conflict)
if result is None:
return None
return result.lastrowid
def get_login_bonus_presets(
self, version: int, is_enabled: bool = True
) -> Optional[List[Row]]:
sql = login_bonus_preset.select(
and_(
login_bonus_preset.c.version == version,
login_bonus_preset.c.isEnabled == is_enabled,
)
)
result = self.execute(sql)
if result is None:
return None
return result.fetchall()
def put_event(
self, version: int, event_id: int, type: int, name: str
) -> Optional[int]:
@ -390,20 +530,17 @@ class ChuniStaticData(BaseData):
return None
return result.fetchall()
def get_gacha_card_by_character(self, gacha_id: int, chara_id: int) -> Optional[Dict]:
def get_gacha_card_by_character(
self, gacha_id: int, chara_id: int
) -> Optional[Dict]:
sql_sub = (
select(cards.c.cardId)
.filter(
cards.c.charaId == chara_id
)
.scalar_subquery()
select(cards.c.cardId).filter(cards.c.charaId == chara_id).scalar_subquery()
)
# Perform the main query, also rename the resulting column to ranking
sql = gacha_cards.select(and_(
gacha_cards.c.gachaId == gacha_id,
gacha_cards.c.cardId == sql_sub
))
sql = gacha_cards.select(
and_(gacha_cards.c.gachaId == gacha_id, gacha_cards.c.cardId == sql_sub)
)
result = self.execute(sql)
if result is None: