store error in progress
This commit is contained in:
parent
e939e57a8f
commit
3d5656392e
5 changed files with 130 additions and 49 deletions
43
unwind/db.py
43
unwind/db.py
|
|
@ -1,3 +1,4 @@
|
||||||
|
import json
|
||||||
import logging
|
import logging
|
||||||
import re
|
import re
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
@ -131,28 +132,42 @@ async def get_import_progress() -> Optional[Progress]:
|
||||||
return await get(Progress, type="import-imdb-movies", order_by="started DESC")
|
return await get(Progress, type="import-imdb-movies", order_by="started DESC")
|
||||||
|
|
||||||
|
|
||||||
|
async def stop_import_progress(*, error: BaseException = None):
|
||||||
|
"""Stop the current import.
|
||||||
|
|
||||||
|
If an error is given, it will be logged to the progress state.
|
||||||
|
"""
|
||||||
|
current = await get_import_progress()
|
||||||
|
is_running = current and current.stopped is None
|
||||||
|
|
||||||
|
if not is_running:
|
||||||
|
return
|
||||||
|
assert current
|
||||||
|
|
||||||
|
if error:
|
||||||
|
current.error = repr(error)
|
||||||
|
current.stopped = utcnow().isoformat()
|
||||||
|
|
||||||
|
await update(current)
|
||||||
|
|
||||||
|
|
||||||
async def set_import_progress(progress: float) -> Progress:
|
async def set_import_progress(progress: float) -> Progress:
|
||||||
"""Set the current import progress percentage."""
|
"""Set the current import progress percentage.
|
||||||
|
|
||||||
|
If no import is currently running, this will create a new one.
|
||||||
|
"""
|
||||||
progress = min(max(0.0, progress), 100.0) # clamp to 0 <= progress <= 100
|
progress = min(max(0.0, progress), 100.0) # clamp to 0 <= progress <= 100
|
||||||
|
|
||||||
current = await get_import_progress()
|
current = await get_import_progress()
|
||||||
is_stopped = current and current.stopped is not None
|
is_running = current and current.stopped is None
|
||||||
|
|
||||||
if progress == 100.0 and is_stopped:
|
if not is_running:
|
||||||
assert current
|
|
||||||
return current
|
|
||||||
|
|
||||||
is_update = current and current.stopped is None
|
|
||||||
|
|
||||||
if not current or not is_update:
|
|
||||||
current = Progress(type="import-imdb-movies")
|
current = Progress(type="import-imdb-movies")
|
||||||
|
assert current
|
||||||
|
|
||||||
current.state = f"{progress:.2f}"
|
current.percent = progress
|
||||||
|
|
||||||
if progress >= 100:
|
if is_running:
|
||||||
current.stopped = utcnow().isoformat()
|
|
||||||
|
|
||||||
if is_update:
|
|
||||||
await update(current)
|
await update(current)
|
||||||
else:
|
else:
|
||||||
await add(current)
|
await add(current)
|
||||||
|
|
|
||||||
|
|
@ -235,24 +235,30 @@ async def load_from_web():
|
||||||
"""
|
"""
|
||||||
await db.set_import_progress(0)
|
await db.set_import_progress(0)
|
||||||
|
|
||||||
basics_url = "https://datasets.imdbws.com/title.basics.tsv.gz"
|
|
||||||
ratings_url = "https://datasets.imdbws.com/title.ratings.tsv.gz"
|
|
||||||
ratings_file = config.datadir / "imdb/title.ratings.tsv.gz"
|
|
||||||
basics_file = config.datadir / "imdb/title.basics.tsv.gz"
|
|
||||||
|
|
||||||
ratings_mtime = ratings_file.stat().st_mtime if ratings_file.exists() else None
|
|
||||||
bastics_mtime = basics_file.stat().st_mtime if basics_file.exists() else None
|
|
||||||
|
|
||||||
with request.session():
|
|
||||||
request.download(ratings_url, ratings_file, only_if_newer=True)
|
|
||||||
request.download(basics_url, basics_file, only_if_newer=True)
|
|
||||||
|
|
||||||
is_changed = (
|
|
||||||
ratings_mtime != ratings_file.stat().st_mtime
|
|
||||||
or bastics_mtime != basics_file.stat().st_mtime
|
|
||||||
)
|
|
||||||
try:
|
try:
|
||||||
|
basics_url = "https://datasets.imdbws.com/title.basics.tsv.gz"
|
||||||
|
ratings_url = "https://datasets.imdbws.com/title.ratings.tsv.gz"
|
||||||
|
ratings_file = config.datadir / "imdb/title.ratings.tsv.gz"
|
||||||
|
basics_file = config.datadir / "imdb/title.basics.tsv.gz"
|
||||||
|
|
||||||
|
ratings_mtime = ratings_file.stat().st_mtime if ratings_file.exists() else None
|
||||||
|
bastics_mtime = basics_file.stat().st_mtime if basics_file.exists() else None
|
||||||
|
|
||||||
|
with request.session():
|
||||||
|
request.download(ratings_url, ratings_file, only_if_newer=True)
|
||||||
|
request.download(basics_url, basics_file, only_if_newer=True)
|
||||||
|
|
||||||
|
is_changed = (
|
||||||
|
ratings_mtime != ratings_file.stat().st_mtime
|
||||||
|
or bastics_mtime != basics_file.stat().st_mtime
|
||||||
|
)
|
||||||
|
|
||||||
if is_changed:
|
if is_changed:
|
||||||
await import_from_file(basics_path=basics_file, ratings_path=ratings_file)
|
await import_from_file(basics_path=basics_file, ratings_path=ratings_file)
|
||||||
finally:
|
|
||||||
await db.set_import_progress(100)
|
except BaseException as err:
|
||||||
|
await db.stop_import_progress(error=err)
|
||||||
|
raise
|
||||||
|
|
||||||
|
else:
|
||||||
|
await db.stop_import_progress()
|
||||||
|
|
|
||||||
|
|
@ -153,6 +153,34 @@ class Progress:
|
||||||
started: datetime = field(default_factory=utcnow)
|
started: datetime = field(default_factory=utcnow)
|
||||||
stopped: Optional[str] = None
|
stopped: Optional[str] = None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def _state(self) -> dict:
|
||||||
|
return json.loads(self.state or "{}")
|
||||||
|
|
||||||
|
@_state.setter
|
||||||
|
def _state(self, state: dict):
|
||||||
|
self.state = json.dumps(state, separators=(",", ":"))
|
||||||
|
|
||||||
|
@property
|
||||||
|
def percent(self) -> float:
|
||||||
|
return self._state["percent"]
|
||||||
|
|
||||||
|
@percent.setter
|
||||||
|
def percent(self, percent: float):
|
||||||
|
state = self._state
|
||||||
|
state["percent"] = percent
|
||||||
|
self._state = state
|
||||||
|
|
||||||
|
@property
|
||||||
|
def error(self) -> str:
|
||||||
|
return self._state.get("error", "")
|
||||||
|
|
||||||
|
@error.setter
|
||||||
|
def error(self, error: str):
|
||||||
|
state = self._state
|
||||||
|
state["error"] = error
|
||||||
|
self._state = state
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class Movie:
|
class Movie:
|
||||||
|
|
|
||||||
24
unwind/sql/20210728-223416.sql
Normal file
24
unwind/sql/20210728-223416.sql
Normal file
|
|
@ -0,0 +1,24 @@
|
||||||
|
-- add IMDb vote count
|
||||||
|
|
||||||
|
CREATE TABLE _migrate_progress (
|
||||||
|
id TEXT PRIMARY KEY NOT NULL,
|
||||||
|
type TEXT NOT NULL,
|
||||||
|
state TEXT NOT NULL,
|
||||||
|
started TEXT NOT NULL,
|
||||||
|
stopped TEXT
|
||||||
|
);;
|
||||||
|
|
||||||
|
INSERT INTO _migrate_progress
|
||||||
|
SELECT
|
||||||
|
id,
|
||||||
|
type,
|
||||||
|
'{"percent":' || state || '}' AS state,
|
||||||
|
started,
|
||||||
|
stopped
|
||||||
|
FROM progress
|
||||||
|
WHERE true;;
|
||||||
|
|
||||||
|
DROP TABLE progress;;
|
||||||
|
|
||||||
|
ALTER TABLE _migrate_progress
|
||||||
|
RENAME TO progress;;
|
||||||
|
|
@ -23,7 +23,7 @@ from starlette.routing import Mount, Route
|
||||||
from . import config, db, imdb, imdb_import
|
from . import config, db, imdb, imdb_import
|
||||||
from .db import close_connection_pool, find_ratings, open_connection_pool
|
from .db import close_connection_pool, find_ratings, open_connection_pool
|
||||||
from .middleware.responsetime import ResponseTimeMiddleware
|
from .middleware.responsetime import ResponseTimeMiddleware
|
||||||
from .models import Group, Movie, Progress, User, asplain
|
from .models import Group, Movie, User, asplain
|
||||||
from .types import ULID
|
from .types import ULID
|
||||||
from .utils import b64encode, phc_compare, phc_scrypt
|
from .utils import b64encode, phc_compare, phc_scrypt
|
||||||
|
|
||||||
|
|
@ -206,16 +206,26 @@ async def progress_for_load_imdb_movies(request):
|
||||||
return JSONResponse({"status": "No import exists."}, status_code=404)
|
return JSONResponse({"status": "No import exists."}, status_code=404)
|
||||||
|
|
||||||
p = asplain(progress)
|
p = asplain(progress)
|
||||||
percent = float(p["state"])
|
percent = progress.percent
|
||||||
|
error = progress.error
|
||||||
|
|
||||||
return JSONResponse(
|
status = None
|
||||||
{
|
if error:
|
||||||
"status": "Import is running." if percent < 100 else "Import finished.",
|
status = "Error during import."
|
||||||
"progress": percent,
|
elif percent < 100:
|
||||||
"started": p["started"],
|
status = "Import is running."
|
||||||
"stopped": p["stopped"],
|
else:
|
||||||
}
|
status = "Import finished."
|
||||||
)
|
|
||||||
|
resp = {
|
||||||
|
"status": status,
|
||||||
|
"progress": percent,
|
||||||
|
"error": error,
|
||||||
|
"started": p["started"],
|
||||||
|
"stopped": p["stopped"],
|
||||||
|
}
|
||||||
|
|
||||||
|
return JSONResponse(resp)
|
||||||
|
|
||||||
|
|
||||||
@route("/movies/_reload_imdb", methods=["POST"])
|
@route("/movies/_reload_imdb", methods=["POST"])
|
||||||
|
|
@ -223,13 +233,11 @@ async def progress_for_load_imdb_movies(request):
|
||||||
async def load_imdb_movies(request):
|
async def load_imdb_movies(request):
|
||||||
async with import_lock:
|
async with import_lock:
|
||||||
progress = await db.get_import_progress()
|
progress = await db.get_import_progress()
|
||||||
if progress:
|
if progress and not progress.stopped:
|
||||||
percent = float(progress.state)
|
return JSONResponse(
|
||||||
if percent < 100:
|
{"status": "Import is running.", "progress": progress.percent},
|
||||||
return JSONResponse(
|
status_code=409,
|
||||||
{"status": "Import is running.", "progress": percent},
|
)
|
||||||
status_code=409,
|
|
||||||
)
|
|
||||||
|
|
||||||
await db.set_import_progress(0)
|
await db.set_import_progress(0)
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue