110 lines
2.6 KiB
Python
110 lines
2.6 KiB
Python
import secrets
|
|
|
|
from starlette.applications import Starlette
|
|
from starlette.requests import Request
|
|
from starlette.responses import JSONResponse, Response
|
|
from starlette.routing import Route
|
|
|
|
from . import config
|
|
|
|
# In-memory clip store: key -> (once, data). New keys are appended at the end
# of the dict, so iteration order runs from the oldest key to the newest.
_storage: dict[str, tuple[bool, bytes]] = {}
|
|
|
|
|
|
def cull(d: dict, /, size: int) -> int:
    """Remove the oldest items from dict until it fits the size.

    "Oldest" means first in the dict's iteration (insertion) order.

    Args:
        d: The dict to shrink in place.
        size: The maximum number of items to keep. A value below zero
            behaves like zero, i.e. the dict is emptied.

    Returns:
        The number of items removed.
    """
    from itertools import islice

    over = len(d) - size
    if over <= 0:
        return 0

    # Snapshot the doomed keys before deleting anything: advancing a live
    # dict iterator after a deletion raises RuntimeError, which made the
    # removal of more than one item fail.
    doomed = list(islice(d, over))
    for key in doomed:
        del d[key]

    return len(doomed)
|
|
|
|
|
|
def store_data(key: str, data: bytes, once: bool) -> None:
    """Save a clip under ``key``, evicting the oldest clips when full.

    Args:
        key: Name the clip is stored under.
        data: Raw clip contents.
        once: Flag stored alongside the data; ``load_data`` deletes the
            clip after its first read when this is true.
    """
    is_new_key = key not in _storage
    if is_new_key:
        # Overwriting an existing key does not grow the store, so room is
        # only made (one free slot) when a new key is about to be added.
        cull(_storage, config.max_clips - 1)

    _storage[key] = (once, data)
|
|
|
|
|
|
def load_data(key: str) -> bytes | None:
    """Return the clip stored under ``key``, or ``None`` if there is none.

    A clip that was stored with ``once`` set is deleted from the store
    as part of this read.
    """
    try:
        one_shot, payload = _storage[key]
    except KeyError:
        return None

    if one_shot:
        del _storage[key]
    return payload
|
|
|
|
|
|
async def read_bytes(request: Request, max_len: int = 1024) -> bytes:
    """Read the request body, returning at most ``max_len`` bytes.

    Streaming stops as soon as ``max_len`` bytes have been received; any
    excess from the final chunk is cut off.
    """
    buffer = bytearray()
    async for chunk in request.stream():
        buffer += chunk
        if len(buffer) >= max_len:
            # Enough data received; do not pull further chunks.
            break
    return bytes(buffer[:max_len])
|
|
|
|
|
|
async def get_clip(request: Request) -> Response:
    """Serve the clip named by the ``name`` path parameter.

    Responds 200 with the clip bytes, or 404 when no such clip is stored.
    """
    name = request.path_params["name"]
    payload = load_data(name)

    if payload is not None:
        return Response(payload, status_code=200)
    return Response(status_code=404)
|
|
|
|
|
|
async def put_clip(request: Request) -> Response:
    """Store the request body as the clip named by the ``name`` path parameter.

    The clip is stored as one-shot when the query string has ``once=1``.
    The body is capped at ``config.max_clip_size`` bytes. Responds 204.
    """
    name = request.path_params["name"]
    one_shot = request.query_params.get("once") == "1"

    payload = await read_bytes(request, max_len=config.max_clip_size)
    store_data(name, payload, one_shot)

    return Response(status_code=204)
|
|
|
|
|
|
async def post_clip(request: Request) -> Response:
    """Store the request body under a freshly generated random key.

    The clip is stored as one-shot when the query string has ``once=1``.
    The body is capped at ``config.max_clip_size`` bytes. Responds 201
    with a JSON object whose ``url`` member points at the new clip.
    """
    one_shot = request.query_params.get("once") == "1"
    payload = await read_bytes(request, max_len=config.max_clip_size)

    name = secrets.token_urlsafe()
    store_data(name, payload, one_shot)

    # The clip URL is reported in the JSON body; a 303 redirect to it was
    # the alternative previously left disabled here.
    location = str(request.url_for("clip", name=name))
    return JSONResponse({"url": location}, status_code=201)
|
|
|
|
|
|
async def clip(request: Request) -> Response:
    """Dispatch a request for a named clip to the handler for its method.

    GET goes to ``get_clip`` and PUT to ``put_clip``; any other method is
    answered with 405.
    """
    method = request.method

    if method == "GET":
        return await get_clip(request)
    if method == "PUT":
        return await put_clip(request)

    return Response(status_code=405)
|
|
|
|
|
|
# ASGI application: named clips are read/written at "/{name}", and a POST to
# "/" stores a clip under a generated key.
app = Starlette(
    debug=config.debug,
    routes=[
        Route(
            path="/{name}",
            endpoint=clip,
            methods=["GET", "PUT"],
            # post_clip resolves clip URLs through this route name.
            name="clip",
        ),
        Route(path="/", endpoint=post_clip, methods=["POST"]),
    ],
)
|