init with some kind of working prototype

This commit is contained in:
ducklet 2021-06-15 19:09:21 +02:00
commit b5cb22822e
22 changed files with 1292 additions and 0 deletions

132
unwind/imdb.py Normal file
View file

@ -0,0 +1,132 @@
import logging
import re
from collections import namedtuple
from datetime import datetime
from typing import Optional
from urllib.parse import urljoin
from .db import add_or_update_movie, add_or_update_rating, add_or_update_user
from .models import Movie, Rating, User, asplain, fromplain
from .request import soup_from_url
log = logging.getLogger(__name__)
# div#ratings-container
# div.lister-item.mode-detail
# div.lister-item-content
# h3.lister-item-header
# a
# [href]
# .text
# span.lister-item-year.text
# br
# a
# [href]
# .text
# span.lister-item-year.text
# span.runtime.text
# span.genre.text
# div.ipl-rating-widget
# div.ipl-rating-star.small
# span.ipl-rating-star__rating.text
# div.ipl-rating-star.ipl-rating-star--other-user.small
# span.ipl-rating-star__rating.text
# p.text-muted.text ("Rated on 06 May 2021")
def imdb_url(user_id):
return f"https://www.imdb.com/user/{user_id}/ratings"
find_name = re.compile(r"(?P<name>.*)'s Ratings").fullmatch
find_rating_date = re.compile(r"Rated on (?P<date>\d{2} \w+ \d{4})").fullmatch
find_runtime = re.compile(r"((?P<h>\d+) hr)? ?((?P<m>\d+) min)?").fullmatch
# find_year = re.compile(
# r"(\([IVX]+\) )?\((?P<year>\d{4})(( |\d{4})| TV (Special|Movie)| Video)?\)"
# ).fullmatch
find_year = re.compile(
r"(\([IVX]+\) )?\((?P<year>\d{4})(( |\d{4})| (?P<type>[^)]+))?\)"
).fullmatch
find_movie_id = re.compile(r"/title/(?P<id>tt\d+)/").search
async def parse_page(url, stop_on_dupe=True) -> Optional[str]:
soup = soup_from_url(url)
user = User(imdb_id=soup.find("meta", property="pageId")["content"], name="")
if match := find_name(soup.h1.string):
user.name = match["name"]
await add_or_update_user(user)
items = soup.find_all("div", "lister-item-content")
for i, item in enumerate(items):
movie = Movie(
title=item.h3.a.string.strip(),
genres=set(s.strip() for s in item.find("span", "genre").string.split(",")),
)
episode_br = item.h3.br
if episode_br:
episode_a = episode_br.find_next("a")
if not episode_a:
log.error("Unknown document structure.")
continue
movie.media_type = "TV Episode"
movie.title += " / " + episode_a.string.strip()
if match := find_year(
episode_br.find_next("span", "lister-item-year").string
):
movie.release_year = int(match["year"])
if match := find_movie_id(episode_a["href"]):
movie.imdb_id = match["id"]
rating = Rating(user_id=user.id)
if (tag := item.find("span", "runtime")) and (
match := find_runtime(tag.string)
):
movie.runtime = int(match["h"] or 0) * 60 + int(match["m"] or 0)
if not episode_br:
if match := find_year(item.h3.find("span", "lister-item-year").string):
if media_type := match["type"]:
movie.media_type = media_type.strip()
movie.release_year = int(match["year"])
if match := find_movie_id(item.h3.a["href"]):
movie.imdb_id = match["id"]
ratings_item = item.find("div", "ipl-rating-widget")
if match := find_rating_date(ratings_item.find_next("p", "text-muted").string):
rating.rating_date = datetime.strptime(match["date"], "%d %b %Y")
for rating_item in ratings_item.find_all("span", "ipl-rating-star__rating")[:2]:
if "ipl-rating-star--other-user" in rating_item.parent["class"]:
rating.score = int(float(rating_item.string) * 10)
else:
movie.score = int(float(rating_item.string) * 10)
if not movie.media_type:
movie.media_type = "Movie"
await add_or_update_movie(movie)
rating.movie_id = movie.id # needs to be set _after_ movie has been updated
is_updated = await add_or_update_rating(rating)
if stop_on_dupe and not is_updated:
log.info("Import stopped after %s items. Caught up to known state. ✋", i)
return None
next_url = urljoin(
url, soup.find("div", "footer").find(string=re.compile(r"Next")).parent["href"]
)
return next_url if url != next_url else None
async def load_imdb(user_id):
next_url = imdb_url(user_id)
while next_url := await parse_page(next_url):
pass