From d399d7caba3174e49fcb1d2938a899da730aa612 Mon Sep 17 00:00:00 2001 From: ducklet Date: Sat, 11 Feb 2023 18:29:20 +0100 Subject: [PATCH] use dir sizes to sum up the size of all descendants This fundamentally changes the way we use size information for dirs in Metadex. The implementation is currently hardly optimized and thus might considerably slow down the application in some circumstances. --- metadex/db.py | 177 ++++++++++++++++++++++++++++++++++++++++++++- metadex/metadex.py | 12 +++ 2 files changed, 187 insertions(+), 2 deletions(-) diff --git a/metadex/db.py b/metadex/db.py index e3bd146..f5b558a 100644 --- a/metadex/db.py +++ b/metadex/db.py @@ -17,6 +17,7 @@ from sqlalchemy import ( Table, UniqueConstraint, create_engine, + func, ) from sqlalchemy.engine.base import Connection, Engine from sqlalchemy.engine.row import Row @@ -24,7 +25,7 @@ from sqlalchemy.exc import IntegrityError from sqlalchemy.sql import and_, or_, select, text from sqlalchemy.sql.schema import ForeignKey -from . import config +from . 
import config, utils log = logging.getLogger(__name__) @@ -58,9 +59,33 @@ metadex = Table( UniqueConstraint("location", "hostname"), ) +settings = Table( + "settings", + metadata, + Column("id", Integer, primary_key=True), + Column("key", String, nullable=False, index=True, unique=True), + Column("value", String, nullable=True), +) + engine: Engine = None # type:ignore +def setting(conn: Connection, *, key: str) -> "str | None": + """Return the settings value for the given key.""" + stmt = select(settings.c.value).where(settings.c.key == key) + return conn.execute(stmt).scalar() + + +def set_setting(conn: Connection, *, key: str, value: str) -> None: + """Store the given value in the persistent settings.""" + stmt: Executable + stmt = settings.update().where(settings.c.key == key) + updated = conn.execute(stmt, {"value": value}).rowcount + if updated == 0: + stmt = settings.insert() + conn.execute(stmt, {"key": key, "value": value}) + + def check_integrity(conn: Connection) -> None: stmt = text("PRAGMA integrity_check") state = conn.execute(stmt).scalar() @@ -448,7 +473,15 @@ def remove_all( log.warning("Purging file from DB: %a:%a", hostname, loc) stmt = metadex.delete().where(selector) - return conn.execute(stmt).rowcount + deleted = conn.execute(stmt).rowcount + + # Mark parent as updated to ensure it's considered for recalculation. 
+ parent_id = _ancestor_id(conn, location=location, hostname=hostname) + if parent_id is not None: + stmt = metadex.update().where(metadex.c.id == parent_id) + conn.execute(stmt, {"updated": datetime.now()}) + + return deleted @contextmanager @@ -535,6 +568,25 @@ def _parent_id( return val +def _ancestor_id( + conn: Connection, + *, + location: str, + hostname: str, +) -> "MetadexId | None": + path = Path(location) + root = Path(path.root) + while path != root: + path = path.parent + stmt = select(metadex.c.id).where( + and_(metadex.c.location == str(path), metadex.c.hostname == hostname) + ) + if m_id := conn.execute(stmt).scalar(): + assert isinstance(m_id, MetadexId) + return m_id + return None + + def reassign_parent_ids(conn: Connection) -> None: stmt: Executable stmt = select( @@ -563,3 +615,124 @@ def reassign_parent_ids(conn: Connection) -> None: stmt = metadex.update().where(metadex.c.id == m_id) conn.execute(stmt, {"parent_id": p_id}) + + +_last_updated_settings_key = "dirsize:last_updated" + + +def recalculate_dir_sizes( + conn: Connection, *, updated_since: "datetime | None" = None +) -> None: + """Recalculate the cumulative sizes for some directories. + + All directories containing items that were changed after the given point + in time will have their sizes set to the sum of sizes of all their children. + If `updated_since` is not given, a value stored in the database from when + this function was last run will be used. + + This function is not well optimized. Depending on the metadex's size and + when it was last run it can take a very long time. 
+ """ + stmt: Executable + if updated_since is None: + updated_since = ( + datetime.fromisoformat(last_upd) + if (last_upd := setting(conn, key=_last_updated_settings_key)) + else datetime.fromtimestamp(0) + ) + + stmt = ( + select(func.count("*")) + .select_from(metadex) + .where(metadex.c.updated >= updated_since) + ) + file_count = conn.execute(stmt).scalar() + assert isinstance(file_count, int) + + utils.clear_status_line() + log.info(f"Found {file_count} files that cause size recalculation.") + start = datetime.now().isoformat() + + stmt = select( + metadex.c.id, + metadex.c.location, + metadex.c.hostname, + metadex.c.stat_type, + metadex.c.parent_id, + ) + if updated_since: + stmt = stmt.where(metadex.c.updated >= updated_since) + + # visited = set() + for i, (m_id, loc, host, type_, p_id) in enumerate(conn.execute(stmt), 1): + progress = i / file_count + utils.set_status_line(f"Recalculating sizes ({progress:.0%}): {host}:{loc}") + + if type_ == "f": + pass # update all parents + m_id = p_id + elif type_ == "d": + pass # update this & all parents + + # At this point `m_id` will always point to a directory. + + # if m_id in visited: + # break + # visited.add(m_id) + + while m_id is not None: + stmt = select(func.sum(metadex.c.stat_bytes)).where( + metadex.c.parent_id == m_id + ) + total_bytes = conn.execute(stmt).scalar() or 0 + + stmt = metadex.update().where(metadex.c.id == m_id) + conn.execute(stmt, {"stat_bytes": total_bytes}) + + stmt = select(metadex.c.parent_id).where(metadex.c.id == m_id) + p_id = conn.execute(stmt).scalar_one_or_none() + if m_id == p_id: + break + m_id = p_id + + set_setting(conn, key=_last_updated_settings_key, value=start) + + +def recalculate_dir_sizes_full(conn: Connection) -> None: + """Recalculate the cumulative sizes for all directories. + + This will ignore and discard any existing sizing information. 
+ This function is not optimized; the operation is very expensive and might + take many hours depending on the metadex's size. + """ + stmt: Executable + stmt = ( + select(func.count("*")).select_from(metadex).where(metadex.c.stat_type == "d") + ) + dir_count = conn.execute(stmt).scalar() + assert isinstance(dir_count, int) + + utils.clear_status_line() + log.info(f"Found {dir_count} directories that require size recalculation.") + start = datetime.now().isoformat() + + stmt = select(metadex.c.id, metadex.c.location, metadex.c.hostname).where( + metadex.c.stat_type == "d" + ) + for i, (m_id, loc, host) in enumerate(conn.execute(stmt), 1): + progress = i / dir_count + utils.set_status_line(f"Recalculating sizes ({progress:.0%}): {host}:{loc}") + + stmt = select(func.sum(metadex.c.stat_bytes)).where( + and_( + metadex.c.stat_type == "f", + metadex.c.hostname == host, + metadex.c.location.startswith(loc, autoescape=True), + ) + ) + total_bytes = conn.execute(stmt).scalar() or 0 + + stmt = metadex.update().where(metadex.c.id == m_id) + conn.execute(stmt, {"stat_bytes": total_bytes}) + + set_setting(conn, key=_last_updated_settings_key, value=start) diff --git a/metadex/metadex.py b/metadex/metadex.py index fbde390..3bb4283 100644 --- a/metadex/metadex.py +++ b/metadex/metadex.py @@ -123,6 +123,8 @@ def _scan_add_only( # Or put more simply: new stuff on the left, old on the right. 
dirs.extendleft(subdirs) + db.recalculate_dir_sizes(conn) + return context @@ -346,6 +348,8 @@ def ingest_db_file( elif action == "changed": context.changed += 1 + db.recalculate_dir_sizes(conn) + return context @@ -401,6 +405,8 @@ def ingest_rclone_json( elif action == "changed": context.changed += 1 + db.recalculate_dir_sizes(conn) + return context @@ -444,6 +450,8 @@ def _ingest_ls_add_only( elif action == "changed": context.changed += 1 + db.recalculate_dir_sizes(conn) + return context @@ -511,6 +519,8 @@ def _ingest_ls_remove_missing( expected.discard(f.path.name) + db.recalculate_dir_sizes(conn) + return context @@ -655,6 +665,8 @@ def rm(pathspec: str, *, include_children: bool = False) -> None: db.remove_all(conn, location=path, hostname=host) + db.recalculate_dir_sizes(conn) + def hosts() -> "set[str]": with db.transaction() as conn: