diff --git a/core/app.py b/core/app.py index f20c041..0a59112 100644 --- a/core/app.py +++ b/core/app.py @@ -76,10 +76,12 @@ if not cfg.billing.standalone: Route("/request/", billing.handle_billing_request, methods=["POST"]), ] -if not cfg.frontend.standalone: +if not cfg.frontend.standalone and cfg.frontend.secret: frontend = FrontendServlet(cfg, cfg_dir) route_lst += frontend.get_routes() else: + if not cfg.frontend.secret: + logger.error("Frontend secret not specified, cannot start frontend!") route_lst.append(Route("/", dummy_rt)) route_lst.append(Route("/robots.txt", FrontendServlet.robots)) diff --git a/core/config.py b/core/config.py index 0a95742..47a3ff3 100644 --- a/core/config.py +++ b/core/config.py @@ -212,7 +212,7 @@ class FrontendConfig: @property def secret(self) -> str: - CoreConfig.get_config_field( + return CoreConfig.get_config_field( self.__config, "core", "frontend", "secret", default="" ) diff --git a/core/data/schema/card.py b/core/data/schema/card.py index 6953b26..011efe1 100644 --- a/core/data/schema/card.py +++ b/core/data/schema/card.py @@ -58,7 +58,7 @@ class CardData(BaseData): """ Given a 20 digit access code as a string, get the user id associated with that card """ - card = self.get_card_by_access_code(access_code) + card = await self.get_card_by_access_code(access_code) if card is None: return None @@ -68,7 +68,7 @@ class CardData(BaseData): """ Given a 20 digit access code as a string, check if the card is banned """ - card = self.get_card_by_access_code(access_code) + card = await self.get_card_by_access_code(access_code) if card is None: return None if card["is_banned"]: @@ -78,7 +78,7 @@ class CardData(BaseData): """ Given a 20 digit access code as a string, check if the card is locked """ - card = self.get_card_by_access_code(access_code) + card = await self.get_card_by_access_code(access_code) if card is None: return None if card["is_locked"]: diff --git a/core/data/schema/user.py b/core/data/schema/user.py index 6b733e2..f97bf3f 100644 --- a/core/data/schema/user.py +++ b/core/data/schema/user.py @@ -64,8 +64,8 @@ class UserData(BaseData): return False return result.fetchone() - def check_password(self, user_id: int, passwd: bytes = None) -> bool: - usr = self.get_user(user_id) + async def check_password(self, user_id: int, passwd: bytes = None) -> bool: + usr = await self.get_user(user_id) if usr is None: return False diff --git a/core/frontend.py b/core/frontend.py index 26796d3..b180c71 100644 --- a/core/frontend.py +++ b/core/frontend.py @@ -1,30 +1,21 @@ import logging, coloredlogs -from typing import Any, Dict, List -from twisted.web import resource -from twisted.web.util import redirectTo +from typing import Any, Dict, List, Union, Optional from starlette.requests import Request -from starlette.routing import Route -from starlette.responses import Response, PlainTextResponse +from starlette.routing import Route, Mount +from starlette.responses import Response, PlainTextResponse, RedirectResponse from logging.handlers import TimedRotatingFileHandler -from twisted.web.server import Session -from zope.interface import Interface, Attribute, implementer -from twisted.python.components import registerAdapter import jinja2 import bcrypt import re +import jwt +from base64 import b64decode from enum import Enum from urllib import parse +from datetime import datetime, timezone from core import CoreConfig, Utils from core.data import Data - -class IUserSession(Interface): - userId = Attribute("User's ID") - current_ip = Attribute("User's current ip address") - permissions = Attribute("User's permission level") - ongeki_version = Attribute("User's selected Ongeki Version") - class PermissionOffset(Enum): USER = 0 # Regular user USERMOD = 1 # Can moderate other users @@ -33,255 +24,399 @@ class PermissionOffset(Enum): # 4 - 6 reserved for future use OWNER = 7 # Can do anything -@implementer(IUserSession) -class UserSession(object): - def __init__(self, session): - self.userId = 0 - self.current_ip = "0.0.0.0" - self.permissions = 0 - self.ongeki_version = 7 +class ShopPermissionOffset(Enum): + VIEW = 0 # View info and cabs + BOOKKEEP = 1 # View bookeeping info + EDITOR = 2 # Can edit name, settings + REGISTRAR = 3 # Can add cabs + # 4 - 6 reserved for future use + OWNER = 7 # Can do anything +class ShopOwner(): + def __init__(self, usr_id: int = 0, usr_name: str = "", perms: int = 0) -> None: + self.user_id = usr_id + self.username = usr_name + self.permissions = perms -class FrontendServlet(resource.Resource): - def getChild(self, name: bytes, request: Request): - self.logger.debug(f"{Utils.get_ip_addr(request)} -> {name.decode()}") - if name == b"": - return self - return resource.Resource.getChild(self, name, request) +class UserSession(): + def __init__(self, usr_id: int = 0, ip: str = "", perms: int = 0, ongeki_ver: int = 7): + self.user_id = usr_id + self.current_ip = ip + self.permissions = perms + self.ongeki_version = ongeki_ver +class FrontendServlet(): def __init__(self, cfg: CoreConfig, config_dir: str) -> None: self.config = cfg log_fmt_str = "[%(asctime)s] Frontend | %(levelname)s | %(message)s" log_fmt = logging.Formatter(log_fmt_str) - self.logger = logging.getLogger("frontend") self.environment = jinja2.Environment(loader=jinja2.FileSystemLoader(".")) - self.game_list: List[Dict[str, str]] = [] - self.children: Dict[str, Any] = {} + self.game_list: Dict[str, Dict[str, Any]] = {} + self.sn_cvt: Dict[str, str] = {} + + self.logger = logging.getLogger("frontend") + if not hasattr(self.logger, "inited"): + fileHandler = TimedRotatingFileHandler( + "{0}/{1}.log".format(self.config.server.log_dir, "frontend"), + when="d", + backupCount=10, + ) + fileHandler.setFormatter(log_fmt) - fileHandler = TimedRotatingFileHandler( - "{0}/{1}.log".format(self.config.server.log_dir, "frontend"), - when="d", - backupCount=10, - ) - fileHandler.setFormatter(log_fmt) + consoleHandler = logging.StreamHandler() + consoleHandler.setFormatter(log_fmt) - consoleHandler = logging.StreamHandler() - consoleHandler.setFormatter(log_fmt) + self.logger.addHandler(fileHandler) + self.logger.addHandler(consoleHandler) - self.logger.addHandler(fileHandler) - self.logger.addHandler(consoleHandler) + self.logger.setLevel(cfg.frontend.loglevel) + coloredlogs.install( + level=cfg.frontend.loglevel, logger=self.logger, fmt=log_fmt_str + ) - self.logger.setLevel(cfg.frontend.loglevel) - coloredlogs.install( - level=cfg.frontend.loglevel, logger=self.logger, fmt=log_fmt_str - ) - registerAdapter(UserSession, Session, IUserSession) - - fe_game = FE_Game(cfg, self.environment) + self.logger.inited = True + games = Utils.get_all_titles() for game_dir, game_mod in games.items(): - if hasattr(game_mod, "frontend"): + if hasattr(game_mod, "frontend") and hasattr(game_mod, "index") and hasattr(game_mod, "game_codes"): try: - game_fe = game_mod.frontend(cfg, self.environment, config_dir) - self.game_list.append({"url": game_dir, "name": game_fe.nav_name}) - fe_game.putChild(game_dir.encode(), game_fe) + if game_mod.index.is_game_enabled(game_mod.game_codes[0], self.config, config_dir): + game_fe = game_mod.frontend(cfg, self.environment, config_dir) + self.game_list[game_fe.nav_name] = {"url": f"/{game_dir}", "class": game_fe } + + if hasattr(game_fe, "SN_PREFIX") and hasattr(game_fe, "NETID_PREFIX"): + if len(game_fe.SN_PREFIX) == len(game_fe.NETID_PREFIX): + for x in range(len(game_fe.SN_PREFIX)): + self.sn_cvt[game_fe.SN_PREFIX[x]] = game_fe.NETID_PREFIX[x] except Exception as e: self.logger.error( f"Failed to import frontend from {game_dir} because {e}" ) - self.environment.globals["game_list"] = self.game_list - self.putChild(b"gate", FE_Gate(cfg, self.environment)) - self.putChild(b"user", FE_User(cfg, self.environment)) - self.putChild(b"sys", FE_System(cfg, self.environment)) - self.putChild(b"arcade", FE_Arcade(cfg, self.environment)) - self.putChild(b"cab", FE_Machine(cfg, self.environment)) - self.putChild(b"game", fe_game) - self.logger.info( - f"Ready on port {self.config.server.port} serving {len(fe_game.children)} games" - ) + self.environment.globals["game_list"] = self.game_list + self.environment.globals["sn_cvt"] = self.sn_cvt + self.base = FE_Base(cfg, self.environment) + self.gate = FE_Gate(cfg, self.environment) + self.user = FE_User(cfg, self.environment) + self.system = FE_System(cfg, self.environment) + self.arcade = FE_Arcade(cfg, self.environment) + self.machine = FE_Machine(cfg, self.environment) - def get_routes(self) -> List[Route]: - return [] + def get_routes(self) -> List[Route]: + g_routes = [] + for nav_name, g_data in self.environment.globals["game_list"].items(): + g_routes.append(Mount(g_data['url'], routes=g_data['class'].get_routes())) + return [ + Route("/", self.base.render_GET, methods=['GET']), + Mount("/user", routes=[ + Route("/", self.user.render_GET, methods=['GET']), + Route("/{user_id:int}", self.user.render_GET, methods=['GET']), + Route("/update.pw", self.user.render_POST, methods=['POST']), + Route("/update.name", self.user.update_username, methods=['POST']), + Route("/edit.card", self.user.edit_card, methods=['POST']), + Route("/add.card", self.user.add_card, methods=['POST']), + Route("/logout", self.user.render_logout, methods=['GET']), + ]), + Mount("/gate", routes=[ + Route("/", self.gate.render_GET, methods=['GET', 'POST']), + Route("/gate.login", self.gate.render_login, methods=['POST']), + Route("/gate.create", self.gate.render_create, methods=['POST']), + Route("/create", self.gate.render_create_get, methods=['GET']), + ]), + Mount("/sys", routes=[ + Route("/", self.system.render_GET, methods=['GET']), + Route("/lookup.user", self.system.lookup_user, methods=['GET']), + Route("/lookup.shop", self.system.lookup_shop, methods=['GET']), + ]), + Mount("/shop", routes=[ + Route("/", self.arcade.render_GET, methods=['GET']), + Route("/{shop_id:int}", self.arcade.render_GET, methods=['GET']), + ]), + Mount("/cab", routes=[ + Route("/", self.machine.render_GET, methods=['GET']), + Route("/{machine_id:int}", self.machine.render_GET, methods=['GET']), + ]), + Mount("/game", routes=g_routes), + Route("/robots.txt", self.robots) + ] + + def startup(self) -> None: + self.config.update({ + "frontend": { + "standalone": True, + "loglevel": CoreConfig.loglevel_to_str(self.config.frontend.loglevel), + "secret": self.config.frontend.secret + } + }) + self.logger.info(f"Serving {len(self.game_list)} games") @classmethod async def robots(cls, request: Request) -> PlainTextResponse: return PlainTextResponse("User-agent: *\nDisallow: /\n\nUser-agent: AdsBot-Google\nDisallow: /") - async def render_GET(self, request): - self.logger.debug(f"{Utils.get_ip_addr(request)} -> {request.uri.decode()}") - template = self.environment.get_template("core/frontend/index.jinja") - return template.render( - server_name=self.config.server.name, - title=self.config.server.name, - game_list=self.game_list, - sesh=vars(IUserSession(request.getSession())), - ).encode("utf-16") - - -class FE_Base(resource.Resource): +class FE_Base(): """ A Generic skeleton class that all frontend handlers should inherit from Initializes the environment, data, logger, config, and sets isLeaf to true It is expected that game implementations of this class overwrite many of these """ - - isLeaf = True - def __init__(self, cfg: CoreConfig, environment: jinja2.Environment) -> None: self.core_config = cfg self.data = Data(cfg) self.logger = logging.getLogger("frontend") self.environment = environment - self.nav_name = "nav_name" + self.nav_name = "index" + + async def render_GET(self, request: Request): + self.logger.debug(f"{Utils.get_ip_addr(request)} -> {request.url}") + template = self.environment.get_template("core/templates/index.jinja") + sesh = self.validate_session(request) + resp = Response(template.render( + server_name=self.core_config.server.name, + title=self.core_config.server.name, + game_list=self.environment.globals["game_list"], + sesh=vars(sesh) if sesh is not None else vars(UserSession()), + )) + + if sesh is None: + resp.delete_cookie("DIANA_SESH") + return resp + + def get_routes(self) -> List[Route]: + return [] + + @classmethod + def test_perm(cls, permission: int, offset: Union[PermissionOffset, ShopPermissionOffset]) -> bool: + logging.getLogger('frontend').debug(f"{permission} vs {1 << offset.value}") + return permission & 1 << offset.value == 1 << offset.value + + @classmethod + def test_perm_minimum(cls, permission: int, offset: Union[PermissionOffset, ShopPermissionOffset]) -> bool: + return permission >= 1 << offset.value + + def decode_session(self, token: str) -> UserSession: + sesh = UserSession() + if not token: return sesh + try: + tk = jwt.decode(token, b64decode(self.core_config.frontend.secret), options={"verify_signature": True}, algorithms=["HS256"]) + sesh.user_id = tk['user_id'] + sesh.current_ip = tk['current_ip'] + sesh.permissions = tk['permissions'] + + if sesh.user_id <= 0: + self.logger.error("User session failed to validate due to an invalid ID!") + return UserSession() + return sesh + except jwt.ExpiredSignatureError: + self.logger.error("User session failed to validate due to an expired signature!") + return sesh + except jwt.InvalidSignatureError: + self.logger.error("User session failed to validate due to an invalid signature!") + return sesh + except jwt.DecodeError as e: + self.logger.error(f"User session failed to decode! {e}") + return sesh + except jwt.InvalidTokenError as e: + self.logger.error(f"User session is invalid! {e}") + return sesh + except KeyError as e: + self.logger.error(f"{e} missing from User session!") + return UserSession() + except Exception as e: + self.logger.error(f"Unknown exception occoured when decoding User session! {e}") + return UserSession() + + def validate_session(self, request: Request) -> Optional[UserSession]: + sesh = request.cookies.get('DIANA_SESH', "") + if not sesh: + return None + + usr_sesh = self.decode_session(sesh) + req_ip = Utils.get_ip_addr(request) + + if usr_sesh.current_ip != req_ip: + self.logger.error(f"User session failed to validate due to mismatched IPs! {usr_sesh.current_ip} -> {req_ip}") + return None + if usr_sesh.permissions <= 0 or usr_sesh.permissions > 255: + self.logger.error(f"User session failed to validate due to an invalid permission value! {usr_sesh.permissions}") + return None + + return usr_sesh + + def encode_session(self, sesh: UserSession, exp_seconds: int = 86400) -> str: + try: + return jwt.encode({ "user_id": sesh.user_id, "current_ip": sesh.current_ip, "permissions": sesh.permissions, "ongeki_version": sesh.ongeki_version, "exp": int(datetime.now(tz=timezone.utc).timestamp()) + exp_seconds }, b64decode(self.core_config.frontend.secret), algorithm="HS256") + except jwt.InvalidKeyError: + self.logger.error("Failed to encode User session because the secret is invalid!") + return "" + except Exception as e: + self.logger.error(f"Unknown exception occoured when encoding User session! {e}") + return "" class FE_Gate(FE_Base): - def render_GET(self, request: Request): - self.logger.debug(f"{Utils.get_ip_addr(request)} -> {request.uri.decode()}") - uri: str = request.uri.decode() + async def render_GET(self, request: Request): + self.logger.debug(f"{Utils.get_ip_addr(request)} -> {request.url.path}") - sesh = request.getSession() - usr_sesh = IUserSession(sesh) - if usr_sesh.userId > 0: - return redirectTo(b"/user", request) - - if uri.startswith("/gate/create"): - return self.create_user(request) - - if b"e" in request.args: + usr_sesh = self.validate_session(request) + if usr_sesh and usr_sesh.user_id > 0: + return RedirectResponse("/user/", 303) + + + if "e" in request.query_params: try: - err = int(request.args[b"e"][0].decode()) + err = int(request.query_params.get("e", ["0"])[0]) except Exception: err = 0 else: err = 0 - template = self.environment.get_template("core/frontend/gate/gate.jinja") - return template.render( + template = self.environment.get_template("core/templates/gate/gate.jinja") + resp = Response(template.render( title=f"{self.core_config.server.name} | Login Gate", error=err, - sesh=vars(usr_sesh), - ).encode("utf-16") - - async def render_POST(self, request: Request): - uri = request.uri.decode() + sesh=vars(UserSession()), + )) + resp.delete_cookie("DIANA_SESH") + return resp + + async def render_login(self, request: Request): ip = Utils.get_ip_addr(request) + frm = await request.form() + access_code: str = frm.get("access_code", None) + if not access_code: + return RedirectResponse("/gate/?e=1", 303) + + passwd: bytes = frm.get("passwd", "").encode() + if passwd == b"": + passwd = None - if uri == "/gate/gate.login": - access_code: str = request.args[b"access_code"][0].decode() - passwd: bytes = request.args[b"passwd"][0] - if passwd == b"": - passwd = None + uid = await self.data.card.get_user_id_from_card(access_code) + if uid is None: + self.logger.debug(f"Failed to find user for card {access_code}") + return RedirectResponse("/gate/?e=1", 303) - uid = await self.data.card.get_user_id_from_card(access_code) - user = await self.data.user.get_user(uid) - if uid is None: - return redirectTo(b"/gate?e=1", request) + user = await self.data.user.get_user(uid) + if user is None: + self.logger.error(f"Failed to load user {uid}") + return RedirectResponse("/gate/?e=1", 303) - if passwd is None: - sesh = await self.data.user.check_password(uid) + if passwd is None: + sesh = await self.data.user.check_password(uid) - if sesh is not None: - return redirectTo( - f"/gate/create?ac={access_code}".encode(), request - ) - return redirectTo(b"/gate?e=1", request) + if sesh is not None: + return RedirectResponse(f"/gate/create?ac={access_code}") + + return RedirectResponse("/gate/?e=1", 303) - if not self.data.user.check_password(uid, passwd): - return redirectTo(b"/gate?e=1", request) + if not await self.data.user.check_password(uid, passwd): + self.logger.debug(f"Failed password for access code {access_code}") + return RedirectResponse("/gate/?e=1", 303) - self.logger.info(f"Successful login of user {uid} at {ip}") + self.logger.info(f"Successful login of user {uid} at {ip}") - sesh = request.getSession() - usr_sesh = IUserSession(sesh) - usr_sesh.userId = uid - usr_sesh.current_ip = ip - usr_sesh.permissions = user['permissions'] + sesh = UserSession() + sesh.user_id = uid + sesh.current_ip = ip + sesh.permissions = user['permissions'] + + usr_sesh = self.encode_session(sesh) + self.logger.debug(f"Created session with JWT {usr_sesh}") + resp = RedirectResponse("/user/", 303) + resp.set_cookie("DIANA_SESH", usr_sesh) - return redirectTo(b"/user", request) + return resp - elif uri == "/gate/gate.create": - access_code: str = request.args[b"access_code"][0].decode() - username: str = request.args[b"username"][0] - email: str = request.args[b"email"][0].decode() - passwd: bytes = request.args[b"passwd"][0] + async def render_create(self, request: Request): + ip = Utils.get_ip_addr(request) + access_code: str = request.query_params.get("access_code", "") + username: str = request.query_params.get("username", "") + email: str = request.query_params.get("email", "") + passwd: bytes = request.query_params.get("passwd", "").encode() - uid = await self.data.card.get_user_id_from_card(access_code) - if uid is None: - return redirectTo(b"/gate?e=1", request) + if not access_code or not username or not email or not passwd: + return RedirectResponse("/gate/?e=1", 303) - salt = bcrypt.gensalt() - hashed = bcrypt.hashpw(passwd, salt) + uid = await self.data.card.get_user_id_from_card(access_code) + if uid is None: + return RedirectResponse("/gate/?e=1", 303) - result = await self.data.user.create_user( - uid, username, email.lower(), hashed.decode(), 1 - ) - if result is None: - return redirectTo(b"/gate?e=3", request) + salt = bcrypt.gensalt() + hashed = bcrypt.hashpw(passwd, salt) - if not self.data.user.check_password(uid, passwd): - return redirectTo(b"/gate", request) + result = await self.data.user.create_user( + uid, username, email.lower(), hashed.decode(), 1 + ) + if result is None: + return RedirectResponse("/gate/?e=3", 303) - return redirectTo(b"/user", request) + if not await self.data.user.check_password(uid, passwd): + return RedirectResponse("/gate/", 303) + + sesh = UserSession() + sesh.user_id = uid + sesh.current_ip = ip + sesh.permissions = 1 + + usr_sesh = self.encode_session(sesh) + self.logger.debug(f"Created session with JWT {usr_sesh}") + resp = RedirectResponse("/user", 303) + resp.set_cookie("DIANA_SESH", usr_sesh) - else: - return b"" + return resp - async def create_user(self, request: Request): - if b"ac" not in request.args or len(request.args[b"ac"][0].decode()) != 20: - return redirectTo(b"/gate?e=2", request) + async def render_create_get(self, request: Request): + ac = request.query_params.get(b"ac", [b""])[0].decode() + if len(ac) != 20: + return RedirectResponse("/gate/?e=2", 303) - ac = request.args[b"ac"][0].decode() card = await self.data.card.get_card_by_access_code(ac) if card is None: - return redirectTo(b"/gate?e=1", request) + return RedirectResponse("/gate/?e=1", 303) user = await self.data.user.get_user(card['user']) if user is None: self.logger.warning(f"Card {ac} exists with no/invalid associated user ID {card['user']}") - return redirectTo(b"/gate?e=0", request) + return RedirectResponse("/gate/?e=0", 303) if user['password'] is not None: - return redirectTo(b"/gate?e=1", request) + return RedirectResponse("/gate/?e=1", 303) - template = self.environment.get_template("core/frontend/gate/create.jinja") - return template.render( + template = self.environment.get_template("core/templates/gate/create.jinja") + return Response(template.render( title=f"{self.core_config.server.name} | Create User", code=ac, - sesh={"userId": 0, "permissions": 0}, - ).encode("utf-16") - + sesh={"user_id": 0, "permissions": 0}, + )) class FE_User(FE_Base): async def render_GET(self, request: Request): - uri = request.uri.decode() - template = self.environment.get_template("core/frontend/user/index.jinja") + uri = request.url.path + user_id = request.path_params.get('user_id', None) + self.logger.debug(f"{Utils.get_ip_addr(request)} -> {uri}") + template = self.environment.get_template("core/templates/user/index.jinja") - sesh: Session = request.getSession() - usr_sesh = IUserSession(sesh) - if usr_sesh.userId == 0: - return redirectTo(b"/gate", request) + usr_sesh = self.validate_session(request) + if not usr_sesh: + return RedirectResponse("/gate/", 303) - m = re.match("\/user\/(\d*)", uri) - - if m is not None: - usrid = m.group(1) - if usr_sesh.permissions < 1 << PermissionOffset.USERMOD.value or not usrid == usr_sesh.userId: - return redirectTo(b"/user", request) + if user_id: + if not self.test_perm(usr_sesh.permissions, PermissionOffset.USERMOD) and user_id != usr_sesh.user_id: + self.logger.warn(f"User {usr_sesh.user_id} does not have permission to view user {user_id}") + return RedirectResponse("/user/", 303) else: - usrid = usr_sesh.userId + user_id = usr_sesh.user_id - user = await self.data.user.get_user(usrid) + user = await self.data.user.get_user(user_id) if user is None: - return redirectTo(b"/user", request) + self.logger.debug(f"User {user_id} not found") + return RedirectResponse("/user/", 303) - cards = await self.data.card.get_user_cards(usrid) - arcades = await self.data.arcade.get_arcades_managed_by_user(usrid) + cards = await self.data.card.get_user_cards(user_id) card_data = [] arcade_data = [] @@ -294,176 +429,285 @@ class FE_User(FE_Base): else: status = 'Active' - card_data.append({'access_code': c['access_code'], 'status': status}) - - for a in arcades: - arcade_data.append({'id': a['id'], 'name': a['name']}) + idm = c['idm'] + ac = c['access_code'] - return template.render( + if ac.startswith("5") or idm is not None: + c_type = "AmusementIC" + elif ac.startswith("3"): + c_type = "Banapass" + elif ac.startswith("010"): + c_type = "Unknown Aime" + desc, _ = self.data.card.get_aime_ac_key_desc(ac) + if desc is not None: + c_type = desc + else: + c_type = "Unknown" + + card_data.append({'access_code': ac, 'status': status, 'chip_id': None if c['chip_id'] is None else f"{c['chip_id']:X}", 'idm': idm, 'type': c_type, "memo": ""}) + + if "e" in request.query_params: + try: + err = int(request.query_params.get("e", 0)) + except Exception: + err = 0 + + else: + err = 0 + + if "s" in request.query_params: + try: + succ = int(request.query_params.get("s", 0)) + except Exception: + succ = 0 + + else: + succ = 0 + + return Response(template.render( title=f"{self.core_config.server.name} | Account", sesh=vars(usr_sesh), - cards=card_data, + cards=card_data, + error=err, + success=succ, username=user['username'], arcades=arcade_data - ).encode("utf-16") + )) + + async def render_logout(self, request: Request): + resp = RedirectResponse("/gate/", 303) + resp.delete_cookie("DIANA_SESH") + return resp + + async def edit_card(self, request: Request) -> RedirectResponse: + return RedirectResponse("/user/", 303) + + async def add_card(self, request: Request) -> RedirectResponse: + return RedirectResponse("/user/", 303) async def render_POST(self, request: Request): - pass + frm = await request.form() + usr_sesh = self.validate_session(request) + if not usr_sesh or not self.test_perm(usr_sesh.permissions, PermissionOffset.USERMOD): + return RedirectResponse("/gate/", 303) + + old_pw: str = frm.get('current_pw', None) + pw1: str = frm.get('password1', None) + pw2: str = frm.get('password2', None) + if old_pw is None or pw1 is None or pw2 is None: + return RedirectResponse("/user/?e=4", 303) + + if pw1 != pw2: + return RedirectResponse("/user/?e=6", 303) + + if not await self.data.user.check_password(usr_sesh.user_id, old_pw.encode()): + return RedirectResponse("/user/?e=5", 303) + + if len(pw1) < 10 or not any(ele.isupper() for ele in pw1) or not any(ele.islower() for ele in pw1) \ + or not any(ele.isdigit() for ele in pw1) or not any(not ele.isalnum() for ele in pw1): + return RedirectResponse("/user/?e=7", 303) + + salt = bcrypt.gensalt() + hashed = bcrypt.hashpw(pw1.encode(), salt) + if not await self.data.user.change_password(usr_sesh.user_id, hashed.decode()): + return RedirectResponse("/gate/?e=1", 303) + + return RedirectResponse("/user/?s=1", 303) + + async def update_username(self, request: Request): + frm = await request.form() + new_name: bytes = frm.get('new_name', "") + usr_sesh = self.validate_session(request) + if not usr_sesh or not self.test_perm(usr_sesh.permissions, PermissionOffset.USERMOD): + return RedirectResponse("/gate/", 303) + + if new_name is None or not new_name: + return RedirectResponse("/user/?e=4", 303) + + if len(new_name) > 10: + return RedirectResponse("/user/?e=8", 303) + + if not await self.data.user.change_username(usr_sesh.user_id, new_name): + return RedirectResponse("/user/?e=8", 303) + + return RedirectResponse("/user/?s=2", 303) class FE_System(FE_Base): async def render_GET(self, request: Request): - uri = request.uri.decode() - template = self.environment.get_template("core/frontend/sys/index.jinja") + template = self.environment.get_template("core/templates/sys/index.jinja") + self.logger.debug(f"{Utils.get_ip_addr(request)} -> {request.url.path}") + + usr_sesh = self.validate_session(request) + if not usr_sesh or not self.test_perm_minimum(usr_sesh.permissions, PermissionOffset.USERMOD): + return RedirectResponse("/gate/", 303) + + return Response(template.render( + title=f"{self.core_config.server.name} | System", + sesh=vars(usr_sesh), + usrlist=[], + )) + + async def lookup_user(self, request: Request): + template = self.environment.get_template("core/templates/sys/index.jinja") usrlist: List[Dict] = [] - aclist: List[Dict] = [] - cablist: List[Dict] = [] - - sesh: Session = request.getSession() - usr_sesh = IUserSession(sesh) - if usr_sesh.userId == 0 or usr_sesh.permissions < 1 << PermissionOffset.USERMOD.value: - return redirectTo(b"/gate", request) + usr_sesh = self.validate_session(request) + if not usr_sesh or not self.test_perm(usr_sesh.permissions, PermissionOffset.USERMOD): + return RedirectResponse("/gate/", 303) - if uri.startswith("/sys/lookup.user?"): - uri_parse = parse.parse_qs(uri.replace("/sys/lookup.user?", "")) # lop off the first bit - uid_search = uri_parse.get("usrId") - email_search = uri_parse.get("usrEmail") - uname_search = uri_parse.get("usrName") + uid_search = request.query_params.get("usrId", None) + email_search = request.query_params.get("usrEmail", None) + uname_search = request.query_params.get("usrName", None) - if uid_search is not None: - u = await self.data.user.get_user(uid_search[0]) - if u is not None: - usrlist.append(u._asdict()) + if uid_search: + u = await self.data.user.get_user(uid_search) + if u is not None: + usrlist.append(u._asdict()) - elif email_search is not None: - u = await self.data.user.find_user_by_email(email_search[0]) - if u is not None: - usrlist.append(u._asdict()) + elif email_search: + u = await self.data.user.find_user_by_email(email_search) + if u is not None: + usrlist.append(u._asdict()) - elif uname_search is not None: - ul = await self.data.user.find_user_by_username(uname_search[0]) - for u in ul: - usrlist.append(u._asdict()) + elif uname_search: + ul = await self.data.user.find_user_by_username(uname_search) + for u in ul: + usrlist.append(u._asdict()) - elif uri.startswith("/sys/lookup.arcade?"): - uri_parse = parse.parse_qs(uri.replace("/sys/lookup.arcade?", "")) # lop off the first bit - ac_id_search = uri_parse.get("arcadeId") - ac_name_search = uri_parse.get("arcadeName") - ac_user_search = uri_parse.get("arcadeUser") - ac_ip_search = uri_parse.get("arcadeIp") - - if ac_id_search is not None: - u = await self.data.arcade.get_arcade(ac_id_search[0]) - if u is not None: - aclist.append(u._asdict()) - - elif ac_name_search is not None: - ul = await self.data.arcade.get_arcade_by_name(ac_name_search[0]) - if ul is not None: - for u in ul: - aclist.append(u._asdict()) - - elif ac_user_search is not None: - ul = await self.data.arcade.get_arcades_managed_by_user(ac_user_search[0]) - if ul is not None: - for u in ul: - aclist.append(u._asdict()) - - elif ac_ip_search is not None: - ul = await self.data.arcade.get_arcades_by_ip(ac_ip_search[0]) - if ul is not None: - for u in ul: - aclist.append(u._asdict()) - - elif uri.startswith("/sys/lookup.cab?"): - uri_parse = parse.parse_qs(uri.replace("/sys/lookup.cab?", "")) # lop off the first bit - cab_id_search = uri_parse.get("cabId") - cab_serial_search = uri_parse.get("cabSerial") - cab_acid_search = uri_parse.get("cabAcId") - - if cab_id_search is not None: - u = await self.data.arcade.get_machine(id=cab_id_search[0]) - if u is not None: - cablist.append(u._asdict()) - - elif cab_serial_search is not None: - u = await self.data.arcade.get_machine(serial=cab_serial_search[0]) - if u is not None: - cablist.append(u._asdict()) - - elif cab_acid_search is not None: - ul = await self.data.arcade.get_arcade_machines(cab_acid_search[0]) - for u in ul: - cablist.append(u._asdict()) - - return template.render( + return Response(template.render( title=f"{self.core_config.server.name} | System", sesh=vars(usr_sesh), usrlist=usrlist, - aclist=aclist, - cablist=cablist, - ).encode("utf-16") - - -class FE_Game(FE_Base): - isLeaf = False - children: Dict[str, Any] = {} - - def getChild(self, name: bytes, request: Request): - if name == b"": - return self - return resource.Resource.getChild(self, name, request) - - async def render_GET(self, request: Request) -> bytes: - return redirectTo(b"/user", request) + shoplist=[], + )) + async def lookup_shop(self, request: Request): + shoplist = [] + template = self.environment.get_template("core/templates/sys/index.jinja") + + usr_sesh = self.validate_session(request) + if not usr_sesh or not self.test_perm(usr_sesh.permissions, PermissionOffset.ACMOD): + return RedirectResponse("/gate/", 303) + + shopid_search = request.query_params.get("shopId", None) + sn_search = request.query_params.get("serialNum", None) + + if shopid_search: + if shopid_search.isdigit(): + shopid_search = int(shopid_search) + try: + sinfo = await self.data.arcade.get_arcade(shopid_search) + except Exception as e: + self.logger.error(f"Failed to fetch shop info for shop {shopid_search} in lookup_shop - {e}") + sinfo = None + if sinfo: + shoplist.append({ + "name": sinfo.name, + "id": sinfo.allnet_id + }) + + else: + return Response(template.render( + title=f"{self.core_config.server.name} | System", + sesh=vars(usr_sesh), + usrlist=[], + shoplist=shoplist, + error=4 + )) + + if sn_search: + sn_search = sn_search.upper().replace("-", "").strip() + if sn_search.isdigit() and len(sn_search) == 12: + prefix = sn_search[:4] + suffix = sn_search[5:] + + netid_prefix = self.environment.globals["sn_cvt"].get(prefix, "") + sn_search = netid_prefix + suffix + + if re.match("^AB[D|G|L]N\d{7}$", sn_search): + cabinfo = await self.data.arcade.get_machine(sn_search) + if cabinfo is None: sinfo = None + else: + sinfo = await self.data.arcade.get_arcade(cabinfo['arcade']) + if sinfo: + shoplist.append({ + "name": sinfo['name'], + "id": sinfo['id'] + }) + + else: + return Response(template.render( + title=f"{self.core_config.server.name} | System", + sesh=vars(usr_sesh), + usrlist=[], + shoplist=shoplist, + error=10 + )) + + + return Response(template.render( + title=f"{self.core_config.server.name} | System", + sesh=vars(usr_sesh), + usrlist=[], + shoplist=shoplist, + )) class FE_Arcade(FE_Base): async def render_GET(self, request: Request): - uri = request.uri.decode() - template = self.environment.get_template("core/frontend/arcade/index.jinja") - managed = [] + template = self.environment.get_template("core/templates/arcade/index.jinja") + shop_id = request.path_params.get('shop_id', None) - sesh: Session = request.getSession() - usr_sesh = IUserSession(sesh) - if usr_sesh.userId == 0: - return redirectTo(b"/gate", request) + usr_sesh = self.validate_session(request) + if not usr_sesh or not self.test_perm(usr_sesh.permissions, PermissionOffset.ACMOD): + self.logger.warn(f"User {usr_sesh.user_id} does not have permission to view shops!") + return RedirectResponse("/gate/", 303) - m = re.match("\/arcade\/(\d*)", uri) + if not shop_id: + return Response(template.render( + title=f"{self.core_config.server.name} | Arcade", + sesh=vars(usr_sesh), + )) - if m is not None: - arcadeid = m.group(1) - perms = await self.data.arcade.get_manager_permissions(usr_sesh.userId, arcadeid) - arcade = await self.data.arcade.get_arcade(arcadeid) - - if perms is None: - perms = 0 + try: + sinfo = await self.data.arcade.get_arcade(shop_id) + except Exception as e: + self.logger.error(f"Failed to fetch shop info for shop {shop_id} in render_GET - {e}") + sinfo = None + if not sinfo: + return Response(template.render( + title=f"{self.core_config.server.name} | Arcade", + sesh=vars(usr_sesh), + )) - else: - return redirectTo(b"/user", request) - - return template.render( + return Response(template.render( title=f"{self.core_config.server.name} | Arcade", sesh=vars(usr_sesh), - error=0, - perms=perms, - arcade=arcade._asdict() - ).encode("utf-16") - + arcade={ + "name": sinfo['name'], + "id": sinfo['id'], + "cabs": [] + } + + )) class FE_Machine(FE_Base): async def render_GET(self, request: Request): - uri = request.uri.decode() - template = self.environment.get_template("core/frontend/machine/index.jinja") + template = self.environment.get_template("core/templates/machine/index.jinja") + cab_id = request.path_params.get('cab_id', None) - sesh: Session = request.getSession() - usr_sesh = IUserSession(sesh) - if usr_sesh.userId == 0: - return redirectTo(b"/gate", request) + usr_sesh = self.validate_session(request) + if not usr_sesh or not self.test_perm(usr_sesh.permissions, PermissionOffset.ACMOD): + self.logger.warn(f"User {usr_sesh.user_id} does not have permission to view shops!") + return RedirectResponse("/gate/", 303) - return template.render( + if not cab_id: + return Response(template.render( + title=f"{self.core_config.server.name} | Machine", + sesh=vars(usr_sesh), + )) + + return Response(template.render( title=f"{self.core_config.server.name} | Machine", - sesh=vars(usr_sesh), - arcade={}, - error=0, - ).encode("utf-16") \ No newline at end of file + sesh=vars(usr_sesh), + arcade={} + )) \ No newline at end of file diff --git a/core/frontend/arcade/index.jinja b/core/frontend/arcade/index.jinja deleted file mode 100644 index 20a1f46..0000000 --- a/core/frontend/arcade/index.jinja +++ /dev/null @@ -1,4 +0,0 @@ -{% extends "core/frontend/index.jinja" %} -{% block content %} -
{{ usr.id }} | {{ usr.username if usr.username != None else "No Name Set"}}- {% endfor %} -
{{ ac.id }} | {{ ac.name if ac.name != None else "No Name Set" }} | {{ ac.ip if ac.ip != None else "No IP Assigned"}}- {% endfor %} -
{{ cab.id }} | {{ cab.game if cab.game != None else "ANY " }} | {{ cab.serial }}- {% endfor %} -
Serial | Game | Last Seen |
---|---|---|
{{ m.serial }} | {{ m.game }} | {{ m.last_seen }} |