add new admin routes to trigger database updates

The two routes allow reloading the whole movie database from IMDb and
reloading all users' IMDb ratings.
These functions (or very similar ones) were already available via the CLI,
but now they can also be triggered remotely by an admin.
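For illustration, the new endpoints could be exercised like this once deployed; the base URL and the basic-auth credentials are assumptions, since the diff does not show the authentication backend:

import requests

BASE = "http://localhost:8000"  # assumed host/port
AUTH = ("admin", "secret")  # assumed credentials; the auth scheme is not shown here

# Trigger a full movie-database reload; the server answers 202 immediately
# and runs the import as a background task.
resp = requests.post(f"{BASE}/movies/_reload_imdb", auth=AUTH)
print(resp.status_code, resp.json())  # 202 {'status': 'Import started.'}

# Re-fetch all users' IMDb ratings; this call runs synchronously and
# returns the newly imported ratings.
resp = requests.post(f"{BASE}/users/_reload_ratings", auth=AUTH)
print(resp.json()["new_ratings"])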
ducklet 2021-07-10 19:32:08 +02:00
parent 1805282d41
commit 294e311aff
5 changed files with 196 additions and 14 deletions

View file

@@ -4,12 +4,13 @@ from pathlib import Path
import toml
cachedir = (
Path(cachedir) if (cachedir := os.getenv("UNWIND_CACHEDIR", ".cache")) else None
Path(cachedir) if (cachedir := os.getenv("UNWIND_CACHEDIR", "./.cache")) else None
)
datadir = Path(os.getenv("UNWIND_DATA") or "./data")
debug = os.getenv("DEBUG") == "1"
loglevel = os.getenv("UNWIND_LOGLEVEL") or ("DEBUG" if debug else "INFO")
storage_path = os.getenv("UNWIND_STORAGE", "./data/db.sqlite")
config_path = os.getenv("UNWIND_CONFIG", "./data/config.toml")
storage_path = os.getenv("UNWIND_STORAGE", datadir / "db.sqlite")
config_path = os.getenv("UNWIND_CONFIG", datadir / "config.toml")
_config = toml.load(config_path)
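A side note on the cachedir expression above: the walrus result is tested for truthiness, so setting UNWIND_CACHEDIR to an empty string disables caching entirely, while leaving it unset falls back to ./.cache. A minimal sketch of that behavior (Python 3.8+):

import os
from pathlib import Path

os.environ["UNWIND_CACHEDIR"] = ""  # empty string -> caching disabled
cachedir = (
    Path(cachedir) if (cachedir := os.getenv("UNWIND_CACHEDIR", "./.cache")) else None
)
assert cachedir is None

del os.environ["UNWIND_CACHEDIR"]  # unset -> default cache directory
cachedir = (
    Path(cachedir) if (cachedir := os.getenv("UNWIND_CACHEDIR", "./.cache")) else None
)
assert cachedir == Path("./.cache")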

View file

@@ -37,7 +37,7 @@ log = logging.getLogger(__name__)
async def refresh_user_ratings_from_imdb(stop_on_dupe=True):
with session() as s:
s.headers["Accept-Language"] = "en-GB, en;q=0.5"
s.headers["Accept-Language"] = "en-US, en;q=0.5"
for user in await db.get_all(User):
@@ -192,6 +192,7 @@ async def load_ratings(user_id):
ratings, next_url = await parse_page(next_url)
for i, rating in enumerate(ratings):
assert rating.user and rating.movie
if i == 0:
# All rating objects share the same user.
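The Accept-Language switch above pins the locale IMDb uses when rendering pages, so scraped titles stay in English regardless of where the server runs. A self-contained illustration of the same idea (the title URL is an arbitrary example):

import requests

s = requests.Session()
s.headers["Accept-Language"] = "en-US, en;q=0.5"
resp = s.get("https://www.imdb.com/title/tt0111161/")
resp.raise_for_status()
# The response is now negotiated for US English rather than the host locale.
print(resp.headers.get("Content-Language", "n/a"))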

View file

@@ -6,7 +6,7 @@ from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, cast
from . import db
from . import config, db, request
from .db import add_or_update_many_movies
from .imdb import score_from_imdb_rating
from .models import Movie
@@ -179,11 +179,11 @@ def read_basics(path):
yield m
async def import_from_file(basics_path: Path, ratings_path: Path):
log.info("Loading scores ... 💾")
async def import_from_file(*, basics_path: Path, ratings_path: Path):
log.info("💾 Loading scores ...")
scores = read_ratings_as_scoremap(ratings_path)
log.info("Importing movies ... 💾")
log.info("💾 Importing movies ...")
total = count_lines(basics_path)
assert total != 0
perc = 0.0
@@ -196,7 +196,7 @@ async def import_from_file(basics_path: Path, ratings_path: Path):
for i, m in enumerate(read_basics(basics_path)):
if i / total > perc:
log.info("Imported %s%%", round(perc * 100, 1))
log.info("Imported %s%%", round(perc * 100, 1))
perc += perc_step
if m.media_type not in {
@@ -222,3 +222,33 @@ async def import_from_file(basics_path: Path, ratings_path: Path):
if chunk:
await add_or_update_many_movies(chunk)
chunk = []
async def load_from_web():
"""Refresh the full IMDb movie database.
The latest dumps are first downloaded and then imported into the database.
Beware that this can take a very long time (about half an hour), as the
amount of data is quite large.
See https://www.imdb.com/interfaces/ and https://datasets.imdbws.com/ for
more information on the IMDb database dumps.
"""
basics_url = "https://datasets.imdbws.com/title.basics.tsv.gz"
ratings_url = "https://datasets.imdbws.com/title.ratings.tsv.gz"
ratings_file = config.datadir / "imdb/title.ratings.tsv.gz"
basics_file = config.datadir / "imdb/title.basics.tsv.gz"
ratings_mtime = ratings_file.stat().st_mtime if ratings_file.exists() else None
basics_mtime = basics_file.stat().st_mtime if basics_file.exists() else None
with request.session():
request.download(ratings_url, ratings_file, only_if_newer=True)
request.download(basics_url, basics_file, only_if_newer=True)
is_changed = (
ratings_mtime != ratings_file.stat().st_mtime
or basics_mtime != basics_file.stat().st_mtime
)
if is_changed:
await import_from_file(basics_path=basics_file, ratings_path=ratings_file)
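load_from_web() is a coroutine, so it can also be driven from a one-off script or REPL; a sketch, assuming the package imports as unwind (the real package name is not visible in this diff):

import asyncio

from unwind import imdb_import  # package name assumed

# Downloads the two IMDb dumps (skipped if unchanged) and re-imports them;
# a full import can take about half an hour.
asyncio.run(imdb_import.load_from_web())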

View file

@@ -1,5 +1,8 @@
import email.utils
import json
import logging
import os
import tempfile
from collections import deque
from contextlib import contextmanager
from dataclasses import dataclass
@@ -8,10 +8,11 @@ from hashlib import md5
from pathlib import Path
from random import random
from time import sleep, time
from typing import Callable, Optional
from typing import Callable, Optional, Union
import bs4
import requests
from requests.status_codes import codes
from urllib3.util.retry import Retry
from . import config
@@ -44,6 +48,13 @@ _shared_session = None
@contextmanager
def session():
"""Return the shared request session.
The session is shared by all request functions and provides cookie
persistence and connection pooling.
Opening the session before making a request allows you to set headers
or change the retry behavior.
"""
global _shared_session
if _shared_session:
@@ -86,7 +97,7 @@ def throttle(
if len(calls) == calls.maxlen:
wait_until = calls.popleft() + per_seconds + jitter()
timeout = wait_until - time()
log.debug(f"waiting {timeout:.2} seconds ...")
log.debug(f"waiting {timeout:.2} seconds ...")
sleep(timeout)
# call
@@ -148,7 +159,7 @@ def http_get(s: requests.Session, url: str, *args, **kwds) -> requests.Response:
if cachefile:
if cachefile.exists():
log.debug(
f"loading {req.url} ({req.headers!r}) from cache {cachefile} ... 💾"
f"💾 loading {req.url} ({req.headers!a}) from cache {cachefile} ..."
)
with cachefile.open() as fp:
resp = CachedResponse(**json.load(fp))
@@ -158,7 +169,7 @@ def http_get(s: requests.Session, url: str, *args, **kwds) -> requests.Response:
)
return resp
log.debug(f"loading {req.url} ({req.headers!r}) ... ⚡️")
log.debug(f"⚡️ loading {req.url} ({req.headers!a}) ...")
resp = s.send(req, allow_redirects=False, stream=True)
resp.raise_for_status()
@@ -182,8 +193,130 @@ def http_get(s: requests.Session, url: str, *args, **kwds) -> requests.Response:
def soup_from_url(url):
"""Return a BeautifulSoup instance from the contents for the given URL."""
with session() as s:
r = http_get(s, url)
soup = bs4.BeautifulSoup(r.text, "html5lib")
return soup
def last_modified_from_response(resp):
if last_mod := resp.headers.get("Last-Modified"):
try:
return email.utils.parsedate_to_datetime(last_mod).timestamp()
except Exception:
log.exception("🐛 Received invalid value for Last-Modified: %s", last_mod)
def last_modified_from_file(path: Path):
return path.stat().st_mtime
def download(
url: str,
file_path: Optional[Union[Path, str]] = None,
*,
replace_existing: Optional[bool] = None,
only_if_newer: bool = False,
timeout: Optional[float] = None,
verify_ssl: bool = True,
chunk_callback=None,
response_callback=None,
):
"""Download a file.
Existing files will not be overwritten unless `replace_existing` is set.
With `only_if_newer`, the file is downloaded only if the remote copy is
newer than the local one; otherwise the download is skipped.
"""
if replace_existing is None:
replace_existing = only_if_newer
file_exists = None
if file_path is not None:
file_path = Path(file_path)
file_exists = file_path.exists() and file_path.stat().st_size
if file_exists and not replace_existing:
raise FileExistsError(17, "Would replace existing file", str(file_path))  # errno.EEXIST
with session() as s:
headers = {}
if file_exists and only_if_newer:
assert file_path
file_lastmod = last_modified_from_file(file_path)
headers["If-Modified-Since"] = email.utils.formatdate(
file_lastmod, usegmt=True
)
req = s.prepare_request(requests.Request("GET", url, headers=headers))
log.debug("⚡️ loading %s (%s) ...", req.url, req.headers)
resp = s.send(
req, allow_redirects=True, stream=True, timeout=timeout, verify=verify_ssl
)
if response_callback is not None:
try:
response_callback(resp)
except Exception:
log.exception("🐛 Error in response callback.")
log.debug("☕️ Response status: %s; headers: %s", resp.status_code, resp.headers)
resp.raise_for_status()
if resp.status_code == codes.not_modified:
log.debug("✋ Remote file has not changed, skipping download.")
return
if file_path is None:
return resp.content
assert replace_existing is True
resp_lastmod = last_modified_from_response(resp)
# Check Last-Modified in case the server ignored If-Modified-Since.
# XXX also check Content-Length?
if file_exists and only_if_newer and resp_lastmod is not None:
assert file_lastmod
if resp_lastmod <= file_lastmod:
log.debug("✋ Local file is newer, skipping download.")
resp.close()
return
# Create intermediate directories if necessary.
download_dir = file_path.parent
download_dir.mkdir(parents=True, exist_ok=True)
# Write content to temp file.
tempdir = download_dir
tempfd, tempfile_path = tempfile.mkstemp(
dir=tempdir, prefix=f".download-{file_path.name}."
)
one_mb = 2 ** 20
chunk_size = 8 * one_mb
try:
log.debug("💾 Writing to temp file %s ...", tempfile_path)
for chunk in resp.iter_content(chunk_size=chunk_size, decode_unicode=False):
os.write(tempfd, chunk)
if chunk_callback:
try:
chunk_callback(chunk)
except Exception:
log.exception("🐛 Error in chunk callback.")
finally:
os.close(tempfd)
# Move downloaded file to destination.
if file_exists:
log.debug("💾 Replacing existing file: %s", file_path)
Path(tempfile_path).replace(file_path)
# Fix file attributes.
if resp_lastmod is not None:
os.utime(file_path, (resp_lastmod, resp_lastmod))
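Putting the pieces together, download() performs a conditional GET (If-Modified-Since plus a Last-Modified check), streams into a temp file, and atomically replaces the destination. A usage sketch against one of the IMDb dumps, again assuming the unwind package name:

from pathlib import Path

from unwind import request  # package name assumed

target = Path("data/imdb/title.ratings.tsv.gz")
with request.session():
    # The shared session is reused by download() for the request below.
    request.download(
        "https://datasets.imdbws.com/title.ratings.tsv.gz",
        target,
        only_if_newer=True,  # also implies replace_existing
        timeout=60.0,
    )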

View file

@@ -12,13 +12,14 @@ from starlette.authentication import (
SimpleUser,
requires,
)
from starlette.background import BackgroundTask
from starlette.exceptions import HTTPException
from starlette.middleware import Middleware
from starlette.middleware.authentication import AuthenticationMiddleware
from starlette.responses import JSONResponse
from starlette.routing import Mount, Route
from . import config, db, imdb
from . import config, db, imdb, imdb_import
from .db import close_connection_pool, find_ratings, open_connection_pool
from .middleware.responsetime import ResponseTimeMiddleware
from .models import Group, Movie, User, asplain
@@ -192,6 +193,13 @@ async def add_movie(request):
pass
@route("/movies/_reload_imdb", methods=["POST"])
@requires(["authenticated", "admin"])
async def load_imdb_movies(request):
task = BackgroundTask(imdb_import.load_from_web)
return JSONResponse({"status": "Import started."}, background=task, status_code=202)
@route("/users")
@requires(["authenticated", "admin"])
async def list_users(request):
@@ -217,6 +225,15 @@ async def set_rating_for_user(request):
request.path_params["user_id"]
@route("/users/_reload_ratings", methods=["POST"])
@requires(["authenticated", "admin"])
async def load_imdb_user_ratings(request):
ratings = [rating async for rating in imdb.refresh_user_ratings_from_imdb()]
return JSONResponse({"new_ratings": [asplain(r) for r in ratings]})
@route("/groups", methods=["POST"])
@requires(["authenticated", "admin"])
async def add_group(request):
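The accepted-then-process pattern in load_imdb_movies is plain Starlette: the JSON response is sent first and the BackgroundTask runs afterwards on the same event loop. A minimal self-contained version of the pattern:

import asyncio

from starlette.applications import Starlette
from starlette.background import BackgroundTask
from starlette.responses import JSONResponse
from starlette.routing import Route


async def slow_import():
    # Stand-in for imdb_import.load_from_web().
    await asyncio.sleep(5)


async def reload_movies(request):
    # The 202 response goes out immediately; slow_import() runs after it.
    task = BackgroundTask(slow_import)
    return JSONResponse({"status": "Import started."}, status_code=202, background=task)


app = Starlette(routes=[Route("/movies/_reload_imdb", reload_movies, methods=["POST"])])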