feat: add support for new ratings page

Genres are no longer available on the new ratings page, so we make them optional.
Adds support for validating generics in union types.
ducklet 2024-05-11 17:13:48 +02:00
parent 06e60fb212
commit 1a7d85b31d
7 changed files with 385 additions and 37 deletions

View file

@@ -4,4 +4,4 @@ cd "$RUN_DIR"
[ -z "${DEBUG:-}" ] || set -x
exec python -m unwind "$@"
exec poetry run python -m unwind "$@"

Binary file not shown.

Binary file not shown.

View file

@@ -1,4 +1,5 @@
import bz2
import json
from pathlib import Path
from unittest.mock import AsyncMock
@@ -75,10 +76,102 @@ async def test_load_ratings_page(monkeypatch):
    monkeypatch.setattr(imdb, "asoup_from_url", AsyncMock(return_value=soup))
    page = await imdb.load_ratings_page("fakeurl")
    page = await imdb._load_ratings_page("fakeurl", "ur655321")
    assert len(page.ratings) == 100
    assert page.imdb_user_id is not None
    assert page.imdb_user_id == "ur655321"
    assert page.imdb_user_name == "AlexUltra"
    assert page.next_page_url is not None
    assert page.next_page_url.startswith("/user/ur655321/ratings?")

def _mock_response(content: bytes):
    class MockResponse:
        def raise_for_status(self):
            pass

        def json(self):
            return json.loads(content)

    return MockResponse()
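
# The following test runs fully offline: asoup_from_url is patched to return
# the saved HTML fixture, and the session's post() is patched to return the
# saved GraphQL response via _mock_response.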
@pytest.mark.asyncio
async def test_load_ratings_page_20240510(monkeypatch):
    with bz2.open(fixturesdir / "ratings-ur655321-20240510.html.bz2", "rb") as f:
        html = f.read()
    soup = bs4.BeautifulSoup(html, "html5lib")
    monkeypatch.setattr(imdb, "asoup_from_url", AsyncMock(return_value=soup))
    with bz2.open(fixturesdir / "ratings-ur655321-20240510.gql.json.bz2", "rb") as f:
        jsonstr = f.read()
    async with imdb.asession() as s:
        monkeypatch.setattr(s, "post", AsyncMock(return_value=_mock_response(jsonstr)))
        page = await imdb._load_ratings_page("fakeurl", "ur655321")

    assert len(page.ratings) == 100
    assert page.imdb_user_id is not None
    assert page.imdb_user_id == "ur655321"
    assert page.imdb_user_name == "AlexUltra"
    assert page.next_page_url is None, "not supported for new ratings page"
    def movie(item: dict):
        for rating in page.ratings:
            assert rating.movie
            if rating.movie.imdb_id == item["imdb_id"]:
                rating_dict = {key: getattr(rating.movie, key) for key in item.keys()}
                return rating_dict
        raise AssertionError()

    a_movie = {
        "title": "Kung Fu Panda 4",
        "release_year": 2024,
        "media_type": "Movie",
        "imdb_id": "tt21692408",
        "imdb_score": 59,
        "imdb_votes": 36000,
        "runtime": 94,
    }
    assert a_movie == movie(a_movie)

    a_running_tvseries = {
        "title": "Palm Royale",
        "release_year": 2024,
        "media_type": "TV Series",
        "imdb_id": "tt8888540",
        "imdb_score": 64,
        "imdb_votes": 6000,
    }
    assert a_running_tvseries == movie(a_running_tvseries)

    a_finished_tvseries = {
        "title": "Fawlty Towers",
        "release_year": 1975,
        "media_type": "TV Series",
        "imdb_id": "tt0072500",
        "imdb_score": 87,
        "imdb_votes": 100000,
    }
    assert a_finished_tvseries == movie(a_finished_tvseries)

    a_tvepisode = {
        "title": "Columbo / No Time to Die",
        "original_title": None,
        "release_year": 1992,
        "media_type": "TV Episode",
        "imdb_id": "tt0103987",
        "imdb_score": 59,
        "imdb_votes": 2100,
        "runtime": 98,
    }
    assert a_tvepisode == movie(a_tvepisode)

    a_videogame = {
        "title": "Alan Wake",
        "original_title": None,
        "release_year": 2010,
        "media_type": "Video Game",
        "imdb_id": "tt0466662",
        "imdb_score": 82,
        "imdb_votes": 7300,
    }
    assert a_videogame == movie(a_videogame)

View file

@@ -4,7 +4,7 @@ import re
from collections import namedtuple
from dataclasses import dataclass, field
from datetime import datetime
from typing import AsyncIterable
from typing import AsyncIterable, NewType
from urllib.parse import urljoin
import bs4
@@ -15,6 +15,11 @@ from .request import adownload, asession, asoup_from_url, cache_path
log = logging.getLogger(__name__)
ImdbRating = NewType("ImdbRating", float) # Value range: [1.0, 10.0]
UnwindScore = NewType("UnwindScore", int) # Value range: [0, 100]
MovieId = NewType("MovieId", str) # Pattern: ttXXXXXXXX
UserId = NewType("UserId", str) # Pattern: urXXXXXXXX
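
# Note: NewType carries no runtime checks; it only lets a static type checker
# tell e.g. a MovieId apart from a UserId (both are plain str at runtime).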
# div#ratings-container
# div.lister-item.mode-detail
# div.lister-item-content
@@ -69,15 +74,15 @@ def movie_url(imdb_id: str):
    return f"https://www.imdb.com/title/{imdb_id}/"

def imdb_rating_from_score(score: int) -> float:
def imdb_rating_from_score(score: UnwindScore) -> ImdbRating:
    """Return the IMDb rating from an Unwind Movie score."""
    assert 0 <= score <= 100
    rating = round(score * 9 / 100 + 1, 1)
    assert 1.0 <= rating <= 10.0
    return rating
    return ImdbRating(rating)

def score_from_imdb_rating(rating: float) -> int:
def score_from_imdb_rating(rating: ImdbRating | int) -> UnwindScore:
    """Return the Unwind Movie score for an IMDb rating."""
    # Scale IMDb's 10 point rating to our score of [0, 100].
    # There's a pitfall here!
@@ -86,22 +91,41 @@ def score_from_imdb_rating(rating: float) -> int:
    assert 1.0 <= rating <= 10.0
    score = round(100 * (rating - 1) / 9)
    assert 0 <= score <= 100
    return score
    return UnwindScore(score)
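
# Worked example (illustrative values): score_from_imdb_rating(5.9)
# == round(100 * 4.9 / 9) == 54, and imdb_rating_from_score(54)
# == round(54 * 9 / 100 + 1, 1) == 5.9. The round trip cannot be exact for
# every score, since 101 scores collapse onto 91 one-decimal ratings.
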
# find_name: e.g. "Your Mom's Ratings"
find_name = re.compile(r"(?P<name>.*)'s Ratings").fullmatch
# find_rating_date: e.g. "Rated on 06 May 2021"
find_rating_date = re.compile(r"Rated on (?P<date>\d{2} \w+ \d{4})").fullmatch
# find_rating_date_2: e.g. "Rated on May 01, 2024"
find_rating_date_2 = re.compile(r"Rated on (?P<date>\w+ \d{2}, \d{4})").fullmatch
find_runtime = re.compile(r"((?P<h>\d+) hr)? ?((?P<m>\d+) min)?").fullmatch
# find_year = re.compile(
#     r"(\([IVX]+\) )?\((?P<year>\d{4})((– |–\d{4})| TV (Special|Movie)| Video)?\)"
# ).fullmatch
# find_runtime_2: e.g. "1h 38m"
find_runtime_2 = re.compile(r"((?P<h>\d+)h )?((?P<m>\d+)m)?").fullmatch
# find_year: e.g. "(1992)"
find_year = re.compile(
    r"(\([IVX]+\) )?\((?P<year>\d{4})((– |–\d{4})| (?P<type>[^)]+))?\)"
).fullmatch
# find_year_2: e.g. "2024", "1971–2003", "2024–"
find_year_2 = re.compile(r"(?P<year>\d{4})(–(?P<end_year>\d{4})?)?").fullmatch
find_movie_id = re.compile(r"/title/(?P<id>tt\d+)/").search
find_movie_name = re.compile(r"\d+\. (?P<name>.+)").fullmatch
# find_vote_count: e.g. "(5.9K)", "(1K)", "(8)"
find_vote_count = re.compile(r"\((?P<count>\d+(\.\d+)?K?)\)").fullmatch
def movie_and_rating_from_item(item: bs4.Tag) -> tuple[Movie, Rating]:
def _first_string(tag: bs4.Tag) -> str | None:
    for child in tag.children:
        if isinstance(child, str):
            return child

def _tv_episode_title(series_name: str, episode_name: str) -> str:
    return f"{series_name.strip()} / {episode_name.strip()}"

def _movie_and_rating_from_item_legacy(item: bs4.Tag) -> tuple[Movie, Rating]:
    genres = (genre := item.find("span", "genre")) and genre.string or ""
    movie = Movie(
        title=item.h3.a.string.strip(),
@@ -115,7 +139,7 @@ def movie_and_rating_from_item(item: bs4.Tag) -> tuple[Movie, Rating]:
        raise ValueError("Unknown document structure.")
    movie.media_type = "TV Episode"
    movie.title += " / " + episode_a.string.strip()
    movie.title = _tv_episode_title(movie.title, episode_a.string)
    if match := find_year(episode_br.find_next("span", "lister-item-year").string):
        movie.release_year = int(match["year"])
    if match := find_movie_id(episode_a["href"]):
@@ -153,25 +177,140 @@ def movie_and_rating_from_item(item: bs4.Tag) -> tuple[Movie, Rating]:
    return movie, rating

ForgedRequest = namedtuple("ForgedRequest", "url headers")

def _movie_and_rating_from_item_2024(item: bs4.Tag) -> Movie:
    movie = Movie()
MovieId = str  # ttXXXXXXXX
UserId = str  # urXXXXXXXX
    # Data for `original_title` and `genres` is not available from the ratings page.
    if match := find_movie_name(item.h3.string.strip()):
        movie.title = match["name"]
    if (match := item.find("a", "ipc-lockup-overlay")) and (
        match := find_movie_id(match["href"])
    ):
        movie.imdb_id = match["id"]
    if match := item.find("span", "ratingGroup--imdb-rating"):
        movie.imdb_score = score_from_imdb_rating(float(_first_string(match)))
    for metadata in item.find_all("span", "dli-title-metadata-item"):
        # Other known metadata types, with some example values:
        # - Episode count: "10 eps"
        # - Age rating: "TV-PG", "TV-MA", "R"
        if match := find_runtime_2(metadata.string.strip()):
            movie.runtime = int(match["h"] or 0) * 60 + int(match["m"] or 0)
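            # e.g. "1h 38m" yields runtime == 1 * 60 + 38 == 98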
        if match := find_year_2(metadata.string.strip()):
            movie.release_year = int(match["year"])
    if match := item.find("span", "dli-title-type-data"):
        movie.media_type = match.string.strip()
    if not movie.media_type:
        movie.media_type = "Movie"
    # TODO `imdb_votes` is available as exact value from the pages' JSON template.
    if (match := item.find("span", "ipc-rating-star--voteCount")) and (
        match := find_vote_count("".join(match.stripped_strings))
    ):
        count, k, _ = match["count"].partition("K")
        votes = float(count)
        if k:
            votes *= 1_000
        movie.imdb_votes = int(votes)
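        # e.g. "(5.9K)": count == "5.9", k == "K", so imdb_votes == 5900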
    if movie.media_type == "TV Episode":
        titles = item.find_all("h3")
        if len(titles) != 2:
            raise ValueError("Unknown document structure.")
        movie.title = _tv_episode_title(movie.title, titles[1].string)
        if match := find_year(item.find("span", "dli-ep-year").get_text()):
            movie.release_year = int(match["year"])
    return movie

_ForgedRequest = namedtuple("_ForgedRequest", "url headers")
@dataclass
class RatingsPage:
class _RatingsPage:
    ratings: list[Rating] = field(default_factory=list)
    next_page_url: str | None = None
    imdb_user_id: UserId | None = None
    imdb_user_name: str | None = None
async def load_ratings_page(url: str) -> RatingsPage:
    page = RatingsPage()
async def _load_ratings_page(url: str, user_id: UserId) -> _RatingsPage:
    """Dispatch to handlers for different ratings page versions."""
    soup = await asoup_from_url(url)
    if soup.find("meta", property="imdb:pageConst") is not None:
        return await _load_ratings_page_2024(user_id, url, soup)
    elif soup.find("meta", property="pageId") is not None:
        return await _load_ratings_page_legacy(url, soup)
    raise RuntimeError("Unknown ratings page version.")
async def _load_ratings_page_2024(
    user_id: UserId, url: str, soup: bs4.BeautifulSoup
) -> _RatingsPage:
    """Handle the ratings page from 2024."""
    page = _RatingsPage()
    if (meta := soup.find("meta", property="imdb:pageConst")) is None:
        raise RuntimeError("No pageId found.")
    assert isinstance(meta, bs4.Tag)
    if isinstance(page_id := meta["content"], list):
        page_id = page_id[0]
    page.imdb_user_id = page_id
    if (headline := soup.title) is None:
        raise RuntimeError("No user link found.")
    assert isinstance(headline.string, str)
    if match := find_name(headline.string):
        page.imdb_user_name = match["name"]
    items = soup.find_all("li", "ipc-metadata-list-summary-item")
    movies: list[Movie] = []
    for i, item in enumerate(items):
        try:
            movie = _movie_and_rating_from_item_2024(item)
        except Exception as err:
            log.error(
                "Error in %s item #%s (%s): %a: %s",
                url,
                i,
                cache_path(_ForgedRequest(url, headers={})),
                " ".join(item.h3.stripped_strings),
                err,
            )
            continue
        movies.append(movie)
    movies_dict = {m.imdb_id: m for m in movies}
    async for rating in _load_user_movie_ratings(user_id, list(movies_dict.keys())):
        movie = movies_dict[rating.movie_id]
        rating = Rating(
            movie=movie,
            score=score_from_imdb_rating(rating.imdb_rating),
            rating_date=rating.rating_date,
        )
        page.ratings.append(rating)
    # TODO: next page requires querying IMDb's Graph API
    return page
async def _load_ratings_page_legacy(url: str, soup: bs4.BeautifulSoup) -> _RatingsPage:
    """Handle the ratings page as it was before 2024."""
    page = _RatingsPage()
    if (meta := soup.find("meta", property="pageId")) is None:
        raise RuntimeError("No pageId found.")
    assert isinstance(meta, bs4.Tag)
@@ -188,13 +327,13 @@ async def load_ratings_page(url: str) -> RatingsPage:
    items = soup.find_all("div", "lister-item-content")
    for i, item in enumerate(items):
        try:
            movie, rating = movie_and_rating_from_item(item)
            movie, rating = _movie_and_rating_from_item_legacy(item)
        except Exception as err:
            log.error(
                "Error in %s item #%s (%s): %s: %s",
                "Error in %s item #%s (%s): %a: %s",
                url,
                i,
                cache_path(ForgedRequest(url, headers={})),
                cache_path(_ForgedRequest(url, headers={})),
                " ".join(item.h3.stripped_strings),
                err,
            )
@@ -245,11 +384,11 @@ async def load_and_store_ratings(
        yield rating, is_updated

async def load_ratings(user_id: MovieId) -> AsyncIterable[Rating]:
async def load_ratings(user_id: UserId) -> AsyncIterable[Rating]:
    next_url = user_ratings_url(user_id)
    while next_url:
        ratings_page = await load_ratings_page(next_url)
        ratings_page = await _load_ratings_page(next_url, user_id)
        next_url = ratings_page.next_page_url
        for rating in ratings_page.ratings:
            yield rating
@@ -261,8 +400,8 @@ async def _ids_from_list_html(url: str) -> AsyncIterable[MovieId]:
    # .href: '/title/tt1213644/?ref_=chtbtm_t_1'
    # .text(): '1. Disaster Movie'
    soup = await asoup_from_url(url)
    for item in soup.find_all("li", class_="ipc-metadata-list-summary-item"):
        if (link := item.find("a", class_="ipc-title-link-wrapper")) is not None:
    for item in soup.find_all("li", "ipc-metadata-list-summary-item"):
        if (link := item.find("a", "ipc-title-link-wrapper")) is not None:
            if (href := link.get("href")) is not None:
                if match_ := find_movie_id(href):
                    yield match_["id"]
@@ -304,10 +443,19 @@ async def load_top_250() -> list[MovieId]:
    qgl_api_url = "https://caching.graphql.imdb.com/"
    query = {
        "operationName": "Top250MoviesPagination",
        "variables": r'{"first":250,"locale":"en-US"}',
        "extensions": r'{"persistedQuery":{"sha256Hash":"26114ee01d97e04f65d6c8c7212ae8b7888fa57ceed105450d1fce09df749b2d","version":1}}',
        "variables": {"first": 250, "locale": "en-US"},
        "extensions": {
            "persistedQuery": {
                "sha256Hash": "26114ee01d97e04f65d6c8c7212ae8b7888fa57ceed105450d1fce09df749b2d",
                "version": 1,
            }
        },
    }
    headers = {
        "accept": "application/graphql+json, application/json",
        "content-type": "application/json",
        "origin": "https://www.imdb.com",
    }
    headers = {"content-type": "application/json"}
    jsonstr = await adownload(qgl_api_url, query=query, headers=headers)
    data = json.loads(jsonstr)
    try:
@@ -324,3 +472,58 @@ async def load_top_250() -> list[MovieId]:
        raise RuntimeError(f"Expected exactly 250 items, got {len(imdb_title_ids)}")
    return imdb_title_ids
@dataclass
class _UserMovieRating:
    movie_id: MovieId
    rating_date: datetime
    imdb_rating: ImdbRating

async def _load_user_movie_ratings(
    user_id: UserId, movie_ids: list[MovieId]
) -> AsyncIterable[_UserMovieRating]:
    qgl_api_url = "https://api.graphql.imdb.com/"
    headers = {
        "accept": "application/graphql+json, application/json",
        "content-type": "application/json",
        "origin": "https://www.imdb.com",
    }
    query = {
        "operationName": "UserRatingsAndWatchOptions",
        "variables": {
            "locale": "en-US",
            "idArray": movie_ids,
            "includeUserRating": False,
            "location": {"latLong": {"lat": "65.03", "long": "-18.82"}},
"otherUserId": user_id,
"fetchOtherUserRating": True,
},
"extensions": {
"persistedQuery": {
"version": 1,
"sha256Hash": "9672397d6bf156302f8f61e7ede2750222bd2689e65e21cfedc5abd5ca0f4aea",
}
},
}
async with asession() as s:
r = await s.post(qgl_api_url, headers=headers, json=query, timeout=10)
r.raise_for_status()
data = r.json()
try:
titles = data["data"]["titles"]
if len(titles) != len(movie_ids):
log.warning("Expected %s items, got %s.", len(movie_ids), len(titles))
for item in titles:
yield _UserMovieRating(
movie_id=item["id"],
rating_date=datetime.fromisoformat(item["otherUserRating"]["date"]),
imdb_rating=item["otherUserRating"]["value"],
)
except KeyError as err:
log.error("Unexpected data structure.", exc_info=err)
raise

View file

@@ -197,16 +197,30 @@ def fromplain(cls: Type[T], d: Mapping, *, serialized: bool = False) -> T:
def validate(o: object) -> None:
    for f in fields(o):
        vtype = type(getattr(o, f.name))
        if vtype is not f.type:
            if get_origin(f.type) is vtype or (
                (isinstance(f.type, UnionType) or get_origin(f.type) is Union)
                and vtype in get_args(f.type)
            ):
        if vtype is f.type:
            continue
        origin = get_origin(f.type)
        if origin is vtype:
            continue
        is_union = isinstance(f.type, UnionType) or origin is Union
        if is_union:
            # Support unioned types.
            utypes = get_args(f.type)
            if vtype in utypes:
                continue
            # Support generic types (set[str], list[int], etc.)
            gtypes = [g for u in utypes if (g := get_origin(u)) is not None]
            if any(vtype is gtype for gtype in gtypes):
                continue
        raise ValueError(f"Invalid value type: {f.name}: {vtype}")
def utcnow():
def utcnow() -> datetime:
    """Return the current time as timezone aware datetime."""
    return datetime.now(timezone.utc)
@@ -293,7 +307,7 @@ class Movie:
        Column("imdb_score", Integer),
        Column("imdb_votes", Integer),
        Column("runtime", Integer),
        Column("genres", String, nullable=False),
        Column("genres", String),
        Column("created", String, nullable=False),  # datetime
        Column("updated", String, nullable=False),  # datetime
    )
@@ -309,7 +323,7 @@ class Movie:
    imdb_score: int | None = None  # range: [0,100]
    imdb_votes: int | None = None
    runtime: int | None = None  # minutes
    genres: set[str] = None
    genres: set[str] | None = None
    created: datetime = field(default_factory=utcnow)
    updated: datetime = field(default_factory=utcnow)

View file

@@ -0,0 +1,38 @@
-- remove NOT NULL constraint from movies.genres
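-- (SQLite cannot change a column's constraints in place, hence the usual
-- rebuild: create a new table, copy the rows, drop the old table, rename.)
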
CREATE TABLE _migrate_movies (
    id TEXT PRIMARY KEY NOT NULL,
    title TEXT NOT NULL,
    original_title TEXT,
    release_year INTEGER NOT NULL,
    media_type TEXT NOT NULL,
    imdb_id TEXT NOT NULL UNIQUE,
    imdb_score INTEGER,
    imdb_votes INTEGER,
    runtime INTEGER,
    genres TEXT,
    created TEXT NOT NULL,
    updated TEXT NOT NULL
);;

INSERT INTO _migrate_movies
SELECT
    id,
    title,
    original_title,
    release_year,
    media_type,
    imdb_id,
    imdb_score,
    imdb_votes,
    runtime,
    genres,
    created,
    updated
FROM movies
WHERE true;;

DROP TABLE movies;;

ALTER TABLE _migrate_movies
RENAME TO movies;;
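
A minimal sketch of how a migration file like the one above might be applied,
assuming (hypothetically) that the project's runner splits statements on the
double semicolon; the paths in the usage line are made up for illustration:

import sqlite3

def apply_migration(db_path: str, sql_path: str) -> None:
    with open(sql_path, encoding="utf-8") as f:
        sql = f.read()
    # Hypothetical convention: ";;" separates statements, so a single ";"
    # could still appear inside a statement body.
    statements = [s.strip() for s in sql.split(";;") if s.strip()]
    with sqlite3.connect(db_path) as conn:  # commits on success
        for statement in statements:
            conn.execute(statement)

apply_migration("unwind.db", "migrations/remove-genres-notnull.sql")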