from typing import List from starlette.routing import Route, Mount from starlette.requests import Request from starlette.responses import Response, RedirectResponse from starlette.staticfiles import StaticFiles from os import path import yaml import jinja2 from core.frontend import FE_Base, UserSession from core.config import CoreConfig from .database import ChuniData from .config import ChuniConfig from .const import ChuniConstants def pairwise(iterable): # https://docs.python.org/3/library/itertools.html#itertools.pairwise # but for Python < 3.10. pairwise('ABCDEFG') → AB BC CD DE EF FG iterator = iter(iterable) a = next(iterator, None) for b in iterator: yield a, b a = b def calculate_song_rank(score: int, game_version: int) -> str: if game_version >= ChuniConstants.VER_CHUNITHM_NEW: intervals = ChuniConstants.SCORE_RANK_INTERVALS_NEW else: intervals = ChuniConstants.SCORE_RANK_INTERVALS_OLD for (min_score, rank) in intervals: if score >= min_score: return rank return "D" def calculate_song_rating(score: int, chart_constant: float, game_version: int) -> float: is_new = game_version >= ChuniConstants.VER_CHUNITHM_NEW if is_new: # New and later max_score = 1009000 max_rating_modifier = 2.15 else: # Up to Paradise Lost max_score = 1007500 max_rating_modifier = 2.0 if (score < 500000): return 0.0 # D elif (score >= max_score): return chart_constant + max_rating_modifier # SSS/SSS+ # Okay, we're doing this the hard way. # Rating goes up linearly between breakpoints listed below. # Pick the score interval in which we are in, then calculate # the position between possible ratings. score_intervals = [ ( 500000, 0.0), # C ( 800000, max(0.0, (chart_constant - 5.0) / 2)), # BBB ( 900000, max(0.0, (chart_constant - 5.0))), # A ( 925000, max(0.0, (chart_constant - 3.0))), # AA ( 975000, chart_constant), # S (1000000, chart_constant + 1.0), # SS (1005000, chart_constant + 1.5), # SS+ (1007500, chart_constant + 2.0), # SSS (1009000, chart_constant + max_rating_modifier), # SSS+! ] for ((lo_score, lo_rating), (hi_score, hi_rating)) in pairwise(score_intervals): if not (lo_score <= score < hi_score): continue interval_pos = (score - lo_score) / (hi_score - lo_score) return lo_rating + ((hi_rating - lo_rating) * interval_pos) class ChuniFrontend(FE_Base): def __init__( self, cfg: CoreConfig, environment: jinja2.Environment, cfg_dir: str ) -> None: super().__init__(cfg, environment) self.game_cfg = ChuniConfig() if path.exists(f"{cfg_dir}/{ChuniConstants.CONFIG_NAME}"): self.game_cfg.update( yaml.safe_load(open(f"{cfg_dir}/{ChuniConstants.CONFIG_NAME}")) ) self.data = ChuniData(cfg, self.game_cfg) self.nav_name = "Chunithm" def get_routes(self) -> List[Route]: return [ Route("/", self.render_GET, methods=['GET']), Route("/rating", self.render_GET_rating, methods=['GET']), Mount("/playlog", routes=[ Route("/", self.render_GET_playlog, methods=['GET']), Route("/{index}", self.render_GET_playlog, methods=['GET']), ]), Route("/favorites", self.render_GET_favorites, methods=['GET']), Route("/update.name", self.update_name, methods=['POST']), Route("/update.favorite_music_playlog", self.update_favorite_music_playlog, methods=['POST']), Route("/update.favorite_music_favorites", self.update_favorite_music_favorites, methods=['POST']), Route("/version.change", self.version_change, methods=['POST']), Mount('/img', app=StaticFiles(directory='titles/chuni/img'), name="img") ] async def render_GET(self, request: Request) -> bytes: template = self.environment.get_template( "titles/chuni/templates/chuni_index.jinja" ) usr_sesh = self.validate_session(request) if not usr_sesh: usr_sesh = UserSession() if usr_sesh.user_id > 0: versions = await self.data.profile.get_all_profile_versions(usr_sesh.user_id) profile = [] if versions: # chunithm_version is -1 means it is not initialized yet, select a default version from existing. if usr_sesh.chunithm_version < 0: usr_sesh.chunithm_version = versions[0] profile = await self.data.profile.get_profile_data(usr_sesh.user_id, usr_sesh.chunithm_version) resp = Response(template.render( title=f"{self.core_config.server.name} | {self.nav_name}", game_list=self.environment.globals["game_list"], sesh=vars(usr_sesh), user_id=usr_sesh.user_id, profile=profile, version_list=ChuniConstants.VERSION_NAMES, versions=versions, cur_version=usr_sesh.chunithm_version ), media_type="text/html; charset=utf-8") if usr_sesh.chunithm_version >= 0: encoded_sesh = self.encode_session(usr_sesh) resp.set_cookie("ARTEMIS_SESH", encoded_sesh) return resp else: return RedirectResponse("/gate/", 303) async def render_GET_rating(self, request: Request) -> bytes: template = self.environment.get_template( "titles/chuni/templates/chuni_rating.jinja" ) usr_sesh = self.validate_session(request) if not usr_sesh: usr_sesh = UserSession() if usr_sesh.user_id > 0: if usr_sesh.chunithm_version < 0: return RedirectResponse("/game/chuni/", 303) profile = await self.data.profile.get_profile_data(usr_sesh.user_id, usr_sesh.chunithm_version) rating = await self.data.profile.get_profile_rating(usr_sesh.user_id, usr_sesh.chunithm_version) hot_list=[] base_list=[] if profile and rating: song_records = [] for song in rating: music_chart = await self.data.static.get_music_chart(usr_sesh.chunithm_version, song.musicId, song.difficultId) if not music_chart: continue rank = calculate_song_rank(song.score, profile.version) rating = calculate_song_rating(song.score, music_chart.level, profile.version) song_rating = int(rating * 10 ** 2) / 10 ** 2 song_records.append({ "difficultId": song.difficultId, "musicId": song.musicId, "title": music_chart.title, "level": music_chart.level, "score": song.score, "type": song.type, "rank": rank, "song_rating": song_rating, }) hot_list = [obj for obj in song_records if obj["type"] == "userRatingBaseHotList"] base_list = [obj for obj in song_records if obj["type"] == "userRatingBaseList"] return Response(template.render( title=f"{self.core_config.server.name} | {self.nav_name}", game_list=self.environment.globals["game_list"], sesh=vars(usr_sesh), profile=profile, hot_list=hot_list, base_list=base_list, ), media_type="text/html; charset=utf-8") else: return RedirectResponse("/gate/", 303) async def render_GET_playlog(self, request: Request) -> bytes: template = self.environment.get_template( "titles/chuni/templates/chuni_playlog.jinja" ) usr_sesh = self.validate_session(request) if not usr_sesh: usr_sesh = UserSession() if usr_sesh.user_id > 0: if usr_sesh.chunithm_version < 0: return RedirectResponse("/game/chuni/", 303) path_index = request.path_params.get('index') if not path_index or int(path_index) < 1: index = 0 else: index = int(path_index) - 1 # 0 and 1 are 1st page user_id = usr_sesh.user_id version = usr_sesh.chunithm_version playlog_count = await self.data.score.get_user_playlogs_count(user_id, version) if playlog_count < index * 20 : return Response(template.render( title=f"{self.core_config.server.name} | {self.nav_name}", game_list=self.environment.globals["game_list"], sesh=vars(usr_sesh), playlog_count=0 ), media_type="text/html; charset=utf-8") playlog = await self.data.score.get_playlogs_limited(user_id, version, index, 20) playlog_with_title = [] for idx,record in enumerate(playlog): music_chart = await self.data.static.get_music_chart(version, record.musicId, record.level) if music_chart: difficultyNum=music_chart.level artist=music_chart.artist title=music_chart.title (jacket, ext) = path.splitext(music_chart.jacketPath) jacket += ".png" else: difficultyNum=0 artist="unknown" title="musicid: " + str(record.musicId) jacket = "unknown.png" # Check if this song is a favorite so we can populate the add/remove button is_favorite = await self.data.item.is_favorite(user_id, version, record.musicId) playlog_with_title.append({ # Values for the actual readable results "raw": record, "title": title, "difficultyNum": difficultyNum, "artist": artist, "jacket": jacket, # Values used solely for favorite updates "idx": idx, "musicId": record.musicId, "isFav": is_favorite }) return Response(template.render( title=f"{self.core_config.server.name} | {self.nav_name}", game_list=self.environment.globals["game_list"], sesh=vars(usr_sesh), user_id=user_id, playlog=playlog_with_title, playlog_count=playlog_count, cur_version_name=ChuniConstants.game_ver_to_string(version) ), media_type="text/html; charset=utf-8") else: return RedirectResponse("/gate/", 303) async def render_GET_favorites(self, request: Request) -> bytes: template = self.environment.get_template( "titles/chuni/templates/chuni_favorites.jinja" ) usr_sesh = self.validate_session(request) if not usr_sesh: usr_sesh = UserSession() if usr_sesh.user_id > 0: if usr_sesh.chunithm_version < 0: return RedirectResponse("/game/chuni/", 303) user_id = usr_sesh.user_id version = usr_sesh.chunithm_version favorites = await self.data.item.get_all_favorites(user_id, version, 1) favorites_count = len(favorites) favorites_with_title = [] favorites_by_genre = dict() for idx,favorite in enumerate(favorites): song = await self.data.static.get_song(favorite.favId) if song: # we likely got multiple results - one for each chart. Just use the first artist=song.artist title=song.title genre=song.genre (jacket, ext) = path.splitext(song.jacketPath) jacket += ".png" else: artist="unknown" title="musicid: " + str(favorite.favId) genre="unknown" jacket = "unknown.png" # add a new collection for the genre if this is our first time seeing it if genre not in favorites_by_genre: favorites_by_genre[genre] = [] # add the song to the appropriate genre collection favorites_by_genre[genre].append({ "idx": idx, "title": title, "artist": artist, "jacket": jacket, "favId": favorite.favId }) # Sort favorites by title before rendering the page for g in favorites_by_genre: favorites_by_genre[g].sort(key=lambda x: x["title"].lower()) return Response(template.render( title=f"{self.core_config.server.name} | {self.nav_name}", game_list=self.environment.globals["game_list"], sesh=vars(usr_sesh), user_id=user_id, favorites_by_genre=favorites_by_genre, favorites_count=favorites_count, cur_version_name=ChuniConstants.game_ver_to_string(version) ), media_type="text/html; charset=utf-8") else: return RedirectResponse("/gate/", 303) async def update_name(self, request: Request) -> bytes: usr_sesh = self.validate_session(request) if not usr_sesh: return RedirectResponse("/gate/", 303) form_data = await request.form() new_name: str = form_data.get("new_name") new_name_full = "" if not new_name: return RedirectResponse("/gate/?e=4", 303) if len(new_name) > 8: return RedirectResponse("/gate/?e=8", 303) for x in new_name: # FIXME: This will let some invalid characters through atm o = ord(x) try: if o == 0x20: new_name_full += chr(0x3000) elif o < 0x7F and o > 0x20: new_name_full += chr(o + 0xFEE0) elif o <= 0x7F: self.logger.warn(f"Invalid ascii character {o:02X}") return RedirectResponse("/gate/?e=4", 303) else: new_name_full += x except Exception as e: self.logger.error(f"Something went wrong parsing character {o:04X} - {e}") return RedirectResponse("/gate/?e=4", 303) if not await self.data.profile.update_name(usr_sesh.user_id, new_name_full): return RedirectResponse("/gate/?e=999", 303) return RedirectResponse("/game/chuni/?s=1", 303) async def update_favorite_music(self, request: Request, retPage: str): usr_sesh = self.validate_session(request) if not usr_sesh: return RedirectResponse(retPage, 303) user_id = usr_sesh.user_id version = usr_sesh.chunithm_version form_data = await request.form() music_id: str = form_data.get("musicId") isAdd: int = int(form_data.get("isAdd")) if isAdd: if await self.data.item.put_favorite_music(user_id, version, music_id) == None: return RedirectResponse("/gate/?e=999", 303) else: if await self.data.item.delete_favorite_music(user_id, version, music_id) == None: return RedirectResponse("/gate/?e=999", 303) return RedirectResponse(retPage, 303) async def update_favorite_music_playlog(self, request: Request): return await self.update_favorite_music(request, "/game/chuni/playlog") async def update_favorite_music_favorites(self, request: Request): return await self.update_favorite_music(request, "/game/chuni/favorites") async def version_change(self, request: Request): usr_sesh = self.validate_session(request) if not usr_sesh: usr_sesh = UserSession() if usr_sesh.user_id > 0: form_data = await request.form() chunithm_version = form_data.get("version") self.logger.debug(f"version change to: {chunithm_version}") if(chunithm_version.isdigit()): usr_sesh.chunithm_version=int(chunithm_version) encoded_sesh = self.encode_session(usr_sesh) self.logger.debug(f"Created session with JWT {encoded_sesh}") resp = RedirectResponse("/game/chuni/", 303) resp.set_cookie("ARTEMIS_SESH", encoded_sesh) return resp else: return RedirectResponse("/gate/", 303)