HayCDN/index.py
2024-05-23 14:16:35 -04:00

157 lines
4.7 KiB
Python

from starlette.applications import Starlette
from starlette.routing import Route, Mount
from starlette.requests import Request
from starlette.responses import FileResponse, Response, RedirectResponse
from os import path, mkdir, W_OK, access
from base64 import b64encode, b64decode
from random import choices
from string import ascii_letters, digits, punctuation
from sys import argv
from typing import Dict
from logging.handlers import TimedRotatingFileHandler
import logging
import coloredlogs
import json
if not path.exists("logs"):
mkdir("logs")
if not access("logs", W_OK):
print(f"Log directory NOT writable, please check permissions")
exit(1)
logger = logging.getLogger("access")
log_fmt_str = "[%(asctime)s] Access | %(levelname)s | %(message)s"
log_fmt = logging.Formatter(log_fmt_str)
fileHandler = TimedRotatingFileHandler(
"logs/access.log",
when="d",
backupCount=10,
)
fileHandler.setFormatter(log_fmt)
consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(log_fmt)
logger.addHandler(fileHandler)
logger.addHandler(consoleHandler)
logger.setLevel(logging.INFO)
coloredlogs.install(
level=logging.INFO, logger=logger, fmt=log_fmt_str
)
if not path.exists("flist.json"):
logger.warning("Regenerate file list")
flist = {}
with open("flist.json", "w") as f:
f.write("{}")
else:
with open("flist.json", "r") as f:
flist = json.load(f)
if not path.exists("admin.txt"):
admin_creds = None
else:
with open("admin.txt", "r") as f:
admin_creds = f.read()
try:
admin_decode = b64decode(admin_creds.encode()).decode()
if not admin_decode or len(admin_decode) < 14 or ":" not in admin_decode:
logger.warning(f"Invalid admin creds {admin_decode} - regenerating")
admin_creds = None
except Exception as e:
logger.warning(f"Failed to decode {admin_creds} as b64, regenerating - {e}")
admin_creds = None
if not admin_creds:
pw = "".join(choices(ascii_letters + digits + punctuation, k=16))
print(f"Generate password for admin: {pw}") # no logging
admin_creds = b64encode(f"admin:{pw}".encode()).decode()
with open("admin.txt", "w") as f:
f.write(admin_creds)
def get_ip_addr(req: Request) -> str:
ip = req.headers.get("x-forwarded-for", req.client.host)
return ip.split(", ")[0]
async def handle_file(request: Request) -> FileResponse:
name = request.path_params.get("name", "")
key = request.path_params.get("key", "")
req_ip = get_ip_addr(request)
if not name or not key:
return Response(status_code=400)
info: Dict = flist.get(name, {})
if not info:
return Response(status_code=404)
if info.get("key", "") != key:
logger.info(f"Incorrect key for file {name} from {req_ip}")
return Response(status_code=404)
fpath: str = info.get("path", "")
if not fpath or not path.exists(fpath):
logger.info(f"File {name} does not exist at {fpath}")
return Response(status_code=500)
if fpath.endswith("/") or fpath.endswith("\\"):
logger.info(f"Cannot send directory {fpath} as file {name}")
return Response(status_code=500)
last_slash_idx = fpath.rfind("/")
if last_slash_idx == -1:
last_slash_idx = fpath.rfind("\\")
if last_slash_idx == -1:
last_slash_idx = fpath.rfind("\\\\")
if last_slash_idx == -1:
fname = fpath
if last_slash_idx > -1:
fname = fpath[last_slash_idx + 1:]
logger.info(f"Send {name} ({fpath}) to {req_ip}")
return FileResponse(fpath, filename=fname)
async def handle_upload(request: Request) -> Response:
return Response("Uploading not currently supported", status_code=404)
async def render_admin(request: Request) -> Response:
pass
async def render_login(request: Request) -> Response:
pass
async def handle_login(request: Request) -> RedirectResponse:
pass
async def add_file(request: Request) -> RedirectResponse:
pass
async def del_file(request: Request) -> RedirectResponse:
pass
async def chg_pw(request: Request) -> RedirectResponse:
pass
rt_lst = [
Route("/file/{key:str}/{name:str}", handle_file, methods=['GET']),
Route("/upload/{key:str}/{name:str}", handle_upload, methods=['PUT']),
Mount("/admin", routes=[
Route("/", render_admin, methods=['GET']),
Route("/login", render_login, methods=['GET']),
Route("/file.add", add_file, methods=['POST']),
Route("/file.del", del_file, methods=['POST']),
Route("/admin.pw", chg_pw, methods=['POST']),
Route("/admin.login", handle_login, methods=['POST']),
])
]
app = Starlette("debug" in argv, rt_lst)