Allnet Lite Power On Support

This commit is contained in:
Keeboy99
2025-05-31 07:15:35 +12:00
parent 8e83527e82
commit 5e5365d22b
4 changed files with 103 additions and 10 deletions

View File

@ -7,6 +7,7 @@ import logging
import coloredlogs import coloredlogs
import urllib.parse import urllib.parse
import math import math
import random
from typing import Dict, List, Any, Optional, Union, Final from typing import Dict, List, Any, Optional, Union, Final
from logging.handlers import TimedRotatingFileHandler from logging.handlers import TimedRotatingFileHandler
from starlette.requests import Request from starlette.requests import Request
@ -17,7 +18,10 @@ from datetime import datetime
from enum import Enum from enum import Enum
from Crypto.PublicKey import RSA from Crypto.PublicKey import RSA
from Crypto.Hash import SHA from Crypto.Hash import SHA
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
from Crypto.Signature import PKCS1_v1_5 from Crypto.Signature import PKCS1_v1_5
import os
from os import path, environ, mkdir, access, W_OK from os import path, environ, mkdir, access, W_OK
from .config import CoreConfig from .config import CoreConfig
@ -132,12 +136,20 @@ class AllnetServlet:
async def handle_poweron(self, request: Request): async def handle_poweron(self, request: Request):
request_ip = Utils.get_ip_addr(request) request_ip = Utils.get_ip_addr(request)
pragma_header = request.headers.get('Pragma', "") pragma_header = request.headers.get('Pragma', "")
useragent_header = request.headers.get('User-Agent', "")
is_dfi = pragma_header == "DFI" is_dfi = pragma_header == "DFI"
is_lite = useragent_header[5:] == "Windows/Lite"
data = await request.body() data = await request.body()
if not self.config.allnet.allnet_lite_key and is_lite:
self.logger.error("!!!LITE KEY NOT SET!!!")
os._exit(1)
try: try:
if is_dfi: if is_dfi:
req_urlencode = self.from_dfi(data) req_urlencode = self.from_dfi(data)
elif is_lite:
req_urlencode = self.dec_lite(self.config.allnet.allnet_lite_key, data[:16], data)
else: else:
req_urlencode = data req_urlencode = data
@ -145,20 +157,30 @@ class AllnetServlet:
if req_dict is None: if req_dict is None:
raise AllnetRequestException() raise AllnetRequestException()
req = AllnetPowerOnRequest(req_dict[0]) if is_lite:
req = AllnetPowerOnRequestLite(req_dict[0])
else:
req = AllnetPowerOnRequest(req_dict[0])
# Validate the request. Currently we only validate the fields we plan on using # Validate the request. Currently we only validate the fields we plan on using
if not req.game_id or not req.ver or not req.serial or not req.ip or not req.firm_ver or not req.boot_ver: if not req.game_id or not req.ver or not req.serial or not req.token and is_lite:
raise AllnetRequestException( raise AllnetRequestException(
f"Bad auth request params from {request_ip} - {vars(req)}" f"Bad auth request params from {request_ip} - {vars(req)}"
) )
elif not is_lite:
if not req.game_id or not req.ver or not req.serial or not req.ip or not req.firm_ver or not req.boot_ver:
raise AllnetRequestException(
f"Bad auth request params from {request_ip} - {vars(req)}"
)
except AllnetRequestException as e: except AllnetRequestException as e:
if e.message != "": if e.message != "":
self.logger.error(e) self.logger.error(e)
return PlainTextResponse() return PlainTextResponse()
if req.format_ver == 3: if is_lite:
resp = AllnetPowerOnResponseLite(req.token)
elif req.format_ver == 3:
resp = AllnetPowerOnResponse3(req.token) resp = AllnetPowerOnResponse3(req.token)
elif req.format_ver == 2: elif req.format_ver == 2:
resp = AllnetPowerOnResponse2() resp = AllnetPowerOnResponse2()
@ -175,11 +197,14 @@ class AllnetServlet:
) )
self.logger.warning(msg) self.logger.warning(msg)
resp.stat = ALLNET_STAT.bad_machine.value if is_lite:
resp.result = ALLNET_STAT.bad_machine.value
else:
resp.stat = ALLNET_STAT.bad_machine.value
resp_dict = {k: v for k, v in vars(resp).items() if v is not None} resp_dict = {k: v for k, v in vars(resp).items() if v is not None}
return PlainTextResponse(urllib.parse.unquote(urllib.parse.urlencode(resp_dict)) + "\n") return PlainTextResponse(urllib.parse.unquote(urllib.parse.urlencode(resp_dict)) + "\n")
if machine is not None: if machine is not None and not is_lite:
arcade = await self.data.arcade.get_arcade(machine["arcade"]) arcade = await self.data.arcade.get_arcade(machine["arcade"])
if self.config.server.check_arcade_ip: if self.config.server.check_arcade_ip:
if arcade["ip"] and arcade["ip"] is not None and arcade["ip"] != req.ip: if arcade["ip"] and arcade["ip"] is not None and arcade["ip"] != req.ip:
@ -257,7 +282,10 @@ class AllnetServlet:
) )
self.logger.warning(msg) self.logger.warning(msg)
resp.stat = ALLNET_STAT.bad_game.value if is_lite:
resp.result = ALLNET_STAT.bad_game.value
else:
resp.stat = ALLNET_STAT.bad_game.value
resp_dict = {k: v for k, v in vars(resp).items() if v is not None} resp_dict = {k: v for k, v in vars(resp).items() if v is not None}
return PlainTextResponse(urllib.parse.unquote(urllib.parse.urlencode(resp_dict)) + "\n") return PlainTextResponse(urllib.parse.unquote(urllib.parse.urlencode(resp_dict)) + "\n")
@ -265,8 +293,12 @@ class AllnetServlet:
self.logger.info( self.logger.info(
f"Allowed unknown game {req.game_id} v{req.ver} to authenticate from {request_ip} due to 'is_develop' being enabled. S/N: {req.serial}" f"Allowed unknown game {req.game_id} v{req.ver} to authenticate from {request_ip} due to 'is_develop' being enabled. S/N: {req.serial}"
) )
resp.uri = f"http://{self.config.server.hostname}:{self.config.server.port}/{req.game_id}/{req.ver.replace('.', '')}/" if is_lite:
resp.host = f"{self.config.server.hostname}:{self.config.server.port}" resp.uri1 = f"http://{self.config.server.hostname}:{self.config.server.port}/{req.game_id}/{req.ver.replace('.', '')}/"
resp.uri2 = f"{self.config.server.hostname}:{self.config.server.port}"
else:
resp.uri = f"http://{self.config.server.hostname}:{self.config.server.port}/{req.game_id}/{req.ver.replace('.', '')}/"
resp.host = f"{self.config.server.hostname}:{self.config.server.port}"
resp_dict = {k: v for k, v in vars(resp).items() if v is not None} resp_dict = {k: v for k, v in vars(resp).items() if v is not None}
resp_str = urllib.parse.unquote(urllib.parse.urlencode(resp_dict)) resp_str = urllib.parse.unquote(urllib.parse.urlencode(resp_dict))
@ -277,10 +309,16 @@ class AllnetServlet:
int_ver = req.ver.replace(".", "") int_ver = req.ver.replace(".", "")
try: try:
resp.uri, resp.host = TitleServlet.title_registry[req.game_id].get_allnet_info(req.game_id, int(int_ver), req.serial) if is_lite:
resp.uri1, resp.uri2 = TitleServlet.title_registry[req.game_id].get_allnet_info(req.game_id, int(int_ver), req.serial)
else:
resp.uri, resp.host = TitleServlet.title_registry[req.game_id].get_allnet_info(req.game_id, int(int_ver), req.serial)
except Exception as e: except Exception as e:
self.logger.error(f"Error running get_allnet_info for {req.game_id} - {e}") self.logger.error(f"Error running get_allnet_info for {req.game_id} - {e}")
resp.stat = ALLNET_STAT.bad_game.value if is_lite:
resp.result = ALLNET_STAT.bad_game.value
else:
resp.stat = ALLNET_STAT.bad_game.value
resp_dict = {k: v for k, v in vars(resp).items() if v is not None} resp_dict = {k: v for k, v in vars(resp).items() if v is not None}
return PlainTextResponse(urllib.parse.unquote(urllib.parse.urlencode(resp_dict)) + "\n") return PlainTextResponse(urllib.parse.unquote(urllib.parse.urlencode(resp_dict)) + "\n")
@ -308,6 +346,9 @@ class AllnetServlet:
"Pragma": "DFI", "Pragma": "DFI",
}, },
) )
elif is_lite:
iv = bytes([random.randint(2, 255) for _ in range(16)])
return PlainTextResponse(content=self.enc_lite(self.config.allnet.allnet_lite_key, iv, resp_str))
return PlainTextResponse(resp_str) return PlainTextResponse(resp_str)
@ -517,6 +558,17 @@ class AllnetServlet:
zipped = zlib.compress(unzipped) zipped = zlib.compress(unzipped)
return base64.b64encode(zipped) return base64.b64encode(zipped)
def dec_lite(self, key, iv, data):
cipher = AES.new(bytes(key), AES.MODE_CBC, iv)
decrypted = cipher.decrypt(data)
return decrypted[16:].decode("utf-8")
def enc_lite(self, key, iv, data):
decrypted = pad(bytes([0] * 16) + data.encode('utf-8'), 16)
cipher = AES.new(bytes(key), AES.MODE_CBC, iv)
encrypted = cipher.encrypt(decrypted)
return encrypted
class BillingServlet: class BillingServlet:
def __init__(self, core_cfg: CoreConfig, cfg_folder: str) -> None: def __init__(self, core_cfg: CoreConfig, cfg_folder: str) -> None:
self.config = core_cfg self.config = core_cfg
@ -773,6 +825,15 @@ class AllnetPowerOnResponse:
self.minute = datetime.now().minute self.minute = datetime.now().minute
self.second = datetime.now().second self.second = datetime.now().second
class AllnetPowerOnRequestLite:
def __init__(self, req: Dict) -> None:
if req is None:
raise AllnetRequestException("Request processing failed")
self.game_id: str = req.get("title_id", None)
self.ver: str = req.get("title_ver", None)
self.serial: str = req.get("client_id", None)
self.token: str = req.get("token", None)
class AllnetPowerOnResponse3(AllnetPowerOnResponse): class AllnetPowerOnResponse3(AllnetPowerOnResponse):
def __init__(self, token) -> None: def __init__(self, token) -> None:
super().__init__() super().__init__()
@ -804,6 +865,30 @@ class AllnetPowerOnResponse2(AllnetPowerOnResponse):
self.timezone = "+09:00" self.timezone = "+09:00"
self.res_class = "PowerOnResponseV2" self.res_class = "PowerOnResponseV2"
class AllnetPowerOnResponseLite:
def __init__(self, token) -> None:
# Custom Allnet Lite response
self.result = 1
self.place_id = "0123"
self.uri1 = ""
self.uri2 = ""
self.name = "ARTEMiS"
self.nickname = "ARTEMiS"
self.setting = "1"
self.region0 = "1"
self.region_name0 = "W"
self.region_name1 = ""
self.region_name2 = ""
self.region_name3 = ""
self.country = "CHN"
self.location_type = "1"
self.utc_time = datetime.now(tz=pytz.timezone("UTC")).strftime(
"%Y-%m-%dT%H:%M:%SZ"
)
self.client_timezone = "+0800"
self.res_ver = "3"
self.token = token
class AllnetDownloadOrderRequest: class AllnetDownloadOrderRequest:
def __init__(self, req: Dict) -> None: def __init__(self, req: Dict) -> None:
self.game_id = req.get("game_id", "") self.game_id = req.get("game_id", "")
@ -1068,6 +1153,7 @@ app_billing = Starlette(
allnet = AllnetServlet(cfg, cfg_dir) allnet = AllnetServlet(cfg, cfg_dir)
route_lst = [ route_lst = [
Route("/sys/servlet/PowerOn", allnet.handle_poweron, methods=["GET", "POST"]), Route("/sys/servlet/PowerOn", allnet.handle_poweron, methods=["GET", "POST"]),
Route("/net/initialize", allnet.handle_poweron, methods=["GET", "POST"]),
Route("/sys/servlet/DownloadOrder", allnet.handle_dlorder, methods=["GET", "POST"]), Route("/sys/servlet/DownloadOrder", allnet.handle_dlorder, methods=["GET", "POST"]),
Route("/sys/servlet/LoaderStateRecorder", allnet.handle_loaderstaterecorder, methods=["GET", "POST"]), Route("/sys/servlet/LoaderStateRecorder", allnet.handle_loaderstaterecorder, methods=["GET", "POST"]),
Route("/sys/servlet/Alive", allnet.handle_alive, methods=["GET", "POST"]), Route("/sys/servlet/Alive", allnet.handle_alive, methods=["GET", "POST"]),

View File

@ -75,6 +75,7 @@ if not cfg.allnet.standalone:
allnet = AllnetServlet(cfg, cfg_dir) allnet = AllnetServlet(cfg, cfg_dir)
route_lst += [ route_lst += [
Route("/sys/servlet/PowerOn", allnet.handle_poweron, methods=["GET", "POST"]), Route("/sys/servlet/PowerOn", allnet.handle_poweron, methods=["GET", "POST"]),
Route("/net/initialize", allnet.handle_poweron, methods=["GET", "POST"]),
Route("/sys/servlet/DownloadOrder", allnet.handle_dlorder, methods=["GET", "POST"]), Route("/sys/servlet/DownloadOrder", allnet.handle_dlorder, methods=["GET", "POST"]),
Route("/sys/servlet/LoaderStateRecorder", allnet.handle_loaderstaterecorder, methods=["GET", "POST"]), Route("/sys/servlet/LoaderStateRecorder", allnet.handle_loaderstaterecorder, methods=["GET", "POST"]),
Route("/sys/servlet/Alive", allnet.handle_alive, methods=["GET", "POST"]), Route("/sys/servlet/Alive", allnet.handle_alive, methods=["GET", "POST"]),

View File

@ -378,6 +378,11 @@ class AllnetConfig:
return CoreConfig.get_config_field( return CoreConfig.get_config_field(
self.__config, "core", "allnet", "save_billing", default=False self.__config, "core", "allnet", "save_billing", default=False
) )
@property
def allnet_lite_key(self) -> bool:
return CoreConfig.get_config_field(
self.__config, "core", "allnet", "allnet_lite_key", default=[]
)
class BillingConfig: class BillingConfig:
def __init__(self, parent_config: "CoreConfig") -> None: def __init__(self, parent_config: "CoreConfig") -> None:

View File

@ -46,6 +46,7 @@ allnet:
allow_online_updates: False allow_online_updates: False
update_cfg_folder: "" update_cfg_folder: ""
save_billing: True save_billing: True
allnet_lite_key: []
billing: billing:
standalone: True standalone: True