artemis/titles/sao/frontend.py

264 lines
11 KiB
Python

import yaml
import jinja2
from typing import List
from starlette.requests import Request
from starlette.responses import Response, RedirectResponse, JSONResponse, PlainTextResponse
from starlette.routing import Route
from os import path
import random
from Crypto.Protocol.KDF import PBKDF2
from Crypto.Hash import SHA1
from Crypto.Cipher import AES, _mode_cbc
from core.frontend import FE_Base, UserSession
from core.config import CoreConfig
from .database import SaoData
from .config import SaoConfig
from .const import SaoConstants
class SaoFrontend(FE_Base):
SN_PREFIX = SaoConstants.SERIAL_IDENT
NETID_PREFIX = SaoConstants.NETID_PREFIX
ALL_HEROS = []
def __init__(
self, cfg: CoreConfig, environment: jinja2.Environment, cfg_dir: str
) -> None:
super().__init__(cfg, environment)
self.data = SaoData(cfg)
self.game_cfg = SaoConfig()
if path.exists(f"{cfg_dir}/{SaoConstants.CONFIG_NAME}"):
self.game_cfg.update(
yaml.safe_load(open(f"{cfg_dir}/{SaoConstants.CONFIG_NAME}"))
)
self.nav_name = "SAO"
self.card_key= None
self.card_iv = None
if self.game_cfg.card.enable and self.game_cfg.card.crypt_password and self.game_cfg.card.crypt_salt:
hash = PBKDF2(
self.game_cfg.card.crypt_password,
bytes.fromhex(self.game_cfg.card.crypt_salt),
48,
count=1000,
hmac_hash_module=SHA1,
)
self.card_key = hash[:32]
self.card_iv = hash[32:48]
def get_routes(self) -> List[Route]:
return [
Route("/", self.render_GET, methods=['GET']),
Route("/update.name", self.change_name, methods=['POST']),
Route("/matching.auth", self.matching_auth, methods=['POST']),
Route("/matching.auth/", self.matching_auth, methods=['POST']),
Route("/qr.read", self.read_qr, methods=['POST']),
Route("/qr.register", self.reg_qr, methods=['POST']),
Route("/profile.register", self.reg_profile, methods=['POST'])
]
async def render_GET(self, request: Request) -> Response:
template = self.environment.get_template(
"titles/sao/templates/sao_index.jinja"
)
pf = None
usr_sesh = self.validate_session(request)
if not usr_sesh:
usr_sesh = UserSession()
else:
profile = await self.data.profile.get_profile(usr_sesh.user_id)
if profile is not None:
pf = profile._asdict()
if not self.ALL_HEROS:
self.ALL_HEROS = await self.data.static.get_heros()
err = 0
suc = 0
if "e" in request.query_params:
err = request.query_params.get("e", 0)
if "s" in request.query_params:
suc = request.query_params.get("s", 0)
return Response(template.render(
title=f"{self.core_config.server.name} | {self.nav_name}",
game_list=self.environment.globals["game_list"],
sesh=vars(usr_sesh),
profile=pf,
success=int(suc),
error=int(err),
all_heros=self.ALL_HEROS if self.ALL_HEROS else []
), media_type="text/html; charset=utf-8")
async def change_name(self, request: Request) -> RedirectResponse:
usr_sesh = self.validate_session(request)
if not usr_sesh:
return RedirectResponse("/game/sao/", 303)
frm = await request.form()
new_name = frm.get("new_name")
if len(new_name) > 16:
return RedirectResponse("/game/sao/?e=8", 303)
await self.data.profile.set_profile_name(usr_sesh.user_id, new_name)
self.logger.info(f"User {usr_sesh.user_id} changed name to {new_name}")
return RedirectResponse("/game/sao/?s=1", 303)
async def matching_auth(self, request: Request) -> JSONResponse:
self.logger.debug(f"Mathing auth params: {request.query_params}")
self.logger.debug(f"Mathing auth headers: {request.headers}")
uid = request.query_params.get('userId', '')
if not uid:
uid = f'Guest{str(random.randint(1,9999)).zfill(4)}'
self.logger.info(f"Matching auth request with no userId, using {uid}")
else:
self.logger.info(f"Matching auth request for userId {uid}")
return JSONResponse({ "ResultCode": 1, "UserId": uid }) # Just auth everything for now
async def read_qr(self, request: Request) -> PlainTextResponse:
if not self.card_key or not self.card_iv:
return PlainTextResponse("e13-1", 400)
usr_sesh = self.validate_session(request)
if not usr_sesh:
return PlainTextResponse("e9", 403)
frm = await request.form()
qr_data = frm.get("qr_data", "")
if not qr_data.isalnum() or not len(qr_data) == 0x40:
return PlainTextResponse("e14-1", 400)
try:
cipher: _mode_cbc.CbcMode = AES.new(self.card_key, AES.MODE_CBC, iv=self.card_iv)
except Exception as e:
self.logger.error(f"Error creating card cipher - {e}")
return PlainTextResponse("e13-2", 500)
sn = b""
try:
sn = cipher.decrypt(bytes.fromhex(qr_data))[:19]
sn = sn.decode()
except Exception as e:
self.logger.error(f"Error decrypting card data {qr_data} ({sn}) - {e}")
return PlainTextResponse("e14-2", 400)
if not sn.isnumeric():
self.logger.error(f"Card serial {sn} decrypted incorrectly")
return PlainTextResponse("e13-3", 400)
return PlainTextResponse(sn)
async def reg_qr(self, request: Request) -> RedirectResponse:
if not self.card_key or not self.card_iv:
return RedirectResponse("/game/sao/?e=14", 303)
usr_sesh = self.validate_session(request)
if not usr_sesh:
return RedirectResponse("/game/sao/?e=9", 303)
frm = await request.form()
serial = frm.get("qr_register_serial")
hero = frm.get("qr_register_hero")
is_holo = bool(frm.get("qr_register_holo", False))
user_hero = await self.data.item.get_hero_log(usr_sesh.user_id, hero)
if not user_hero:
hero_statc = await self.data.static.get_hero_by_id(hero)
if not hero_statc:
self.logger.error(f"Failed to find hero log {hero}! Please run the reader")
return RedirectResponse(" /game/sao/?e=13", 303)
skills = await self.data.static.get_skill_table_by_subid(hero_statc['SkillTableSubId'])
if not skills:
self.logger.error(f"Failed to find skill table {hero_statc['SkillTableSubId']}! Please run the reader")
return RedirectResponse("/game/sao/?e=13", 303)
default_skills = []
now_have_skills = [None, None, None, None, None]
x = 0
for skill in skills:
if skill['LevelObtained'] == 1 and skill['AwakeningId'] == 0:
default_skills[x] = skill['SkillId']
x += 1
if x >= 5:
break
for skill in default_skills:
skill_info = await self.data.static.get_skill_by_id(skill)
skill_slot = skill_info['Level'] - 1
if now_have_skills[skill_slot] is not None:
now_have_skills[skill]
user_hero_id = await self.data.item.put_hero_log(
usr_sesh.user_id,
hero,
1,
0,
hero_statc['DefaultEquipmentId1'],
hero_statc['DefaultEquipmentId2'],
now_have_skills[0],
now_have_skills[1],
now_have_skills[2],
now_have_skills[3],
now_have_skills[4]
)
if not user_hero_id:
self.logger.error(f"Failed to give user {usr_sesh.user_id} hero {hero}!")
return RedirectResponse("/game/sao/?e=99", 303)
else:
user_hero_id = user_hero['id']
card_id = await self.data.profile.put_hero_card(usr_sesh.user_id, serial, user_hero_id, is_holo)
if not card_id:
self.logger.error(f"Failed to give user {usr_sesh.user_id} hero card {hero}!")
return RedirectResponse("/game/sao/?e=99", 303)
self.logger.info(f"User {usr_sesh.user_id} added hero {hero} as card with id {card_id}")
return RedirectResponse("/game/sao/?s=1", 303)
async def reg_profile(self, request: Request) -> RedirectResponse:
usr_sesh = self.validate_session(request)
if not usr_sesh:
return RedirectResponse("/game/sao/?e=9", 303)
frm = await request.form()
name = frm.get("sao_register_username")
if len(name) > 16:
return RedirectResponse("/game/sao/?e=8", 303)
profile_id = await self.data.profile.create_profile(usr_sesh.user_id)
if not profile_id:
self.logger.error(f"Failed to web register User {usr_sesh.user_id} with name {name}")
return RedirectResponse("/game/sao/?e=99", 303)
await self.data.profile.set_profile_name(usr_sesh.user_id, name)
equip1 = await self.data.item.put_equipment(usr_sesh.user_id, 101000000)
equip2 = await self.data.item.put_equipment(usr_sesh.user_id, 102000000)
equip3 = await self.data.item.put_equipment(usr_sesh.user_id, 109000000)
if not equip1 or not equip2 or not equip3:
self.logger.error(f"Failed to create profile for user {usr_sesh.user_id} from (could not add equipment)")
return RedirectResponse("/game/sao/?e=98", 303)
hero1 = await self.data.item.put_hero_log(usr_sesh.user_id, 101000010, 1, 0, equip1, None, 1002, 1003, 1014, None, None)
hero2 = await self.data.item.put_hero_log(usr_sesh.user_id, 102000010, 1, 0, equip2, None, 3001, 3002, 3004, None, None)
hero3 = await self.data.item.put_hero_log(usr_sesh.user_id, 105000010, 1, 0, equip3, None, 10005, 10002, 10004, None, None)
if not hero1 or not hero2 or not hero3:
self.logger.error(f"Failed to create profile for user {usr_sesh.user_id} (could not add heros)")
return RedirectResponse("/game/sao/?e=97", 303)
await self.data.item.put_hero_party(usr_sesh.user_id, 0, hero1, hero2, hero3)
# await self.data.item.put_player_quest(usr_sesh.user_id, 1001, True, 300, 0, 0, 1)
# Force the tutorial stage to be completed due to potential crash in-game
self.logger.info(f"Web registered User {usr_sesh.user_id} profile {profile_id} with name {name}")
return RedirectResponse("/game/sao/?s=1", 303)