From 294e311aff054704d8349ee99b55b5646b8348e6 Mon Sep 17 00:00:00 2001 From: ducklet Date: Sat, 10 Jul 2021 19:32:08 +0200 Subject: [PATCH] add new admin routes to trigger database updates The two routes offer reloading the whole movie database from IMDb, and reloading all users' IMDb ratings. These functions (or very similar) were already available using the CLI, but now they can be triggered remotely by an admin. --- unwind/config.py | 7 ++- unwind/imdb.py | 3 +- unwind/imdb_import.py | 40 ++++++++++-- unwind/request.py | 141 ++++++++++++++++++++++++++++++++++++++++-- unwind/web.py | 19 +++++- 5 files changed, 196 insertions(+), 14 deletions(-) diff --git a/unwind/config.py b/unwind/config.py index 1805cd7..624365a 100644 --- a/unwind/config.py +++ b/unwind/config.py @@ -4,12 +4,13 @@ from pathlib import Path import toml cachedir = ( - Path(cachedir) if (cachedir := os.getenv("UNWIND_CACHEDIR", ".cache")) else None + Path(cachedir) if (cachedir := os.getenv("UNWIND_CACHEDIR", "./.cache")) else None ) +datadir = Path(os.getenv("UNWIND_DATA") or "./data") debug = os.getenv("DEBUG") == "1" loglevel = os.getenv("UNWIND_LOGLEVEL") or ("DEBUG" if debug else "INFO") -storage_path = os.getenv("UNWIND_STORAGE", "./data/db.sqlite") -config_path = os.getenv("UNWIND_CONFIG", "./data/config.toml") +storage_path = os.getenv("UNWIND_STORAGE", datadir / "db.sqlite") +config_path = os.getenv("UNWIND_CONFIG", datadir / "config.toml") _config = toml.load(config_path) diff --git a/unwind/imdb.py b/unwind/imdb.py index 72d06c9..35368e5 100644 --- a/unwind/imdb.py +++ b/unwind/imdb.py @@ -37,7 +37,7 @@ log = logging.getLogger(__name__) async def refresh_user_ratings_from_imdb(stop_on_dupe=True): with session() as s: - s.headers["Accept-Language"] = "en-GB, en;q=0.5" + s.headers["Accept-Language"] = "en-US, en;q=0.5" for user in await db.get_all(User): @@ -192,6 +192,7 @@ async def load_ratings(user_id): ratings, next_url = await parse_page(next_url) for i, rating in enumerate(ratings): + assert rating.user and rating.movie if i == 0: # All rating objects share the same user. diff --git a/unwind/imdb_import.py b/unwind/imdb_import.py index e5888ef..3cd139a 100644 --- a/unwind/imdb_import.py +++ b/unwind/imdb_import.py @@ -6,7 +6,7 @@ from datetime import datetime, timezone from pathlib import Path from typing import Optional, cast -from . import db +from . import config, db, request from .db import add_or_update_many_movies from .imdb import score_from_imdb_rating from .models import Movie @@ -179,11 +179,11 @@ def read_basics(path): yield m -async def import_from_file(basics_path: Path, ratings_path: Path): - log.info("Loading scores ... 💾") +async def import_from_file(*, basics_path: Path, ratings_path: Path): + log.info("💾 Loading scores ...") scores = read_ratings_as_scoremap(ratings_path) - log.info("Importing movies ... 💾") + log.info("💾 Importing movies ...") total = count_lines(basics_path) assert total != 0 perc = 0.0 @@ -196,7 +196,7 @@ async def import_from_file(basics_path: Path, ratings_path: Path): for i, m in enumerate(read_basics(basics_path)): if i / total > perc: - log.info("Imported %s%%", round(perc * 100, 1)) + log.info("⏳ Imported %s%%", round(perc * 100, 1)) perc += perc_step if m.media_type not in { @@ -222,3 +222,33 @@ async def import_from_file(basics_path: Path, ratings_path: Path): if chunk: await add_or_update_many_movies(chunk) chunk = [] + + +async def load_from_web(): + """Refresh the full IMDb movie database. + + The latest dumps are first downloaded and then imported into the database. + Beware that this can take a very long time (about half an hour), as the + amount of data is quite large. + + See https://www.imdb.com/interfaces/ and https://datasets.imdbws.com/ for + more information on the IMDb database dumps. + """ + basics_url = "https://datasets.imdbws.com/title.basics.tsv.gz" + ratings_url = "https://datasets.imdbws.com/title.ratings.tsv.gz" + ratings_file = config.datadir / "imdb/title.ratings.tsv.gz" + basics_file = config.datadir / "imdb/title.basics.tsv.gz" + + ratings_mtime = ratings_file.stat().st_mtime if ratings_file.exists() else None + bastics_mtime = basics_file.stat().st_mtime if basics_file.exists() else None + + with request.session(): + request.download(ratings_url, ratings_file, only_if_newer=True) + request.download(basics_url, basics_file, only_if_newer=True) + + is_changed = ( + ratings_mtime != ratings_file.stat().st_mtime + or bastics_mtime != basics_file.stat().st_mtime + ) + if is_changed: + await import_from_file(basics_path=basics_file, ratings_path=ratings_file) diff --git a/unwind/request.py b/unwind/request.py index e46672c..81f9f29 100644 --- a/unwind/request.py +++ b/unwind/request.py @@ -1,5 +1,8 @@ +import email.utils import json import logging +import os +import tempfile from collections import deque from contextlib import contextmanager from dataclasses import dataclass @@ -8,10 +11,11 @@ from hashlib import md5 from pathlib import Path from random import random from time import sleep, time -from typing import Callable, Optional +from typing import Callable, Optional, Union import bs4 import requests +from requests.status_codes import codes from urllib3.util.retry import Retry from . import config @@ -44,6 +48,13 @@ _shared_session = None @contextmanager def session(): + """Return the shared request session. + + The session is shared by all request functions and provides cookie + persistence and connection pooling. + Opening the session before making a request allows you to set headers + or change the retry behavior. + """ global _shared_session if _shared_session: @@ -86,7 +97,7 @@ def throttle( if len(calls) == calls.maxlen: wait_until = calls.popleft() + per_seconds + jitter() timeout = wait_until - time() - log.debug(f"waiting {timeout:.2} seconds ... ⏳") + log.debug(f"⏳ waiting {timeout:.2} seconds ...") sleep(timeout) # call @@ -148,7 +159,7 @@ def http_get(s: requests.Session, url: str, *args, **kwds) -> requests.Response: if cachefile: if cachefile.exists(): log.debug( - f"loading {req.url} ({req.headers!r}) from cache {cachefile} ... 💾" + f"💾 loading {req.url} ({req.headers!a}) from cache {cachefile} ..." ) with cachefile.open() as fp: resp = CachedResponse(**json.load(fp)) @@ -158,7 +169,7 @@ def http_get(s: requests.Session, url: str, *args, **kwds) -> requests.Response: ) return resp - log.debug(f"loading {req.url} ({req.headers!r}) ... ⚡️") + log.debug(f"⚡️ loading {req.url} ({req.headers!a}) ...") resp = s.send(req, allow_redirects=False, stream=True) resp.raise_for_status() @@ -182,8 +193,130 @@ def http_get(s: requests.Session, url: str, *args, **kwds) -> requests.Response: def soup_from_url(url): + """Return a BeautifulSoup instance from the contents for the given URL.""" with session() as s: r = http_get(s, url) soup = bs4.BeautifulSoup(r.text, "html5lib") return soup + + +def last_modified_from_response(resp): + if last_mod := resp.headers.get("Last-Modified"): + try: + return email.utils.parsedate_to_datetime(last_mod).timestamp() + except: + log.exception("🐛 Received invalid value for Last-Modified: %s", last_mod) + + +def last_modified_from_file(path: Path): + return path.stat().st_mtime + + +def download( + url: str, + file_path: Union[Path, str] = None, + *, + replace_existing: bool = None, + only_if_newer: bool = False, + timeout: float = None, + verify_ssl: bool = True, + chunk_callback=None, + response_callback=None, +): + """Download a file. + + Existing files will not be overwritten unless `replace_existing` is set. + Setting `only_if_newer` will check if the remote file is newer than the + local file, otherwise the download will be aborted. + """ + if replace_existing is None: + replace_existing = only_if_newer + + file_exists = None + if file_path is not None: + file_path = Path(file_path) + + file_exists = file_path.exists() and file_path.stat().st_size + if file_exists and not replace_existing: + raise FileExistsError(23, "Would replace existing file", str(file_path)) + + with session() as s: + + headers = {} + if file_exists and only_if_newer: + assert file_path + file_lastmod = last_modified_from_file(file_path) + headers["If-Modified-Since"] = email.utils.formatdate( + file_lastmod, usegmt=True + ) + + req = s.prepare_request(requests.Request("GET", url, headers=headers)) + + log.debug("⚡️ loading %s (%s) ...", req.url, req.headers) + resp = s.send( + req, allow_redirects=True, stream=True, timeout=timeout, verify=verify_ssl + ) + + if response_callback is not None: + try: + response_callback(resp) + except: + log.exception("🐛 Error in response callback.") + + log.debug("☕️ Response status: %s; headers: %s", resp.status_code, resp.headers) + + resp.raise_for_status() + + if resp.status_code == codes.not_modified: + log.debug("✋ Remote file has not changed, skipping download.") + return + + if file_path is None: + return resp.content + + assert replace_existing is True + + resp_lastmod = last_modified_from_response(resp) + + # Check Last-Modified in case the server ignored If-Modified-Since. + # XXX also check Content-Length? + if file_exists and only_if_newer and resp_lastmod is not None: + assert file_lastmod + + if resp_lastmod <= file_lastmod: + log.debug("✋ Local file is newer, skipping download.") + resp.close() + return + + # Create intermediate directories if necessary. + download_dir = file_path.parent + download_dir.mkdir(parents=True, exist_ok=True) + + # Write content to temp file. + tempdir = download_dir + tempfd, tempfile_path = tempfile.mkstemp( + dir=tempdir, prefix=f".download-{file_path.name}." + ) + one_mb = 2 ** 20 + chunk_size = 8 * one_mb + try: + log.debug("💾 Writing to temp file %s ...", tempfile_path) + for chunk in resp.iter_content(chunk_size=chunk_size, decode_unicode=False): + os.write(tempfd, chunk) + if chunk_callback: + try: + chunk_callback(chunk) + except: + log.exception("🐛 Error in chunk callback.") + finally: + os.close(tempfd) + + # Move downloaded file to destination. + if file_exists: + log.debug("💾 Replacing existing file: %s", file_path) + Path(tempfile_path).replace(file_path) + + # Fix file attributes. + if resp_lastmod is not None: + os.utime(file_path, (resp_lastmod, resp_lastmod)) diff --git a/unwind/web.py b/unwind/web.py index 128dbf8..d5c336a 100644 --- a/unwind/web.py +++ b/unwind/web.py @@ -12,13 +12,14 @@ from starlette.authentication import ( SimpleUser, requires, ) +from starlette.background import BackgroundTask from starlette.exceptions import HTTPException from starlette.middleware import Middleware from starlette.middleware.authentication import AuthenticationMiddleware from starlette.responses import JSONResponse from starlette.routing import Mount, Route -from . import config, db, imdb +from . import config, db, imdb, imdb_import from .db import close_connection_pool, find_ratings, open_connection_pool from .middleware.responsetime import ResponseTimeMiddleware from .models import Group, Movie, User, asplain @@ -192,6 +193,13 @@ async def add_movie(request): pass +@route("/movies/_reload_imdb", methods=["POST"]) +@requires(["authenticated", "admin"]) +async def load_imdb_movies(request): + task = BackgroundTask(imdb_import.load_from_web) + return JSONResponse({"status": "Import started."}, background=task, status_code=202) + + @route("/users") @requires(["authenticated", "admin"]) async def list_users(request): @@ -217,6 +225,15 @@ async def set_rating_for_user(request): request.path_params["user_id"] +@route("/users/_reload_ratings", methods=["POST"]) +@requires(["authenticated", "admin"]) +async def load_imdb_user_ratings(request): + + ratings = [rating async for rating in imdb.refresh_user_ratings_from_imdb()] + + return JSONResponse({"new_ratings": [asplain(r) for r in ratings]}) + + @route("/groups", methods=["POST"]) @requires(["authenticated", "admin"]) async def add_group(request):