We should only assert where we know the result; here, the input file could just as well be empty.
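For illustration, a minimal sketch of what that could look like, assuming the assert in question is the `g.mtime is not None` check in `gz_mtime` (the same idea would apply to the header check in `read_imdb_tsv`): raise a descriptive error instead of asserting, so an empty or truncated dump fails with a clear message rather than an AssertionError.

def gz_mtime(path: Path) -> datetime:
    """Return the timestamp of the compressed file."""
    with gzip.GzipFile(path, "rb") as g:
        g.peek(1)  # start reading the file to fill the timestamp field
        if g.mtime is None:
            # An empty (or headerless) input file yields no timestamp;
            # report that explicitly instead of asserting.
            raise ValueError(f"No gzip timestamp in {path}; the file may be empty")
        return datetime.fromtimestamp(g.mtime, tz=timezone.utc)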
297 lines · 8.2 KiB · Python
import asyncio
import csv
import gzip
import logging
from dataclasses import dataclass, fields
from datetime import datetime, timezone
from pathlib import Path
from typing import Generator, Literal, Type, TypeVar, overload

from . import config, db, request
from .db import add_or_update_many_movies
from .imdb import score_from_imdb_rating
from .models import Movie

log = logging.getLogger(__name__)

T = TypeVar("T")


# See
# - https://www.imdb.com/interfaces/
# - https://datasets.imdbws.com/


@dataclass
class BasicRow:
    tconst: str
    titleType: str
    primaryTitle: str
    originalTitle: str
    isAdult: bool
    startYear: int | None
    endYear: int | None
    runtimeMinutes: int | None
    genres: set[str] | None

    @classmethod
    def from_row(cls, row):
        assert row[4] in "01"  # isAdult
        inst = cls(
            tconst=row[0],
            titleType=row[1],
            primaryTitle=row[2],
            originalTitle=row[3],
            isAdult=row[4] == "1",
            startYear=None if row[5] == r"\N" else int(row[5]),
            endYear=None if row[6] == r"\N" else int(row[6]),
            runtimeMinutes=None if row[7] == r"\N" else int(row[7]),
            genres=None if row[8] == r"\N" else set(row[8].split(",")),
        )
        assert inst.titleType in title_types
        return inst

    def as_movie(self):
        assert self.startYear is not None
        return Movie.lazy(
            title=self.primaryTitle,
            original_title=self.originalTitle,
            release_year=self.startYear,
            media_type=title_types[self.titleType],
            imdb_id=self.tconst,
            imdb_score=None,
            runtime=self.runtimeMinutes,
            genres=self.genres or set(),
        )


@dataclass
class RatingRow:
    tconst: str
    averageRating: float
    numVotes: int

    @classmethod
    def from_row(cls, row):
        inst = cls(tconst=row[0], averageRating=float(row[1]), numVotes=int(row[2]))
        assert inst.tconst != r"\N"
        return inst

    def as_movie(self):
        return Movie.lazy(
            imdb_id=self.tconst,
            imdb_score=score_from_imdb_rating(self.averageRating),
            imdb_votes=self.numVotes,
        )


title_types = {
    "movie": "Movie",
    "radioEpisode": "Radio Episode",
    "radioSeries": "Radio Series",
    "short": "Short",
    "tvEpisode": "TV Episode",
    "tvMiniSeries": "TV Mini Series",
    "tvMovie": "TV Movie",
    "tvPilot": "TV Pilot",
    "tvSeries": "TV Series",
    "tvShort": "TV Short",
    "tvSpecial": "TV Special",
    "video": "Video",
    "videoGame": "Video Game",
}


def gz_mtime(path: Path) -> datetime:
    """Return the timestamp of the compressed file."""
    with gzip.GzipFile(path, "rb") as g:
        g.peek(1)  # start reading the file to fill the timestamp field
        assert g.mtime is not None
        # The gzip header stores seconds since the Unix epoch, so build an
        # aware UTC datetime directly instead of converting via local time.
        return datetime.fromtimestamp(g.mtime, tz=timezone.utc)


def count_lines(path: Path) -> int:
    i = 0

    one_mb = 2**20
    buf_size = 8 * one_mb  # 8 MiB seems to give good read/process performance.

    with gzip.open(path, "rt") as f:
        while buf := f.read(buf_size):
            i += buf.count("\n")

    return i


@overload
def read_imdb_tsv(
    path: Path, row_type, *, unpack: Literal[False]
) -> Generator[list[str], None, None]:
    ...


@overload
def read_imdb_tsv(
    path: Path, row_type: Type[T], *, unpack: Literal[True] = True
) -> Generator[T, None, None]:
    ...


def read_imdb_tsv(path: Path, row_type, *, unpack=True):
    with gzip.open(path, "rt", newline="") as f:
        rows = csv.reader(f, delimiter="\t", quoting=csv.QUOTE_NONE)

        # Validate and skip the header line.
        rows = iter(rows)
        header = next(rows)
        try:
            assert tuple(fld.name for fld in fields(row_type)) == tuple(header)
        except AssertionError:
            log.error("Unexpected header line: %s", header)
            raise

        if unpack is False:
            yield from rows
            return

        for i, row in enumerate(rows, start=1):
            try:
                yield row_type.from_row(row)
            except Exception as err:
                log.error("Error in line %s: %s", i, row, exc_info=err)
                raise


def read_ratings(path: Path):
    mtime = gz_mtime(path)
    rows = read_imdb_tsv(path, RatingRow)

    for row in rows:
        m = row.as_movie()
        m.updated = mtime
        yield m


def read_ratings_as_mapping(path: Path):
    """Optimized function to quickly load all ratings."""
    rows = read_imdb_tsv(path, RatingRow, unpack=False)
    # Map tconst -> (score, votes); the inline formula scales the 1-10 rating
    # onto a 0-100 score (cf. score_from_imdb_rating) without per-row call overhead.
    return {r[0]: (round(100 * (float(r[1]) - 1) / 9), int(r[2])) for r in rows}


def read_basics(path: Path) -> Generator[Movie | None, None, None]:
    mtime = gz_mtime(path)
    rows = read_imdb_tsv(path, BasicRow)

    for row in rows:
        if row.startYear is None:
            log.debug("Skipping movie, missing year: %s", row)
            yield None
            continue

        m = row.as_movie()
        m.updated = mtime
        yield m


async def import_from_file(*, basics_path: Path, ratings_path: Path):
    log.info("💾 Loading scores ...")
    ratings = read_ratings_as_mapping(ratings_path)

    log.info("💾 Importing movies ...")
    total = count_lines(basics_path)
    log.debug("Found %i movies.", total)
    if total == 0:
        raise RuntimeError("No movies found.")
    perc_next_report = 0.0
    perc_step = 0.1

    chunk = []

    for i, m in enumerate(read_basics(basics_path)):
        perc = 100 * i / total
        if perc >= perc_next_report:
            await db.set_import_progress(perc)
            log.info("⏳ Imported %s%%", round(perc, 1))
            perc_next_report += perc_step

        if m is None:
            continue

        if m.media_type not in {
            "Movie",
            "Short",
            "TV Mini Series",
            "TV Movie",
            "TV Series",
            "TV Short",
            "TV Special",
            "Video",
        }:
            log.debug("Skipping movie, unwanted media type: %s", m.media_type)
            continue

        m.imdb_score, m.imdb_votes = ratings.get(m.imdb_id, (None, None))
        chunk.append(m)

        if len(chunk) > 1000:
            await add_or_update_many_movies(chunk)
            chunk = []

    if chunk:
        await add_or_update_many_movies(chunk)

    log.info("👍 Imported 100%")
    await db.set_import_progress(100)


async def download_datasets(*, basics_path: Path, ratings_path: Path) -> None:
    """Download IMDb movie database dumps.

    See https://www.imdb.com/interfaces/ and https://datasets.imdbws.com/ for
    more information on the IMDb database dumps.
    """
    basics_url = "https://datasets.imdbws.com/title.basics.tsv.gz"
    ratings_url = "https://datasets.imdbws.com/title.ratings.tsv.gz"

    async with request.asession():
        await asyncio.gather(
            request.adownload(ratings_url, to_path=ratings_path, only_if_newer=True),
            request.adownload(basics_url, to_path=basics_path, only_if_newer=True),
        )


async def load_from_web(*, force: bool = False) -> None:
    """Refresh the full IMDb movie database.

    The latest dumps are first downloaded and then imported into the database.
    Beware that this can take a very long time (about half an hour), as the
    amount of data is quite large.

    See https://www.imdb.com/interfaces/ and https://datasets.imdbws.com/ for
    more information on the IMDb database dumps.
    """
    await db.set_import_progress(0)

    try:
        ratings_file = config.datadir / "imdb/title.ratings.tsv.gz"
        basics_file = config.datadir / "imdb/title.basics.tsv.gz"

        ratings_mtime = ratings_file.stat().st_mtime if ratings_file.exists() else None
        basics_mtime = basics_file.stat().st_mtime if basics_file.exists() else None

        await download_datasets(basics_path=basics_file, ratings_path=ratings_file)

        is_changed = (
            ratings_mtime != ratings_file.stat().st_mtime
            or basics_mtime != basics_file.stat().st_mtime
        )

        if force or is_changed:
            await import_from_file(basics_path=basics_file, ratings_path=ratings_file)

    except BaseException as err:
        await db.stop_import_progress(error=err)
        raise
    else:
        await db.stop_import_progress()
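

For completeness, a hypothetical usage sketch of the refresh entry point; the package and module names below are placeholders and would need to match the actual project layout.

# Hypothetical driver script; "movie_db.imdb_dataset" is a placeholder import path.
import asyncio

from movie_db import imdb_dataset

if __name__ == "__main__":
    asyncio.run(imdb_dataset.load_from_web(force=True))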