diff --git a/metadex/db.py b/metadex/db.py index e3bd146..f5b558a 100644 --- a/metadex/db.py +++ b/metadex/db.py @@ -17,6 +17,7 @@ from sqlalchemy import ( Table, UniqueConstraint, create_engine, + func, ) from sqlalchemy.engine.base import Connection, Engine from sqlalchemy.engine.row import Row @@ -24,7 +25,7 @@ from sqlalchemy.exc import IntegrityError from sqlalchemy.sql import and_, or_, select, text from sqlalchemy.sql.schema import ForeignKey -from . import config +from . import config, utils log = logging.getLogger(__name__) @@ -58,9 +59,33 @@ metadex = Table( UniqueConstraint("location", "hostname"), ) +settings = Table( + "settings", + metadata, + Column("id", Integer, primary_key=True), + Column("key", String, nullable=False, index=True, unique=True), + Column("value", String, nullable=True), +) + engine: Engine = None # type:ignore +def setting(conn: Connection, *, key: str) -> "str | None": + """Return the settings value for the given key.""" + stmt = select(settings.c.value).where(settings.c.key == key) + return conn.execute(stmt).scalar() + + +def set_setting(conn: Connection, *, key: str, value: str) -> None: + """Store the given value in the persistent settings.""" + stmt: Executable + stmt = settings.update().where(settings.c.key == key) + updated = conn.execute(stmt, {"value": value}).rowcount + if updated == 0: + stmt = settings.insert() + conn.execute(stmt, {"key": key, "value": value}) + + def check_integrity(conn: Connection) -> None: stmt = text("PRAGMA integrity_check") state = conn.execute(stmt).scalar() @@ -448,7 +473,15 @@ def remove_all( log.warning("Purging file from DB: %a:%a", hostname, loc) stmt = metadex.delete().where(selector) - return conn.execute(stmt).rowcount + deleted = conn.execute(stmt).rowcount + + # Mark parent as updated to ensure it's considered for recalculation. 
+ parent_id = _ancestor_id(conn, location=location, hostname=hostname) + if parent_id is not None: + stmt = metadex.update().where(metadex.c.id == parent_id) + conn.execute(stmt, {"updated": datetime.now()}) + + return deleted @contextmanager @@ -535,6 +568,25 @@ def _parent_id( return val +def _ancestor_id( + conn: Connection, + *, + location: str, + hostname: str, +) -> "MetadexId | None": + path = Path(location) + root = Path(path.root) + while path != root: + path = path.parent + stmt = select(metadex.c.id).where( + and_(metadex.c.location == str(path), metadex.c.hostname == hostname) + ) + if m_id := conn.execute(stmt).scalar(): + assert isinstance(m_id, MetadexId) + return m_id + return None + + def reassign_parent_ids(conn: Connection) -> None: stmt: Executable stmt = select( @@ -563,3 +615,124 @@ def reassign_parent_ids(conn: Connection) -> None: stmt = metadex.update().where(metadex.c.id == m_id) conn.execute(stmt, {"parent_id": p_id}) + + +_last_updated_settings_key = "dirsize:last_updated" + + +def recalculate_dir_sizes( + conn: Connection, *, updated_since: "datetime | None" = None +) -> None: + """Recalculate the cumulative sizes for some directories. + + All directories containing items that were changed after the given point + in time will have their sizes set to the sum of sizes of all their children. + If `updated_since` is not given, a value stored in the database from when + this function was last run will be used. + + This function is not well optimized. Depending on the metadex's size and + when it was last run it can take a very long time. 
+ """ + stmt: Executable + if updated_since is None: + updated_since = ( + datetime.fromisoformat(last_upd) + if (last_upd := setting(conn, key=_last_updated_settings_key)) + else datetime.fromtimestamp(0) + ) + + stmt = ( + select(func.count("*")) + .select_from(metadex) + .where(metadex.c.updated >= updated_since) + ) + file_count = conn.execute(stmt).scalar() + assert isinstance(file_count, int) + + utils.clear_status_line() + log.info(f"Found {file_count} files that cause size recalculation.") + start = datetime.now().isoformat() + + stmt = select( + metadex.c.id, + metadex.c.location, + metadex.c.hostname, + metadex.c.stat_type, + metadex.c.parent_id, + ) + if updated_since: + stmt = stmt.where(metadex.c.updated >= updated_since) + + # visited = set() + for i, (m_id, loc, host, type_, p_id) in enumerate(conn.execute(stmt), 1): + progress = i / file_count + utils.set_status_line(f"Recalculating sizes ({progress:.0%}): {host}:{loc}") + + if type_ == "f": + pass # update all parents + m_id = p_id + elif type_ == "d": + pass # update this & all parents + + # At this point `m_id` will always point to a directory. + + # if m_id in visited: + # break + # visited.add(m_id) + + while m_id is not None: + stmt = select(func.sum(metadex.c.stat_bytes)).where( + metadex.c.parent_id == m_id + ) + total_bytes = conn.execute(stmt).scalar() or 0 + + stmt = metadex.update().where(metadex.c.id == m_id) + conn.execute(stmt, {"stat_bytes": total_bytes}) + + stmt = select(metadex.c.parent_id).where(metadex.c.id == m_id) + p_id = conn.execute(stmt).scalar_one_or_none() + if m_id == p_id: + break + m_id = p_id + + set_setting(conn, key=_last_updated_settings_key, value=start) + + +def recalculate_dir_sizes_full(conn: Connection) -> None: + """Recalculate the cumulative sizes for all directories. + + This will ignore and discard any existing sizing information.
+ This function is not optimized; the operation is very expensive and might + take many hours depending on the metadex's size. + """ + stmt: Executable + stmt = ( + select(func.count("*")).select_from(metadex).where(metadex.c.stat_type == "d") + ) + dir_count = conn.execute(stmt).scalar() + assert isinstance(dir_count, int) + + utils.clear_status_line() + log.info(f"Found {dir_count} directories that require size recalculation.") + start = datetime.now().isoformat() + + stmt = select(metadex.c.id, metadex.c.location, metadex.c.hostname).where( + metadex.c.stat_type == "d" + ) + for i, (m_id, loc, host) in enumerate(conn.execute(stmt), 1): + progress = i / dir_count + utils.set_status_line(f"Recalculating sizes ({progress:.0%}): {host}:{loc}") + + stmt = select(func.sum(metadex.c.stat_bytes)).where( + and_( + metadex.c.stat_type == "f", + metadex.c.hostname == host, + metadex.c.location.startswith(loc, autoescape=True), + ) + ) + total_bytes = conn.execute(stmt).scalar() or 0 + + stmt = metadex.update().where(metadex.c.id == m_id) + conn.execute(stmt, {"stat_bytes": total_bytes}) + + set_setting(conn, key=_last_updated_settings_key, value=start) diff --git a/metadex/metadex.py b/metadex/metadex.py index fbde390..3bb4283 100644 --- a/metadex/metadex.py +++ b/metadex/metadex.py @@ -123,6 +123,8 @@ def _scan_add_only( # Or put more simply: new stuff on the left, old on the right.
dirs.extendleft(subdirs) + db.recalculate_dir_sizes(conn) + return context @@ -346,6 +348,8 @@ def ingest_db_file( elif action == "changed": context.changed += 1 + db.recalculate_dir_sizes(conn) + return context @@ -401,6 +405,8 @@ def ingest_rclone_json( elif action == "changed": context.changed += 1 + db.recalculate_dir_sizes(conn) + return context @@ -444,6 +450,8 @@ def _ingest_ls_add_only( elif action == "changed": context.changed += 1 + db.recalculate_dir_sizes(conn) + return context @@ -511,6 +519,8 @@ def _ingest_ls_remove_missing( expected.discard(f.path.name) + db.recalculate_dir_sizes(conn) + return context @@ -655,6 +665,8 @@ def rm(pathspec: str, *, include_children: bool = False) -> None: db.remove_all(conn, location=path, hostname=host) + db.recalculate_dir_sizes(conn) + def hosts() -> "set[str]": with db.transaction() as conn: