add imdb full import mode

ducklet 2021-06-21 18:54:03 +02:00
parent b5cb22822e
commit 7dd10f8bc3
17 changed files with 721 additions and 109 deletions


@@ -6,8 +6,8 @@ from typing import Optional
 from urllib.parse import urljoin
 from .db import add_or_update_movie, add_or_update_rating, add_or_update_user
-from .models import Movie, Rating, User, asplain, fromplain
-from .request import soup_from_url
+from .models import Movie, Rating, User
+from .request import cache_path, soup_from_url
 
 log = logging.getLogger(__name__)
@@ -38,6 +38,26 @@ def imdb_url(user_id):
     return f"https://www.imdb.com/user/{user_id}/ratings"
+
+
+def imdb_rating_from_score(score: int) -> float:
+    """Return the IMDb rating for an Unwind Movie score."""
+    assert 0 <= score <= 100
+    rating = round(score * 9 / 100 + 1, 1)
+    assert 1.0 <= rating <= 10.0
+    return rating
+
+
+def score_from_imdb_rating(rating: float) -> int:
+    """Return the Unwind Movie score for an IMDb rating."""
+    # Scale IMDb's 10-point rating to our score range of [0, 100].
+    # There's a pitfall here!
+    # You might think this would simply be IMDb's rating times 10, *but*
+    # the lowest possible rating on IMDb is actually 1.
+    assert 1.0 <= rating <= 10.0
+    score = round(100 * (rating - 1) / 9)
+    assert 0 <= score <= 100
+    return score
 
 
 find_name = re.compile(r"(?P<name>.*)'s Ratings").fullmatch
 find_rating_date = re.compile(r"Rated on (?P<date>\d{2} \w+ \d{4})").fullmatch
 find_runtime = re.compile(r"((?P<h>\d+) hr)? ?((?P<m>\d+) min)?").fullmatch
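The two converters are exact inverses on the values that actually occur (IMDb ratings come in tenths). Because IMDb's scale bottoms out at 1, a naive `rating * 10` would map a 1/10 rating to a score of 10 rather than 0, which is the pitfall the comment warns about. A standalone sanity check of the same arithmetic, duplicated here so it runs without the package:

    # Same arithmetic as the committed functions, duplicated for a quick check.
    def imdb_rating_from_score(score: int) -> float:
        return round(score * 9 / 100 + 1, 1)

    def score_from_imdb_rating(rating: float) -> int:
        return round(100 * (rating - 1) / 9)

    assert score_from_imdb_rating(1.0) == 0      # not 10: the scale starts at 1
    assert score_from_imdb_rating(10.0) == 100
    assert imdb_rating_from_score(50) == 5.5     # midpoint of [0, 100]
    # Every rating in tenths survives the round trip:
    assert all(
        imdb_rating_from_score(score_from_imdb_rating(r / 10)) == r / 10
        for r in range(10, 101)
    )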
@@ -50,67 +70,88 @@ find_year = re.compile(
 find_movie_id = re.compile(r"/title/(?P<id>tt\d+)/").search
+
+
+def movie_and_rating_from_item(item) -> tuple[Movie, Rating]:
+    movie = Movie(
+        title=item.h3.a.string.strip(),
+        genres=set(s.strip() for s in item.find("span", "genre").string.split(",")),
+    )
+    episode_br = item.h3.br
+    if episode_br:
+        episode_a = episode_br.find_next("a")
+        if not episode_a:
+            raise ValueError("Unknown document structure.")
+        movie.media_type = "TV Episode"
+        movie.title += " / " + episode_a.string.strip()
+        if match := find_year(episode_br.find_next("span", "lister-item-year").string):
+            movie.release_year = int(match["year"])
+        if match := find_movie_id(episode_a["href"]):
+            movie.imdb_id = match["id"]
+    if (tag := item.find("span", "runtime")) and (match := find_runtime(tag.string)):
+        movie.runtime = int(match["h"] or 0) * 60 + int(match["m"] or 0)
+    if not episode_br:
+        if match := find_year(item.h3.find("span", "lister-item-year").string):
+            if media_type := match["type"]:
+                movie.media_type = media_type.strip()
+            movie.release_year = int(match["year"])
+        if match := find_movie_id(item.h3.a["href"]):
+            movie.imdb_id = match["id"]
+    if not movie.media_type:
+        movie.media_type = "Movie"
+    rating = Rating()
+    ratings_item = item.find("div", "ipl-rating-widget")
+    if match := find_rating_date(ratings_item.find_next("p", "text-muted").string):
+        rating.rating_date = datetime.strptime(match["date"], "%d %b %Y")
+    if match := ratings_item.find("div", "ipl-rating-star--other-user"):
+        if rating_item := match.find("span", "ipl-rating-star__rating"):
+            rating.score = score_from_imdb_rating(float(rating_item.string))
+    if match := ratings_item.find("div", "ipl-rating-star small"):
+        if rating_item := match.find("span", "ipl-rating-star__rating"):
+            movie.score = score_from_imdb_rating(float(rating_item.string))
+    return movie, rating
+
+
+ForgedRequest = namedtuple("ForgedRequest", "url headers")
 
 
 async def parse_page(url, stop_on_dupe=True) -> Optional[str]:
     soup = soup_from_url(url)
-    user = User(imdb_id=soup.find("meta", property="pageId")["content"], name="")
-    if match := find_name(soup.h1.string):
+    meta = soup.find("meta", property="pageId")
+    headline = soup.h1
+    assert meta is not None and headline is not None
+    user = User(imdb_id=meta["content"], name="")
+    if match := find_name(headline.string):
         user.name = match["name"]
     await add_or_update_user(user)
     items = soup.find_all("div", "lister-item-content")
     for i, item in enumerate(items):
-        movie = Movie(
-            title=item.h3.a.string.strip(),
-            genres=set(s.strip() for s in item.find("span", "genre").string.split(",")),
-        )
-        episode_br = item.h3.br
-        if episode_br:
-            episode_a = episode_br.find_next("a")
-            if not episode_a:
-                log.error("Unknown document structure.")
-                continue
-            movie.media_type = "TV Episode"
-            movie.title += " / " + episode_a.string.strip()
-            if match := find_year(
-                episode_br.find_next("span", "lister-item-year").string
-            ):
-                movie.release_year = int(match["year"])
-            if match := find_movie_id(episode_a["href"]):
-                movie.imdb_id = match["id"]
-        rating = Rating(user_id=user.id)
-        if (tag := item.find("span", "runtime")) and (
-            match := find_runtime(tag.string)
-        ):
-            movie.runtime = int(match["h"] or 0) * 60 + int(match["m"] or 0)
-        if not episode_br:
-            if match := find_year(item.h3.find("span", "lister-item-year").string):
-                if media_type := match["type"]:
-                    movie.media_type = media_type.strip()
-                movie.release_year = int(match["year"])
-            if match := find_movie_id(item.h3.a["href"]):
-                movie.imdb_id = match["id"]
-        ratings_item = item.find("div", "ipl-rating-widget")
-        if match := find_rating_date(ratings_item.find_next("p", "text-muted").string):
-            rating.rating_date = datetime.strptime(match["date"], "%d %b %Y")
-        for rating_item in ratings_item.find_all("span", "ipl-rating-star__rating")[:2]:
-            if "ipl-rating-star--other-user" in rating_item.parent["class"]:
-                rating.score = int(float(rating_item.string) * 10)
-            else:
-                movie.score = int(float(rating_item.string) * 10)
-        if not movie.media_type:
-            movie.media_type = "Movie"
+        try:
+            movie, rating = movie_and_rating_from_item(item)
+        except Exception as err:
+            log.error(
+                "Error in %s item #%s (%s): %s: %s",
+                url,
+                i,
+                cache_path(ForgedRequest(url, headers={})),
+                " ".join(item.h3.stripped_strings),
+                err,
+            )
+            continue
         await add_or_update_movie(movie)
+        rating.user_id = user.id
         rating.movie_id = movie.id  # needs to be set _after_ movie has been updated
         is_updated = await add_or_update_rating(rating)
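To see what the extracted `movie_and_rating_from_item` pulls out of one list entry, here is a minimal stand-alone run against hand-written HTML that mimics the lister-item markup the parser expects. The snippet is illustrative rather than captured from IMDb, and the `find_year` pattern below is a simplified stand-in for the real one (defined just above this hunk), which also captures a media type:

    import re
    from bs4 import BeautifulSoup

    find_movie_id = re.compile(r"/title/(?P<id>tt\d+)/").search
    find_runtime = re.compile(r"((?P<h>\d+) hr)? ?((?P<m>\d+) min)?").fullmatch
    find_year = re.compile(r"\((?P<year>\d{4})\)").fullmatch  # simplified stand-in

    item_html = """
    <div class="lister-item-content">
      <h3><a href="/title/tt0133093/">The Matrix</a>
          <span class="lister-item-year">(1999)</span></h3>
      <span class="runtime">2 hr 16 min</span>
      <span class="genre">Action, Sci-Fi</span>
    </div>
    """
    item = BeautifulSoup(item_html, "html.parser").find("div", "lister-item-content")

    print(item.h3.a.string.strip())                # The Matrix
    print(find_movie_id(item.h3.a["href"])["id"])  # tt0133093
    year = find_year(item.h3.find("span", "lister-item-year").string)
    print(int(year["year"]))                       # 1999
    m = find_runtime(item.find("span", "runtime").string)
    print(int(m["h"] or 0) * 60 + int(m["m"] or 0))  # 136 minutes
    print({s.strip() for s in item.find("span", "genre").string.split(",")})
    # {'Action', 'Sci-Fi'}

Factoring the parsing into a function also changes the failure mode: the old inline loop logged and skipped only one specific structural surprise, while the new code catches any exception per item, logs the item's heading text and the on-disk cache path (via the `ForgedRequest` stand-in, since `cache_path` evidently only needs `url` and `headers`), and moves on.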
@@ -118,9 +159,9 @@ async def parse_page(url, stop_on_dupe=True) -> Optional[str]:
             log.info("Import stopped after %s items. Caught up to known state. ✋", i)
             return None
-    next_url = urljoin(
-        url, soup.find("div", "footer").find(string=re.compile(r"Next")).parent["href"]
-    )
+    footer = soup.find("div", "footer")
+    assert footer is not None
+    next_url = urljoin(url, footer.find(string=re.compile(r"Next")).parent["href"])
     return next_url if url != next_url else None
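The pagination at the end hinges on two details: `urljoin` resolves the footer's relative "Next" link against the current page URL, and the `url != next_url` guard turns a self-referencing link (presumably what the last page serves) into a stop signal. A quick illustration; the user ID and query string here are made up:

    from urllib.parse import urljoin

    url = "https://www.imdb.com/user/ur0000000/ratings"

    # A relative "Next" href resolves against the current page:
    print(urljoin(url, "?mode=detail&page=2"))
    # -> https://www.imdb.com/user/ur0000000/ratings?mode=detail&page=2

    # If the link resolves to the page itself, parse_page returns None
    # and the caller stops following pages.
    next_url = urljoin(url, "ratings")
    print(next_url == url)  # True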