unwind/unwind/imdb.py

206 lines
6.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import re
from collections import namedtuple
from datetime import datetime
from typing import Optional, Tuple
from urllib.parse import urljoin
from . import db
from .models import Movie, Rating, User
from .request import cache_path, session, soup_from_url
log = logging.getLogger(__name__)
# div#ratings-container
# div.lister-item.mode-detail
# div.lister-item-content
# h3.lister-item-header
# a
# [href]
# .text
# span.lister-item-year.text
# br
# a
# [href]
# .text
# span.lister-item-year.text
# span.runtime.text
# span.genre.text
# div.ipl-rating-widget
# div.ipl-rating-star.small
# span.ipl-rating-star__rating.text
# div.ipl-rating-star.ipl-rating-star--other-user.small
# span.ipl-rating-star__rating.text
# p.text-muted.text ("Rated on 06 May 2021")
async def refresh_user_ratings_from_imdb(stop_on_dupe=True):
    """Reload every known user's IMDb ratings.

    Async generator yielding each freshly loaded ``Rating``.  With
    ``stop_on_dupe=True`` (the default), a user's scrape stops at the first
    rating that was already up to date in the database — assuming everything
    older than it is unchanged as well — instead of walking all pages.
    """
    with session() as s:
        # Force English responses so the scraped text matches the English
        # parsing regexes below (e.g. "Rated on 06 May 2021").
        s.headers["Accept-Language"] = "en-GB, en;q=0.5"
        for user in await db.get_all(User):
            log.info("⚡️ Loading data for %s ...", user.name)
            async for rating, is_updated in load_ratings(user.imdb_id):
                # load_ratings() builds the user from the scraped page; it
                # must match the user we asked for.
                assert rating.user == user
                if stop_on_dupe and not is_updated:
                    break
                yield rating
def user_ratings_url(user_id):
    """Return the URL of the ratings listing page for an IMDb user id."""
    return "https://www.imdb.com/user/%s/ratings" % user_id
def movie_url(imdb_id: str):
    """Return the canonical IMDb title page URL for *imdb_id*."""
    return "https://www.imdb.com/title/" + imdb_id + "/"
def imdb_rating_from_score(score: int) -> float:
    """Return the IMDb rating (1.0–10.0) for an Unwind Movie score (0–100).

    Inverse of :func:`score_from_imdb_rating`: maps [0, 100] linearly onto
    IMDb's [1, 10] scale, rounded to one decimal place.

    Raises:
        ValueError: if *score* is outside [0, 100].
    """
    # Validate with a real exception — `assert` disappears under `python -O`.
    if not 0 <= score <= 100:
        raise ValueError(f"score out of range [0, 100]: {score!r}")
    return round(score * 9 / 100 + 1, 1)
def score_from_imdb_rating(rating: float) -> int:
    """Return the Unwind Movie score (0–100) for an IMDb rating (1.0–10.0).

    There's a pitfall here!  You might think this would be simply IMDb's
    rating times 10, *but* the lowest possible rating on IMDb is actually 1,
    so [1, 10] is scaled linearly onto [0, 100] instead.

    Raises:
        ValueError: if *rating* is outside [1.0, 10.0].
    """
    # Validate with a real exception — `assert` disappears under `python -O`.
    if not 1.0 <= rating <= 10.0:
        raise ValueError(f"rating out of range [1.0, 10.0]: {rating!r}")
    return round(100 * (rating - 1) / 9)
# Pre-compiled scrapers for text fragments found on an IMDb ratings page.

# "Bob's Ratings" -> name
find_name = re.compile(r"(?P<name>.*)'s Ratings").fullmatch
# "Rated on 06 May 2021" -> date (later parsed with strptime "%d %b %Y")
find_rating_date = re.compile(r"Rated on (?P<date>\d{2} \w+ \d{4})").fullmatch
# "2 hr 13 min" / "1 hr" / "45 min" -> h, m (either part may be absent)
find_runtime = re.compile(r"((?P<h>\d+) hr)? ?((?P<m>\d+) min)?").fullmatch
# Earlier, stricter variant kept for reference:
# find_year = re.compile(
# r"(\([IVX]+\) )?\((?P<year>\d{4})(( |\d{4})| TV (Special|Movie)| Video)?\)"
# ).fullmatch
# "(2014)", "(I) (2014)", "(2014 TV Movie)" -> year and optional type.
# NOTE(review): the "( |\d{4})" branch presumably covers year ranges such
# as "(2014-2016)"; this file was flagged for ambiguous Unicode, so an
# en dash may have been lost here — confirm against the original source.
find_year = re.compile(
    r"(\([IVX]+\) )?\((?P<year>\d{4})(( |\d{4})| (?P<type>[^)]+))?\)"
).fullmatch
# "/title/tt0111161/..." -> IMDb title id (search, not fullmatch)
find_movie_id = re.compile(r"/title/(?P<id>tt\d+)/").search
def movie_and_rating_from_item(item) -> tuple[Movie, Rating]:
    """Build a (Movie, Rating) pair from one ratings-list item.

    *item* is a BeautifulSoup tag for one ``div.lister-item-content`` (see
    the structure sketch at the top of this module).

    Raises:
        ValueError: when an episode item lacks the expected episode link.
    """
    movie = Movie(
        title=item.h3.a.string.strip(),
        genres=set(s.strip() for s in item.find("span", "genre").string.split(",")),
    )
    # A <br> inside the header marks a TV episode: the first <a> is the
    # series title, the <a> after the <br> is the episode itself.
    episode_br = item.h3.br
    if episode_br:
        episode_a = episode_br.find_next("a")
        if not episode_a:
            raise ValueError("Unknown document structure.")
        movie.media_type = "TV Episode"
        movie.title += " / " + episode_a.string.strip()
        # Episode year and title id come from the elements after the <br>.
        if match := find_year(episode_br.find_next("span", "lister-item-year").string):
            movie.release_year = int(match["year"])
        if match := find_movie_id(episode_a["href"]):
            movie.imdb_id = match["id"]
    # Runtime is stored in minutes; hour or minute part may be missing.
    if (tag := item.find("span", "runtime")) and (match := find_runtime(tag.string)):
        movie.runtime = int(match["h"] or 0) * 60 + int(match["m"] or 0)
    if not episode_br:
        # Non-episode: the year span may also carry a media type such as
        # "TV Movie" or "Video" (see find_year's "type" group).
        if match := find_year(item.h3.find("span", "lister-item-year").string):
            if media_type := match["type"]:
                movie.media_type = media_type.strip()
            movie.release_year = int(match["year"])
        if match := find_movie_id(item.h3.a["href"]):
            movie.imdb_id = match["id"]
    if not movie.media_type:
        movie.media_type = "Movie"
    rating = Rating()
    ratings_item = item.find("div", "ipl-rating-widget")
    # "Rated on 06 May 2021".  NOTE(review): "%b" only parses abbreviated
    # month names — assumes IMDb renders them abbreviated (English is forced
    # via the Accept-Language header); confirm for full-name months.
    if match := find_rating_date(ratings_item.find_next("p", "text-muted").string):
        rating.rating_date = datetime.strptime(match["date"], "%d %b %Y")
    # The "--other-user" star widget is taken as the list owner's own score...
    if match := ratings_item.find("div", "ipl-rating-star--other-user"):
        if rating_item := match.find("span", "ipl-rating-star__rating"):
            rating.score = score_from_imdb_rating(float(rating_item.string))
    # ...while the plain star widget is taken as IMDb's aggregate rating.
    # (Matching the multi-class string "ipl-rating-star small" requires the
    # class attribute to be exactly that, in that order — bs4 semantics.)
    if match := ratings_item.find("div", "ipl-rating-star small"):
        if rating_item := match.find("span", "ipl-rating-star__rating"):
            movie.score = score_from_imdb_rating(float(rating_item.string))
    return movie, rating
# Minimal stand-in for a request object, used only to compute the cache file
# path of a page when logging parse errors below.
ForgedRequest = namedtuple("ForgedRequest", "url headers")


async def parse_page(url) -> Tuple[list[Rating], Optional[str]]:
    """Parse one ratings page into Rating objects.

    Returns ``(ratings, next_url)``.  ``next_url`` is None on the last page
    (detected by the "Next" link pointing back at the current page).  Items
    that fail to parse are logged and skipped, not fatal.
    """
    ratings = []
    soup = soup_from_url(url)
    meta = soup.find("meta", property="pageId")
    headline = soup.h1
    assert meta is not None and headline is not None
    # The pageId meta content is used as the user's IMDb id — presumably a
    # "ur..." identifier; the display name comes from the "<name>'s Ratings"
    # headline when it matches.
    user = User(imdb_id=meta["content"], name="")
    if match := find_name(headline.string):
        user.name = match["name"]
    items = soup.find_all("div", "lister-item-content")
    for i, item in enumerate(items):
        try:
            movie, rating = movie_and_rating_from_item(item)
        except Exception as err:
            # Log enough context (URL, item index, cached file, header text)
            # to locate the offending item later, then keep going.
            log.error(
                "Error in %s item #%s (%s): %s: %s",
                url,
                i,
                cache_path(ForgedRequest(url, headers={})),
                " ".join(item.h3.stripped_strings),
                err,
            )
            continue
        rating.user = user
        rating.movie = movie
        ratings.append(rating)
    footer = soup.find("div", "footer")
    assert footer is not None
    next_url = urljoin(url, footer.find(string=re.compile(r"Next")).parent["href"])
    # A "Next" link that resolves to the current URL means this is the end.
    return (ratings, next_url if url != next_url else None)
async def load_ratings(user_id):
    """Walk all ratings pages of *user_id*, persisting everything found.

    Async generator yielding ``(rating, is_updated)`` for every rating,
    where ``is_updated`` is the result of ``db.add_or_update_rating()``.
    """
    page_url = user_ratings_url(user_id)
    while page_url:
        page_ratings, page_url = await parse_page(page_url)
        for index, rating in enumerate(page_ratings):
            # Every rating on a page shares one User object, so it only
            # needs to be persisted once per page, before the first rating.
            if not index:
                await db.add_or_update_user(rating.user)
            rating.user_id = rating.user.id
            await db.add_or_update_movie(rating.movie)
            rating.movie_id = rating.movie.id
            is_updated = await db.add_or_update_rating(rating)
            yield rating, is_updated