forked from Hay1tsme/artemis
Merge pull request 'develop' (#16) from develop into prism_plus_support
Reviewed-on: SoulGateKey/artemis#16
This commit is contained in:
@ -805,7 +805,8 @@ class BillingServlet:
|
||||
)
|
||||
|
||||
if req.traceleft > 0:
|
||||
self.logger.warning(f"{req.traceleft} unsent tracelogs")
|
||||
self.logger.info(f"Requesting 20 more of {req.traceleft} unsent tracelogs")
|
||||
return PlainTextResponse("result=6&waittime=0&linelimit=20\r\n")
|
||||
|
||||
playlimit = req.playlimit
|
||||
while req.playcnt > playlimit:
|
||||
@ -825,9 +826,6 @@ class BillingServlet:
|
||||
resp_str = urllib.parse.unquote(urllib.parse.urlencode(vars(resp))) + "\r\n"
|
||||
|
||||
self.logger.debug(f"response {vars(resp)}")
|
||||
if req.traceleft > 0: # TODO: should probably move this up so we don't do a ton of work that doesn't get used
|
||||
self.logger.info(f"Requesting 20 more of {req.traceleft} unsent tracelogs")
|
||||
return PlainTextResponse("result=6&waittime=0&linelimit=20\r\n")
|
||||
|
||||
return PlainTextResponse(resp_str)
|
||||
|
||||
|
@ -124,3 +124,15 @@ class UserData(BaseData):
|
||||
async def get_user_by_username(self, username: str) -> Optional[Row]:
|
||||
result = await self.execute(aime_user.select(aime_user.c.username == username))
|
||||
if result: return result.fetchone()
|
||||
|
||||
async def change_permission(self, user_id: int, new_perms: int) -> Optional[bool]:
|
||||
sql = aime_user.update(aime_user.c.id == user_id).values(permissions = new_perms)
|
||||
|
||||
result = await self.execute(sql)
|
||||
return result is not None
|
||||
|
||||
async def change_email(self, user_id: int, new_email: int) -> Optional[bool]:
|
||||
sql = aime_user.update(aime_user.c.id == user_id).values(email = new_email)
|
||||
|
||||
result = await self.execute(sql)
|
||||
return result is not None
|
||||
|
@ -149,41 +149,3 @@ class TitleServlet:
|
||||
self.logger.info(
|
||||
f"Serving {len(self.title_registry)} game codes {'on port ' + str(core_cfg.server.port) if core_cfg.server.port > 0 else ''}"
|
||||
)
|
||||
|
||||
def render_GET(self, request: Request, endpoints: dict) -> bytes:
|
||||
code = endpoints["title"]
|
||||
subaction = endpoints['subaction']
|
||||
|
||||
if code not in self.title_registry:
|
||||
self.logger.warning(f"Unknown game code {code}")
|
||||
request.setResponseCode(404)
|
||||
return b""
|
||||
|
||||
index = self.title_registry[code]
|
||||
handler = getattr(index, f"{subaction}", None)
|
||||
if handler is None:
|
||||
self.logger.error(f"{code} does not have handler for GET subaction {subaction}")
|
||||
request.setResponseCode(500)
|
||||
return b""
|
||||
|
||||
return handler(request, code, endpoints)
|
||||
|
||||
def render_POST(self, request: Request, endpoints: dict) -> bytes:
|
||||
code = endpoints["title"]
|
||||
subaction = endpoints['subaction']
|
||||
|
||||
if code not in self.title_registry:
|
||||
self.logger.warning(f"Unknown game code {code}")
|
||||
request.setResponseCode(404)
|
||||
return b""
|
||||
|
||||
index = self.title_registry[code]
|
||||
handler = getattr(index, f"{subaction}", None)
|
||||
if handler is None:
|
||||
self.logger.error(f"{code} does not have handler for POST subaction {subaction}")
|
||||
request.setResponseCode(500)
|
||||
return b""
|
||||
|
||||
endpoints.pop("title")
|
||||
endpoints.pop("subaction")
|
||||
return handler(request, code, endpoints)
|
||||
|
@ -100,7 +100,7 @@ class DivaServlet(BaseServlet):
|
||||
|
||||
try:
|
||||
handler = getattr(self.base, f"handle_{bin_req_data['cmd']}_request")
|
||||
resp = handler(bin_req_data)
|
||||
resp = await handler(bin_req_data)
|
||||
|
||||
except AttributeError as e:
|
||||
self.logger.warning(f"Unhandled {bin_req_data['cmd']} request {e}")
|
||||
|
@ -162,8 +162,8 @@ class IDACServlet(BaseServlet):
|
||||
resp = {
|
||||
"status_code": "0",
|
||||
# Only IPv4 is supported
|
||||
"host": self.game_config.server.matching_host,
|
||||
"port": self.game_config.server.matching_p2p,
|
||||
"host": self.game_cfg.server.matching_host,
|
||||
"port": self.game_cfg.server.matching_p2p,
|
||||
"room_name": "INDTA",
|
||||
"state": 1,
|
||||
}
|
||||
|
@ -152,7 +152,7 @@ class SaoServlet(BaseServlet):
|
||||
|
||||
else:
|
||||
self.logger.error(f"Unknown response type {type(resp)}")
|
||||
return SaoNoopResponse(req_header.cmd + 1).make()
|
||||
return Response(SaoNoopResponse(req_header.cmd + 1).make())
|
||||
|
||||
self.logger.debug(f"Response: {resp.hex()}")
|
||||
|
||||
|
295
tui.py
295
tui.py
@ -77,10 +77,10 @@ class State:
|
||||
return self.id if self.id else 0
|
||||
|
||||
def __init__(self):
|
||||
self.selected_user: self.SelectedUser = self.SelectedUser()
|
||||
self.selected_card: self.SelectedCard = self.SelectedCard()
|
||||
self.selected_arcade: self.SelectedArcade = self.SelectedArcade()
|
||||
self.selected_machine: self.SelectedMachine = self.SelectedMachine()
|
||||
self.selected_user = self.SelectedUser()
|
||||
self.selected_card = self.SelectedCard()
|
||||
self.selected_arcade = self.SelectedArcade()
|
||||
self.selected_machine = self.SelectedMachine()
|
||||
self.last_err: str = ""
|
||||
self.search_results: List[Row] = []
|
||||
self.search_type: str = ""
|
||||
@ -89,6 +89,7 @@ class State:
|
||||
self.selected_user = self.SelectedUser(id, username)
|
||||
|
||||
def clear_user(self) -> None:
|
||||
print(self.selected_user)
|
||||
self.selected_user = self.SelectedUser()
|
||||
|
||||
def set_card(self, id: int, access_code: Optional[str]) -> None:
|
||||
@ -163,9 +164,9 @@ class MainView(Frame):
|
||||
def _quit():
|
||||
raise StopApplication("User pressed quit")
|
||||
|
||||
class ManageUser(Frame):
|
||||
class ManageUserView(Frame):
|
||||
def __init__(self, screen: Screen):
|
||||
super(ManageUser, self).__init__(
|
||||
super(ManageUserView, self).__init__(
|
||||
screen,
|
||||
screen.height * 2 // 3,
|
||||
screen.width * 2 // 3,
|
||||
@ -250,9 +251,9 @@ class ManageUser(Frame):
|
||||
self.save()
|
||||
raise NextScene("Main")
|
||||
|
||||
class ManageCard(Frame):
|
||||
class ManageCardView(Frame):
|
||||
def __init__(self, screen: Screen):
|
||||
super(ManageCard, self).__init__(
|
||||
super(ManageCardView, self).__init__(
|
||||
screen,
|
||||
screen.height * 2 // 3,
|
||||
screen.width * 2 // 3,
|
||||
@ -416,6 +417,19 @@ class SearchResultsView(Frame):
|
||||
|
||||
opts.append((f"{usr['id']:05d} | {name} | {usr['permissions']:08b} | {usr['email']}", state.SelectedUser(usr["id"], str(usr['username']))))
|
||||
|
||||
elif state.search_type == "arcade":
|
||||
layout.add_widget(Label(" ID | Name | Country | # Machines "))
|
||||
layout.add_widget(Divider())
|
||||
|
||||
for ac in state.search_results:
|
||||
name = str(ac['name'])
|
||||
if len(name) < 8:
|
||||
name = str(ac['name']) + ' ' * (8 - len(name))
|
||||
elif len(name) > 8:
|
||||
name = ac['name'][:5] + "..."
|
||||
|
||||
opts.append((f"{ac['id']:04X} | {name} | {ac['country']} | {usr['mech_ct']}", state.SelectedArcade(ac["id"], ac['country'], str(ac['name']))))
|
||||
|
||||
layout.add_widget(RadioButtons(opts, "", "selopt"))
|
||||
|
||||
self.fix()
|
||||
@ -423,13 +437,26 @@ class SearchResultsView(Frame):
|
||||
def _select_current(self):
|
||||
self.save()
|
||||
a = self.data.get('selopt')
|
||||
if state.search_type == "user":
|
||||
state.set_user(a.id, a.name)
|
||||
raise NextScene("User Management")
|
||||
|
||||
elif state.search_type == "arcade":
|
||||
state.set_arcade(a.id, a.country, a.name)
|
||||
raise NextScene("Arcade Management")
|
||||
|
||||
def _cancel(self):
|
||||
state.clear_last_err()
|
||||
if state.search_type == "user":
|
||||
raise NextScene("User Management")
|
||||
|
||||
elif state.search_type == "arcade":
|
||||
raise NextScene("Arcade Management")
|
||||
|
||||
def _back(self):
|
||||
self.save()
|
||||
raise NextScene("Main")
|
||||
|
||||
class LookupUserView(Frame):
|
||||
def __init__(self, screen):
|
||||
super(LookupUserView, self).__init__(
|
||||
@ -528,14 +555,262 @@ class LookupUserView(Frame):
|
||||
self.find_widget('status').value = state.last_err
|
||||
raise NextScene("User Management")
|
||||
|
||||
class EditUserView(Frame):
|
||||
def __init__(self, screen):
|
||||
super(EditUserView, self).__init__(
|
||||
screen,
|
||||
screen.height * 2 // 3,
|
||||
screen.width * 2 // 3,
|
||||
hover_focus=True,
|
||||
can_scroll=False,
|
||||
title="Edit User",
|
||||
on_load=self._redraw
|
||||
)
|
||||
|
||||
layout = Layout([100], fill_frame=True)
|
||||
self.add_layout(layout)
|
||||
layout.add_widget(Text("Username:", "username"))
|
||||
layout.add_widget(Text("Email:", "email"))
|
||||
layout.add_widget(Text("Password:", "passwd"))
|
||||
layout.add_widget(RadioButtons([
|
||||
("User", "1"),
|
||||
("User Manager", "2"),
|
||||
("Arcde Manager", "4"),
|
||||
("Sysadmin", "8"),
|
||||
("Owner", "255"),
|
||||
], "Role:", "role"))
|
||||
|
||||
layout3 = Layout([100])
|
||||
self.add_layout(layout3)
|
||||
layout3.add_widget(Text("", f"status", readonly=True, disabled=True))
|
||||
|
||||
layout2 = Layout([1, 1, 1, 1])
|
||||
self.add_layout(layout2)
|
||||
layout2.add_widget(Button("Save", self._ok), 0)
|
||||
layout2.add_widget(Button("Cancel", self._cancel), 3)
|
||||
|
||||
self.fix()
|
||||
|
||||
def _redraw(self):
|
||||
uinfo = loop.run_until_complete(data.user.get_user(state.selected_user.id))
|
||||
self.find_widget('username').value = uinfo['username']
|
||||
self.find_widget('email').value = uinfo['email']
|
||||
self.find_widget('role').value = str(uinfo['permissions'])
|
||||
|
||||
def _ok(self):
|
||||
self.save()
|
||||
if not self.data.get("username"):
|
||||
state.set_last_err("Username cannot be blank")
|
||||
self.find_widget('status').value = state.last_err
|
||||
self.screen.reset()
|
||||
return
|
||||
|
||||
state.clear_last_err()
|
||||
self.find_widget('status').value = state.last_err
|
||||
|
||||
pw = self.data.get("passwd")
|
||||
hash = bcrypt.hashpw(pw.encode(), bcrypt.gensalt())
|
||||
|
||||
is_good = loop.run_until_complete(self._update_user_async(self.data.get("username"), hash.decode(), self.data.get("email"), self.data.get('role')))
|
||||
|
||||
self.find_widget('status').value = "User Updated" if is_good else "User update failed"
|
||||
|
||||
raise NextScene("User Management")
|
||||
|
||||
async def _update_user_async(self, username: Optional[str], password: Optional[str], email: Optional[str], role: Optional[str]) -> bool:
|
||||
if username: namechange_ok = await data.user.change_username(state.selected_user.id, username)
|
||||
else: namechange_ok = True
|
||||
|
||||
if password: pw_ok = await data.user.change_password(state.selected_user.id, password)
|
||||
else: pw_ok = True
|
||||
|
||||
if role: role_ok = await data.user.change_permission(state.selected_user.id, role)
|
||||
else: role_ok = True
|
||||
|
||||
if email: email_ok = await data.user.change_email(state.selected_user.id, email)
|
||||
else: email_ok = True
|
||||
|
||||
state.set_user(state.selected_user.id, username if username and namechange_ok else state.selected_user.name)
|
||||
return namechange_ok and pw_ok and role_ok and email_ok
|
||||
|
||||
def _cancel(self):
|
||||
state.clear_last_err()
|
||||
self.find_widget('status').value = state.last_err
|
||||
raise NextScene("User Management")
|
||||
|
||||
class ManageArcadeView(Frame):
|
||||
def __init__(self, screen: Screen):
|
||||
super(ManageArcadeView, self).__init__(
|
||||
screen,
|
||||
screen.height * 2 // 3,
|
||||
screen.width * 2 // 3,
|
||||
hover_focus=True,
|
||||
can_scroll=False,
|
||||
title="Arcade Management",
|
||||
on_load=self._redraw
|
||||
)
|
||||
|
||||
layout = Layout([3])
|
||||
self.add_layout(layout)
|
||||
layout.add_widget(Button("Create Arcade", self._create_arcade))
|
||||
layout.add_widget(Button("Lookup Arcade", self._lookup))
|
||||
|
||||
def _redraw(self):
|
||||
self._layouts = [self._layouts[0]]
|
||||
|
||||
layout = Layout([3])
|
||||
self.add_layout(layout)
|
||||
layout.add_widget(Button("Edit Arcade", self._edit_arcade, disabled=state.selected_arcade.id == 0 or state.selected_arcade.id is None))
|
||||
layout.add_widget(Button("Delete Arcade", self._del_arcade, disabled=state.selected_arcade.id == 0 or state.selected_arcade.id is None))
|
||||
layout.add_widget((Divider()))
|
||||
|
||||
layout2 = Layout([1, 1, 1])
|
||||
self.add_layout(layout2)
|
||||
a = Text("", f"status", readonly=True, disabled=True)
|
||||
a.value = f"Selected Arcade: {state.selected_arcade}"
|
||||
layout2.add_widget(a)
|
||||
layout2.add_widget(Button("Back", self._back), 2)
|
||||
|
||||
self.fix()
|
||||
|
||||
def _create_arcade(self):
|
||||
self.save()
|
||||
raise NextScene("Create Arcade")
|
||||
|
||||
def _lookup(self):
|
||||
self.save()
|
||||
raise NextScene("Lookup Arcade")
|
||||
|
||||
def _edit_arcade(self):
|
||||
self.save()
|
||||
raise NextScene("Edit Arcade")
|
||||
|
||||
def _del_arcade(self):
|
||||
self.save()
|
||||
raise NextScene("Delete Arcade")
|
||||
|
||||
def _back(self):
|
||||
self.save()
|
||||
raise NextScene("Main")
|
||||
|
||||
class LookupArcadeView(Frame):
|
||||
def __init__(self, screen):
|
||||
super(LookupArcadeView, self).__init__(
|
||||
screen,
|
||||
screen.height * 2 // 3,
|
||||
screen.width * 2 // 3,
|
||||
hover_focus=True,
|
||||
can_scroll=False,
|
||||
title="Lookup Arcade"
|
||||
)
|
||||
|
||||
layout = Layout([1, 1], fill_frame=True)
|
||||
self.add_layout(layout)
|
||||
layout.add_widget(RadioButtons([
|
||||
("Name", "1"),
|
||||
("Serial", "2"),
|
||||
("Place ID", "3"),
|
||||
("Arcade ID", "4"),
|
||||
], "Search By:", "search_type"))
|
||||
layout.add_widget(Text("Search:", "search_str"), 1)
|
||||
|
||||
layout3 = Layout([100])
|
||||
self.add_layout(layout3)
|
||||
layout3.add_widget(Text("", f"status", readonly=True, disabled=True))
|
||||
|
||||
layout2 = Layout([1, 1, 1, 1])
|
||||
self.add_layout(layout2)
|
||||
layout2.add_widget(Button("Search", self._lookup), 0)
|
||||
layout2.add_widget(Button("Cancel", self._cancel), 3)
|
||||
|
||||
self.fix()
|
||||
|
||||
def _lookup(self):
|
||||
self.save()
|
||||
if not self.data.get("search_str"):
|
||||
state.set_last_err("Search cannot be blank")
|
||||
self.find_widget('status').value = state.last_err
|
||||
self.screen.reset()
|
||||
return
|
||||
|
||||
state.clear_last_err()
|
||||
self.find_widget('status').value = state.last_err
|
||||
|
||||
search_type = self.data.get("search_type")
|
||||
if search_type == "1":
|
||||
loop.run_until_complete(self._lookup_arcade_by_name(self.data.get("search_str")))
|
||||
elif search_type == "2":
|
||||
loop.run_until_complete(self._lookup_arcade_by_serial(self.data.get("search_str")))
|
||||
elif search_type == "3":
|
||||
real_id = int(self.data.get("search_str"), 16)
|
||||
loop.run_until_complete(self._lookup_arcade_by_id(real_id))
|
||||
elif search_type == "4":
|
||||
loop.run_until_complete(self._lookup_arcade_by_id(self.data.get("search_str")))
|
||||
else:
|
||||
state.set_last_err("Unknown search type")
|
||||
self.find_widget('status').value = state.last_err
|
||||
self.screen.reset()
|
||||
return
|
||||
|
||||
if len(state.search_results) < 1:
|
||||
state.set_last_err("Search returned no results")
|
||||
self.find_widget('status').value = state.last_err
|
||||
self.screen.reset()
|
||||
return
|
||||
|
||||
state.search_type = "user"
|
||||
raise NextScene("Search Results")
|
||||
|
||||
async def _lookup_arcade_by_id(self, ac_id: str):
|
||||
ac = await data.arcade.get_arcade(ac_id)
|
||||
|
||||
if ac is not None:
|
||||
res = ac._asdict()
|
||||
num_cabs = await data.arcade.get_arcade_machines(ac_id)
|
||||
res['mech_ct'] = len(num_cabs) if num_cabs else 0
|
||||
state.search_results = [res]
|
||||
|
||||
async def _lookup_arcade_by_name(self, name: str):
|
||||
ac = await data.arcade.get_arcade_by_name(name)
|
||||
|
||||
if ac is not None:
|
||||
res = []
|
||||
for ac_res in ac:
|
||||
t = ac_res._asdict()
|
||||
num_cabs = await data.arcade.get_arcade_machines(t['id'])
|
||||
t['mech_ct'] = len(num_cabs) if num_cabs else 0
|
||||
res.append(t)
|
||||
|
||||
state.search_results = res
|
||||
|
||||
async def _lookup_arcade_by_serial(self, serial: str):
|
||||
mech = await data.arcade.get_machine(serial)
|
||||
|
||||
if mech is not None:
|
||||
ac = await data.arcade.get_arcade(mech['arcade'])
|
||||
|
||||
if ac is not None:
|
||||
res = ac._asdict()
|
||||
num_cabs = await data.arcade.get_arcade_machines(mech['arcade'])
|
||||
res['mech_ct'] = len(num_cabs) if num_cabs else 0
|
||||
state.search_results = [res]
|
||||
|
||||
def _cancel(self):
|
||||
state.clear_last_err()
|
||||
self.find_widget('status').value = state.last_err
|
||||
raise NextScene("Arcade Management")
|
||||
|
||||
def demo(screen:Screen, scene: Scene):
|
||||
scenes = [
|
||||
Scene([MainView(screen)], -1, name="Main"),
|
||||
Scene([ManageUser(screen)], -1, name="User Management"),
|
||||
Scene([ManageUserView(screen)], -1, name="User Management"),
|
||||
Scene([CreateUserView(screen)], -1, name="Create User"),
|
||||
Scene([LookupUserView(screen)], -1, name="Lookup User"),
|
||||
Scene([SearchResultsView(screen)], -1, name="Search Results"),
|
||||
Scene([ManageCard(screen)], -1, name="Card Management"),
|
||||
Scene([ManageCardView(screen)], -1, name="Card Management"),
|
||||
Scene([EditUserView(screen)], -1, name="Edit User"),
|
||||
Scene([ManageArcadeView(screen)], -1, name="Arcade Management"),
|
||||
Scene([LookupArcadeView(screen)], -1, name="Lookup Arcade"),
|
||||
]
|
||||
|
||||
screen.play(scenes, stop_on_resize=False, start_scene=scene, allow_int=True)
|
||||
|
Reference in New Issue
Block a user