diff --git a/unwind/config.py b/unwind/config.py
index 1805cd7..624365a 100644
--- a/unwind/config.py
+++ b/unwind/config.py
@@ -4,12 +4,13 @@ from pathlib import Path
 
 import toml
 
 cachedir = (
-    Path(cachedir) if (cachedir := os.getenv("UNWIND_CACHEDIR", ".cache")) else None
+    Path(cachedir) if (cachedir := os.getenv("UNWIND_CACHEDIR", "./.cache")) else None
 )
+datadir = Path(os.getenv("UNWIND_DATA") or "./data")
 debug = os.getenv("DEBUG") == "1"
 loglevel = os.getenv("UNWIND_LOGLEVEL") or ("DEBUG" if debug else "INFO")
-storage_path = os.getenv("UNWIND_STORAGE", "./data/db.sqlite")
-config_path = os.getenv("UNWIND_CONFIG", "./data/config.toml")
+storage_path = Path(os.getenv("UNWIND_STORAGE") or datadir / "db.sqlite")
+config_path = Path(os.getenv("UNWIND_CONFIG") or datadir / "config.toml")
 
 _config = toml.load(config_path)
diff --git a/unwind/imdb.py b/unwind/imdb.py
index 72d06c9..35368e5 100644
--- a/unwind/imdb.py
+++ b/unwind/imdb.py
@@ -37,7 +37,7 @@ log = logging.getLogger(__name__)
 
 
 async def refresh_user_ratings_from_imdb(stop_on_dupe=True):
     with session() as s:
-        s.headers["Accept-Language"] = "en-GB, en;q=0.5"
+        s.headers["Accept-Language"] = "en-US, en;q=0.5"
 
         for user in await db.get_all(User):
@@ -192,6 +192,7 @@ async def load_ratings(user_id):
         ratings, next_url = await parse_page(next_url)
 
         for i, rating in enumerate(ratings):
+            assert rating.user and rating.movie
 
             if i == 0:
                 # All rating objects share the same user.
diff --git a/unwind/imdb_import.py b/unwind/imdb_import.py
index e5888ef..3cd139a 100644
--- a/unwind/imdb_import.py
+++ b/unwind/imdb_import.py
@@ -6,7 +6,7 @@ from datetime import datetime, timezone
 from pathlib import Path
 from typing import Optional, cast
 
-from . import db
+from . import config, db, request
 from .db import add_or_update_many_movies
 from .imdb import score_from_imdb_rating
 from .models import Movie
@@ -179,11 +179,11 @@ def read_basics(path):
         yield m
 
 
-async def import_from_file(basics_path: Path, ratings_path: Path):
-    log.info("Loading scores ... 💾")
+async def import_from_file(*, basics_path: Path, ratings_path: Path):
+    log.info("💾 Loading scores ...")
     scores = read_ratings_as_scoremap(ratings_path)
 
-    log.info("Importing movies ... 💾")
+    log.info("💾 Importing movies ...")
     total = count_lines(basics_path)
     assert total != 0
     perc = 0.0
@@ -196,7 +196,7 @@
 
     for i, m in enumerate(read_basics(basics_path)):
         if i / total > perc:
-            log.info("Imported %s%%", round(perc * 100, 1))
+            log.info("⏳ Imported %s%%", round(perc * 100, 1))
             perc += perc_step
 
         if m.media_type not in {
@@ -222,3 +222,36 @@
     if chunk:
         await add_or_update_many_movies(chunk)
         chunk = []
+
+
+async def load_from_web():
+    """Refresh the full IMDb movie database.
+
+    The latest dumps are first downloaded and then imported into the database.
+    Beware that this can take a very long time (about half an hour), as the
+    amount of data is quite large.
+
+    See https://www.imdb.com/interfaces/ and https://datasets.imdbws.com/ for
+    more information on the IMDb database dumps.
+    """
+    basics_url = "https://datasets.imdbws.com/title.basics.tsv.gz"
+    ratings_url = "https://datasets.imdbws.com/title.ratings.tsv.gz"
+    ratings_file = config.datadir / "imdb/title.ratings.tsv.gz"
+    basics_file = config.datadir / "imdb/title.basics.tsv.gz"
+
+    ratings_mtime = ratings_file.stat().st_mtime if ratings_file.exists() else None
+    basics_mtime = basics_file.stat().st_mtime if basics_file.exists() else None
+
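+    # download(..., only_if_newer=True) sends If-Modified-Since and stamps a
+    # fresh download with the remote Last-Modified time, so a changed mtime
+    # below reliably signals that new data arrived.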
+ """ + basics_url = "https://datasets.imdbws.com/title.basics.tsv.gz" + ratings_url = "https://datasets.imdbws.com/title.ratings.tsv.gz" + ratings_file = config.datadir / "imdb/title.ratings.tsv.gz" + basics_file = config.datadir / "imdb/title.basics.tsv.gz" + + ratings_mtime = ratings_file.stat().st_mtime if ratings_file.exists() else None + bastics_mtime = basics_file.stat().st_mtime if basics_file.exists() else None + + with request.session(): + request.download(ratings_url, ratings_file, only_if_newer=True) + request.download(basics_url, basics_file, only_if_newer=True) + + is_changed = ( + ratings_mtime != ratings_file.stat().st_mtime + or bastics_mtime != basics_file.stat().st_mtime + ) + if is_changed: + await import_from_file(basics_path=basics_file, ratings_path=ratings_file) diff --git a/unwind/request.py b/unwind/request.py index e46672c..81f9f29 100644 --- a/unwind/request.py +++ b/unwind/request.py @@ -1,5 +1,8 @@ +import email.utils import json import logging +import os +import tempfile from collections import deque from contextlib import contextmanager from dataclasses import dataclass @@ -8,10 +11,11 @@ from hashlib import md5 from pathlib import Path from random import random from time import sleep, time -from typing import Callable, Optional +from typing import Callable, Optional, Union import bs4 import requests +from requests.status_codes import codes from urllib3.util.retry import Retry from . import config @@ -44,6 +48,13 @@ _shared_session = None @contextmanager def session(): + """Return the shared request session. + + The session is shared by all request functions and provides cookie + persistence and connection pooling. + Opening the session before making a request allows you to set headers + or change the retry behavior. + """ global _shared_session if _shared_session: @@ -86,7 +97,7 @@ def throttle( if len(calls) == calls.maxlen: wait_until = calls.popleft() + per_seconds + jitter() timeout = wait_until - time() - log.debug(f"waiting {timeout:.2} seconds ... ⏳") + log.debug(f"⏳ waiting {timeout:.2} seconds ...") sleep(timeout) # call @@ -148,7 +159,7 @@ def http_get(s: requests.Session, url: str, *args, **kwds) -> requests.Response: if cachefile: if cachefile.exists(): log.debug( - f"loading {req.url} ({req.headers!r}) from cache {cachefile} ... 💾" + f"💾 loading {req.url} ({req.headers!a}) from cache {cachefile} ..." ) with cachefile.open() as fp: resp = CachedResponse(**json.load(fp)) @@ -158,7 +169,7 @@ def http_get(s: requests.Session, url: str, *args, **kwds) -> requests.Response: ) return resp - log.debug(f"loading {req.url} ({req.headers!r}) ... 
⚡️") + log.debug(f"⚡️ loading {req.url} ({req.headers!a}) ...") resp = s.send(req, allow_redirects=False, stream=True) resp.raise_for_status() @@ -182,8 +193,130 @@ def http_get(s: requests.Session, url: str, *args, **kwds) -> requests.Response: def soup_from_url(url): + """Return a BeautifulSoup instance from the contents for the given URL.""" with session() as s: r = http_get(s, url) soup = bs4.BeautifulSoup(r.text, "html5lib") return soup + + +def last_modified_from_response(resp): + if last_mod := resp.headers.get("Last-Modified"): + try: + return email.utils.parsedate_to_datetime(last_mod).timestamp() + except: + log.exception("🐛 Received invalid value for Last-Modified: %s", last_mod) + + +def last_modified_from_file(path: Path): + return path.stat().st_mtime + + +def download( + url: str, + file_path: Union[Path, str] = None, + *, + replace_existing: bool = None, + only_if_newer: bool = False, + timeout: float = None, + verify_ssl: bool = True, + chunk_callback=None, + response_callback=None, +): + """Download a file. + + Existing files will not be overwritten unless `replace_existing` is set. + Setting `only_if_newer` will check if the remote file is newer than the + local file, otherwise the download will be aborted. + """ + if replace_existing is None: + replace_existing = only_if_newer + + file_exists = None + if file_path is not None: + file_path = Path(file_path) + + file_exists = file_path.exists() and file_path.stat().st_size + if file_exists and not replace_existing: + raise FileExistsError(23, "Would replace existing file", str(file_path)) + + with session() as s: + + headers = {} + if file_exists and only_if_newer: + assert file_path + file_lastmod = last_modified_from_file(file_path) + headers["If-Modified-Since"] = email.utils.formatdate( + file_lastmod, usegmt=True + ) + + req = s.prepare_request(requests.Request("GET", url, headers=headers)) + + log.debug("⚡️ loading %s (%s) ...", req.url, req.headers) + resp = s.send( + req, allow_redirects=True, stream=True, timeout=timeout, verify=verify_ssl + ) + + if response_callback is not None: + try: + response_callback(resp) + except: + log.exception("🐛 Error in response callback.") + + log.debug("☕️ Response status: %s; headers: %s", resp.status_code, resp.headers) + + resp.raise_for_status() + + if resp.status_code == codes.not_modified: + log.debug("✋ Remote file has not changed, skipping download.") + return + + if file_path is None: + return resp.content + + assert replace_existing is True + + resp_lastmod = last_modified_from_response(resp) + + # Check Last-Modified in case the server ignored If-Modified-Since. + # XXX also check Content-Length? + if file_exists and only_if_newer and resp_lastmod is not None: + assert file_lastmod + + if resp_lastmod <= file_lastmod: + log.debug("✋ Local file is newer, skipping download.") + resp.close() + return + + # Create intermediate directories if necessary. + download_dir = file_path.parent + download_dir.mkdir(parents=True, exist_ok=True) + + # Write content to temp file. + tempdir = download_dir + tempfd, tempfile_path = tempfile.mkstemp( + dir=tempdir, prefix=f".download-{file_path.name}." 
+    """
+    if replace_existing is None:
+        replace_existing = only_if_newer
+
+    file_exists = None
+    if file_path is not None:
+        file_path = Path(file_path)
+
+        file_exists = file_path.exists() and file_path.stat().st_size
+        if file_exists and not replace_existing:
+            raise FileExistsError(
+                errno.EEXIST, "Would replace existing file", str(file_path)
+            )
+
+    with session() as s:
+
+        headers = {}
+        if file_exists and only_if_newer:
+            assert file_path
+            file_lastmod = last_modified_from_file(file_path)
+            headers["If-Modified-Since"] = email.utils.formatdate(
+                file_lastmod, usegmt=True
+            )
+
+        req = s.prepare_request(requests.Request("GET", url, headers=headers))
+
+        log.debug("⚡️ loading %s (%s) ...", req.url, req.headers)
+        resp = s.send(
+            req, allow_redirects=True, stream=True, timeout=timeout, verify=verify_ssl
+        )
+
+        if response_callback is not None:
+            try:
+                response_callback(resp)
+            except Exception:
+                log.exception("🐛 Error in response callback.")
+
+        log.debug("☕️ Response status: %s; headers: %s", resp.status_code, resp.headers)
+
+        resp.raise_for_status()
+
+        if resp.status_code == codes.not_modified:
+            log.debug("✋ Remote file has not changed, skipping download.")
+            return
+
+        if file_path is None:
+            return resp.content
+
+        assert not file_exists or replace_existing
+
+        resp_lastmod = last_modified_from_response(resp)
+
+        # Check Last-Modified in case the server ignored If-Modified-Since.
+        # XXX also check Content-Length?
+        if file_exists and only_if_newer and resp_lastmod is not None:
+            assert file_lastmod
+
+            if resp_lastmod <= file_lastmod:
+                log.debug("✋ Local file is newer, skipping download.")
+                resp.close()
+                return
+
+        # Create intermediate directories if necessary.
+        download_dir = file_path.parent
+        download_dir.mkdir(parents=True, exist_ok=True)
+
+        # Write content to temp file.
+        tempdir = download_dir
+        tempfd, tempfile_path = tempfile.mkstemp(
+            dir=tempdir, prefix=f".download-{file_path.name}."
+        )
+        one_mb = 2 ** 20
+        chunk_size = 8 * one_mb
+        try:
+            log.debug("💾 Writing to temp file %s ...", tempfile_path)
+            for chunk in resp.iter_content(chunk_size=chunk_size, decode_unicode=False):
+                os.write(tempfd, chunk)
+                if chunk_callback:
+                    try:
+                        chunk_callback(chunk)
+                    except Exception:
+                        log.exception("🐛 Error in chunk callback.")
+        finally:
+            os.close(tempfd)
+
+        # Move downloaded file to destination.
+        if file_exists:
+            log.debug("💾 Replacing existing file: %s", file_path)
+        Path(tempfile_path).replace(file_path)
+
+        # Fix file attributes.
+        if resp_lastmod is not None:
+            os.utime(file_path, (resp_lastmod, resp_lastmod))
diff --git a/unwind/web.py b/unwind/web.py
index 128dbf8..d5c336a 100644
--- a/unwind/web.py
+++ b/unwind/web.py
@@ -12,13 +12,14 @@ from starlette.authentication import (
     SimpleUser,
     requires,
 )
+from starlette.background import BackgroundTask
 from starlette.exceptions import HTTPException
 from starlette.middleware import Middleware
 from starlette.middleware.authentication import AuthenticationMiddleware
 from starlette.responses import JSONResponse
 from starlette.routing import Mount, Route
 
-from . import config, db, imdb
+from . import config, db, imdb, imdb_import
 from .db import close_connection_pool, find_ratings, open_connection_pool
 from .middleware.responsetime import ResponseTimeMiddleware
 from .models import Group, Movie, User, asplain
@@ -192,6 +193,13 @@ async def add_movie(request):
     pass
 
 
+@route("/movies/_reload_imdb", methods=["POST"])
+@requires(["authenticated", "admin"])
+async def load_imdb_movies(request):
+    task = BackgroundTask(imdb_import.load_from_web)
+    return JSONResponse({"status": "Import started."}, background=task, status_code=202)
+
+
 @route("/users")
 @requires(["authenticated", "admin"])
 async def list_users(request):
@@ -217,6 +225,14 @@ async def set_rating_for_user(request):
     request.path_params["user_id"]
 
 
+@route("/users/_reload_ratings", methods=["POST"])
+@requires(["authenticated", "admin"])
+async def load_imdb_user_ratings(request):
+    ratings = [rating async for rating in imdb.refresh_user_ratings_from_imdb()]
+
+    return JSONResponse({"new_ratings": [asplain(r) for r in ratings]})
+
+
 @route("/groups", methods=["POST"])
 @requires(["authenticated", "admin"])
 async def add_group(request):