# unwind/unwind/request.py
import asyncio
import email.utils
import inspect
import json
import logging
import os
import tempfile
from collections import deque
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from datetime import timezone
from functools import wraps
from hashlib import md5
from pathlib import Path
from random import random
from time import sleep, time
from typing import Any, Callable, ParamSpec, TypeVar, cast

import bs4
import httpx

from . import config
log = logging.getLogger(__name__)

# In debug mode responses are cached on disk; make sure the cache directory
# exists up front.  (`config.cachedir` may be unset/None — TODO confirm its
# type in the config module; it is used like a `pathlib.Path` here.)
if config.debug and config.cachedir:
    config.cachedir.mkdir(exist_ok=True)

# Process-wide AsyncClient shared by all request helpers; created lazily by
# `asession()` and reset to None when the outermost context exits.
_shared_asession: "_ASession_T | None" = None

# Aliases for the underlying HTTP library types so the rest of the module
# refers to the backend in exactly one place.
_ASession_T = httpx.AsyncClient
_Response_T = httpx.Response

# Generic type variables for the decorator machinery below.
_T = TypeVar("_T")
_P = ParamSpec("_P")
@asynccontextmanager
async def asession():
    """Return the shared request session.

    The session is shared by all request functions and provides cookie
    persistence and connection pooling.

    Opening the session before making a request allows you to set headers
    or change the retry behavior.
    """
    global _shared_asession
    # Re-entrant use: when the shared session already exists, hand it out
    # as-is; only the outermost caller creates and later closes it.
    if _shared_asession:
        yield _shared_asession
        return
    _shared_asession = _ASession_T()
    _shared_asession.headers[
        "user-agent"
    ] = "Mozilla/5.0 Gecko/20100101 unwind/20230203"
    try:
        # `async with` closes the client when the outermost context exits.
        async with _shared_asession:
            yield _shared_asession
    finally:
        # Always reset the global, even if the client failed to close.
        # NOTE(review): concurrent tasks that entered via the re-entrant
        # branch above may still hold the client when the creator exits and
        # closes it — confirm callers serialize the outermost usage.
        _shared_asession = None
def _throttle(
    times: int,
    per_seconds: float,
    jitter: "Callable[[], float] | None" = None,
) -> "Callable[[Callable[_P, _T]], Callable[_P, _T]]":
    """Rate-limit calls to the decorated function.

    At most *times* calls are allowed per sliding window of *per_seconds*
    seconds; excess calls block (sync) or suspend (async) until a slot
    frees up.  *jitter*, if given, returns extra seconds to add to each
    wait (e.g. pass ``random.random``).

    Calls whose result — or raised exception — carries a truthy
    ``is_cached`` attribute do not count against the limit: cache hits
    are free.

    Bug fix: the original wrapper was always synchronous, so when it
    decorated an ``async def`` (e.g. ``_ahttp_get``) the bookkeeping ran
    at coroutine-creation time, the cached-exception undo never fired,
    and a triggered wait would have blocked the event loop.  Coroutine
    functions now get an async wrapper that awaits the callee and waits
    with ``asyncio.sleep``.
    """
    calls: deque[float] = deque(maxlen=times)
    if jitter is None:
        jitter = lambda: 0.0

    def _evict_expired() -> None:
        # Drop window entries older than the sliding window.
        while calls and calls[0] + per_seconds <= time():
            calls.popleft()

    def _seconds_to_wait() -> float:
        # When the window is full, the oldest entry determines when the next
        # slot opens; consume it so concurrent callers queue up behind it.
        if len(calls) != calls.maxlen:
            return 0.0
        wait_until = calls.popleft() + per_seconds + jitter()
        return max(0.0, wait_until - time())

    def _uncount(obj: "Any") -> None:
        # Cache hits don't consume a slot; undo the bookkeeping.
        if getattr(obj, "is_cached", False):
            calls.pop()

    def decorator(func: "Callable[_P, _T]") -> "Callable[_P, _T]":
        if inspect.iscoroutinefunction(func):

            @wraps(func)
            async def ainner(*args: "_P.args", **kwds: "_P.kwargs"):
                _evict_expired()
                if timeout := _seconds_to_wait():
                    log.debug(f"⏳ waiting {timeout:.2} seconds ...")
                    await asyncio.sleep(timeout)
                calls.append(time())
                try:
                    r = await func(*args, **kwds)
                except Exception as e:
                    _uncount(e)
                    raise
                _uncount(r)
                return r

            return ainner  # type: ignore[return-value]

        @wraps(func)
        def inner(*args: "_P.args", **kwds: "_P.kwargs"):
            _evict_expired()
            if timeout := _seconds_to_wait():
                log.debug(f"⏳ waiting {timeout:.2} seconds ...")
                sleep(timeout)
            calls.append(time())
            try:
                r = func(*args, **kwds)
            except Exception as e:
                _uncount(e)
                raise
            _uncount(r)
            return r

        return inner

    return decorator
@dataclass
class _CachedResponse:
    """Minimal stand-in for a response restored from the on-disk cache.

    Mimics just enough of the live response surface (`status_code`,
    `text`, `url`, `headers`, `json()`) for the callers of `_ahttp_get`.
    """

    # Class-level marker, deliberately NOT a dataclass field: lets callers
    # (e.g. the throttle decorator) recognize cache hits.
    is_cached = True

    status_code: int
    text: str
    url: str
    headers: dict[str, str] = field(default_factory=dict)

    def json(self):
        """Parse the cached body as JSON and return the result."""
        body = self.text
        return json.loads(body)
class _RedirectError(RuntimeError):
    """Raised when a GET receives a redirect instead of a final response."""

    def __init__(self, from_url: str, to_url: str, is_cached=False):
        super().__init__(f"Redirected: {from_url} -> {to_url}")
        # Keep both endpoints so callers can follow up explicitly.
        self.from_url = from_url
        self.to_url = to_url
        # True when the redirect was replayed from the on-disk cache.
        self.is_cached = is_cached
def cache_path(req) -> Path | None:
if not config.cachedir:
return
sig = repr(req.url) # + repr(sorted(req.headers.items()))
return config.cachedir / md5(sig.encode()).hexdigest()
@_throttle(1, 1, random)
async def _ahttp_get(s: _ASession_T, url: str, *args, **kwds) -> _Response_T:
    """GET *url* via session *s*, throttled, with debug-mode disk caching.

    Raises `_RedirectError` when a redirect is detected (cached or live)
    and propagates `httpx` status errors via `raise_for_status()`.
    """
    req = s.build_request(method="GET", url=url, *args, **kwds)
    # The disk cache is only consulted/written in debug mode (and only when
    # `config.cachedir` is configured — `cache_path` returns None otherwise).
    cachefile = cache_path(req) if config.debug else None
    if cachefile:
        if cachefile.exists():
            log.debug(
                "💾 loading %s (%a) from cache %s ...", req.url, req.headers, cachefile
            )
            with cachefile.open() as fp:
                resp = _CachedResponse(**json.load(fp))
            # Replay a cached redirect exactly like a live one, flagged so
            # the throttle does not count this call.
            if 300 <= resp.status_code <= 399:
                raise _RedirectError(
                    from_url=resp.url, to_url=resp.headers["location"], is_cached=True
                )
            return cast(_Response_T, resp)
    log.debug("⚡️ loading %s (%a) ...", req.url, req.headers)
    # Redirects are deliberately NOT followed; see the check below.
    resp = await s.send(req, follow_redirects=False, stream=True)
    resp.raise_for_status()
    await resp.aread()  # Download the response stream to allow `resp.text` access.
    if cachefile:
        log.debug(
            "💾 writing response to cache: %s (%a) -> %s",
            req.url,
            req.headers,
            cachefile,
        )
        # Persist only the fields `_CachedResponse` knows how to restore.
        with cachefile.open("w") as fp:
            json.dump(
                {
                    "status_code": resp.status_code,
                    "text": resp.text,
                    "url": str(resp.url),
                    "headers": dict(resp.headers),
                },
                fp,
            )
    if resp.is_redirect:
        # Redirects could mean trouble, we need to stay on top of that!
        raise _RedirectError(from_url=str(resp.url), to_url=resp.headers["location"])
    return resp
async def asoup_from_url(url):
    """Return a BeautifulSoup instance from the contents for the given URL."""
    async with asession() as session:
        response = await _ahttp_get(session, url)
        return bs4.BeautifulSoup(response.text, "html5lib")
def _last_modified_from_response(resp: _Response_T) -> float | None:
if last_mod := resp.headers.get("last-modified"):
try:
dt = email.utils.parsedate_to_datetime(last_mod)
except ValueError:
log.exception("🐛 Received invalid value for Last-Modified: %s", last_mod)
else:
return dt.timestamp()
def _last_modified_from_file(path: Path) -> float:
    """Return the file's modification time as a POSIX timestamp."""
    stat_result = path.stat()
    return stat_result.st_mtime
async def adownload(
    url: str,
    *,
    to_path: Path | str | None = None,
    replace_existing: bool | None = None,
    only_if_newer: bool = False,
    timeout: float | None = None,
    chunk_callback: Callable[[bytes], Any] | None = None,
    response_callback: Callable[[_Response_T], Any] | None = None,
) -> bytes | None:
    """Download a file.

    If `to_path` is `None` return the remote content, otherwise write the
    content to the given file path and return `None`.

    Existing files will not be overwritten unless `replace_existing` is set.
    Setting `only_if_newer` will check if the remote file is newer than the
    local file, otherwise the download will be aborted.

    Args:
        url: The URL to fetch (redirects are followed).
        to_path: Destination file path; `None` returns the bytes instead.
        replace_existing: Allow overwriting an existing destination file.
            Defaults to the value of `only_if_newer`.
        only_if_newer: Skip the download unless the remote file is newer
            (via If-Modified-Since, double-checked against Last-Modified).
        timeout: Request timeout (httpx semantics).
        chunk_callback: Called with each downloaded chunk; errors are
            logged and ignored.
        response_callback: Called once the response headers arrive; errors
            are logged and ignored.

    Returns:
        The content as `bytes` when `to_path` is `None`, otherwise `None`
        (also `None` when the download was skipped as not-newer).

    Raises:
        FileExistsError: destination exists and `replace_existing` is false.
        httpx.HTTPStatusError: the server returned an error status.
    """
    if replace_existing is None:
        replace_existing = only_if_newer
    file_exists = None
    if to_path is not None:
        to_path = Path(to_path)
        # A zero-byte file counts as absent so interrupted downloads retry.
        file_exists = to_path.exists() and to_path.stat().st_size
        if file_exists and not replace_existing:
            raise FileExistsError(23, "Would replace existing file", str(to_path))
    async with asession() as s:
        headers = {}
        if file_exists and only_if_newer:
            assert to_path
            file_lastmod = _last_modified_from_file(to_path)
            headers["if-modified-since"] = email.utils.formatdate(
                file_lastmod, usegmt=True
            )
        req = s.build_request(method="GET", url=url, headers=headers, timeout=timeout)
        log.debug("⚡️ Loading %s (%a) ...", req.url, dict(req.headers))
        resp = await s.send(req, follow_redirects=True, stream=True)
        try:
            if response_callback is not None:
                try:
                    response_callback(resp)
                except BaseException:
                    # Callbacks are best-effort; never let them kill the download.
                    log.exception("🐛 Error in response callback.")
            log.debug(
                "☕️ %s -> status: %s; headers: %a",
                req.url,
                resp.status_code,
                dict(resp.headers),
            )
            if resp.status_code == httpx.codes.NOT_MODIFIED:
                log.debug(
                    "✋ Remote file has not changed, skipping download: %s -> %a",
                    req.url,
                    to_path,
                )
                return None
            resp.raise_for_status()
            if to_path is None:
                # Download the response stream to allow `resp.content` access.
                await resp.aread()
                return resp.content
            resp_lastmod = _last_modified_from_response(resp)
            # Check Last-Modified in case the server ignored If-Modified-Since.
            # XXX also check Content-Length?
            if file_exists and only_if_newer and resp_lastmod is not None:
                assert file_lastmod  # pyright: ignore [reportUnboundVariable]
                if resp_lastmod <= file_lastmod:
                    log.debug("✋ Local file is newer, skipping download: %a", req.url)
                    return None
            # Create intermediate directories if necessary.
            download_dir = to_path.parent
            download_dir.mkdir(parents=True, exist_ok=True)
            # Write content to a temp file in the destination directory so the
            # final move stays on one filesystem.
            tempfd, tempfile_path = tempfile.mkstemp(
                dir=download_dir, prefix=f".download-{to_path.name}."
            )
            one_mb = 2**20
            chunk_size = 8 * one_mb
            try:
                try:
                    log.debug("💾 Writing to temp file %s ...", tempfile_path)
                    async for chunk in resp.aiter_bytes(chunk_size):
                        os.write(tempfd, chunk)
                        if chunk_callback:
                            try:
                                chunk_callback(chunk)
                            except BaseException:
                                log.exception("🐛 Error in chunk callback.")
                finally:
                    os.close(tempfd)
                # Move downloaded file to destination.
                if to_path.exists():
                    log.debug("💾 Replacing existing file: %s", to_path)
                else:
                    log.debug("💾 Move to destination: %s", to_path)
                if replace_existing:
                    Path(tempfile_path).replace(to_path)
                else:
                    # `rename` refuses to clobber an existing destination on
                    # some platforms — intentional when not replacing.
                    Path(tempfile_path).rename(to_path)
            except BaseException:
                # Bug fix: don't leave half-written ".download-*" temp files
                # behind when the stream or the move fails.
                Path(tempfile_path).unlink(missing_ok=True)
                raise
            # Fix file attributes.
            if resp_lastmod is not None:
                log.debug("💾 Adjusting file timestamp: %s (%s)", to_path, resp_lastmod)
                os.utime(to_path, (resp_lastmod, resp_lastmod))
        finally:
            await resp.aclose()