add locking & progress report to IMDb import (web)
This commit is contained in:
parent
5ad4946f5b
commit
4193c32955
4 changed files with 73 additions and 4 deletions
41
unwind/db.py
41
unwind/db.py
|
|
@ -7,7 +7,16 @@ import sqlalchemy
|
||||||
from databases import Database
|
from databases import Database
|
||||||
|
|
||||||
from . import config
|
from . import config
|
||||||
from .models import Movie, Rating, User, asplain, fields, fromplain, optional_fields
|
from .models import (
|
||||||
|
Movie,
|
||||||
|
Rating,
|
||||||
|
User,
|
||||||
|
asplain,
|
||||||
|
fields,
|
||||||
|
fromplain,
|
||||||
|
optional_fields,
|
||||||
|
utcnow,
|
||||||
|
)
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
@ -116,6 +125,36 @@ async def apply_db_patches(db: Database):
|
||||||
await db.execute("vacuum")
|
await db.execute("vacuum")
|
||||||
|
|
||||||
|
|
||||||
|
async def get_import_progress() -> float:
|
||||||
|
"""Return the current import progress percentage.
|
||||||
|
|
||||||
|
The progress will be a value from 0 to 100. If the value is 100, the last
|
||||||
|
import was completed successfully and no import is currently running.
|
||||||
|
"""
|
||||||
|
query = "SELECT state FROM progress WHERE id='import_imdb'"
|
||||||
|
state = await shared_connection().fetch_val(query)
|
||||||
|
return float(state or 100)
|
||||||
|
|
||||||
|
|
||||||
|
async def set_import_progress(progress: float):
|
||||||
|
"""Set the current import progress percentage."""
|
||||||
|
progress = min(max(0.0, progress), 100.0) # clamp to 0 <= progress <= 100
|
||||||
|
|
||||||
|
values = {"id": "import_imdb"}
|
||||||
|
|
||||||
|
if progress >= 100:
|
||||||
|
query = "DELETE FROM progress WHERE id=:id"
|
||||||
|
else:
|
||||||
|
query = """
|
||||||
|
INSERT INTO progress(id, state, started)
|
||||||
|
VALUES (:id, :state, :started)
|
||||||
|
ON CONFLICT DO UPDATE SET state=excluded.state
|
||||||
|
"""
|
||||||
|
values.update({"state": f"{progress:.2f}", "started": utcnow().isoformat()})
|
||||||
|
|
||||||
|
await shared_connection().execute(query=query, values=values)
|
||||||
|
|
||||||
|
|
||||||
def shared_connection() -> Database:
|
def shared_connection() -> Database:
|
||||||
global _shared_connection
|
global _shared_connection
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -195,6 +195,7 @@ async def import_from_file(*, basics_path: Path, ratings_path: Path):
|
||||||
|
|
||||||
perc = 100 * i / total
|
perc = 100 * i / total
|
||||||
if perc >= perc_next_report:
|
if perc >= perc_next_report:
|
||||||
|
await db.set_import_progress(perc)
|
||||||
log.info("⏳ Imported %s%%", round(perc, 1))
|
log.info("⏳ Imported %s%%", round(perc, 1))
|
||||||
perc_next_report += perc_step
|
perc_next_report += perc_step
|
||||||
|
|
||||||
|
|
@ -222,6 +223,8 @@ async def import_from_file(*, basics_path: Path, ratings_path: Path):
|
||||||
await add_or_update_many_movies(chunk)
|
await add_or_update_many_movies(chunk)
|
||||||
chunk = []
|
chunk = []
|
||||||
|
|
||||||
|
await db.set_import_progress(100)
|
||||||
|
|
||||||
|
|
||||||
async def load_from_web():
|
async def load_from_web():
|
||||||
"""Refresh the full IMDb movie database.
|
"""Refresh the full IMDb movie database.
|
||||||
|
|
@ -233,6 +236,8 @@ async def load_from_web():
|
||||||
See https://www.imdb.com/interfaces/ and https://datasets.imdbws.com/ for
|
See https://www.imdb.com/interfaces/ and https://datasets.imdbws.com/ for
|
||||||
more information on the IMDb database dumps.
|
more information on the IMDb database dumps.
|
||||||
"""
|
"""
|
||||||
|
await db.set_import_progress(0)
|
||||||
|
|
||||||
basics_url = "https://datasets.imdbws.com/title.basics.tsv.gz"
|
basics_url = "https://datasets.imdbws.com/title.basics.tsv.gz"
|
||||||
ratings_url = "https://datasets.imdbws.com/title.ratings.tsv.gz"
|
ratings_url = "https://datasets.imdbws.com/title.ratings.tsv.gz"
|
||||||
ratings_file = config.datadir / "imdb/title.ratings.tsv.gz"
|
ratings_file = config.datadir / "imdb/title.ratings.tsv.gz"
|
||||||
|
|
@ -249,5 +254,8 @@ async def load_from_web():
|
||||||
ratings_mtime != ratings_file.stat().st_mtime
|
ratings_mtime != ratings_file.stat().st_mtime
|
||||||
or bastics_mtime != basics_file.stat().st_mtime
|
or bastics_mtime != basics_file.stat().st_mtime
|
||||||
)
|
)
|
||||||
|
try:
|
||||||
if is_changed:
|
if is_changed:
|
||||||
await import_from_file(basics_path=basics_file, ratings_path=ratings_file)
|
await import_from_file(basics_path=basics_file, ratings_path=ratings_file)
|
||||||
|
finally:
|
||||||
|
await db.set_import_progress(100)
|
||||||
|
|
|
||||||
7
unwind/sql/20210711-172808--progress-table.sql
Normal file
7
unwind/sql/20210711-172808--progress-table.sql
Normal file
|
|
@ -0,0 +1,7 @@
|
||||||
|
-- add progress table
|
||||||
|
|
||||||
|
CREATE TABLE progress (
|
||||||
|
id TEXT PRIMARY KEY NOT NULL,
|
||||||
|
state TEXT NOT NULL,
|
||||||
|
started TEXT NOT NULL
|
||||||
|
);;
|
||||||
|
|
@ -1,3 +1,4 @@
|
||||||
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
import secrets
|
import secrets
|
||||||
from json.decoder import JSONDecodeError
|
from json.decoder import JSONDecodeError
|
||||||
|
|
@ -193,11 +194,25 @@ async def add_movie(request):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
import_lock = asyncio.Lock()
|
||||||
|
|
||||||
|
|
||||||
@route("/movies/_reload_imdb", methods=["POST"])
|
@route("/movies/_reload_imdb", methods=["POST"])
|
||||||
@requires(["authenticated", "admin"])
|
@requires(["authenticated", "admin"])
|
||||||
async def load_imdb_movies(request):
|
async def load_imdb_movies(request):
|
||||||
|
async with import_lock:
|
||||||
|
progress = await db.get_import_progress()
|
||||||
|
if progress < 100:
|
||||||
|
return JSONResponse(
|
||||||
|
{"status": "Import running.", "progress": progress}, status_code=409
|
||||||
|
)
|
||||||
|
|
||||||
|
await db.set_import_progress(0)
|
||||||
|
|
||||||
task = BackgroundTask(imdb_import.load_from_web)
|
task = BackgroundTask(imdb_import.load_from_web)
|
||||||
return JSONResponse({"status": "Import started."}, background=task, status_code=202)
|
return JSONResponse(
|
||||||
|
{"status": "Import started.", "progress": 0.0}, background=task, status_code=202
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@route("/users")
|
@route("/users")
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue