feat: add import script for Academy awards

This commit is contained in:
ducklet 2024-05-25 01:22:26 +02:00
parent f723459333
commit 02a9621734
8 changed files with 170 additions and 49 deletions

View file

@ -75,8 +75,10 @@ async def test_get_ratings_for_group_with_awards(
award2 = models.Award(
movie_id=movie2.id, category="imdb-top-250", details='{"position":99}'
)
await db.add(conn, award1)
await db.add(conn, award2)
award3 = models.Award(
movie_id=movie1.id, category="oscars", details='{"name":"Best Visual Effects"}'
)
await db.add(conn, award1, award2, award3)
rating = models.Rating(
movie_id=movie1.id, user_id=user.id, score=66, rating_date=datetime.now(tz=UTC)
@ -92,7 +94,7 @@ async def test_get_ratings_for_group_with_awards(
"original_title": movie1.original_title,
"user_scores": [rating.score],
"year": movie1.release_year,
"awards": ["imdb-top-250:23"],
"awards": ["imdb-top-250:23", "oscars:Best Visual Effects"],
}
resp = unauthorized_client.get(path)

View file

@ -0,0 +1,100 @@
import argparse
import json
import logging
from datetime import datetime
from pathlib import Path
from typing import Iterable
from unwind import db, models, types
log = logging.getLogger(__name__)
name = "import-wikidata-oscars"
help = "Import Academy awards information from a Wikidata dump."
# To generate the JSON file, run the following query
# at https://query.wikidata.org/ and export as (simple) JSON:
"""
SELECT ?awardLabel ?filmLabel ?imdbId ?time WHERE {
?award wdt:P31 wd:Q19020.
?film wdt:P31 wd:Q11424;
p:P166 ?awardStat.
?awardStat ps:P166 ?award.
OPTIONAL {
?awardStat pq:P805 ?awardEdition.
?awardEdition wdt:P585 ?time.
?film wdt:P345 ?imdbId.
}
SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }
}
ORDER BY DESC (?time)
"""
def add_args(cmd: argparse.ArgumentParser) -> None:
    """Register this sub-command's CLI options on *cmd*."""
    cmd.add_argument("--json-file", type=Path, required=True)
def load_awards(json_file: Path) -> Iterable[tuple[types.ImdbMovieId, models.Award]]:
    """Yield ``(imdb_id, award)`` pairs parsed from a Wikidata JSON export.

    Items without an ``imdbId`` key are skipped with a warning.

    Raises:
        ValueError: if an award label matches neither the standard
            "Academy Award for ..." prefix nor a known special award name.
    """
    with json_file.open() as fd:
        data = json.load(fd)
    name_prefix = "Academy Award for "
    # Awards whose Wikidata label does not follow the common prefix scheme.
    special_names = {
        "Special Achievement Academy Award": "Special Achievement",
        "Academy Honorary Award": "Honorary",
    }
    for item in data:
        name = item["awardLabel"]
        if name in special_names:
            name = special_names[name]
        elif name.startswith(name_prefix):
            name = name.removeprefix(name_prefix)
        else:
            # Fail loudly rather than silently importing a mangled category.
            raise ValueError(f"Award name is unexpected: {name!a}")
        award = models.Award(category="oscars")
        award.name = name
        if (datestr := item.get("time")) is not None:
            # NOTE(review): Wikidata timestamps typically end in "Z";
            # fromisoformat only accepts that suffix on Python 3.11+ —
            # confirm the target interpreter version.
            award.created = datetime.fromisoformat(datestr)
        if "imdbId" not in item:
            log.warning("⚠️ IMDb ID missing for movie: %a", item["filmLabel"])
        else:
            yield item["imdbId"], award
async def remove_all_oscars(conn: db.Connection) -> None:
    """Delete every award row in the "oscars" category."""
    delete_stmt = models.awards.delete().where(
        models.awards.c.category == "oscars"
    )
    await conn.execute(delete_stmt)
async def main(args: argparse.Namespace) -> None:
    """Import Academy award wins from a Wikidata JSON export.

    Existing oscars awards are removed and replaced within a single
    transaction, so a failed import leaves the previous data intact.
    """
    await db.open_connection_pool()
    try:
        json_file: Path = args.json_file
        awards = dict(load_awards(json_file))
        async with db.new_connection() as conn:
            imdb_ids = list(awards)
            available = await db.get_movie_ids(conn, imdb_ids)
        if missing := set(imdb_ids).difference(available):
            log.warning(
                "⚠️ File (%a) contained %i unknown movies: %a",
                str(json_file),
                len(missing),
                missing,
            )
        async with db.transaction() as conn:
            await remove_all_oscars(conn)
            for imdb_id, unwind_id in available.items():
                award = awards[imdb_id]
                award.movie_id = unwind_id
                await db.add(conn, award)
        # Lazy %-formatting: only rendered if INFO is enabled.
        log.info("✨ Imported %i oscars.", len(available))
    finally:
        # Always release the pool, even if the import fails midway.
        await db.close_connection_pool()

View file

@ -2,9 +2,7 @@ import argparse
import logging
from typing import Callable
import sqlalchemy as sa
from unwind import db, imdb, models, types, utils
from unwind import db, imdb, models
log = logging.getLogger(__name__)
@ -23,15 +21,6 @@ def add_args(cmd: argparse.ArgumentParser) -> None:
)
async def get_movie_ids(
conn: db.Connection, imdb_ids: list[imdb.MovieId]
) -> dict[imdb.MovieId, types.ULID]:
c = models.movies.c
query = sa.select(c.imdb_id, c.id).where(c.imdb_id.in_(imdb_ids))
rows = await db.fetch_all(conn, query)
return {row.imdb_id: types.ULID(row.id) for row in rows}
async def remove_all_awards(
conn: db.Connection, category: models.AwardCategory
) -> None:
@ -50,7 +39,7 @@ async def update_awards(conn: db.Connection, category: models.AwardCategory) ->
load_imdb_ids = _award_handlers[category]
imdb_ids = await load_imdb_ids()
available = await get_movie_ids(conn, imdb_ids)
available = await db.get_movie_ids(conn, imdb_ids)
if missing := set(imdb_ids).difference(available):
log.warning(
"⚠️ Charts for category (%a) contained %i unknown movies: %a",
@ -68,8 +57,8 @@ async def update_awards(conn: db.Connection, category: models.AwardCategory) ->
award = models.Award(
movie_id=movie_id,
category=category,
details=utils.json_dump({"position": pos}),
)
award.position = pos
await db.add(conn, award)

View file

@ -28,7 +28,7 @@ from .models import (
ratings,
utcnow,
)
from .types import ULID, ImdbMovieId, UserIdStr
from .types import ULID, ImdbMovieId, MovieId, UserIdStr
log = logging.getLogger(__name__)
@ -237,7 +237,8 @@ async def transacted(
await conn.rollback()
async def add(conn: Connection, /, item: Model) -> None:
async def add(conn: Connection, /, *items: Model) -> None:
for item in items:
# Support late initializing - used for optimization.
if getattr(item, "_is_lazy", False):
assert hasattr(item, "_lazy_init")
@ -449,6 +450,16 @@ async def get_awards(
return awards_dict
async def get_movie_ids(
    conn: Connection, imdb_ids: list[ImdbMovieId]
) -> dict[ImdbMovieId, MovieId]:
    """Map each known IMDb ID to its internal movie ID.

    IMDb IDs with no matching movie row are simply absent from the result.
    """
    c = movies.c
    stmt = sa.select(c.imdb_id, c.id).where(c.imdb_id.in_(imdb_ids))
    return {
        row.imdb_id: MovieId(ULID(row.id))
        for row in await fetch_all(conn, stmt)
    }
def sql_escape(s: str, char: str = "#") -> str:
    """Escape SQL ``LIKE`` wildcards in *s* using *char* as escape character.

    The escape character itself is doubled first so that pre-existing
    occurrences cannot be mistaken for escapes.
    """
    escaped = s.replace(char, char + char)
    escaped = escaped.replace("%", char + "%")
    return escaped.replace("_", char + "_")

View file

@ -4,7 +4,7 @@ import re
from collections import namedtuple
from dataclasses import dataclass, field
from datetime import datetime
from typing import AsyncIterable, NewType
from typing import AsyncIterable
from urllib.parse import urljoin
import bs4
@ -12,14 +12,11 @@ import bs4
from . import db
from .models import Movie, Rating, User
from .request import adownload, asession, asoup_from_url, cache_path
from .types import ImdbMovieId, ImdbRating, ImdbUserId, Score100
from .utils import json_dump
log = logging.getLogger(__name__)
ImdbRating = NewType("ImdbRating", float) # Value range: [1.0, 10.0]
UnwindScore = NewType("UnwindScore", int) # Value range: [0, 100]
MovieId = NewType("MovieId", str) # Pattern: ttXXXXXXXX
UserId = NewType("UserId", str) # Pattern: urXXXXXXXX
# div#ratings-container
# div.lister-item.mode-detail
@ -75,7 +72,7 @@ def movie_url(imdb_id: str):
return f"https://www.imdb.com/title/{imdb_id}/"
def imdb_rating_from_score(score: UnwindScore) -> ImdbRating:
def imdb_rating_from_score(score: Score100) -> ImdbRating:
"""Return the IMDb rating from an Unwind Movie score."""
assert 0 <= score <= 100
rating = round(score * 9 / 100 + 1, 1)
@ -83,7 +80,7 @@ def imdb_rating_from_score(score: UnwindScore) -> ImdbRating:
return ImdbRating(rating)
def score_from_imdb_rating(rating: ImdbRating | int) -> UnwindScore:
def score_from_imdb_rating(rating: ImdbRating | int) -> Score100:
"""Return the Unwind Movie score for an IMDb rating."""
# Scale IMDb's 10 point rating to our score of [0, 100].
# There's a pitfall here!
@ -92,7 +89,7 @@ def score_from_imdb_rating(rating: ImdbRating | int) -> UnwindScore:
assert 1.0 <= rating <= 10.0
score = round(100 * (rating - 1) / 9)
assert 0 <= score <= 100
return UnwindScore(score)
return Score100(score)
# find_name: e.g. "Your Mom's Ratings"
@ -237,11 +234,11 @@ _ForgedRequest = namedtuple("_ForgedRequest", "url headers")
class _RatingsPage:
ratings: list[Rating] = field(default_factory=list)
next_page_url: str | None = None
imdb_user_id: UserId | None = None
imdb_user_id: ImdbUserId | None = None
imdb_user_name: str | None = None
async def _load_ratings_page(url: str, user_id: UserId) -> _RatingsPage:
async def _load_ratings_page(url: str, user_id: ImdbUserId) -> _RatingsPage:
"""Dispatch to handlers for different ratings page versions."""
soup = await asoup_from_url(url)
@ -255,7 +252,7 @@ async def _load_ratings_page(url: str, user_id: UserId) -> _RatingsPage:
async def _load_ratings_page_2024(
user_id: UserId, url: str, soup: bs4.BeautifulSoup
user_id: ImdbUserId, url: str, soup: bs4.BeautifulSoup
) -> _RatingsPage:
"""Handle the ratings page from 2024."""
page = _RatingsPage()
@ -356,7 +353,9 @@ async def _load_ratings_page_legacy(url: str, soup: bs4.BeautifulSoup) -> _Ratin
return page
async def load_and_store_ratings(user_id: UserId) -> AsyncIterable[tuple[Rating, bool]]:
async def load_and_store_ratings(
user_id: ImdbUserId,
) -> AsyncIterable[tuple[Rating, bool]]:
"""Load user ratings from imdb.com and store them in our database.
All loaded ratings are yielded together with the information whether each rating
@ -388,7 +387,7 @@ async def load_and_store_ratings(user_id: UserId) -> AsyncIterable[tuple[Rating,
yield rating, is_updated
async def load_ratings(user_id: UserId) -> AsyncIterable[Rating]:
async def load_ratings(user_id: ImdbUserId) -> AsyncIterable[Rating]:
"""Return all ratings for the given user from imdb.com."""
next_url = user_ratings_url(user_id)
@ -399,7 +398,7 @@ async def load_ratings(user_id: UserId) -> AsyncIterable[Rating]:
yield rating
async def _ids_from_list_html(url: str) -> AsyncIterable[MovieId]:
async def _ids_from_list_html(url: str) -> AsyncIterable[ImdbMovieId]:
"""Return all IMDb movie IDs (`tt*`) from the given URL."""
# document.querySelectorAll('li.ipc-metadata-list-summary-item a.ipc-title-link-wrapper')
# .href: '/title/tt1213644/?ref_=chtbtm_t_1'
@ -412,7 +411,7 @@ async def _ids_from_list_html(url: str) -> AsyncIterable[MovieId]:
yield match_["id"]
async def load_most_popular_100() -> list[MovieId]:
async def load_most_popular_100() -> list[ImdbMovieId]:
"""Return the IMDb's top 100 most popular movies.
IMDb Charts: Most Popular Movies
@ -425,7 +424,7 @@ async def load_most_popular_100() -> list[MovieId]:
return ids
async def load_bottom_100() -> list[MovieId]:
async def load_bottom_100() -> list[ImdbMovieId]:
"""Return the IMDb's bottom 100 lowest rated movies.
IMDb Charts: Lowest Rated Movies
@ -438,7 +437,7 @@ async def load_bottom_100() -> list[MovieId]:
return ids
async def load_top_250() -> list[MovieId]:
async def load_top_250() -> list[ImdbMovieId]:
"""Return the IMDb's top 250 highest rated movies.
IMDb Charts: IMDb Top 250 Movies
@ -483,13 +482,13 @@ async def load_top_250() -> list[MovieId]:
@dataclass
class _UserMovieRating:
movie_id: MovieId
movie_id: ImdbMovieId
rating_date: datetime
imdb_rating: ImdbRating
async def _load_user_movie_ratings(
user_id: UserId, movie_ids: list[MovieId]
user_id: ImdbUserId, movie_ids: list[ImdbMovieId]
) -> AsyncIterable[_UserMovieRating]:
qgl_api_url = "https://api.graphql.imdb.com/"
headers = {

View file

@ -577,5 +577,15 @@ class Award:
details["position"] = position
self._details = details
@property
def name(self) -> str:
    # Award name kept inside the JSON details payload.
    return self._details["name"]
@name.setter
def name(self, name: str) -> None:
    # NOTE(review): re-assigning self._details (rather than mutating in
    # place) presumably triggers attribute change tracking — confirm.
    details = self._details
    details["name"] = name
    self._details = details
awards = Award.__table__

View file

@ -37,10 +37,12 @@ class ULID(ulid.ULID):
AwardId = NewType("AwardId", ULID)
GroupId = NewType("GroupId", ULID)
ImdbMovieId = NewType("ImdbMovieId", str)
ImdbMovieId = NewType("ImdbMovieId", str) # Pattern: ttXXXXXXXX
ImdbRating = NewType("ImdbRating", float) # Value range: [1.0, 10.0]
ImdbUserId = NewType("ImdbUserId", str) # Pattern: urXXXXXXXX
MovieId = NewType("MovieId", ULID)
MovieIdStr = NewType("MovieIdStr", str)
RatingId = NewType("RatingId", ULID)
Score100 = NewType("Score100", int) # [0, 100]
Score100 = NewType("Score100", int) # Value range: [0, 100]
UserId = NewType("UserId", ULID)
UserIdStr = NewType("UserIdStr", str)

View file

@ -60,6 +60,14 @@ class RatingAggregate:
)
def _serialize_award(award: models.Award) -> str:
    """Render an award as a ``category:value`` string for API responses."""
    category = award.category
    if category == "oscars":
        return f"{category}:{award.name}"
    if category.startswith("imdb-"):
        return f"{category}:{award.position}"
    raise RuntimeError(f"Unsupported category: {category}")
def aggregate_ratings(
ratings: Iterable[Rating],
user_ids: Container[types.UserIdStr],
@ -84,7 +92,7 @@ def aggregate_ratings(
original_title=r.original_title,
user_scores=[],
year=r.release_year,
awards=[f"{a.category}:{a.position}" for a in awards],
awards=sorted(_serialize_award(a) for a in awards),
),
)
# XXX do we need this? why don't we just get the ratings we're supposed to aggregate?