forked from Hay1tsme/artemis
300 lines
12 KiB
Python
300 lines
12 KiB
Python
from typing import List
|
|
from starlette.routing import Route, Mount
|
|
from starlette.requests import Request
|
|
from starlette.responses import Response, RedirectResponse
|
|
from os import path
|
|
import yaml
|
|
import jinja2
|
|
|
|
from core.frontend import FE_Base, UserSession
|
|
from core.config import CoreConfig
|
|
from .database import ChuniData
|
|
from .config import ChuniConfig
|
|
from .const import ChuniConstants
|
|
|
|
|
|
def pairwise(iterable):
|
|
# https://docs.python.org/3/library/itertools.html#itertools.pairwise
|
|
# but for Python < 3.10. pairwise('ABCDEFG') → AB BC CD DE EF FG
|
|
iterator = iter(iterable)
|
|
a = next(iterator, None)
|
|
for b in iterator:
|
|
yield a, b
|
|
a = b
|
|
|
|
|
|
def calculate_song_rank(score: int, game_version: int) -> str:
|
|
if game_version >= ChuniConstants.VER_CHUNITHM_NEW:
|
|
intervals = ChuniConstants.SCORE_RANK_INTERVALS_NEW
|
|
else:
|
|
intervals = ChuniConstants.SCORE_RANK_INTERVALS_OLD
|
|
|
|
for (min_score, rank) in intervals:
|
|
if score >= min_score:
|
|
return rank
|
|
|
|
return "D"
|
|
|
|
|
|
def calculate_song_rating(score: int, chart_constant: float, game_version: int) -> float:
|
|
is_new = game_version >= ChuniConstants.VER_CHUNITHM_NEW
|
|
|
|
if is_new: # New and later
|
|
max_score = 1009000
|
|
max_rating_modifier = 2.15
|
|
else: # Up to Paradise Lost
|
|
max_score = 1007500
|
|
max_rating_modifier = 2.0
|
|
|
|
if (score < 500000):
|
|
return 0.0 # D
|
|
elif (score >= max_score):
|
|
return chart_constant + max_rating_modifier # SSS/SSS+
|
|
|
|
# Okay, we're doing this the hard way.
|
|
# Rating goes up linearly between breakpoints listed below.
|
|
# Pick the score interval in which we are in, then calculate
|
|
# the position between possible ratings.
|
|
score_intervals = [
|
|
( 500000, 0.0), # C
|
|
( 800000, max(0.0, (chart_constant - 5.0) / 2)), # BBB
|
|
( 900000, max(0.0, (chart_constant - 5.0))), # A
|
|
( 925000, max(0.0, (chart_constant - 3.0))), # AA
|
|
( 975000, chart_constant), # S
|
|
(1000000, chart_constant + 1.0), # SS
|
|
(1005000, chart_constant + 1.5), # SS+
|
|
(1007500, chart_constant + 2.0), # SSS
|
|
(1009000, chart_constant + max_rating_modifier), # SSS+!
|
|
]
|
|
|
|
for ((lo_score, lo_rating), (hi_score, hi_rating)) in pairwise(score_intervals):
|
|
if not (lo_score <= score < hi_score):
|
|
continue
|
|
|
|
interval_pos = (score - lo_score) / (hi_score - lo_score)
|
|
return lo_rating + ((hi_rating - lo_rating) * interval_pos)
|
|
|
|
|
|
|
|
class ChuniFrontend(FE_Base):
|
|
def __init__(
|
|
self, cfg: CoreConfig, environment: jinja2.Environment, cfg_dir: str
|
|
) -> None:
|
|
super().__init__(cfg, environment)
|
|
self.data = ChuniData(cfg)
|
|
self.game_cfg = ChuniConfig()
|
|
if path.exists(f"{cfg_dir}/{ChuniConstants.CONFIG_NAME}"):
|
|
self.game_cfg.update(
|
|
yaml.safe_load(open(f"{cfg_dir}/{ChuniConstants.CONFIG_NAME}"))
|
|
)
|
|
self.nav_name = "Chunithm"
|
|
|
|
def get_routes(self) -> List[Route]:
|
|
return [
|
|
Route("/", self.render_GET, methods=['GET']),
|
|
Route("/rating", self.render_GET_rating, methods=['GET']),
|
|
Mount("/playlog", routes=[
|
|
Route("/", self.render_GET_playlog, methods=['GET']),
|
|
Route("/{index}", self.render_GET_playlog, methods=['GET']),
|
|
]),
|
|
Route("/update.name", self.update_name, methods=['POST']),
|
|
Route("/version.change", self.version_change, methods=['POST']),
|
|
]
|
|
|
|
async def render_GET(self, request: Request) -> bytes:
|
|
template = self.environment.get_template(
|
|
"titles/chuni/templates/chuni_index.jinja"
|
|
)
|
|
usr_sesh = self.validate_session(request)
|
|
if not usr_sesh:
|
|
usr_sesh = UserSession()
|
|
|
|
if usr_sesh.user_id > 0:
|
|
versions = await self.data.profile.get_all_profile_versions(usr_sesh.user_id)
|
|
profile = []
|
|
if versions:
|
|
# chunithm_version is -1 means it is not initialized yet, select a default version from existing.
|
|
if usr_sesh.chunithm_version < 0:
|
|
usr_sesh.chunithm_version = versions[0]
|
|
profile = await self.data.profile.get_profile_data(usr_sesh.user_id, usr_sesh.chunithm_version)
|
|
|
|
resp = Response(template.render(
|
|
title=f"{self.core_config.server.name} | {self.nav_name}",
|
|
game_list=self.environment.globals["game_list"],
|
|
sesh=vars(usr_sesh),
|
|
user_id=usr_sesh.user_id,
|
|
profile=profile,
|
|
version_list=ChuniConstants.VERSION_NAMES,
|
|
versions=versions,
|
|
cur_version=usr_sesh.chunithm_version
|
|
), media_type="text/html; charset=utf-8")
|
|
|
|
if usr_sesh.chunithm_version >= 0:
|
|
encoded_sesh = self.encode_session(usr_sesh)
|
|
resp.set_cookie("ARTEMIS_SESH", encoded_sesh)
|
|
return resp
|
|
|
|
else:
|
|
return RedirectResponse("/gate/", 303)
|
|
|
|
async def render_GET_rating(self, request: Request) -> bytes:
|
|
template = self.environment.get_template(
|
|
"titles/chuni/templates/chuni_rating.jinja"
|
|
)
|
|
usr_sesh = self.validate_session(request)
|
|
if not usr_sesh:
|
|
usr_sesh = UserSession()
|
|
|
|
if usr_sesh.user_id > 0:
|
|
if usr_sesh.chunithm_version < 0:
|
|
return RedirectResponse("/game/chuni/", 303)
|
|
profile = await self.data.profile.get_profile_data(usr_sesh.user_id, usr_sesh.chunithm_version)
|
|
rating = await self.data.profile.get_profile_rating(usr_sesh.user_id, usr_sesh.chunithm_version)
|
|
hot_list=[]
|
|
base_list=[]
|
|
if profile and rating:
|
|
song_records = []
|
|
|
|
for song in rating:
|
|
music_chart = await self.data.static.get_music_chart(usr_sesh.chunithm_version, song.musicId, song.difficultId)
|
|
if not music_chart:
|
|
continue
|
|
|
|
rank = calculate_song_rank(song.score, profile.version)
|
|
rating = calculate_song_rating(song.score, music_chart.level, profile.version)
|
|
|
|
song_rating = int(rating * 10 ** 2) / 10 ** 2
|
|
song_records.append({
|
|
"difficultId": song.difficultId,
|
|
"musicId": song.musicId,
|
|
"title": music_chart.title,
|
|
"level": music_chart.level,
|
|
"score": song.score,
|
|
"type": song.type,
|
|
"rank": rank,
|
|
"song_rating": song_rating,
|
|
})
|
|
|
|
hot_list = [obj for obj in song_records if obj["type"] == "userRatingBaseHotList"]
|
|
base_list = [obj for obj in song_records if obj["type"] == "userRatingBaseList"]
|
|
return Response(template.render(
|
|
title=f"{self.core_config.server.name} | {self.nav_name}",
|
|
game_list=self.environment.globals["game_list"],
|
|
sesh=vars(usr_sesh),
|
|
profile=profile,
|
|
hot_list=hot_list,
|
|
base_list=base_list,
|
|
), media_type="text/html; charset=utf-8")
|
|
else:
|
|
return RedirectResponse("/gate/", 303)
|
|
|
|
async def render_GET_playlog(self, request: Request) -> bytes:
|
|
template = self.environment.get_template(
|
|
"titles/chuni/templates/chuni_playlog.jinja"
|
|
)
|
|
usr_sesh = self.validate_session(request)
|
|
if not usr_sesh:
|
|
usr_sesh = UserSession()
|
|
|
|
if usr_sesh.user_id > 0:
|
|
if usr_sesh.chunithm_version < 0:
|
|
return RedirectResponse("/game/chuni/", 303)
|
|
path_index = request.path_params.get('index')
|
|
if not path_index or int(path_index) < 1:
|
|
index = 0
|
|
else:
|
|
index = int(path_index) - 1 # 0 and 1 are 1st page
|
|
user_id = usr_sesh.user_id
|
|
playlog_count = await self.data.score.get_user_playlogs_count(user_id)
|
|
if playlog_count < index * 20 :
|
|
return Response(template.render(
|
|
title=f"{self.core_config.server.name} | {self.nav_name}",
|
|
game_list=self.environment.globals["game_list"],
|
|
sesh=vars(usr_sesh),
|
|
playlog_count=0
|
|
), media_type="text/html; charset=utf-8")
|
|
playlog = await self.data.score.get_playlogs_limited(user_id, index, 20)
|
|
playlog_with_title = []
|
|
for record in playlog:
|
|
music_chart = await self.data.static.get_music_chart(usr_sesh.chunithm_version, record.musicId, record.level)
|
|
if music_chart:
|
|
difficultyNum=music_chart.level
|
|
artist=music_chart.artist
|
|
title=music_chart.title
|
|
else:
|
|
difficultyNum=0
|
|
artist="unknown"
|
|
title="musicid: " + str(record.musicId)
|
|
playlog_with_title.append({
|
|
"raw": record,
|
|
"title": title,
|
|
"difficultyNum": difficultyNum,
|
|
"artist": artist,
|
|
})
|
|
return Response(template.render(
|
|
title=f"{self.core_config.server.name} | {self.nav_name}",
|
|
game_list=self.environment.globals["game_list"],
|
|
sesh=vars(usr_sesh),
|
|
user_id=usr_sesh.user_id,
|
|
playlog=playlog_with_title,
|
|
playlog_count=playlog_count
|
|
), media_type="text/html; charset=utf-8")
|
|
else:
|
|
return RedirectResponse("/gate/", 303)
|
|
|
|
async def update_name(self, request: Request) -> bytes:
|
|
usr_sesh = self.validate_session(request)
|
|
if not usr_sesh:
|
|
return RedirectResponse("/gate/", 303)
|
|
|
|
form_data = await request.form()
|
|
new_name: str = form_data.get("new_name")
|
|
new_name_full = ""
|
|
|
|
if not new_name:
|
|
return RedirectResponse("/gate/?e=4", 303)
|
|
|
|
if len(new_name) > 8:
|
|
return RedirectResponse("/gate/?e=8", 303)
|
|
|
|
for x in new_name: # FIXME: This will let some invalid characters through atm
|
|
o = ord(x)
|
|
try:
|
|
if o == 0x20:
|
|
new_name_full += chr(0x3000)
|
|
elif o < 0x7F and o > 0x20:
|
|
new_name_full += chr(o + 0xFEE0)
|
|
elif o <= 0x7F:
|
|
self.logger.warn(f"Invalid ascii character {o:02X}")
|
|
return RedirectResponse("/gate/?e=4", 303)
|
|
else:
|
|
new_name_full += x
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Something went wrong parsing character {o:04X} - {e}")
|
|
return RedirectResponse("/gate/?e=4", 303)
|
|
|
|
if not await self.data.profile.update_name(usr_sesh.user_id, new_name_full):
|
|
return RedirectResponse("/gate/?e=999", 303)
|
|
|
|
return RedirectResponse("/game/chuni/?s=1", 303)
|
|
|
|
async def version_change(self, request: Request):
|
|
usr_sesh = self.validate_session(request)
|
|
if not usr_sesh:
|
|
usr_sesh = UserSession()
|
|
|
|
if usr_sesh.user_id > 0:
|
|
form_data = await request.form()
|
|
chunithm_version = form_data.get("version")
|
|
self.logger.debug(f"version change to: {chunithm_version}")
|
|
if(chunithm_version.isdigit()):
|
|
usr_sesh.chunithm_version=int(chunithm_version)
|
|
encoded_sesh = self.encode_session(usr_sesh)
|
|
self.logger.debug(f"Created session with JWT {encoded_sesh}")
|
|
resp = RedirectResponse("/game/chuni/", 303)
|
|
resp.set_cookie("ARTEMIS_SESH", encoded_sesh)
|
|
return resp
|
|
else:
|
|
return RedirectResponse("/gate/", 303)
|