Merge pull request 'Aime Locks/Bans and Chunithm Improvements' (#47) from EmmyHeart/artemis:develop into develop

Reviewed-on: Hay1tsme/artemis#47
This commit is contained in:
Midorica 2023-10-27 17:25:35 +00:00
commit a8f06ee266
5 changed files with 150 additions and 28 deletions

View File

@ -145,7 +145,15 @@ class AimedbProtocol(Protocol):
def handle_lookup(self, data: bytes, resp_code: int) -> ADBBaseResponse: def handle_lookup(self, data: bytes, resp_code: int) -> ADBBaseResponse:
req = ADBLookupRequest(data) req = ADBLookupRequest(data)
user_id = self.data.card.get_user_id_from_card(req.access_code) user_id = self.data.card.get_user_id_from_card(req.access_code)
is_banned = self.data.card.get_card_banned(req.access_code)
is_locked = self.data.card.get_card_locked(req.access_code)
if is_banned and is_locked:
ret.head.status = ADBStatus.BAN_SYS_USER
elif is_banned:
ret.head.status = ADBStatus.BAN_SYS
elif is_locked:
ret.head.status = ADBStatus.LOCK_USER
ret = ADBLookupResponse.from_req(req.head, user_id) ret = ADBLookupResponse.from_req(req.head, user_id)
self.logger.info( self.logger.info(
@ -157,7 +165,16 @@ class AimedbProtocol(Protocol):
req = ADBLookupRequest(data) req = ADBLookupRequest(data)
user_id = self.data.card.get_user_id_from_card(req.access_code) user_id = self.data.card.get_user_id_from_card(req.access_code)
is_banned = self.data.card.get_card_banned(req.access_code)
is_locked = self.data.card.get_card_locked(req.access_code)
ret = ADBLookupExResponse.from_req(req.head, user_id) ret = ADBLookupExResponse.from_req(req.head, user_id)
if is_banned and is_locked:
ret.head.status = ADBStatus.BAN_SYS_USER
elif is_banned:
ret.head.status = ADBStatus.BAN_SYS
elif is_locked:
ret.head.status = ADBStatus.LOCK_USER
self.logger.info( self.logger.info(
f"access_code {req.access_code} -> user_id {ret.user_id}" f"access_code {req.access_code} -> user_id {ret.user_id}"

View File

@ -64,6 +64,27 @@ class CardData(BaseData):
return int(card["user"]) return int(card["user"])
def get_card_banned(self, access_code: str) -> Optional[bool]:
"""
Given a 20 digit access code as a string, check if the card is banned
"""
card = self.get_card_by_access_code(access_code)
if card is None:
return None
if card["is_banned"]:
return True
return False
def get_card_locked(self, access_code: str) -> Optional[bool]:
"""
Given a 20 digit access code as a string, check if the card is locked
"""
card = self.get_card_by_access_code(access_code)
if card is None:
return None
if card["is_locked"]:
return True
return False
def delete_card(self, card_id: int) -> None: def delete_card(self, card_id: int) -> None:
sql = aime_card.delete(aime_card.c.id == card_id) sql = aime_card.delete(aime_card.c.id == card_id)

View File

@ -5,7 +5,6 @@ server:
team: team:
name: ARTEMiS # If this is set, all players that are not on a team will use this one by default. name: ARTEMiS # If this is set, all players that are not on a team will use this one by default.
rank_scale: True # Scales the in-game ranking based on the number of teams within the database, rather than the default scale of ~100 that the game normally uses.
mods: mods:
use_login_bonus: True use_login_bonus: True

View File

@ -194,7 +194,8 @@ class ChuniBase:
} }
def handle_get_game_ranking_api_request(self, data: Dict) -> Dict: def handle_get_game_ranking_api_request(self, data: Dict) -> Dict:
return {"type": data["type"], "gameRankingList": []} rankings = self.data.score.get_rankings(self.version)
return {"type": data["type"], "gameRankingList": rankings}
def handle_get_game_sale_api_request(self, data: Dict) -> Dict: def handle_get_game_sale_api_request(self, data: Dict) -> Dict:
return {"type": data["type"], "length": 0, "gameSaleList": []} return {"type": data["type"], "length": 0, "gameSaleList": []}
@ -401,7 +402,6 @@ class ChuniBase:
"userId": data["userId"], "userId": data["userId"],
"userRivalData": userRivalData "userRivalData": userRivalData
} }
def handle_get_user_rival_music_api_request(self, data: Dict) -> Dict: def handle_get_user_rival_music_api_request(self, data: Dict) -> Dict:
rival_id = data["rivalId"] rival_id = data["rivalId"]
next_index = int(data["nextIndex"]) next_index = int(data["nextIndex"])
@ -415,10 +415,10 @@ class ChuniBase:
for music in all_entries[next_index:]: for music in all_entries[next_index:]:
music_id = music["musicId"] music_id = music["musicId"]
level = music["level"] level = music["level"]
score = music["score"] score = music["scoreMax"]
rank = music["rank"] rank = music["scoreRank"]
# Create a music entry for the current music_id if it's unique # Create a music entry for the current music_id
music_entry = next((entry for entry in user_rival_music_list if entry["musicId"] == music_id), None) music_entry = next((entry for entry in user_rival_music_list if entry["musicId"] == music_id), None)
if music_entry is None: if music_entry is None:
music_entry = { music_entry = {
@ -428,20 +428,15 @@ class ChuniBase:
} }
user_rival_music_list.append(music_entry) user_rival_music_list.append(music_entry)
# Create a level entry for the current level if it's unique or has a higher score # Create a level entry for the current level
level_entry = next((entry for entry in music_entry["userRivalMusicDetailList"] if entry["level"] == level), None) level_entry = {
if level_entry is None: "level": level,
level_entry = { "scoreMax": score,
"level": level, "scoreRank": rank
"scoreMax": score, }
"scoreRank": rank music_entry["userRivalMusicDetailList"].append(level_entry)
}
music_entry["userRivalMusicDetailList"].append(level_entry)
elif score > level_entry["scoreMax"]:
level_entry["scoreMax"] = score
level_entry["scoreRank"] = rank
# Calculate the length for each "musicId" by counting the unique levels # Calculate the length for each "musicId" by counting the levels
for music_entry in user_rival_music_list: for music_entry in user_rival_music_list:
music_entry["length"] = len(music_entry["userRivalMusicDetailList"]) music_entry["length"] = len(music_entry["userRivalMusicDetailList"])
@ -747,16 +742,57 @@ class ChuniBase:
return { return {
"userId": data["userId"], "userId": data["userId"],
"length": 0, "length": 0,
"nextIndex": 0, "nextIndex": -1,
"teamCourseSettingList": [], "teamCourseSettingList": [],
} }
def handle_get_team_course_setting_api_request_proto(self, data: Dict) -> Dict:
return {
"userId": data["userId"],
"length": 1,
"nextIndex": -1,
"teamCourseSettingList": [
{
"orderId": 1,
"courseId": 1,
"classId": 1,
"ruleId": 1,
"courseName": "Test",
"teamCourseMusicList": [
{"track": 184, "type": 1, "level": 3, "selectLevel": -1},
{"track": 184, "type": 1, "level": 3, "selectLevel": -1},
{"track": 184, "type": 1, "level": 3, "selectLevel": -1}
],
"teamCourseRankingInfoList": [],
"recodeDate": "2099-12-31 11:59:99.0",
"isPlayed": False
}
],
}
def handle_get_team_course_rule_api_request(self, data: Dict) -> Dict: def handle_get_team_course_rule_api_request(self, data: Dict) -> Dict:
return { return {
"userId": data["userId"], "userId": data["userId"],
"length": 0, "length": 0,
"nextIndex": 0, "nextIndex": -1,
"teamCourseRuleList": [], "teamCourseRuleList": []
}
def handle_get_team_course_rule_api_request_proto(self, data: Dict) -> Dict:
return {
"userId": data["userId"],
"length": 1,
"nextIndex": -1,
"teamCourseRuleList": [
{
"recoveryLife": 0,
"clearLife": 100,
"damageMiss": 1,
"damageAttack": 1,
"damageJustice": 1,
"damageJusticeC": 1
}
],
} }
def handle_upsert_user_all_api_request(self, data: Dict) -> Dict: def handle_upsert_user_all_api_request(self, data: Dict) -> Dict:
@ -830,7 +866,7 @@ class ChuniBase:
playlog["playedUserName1"] = self.read_wtf8(playlog["playedUserName1"]) playlog["playedUserName1"] = self.read_wtf8(playlog["playedUserName1"])
playlog["playedUserName2"] = self.read_wtf8(playlog["playedUserName2"]) playlog["playedUserName2"] = self.read_wtf8(playlog["playedUserName2"])
playlog["playedUserName3"] = self.read_wtf8(playlog["playedUserName3"]) playlog["playedUserName3"] = self.read_wtf8(playlog["playedUserName3"])
self.data.score.put_playlog(user_id, playlog) self.data.score.put_playlog(user_id, playlog, self.version)
if "userTeamPoint" in upsert: if "userTeamPoint" in upsert:
team_points = upsert["userTeamPoint"] team_points = upsert["userTeamPoint"]

View File

@ -6,7 +6,7 @@ from sqlalchemy.schema import ForeignKey
from sqlalchemy.engine import Row from sqlalchemy.engine import Row
from sqlalchemy.sql import func, select from sqlalchemy.sql import func, select
from sqlalchemy.dialects.mysql import insert from sqlalchemy.dialects.mysql import insert
from sqlalchemy.sql.expression import exists
from core.data.schema import BaseData, metadata from core.data.schema import BaseData, metadata
course = Table( course = Table(
@ -189,9 +189,28 @@ class ChuniScoreData(BaseData):
return None return None
return result.fetchall() return result.fetchall()
def put_playlog(self, aime_id: int, playlog_data: Dict) -> Optional[int]: def put_playlog(self, aime_id: int, playlog_data: Dict, version: int) -> Optional[int]:
# Calculate the ROM version that should be inserted into the DB, based on the version of the ggame being inserted
# We only need from Version 10 (Plost) and back, as newer versions include romVersion in their upsert
# This matters both for gameRankings, as well as a future DB update to keep version data separate
romVer = {
10: "1.50.0",
9: "1.45.0",
8: "1.40.0",
7: "1.35.0",
6: "1.30.0",
5: "1.25.0",
4: "1.20.0",
3: "1.15.0",
2: "1.10.0",
1: "1.05.0",
0: "1.00.0"
}
playlog_data["user"] = aime_id playlog_data["user"] = aime_id
playlog_data = self.fix_bools(playlog_data) playlog_data = self.fix_bools(playlog_data)
if "romVersion" not in playlog_data:
playlog_data["romVersion"] = romVer.get(version, "1.00.0")
sql = insert(playlog).values(**playlog_data) sql = insert(playlog).values(**playlog_data)
conflict = sql.on_duplicate_key_update(**playlog_data) conflict = sql.on_duplicate_key_update(**playlog_data)
@ -201,9 +220,39 @@ class ChuniScoreData(BaseData):
return None return None
return result.lastrowid return result.lastrowid
def get_rankings(self, version: int) -> Optional[List[Dict]]:
# Calculates the ROM version that should be fetched for rankings, based on the game version being retrieved
# This prevents tracks that are not accessible in your version from counting towards the 10 results
romVer = {
13: "2.10%",
12: "2.05%",
11: "2.00%",
10: "1.50%",
9: "1.45%",
8: "1.40%",
7: "1.35%",
6: "1.30%",
5: "1.25%",
4: "1.20%",
3: "1.15%",
2: "1.10%",
1: "1.05%",
0: "1.00%"
}
sql = select([playlog.c.musicId.label('id'), func.count(playlog.c.musicId).label('point')]).where((playlog.c.level != 4) & (playlog.c.romVersion.like(romVer.get(version, "%")))).group_by(playlog.c.musicId).order_by(func.count(playlog.c.musicId).desc()).limit(10)
result = self.execute(sql)
if result is None:
return None
rows = result.fetchall()
return [dict(row) for row in rows]
def get_rival_music(self, rival_id: int) -> Optional[List[Dict]]: def get_rival_music(self, rival_id: int) -> Optional[List[Dict]]:
sql = select(playlog).where(playlog.c.user == rival_id) sql = select(best_score).where(best_score.c.user == rival_id)
result = self.execute(sql) result = self.execute(sql)
if result is None: if result is None:
return None return None
return result.fetchall() return result.fetchall()