579 lines
21 KiB
Python
579 lines
21 KiB
Python
import logging
|
|
import json
|
|
from decimal import Decimal
|
|
from base64 import b64encode
|
|
from typing import Any, Dict, List
|
|
from os import path
|
|
|
|
from core.config import CoreConfig
|
|
from .config import CxbConfig
|
|
from .const import CxbConstants
|
|
from .database import CxbData
|
|
|
|
from threading import Thread
|
|
|
|
class CxbBase:
|
|
def __init__(self, cfg: CoreConfig, game_cfg: CxbConfig) -> None:
|
|
self.config = cfg # Config file
|
|
self.game_config = game_cfg
|
|
self.data = CxbData(cfg) # Database
|
|
self.game = CxbConstants.GAME_CODE
|
|
self.logger = logging.getLogger("cxb")
|
|
self.version = CxbConstants.VER_CROSSBEATS_REV
|
|
|
|
def _get_data_contents(self, folder: str, filetype: str, encoding: str = None, subfolder: str = "") -> List[str]:
|
|
if path.exists(f"titles/cxb/data/{folder}/{subfolder}{filetype}.csv"):
|
|
with open(f"titles/cxb/data/{folder}/{subfolder}{filetype}.csv", encoding=encoding) as f:
|
|
return f.readlines()
|
|
|
|
return []
|
|
|
|
async def handle_action_rpreq_request(self, data: Dict) -> Dict:
|
|
return {}
|
|
|
|
async def handle_action_hitreq_request(self, data: Dict) -> Dict:
|
|
return {"data": []}
|
|
|
|
async def handle_auth_usercheck_request(self, data: Dict) -> Dict:
|
|
profile = await self.data.profile.get_profile_index(
|
|
0, data["usercheck"]["authid"], self.version
|
|
)
|
|
if profile is not None:
|
|
self.logger.info(f"User {data['usercheck']['authid']} has CXB profile")
|
|
return {"exist": "true", "logout": "true"}
|
|
|
|
self.logger.info(f"No profile for aime id {data['usercheck']['authid']}")
|
|
return {"exist": "false", "logout": "true"}
|
|
|
|
async def handle_auth_entry_request(self, data: Dict) -> Dict:
|
|
self.logger.info(f"New profile for {data['entry']['authid']}")
|
|
return {"token": data["entry"]["authid"], "uid": data["entry"]["authid"]}
|
|
|
|
async def handle_auth_login_request(self, data: Dict) -> Dict:
|
|
profile = await self.data.profile.get_profile_index(
|
|
0, data["login"]["authid"], self.version
|
|
)
|
|
|
|
if profile is not None:
|
|
self.logger.info(f"Login user {data['login']['authid']}")
|
|
return {"token": data["login"]["authid"], "uid": data["login"]["authid"]}
|
|
|
|
self.logger.warning(f"User {data['login']['authid']} does not have a profile")
|
|
return {}
|
|
|
|
def task_generateCoupon(index, data1):
|
|
# Coupons
|
|
for i in range(500, 510):
|
|
index.append(str(i))
|
|
couponid = int(i) - 500
|
|
dataValue = [
|
|
{
|
|
"couponId": str(couponid),
|
|
"couponNum": "1",
|
|
"couponLog": [],
|
|
}
|
|
]
|
|
data1.append(
|
|
b64encode(
|
|
bytes(json.dumps(dataValue[0], separators=(",", ":")), "utf-8")
|
|
).decode("utf-8")
|
|
)
|
|
|
|
def task_generateShopListTitle(index, data1):
|
|
# ShopList_Title
|
|
for i in range(200000, 201451):
|
|
index.append(str(i))
|
|
shopid = int(i) - 200000
|
|
dataValue = [
|
|
{
|
|
"shopId": shopid,
|
|
"shopState": "2",
|
|
"isDisable": "t",
|
|
"isDeleted": "f",
|
|
"isSpecialFlag": "f",
|
|
}
|
|
]
|
|
data1.append(
|
|
b64encode(
|
|
bytes(json.dumps(dataValue[0], separators=(",", ":")), "utf-8")
|
|
).decode("utf-8")
|
|
)
|
|
|
|
def task_generateShopListIcon(index, data1):
|
|
# ShopList_Icon
|
|
for i in range(202000, 202264):
|
|
index.append(str(i))
|
|
shopid = int(i) - 200000
|
|
dataValue = [
|
|
{
|
|
"shopId": shopid,
|
|
"shopState": "2",
|
|
"isDisable": "t",
|
|
"isDeleted": "f",
|
|
"isSpecialFlag": "f",
|
|
}
|
|
]
|
|
data1.append(
|
|
b64encode(
|
|
bytes(json.dumps(dataValue[0], separators=(",", ":")), "utf-8")
|
|
).decode("utf-8")
|
|
)
|
|
|
|
def task_generateStories(index, data1):
|
|
# Stories
|
|
for i in range(900000, 900003):
|
|
index.append(str(i))
|
|
storyid = int(i) - 900000
|
|
dataValue = [
|
|
{
|
|
"storyId": storyid,
|
|
"unlockState1": ["t"] * 10,
|
|
"unlockState2": ["t"] * 10,
|
|
"unlockState3": ["t"] * 10,
|
|
"unlockState4": ["t"] * 10,
|
|
"unlockState5": ["t"] * 10,
|
|
"unlockState6": ["t"] * 10,
|
|
"unlockState7": ["t"] * 10,
|
|
"unlockState8": ["t"] * 10,
|
|
"unlockState9": ["t"] * 10,
|
|
"unlockState10": ["t"] * 10,
|
|
"unlockState11": ["t"] * 10,
|
|
"unlockState12": ["t"] * 10,
|
|
"unlockState13": ["t"] * 10,
|
|
"unlockState14": ["t"] * 10,
|
|
"unlockState15": ["t"] * 10,
|
|
"unlockState16": ["t"] * 10,
|
|
}
|
|
]
|
|
data1.append(
|
|
b64encode(
|
|
bytes(json.dumps(dataValue[0], separators=(",", ":")), "utf-8")
|
|
).decode("utf-8")
|
|
)
|
|
|
|
def task_generateScoreData(song, index, data1):
|
|
song_data = song["data"]
|
|
songCode = []
|
|
|
|
songCode.append(
|
|
{
|
|
"mcode": song_data["mcode"],
|
|
"musicState": song_data["musicState"],
|
|
"playCount": song_data["playCount"],
|
|
"totalScore": song_data["totalScore"],
|
|
"highScore": song_data["highScore"],
|
|
"everHighScore": song_data["everHighScore"]
|
|
if "everHighScore" in song_data
|
|
else ["0", "0", "0", "0", "0"],
|
|
"clearRate": song_data["clearRate"],
|
|
"rankPoint": song_data["rankPoint"],
|
|
"normalCR": song_data["normalCR"]
|
|
if "normalCR" in song_data
|
|
else ["0", "0", "0", "0", "0"],
|
|
"survivalCR": song_data["survivalCR"]
|
|
if "survivalCR" in song_data
|
|
else ["0", "0", "0", "0", "0"],
|
|
"ultimateCR": song_data["ultimateCR"]
|
|
if "ultimateCR" in song_data
|
|
else ["0", "0", "0", "0", "0"],
|
|
"nohopeCR": song_data["nohopeCR"]
|
|
if "nohopeCR" in song_data
|
|
else ["0", "0", "0", "0", "0"],
|
|
"combo": song_data["combo"],
|
|
"coupleUserId": song_data["coupleUserId"],
|
|
"difficulty": song_data["difficulty"],
|
|
"isFullCombo": song_data["isFullCombo"],
|
|
"clearGaugeType": song_data["clearGaugeType"],
|
|
"fieldType": song_data["fieldType"],
|
|
"gameType": song_data["gameType"],
|
|
"grade": song_data["grade"],
|
|
"unlockState": song_data["unlockState"],
|
|
"extraState": song_data["extraState"],
|
|
}
|
|
)
|
|
index.append(song_data["index"])
|
|
data1.append(
|
|
b64encode(
|
|
bytes(json.dumps(songCode[0], separators=(",", ":")), "utf-8")
|
|
).decode("utf-8")
|
|
)
|
|
|
|
async def handle_action_loadrange_request(self, data: Dict) -> Dict:
|
|
range_start = data["loadrange"]["range"][0]
|
|
range_end = data["loadrange"]["range"][1]
|
|
uid = data["loadrange"]["uid"]
|
|
|
|
self.logger.info(f"Load data for {uid}")
|
|
profile = await self.data.profile.get_profile(uid, self.version)
|
|
songs = await self.data.score.get_best_scores(uid)
|
|
|
|
data1 = []
|
|
index = []
|
|
versionindex = []
|
|
|
|
for profile_index in profile:
|
|
profile_data = profile_index["data"]
|
|
|
|
if int(range_start) == 800000:
|
|
return {"index": range_start, "data": [], "version": 10400}
|
|
|
|
if not (int(range_start) <= int(profile_index[3]) <= int(range_end)):
|
|
continue
|
|
# Prevent loading of the coupons within the profile to use the force unlock instead
|
|
elif 500 <= int(profile_index[3]) <= 510:
|
|
continue
|
|
# Prevent loading of songs saved in the profile
|
|
elif 100000 <= int(profile_index[3]) <= 110000:
|
|
continue
|
|
# Prevent loading of the shop list / unlocked titles & icons saved in the profile
|
|
elif 200000 <= int(profile_index[3]) <= 210000:
|
|
continue
|
|
# Prevent loading of stories in the profile
|
|
elif 900000 <= int(profile_index[3]) <= 900200:
|
|
continue
|
|
else:
|
|
index.append(profile_index[3])
|
|
data1.append(
|
|
b64encode(
|
|
bytes(json.dumps(profile_data, separators=(",", ":")), "utf-8")
|
|
).decode("utf-8")
|
|
)
|
|
|
|
"""
|
|
100000 = Songs
|
|
200000 = Shop
|
|
300000 = Courses
|
|
400000 = Events
|
|
500000 = Challenges
|
|
600000 = Bonuses
|
|
700000 = rcLog
|
|
800000 = Partners
|
|
900000 = Stories
|
|
"""
|
|
|
|
# Async threads to generate the response
|
|
thread_Coupon = Thread(target=CxbBase.task_generateCoupon(index, data1))
|
|
thread_ShopListTitle = Thread(target=CxbBase.task_generateShopListTitle(index, data1))
|
|
thread_ShopListIcon = Thread(target=CxbBase.task_generateShopListIcon(index, data1))
|
|
thread_Stories = Thread(target=CxbBase.task_generateStories(index, data1))
|
|
|
|
thread_Coupon.start()
|
|
thread_ShopListTitle.start()
|
|
thread_ShopListIcon.start()
|
|
thread_Stories.start()
|
|
|
|
thread_Coupon.join()
|
|
thread_ShopListTitle.join()
|
|
thread_ShopListIcon.join()
|
|
thread_Stories.join()
|
|
|
|
for song in songs:
|
|
thread_ScoreData = Thread(target=CxbBase.task_generateScoreData(song, index, data1))
|
|
thread_ScoreData.start()
|
|
|
|
v_profile = await self.data.profile.get_profile_index(0, uid, self.version)
|
|
v_profile_data = v_profile["data"]
|
|
|
|
for _, data in enumerate(profile):
|
|
if v_profile_data:
|
|
versionindex.append(int(v_profile_data["appVersion"]))
|
|
else:
|
|
versionindex.append("10400")
|
|
|
|
return {"index": index, "data": data1, "version": versionindex}
|
|
|
|
async def handle_action_saveindex_request(self, data: Dict) -> Dict:
|
|
save_data = data["saveindex"]
|
|
|
|
try:
|
|
# REV Omnimix Version Fetcher
|
|
gameversion = data["saveindex"]["data"][0][2]
|
|
self.logger.warning(f"Game Version is {gameversion}")
|
|
except Exception:
|
|
pass
|
|
|
|
if "10205" in gameversion:
|
|
self.logger.info(
|
|
f"Saving CrossBeats REV profile for {data['saveindex']['uid']}"
|
|
)
|
|
# Alright.... time to bring the jank code
|
|
|
|
for value in data["saveindex"]["data"]:
|
|
if "playedUserId" in value[1]:
|
|
await self.data.profile.put_profile(
|
|
data["saveindex"]["uid"], self.version, value[0], value[1]
|
|
)
|
|
if "mcode" not in value[1]:
|
|
await self.data.profile.put_profile(
|
|
data["saveindex"]["uid"], self.version, value[0], value[1]
|
|
)
|
|
if "shopId" in value:
|
|
continue
|
|
if "mcode" in value[1] and "musicState" in value[1]:
|
|
song_json = json.loads(value[1])
|
|
|
|
songCode = []
|
|
songCode.append(
|
|
{
|
|
"mcode": song_json["mcode"],
|
|
"musicState": song_json["musicState"],
|
|
"playCount": song_json["playCount"],
|
|
"totalScore": song_json["totalScore"],
|
|
"highScore": song_json["highScore"],
|
|
"clearRate": song_json["clearRate"],
|
|
"rankPoint": song_json["rankPoint"],
|
|
"combo": song_json["combo"],
|
|
"coupleUserId": song_json["coupleUserId"],
|
|
"difficulty": song_json["difficulty"],
|
|
"isFullCombo": song_json["isFullCombo"],
|
|
"clearGaugeType": song_json["clearGaugeType"],
|
|
"fieldType": song_json["fieldType"],
|
|
"gameType": song_json["gameType"],
|
|
"grade": song_json["grade"],
|
|
"unlockState": song_json["unlockState"],
|
|
"extraState": song_json["extraState"],
|
|
"index": value[0],
|
|
}
|
|
)
|
|
await self.data.score.put_best_score(
|
|
data["saveindex"]["uid"],
|
|
song_json["mcode"],
|
|
self.version,
|
|
value[0],
|
|
songCode[0],
|
|
)
|
|
return {}
|
|
else:
|
|
self.logger.info(
|
|
f"Saving CrossBeats REV Sunrise profile for {data['saveindex']['uid']}"
|
|
)
|
|
|
|
# Sunrise
|
|
try:
|
|
profileIndex = save_data["index"].index("0")
|
|
except Exception:
|
|
return {"data": ""} # Maybe
|
|
|
|
profile = json.loads(save_data["data"][profileIndex])
|
|
aimeId = profile["aimeId"]
|
|
i = 0
|
|
|
|
for index, value in enumerate(data["saveindex"]["data"]):
|
|
if int(data["saveindex"]["index"][index]) == 101:
|
|
await self.data.profile.put_profile(
|
|
aimeId, self.version, data["saveindex"]["index"][index], value
|
|
)
|
|
if (
|
|
int(data["saveindex"]["index"][index]) >= 700000
|
|
and int(data["saveindex"]["index"][index]) <= 701000
|
|
):
|
|
await self.data.profile.put_profile(
|
|
aimeId, self.version, data["saveindex"]["index"][index], value
|
|
)
|
|
if (
|
|
int(data["saveindex"]["index"][index]) >= 500
|
|
and int(data["saveindex"]["index"][index]) <= 510
|
|
):
|
|
await self.data.profile.put_profile(
|
|
aimeId, self.version, data["saveindex"]["index"][index], value
|
|
)
|
|
if "playedUserId" in value:
|
|
await self.data.profile.put_profile(
|
|
aimeId,
|
|
self.version,
|
|
data["saveindex"]["index"][index],
|
|
json.loads(value),
|
|
)
|
|
if "mcode" not in value and "normalCR" not in value:
|
|
await self.data.profile.put_profile(
|
|
aimeId,
|
|
self.version,
|
|
data["saveindex"]["index"][index],
|
|
json.loads(value),
|
|
)
|
|
if "shopId" in value:
|
|
continue
|
|
|
|
# MusicList Index for the profile
|
|
indexSongList = []
|
|
for value in data["saveindex"]["index"]:
|
|
if int(value) in range(100000, 110000):
|
|
indexSongList.append(value)
|
|
|
|
for index, value in enumerate(data["saveindex"]["data"]):
|
|
if "mcode" not in value:
|
|
continue
|
|
if "playedUserId" in value:
|
|
continue
|
|
|
|
data1 = json.loads(value)
|
|
|
|
songCode = []
|
|
songCode.append(
|
|
{
|
|
"mcode": data1["mcode"],
|
|
"musicState": data1["musicState"],
|
|
"playCount": data1["playCount"],
|
|
"totalScore": data1["totalScore"],
|
|
"highScore": data1["highScore"],
|
|
"everHighScore": data1["everHighScore"],
|
|
"clearRate": data1["clearRate"],
|
|
"rankPoint": data1["rankPoint"],
|
|
"normalCR": data1["normalCR"],
|
|
"survivalCR": data1["survivalCR"],
|
|
"ultimateCR": data1["ultimateCR"],
|
|
"nohopeCR": data1["nohopeCR"],
|
|
"combo": data1["combo"],
|
|
"coupleUserId": data1["coupleUserId"],
|
|
"difficulty": data1["difficulty"],
|
|
"isFullCombo": data1["isFullCombo"],
|
|
"clearGaugeType": data1["clearGaugeType"],
|
|
"fieldType": data1["fieldType"],
|
|
"gameType": data1["gameType"],
|
|
"grade": data1["grade"],
|
|
"unlockState": data1["unlockState"],
|
|
"extraState": data1["extraState"],
|
|
"index": indexSongList[i],
|
|
}
|
|
)
|
|
|
|
await self.data.score.put_best_score(
|
|
aimeId, data1["mcode"], self.version, indexSongList[i], songCode[0]
|
|
)
|
|
i += 1
|
|
return {}
|
|
|
|
async def handle_action_sprankreq_request(self, data: Dict) -> Dict:
|
|
uid = data["sprankreq"]["uid"]
|
|
self.logger.info(f"Get best rankings for {uid}")
|
|
p = await self.data.score.get_best_rankings(uid)
|
|
|
|
rankList: List[Dict[str, Any]] = []
|
|
|
|
for rank in p:
|
|
if rank["song_id"] is not None:
|
|
rankList.append(
|
|
{
|
|
"sc": [rank["score"], rank["song_id"]],
|
|
"rid": rank["rev_id"],
|
|
"clear": rank["clear"],
|
|
}
|
|
)
|
|
else:
|
|
rankList.append(
|
|
{
|
|
"sc": [rank["score"]],
|
|
"rid": rank["rev_id"],
|
|
"clear": rank["clear"],
|
|
}
|
|
)
|
|
|
|
return {
|
|
"uid": data["sprankreq"]["uid"],
|
|
"aid": data["sprankreq"]["aid"],
|
|
"rank": rankList,
|
|
"rankx": [1, 1, 1],
|
|
}
|
|
|
|
async def handle_action_getadv_request(self, data: Dict) -> Dict:
|
|
return {"data": [{"r": "1", "i": "100300", "c": "20"}]}
|
|
|
|
async def handle_action_getmsg_request(self, data: Dict) -> Dict:
|
|
return {"msgs": []}
|
|
|
|
async def handle_auth_logout_request(self, data: Dict) -> Dict:
|
|
return {"auth": True}
|
|
|
|
async def handle_action_rankreg_request(self, data: Dict) -> Dict:
|
|
uid = data["rankreg"]["uid"]
|
|
self.logger.info(f"Put {len(data['rankreg']['data'])} rankings for {uid}")
|
|
|
|
for rid in data["rankreg"]["data"]:
|
|
# REV S2
|
|
if "clear" in rid:
|
|
try:
|
|
await self.data.score.put_ranking(
|
|
user_id=uid,
|
|
rev_id=int(rid["rid"]),
|
|
song_id=int(rid["sc"][1]),
|
|
score=int(rid["sc"][0]),
|
|
clear=rid["clear"],
|
|
)
|
|
except Exception:
|
|
await self.data.score.put_ranking(
|
|
user_id=uid,
|
|
rev_id=int(rid["rid"]),
|
|
song_id=0,
|
|
score=int(rid["sc"][0]),
|
|
clear=rid["clear"],
|
|
)
|
|
# REV
|
|
else:
|
|
try:
|
|
await self.data.score.put_ranking(
|
|
user_id=uid,
|
|
rev_id=int(rid["rid"]),
|
|
song_id=int(rid["sc"][1]),
|
|
score=int(rid["sc"][0]),
|
|
clear=0,
|
|
)
|
|
except Exception:
|
|
await self.data.score.put_ranking(
|
|
user_id=uid,
|
|
rev_id=int(rid["rid"]),
|
|
song_id=0,
|
|
score=int(rid["sc"][0]),
|
|
clear=0,
|
|
)
|
|
return {}
|
|
|
|
async def handle_action_addenergy_request(self, data: Dict) -> Dict:
|
|
uid = data["addenergy"]["uid"]
|
|
self.logger.info(f"Add energy to user {uid}")
|
|
profile = await self.data.profile.get_profile_index(0, uid, self.version)
|
|
data1 = profile["data"]
|
|
p = await self.data.item.get_energy(uid)
|
|
|
|
if not p:
|
|
await self.data.item.put_energy(uid, 5)
|
|
|
|
return {
|
|
"class": data1["myClass"],
|
|
"granted": "5",
|
|
"total": "5",
|
|
"threshold": "1000",
|
|
}
|
|
|
|
array = []
|
|
energy = p["energy"]
|
|
|
|
newenergy = int(energy) + 5
|
|
await self.data.item.put_energy(uid, newenergy)
|
|
|
|
if int(energy) <= 995:
|
|
array.append(
|
|
{
|
|
"class": data1["myClass"],
|
|
"granted": "5",
|
|
"total": str(energy),
|
|
"threshold": "1000",
|
|
}
|
|
)
|
|
else:
|
|
array.append(
|
|
{
|
|
"class": data1["myClass"],
|
|
"granted": "0",
|
|
"total": str(energy),
|
|
"threshold": "1000",
|
|
}
|
|
)
|
|
return array[0]
|
|
|
|
async def handle_action_eventreq_request(self, data: Dict) -> Dict:
|
|
self.logger.info(data)
|
|
return {"eventreq": ""}
|
|
|
|
async def handle_action_stampreq_request(self, data: Dict) -> Dict:
|
|
self.logger.info(data)
|
|
return {"stampreq": ""} |