artemis/titles/cxb/base.py

579 lines
21 KiB
Python

import logging
import json
from decimal import Decimal
from base64 import b64encode
from typing import Any, Dict, List
from os import path
from core.config import CoreConfig
from .config import CxbConfig
from .const import CxbConstants
from .database import CxbData
from threading import Thread
class CxbBase:
def __init__(self, cfg: CoreConfig, game_cfg: CxbConfig) -> None:
self.config = cfg # Config file
self.game_config = game_cfg
self.data = CxbData(cfg) # Database
self.game = CxbConstants.GAME_CODE
self.logger = logging.getLogger("cxb")
self.version = CxbConstants.VER_CROSSBEATS_REV
def _get_data_contents(self, folder: str, filetype: str, encoding: str = None, subfolder: str = "") -> List[str]:
if path.exists(f"titles/cxb/data/{folder}/{subfolder}{filetype}.csv"):
with open(f"titles/cxb/data/{folder}/{subfolder}{filetype}.csv", encoding=encoding) as f:
return f.readlines()
return []
async def handle_action_rpreq_request(self, data: Dict) -> Dict:
return {}
async def handle_action_hitreq_request(self, data: Dict) -> Dict:
return {"data": []}
async def handle_auth_usercheck_request(self, data: Dict) -> Dict:
profile = await self.data.profile.get_profile_index(
0, data["usercheck"]["authid"], self.version
)
if profile is not None:
self.logger.info(f"User {data['usercheck']['authid']} has CXB profile")
return {"exist": "true", "logout": "true"}
self.logger.info(f"No profile for aime id {data['usercheck']['authid']}")
return {"exist": "false", "logout": "true"}
async def handle_auth_entry_request(self, data: Dict) -> Dict:
self.logger.info(f"New profile for {data['entry']['authid']}")
return {"token": data["entry"]["authid"], "uid": data["entry"]["authid"]}
async def handle_auth_login_request(self, data: Dict) -> Dict:
profile = await self.data.profile.get_profile_index(
0, data["login"]["authid"], self.version
)
if profile is not None:
self.logger.info(f"Login user {data['login']['authid']}")
return {"token": data["login"]["authid"], "uid": data["login"]["authid"]}
self.logger.warning(f"User {data['login']['authid']} does not have a profile")
return {}
def task_generateCoupon(index, data1):
# Coupons
for i in range(500, 510):
index.append(str(i))
couponid = int(i) - 500
dataValue = [
{
"couponId": str(couponid),
"couponNum": "1",
"couponLog": [],
}
]
data1.append(
b64encode(
bytes(json.dumps(dataValue[0], separators=(",", ":")), "utf-8")
).decode("utf-8")
)
def task_generateShopListTitle(index, data1):
# ShopList_Title
for i in range(200000, 201451):
index.append(str(i))
shopid = int(i) - 200000
dataValue = [
{
"shopId": shopid,
"shopState": "2",
"isDisable": "t",
"isDeleted": "f",
"isSpecialFlag": "f",
}
]
data1.append(
b64encode(
bytes(json.dumps(dataValue[0], separators=(",", ":")), "utf-8")
).decode("utf-8")
)
def task_generateShopListIcon(index, data1):
# ShopList_Icon
for i in range(202000, 202264):
index.append(str(i))
shopid = int(i) - 200000
dataValue = [
{
"shopId": shopid,
"shopState": "2",
"isDisable": "t",
"isDeleted": "f",
"isSpecialFlag": "f",
}
]
data1.append(
b64encode(
bytes(json.dumps(dataValue[0], separators=(",", ":")), "utf-8")
).decode("utf-8")
)
def task_generateStories(index, data1):
# Stories
for i in range(900000, 900003):
index.append(str(i))
storyid = int(i) - 900000
dataValue = [
{
"storyId": storyid,
"unlockState1": ["t"] * 10,
"unlockState2": ["t"] * 10,
"unlockState3": ["t"] * 10,
"unlockState4": ["t"] * 10,
"unlockState5": ["t"] * 10,
"unlockState6": ["t"] * 10,
"unlockState7": ["t"] * 10,
"unlockState8": ["t"] * 10,
"unlockState9": ["t"] * 10,
"unlockState10": ["t"] * 10,
"unlockState11": ["t"] * 10,
"unlockState12": ["t"] * 10,
"unlockState13": ["t"] * 10,
"unlockState14": ["t"] * 10,
"unlockState15": ["t"] * 10,
"unlockState16": ["t"] * 10,
}
]
data1.append(
b64encode(
bytes(json.dumps(dataValue[0], separators=(",", ":")), "utf-8")
).decode("utf-8")
)
def task_generateScoreData(song, index, data1):
song_data = song["data"]
songCode = []
songCode.append(
{
"mcode": song_data["mcode"],
"musicState": song_data["musicState"],
"playCount": song_data["playCount"],
"totalScore": song_data["totalScore"],
"highScore": song_data["highScore"],
"everHighScore": song_data["everHighScore"]
if "everHighScore" in song_data
else ["0", "0", "0", "0", "0"],
"clearRate": song_data["clearRate"],
"rankPoint": song_data["rankPoint"],
"normalCR": song_data["normalCR"]
if "normalCR" in song_data
else ["0", "0", "0", "0", "0"],
"survivalCR": song_data["survivalCR"]
if "survivalCR" in song_data
else ["0", "0", "0", "0", "0"],
"ultimateCR": song_data["ultimateCR"]
if "ultimateCR" in song_data
else ["0", "0", "0", "0", "0"],
"nohopeCR": song_data["nohopeCR"]
if "nohopeCR" in song_data
else ["0", "0", "0", "0", "0"],
"combo": song_data["combo"],
"coupleUserId": song_data["coupleUserId"],
"difficulty": song_data["difficulty"],
"isFullCombo": song_data["isFullCombo"],
"clearGaugeType": song_data["clearGaugeType"],
"fieldType": song_data["fieldType"],
"gameType": song_data["gameType"],
"grade": song_data["grade"],
"unlockState": song_data["unlockState"],
"extraState": song_data["extraState"],
}
)
index.append(song_data["index"])
data1.append(
b64encode(
bytes(json.dumps(songCode[0], separators=(",", ":")), "utf-8")
).decode("utf-8")
)
async def handle_action_loadrange_request(self, data: Dict) -> Dict:
range_start = data["loadrange"]["range"][0]
range_end = data["loadrange"]["range"][1]
uid = data["loadrange"]["uid"]
self.logger.info(f"Load data for {uid}")
profile = await self.data.profile.get_profile(uid, self.version)
songs = await self.data.score.get_best_scores(uid)
data1 = []
index = []
versionindex = []
for profile_index in profile:
profile_data = profile_index["data"]
if int(range_start) == 800000:
return {"index": range_start, "data": [], "version": 10400}
if not (int(range_start) <= int(profile_index[3]) <= int(range_end)):
continue
# Prevent loading of the coupons within the profile to use the force unlock instead
elif 500 <= int(profile_index[3]) <= 510:
continue
# Prevent loading of songs saved in the profile
elif 100000 <= int(profile_index[3]) <= 110000:
continue
# Prevent loading of the shop list / unlocked titles & icons saved in the profile
elif 200000 <= int(profile_index[3]) <= 210000:
continue
# Prevent loading of stories in the profile
elif 900000 <= int(profile_index[3]) <= 900200:
continue
else:
index.append(profile_index[3])
data1.append(
b64encode(
bytes(json.dumps(profile_data, separators=(",", ":")), "utf-8")
).decode("utf-8")
)
"""
100000 = Songs
200000 = Shop
300000 = Courses
400000 = Events
500000 = Challenges
600000 = Bonuses
700000 = rcLog
800000 = Partners
900000 = Stories
"""
# Async threads to generate the response
thread_Coupon = Thread(target=CxbBase.task_generateCoupon(index, data1))
thread_ShopListTitle = Thread(target=CxbBase.task_generateShopListTitle(index, data1))
thread_ShopListIcon = Thread(target=CxbBase.task_generateShopListIcon(index, data1))
thread_Stories = Thread(target=CxbBase.task_generateStories(index, data1))
thread_Coupon.start()
thread_ShopListTitle.start()
thread_ShopListIcon.start()
thread_Stories.start()
thread_Coupon.join()
thread_ShopListTitle.join()
thread_ShopListIcon.join()
thread_Stories.join()
for song in songs:
thread_ScoreData = Thread(target=CxbBase.task_generateScoreData(song, index, data1))
thread_ScoreData.start()
v_profile = await self.data.profile.get_profile_index(0, uid, self.version)
v_profile_data = v_profile["data"]
for _, data in enumerate(profile):
if v_profile_data:
versionindex.append(int(v_profile_data["appVersion"]))
else:
versionindex.append("10400")
return {"index": index, "data": data1, "version": versionindex}
async def handle_action_saveindex_request(self, data: Dict) -> Dict:
save_data = data["saveindex"]
try:
# REV Omnimix Version Fetcher
gameversion = data["saveindex"]["data"][0][2]
self.logger.warning(f"Game Version is {gameversion}")
except Exception:
pass
if "10205" in gameversion:
self.logger.info(
f"Saving CrossBeats REV profile for {data['saveindex']['uid']}"
)
# Alright.... time to bring the jank code
for value in data["saveindex"]["data"]:
if "playedUserId" in value[1]:
await self.data.profile.put_profile(
data["saveindex"]["uid"], self.version, value[0], value[1]
)
if "mcode" not in value[1]:
await self.data.profile.put_profile(
data["saveindex"]["uid"], self.version, value[0], value[1]
)
if "shopId" in value:
continue
if "mcode" in value[1] and "musicState" in value[1]:
song_json = json.loads(value[1])
songCode = []
songCode.append(
{
"mcode": song_json["mcode"],
"musicState": song_json["musicState"],
"playCount": song_json["playCount"],
"totalScore": song_json["totalScore"],
"highScore": song_json["highScore"],
"clearRate": song_json["clearRate"],
"rankPoint": song_json["rankPoint"],
"combo": song_json["combo"],
"coupleUserId": song_json["coupleUserId"],
"difficulty": song_json["difficulty"],
"isFullCombo": song_json["isFullCombo"],
"clearGaugeType": song_json["clearGaugeType"],
"fieldType": song_json["fieldType"],
"gameType": song_json["gameType"],
"grade": song_json["grade"],
"unlockState": song_json["unlockState"],
"extraState": song_json["extraState"],
"index": value[0],
}
)
await self.data.score.put_best_score(
data["saveindex"]["uid"],
song_json["mcode"],
self.version,
value[0],
songCode[0],
)
return {}
else:
self.logger.info(
f"Saving CrossBeats REV Sunrise profile for {data['saveindex']['uid']}"
)
# Sunrise
try:
profileIndex = save_data["index"].index("0")
except Exception:
return {"data": ""} # Maybe
profile = json.loads(save_data["data"][profileIndex])
aimeId = profile["aimeId"]
i = 0
for index, value in enumerate(data["saveindex"]["data"]):
if int(data["saveindex"]["index"][index]) == 101:
await self.data.profile.put_profile(
aimeId, self.version, data["saveindex"]["index"][index], value
)
if (
int(data["saveindex"]["index"][index]) >= 700000
and int(data["saveindex"]["index"][index]) <= 701000
):
await self.data.profile.put_profile(
aimeId, self.version, data["saveindex"]["index"][index], value
)
if (
int(data["saveindex"]["index"][index]) >= 500
and int(data["saveindex"]["index"][index]) <= 510
):
await self.data.profile.put_profile(
aimeId, self.version, data["saveindex"]["index"][index], value
)
if "playedUserId" in value:
await self.data.profile.put_profile(
aimeId,
self.version,
data["saveindex"]["index"][index],
json.loads(value),
)
if "mcode" not in value and "normalCR" not in value:
await self.data.profile.put_profile(
aimeId,
self.version,
data["saveindex"]["index"][index],
json.loads(value),
)
if "shopId" in value:
continue
# MusicList Index for the profile
indexSongList = []
for value in data["saveindex"]["index"]:
if int(value) in range(100000, 110000):
indexSongList.append(value)
for index, value in enumerate(data["saveindex"]["data"]):
if "mcode" not in value:
continue
if "playedUserId" in value:
continue
data1 = json.loads(value)
songCode = []
songCode.append(
{
"mcode": data1["mcode"],
"musicState": data1["musicState"],
"playCount": data1["playCount"],
"totalScore": data1["totalScore"],
"highScore": data1["highScore"],
"everHighScore": data1["everHighScore"],
"clearRate": data1["clearRate"],
"rankPoint": data1["rankPoint"],
"normalCR": data1["normalCR"],
"survivalCR": data1["survivalCR"],
"ultimateCR": data1["ultimateCR"],
"nohopeCR": data1["nohopeCR"],
"combo": data1["combo"],
"coupleUserId": data1["coupleUserId"],
"difficulty": data1["difficulty"],
"isFullCombo": data1["isFullCombo"],
"clearGaugeType": data1["clearGaugeType"],
"fieldType": data1["fieldType"],
"gameType": data1["gameType"],
"grade": data1["grade"],
"unlockState": data1["unlockState"],
"extraState": data1["extraState"],
"index": indexSongList[i],
}
)
await self.data.score.put_best_score(
aimeId, data1["mcode"], self.version, indexSongList[i], songCode[0]
)
i += 1
return {}
async def handle_action_sprankreq_request(self, data: Dict) -> Dict:
uid = data["sprankreq"]["uid"]
self.logger.info(f"Get best rankings for {uid}")
p = await self.data.score.get_best_rankings(uid)
rankList: List[Dict[str, Any]] = []
for rank in p:
if rank["song_id"] is not None:
rankList.append(
{
"sc": [rank["score"], rank["song_id"]],
"rid": rank["rev_id"],
"clear": rank["clear"],
}
)
else:
rankList.append(
{
"sc": [rank["score"]],
"rid": rank["rev_id"],
"clear": rank["clear"],
}
)
return {
"uid": data["sprankreq"]["uid"],
"aid": data["sprankreq"]["aid"],
"rank": rankList,
"rankx": [1, 1, 1],
}
async def handle_action_getadv_request(self, data: Dict) -> Dict:
return {"data": [{"r": "1", "i": "100300", "c": "20"}]}
async def handle_action_getmsg_request(self, data: Dict) -> Dict:
return {"msgs": []}
async def handle_auth_logout_request(self, data: Dict) -> Dict:
return {"auth": True}
async def handle_action_rankreg_request(self, data: Dict) -> Dict:
uid = data["rankreg"]["uid"]
self.logger.info(f"Put {len(data['rankreg']['data'])} rankings for {uid}")
for rid in data["rankreg"]["data"]:
# REV S2
if "clear" in rid:
try:
await self.data.score.put_ranking(
user_id=uid,
rev_id=int(rid["rid"]),
song_id=int(rid["sc"][1]),
score=int(rid["sc"][0]),
clear=rid["clear"],
)
except Exception:
await self.data.score.put_ranking(
user_id=uid,
rev_id=int(rid["rid"]),
song_id=0,
score=int(rid["sc"][0]),
clear=rid["clear"],
)
# REV
else:
try:
await self.data.score.put_ranking(
user_id=uid,
rev_id=int(rid["rid"]),
song_id=int(rid["sc"][1]),
score=int(rid["sc"][0]),
clear=0,
)
except Exception:
await self.data.score.put_ranking(
user_id=uid,
rev_id=int(rid["rid"]),
song_id=0,
score=int(rid["sc"][0]),
clear=0,
)
return {}
async def handle_action_addenergy_request(self, data: Dict) -> Dict:
uid = data["addenergy"]["uid"]
self.logger.info(f"Add energy to user {uid}")
profile = await self.data.profile.get_profile_index(0, uid, self.version)
data1 = profile["data"]
p = await self.data.item.get_energy(uid)
if not p:
await self.data.item.put_energy(uid, 5)
return {
"class": data1["myClass"],
"granted": "5",
"total": "5",
"threshold": "1000",
}
array = []
energy = p["energy"]
newenergy = int(energy) + 5
await self.data.item.put_energy(uid, newenergy)
if int(energy) <= 995:
array.append(
{
"class": data1["myClass"],
"granted": "5",
"total": str(energy),
"threshold": "1000",
}
)
else:
array.append(
{
"class": data1["myClass"],
"granted": "0",
"total": str(energy),
"threshold": "1000",
}
)
return array[0]
async def handle_action_eventreq_request(self, data: Dict) -> Dict:
self.logger.info(data)
return {"eventreq": ""}
async def handle_action_stampreq_request(self, data: Dict) -> Dict:
self.logger.info(data)
return {"stampreq": ""}