unwind/unwind/request.py

452 lines
14 KiB
Python

import email.utils
import json
import logging
import os
import tempfile
from collections import deque
from contextlib import asynccontextmanager, contextmanager
from dataclasses import dataclass, field
from functools import wraps
from hashlib import md5
from pathlib import Path
from random import random
from time import sleep, time
from typing import Callable, cast
import bs4
import httpx
from . import config
# Module-wide logger for these request helpers.
log = logging.getLogger(__name__)

# In debug mode, make sure the response cache directory exists up front.
if config.debug and config.cachedir:
    config.cachedir.mkdir(exist_ok=True)

# Shared client instances, created lazily by `session()` / `asession()` and
# torn down when the outermost context exits.
_shared_asession = None
_shared_session = None

# Aliases for the underlying HTTP client/response types (httpx).
_ASession_T = httpx.AsyncClient
_Session_T = httpx.Client
_Response_T = httpx.Response
@contextmanager
def session():
    """Return the shared request session.

    The session is shared by all request functions and provides cookie
    persistence and connection pooling.

    Opening the session before making a request allows you to set headers
    or change the retry behavior.

    Nested uses reuse the already-open session; the outermost context is
    responsible for closing it.
    """
    global _shared_session
    if _shared_session:
        # Re-entrant use: hand out the existing session, do not tear it down.
        yield _shared_session
        return
    _shared_session = _Session()
    try:
        # Enter the client so its connection pool is closed on exit
        # (previously the Client was leaked; `asession` already did this).
        with _shared_session:
            yield _shared_session
    finally:
        _shared_session = None
def _Session() -> _Session_T:
    """Create a new sync HTTP client with the project's user-agent set."""
    client = _Session_T()
    client.headers["user-agent"] = "Mozilla/5.0 Gecko/20100101 unwind/20230203"
    return client
@asynccontextmanager
async def asession():
    """Return the shared request session.

    The session is shared by all request functions and provides cookie
    persistence and connection pooling.

    Opening the session before making a request allows you to set headers
    or change the retry behavior.
    """
    global _shared_asession
    if _shared_asession:
        # Already open: reuse it, leave teardown to the outermost context.
        yield _shared_asession
        return
    client = _ASession_T()
    client.headers["user-agent"] = "Mozilla/5.0 Gecko/20100101 unwind/20230203"
    _shared_asession = client
    try:
        async with client:
            yield client
    finally:
        _shared_asession = None
def _throttle(
times: int, per_seconds: float, jitter: Callable[[], float] | None = None
) -> Callable[[Callable], Callable]:
calls: deque[float] = deque(maxlen=times)
if jitter is None:
jitter = lambda: 0.0
def decorator(func: Callable) -> Callable:
@wraps(func)
def inner(*args, **kwds):
# clean up
while calls:
if calls[0] + per_seconds > time():
break
calls.popleft()
# wait
if len(calls) == calls.maxlen:
wait_until = calls.popleft() + per_seconds + jitter()
timeout = wait_until - time()
log.debug(f"⏳ waiting {timeout:.2} seconds ...")
sleep(timeout)
# call
calls.append(time())
try:
r = func(*args, **kwds)
except Exception as e:
if getattr(e, "is_cached", False):
calls.pop()
raise
if getattr(r, "is_cached", False):
calls.pop()
return r
return inner
return decorator
@dataclass
class _CachedResponse:
    """Minimal stand-in for an httpx response, rebuilt from the debug cache.

    Exposes only the attributes the request helpers read:
    `status_code`, `text`, `url`, `headers`, and `json()`.
    """

    # Class attribute (no annotation, so NOT a dataclass field): lets callers
    # and the `_throttle` decorator detect that no network call was made.
    is_cached = True

    status_code: int
    text: str
    url: str
    headers: dict[str, str] = field(default_factory=dict)

    def json(self):
        # Decode the cached body as JSON, mirroring `httpx.Response.json()`.
        return json.loads(self.text)
class _RedirectError(RuntimeError):
def __init__(self, from_url: str, to_url: str, is_cached=False):
self.from_url = from_url
self.to_url = to_url
self.is_cached = is_cached
super().__init__(f"Redirected: {from_url} -> {to_url}")
def cache_path(req) -> Path | None:
    """Return the cache file path for *req*, or None when caching is off."""
    if not config.cachedir:
        return None
    # Key the cache on the URL only.  # + repr(sorted(req.headers.items()))
    signature = repr(req.url)
    digest = md5(signature.encode()).hexdigest()
    return config.cachedir / digest
@_throttle(1, 1, random)
def _http_get(s: _Session_T, url: str, *args, **kwds) -> _Response_T:
    """GET *url* through session *s*, throttled, with optional debug caching.

    In debug mode, responses are served from / written to the cache dir.
    Redirects (3xx) are never followed; they raise `_RedirectError` so
    callers notice moved resources.  Raises `httpx.HTTPStatusError` on
    4xx/5xx responses.
    """
    req = s.build_request(method="GET", url=url, *args, **kwds)
    cachefile = cache_path(req) if config.debug else None
    if cachefile and cachefile.exists():
        log.debug(
            f"💾 loading {req.url} ({req.headers!a}) from cache {cachefile} ..."
        )
        with cachefile.open() as fp:
            resp = _CachedResponse(**json.load(fp))
        if 300 <= resp.status_code <= 399:
            raise _RedirectError(
                from_url=resp.url, to_url=resp.headers["location"], is_cached=True
            )
        return cast(_Response_T, resp)
    log.debug(f"⚡️ loading {req.url} ({req.headers!a}) ...")
    resp = s.send(req, follow_redirects=False, stream=True)
    try:
        resp.raise_for_status()
        resp.read()  # Download the response stream to allow `resp.text` access.
    except Exception:
        # Don't leak the open stream when the status check (or read) fails.
        resp.close()
        raise
    if cachefile:
        with cachefile.open("w") as fp:
            json.dump(
                {
                    "status_code": resp.status_code,
                    "text": resp.text,
                    # httpx.URL is not JSON-serializable; store it as str
                    # (matches the str used in _CachedResponse.url).
                    "url": str(resp.url),
                    "headers": dict(resp.headers),
                },
                fp,
            )
    if resp.is_redirect:
        # Redirects could mean trouble, we need to stay on top of that!
        raise _RedirectError(from_url=str(resp.url), to_url=resp.headers["location"])
    return resp
def soup_from_url(url):
    """Return a BeautifulSoup instance from the contents for the given URL."""
    with session() as s:
        response = _http_get(s, url)
        return bs4.BeautifulSoup(response.text, "html5lib")
def _last_modified_from_response(resp: _Response_T) -> float | None:
if last_mod := resp.headers.get("last-modified"):
try:
return email.utils.parsedate_to_datetime(last_mod).timestamp()
except:
log.exception("🐛 Received invalid value for Last-Modified: %s", last_mod)
def _last_modified_from_file(path: Path) -> float:
return path.stat().st_mtime
def download(
    url: str,
    file_path: Path | str | None = None,
    *,
    replace_existing: bool | None = None,
    only_if_newer: bool = False,
    timeout: float | None = None,
    chunk_callback=None,
    response_callback=None,
) -> bytes | None:
    """Download a file.

    If `file_path` is `None` return the remote content, otherwise write the
    content to the given file path and return None.

    Existing (non-empty) files will not be overwritten unless
    `replace_existing` is set.  Setting `only_if_newer` will check if the
    remote file is newer than the local file, otherwise the download will be
    aborted.

    Raises FileExistsError when the destination exists and replacing is not
    allowed; propagates `httpx.HTTPStatusError` for error responses.
    """
    if replace_existing is None:
        replace_existing = only_if_newer
    file_exists = None
    file_lastmod = None
    if file_path is not None:
        file_path = Path(file_path)
        # An existing empty file is treated as absent.
        file_exists = file_path.exists() and file_path.stat().st_size
        if file_exists and not replace_existing:
            raise FileExistsError(23, "Would replace existing file", str(file_path))
    with session() as s:
        headers = {}
        if file_exists and only_if_newer:
            assert file_path
            file_lastmod = _last_modified_from_file(file_path)
            headers["if-modified-since"] = email.utils.formatdate(
                file_lastmod, usegmt=True
            )
        req = s.build_request(method="GET", url=url, headers=headers, timeout=timeout)
        log.debug("⚡️ loading %s (%s) ...", req.url, req.headers)
        resp = s.send(req, follow_redirects=True, stream=True)
        try:
            if response_callback is not None:
                try:
                    response_callback(resp)
                except Exception:
                    # Best-effort callback: log, never abort the download.
                    log.exception("🐛 Error in response callback.")
            log.debug("☕️ Response status: %s; headers: %s", resp.status_code, resp.headers)
            if resp.status_code == httpx.codes.NOT_MODIFIED:
                log.debug("✋ Remote file has not changed, skipping download.")
                return None
            resp.raise_for_status()
            if file_path is None:
                resp.read()  # Download the response stream to allow `resp.content` access.
                return resp.content
            resp_lastmod = _last_modified_from_response(resp)
            # Check Last-Modified in case the server ignored If-Modified-Since.
            # XXX also check Content-Length?
            if file_exists and only_if_newer and resp_lastmod is not None:
                assert file_lastmod
                if resp_lastmod <= file_lastmod:
                    log.debug("✋ Local file is newer, skipping download.")
                    return None
            # Create intermediate directories if necessary.
            download_dir = file_path.parent
            download_dir.mkdir(parents=True, exist_ok=True)
            # Write content to a temp file in the same directory so the final
            # rename is atomic (same filesystem).
            tempfd, tempfile_path = tempfile.mkstemp(
                dir=download_dir, prefix=f".download-{file_path.name}."
            )
            chunk_size = 8 * 2**20  # 8 MiB
            try:
                log.debug("💾 Writing to temp file %s ...", tempfile_path)
                try:
                    for chunk in resp.iter_bytes(chunk_size):
                        os.write(tempfd, chunk)
                        if chunk_callback:
                            try:
                                chunk_callback(chunk)
                            except Exception:
                                log.exception("🐛 Error in chunk callback.")
                finally:
                    os.close(tempfd)
                # Move downloaded file to destination.  This must happen for
                # new files too, not only when replacing an existing one.
                if file_exists:
                    log.debug("💾 Replacing existing file: %s", file_path)
                Path(tempfile_path).replace(file_path)
            except BaseException:
                # Don't leave the partial temp file behind on failure.
                Path(tempfile_path).unlink(missing_ok=True)
                raise
            # Fix file attributes.
            if resp_lastmod is not None:
                os.utime(file_path, (resp_lastmod, resp_lastmod))
        finally:
            # Always release the streamed response (the async twin does too).
            resp.close()
    return None
async def adownload(
    url: str,
    *,
    to_path: Path | str | None = None,
    replace_existing: bool | None = None,
    only_if_newer: bool = False,
    timeout: float | None = None,
    chunk_callback=None,
    response_callback=None,
) -> bytes | None:
    """Download a file (async variant of `download`).

    If `to_path` is `None` return the remote content, otherwise write the
    content to the given file path and return None.

    Existing (non-empty) files will not be overwritten unless
    `replace_existing` is set.  Setting `only_if_newer` will check if the
    remote file is newer than the local file, otherwise the download will be
    aborted.
    """
    if replace_existing is None:
        replace_existing = only_if_newer
    file_exists = None
    file_lastmod = None
    if to_path is not None:
        to_path = Path(to_path)
        # An existing empty file is treated as absent.
        file_exists = to_path.exists() and to_path.stat().st_size
        if file_exists and not replace_existing:
            raise FileExistsError(23, "Would replace existing file", str(to_path))
    async with asession() as s:
        headers = {}
        if file_exists and only_if_newer:
            assert to_path
            file_lastmod = _last_modified_from_file(to_path)
            headers["if-modified-since"] = email.utils.formatdate(
                file_lastmod, usegmt=True
            )
        req = s.build_request(method="GET", url=url, headers=headers, timeout=timeout)
        log.debug("⚡️ Loading %s (%a) ...", req.url, dict(req.headers))
        resp = await s.send(req, follow_redirects=True, stream=True)
        try:
            if response_callback is not None:
                try:
                    response_callback(resp)
                except Exception:
                    # Best-effort callback: log, never abort the download.
                    log.exception("🐛 Error in response callback.")
            log.debug(
                "☕️ %s -> status: %s; headers: %a",
                req.url,
                resp.status_code,
                dict(resp.headers),
            )
            if resp.status_code == httpx.codes.NOT_MODIFIED:
                log.debug(
                    "✋ Remote file has not changed, skipping download: %s -> %a",
                    req.url,
                    to_path,
                )
                return None
            resp.raise_for_status()
            if to_path is None:
                await resp.aread()  # Download the response stream to allow `resp.content` access.
                return resp.content
            resp_lastmod = _last_modified_from_response(resp)
            # Check Last-Modified in case the server ignored If-Modified-Since.
            # XXX also check Content-Length?
            if file_exists and only_if_newer and resp_lastmod is not None:
                assert file_lastmod
                if resp_lastmod <= file_lastmod:
                    log.debug("✋ Local file is newer, skipping download: %a", req.url)
                    return None
            # Create intermediate directories if necessary.
            download_dir = to_path.parent
            download_dir.mkdir(parents=True, exist_ok=True)
            # Write content to a temp file in the same directory so the final
            # rename is atomic (same filesystem).
            tempfd, tempfile_path = tempfile.mkstemp(
                dir=download_dir, prefix=f".download-{to_path.name}."
            )
            chunk_size = 8 * 2**20  # 8 MiB
            try:
                log.debug("💾 Writing to temp file %s ...", tempfile_path)
                try:
                    async for chunk in resp.aiter_bytes(chunk_size):
                        os.write(tempfd, chunk)
                        if chunk_callback:
                            try:
                                chunk_callback(chunk)
                            except Exception:
                                log.exception("🐛 Error in chunk callback.")
                finally:
                    os.close(tempfd)
                # Move downloaded file to destination.
                if to_path.exists():
                    log.debug("💾 Replacing existing file: %s", to_path)
                else:
                    log.debug("💾 Move to destination: %s", to_path)
                if replace_existing:
                    Path(tempfile_path).replace(to_path)
                else:
                    Path(tempfile_path).rename(to_path)
            except BaseException:
                # Don't leave the partial temp file behind on failure.
                Path(tempfile_path).unlink(missing_ok=True)
                raise
            # Fix file attributes.
            if resp_lastmod is not None:
                log.debug("💾 Adjusting file timestamp: %s (%s)", to_path, resp_lastmod)
                os.utime(to_path, (resp_lastmod, resp_lastmod))
        finally:
            await resp.aclose()
    return None