unwind/unwind/imdb.py
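
"""Scrape and store IMDb user ratings, and load IMDb chart listings."""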

import json
import logging
import re
from collections import namedtuple
from dataclasses import dataclass, field
from datetime import datetime
from typing import AsyncIterable, NewType
from urllib.parse import urljoin

import bs4

from . import db
from .models import Movie, Rating, User
from .request import adownload, asession, asoup_from_url, cache_path

log = logging.getLogger(__name__)

ImdbRating = NewType("ImdbRating", float)  # Value range: [1.0, 10.0]
UnwindScore = NewType("UnwindScore", int)  # Value range: [0, 100]
MovieId = NewType("MovieId", str)  # Pattern: ttXXXXXXXX
UserId = NewType("UserId", str)  # Pattern: urXXXXXXXX
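
# DOM outline of the legacy (pre-2024) ratings page, as consumed by
# _load_ratings_page_legacy() and _movie_and_rating_from_item_legacy() below: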
# div#ratings-container
#   div.lister-item.mode-detail
#     div.lister-item-content
#       h3.lister-item-header
#         a
#           [href]
#           .text
#         span.lister-item-year.text
#         br
#         a
#           [href]
#           .text
#         span.lister-item-year.text
#       span.runtime.text
#       span.genre.text
#       div.ipl-rating-widget
#         div.ipl-rating-star.small
#           span.ipl-rating-star__rating.text
#         div.ipl-rating-star.ipl-rating-star--other-user.small
#           span.ipl-rating-star__rating.text
#       p.text-muted.text  ("Rated on 06 May 2021")
#       p.text-muted.text-small  span[name=nv] [data-value]


async def refresh_user_ratings_from_imdb(stop_on_dupe: bool = True):
    async with asession() as s:
        s.headers["Accept-Language"] = "en-US, en;q=0.5"
        async with db.new_connection() as conn:
            users = list(await db.get_all(conn, User))
        for user in users:
            log.info("⚡️ Loading data for %s ...", user.name)
            try:
                async for rating, is_updated in load_and_store_ratings(user.imdb_id):
                    assert rating.user is not None and rating.user.id == user.id
                    if stop_on_dupe and not is_updated:
                        break
                    yield rating
            except BaseException as err:
                log.error("❌ Could not load rating for %s!", user.name, exc_info=err)


def user_ratings_url(user_id):
    return f"https://www.imdb.com/user/{user_id}/ratings"


def movie_url(imdb_id: str):
    return f"https://www.imdb.com/title/{imdb_id}/"


def imdb_rating_from_score(score: UnwindScore) -> ImdbRating:
    """Return the IMDb rating from an Unwind Movie score."""
    assert 0 <= score <= 100
    rating = round(score * 9 / 100 + 1, 1)
    assert 1.0 <= rating <= 10.0
    return ImdbRating(rating)
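
# Worked examples of the mapping above (rating = score * 9 / 100 + 1):
#
#     >>> imdb_rating_from_score(0), imdb_rating_from_score(50), imdb_rating_from_score(100)
#     (1.0, 5.5, 10.0)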


def score_from_imdb_rating(rating: ImdbRating | int) -> UnwindScore:
    """Return the Unwind Movie score for an IMDb rating."""
    # Scale IMDb's 10-point rating to our score range of [0, 100].
    # There's a pitfall here!
    # You might think this would simply be IMDb's rating times 10, *but*
    # the lowest possible rating on IMDb is actually 1, so the 9-point
    # span [1, 10] has to be stretched onto [0, 100].
    assert 1.0 <= rating <= 10.0
    score = round(100 * (rating - 1) / 9)
    assert 0 <= score <= 100
    return UnwindScore(score)
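
# The pitfall in numbers: an IMDb 7.0 maps to 67, not 70, because the scale
# starts at 1.0:
#
#     >>> score_from_imdb_rating(7.0)
#     67
#     >>> imdb_rating_from_score(UnwindScore(67))
#     7.0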


# find_name: e.g. "Your Mom's Ratings"
find_name = re.compile(r"(?P<name>.*)'s Ratings").fullmatch
# find_rating_date: e.g. "Rated on 06 May 2021"
find_rating_date = re.compile(r"Rated on (?P<date>\d{2} \w+ \d{4})").fullmatch
# find_rating_date_2: e.g. "Rated on May 01, 2024"
find_rating_date_2 = re.compile(r"Rated on (?P<date>\w+ \d{2}, \d{4})").fullmatch
# find_runtime: e.g. "1 hr 38 min"
find_runtime = re.compile(r"((?P<h>\d+) hr)? ?((?P<m>\d+) min)?").fullmatch
# find_runtime_2: e.g. "1h 38m"
find_runtime_2 = re.compile(r"((?P<h>\d+)h )?((?P<m>\d+)m)?").fullmatch
# find_year: e.g. "(1992)", "(2007–2009)", "(2013 TV Movie)"
find_year = re.compile(
    r"(\([IVX]+\) )?\((?P<year>\d{4})(–( |\d{4})| (?P<type>[^)]+))?\)"
).fullmatch
# find_year_2: e.g. "2024", "1971–2003", "2024–"
find_year_2 = re.compile(r"(?P<year>\d{4})(–(?P<end_year>\d{4})?)?").fullmatch
find_movie_id = re.compile(r"/title/(?P<id>tt\d+)/").search
find_movie_name = re.compile(r"\d+\. (?P<name>.+)").fullmatch
# find_vote_count: e.g. "(5.9K)", "(1K)", "(8)"
find_vote_count = re.compile(r"\((?P<count>\d+(\.\d+)?K?)\)").fullmatch
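
# How these parsers behave on the example formats above (a trailing "K" in a
# vote count is scaled by 1,000 when the item is parsed):
#
#     >>> find_rating_date("Rated on 06 May 2021")["date"]
#     '06 May 2021'
#     >>> find_runtime_2("1h 38m").groupdict()
#     {'h': '1', 'm': '38'}
#     >>> find_vote_count("(5.9K)")["count"]
#     '5.9K'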


def _first_string(tag: bs4.Tag) -> str | None:
    for child in tag.children:
        if isinstance(child, str):
            return child


def _tv_episode_title(series_name: str, episode_name: str) -> str:
    return f"{series_name.strip()} / {episode_name.strip()}"


def _movie_and_rating_from_item_legacy(item: bs4.Tag) -> tuple[Movie, Rating]:
    """Parse a Movie and the user's Rating from a legacy ratings-page item."""
    genres = (genre := item.find("span", "genre")) and genre.string or ""
    movie = Movie(
        title=item.h3.a.string.strip(),
        genres=set(s.strip() for s in genres.split(",")),
    )
    episode_br = item.h3.br
    if episode_br:
        episode_a = episode_br.find_next("a")
        if not episode_a:
            raise ValueError("Unknown document structure.")
        movie.media_type = "TV Episode"
        movie.title = _tv_episode_title(movie.title, episode_a.string)
        if match := find_year(episode_br.find_next("span", "lister-item-year").string):
            movie.release_year = int(match["year"])
        if match := find_movie_id(episode_a["href"]):
            movie.imdb_id = match["id"]
    if (tag := item.find("span", "runtime")) and (match := find_runtime(tag.string)):
        movie.runtime = int(match["h"] or 0) * 60 + int(match["m"] or 0)
    if not episode_br:
        if match := find_year(item.h3.find("span", "lister-item-year").string):
            if media_type := match["type"]:
                movie.media_type = media_type.strip()
            movie.release_year = int(match["year"])
        if match := find_movie_id(item.h3.a["href"]):
            movie.imdb_id = match["id"]
    if not movie.media_type:
        movie.media_type = "Movie"
    if match := item.find("span", attrs={"name": "nv"}):
        movie.imdb_votes = int(match["data-value"])
    rating = Rating()
    ratings_item = item.find("div", "ipl-rating-widget")
    if match := find_rating_date(ratings_item.find_next("p", "text-muted").string):
        rating.rating_date = datetime.strptime(match["date"], "%d %b %Y")
    if match := ratings_item.find("div", "ipl-rating-star--other-user"):
        if rating_item := match.find("span", "ipl-rating-star__rating"):
            rating.score = score_from_imdb_rating(float(rating_item.string))
    if match := ratings_item.find("div", "ipl-rating-star small"):
        if rating_item := match.find("span", "ipl-rating-star__rating"):
            movie.imdb_score = score_from_imdb_rating(float(rating_item.string))
    return movie, rating


def _movie_and_rating_from_item_2024(item: bs4.Tag) -> Movie:
    """Parse a Movie from a 2024 ratings-page item.

    The user's rating itself is not on the item; it is fetched separately
    via the GraphQL API (see _load_user_movie_ratings()).
    """
    movie = Movie()
    # Data for `original_title` and `genres` is not available from the ratings page.
    if match := find_movie_name(item.h3.string.strip()):
        movie.title = match["name"]
    if (match := item.find("a", "ipc-lockup-overlay")) and (
        match := find_movie_id(match["href"])
    ):
        movie.imdb_id = match["id"]
    if match := item.find("span", "ratingGroup--imdb-rating"):
        movie.imdb_score = score_from_imdb_rating(float(_first_string(match)))
    for metadata in item.find_all("span", "dli-title-metadata-item"):
        # Other known metadata types, with some example values:
        # - Episode count: "10 eps"
        # - Age rating: "TV-PG", "TV-MA", "R"
        if match := find_runtime_2(metadata.string.strip()):
            movie.runtime = int(match["h"] or 0) * 60 + int(match["m"] or 0)
        if match := find_year_2(metadata.string.strip()):
            movie.release_year = int(match["year"])
    if match := item.find("span", "dli-title-type-data"):
        movie.media_type = match.string.strip()
    if not movie.media_type:
        movie.media_type = "Movie"
    # TODO: `imdb_votes` is available as an exact value in the page's JSON template.
    if (match := item.find("span", "ipc-rating-star--voteCount")) and (
        match := find_vote_count("".join(match.stripped_strings))
    ):
        # e.g. "(5.9K)" -> count "5.9" with suffix "K" -> 5900 votes
        count, k, _ = match["count"].partition("K")
        votes = float(count)
        if k:
            votes *= 1_000
        movie.imdb_votes = int(votes)
    if movie.media_type == "TV Episode":
        titles = item.find_all("h3")
        if len(titles) != 2:
            raise ValueError("Unknown document structure.")
        movie.title = _tv_episode_title(movie.title, titles[1].string)
        if match := find_year(item.find("span", "dli-ep-year").get_text()):
            movie.release_year = int(match["year"])
    return movie


_ForgedRequest = namedtuple("_ForgedRequest", "url headers")


@dataclass
class _RatingsPage:
    ratings: list[Rating] = field(default_factory=list)
    next_page_url: str | None = None
    imdb_user_id: UserId | None = None
    imdb_user_name: str | None = None


async def _load_ratings_page(url: str, user_id: UserId) -> _RatingsPage:
    """Dispatch to handlers for different ratings page versions."""
    soup = await asoup_from_url(url)
    if soup.find("meta", property="imdb:pageConst") is not None:
        return await _load_ratings_page_2024(user_id, url, soup)
    elif soup.find("meta", property="pageId") is not None:
        return await _load_ratings_page_legacy(url, soup)
    raise RuntimeError("Unknown ratings page version.")


async def _load_ratings_page_2024(
    user_id: UserId, url: str, soup: bs4.BeautifulSoup
) -> _RatingsPage:
    """Handle the ratings page from 2024."""
    page = _RatingsPage()
    if (meta := soup.find("meta", property="imdb:pageConst")) is None:
        raise RuntimeError("No imdb:pageConst found.")
    assert isinstance(meta, bs4.Tag)
    if isinstance(page_id := meta["content"], list):
        page_id = page_id[0]
    page.imdb_user_id = page_id
    if (headline := soup.title) is None:
        raise RuntimeError("No page title found.")
    assert isinstance(headline.string, str)
    if match := find_name(headline.string):
        page.imdb_user_name = match["name"]
    items = soup.find_all("li", "ipc-metadata-list-summary-item")
    movies: list[Movie] = []
    for i, item in enumerate(items):
        try:
            movie = _movie_and_rating_from_item_2024(item)
        except Exception as err:
            log.error(
                "Error in %s item #%s (%s): %a: %s",
                url,
                i,
                cache_path(_ForgedRequest(url, headers={})),
                " ".join(item.h3.stripped_strings),
                err,
            )
            continue
        movies.append(movie)
    movies_dict = {m.imdb_id: m for m in movies}
    async for rating in _load_user_movie_ratings(user_id, list(movies_dict.keys())):
        movie = movies_dict[rating.movie_id]
        rating = Rating(
            movie=movie,
            score=score_from_imdb_rating(rating.imdb_rating),
            rating_date=rating.rating_date,
        )
        page.ratings.append(rating)
    # TODO: Loading the next page requires querying IMDb's GraphQL API.
    return page


async def _load_ratings_page_legacy(url: str, soup: bs4.BeautifulSoup) -> _RatingsPage:
    """Handle the ratings page as it was before 2024."""
    page = _RatingsPage()
    if (meta := soup.find("meta", property="pageId")) is None:
        raise RuntimeError("No pageId found.")
    assert isinstance(meta, bs4.Tag)
    if isinstance(page_id := meta["content"], list):
        page_id = page_id[0]
    page.imdb_user_id = page_id
    if (headline := soup.h1) is None:
        raise RuntimeError("No headline found.")
    assert isinstance(headline.string, str)
    if match := find_name(headline.string):
        page.imdb_user_name = match["name"]
    items = soup.find_all("div", "lister-item-content")
    for i, item in enumerate(items):
        try:
            movie, rating = _movie_and_rating_from_item_legacy(item)
        except Exception as err:
            log.error(
                "Error in %s item #%s (%s): %a: %s",
                url,
                i,
                cache_path(_ForgedRequest(url, headers={})),
                " ".join(item.h3.stripped_strings),
                err,
            )
            continue
        rating.movie = movie
        page.ratings.append(rating)
    if (footer := soup.find("div", "footer")) is None:
        raise RuntimeError("No footer found.")
    assert isinstance(footer, bs4.Tag)
    if (next_link := footer.find("a", string=re.compile("Next"))) is not None:
        assert isinstance(next_link, bs4.Tag)
        next_href = next_link["href"]
        assert isinstance(next_href, str)
        page.next_page_url = urljoin(url, next_href)
    return page


async def load_and_store_ratings(
    user_id: UserId,
) -> AsyncIterable[tuple[Rating, bool]]:
    async with db.new_connection() as conn:
        user = await db.get(conn, User, imdb_id=user_id) or User(
            imdb_id=user_id, name="", secret=""
        )
    is_first = True
    async for rating in load_ratings(user_id):
        assert rating.movie
        rating.user = user
        async with db.transaction() as conn:
            if is_first:
                is_first = False
                # All rating objects share the same user.
                await db.add_or_update_user(conn, rating.user)
            rating.user_id = rating.user.id
            await db.add_or_update_movie(conn, rating.movie)
            rating.movie_id = rating.movie.id
            is_updated = await db.add_or_update_rating(conn, rating)
        yield rating, is_updated


async def load_ratings(user_id: UserId) -> AsyncIterable[Rating]:
    next_url = user_ratings_url(user_id)
    while next_url:
        ratings_page = await _load_ratings_page(next_url, user_id)
        next_url = ratings_page.next_page_url
        for rating in ratings_page.ratings:
            yield rating
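
# Minimal usage sketch for the loader above ("ur0000000" is a placeholder,
# not a real account):
#
#     import asyncio
#
#     async def main() -> None:
#         async for rating in load_ratings(UserId("ur0000000")):
#             print(rating.rating_date, rating.movie.title)
#
#     asyncio.run(main())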


async def _ids_from_list_html(url: str) -> AsyncIterable[MovieId]:
    """Yield all IMDb movie IDs (`tt*`) from the given URL."""
    # document.querySelectorAll('li.ipc-metadata-list-summary-item a.ipc-title-link-wrapper')
    #   .href:   '/title/tt1213644/?ref_=chtbtm_t_1'
    #   .text(): '1. Disaster Movie'
    soup = await asoup_from_url(url)
    for item in soup.find_all("li", "ipc-metadata-list-summary-item"):
        if (link := item.find("a", "ipc-title-link-wrapper")) is not None:
            if (href := link.get("href")) is not None:
                if match_ := find_movie_id(href):
                    yield match_["id"]


async def load_most_popular_100() -> list[MovieId]:
    """Return IMDb's top 100 most popular movies.

    IMDb Charts: Most Popular Movies
    As determined by IMDb users
    """
    url = "https://www.imdb.com/chart/moviemeter/"
    ids = [tid async for tid in _ids_from_list_html(url)]
    if len(ids) != 100:
        raise RuntimeError(f"Expected exactly 100 items, got {len(ids)}")
    return ids


async def load_bottom_100() -> list[MovieId]:
    """Return IMDb's bottom 100 lowest-rated movies.

    IMDb Charts: Lowest Rated Movies
    Bottom 100 as voted by IMDb users
    """
    url = "https://www.imdb.com/chart/bottom/"
    ids = [tid async for tid in _ids_from_list_html(url)]
    if len(ids) != 100:
        raise RuntimeError(f"Expected exactly 100 items, got {len(ids)}")
    return ids


async def load_top_250() -> list[MovieId]:
    """Return IMDb's top 250 highest-rated movies.

    IMDb Charts: IMDb Top 250 Movies
    As rated by regular IMDb voters.
    """
    # Called from page https://www.imdb.com/chart/top/
    gql_api_url = "https://caching.graphql.imdb.com/"
    query = {
        "operationName": "Top250MoviesPagination",
        "variables": {"first": 250, "locale": "en-US"},
        "extensions": {
            "persistedQuery": {
                "sha256Hash": "26114ee01d97e04f65d6c8c7212ae8b7888fa57ceed105450d1fce09df749b2d",
                "version": 1,
            }
        },
    }
    headers = {
        "accept": "application/graphql+json, application/json",
        "content-type": "application/json",
        "origin": "https://www.imdb.com",
    }
    jsonstr = await adownload(gql_api_url, query=query, headers=headers)
    data = json.loads(jsonstr)
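    # Expected response shape (abridged sketch, inferred from the fields read
    # below; "tt0111161" is just an illustrative title id):
    #   {"data": {"chartTitles": {
    #       "edges": [{"node": {"id": "tt0111161"}}, ...],
    #       "pageInfo": {"hasNextPage": false, "hasPreviousPage": false}}}}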
    try:
        imdb_title_ids = [
            edge["node"]["id"] for edge in data["data"]["chartTitles"]["edges"]
        ]
        has_next_page = data["data"]["chartTitles"]["pageInfo"]["hasNextPage"]
        has_previous_page = data["data"]["chartTitles"]["pageInfo"]["hasPreviousPage"]
    except KeyError as err:
        log.error("Unexpected data structure.", exc_info=err)
        raise
    if len(imdb_title_ids) != 250 or has_next_page or has_previous_page:
        raise RuntimeError(f"Expected exactly 250 items, got {len(imdb_title_ids)}")
    return imdb_title_ids


@dataclass
class _UserMovieRating:
    movie_id: MovieId
    rating_date: datetime
    imdb_rating: ImdbRating


async def _load_user_movie_ratings(
    user_id: UserId, movie_ids: list[MovieId]
) -> AsyncIterable[_UserMovieRating]:
    gql_api_url = "https://api.graphql.imdb.com/"
    headers = {
        "accept": "application/graphql+json, application/json",
        "content-type": "application/json",
        "origin": "https://www.imdb.com",
    }
    query = {
        "operationName": "UserRatingsAndWatchOptions",
        "variables": {
            "locale": "en-US",
            "idArray": movie_ids,
            "includeUserRating": False,
            "location": {"latLong": {"lat": "65.03", "long": "-18.82"}},
            "otherUserId": user_id,
            "fetchOtherUserRating": True,
        },
        "extensions": {
            "persistedQuery": {
                "version": 1,
                "sha256Hash": "9672397d6bf156302f8f61e7ede2750222bd2689e65e21cfedc5abd5ca0f4aea",
            }
        },
    }
    async with asession() as s:
        r = await s.post(gql_api_url, headers=headers, json=query, timeout=10)
        r.raise_for_status()
        data = r.json()
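    # Expected response shape (abridged sketch, inferred from the fields read
    # below):
    #   {"data": {"titles": [
    #       {"id": "tt...", "otherUserRating": {"date": "...", "value": 8}},
    #       ...]}}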
    try:
        titles = data["data"]["titles"]
        if len(titles) != len(movie_ids):
            log.warning("Expected %s items, got %s.", len(movie_ids), len(titles))
        for item in titles:
            yield _UserMovieRating(
                movie_id=item["id"],
                rating_date=datetime.fromisoformat(item["otherUserRating"]["date"]),
                imdb_rating=item["otherUserRating"]["value"],
            )
    except KeyError as err:
        log.error("Unexpected data structure.", exc_info=err)
        raise