From 5e5365d22b817e08d496201540266b5044754b48 Mon Sep 17 00:00:00 2001 From: Keeboy99 <67620144+Keeboy99@users.noreply.github.com> Date: Sat, 31 May 2025 07:15:35 +1200 Subject: [PATCH] Allnet Lite Power On Support --- core/allnet.py | 106 +++++++++++++++++++++++++++++++++++---- core/app.py | 1 + core/config.py | 5 ++ example_config/core.yaml | 1 + 4 files changed, 103 insertions(+), 10 deletions(-) diff --git a/core/allnet.py b/core/allnet.py index 0912f56..2115b9a 100644 --- a/core/allnet.py +++ b/core/allnet.py @@ -7,6 +7,7 @@ import logging import coloredlogs import urllib.parse import math +import random from typing import Dict, List, Any, Optional, Union, Final from logging.handlers import TimedRotatingFileHandler from starlette.requests import Request @@ -17,7 +18,10 @@ from datetime import datetime from enum import Enum from Crypto.PublicKey import RSA from Crypto.Hash import SHA +from Crypto.Cipher import AES +from Crypto.Util.Padding import pad from Crypto.Signature import PKCS1_v1_5 +import os from os import path, environ, mkdir, access, W_OK from .config import CoreConfig @@ -132,12 +136,20 @@ class AllnetServlet: async def handle_poweron(self, request: Request): request_ip = Utils.get_ip_addr(request) pragma_header = request.headers.get('Pragma', "") + useragent_header = request.headers.get('User-Agent', "") is_dfi = pragma_header == "DFI" + is_lite = useragent_header[5:] == "Windows/Lite" data = await request.body() + + if not self.config.allnet.allnet_lite_key and is_lite: + self.logger.error("!!!LITE KEY NOT SET!!!") + os._exit(1) try: if is_dfi: req_urlencode = self.from_dfi(data) + elif is_lite: + req_urlencode = self.dec_lite(self.config.allnet.allnet_lite_key, data[:16], data) else: req_urlencode = data @@ -145,20 +157,30 @@ class AllnetServlet: if req_dict is None: raise AllnetRequestException() - req = AllnetPowerOnRequest(req_dict[0]) + if is_lite: + req = AllnetPowerOnRequestLite(req_dict[0]) + else: + req = AllnetPowerOnRequest(req_dict[0]) # Validate the request. Currently we only validate the fields we plan on using - if not req.game_id or not req.ver or not req.serial or not req.ip or not req.firm_ver or not req.boot_ver: + if not req.game_id or not req.ver or not req.serial or not req.token and is_lite: raise AllnetRequestException( f"Bad auth request params from {request_ip} - {vars(req)}" ) + elif not is_lite: + if not req.game_id or not req.ver or not req.serial or not req.ip or not req.firm_ver or not req.boot_ver: + raise AllnetRequestException( + f"Bad auth request params from {request_ip} - {vars(req)}" + ) except AllnetRequestException as e: if e.message != "": self.logger.error(e) return PlainTextResponse() - if req.format_ver == 3: + if is_lite: + resp = AllnetPowerOnResponseLite(req.token) + elif req.format_ver == 3: resp = AllnetPowerOnResponse3(req.token) elif req.format_ver == 2: resp = AllnetPowerOnResponse2() @@ -175,11 +197,14 @@ class AllnetServlet: ) self.logger.warning(msg) - resp.stat = ALLNET_STAT.bad_machine.value + if is_lite: + resp.result = ALLNET_STAT.bad_machine.value + else: + resp.stat = ALLNET_STAT.bad_machine.value resp_dict = {k: v for k, v in vars(resp).items() if v is not None} return PlainTextResponse(urllib.parse.unquote(urllib.parse.urlencode(resp_dict)) + "\n") - if machine is not None: + if machine is not None and not is_lite: arcade = await self.data.arcade.get_arcade(machine["arcade"]) if self.config.server.check_arcade_ip: if arcade["ip"] and arcade["ip"] is not None and arcade["ip"] != req.ip: @@ -257,7 +282,10 @@ class AllnetServlet: ) self.logger.warning(msg) - resp.stat = ALLNET_STAT.bad_game.value + if is_lite: + resp.result = ALLNET_STAT.bad_game.value + else: + resp.stat = ALLNET_STAT.bad_game.value resp_dict = {k: v for k, v in vars(resp).items() if v is not None} return PlainTextResponse(urllib.parse.unquote(urllib.parse.urlencode(resp_dict)) + "\n") @@ -265,8 +293,12 @@ class AllnetServlet: self.logger.info( f"Allowed unknown game {req.game_id} v{req.ver} to authenticate from {request_ip} due to 'is_develop' being enabled. S/N: {req.serial}" ) - resp.uri = f"http://{self.config.server.hostname}:{self.config.server.port}/{req.game_id}/{req.ver.replace('.', '')}/" - resp.host = f"{self.config.server.hostname}:{self.config.server.port}" + if is_lite: + resp.uri1 = f"http://{self.config.server.hostname}:{self.config.server.port}/{req.game_id}/{req.ver.replace('.', '')}/" + resp.uri2 = f"{self.config.server.hostname}:{self.config.server.port}" + else: + resp.uri = f"http://{self.config.server.hostname}:{self.config.server.port}/{req.game_id}/{req.ver.replace('.', '')}/" + resp.host = f"{self.config.server.hostname}:{self.config.server.port}" resp_dict = {k: v for k, v in vars(resp).items() if v is not None} resp_str = urllib.parse.unquote(urllib.parse.urlencode(resp_dict)) @@ -277,10 +309,16 @@ class AllnetServlet: int_ver = req.ver.replace(".", "") try: - resp.uri, resp.host = TitleServlet.title_registry[req.game_id].get_allnet_info(req.game_id, int(int_ver), req.serial) + if is_lite: + resp.uri1, resp.uri2 = TitleServlet.title_registry[req.game_id].get_allnet_info(req.game_id, int(int_ver), req.serial) + else: + resp.uri, resp.host = TitleServlet.title_registry[req.game_id].get_allnet_info(req.game_id, int(int_ver), req.serial) except Exception as e: self.logger.error(f"Error running get_allnet_info for {req.game_id} - {e}") - resp.stat = ALLNET_STAT.bad_game.value + if is_lite: + resp.result = ALLNET_STAT.bad_game.value + else: + resp.stat = ALLNET_STAT.bad_game.value resp_dict = {k: v for k, v in vars(resp).items() if v is not None} return PlainTextResponse(urllib.parse.unquote(urllib.parse.urlencode(resp_dict)) + "\n") @@ -308,6 +346,9 @@ class AllnetServlet: "Pragma": "DFI", }, ) + elif is_lite: + iv = bytes([random.randint(2, 255) for _ in range(16)]) + return PlainTextResponse(content=self.enc_lite(self.config.allnet.allnet_lite_key, iv, resp_str)) return PlainTextResponse(resp_str) @@ -517,6 +558,17 @@ class AllnetServlet: zipped = zlib.compress(unzipped) return base64.b64encode(zipped) + def dec_lite(self, key, iv, data): + cipher = AES.new(bytes(key), AES.MODE_CBC, iv) + decrypted = cipher.decrypt(data) + return decrypted[16:].decode("utf-8") + + def enc_lite(self, key, iv, data): + decrypted = pad(bytes([0] * 16) + data.encode('utf-8'), 16) + cipher = AES.new(bytes(key), AES.MODE_CBC, iv) + encrypted = cipher.encrypt(decrypted) + return encrypted + class BillingServlet: def __init__(self, core_cfg: CoreConfig, cfg_folder: str) -> None: self.config = core_cfg @@ -773,6 +825,15 @@ class AllnetPowerOnResponse: self.minute = datetime.now().minute self.second = datetime.now().second +class AllnetPowerOnRequestLite: + def __init__(self, req: Dict) -> None: + if req is None: + raise AllnetRequestException("Request processing failed") + self.game_id: str = req.get("title_id", None) + self.ver: str = req.get("title_ver", None) + self.serial: str = req.get("client_id", None) + self.token: str = req.get("token", None) + class AllnetPowerOnResponse3(AllnetPowerOnResponse): def __init__(self, token) -> None: super().__init__() @@ -804,6 +865,30 @@ class AllnetPowerOnResponse2(AllnetPowerOnResponse): self.timezone = "+09:00" self.res_class = "PowerOnResponseV2" +class AllnetPowerOnResponseLite: + def __init__(self, token) -> None: + # Custom Allnet Lite response + self.result = 1 + self.place_id = "0123" + self.uri1 = "" + self.uri2 = "" + self.name = "ARTEMiS" + self.nickname = "ARTEMiS" + self.setting = "1" + self.region0 = "1" + self.region_name0 = "W" + self.region_name1 = "" + self.region_name2 = "" + self.region_name3 = "" + self.country = "CHN" + self.location_type = "1" + self.utc_time = datetime.now(tz=pytz.timezone("UTC")).strftime( + "%Y-%m-%dT%H:%M:%SZ" + ) + self.client_timezone = "+0800" + self.res_ver = "3" + self.token = token + class AllnetDownloadOrderRequest: def __init__(self, req: Dict) -> None: self.game_id = req.get("game_id", "") @@ -1068,6 +1153,7 @@ app_billing = Starlette( allnet = AllnetServlet(cfg, cfg_dir) route_lst = [ Route("/sys/servlet/PowerOn", allnet.handle_poweron, methods=["GET", "POST"]), + Route("/net/initialize", allnet.handle_poweron, methods=["GET", "POST"]), Route("/sys/servlet/DownloadOrder", allnet.handle_dlorder, methods=["GET", "POST"]), Route("/sys/servlet/LoaderStateRecorder", allnet.handle_loaderstaterecorder, methods=["GET", "POST"]), Route("/sys/servlet/Alive", allnet.handle_alive, methods=["GET", "POST"]), diff --git a/core/app.py b/core/app.py index fa1c8f2..b450b0c 100644 --- a/core/app.py +++ b/core/app.py @@ -75,6 +75,7 @@ if not cfg.allnet.standalone: allnet = AllnetServlet(cfg, cfg_dir) route_lst += [ Route("/sys/servlet/PowerOn", allnet.handle_poweron, methods=["GET", "POST"]), + Route("/net/initialize", allnet.handle_poweron, methods=["GET", "POST"]), Route("/sys/servlet/DownloadOrder", allnet.handle_dlorder, methods=["GET", "POST"]), Route("/sys/servlet/LoaderStateRecorder", allnet.handle_loaderstaterecorder, methods=["GET", "POST"]), Route("/sys/servlet/Alive", allnet.handle_alive, methods=["GET", "POST"]), diff --git a/core/config.py b/core/config.py index eb02c4e..7b6833d 100644 --- a/core/config.py +++ b/core/config.py @@ -378,6 +378,11 @@ class AllnetConfig: return CoreConfig.get_config_field( self.__config, "core", "allnet", "save_billing", default=False ) + @property + def allnet_lite_key(self) -> bool: + return CoreConfig.get_config_field( + self.__config, "core", "allnet", "allnet_lite_key", default=[] + ) class BillingConfig: def __init__(self, parent_config: "CoreConfig") -> None: diff --git a/example_config/core.yaml b/example_config/core.yaml index fa04a67..53063e3 100644 --- a/example_config/core.yaml +++ b/example_config/core.yaml @@ -46,6 +46,7 @@ allnet: allow_online_updates: False update_cfg_folder: "" save_billing: True + allnet_lite_key: [] billing: standalone: True