forked from Hay1tsme/artemis
begin move
This commit is contained in:
282
core/allnet.py
282
core/allnet.py
@ -1,20 +1,24 @@
|
||||
from typing import Dict, List, Any, Optional, Tuple, Union, Final
|
||||
import logging, coloredlogs
|
||||
from logging.handlers import TimedRotatingFileHandler
|
||||
from twisted.web.http import Request
|
||||
from datetime import datetime
|
||||
import pytz
|
||||
import base64
|
||||
import zlib
|
||||
import json
|
||||
import yaml
|
||||
import logging
|
||||
import coloredlogs
|
||||
import urllib.parse
|
||||
import math
|
||||
from typing import Dict, List, Any, Optional, Union, Final
|
||||
from logging.handlers import TimedRotatingFileHandler
|
||||
from starlette.requests import Request
|
||||
from starlette.responses import PlainTextResponse
|
||||
from starlette.applications import Starlette
|
||||
from starlette.routing import Route
|
||||
from datetime import datetime
|
||||
from enum import Enum
|
||||
from Crypto.PublicKey import RSA
|
||||
from Crypto.Hash import SHA
|
||||
from Crypto.Signature import PKCS1_v1_5
|
||||
from time import strptime
|
||||
from os import path
|
||||
import urllib.parse
|
||||
import math
|
||||
from os import path, environ, mkdir, access, W_OK
|
||||
|
||||
from .config import CoreConfig
|
||||
from .utils import Utils
|
||||
@ -91,7 +95,6 @@ class DLI_STATUS(Enum):
|
||||
|
||||
class AllnetServlet:
|
||||
def __init__(self, core_cfg: CoreConfig, cfg_folder: str):
|
||||
super().__init__()
|
||||
self.config = core_cfg
|
||||
self.config_folder = cfg_folder
|
||||
self.data = Data(core_cfg)
|
||||
@ -126,19 +129,20 @@ class AllnetServlet:
|
||||
self.logger.error("No games detected!")
|
||||
|
||||
self.logger.info(
|
||||
f"Serving {len(TitleServlet.title_registry)} game codes port {core_cfg.allnet.port}"
|
||||
f"Serving {len(TitleServlet.title_registry)} game codes"
|
||||
)
|
||||
|
||||
def handle_poweron(self, request: Request, _: Dict):
|
||||
async def handle_poweron(self, request: Request):
|
||||
request_ip = Utils.get_ip_addr(request)
|
||||
pragma_header = request.getHeader('Pragma')
|
||||
pragma_header = request.headers.get('Pragma', "")
|
||||
is_dfi = pragma_header is not None and pragma_header == "DFI"
|
||||
data = await request.body()
|
||||
|
||||
try:
|
||||
if is_dfi:
|
||||
req_urlencode = self.from_dfi(request.content.getvalue())
|
||||
req_urlencode = self.from_dfi(data)
|
||||
else:
|
||||
req_urlencode = request.content.getvalue().decode()
|
||||
req_urlencode = data
|
||||
|
||||
req_dict = self.allnet_req_to_dict(req_urlencode)
|
||||
if req_dict is None:
|
||||
@ -155,7 +159,7 @@ class AllnetServlet:
|
||||
except AllnetRequestException as e:
|
||||
if e.message != "":
|
||||
self.logger.error(e)
|
||||
return b""
|
||||
return PlainTextResponse()
|
||||
|
||||
if req.format_ver == 3:
|
||||
resp = AllnetPowerOnResponse3(req.token)
|
||||
@ -176,7 +180,7 @@ class AllnetServlet:
|
||||
|
||||
resp.stat = ALLNET_STAT.bad_machine.value
|
||||
resp_dict = {k: v for k, v in vars(resp).items() if v is not None}
|
||||
return (urllib.parse.unquote(urllib.parse.urlencode(resp_dict)) + "\n").encode("utf-8")
|
||||
return PlainTextResponse(urllib.parse.unquote(urllib.parse.urlencode(resp_dict)) + "\n")
|
||||
|
||||
if machine is not None:
|
||||
arcade = self.data.arcade.get_arcade(machine["arcade"])
|
||||
@ -190,7 +194,7 @@ class AllnetServlet:
|
||||
|
||||
resp.stat = ALLNET_STAT.bad_shop.value
|
||||
resp_dict = {k: v for k, v in vars(resp).items() if v is not None}
|
||||
return (urllib.parse.unquote(urllib.parse.urlencode(resp_dict)) + "\n").encode("utf-8")
|
||||
return PlainTextResponse(urllib.parse.unquote(urllib.parse.urlencode(resp_dict)) + "\n")
|
||||
|
||||
elif (not arcade["ip"] or arcade["ip"] is None) and self.config.server.strict_ip_checking:
|
||||
msg = f"Serial {req.serial} attempted allnet auth from bad IP {req.ip}, but arcade {arcade['id']} has no IP set! (strict checking enabled)."
|
||||
@ -201,7 +205,7 @@ class AllnetServlet:
|
||||
|
||||
resp.stat = ALLNET_STAT.bad_shop.value
|
||||
resp_dict = {k: v for k, v in vars(resp).items() if v is not None}
|
||||
return (urllib.parse.unquote(urllib.parse.urlencode(resp_dict)) + "\n").encode("utf-8")
|
||||
return PlainTextResponse(urllib.parse.unquote(urllib.parse.urlencode(resp_dict)) + "\n")
|
||||
|
||||
|
||||
country = (
|
||||
@ -245,20 +249,20 @@ class AllnetServlet:
|
||||
|
||||
resp.stat = ALLNET_STAT.bad_game.value
|
||||
resp_dict = {k: v for k, v in vars(resp).items() if v is not None}
|
||||
return (urllib.parse.unquote(urllib.parse.urlencode(resp_dict)) + "\n").encode("utf-8")
|
||||
return PlainTextResponse(urllib.parse.unquote(urllib.parse.urlencode(resp_dict)) + "\n")
|
||||
|
||||
else:
|
||||
self.logger.info(
|
||||
f"Allowed unknown game {req.game_id} v{req.ver} to authenticate from {request_ip} due to 'is_develop' being enabled. S/N: {req.serial}"
|
||||
)
|
||||
resp.uri = f"http://{self.config.title.hostname}:{self.config.title.port}/{req.game_id}/{req.ver.replace('.', '')}/"
|
||||
resp.host = f"{self.config.title.hostname}:{self.config.title.port}"
|
||||
resp.uri = f"http://{self.config.server.hostname}:{self.config.server.port}/{req.game_id}/{req.ver.replace('.', '')}/"
|
||||
resp.host = f"{self.config.server.hostname}:{self.config.server.port}"
|
||||
|
||||
resp_dict = {k: v for k, v in vars(resp).items() if v is not None}
|
||||
resp_str = urllib.parse.unquote(urllib.parse.urlencode(resp_dict))
|
||||
|
||||
self.logger.debug(f"Allnet response: {resp_str}")
|
||||
return (resp_str + "\n").encode("utf-8")
|
||||
return PlainTextResponse(resp_str + "\n")
|
||||
|
||||
|
||||
int_ver = req.ver.replace(".", "")
|
||||
@ -277,18 +281,19 @@ class AllnetServlet:
|
||||
request.responseHeaders.addRawHeader('Pragma', 'DFI')
|
||||
return self.to_dfi(resp_str)"""
|
||||
|
||||
return resp_str.encode("utf-8")
|
||||
return PlainTextResponse(resp_str)
|
||||
|
||||
def handle_dlorder(self, request: Request, _: Dict):
|
||||
async def handle_dlorder(self, request: Request):
|
||||
request_ip = Utils.get_ip_addr(request)
|
||||
pragma_header = request.getHeader('Pragma')
|
||||
pragma_header = request.headers.get('Pragma', "")
|
||||
is_dfi = pragma_header is not None and pragma_header == "DFI"
|
||||
data = await request.body()
|
||||
|
||||
try:
|
||||
if is_dfi:
|
||||
req_urlencode = self.from_dfi(request.content.getvalue())
|
||||
req_urlencode = self.from_dfi(data)
|
||||
else:
|
||||
req_urlencode = request.content.getvalue().decode()
|
||||
req_urlencode = data.decode()
|
||||
|
||||
req_dict = self.allnet_req_to_dict(req_urlencode)
|
||||
if req_dict is None:
|
||||
@ -305,7 +310,7 @@ class AllnetServlet:
|
||||
except AllnetRequestException as e:
|
||||
if e.message != "":
|
||||
self.logger.error(e)
|
||||
return b""
|
||||
return PlainTextResponse()
|
||||
|
||||
self.logger.info(
|
||||
f"DownloadOrder from {request_ip} -> {req.game_id} v{req.ver} serial {req.serial}"
|
||||
@ -316,18 +321,18 @@ class AllnetServlet:
|
||||
not self.config.allnet.allow_online_updates
|
||||
or not self.config.allnet.update_cfg_folder
|
||||
):
|
||||
return urllib.parse.unquote(urllib.parse.urlencode(vars(resp))) + "\n"
|
||||
return PlainTextResponse(urllib.parse.unquote(urllib.parse.urlencode(vars(resp))) + "\n")
|
||||
|
||||
else: # TODO: Keychip check
|
||||
if path.exists(
|
||||
f"{self.config.allnet.update_cfg_folder}/{req.game_id}-{req.ver.replace('.', '')}-app.ini"
|
||||
):
|
||||
resp.uri = f"http://{self.config.title.hostname}:{self.config.title.port}/dl/ini/{req.game_id}-{req.ver.replace('.', '')}-app.ini"
|
||||
resp.uri = f"http://{self.config.server.hostname}:{self.config.server.port}/dl/ini/{req.game_id}-{req.ver.replace('.', '')}-app.ini"
|
||||
|
||||
if path.exists(
|
||||
f"{self.config.allnet.update_cfg_folder}/{req.game_id}-{req.ver.replace('.', '')}-opt.ini"
|
||||
):
|
||||
resp.uri += f"|http://{self.config.title.hostname}:{self.config.title.port}/dl/ini/{req.game_id}-{req.ver.replace('.', '')}-opt.ini"
|
||||
resp.uri += f"|http://{self.config.server.hostname}:{self.config.server.port}/dl/ini/{req.game_id}-{req.ver.replace('.', '')}-opt.ini"
|
||||
|
||||
self.logger.debug(f"Sending download uri {resp.uri}")
|
||||
self.data.base.log_event("allnet", "DLORDER_REQ_SUCCESS", logging.INFO, f"{Utils.get_ip_addr(request)} requested DL Order for {req.serial} {req.game_id} v{req.ver}")
|
||||
@ -337,33 +342,33 @@ class AllnetServlet:
|
||||
request.responseHeaders.addRawHeader('Pragma', 'DFI')
|
||||
return self.to_dfi(res_str)"""
|
||||
|
||||
return res_str
|
||||
return PlainTextResponse(res_str)
|
||||
|
||||
def handle_dlorder_ini(self, request: Request, match: Dict) -> bytes:
|
||||
if "file" not in match:
|
||||
return b""
|
||||
async def handle_dlorder_ini(self, request: Request) -> bytes:
|
||||
req_file = request.path_params.get("file", "").replace("%0A", "")
|
||||
|
||||
req_file = match["file"].replace("%0A", "")
|
||||
if not req_file:
|
||||
return PlainTextResponse(status_code=404)
|
||||
|
||||
if path.exists(f"{self.config.allnet.update_cfg_folder}/{req_file}"):
|
||||
self.logger.info(f"Request for DL INI file {req_file} from {Utils.get_ip_addr(request)} successful")
|
||||
self.data.base.log_event("allnet", "DLORDER_INI_SENT", logging.INFO, f"{Utils.get_ip_addr(request)} successfully recieved {req_file}")
|
||||
|
||||
return open(
|
||||
f"{self.config.allnet.update_cfg_folder}/{req_file}", "rb"
|
||||
).read()
|
||||
return PlainTextResponse(open(
|
||||
f"{self.config.allnet.update_cfg_folder}/{req_file}", "r"
|
||||
).read())
|
||||
|
||||
self.logger.info(f"DL INI File {req_file} not found")
|
||||
return b""
|
||||
return PlainTextResponse()
|
||||
|
||||
def handle_dlorder_report(self, request: Request, match: Dict) -> bytes:
|
||||
req_raw = request.content.getvalue()
|
||||
async def handle_dlorder_report(self, request: Request) -> bytes:
|
||||
req_raw = await request.body()
|
||||
client_ip = Utils.get_ip_addr(request)
|
||||
try:
|
||||
req_dict: Dict = json.loads(req_raw)
|
||||
except Exception as e:
|
||||
self.logger.warning(f"Failed to parse DL Report: {e}")
|
||||
return "NG"
|
||||
return PlainTextResponse("NG")
|
||||
|
||||
dl_data_type = DLIMG_TYPE.app
|
||||
dl_data = req_dict.get("appimage", {})
|
||||
@ -374,13 +379,13 @@ class AllnetServlet:
|
||||
|
||||
if dl_data is None or not dl_data:
|
||||
self.logger.warning(f"Failed to parse DL Report: Invalid format - contains neither appimage nor optimage")
|
||||
return "NG"
|
||||
return PlainTextResponse("NG")
|
||||
|
||||
rep = DLReport(dl_data, dl_data_type)
|
||||
|
||||
if not rep.validate():
|
||||
self.logger.warning(f"Failed to parse DL Report: Invalid format - {rep.err}")
|
||||
return "NG"
|
||||
return PlainTextResponse("NG")
|
||||
|
||||
msg = f"{rep.serial} @ {client_ip} reported {rep.rep_type.name} download state {rep.rf_state.name} for {rep.gd} v{rep.dav}:"\
|
||||
f" {rep.tdsc}/{rep.tsc} segments downloaded for working files {rep.wfl} with {rep.dfl if rep.dfl else 'none'} complete."
|
||||
@ -388,10 +393,10 @@ class AllnetServlet:
|
||||
self.data.base.log_event("allnet", "DL_REPORT", logging.INFO, msg, dl_data)
|
||||
self.logger.info(msg)
|
||||
|
||||
return "OK"
|
||||
return PlainTextResponse("OK")
|
||||
|
||||
def handle_loaderstaterecorder(self, request: Request, match: Dict) -> bytes:
|
||||
req_data = request.content.getvalue()
|
||||
async def handle_loaderstaterecorder(self, request: Request) -> bytes:
|
||||
req_data = await request.body()
|
||||
sections = req_data.decode("utf-8").split("\r\n")
|
||||
|
||||
req_dict = dict(urllib.parse.parse_qsl(sections[0]))
|
||||
@ -403,18 +408,94 @@ class AllnetServlet:
|
||||
ip = Utils.get_ip_addr(request)
|
||||
|
||||
if serial is None or num_files_dld is None or num_files_to_dl is None or dl_state is None:
|
||||
return "NG".encode()
|
||||
return PlainTextResponse("NG")
|
||||
|
||||
self.logger.info(f"LoaderStateRecorder Request from {ip} {serial}: {num_files_dld}/{num_files_to_dl} Files download (State: {dl_state})")
|
||||
return "OK".encode()
|
||||
return PlainTextResponse("OK")
|
||||
|
||||
def handle_alive(self, request: Request, match: Dict) -> bytes:
|
||||
return "OK".encode()
|
||||
async def handle_alive(self, request: Request) -> bytes:
|
||||
return PlainTextResponse("OK")
|
||||
|
||||
def handle_billing_request(self, request: Request, _: Dict):
|
||||
req_raw = request.content.getvalue()
|
||||
async def handle_naomitest(self, request: Request) -> bytes:
|
||||
self.logger.info(f"Ping from {Utils.get_ip_addr(request)}")
|
||||
return PlainTextResponse("naomi ok")
|
||||
|
||||
def allnet_req_to_dict(self, data: str) -> Optional[List[Dict[str, Any]]]:
|
||||
"""
|
||||
Parses an allnet request string into a python dictionary
|
||||
"""
|
||||
try:
|
||||
sections = data.split("\r\n")
|
||||
|
||||
ret = []
|
||||
for x in sections:
|
||||
ret.append(dict(urllib.parse.parse_qsl(x)))
|
||||
return ret
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"allnet_req_to_dict: {e} while parsing {data}")
|
||||
return None
|
||||
|
||||
def from_dfi(self, data: bytes) -> str:
|
||||
zipped = base64.b64decode(data)
|
||||
unzipped = zlib.decompress(zipped)
|
||||
return unzipped.decode("utf-8")
|
||||
|
||||
def to_dfi(self, data: str) -> bytes:
|
||||
unzipped = data.encode('utf-8')
|
||||
zipped = zlib.compress(unzipped)
|
||||
return base64.b64encode(zipped)
|
||||
|
||||
class BillingServlet:
|
||||
def __init__(self, core_cfg: CoreConfig, cfg_folder: str) -> None:
|
||||
self.config = core_cfg
|
||||
self.config_folder = cfg_folder
|
||||
self.data = Data(core_cfg)
|
||||
|
||||
self.logger = logging.getLogger("billing")
|
||||
if not hasattr(self.logger, "initialized"):
|
||||
log_fmt_str = "[%(asctime)s] Billing | %(levelname)s | %(message)s"
|
||||
log_fmt = logging.Formatter(log_fmt_str)
|
||||
|
||||
fileHandler = TimedRotatingFileHandler(
|
||||
"{0}/{1}.log".format(self.config.server.log_dir, "billing"),
|
||||
when="d",
|
||||
backupCount=10,
|
||||
)
|
||||
fileHandler.setFormatter(log_fmt)
|
||||
|
||||
consoleHandler = logging.StreamHandler()
|
||||
consoleHandler.setFormatter(log_fmt)
|
||||
|
||||
self.logger.addHandler(fileHandler)
|
||||
self.logger.addHandler(consoleHandler)
|
||||
|
||||
self.logger.setLevel(core_cfg.allnet.loglevel)
|
||||
coloredlogs.install(
|
||||
level=core_cfg.billing.loglevel, logger=self.logger, fmt=log_fmt_str
|
||||
)
|
||||
self.logger.initialized = True
|
||||
|
||||
def billing_req_to_dict(self, data: bytes):
|
||||
"""
|
||||
Parses an billing request string into a python dictionary
|
||||
"""
|
||||
try:
|
||||
sections = data.decode("ascii").split("\r\n")
|
||||
|
||||
ret = []
|
||||
for x in sections:
|
||||
ret.append(dict(urllib.parse.parse_qsl(x)))
|
||||
return ret
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"billing_req_to_dict: {e} while parsing {data}")
|
||||
return None
|
||||
|
||||
async def handle_billing_request(self, request: Request):
|
||||
req_raw = await request.body()
|
||||
|
||||
if request.getHeader('Content-Type') == "application/octet-stream":
|
||||
if request.headers.get('Content-Type', '') == "application/octet-stream":
|
||||
req_unzip = zlib.decompressobj(-zlib.MAX_WBITS).decompress(req_raw)
|
||||
else:
|
||||
req_unzip = req_raw
|
||||
@ -423,8 +504,8 @@ class AllnetServlet:
|
||||
request_ip = Utils.get_ip_addr(request)
|
||||
|
||||
if req_dict is None:
|
||||
self.logger.error(f"Failed to parse request {request.content.getvalue()}")
|
||||
return b""
|
||||
self.logger.error(f"Failed to parse request {req_raw}")
|
||||
return PlainTextResponse()
|
||||
|
||||
self.logger.debug(f"request {req_dict}")
|
||||
|
||||
@ -436,7 +517,7 @@ class AllnetServlet:
|
||||
req = BillingInfo(req_dict[0])
|
||||
except KeyError as e:
|
||||
self.logger.error(f"Billing request failed to parse: {e}")
|
||||
return f"result=5&linelimit=&message=field is missing or formatting is incorrect\r\n".encode()
|
||||
return PlainTextResponse("result=5&linelimit=&message=field is missing or formatting is incorrect\r\n")
|
||||
|
||||
for x in range(1, len(req_dict)):
|
||||
if not req_dict[x]:
|
||||
@ -467,7 +548,7 @@ class AllnetServlet:
|
||||
)
|
||||
self.logger.warning(msg)
|
||||
|
||||
return f"result=1&requestno={req.requestno}&message=Keychip Serial bad\r\n".encode()
|
||||
return PlainTextResponse(f"result=1&requestno={req.requestno}&message=Keychip Serial bad\r\n")
|
||||
|
||||
msg = (
|
||||
f"Billing checkin from {request_ip}: game {req.gameid} ver {req.gamever} keychip {req.keychipid} playcount "
|
||||
@ -496,7 +577,6 @@ class AllnetServlet:
|
||||
|
||||
# TODO: playhistory
|
||||
|
||||
#resp = BillingResponse(playlimit, playlimit_sig, nearfull, nearfull_sig)
|
||||
resp = BillingResponse(playlimit, playlimit_sig, nearfull, nearfull_sig, req.requestno, req.protocolver)
|
||||
|
||||
resp_str = urllib.parse.unquote(urllib.parse.urlencode(vars(resp))) + "\r\n"
|
||||
@ -504,56 +584,9 @@ class AllnetServlet:
|
||||
self.logger.debug(f"response {vars(resp)}")
|
||||
if req.traceleft > 0:
|
||||
self.logger.info(f"Requesting 20 more of {req.traceleft} unsent tracelogs")
|
||||
return f"result=6&waittime=0&linelimit=20\r\n".encode()
|
||||
|
||||
return resp_str.encode("utf-8")
|
||||
|
||||
def handle_naomitest(self, request: Request, _: Dict) -> bytes:
|
||||
self.logger.info(f"Ping from {Utils.get_ip_addr(request)}")
|
||||
return b"naomi ok"
|
||||
|
||||
def billing_req_to_dict(self, data: bytes):
|
||||
"""
|
||||
Parses an billing request string into a python dictionary
|
||||
"""
|
||||
try:
|
||||
sections = data.decode("ascii").split("\r\n")
|
||||
|
||||
ret = []
|
||||
for x in sections:
|
||||
ret.append(dict(urllib.parse.parse_qsl(x)))
|
||||
return ret
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"billing_req_to_dict: {e} while parsing {data}")
|
||||
return None
|
||||
|
||||
def allnet_req_to_dict(self, data: str) -> Optional[List[Dict[str, Any]]]:
|
||||
"""
|
||||
Parses an allnet request string into a python dictionary
|
||||
"""
|
||||
try:
|
||||
sections = data.split("\r\n")
|
||||
|
||||
ret = []
|
||||
for x in sections:
|
||||
ret.append(dict(urllib.parse.parse_qsl(x)))
|
||||
return ret
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"allnet_req_to_dict: {e} while parsing {data}")
|
||||
return None
|
||||
|
||||
def from_dfi(self, data: bytes) -> str:
|
||||
zipped = base64.b64decode(data)
|
||||
unzipped = zlib.decompress(zipped)
|
||||
return unzipped.decode("utf-8")
|
||||
|
||||
def to_dfi(self, data: str) -> bytes:
|
||||
unzipped = data.encode('utf-8')
|
||||
zipped = zlib.compress(unzipped)
|
||||
return base64.b64encode(zipped)
|
||||
|
||||
return PlainTextResponse("result=6&waittime=0&linelimit=20\r\n")
|
||||
|
||||
return PlainTextResponse(resp_str)
|
||||
|
||||
class AllnetPowerOnRequest:
|
||||
def __init__(self, req: Dict) -> None:
|
||||
@ -613,7 +646,6 @@ class AllnetPowerOnResponse3(AllnetPowerOnResponse):
|
||||
self.minute = None
|
||||
self.second = None
|
||||
|
||||
|
||||
class AllnetPowerOnResponse2(AllnetPowerOnResponse):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
@ -623,7 +655,6 @@ class AllnetPowerOnResponse2(AllnetPowerOnResponse):
|
||||
self.timezone = "+09:00"
|
||||
self.res_class = "PowerOnResponseV2"
|
||||
|
||||
|
||||
class AllnetDownloadOrderRequest:
|
||||
def __init__(self, req: Dict) -> None:
|
||||
self.game_id = req.get("game_id", "")
|
||||
@ -631,7 +662,6 @@ class AllnetDownloadOrderRequest:
|
||||
self.serial = req.get("serial", "")
|
||||
self.encode = req.get("encode", "")
|
||||
|
||||
|
||||
class AllnetDownloadOrderResponse:
|
||||
def __init__(self, stat: int = 1, serial: str = "", uri: str = "") -> None:
|
||||
self.stat = stat
|
||||
@ -781,7 +811,6 @@ class BillingResponse:
|
||||
# playhistory -> YYYYMM/C:...
|
||||
# YYYY -> 4 digit year, MM -> 2 digit month, C -> Playcount during that period
|
||||
|
||||
|
||||
class AllnetRequestException(Exception):
|
||||
def __init__(self, message="") -> None:
|
||||
self.message = message
|
||||
@ -849,3 +878,26 @@ class DLReport:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
cfg_dir = environ.get("DIANA_CFG_DIR", "config")
|
||||
cfg: CoreConfig = CoreConfig()
|
||||
if path.exists(f"{cfg_dir}/core.yaml"):
|
||||
cfg.update(yaml.safe_load(open(f"{cfg_dir}/core.yaml")))
|
||||
|
||||
if not path.exists(cfg.server.log_dir):
|
||||
mkdir(cfg.server.log_dir)
|
||||
|
||||
if not access(cfg.server.log_dir, W_OK):
|
||||
print(
|
||||
f"Log directory {cfg.server.log_dir} NOT writable, please check permissions"
|
||||
)
|
||||
exit(1)
|
||||
|
||||
billing = BillingServlet(cfg, cfg_dir)
|
||||
app = Starlette(
|
||||
cfg.server.is_develop,
|
||||
[
|
||||
Route("/request", billing.handle_billing_request, methods=["POST"]),
|
||||
Route("/request/", billing.handle_billing_request, methods=["POST"]),
|
||||
]
|
||||
)
|
||||
|
Reference in New Issue
Block a user