157 lines
4.7 KiB
Python
157 lines
4.7 KiB
Python
import json
import logging
import secrets
from base64 import b64encode, b64decode
from logging.handlers import TimedRotatingFileHandler
from os import path, mkdir, W_OK, access
from random import choices
from string import ascii_letters, digits, punctuation
from sys import argv
from typing import Dict

import coloredlogs
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import FileResponse, Response, RedirectResponse
from starlette.routing import Route, Mount
|
|
|
|
# Ensure the log directory exists before any file handler is created.
# EAFP: attempt the mkdir and tolerate an existing entry instead of the
# racy "check, then create" sequence.
try:
    mkdir("logs")
except FileExistsError:
    pass

# Refuse to start when the process cannot write its log files.
if not access("logs", W_OK):
    print("Log directory NOT writable, please check permissions")
    # `exit()` is an interactive helper injected by the `site` module and is
    # missing under `python -S`; raising SystemExit always works.
    raise SystemExit(1)
|
|
|
|
# --- Access logger configuration -------------------------------------------
# One shared format string is used by the plain handlers and by coloredlogs.
log_fmt_str = "[%(asctime)s] Access | %(levelname)s | %(message)s"
log_fmt = logging.Formatter(log_fmt_str)

# File output: rotated on a day boundary, keeping at most 10 old files.
fileHandler = TimedRotatingFileHandler("logs/access.log", when="d", backupCount=10)
# Console output through a standard stream handler.
consoleHandler = logging.StreamHandler()

logger = logging.getLogger("access")
for _handler in (fileHandler, consoleHandler):
    _handler.setFormatter(log_fmt)
    logger.addHandler(_handler)
del _handler

logger.setLevel(logging.INFO)
# coloredlogs is configured for the same logger, level and format.
coloredlogs.install(level=logging.INFO, logger=logger, fmt=log_fmt_str)
|
|
|
|
# Load the persisted file list, or start with an empty one on first run.
# NOTE(review): entries are presumably {"key": ..., "path": ...} dicts keyed
# by file name - confirm against whatever writes flist.json.
if path.exists("flist.json"):
    with open("flist.json", "r") as flist_file:
        flist = json.load(flist_file)
else:
    logger.warning("Regenerate file list")
    flist = {}
    # Persist the empty mapping right away so the next start finds the file.
    with open("flist.json", "w") as flist_file:
        flist_file.write("{}")
|
|
|
|
def _load_admin_creds(creds_file="admin.txt"):
    """Return the stored admin credentials, or None when they must be regenerated.

    The file is expected to hold base64 of ``user:password``. The value is
    returned still base64-encoded, stripped of surrounding whitespace.
    None is returned when the file is missing, is not decodable, or the
    decoded text is shorter than 14 characters or contains no ``:``.
    """
    if not path.exists(creds_file):
        return None

    with open(creds_file, "r") as creds_fh:
        # Strip so a trailing newline added by an editor is not kept in the value.
        stored = creds_fh.read().strip()

    try:
        decoded = b64decode(stored.encode()).decode()
    except ValueError as err:  # covers binascii.Error and UnicodeDecodeError
        # The stored value itself is deliberately NOT logged.
        logger.warning(f"Failed to decode stored admin creds as b64, regenerating - {err}")
        return None

    if len(decoded) < 14 or ":" not in decoded:
        # The decoded text contains the password, so it is kept out of the log.
        logger.warning("Invalid admin creds - regenerating")
        return None
    return stored


def _generate_admin_creds(creds_file="admin.txt", length=16):
    """Create a random admin password, store base64 ``admin:<pw>`` and return it.

    The password is drawn with ``secrets`` (cryptographically strong), unlike
    the ``random`` module, which is not meant for security purposes.
    """
    alphabet = ascii_letters + digits + punctuation
    pw = "".join(secrets.choice(alphabet) for _ in range(length))
    # Printed instead of logged so the plaintext never reaches the log file.
    print(f"Generate password for admin: {pw}")  # no logging
    creds = b64encode(f"admin:{pw}".encode()).decode()
    with open(creds_file, "w") as creds_fh:
        creds_fh.write(creds)
    return creds


# Base64 "user:password" string for the admin account.
admin_creds = _load_admin_creds()
if not admin_creds:
    admin_creds = _generate_admin_creds()
|
|
|
|
def get_ip_addr(req: Request) -> str:
    """Return the best-known client address for *req*.

    The first entry of the ``X-Forwarded-For`` header wins when the header is
    present and non-empty; otherwise the peer address of the connection is
    used. ``"unknown"` is returned when neither is available.

    NOTE: the header is supplied by the client (or a proxy) and can be
    spoofed, so the result is only suitable for logging.
    """
    forwarded = req.headers.get("x-forwarded-for")
    if forwarded:
        # Entries are comma separated; the space after the comma is optional.
        return forwarded.split(",")[0].strip()

    # `client` may be None (no peer information), so only touch `.host`
    # when it is actually needed and available.
    if req.client is None:
        return "unknown"
    return req.client.host
|
|
|
|
async def handle_file(request: Request) -> Response:
    """Serve a registered file when the URL carries the matching key.

    Path parameters:
        name: entry to look up in the module-level ``flist`` mapping.
        key:  secret that must equal the entry's ``"key"`` value.

    Returns:
        400 when either path parameter is empty,
        404 when the entry is unknown or the key does not match,
        500 when the registered path is missing or is a directory,
        otherwise a ``FileResponse`` streaming the file under its base name.
    """
    name = request.path_params.get("name", "")
    key = request.path_params.get("key", "")
    req_ip = get_ip_addr(request)
    if not name or not key:
        return Response(status_code=400)

    # Look up the *requested* name; using the literal string "name" here
    # would make every real entry unreachable.
    info: Dict = flist.get(name, {})
    if not info:
        return Response(status_code=404)

    stored_key = info.get("key", "")
    # Constant-time comparison so response timing does not leak key prefixes.
    # Bytes are compared because compare_digest rejects non-ASCII str input.
    key_ok = isinstance(stored_key, str) and secrets.compare_digest(
        stored_key.encode(), key.encode()
    )
    if not key_ok:
        logger.info(f"Incorrect key for file {name} from {req_ip}")
        return Response(status_code=404)

    fpath: str = info.get("path", "")
    if not fpath or not path.exists(fpath):
        logger.info(f"File {name} does not exist at {fpath}")
        return Response(status_code=500)

    # A directory cannot be sent, with or without a trailing separator.
    if fpath.endswith(("/", "\\")) or path.isdir(fpath):
        logger.info(f"Cannot send directory {fpath} as file {name}")
        return Response(status_code=500)

    # Download name = text after the last "/" or, when the path has none,
    # after the last "\". rfind() yields -1 when nothing matches, so the
    # slice then starts at 0 and keeps the whole path.
    sep_idx = fpath.rfind("/")
    if sep_idx == -1:
        sep_idx = fpath.rfind("\\")
    fname = fpath[sep_idx + 1:]

    logger.info(f"Send {name} ({fpath}) to {req_ip}")
    return FileResponse(fpath, filename=fname)
|
|
|
|
async def handle_upload(request: Request) -> Response:
    """Reject an upload attempt: the feature is not available.

    The request is not inspected; every call is answered with status 404
    and a short explanatory body.
    """
    message = "Uploading not currently supported"
    return Response(content=message, status_code=404)
|
|
|
|
async def render_admin(request: Request) -> Response:
    """Placeholder for the endpoint routed at GET /admin/ (not implemented).

    Answers 501 Not Implemented. An endpoint has to return a response
    object; an empty body would return None, which Starlette cannot send
    and which surfaces as a server error.
    """
    return Response(status_code=501)
|
|
|
|
async def render_login(request: Request) -> Response:
    """Placeholder for the endpoint routed at GET /admin/login (not implemented).

    Answers 501 Not Implemented. An endpoint has to return a response
    object; an empty body would return None, which Starlette cannot send
    and which surfaces as a server error.
    """
    return Response(status_code=501)
|
|
|
|
async def handle_login(request: Request) -> Response:
    """Placeholder for the endpoint routed at POST /admin/admin.login (not implemented).

    Answers 501 Not Implemented. An endpoint has to return a response
    object; an empty body would return None, which Starlette cannot send
    and which surfaces as a server error. The return annotation is the
    base ``Response`` type because no redirect is produced yet.
    """
    return Response(status_code=501)
|
|
|
|
async def add_file(request: Request) -> Response:
    """Placeholder for the endpoint routed at POST /admin/file.add (not implemented).

    Answers 501 Not Implemented. An endpoint has to return a response
    object; an empty body would return None, which Starlette cannot send
    and which surfaces as a server error. The return annotation is the
    base ``Response`` type because no redirect is produced yet.
    """
    return Response(status_code=501)
|
|
|
|
async def del_file(request: Request) -> Response:
    """Placeholder for the endpoint routed at POST /admin/file.del (not implemented).

    Answers 501 Not Implemented. An endpoint has to return a response
    object; an empty body would return None, which Starlette cannot send
    and which surfaces as a server error. The return annotation is the
    base ``Response`` type because no redirect is produced yet.
    """
    return Response(status_code=501)
|
|
|
|
async def chg_pw(request: Request) -> Response:
    """Placeholder for the endpoint routed at POST /admin/admin.pw (not implemented).

    Answers 501 Not Implemented. An endpoint has to return a response
    object; an empty body would return None, which Starlette cannot send
    and which surfaces as a server error. The return annotation is the
    base ``Response`` type because no redirect is produced yet.
    """
    return Response(status_code=501)
|
|
|
|
# Routing table handed to the Starlette application.
rt_lst = [
    # Key-protected endpoints: both take the key and the file name from the URL.
    Route("/file/{key:str}/{name:str}", endpoint=handle_file, methods=["GET"]),
    Route("/upload/{key:str}/{name:str}", endpoint=handle_upload, methods=["PUT"]),
    # Everything below is served under the /admin prefix.
    Mount(
        "/admin",
        routes=[
            Route("/", endpoint=render_admin, methods=["GET"]),
            Route("/login", endpoint=render_login, methods=["GET"]),
            Route("/file.add", endpoint=add_file, methods=["POST"]),
            Route("/file.del", endpoint=del_file, methods=["POST"]),
            Route("/admin.pw", endpoint=chg_pw, methods=["POST"]),
            Route("/admin.login", endpoint=handle_login, methods=["POST"]),
        ],
    ),
]
|
|
|
|
# ASGI application; debug mode is on when "debug" appears among the
# command-line arguments.
app = Starlette(debug="debug" in argv, routes=rt_lst)
|