"""Starlette app that serves registered files by name and access key.

On import this module:

* makes sure a writable ``logs`` directory exists and configures the
  ``access`` logger (daily-rotating file plus console output);
* loads the file registry from ``flist.json`` (creating an empty one when the
  file is missing);
* loads the base64-encoded ``admin:<password>`` credentials from ``admin.txt``
  or generates new ones when they are missing or invalid;
* builds the route table and the ``app`` ASGI application.
"""
from starlette.applications import Starlette
from starlette.routing import Route, Mount
from starlette.requests import Request
from starlette.responses import FileResponse, Response, RedirectResponse
from os import path, mkdir, W_OK, access
from base64 import b64encode, b64decode
from random import choices  # noqa: F401 - kept from the original import list
from string import ascii_letters, digits, punctuation
from sys import argv
from typing import Dict
from datetime import datetime
from logging.handlers import TimedRotatingFileHandler
import logging
import secrets
import coloredlogs
import json

# --------------------------------------------------------------------------
# Logging
# --------------------------------------------------------------------------
if not path.exists("logs"):
    mkdir("logs")

if not access("logs", W_OK):
    print("Log directory NOT writable, please check permissions")
    # SystemExit instead of the ``exit`` helper, which is only injected by
    # the ``site`` module and is not guaranteed to exist.
    raise SystemExit(1)

logger = logging.getLogger("access")
log_fmt_str = "[%(asctime)s] Access | %(levelname)s | %(message)s"
log_fmt = logging.Formatter(log_fmt_str)

fileHandler = TimedRotatingFileHandler(
    "logs/access.log",
    when="d",
    backupCount=10,
)
fileHandler.setFormatter(log_fmt)

consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(log_fmt)

logger.addHandler(fileHandler)
logger.addHandler(consoleHandler)
logger.setLevel(logging.INFO)

coloredlogs.install(
    level=logging.INFO,
    logger=logger,
    fmt=log_fmt_str
)


# --------------------------------------------------------------------------
# File registry (flist.json)
# --------------------------------------------------------------------------
def _save_flist() -> None:
    """Write the in-memory ``flist`` registry back to ``flist.json``."""
    with open("flist.json", "w", encoding="utf-8") as f:
        json.dump(flist, f)


if not path.exists("flist.json"):
    logger.warning("Regenerate file list")
    flist = {}
    with open("flist.json", "w", encoding="utf-8") as f:
        f.write("{}")
else:
    with open("flist.json", "r", encoding="utf-8") as f:
        flist = json.load(f)

# --------------------------------------------------------------------------
# Admin credentials (admin.txt holds base64("admin:<password>"))
# --------------------------------------------------------------------------
admin_creds = None
if path.exists("admin.txt"):
    with open("admin.txt", "r", encoding="utf-8") as f:
        # Strip so a trailing newline added by an editor is not kept as part
        # of the stored credential string.
        admin_creds = f.read().strip()
    try:
        admin_decode = b64decode(admin_creds.encode()).decode()
    except ValueError as e:
        # binascii.Error and the Unicode encode/decode errors are all
        # ValueError subclasses. The credential text itself is deliberately
        # NOT written to the log.
        logger.warning(f"Failed to decode admin.txt as b64, regenerating - {e}")
        admin_creds = None
    else:
        if len(admin_decode) < 14 or ":" not in admin_decode:
            # Do not echo the decoded value: it may contain a real password.
            logger.warning("Invalid admin creds in admin.txt - regenerating")
            admin_creds = None

if not admin_creds:
    # ``secrets`` draws from the OS CSPRNG; ``random`` is not suitable for
    # generating passwords.
    alphabet = ascii_letters + digits + punctuation
    pw = "".join(secrets.choice(alphabet) for _ in range(16))
    print(f"Generate password for admin: {pw}")  # no logging
    admin_creds = b64encode(f"admin:{pw}".encode()).decode()
    with open("admin.txt", "w", encoding="utf-8") as f:
        f.write(admin_creds)


# --------------------------------------------------------------------------
# Helpers
# --------------------------------------------------------------------------
def get_ip_addr(req: Request) -> str:
    """Return the caller's address for logging purposes.

    The first entry of the ``X-Forwarded-For`` header is preferred; when the
    header is absent or empty the address of the direct peer is used. If the
    server supplied no peer information, ``"unknown"`` is returned instead of
    raising.

    NOTE(review): ``X-Forwarded-For`` is supplied by the client unless a
    trusted proxy overwrites it, so this value should only be used for
    logging, as it is here.
    """
    forwarded = req.headers.get("x-forwarded-for", "")
    # Entries may be separated by "," with or without a following space.
    first_hop = forwarded.split(",")[0].strip()
    if first_hop:
        return first_hop
    return req.client.host if req.client else "unknown"


def _keys_match(expected, supplied: str) -> bool:
    """Compare the stored key with the supplied one in constant time.

    A stored key that is not a string never matches, which mirrors what a
    plain ``!=`` comparison against the (string) path parameter would do.
    """
    if not isinstance(expected, str):
        return False
    # ``surrogatepass`` keeps the encoding lossless and avoids an encode
    # error if the path parameter contains lone surrogates.
    return secrets.compare_digest(
        expected.encode("utf-8", "surrogatepass"),
        supplied.encode("utf-8", "surrogatepass"),
    )


def _download_name(fpath: str) -> str:
    """Return the part of ``fpath`` after its last ``/`` or ``\\`` separator.

    Both separators are recognised regardless of platform; when neither is
    present the whole string is returned.
    """
    last_sep = max(fpath.rfind("/"), fpath.rfind("\\"))
    return fpath[last_sep + 1:]


def _not_implemented() -> Response:
    """Response used by admin endpoints that have no implementation yet."""
    return Response("Not implemented", status_code=501)


# --------------------------------------------------------------------------
# Request handlers
# --------------------------------------------------------------------------
async def handle_file(request: Request) -> Response:
    """Serve the registered file ``name`` if ``key`` matches and it is valid.

    Responses:
        400 - ``name`` or ``key`` path parameter is empty.
        404 - unknown name, expired entry (which is also removed from the
              registry and persisted), or wrong key.
        500 - the registered path is empty, missing, or a directory.
        200 - the file, sent as a ``FileResponse`` with its base name as the
              download filename.
    """
    name = request.path_params.get("name", "")
    key = request.path_params.get("key", "")
    req_ip = get_ip_addr(request)

    if not name or not key:
        return Response(status_code=400)

    info: Dict = flist.get(name, {})
    if not info:
        return Response(status_code=404)

    expire = info.get("expire", None)
    if expire and expire <= int(datetime.now().timestamp()):
        logger.info(f"File {name} expired on {expire}")
        flist.pop(name, None)
        _save_flist()
        return Response(status_code=404)

    if not _keys_match(info.get("key", ""), key):
        logger.info(f"Incorrect key for file {name} from {req_ip}")
        # 404 rather than 403 so a wrong key looks the same as an unknown
        # name to the caller.
        return Response(status_code=404)

    fpath: str = info.get("path", "")
    if not fpath or not path.exists(fpath):
        logger.info(f"File {name} does not exist at {fpath}")
        return Response(status_code=500)

    # Reject directories both by spelling (trailing separator) and by what is
    # actually on disk; FileResponse cannot send a directory.
    if fpath.endswith(("/", "\\")) or path.isdir(fpath):
        logger.info(f"Cannot send directory {fpath} as file {name}")
        return Response(status_code=500)

    fname = _download_name(fpath)
    logger.info(f"Send {name} ({fpath}) to {req_ip}")
    return FileResponse(fpath, filename=fname)


async def handle_upload(request: Request) -> Response:
    """Placeholder for uploads; always answers 404 with an explanation."""
    return Response("Uploading not currently supported", status_code=404)


# The admin endpoints below are not implemented yet. They answer 501 instead
# of returning None, which Starlette would try to call as a response and fail.
async def render_admin(request: Request) -> Response:
    """Admin page (not implemented)."""
    return _not_implemented()


async def render_login(request: Request) -> Response:
    """Login page (not implemented)."""
    return _not_implemented()


async def handle_login(request: Request) -> Response:
    """Login form handler (not implemented)."""
    return _not_implemented()


async def add_file(request: Request) -> Response:
    """Register a file (not implemented)."""
    return _not_implemented()


async def del_file(request: Request) -> Response:
    """Unregister a file (not implemented)."""
    return _not_implemented()


async def chg_pw(request: Request) -> Response:
    """Change the admin password (not implemented)."""
    return _not_implemented()


# --------------------------------------------------------------------------
# Routing and application
# --------------------------------------------------------------------------
rt_lst = [
    Route("/file/{key:str}/{name:str}", handle_file, methods=['GET']),
    Route("/upload/{key:str}/{name:str}", handle_upload, methods=['PUT']),
    Mount("/admin", routes=[
        Route("/", render_admin, methods=['GET']),
        Route("/login", render_login, methods=['GET']),
        Route("/file.add", add_file, methods=['POST']),
        Route("/file.del", del_file, methods=['POST']),
        Route("/admin.pw", chg_pw, methods=['POST']),
        Route("/admin.login", handle_login, methods=['POST']),
    ])
]

# Debug mode is enabled when the literal argument "debug" is on the command
# line.
app = Starlette(debug="debug" in argv, routes=rt_lst)