use dir sizes to sum up the size of all descendants
This fundamentally changes the way we use size information for dirs in Metadex. The implementation is currently not optimized and might therefore considerably slow down the application in some circumstances.
This commit is contained in:
parent
22ef62202a
commit
d399d7caba
2 changed files with 187 additions and 2 deletions
177
metadex/db.py
177
metadex/db.py
|
|
@ -17,6 +17,7 @@ from sqlalchemy import (
|
|||
Table,
|
||||
UniqueConstraint,
|
||||
create_engine,
|
||||
func,
|
||||
)
|
||||
from sqlalchemy.engine.base import Connection, Engine
|
||||
from sqlalchemy.engine.row import Row
|
||||
|
|
@ -24,7 +25,7 @@ from sqlalchemy.exc import IntegrityError
|
|||
from sqlalchemy.sql import and_, or_, select, text
|
||||
from sqlalchemy.sql.schema import ForeignKey
|
||||
|
||||
from . import config
|
||||
from . import config, utils
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
|
@ -58,9 +59,33 @@ metadex = Table(
|
|||
UniqueConstraint("location", "hostname"),
|
||||
)
|
||||
|
||||
settings = Table(
|
||||
"settings",
|
||||
metadata,
|
||||
Column("id", Integer, primary_key=True),
|
||||
Column("key", String, nullable=False, index=True, unique=True),
|
||||
Column("value", String, nullable=True),
|
||||
)
|
||||
|
||||
engine: Engine = None # type:ignore
|
||||
|
||||
|
||||
def setting(conn: Connection, *, key: str) -> "str | None":
|
||||
"""Return the settings value for the given key."""
|
||||
stmt = select(settings.c.value).where(settings.c.key == key)
|
||||
return conn.execute(stmt).scalar()
|
||||
|
||||
|
||||
def set_setting(conn: Connection, *, key: str, value: str) -> None:
|
||||
"""Store the given value in the persistent settings."""
|
||||
stmt: Executable
|
||||
stmt = settings.update().where(settings.c.key == key)
|
||||
updated = conn.execute(stmt, {"value": value}).rowcount
|
||||
if updated == 0:
|
||||
stmt = settings.insert()
|
||||
conn.execute(stmt, {"key": key, "value": value})
|
||||
|
||||
|
||||
def check_integrity(conn: Connection) -> None:
|
||||
stmt = text("PRAGMA integrity_check")
|
||||
state = conn.execute(stmt).scalar()
|
||||
|
|
@ -448,7 +473,15 @@ def remove_all(
|
|||
log.warning("Purging file from DB: %a:%a", hostname, loc)
|
||||
|
||||
stmt = metadex.delete().where(selector)
|
||||
return conn.execute(stmt).rowcount
|
||||
deleted = conn.execute(stmt).rowcount
|
||||
|
||||
# Mark parent as updated to ensure it's considered for recalculation.
|
||||
parent_id = _ancestor_id(conn, location=location, hostname=hostname)
|
||||
if parent_id is not None:
|
||||
stmt = metadex.update().where(metadex.c.id == parent_id)
|
||||
conn.execute(stmt, {"updated": datetime.now()})
|
||||
|
||||
return deleted
|
||||
|
||||
|
||||
@contextmanager
|
||||
|
|
@ -535,6 +568,25 @@ def _parent_id(
|
|||
return val
|
||||
|
||||
|
||||
def _ancestor_id(
|
||||
conn: Connection,
|
||||
*,
|
||||
location: str,
|
||||
hostname: str,
|
||||
) -> "MetadexId | None":
|
||||
path = Path(location)
|
||||
root = Path(path.root)
|
||||
while path != root:
|
||||
path = path.parent
|
||||
stmt = select(metadex.c.id).where(
|
||||
and_(metadex.c.location == str(path), metadex.c.hostname == hostname)
|
||||
)
|
||||
if m_id := conn.execute(stmt).scalar():
|
||||
assert isinstance(m_id, MetadexId)
|
||||
return m_id
|
||||
return None
|
||||
|
||||
|
||||
def reassign_parent_ids(conn: Connection) -> None:
|
||||
stmt: Executable
|
||||
stmt = select(
|
||||
|
|
@ -563,3 +615,124 @@ def reassign_parent_ids(conn: Connection) -> None:
|
|||
|
||||
stmt = metadex.update().where(metadex.c.id == m_id)
|
||||
conn.execute(stmt, {"parent_id": p_id})
|
||||
|
||||
|
||||
_last_updated_settings_key = "dirsize:last_updated"
|
||||
|
||||
|
||||
def recalculate_dir_sizes(
|
||||
conn: Connection, *, updated_since: "datetime | None" = None
|
||||
) -> None:
|
||||
"""Recalculate the cumulative sizes for some directories.
|
||||
|
||||
All directories containing items that were changed after the given point
|
||||
in time will have their sizes set to the sum of sizes of all their children.
|
||||
If `updated_since` is not given, a value stored in the database from when
|
||||
this function was last run will be used.
|
||||
|
||||
This function is not well optimized. Depending on the metadex's size and
|
||||
when it was last run it can take a very long time.
|
||||
"""
|
||||
stmt: Executable
|
||||
if updated_since is None:
|
||||
updated_since = (
|
||||
datetime.fromisoformat(last_upd)
|
||||
if (last_upd := setting(conn, key=_last_updated_settings_key))
|
||||
else datetime.fromtimestamp(0)
|
||||
)
|
||||
|
||||
stmt = (
|
||||
select(func.count("*"))
|
||||
.select_from(metadex)
|
||||
.where(metadex.c.updated >= updated_since)
|
||||
)
|
||||
file_count = conn.execute(stmt).scalar()
|
||||
assert isinstance(file_count, int)
|
||||
|
||||
utils.clear_status_line()
|
||||
log.info(f"Found {file_count} files that cause size recalculation.")
|
||||
start = datetime.now().isoformat()
|
||||
|
||||
stmt = select(
|
||||
metadex.c.id,
|
||||
metadex.c.location,
|
||||
metadex.c.hostname,
|
||||
metadex.c.stat_type,
|
||||
metadex.c.parent_id,
|
||||
)
|
||||
if updated_since:
|
||||
stmt = stmt.where(metadex.c.updated >= updated_since)
|
||||
|
||||
# visited = set()
|
||||
for i, (m_id, loc, host, type_, p_id) in enumerate(conn.execute(stmt), 1):
|
||||
progress = i / file_count
|
||||
utils.set_status_line(f"Recalculating sizes ({progress:.0%}): {host}:{loc}")
|
||||
|
||||
if type_ == "f":
|
||||
pass # update all parents
|
||||
m_id = p_id
|
||||
elif type_ == "d":
|
||||
pass  # update this & all parents
|
||||
|
||||
# At this point `m_id` will always point to a directory.
|
||||
|
||||
# if m_id in visited:
|
||||
# break
|
||||
# visited.add(m_id)
|
||||
|
||||
while m_id is not None:
|
||||
stmt = select(func.sum(metadex.c.stat_bytes)).where(
|
||||
metadex.c.parent_id == m_id
|
||||
)
|
||||
total_bytes = conn.execute(stmt).scalar() or 0
|
||||
|
||||
stmt = metadex.update().where(metadex.c.id == m_id)
|
||||
conn.execute(stmt, {"stat_bytes": total_bytes})
|
||||
|
||||
stmt = select(metadex.c.parent_id).where(metadex.c.id == m_id)
|
||||
p_id = conn.execute(stmt).scalar_one_or_none()
|
||||
if m_id == p_id:
|
||||
break
|
||||
m_id = p_id
|
||||
|
||||
set_setting(conn, key=_last_updated_settings_key, value=start)
|
||||
|
||||
|
||||
def recalculate_dir_sizes_full(conn: Connection) -> None:
|
||||
"""Recalculate the cumulative sizes for all directories.
|
||||
|
||||
This will ignore and discard any existing sizing information.
|
||||
This function is not optimized, the operation is very expensive and might
|
||||
take many hours depending on the metadex's size.
|
||||
"""
|
||||
stmt: Executable
|
||||
stmt = (
|
||||
select(func.count("*")).select_from(metadex).where(metadex.c.stat_type == "d")
|
||||
)
|
||||
dir_count = conn.execute(stmt).scalar()
|
||||
assert isinstance(dir_count, int)
|
||||
|
||||
utils.clear_status_line()
|
||||
log.info(f"Found {dir_count} directories that require size recalculation.")
|
||||
start = datetime.now().isoformat()
|
||||
|
||||
stmt = select(metadex.c.id, metadex.c.location, metadex.c.hostname).where(
|
||||
metadex.c.stat_type == "d"
|
||||
)
|
||||
for i, (m_id, loc, host) in enumerate(conn.execute(stmt), 1):
|
||||
progress = i / dir_count
|
||||
utils.set_status_line(f"Recalculating sizes ({progress:.0%}): {host}:{loc}")
|
||||
|
||||
stmt = select(func.sum(metadex.c.stat_bytes)).where(
|
||||
and_(
|
||||
metadex.c.stat_type == "f",
|
||||
metadex.c.hostname == host,
|
||||
metadex.c.location.startswith(loc, autoescape=True),
|
||||
)
|
||||
)
|
||||
total_bytes = conn.execute(stmt).scalar() or 0
|
||||
|
||||
stmt = metadex.update().where(metadex.c.id == m_id)
|
||||
conn.execute(stmt, {"stat_bytes": total_bytes})
|
||||
|
||||
set_setting(conn, key=_last_updated_settings_key, value=start)
|
||||
|
|
|
|||
|
|
@ -123,6 +123,8 @@ def _scan_add_only(
|
|||
# Or put more simply: new stuff on the left, old on the right.
|
||||
dirs.extendleft(subdirs)
|
||||
|
||||
db.recalculate_dir_sizes(conn)
|
||||
|
||||
return context
|
||||
|
||||
|
||||
|
|
@ -346,6 +348,8 @@ def ingest_db_file(
|
|||
elif action == "changed":
|
||||
context.changed += 1
|
||||
|
||||
db.recalculate_dir_sizes(conn)
|
||||
|
||||
return context
|
||||
|
||||
|
||||
|
|
@ -401,6 +405,8 @@ def ingest_rclone_json(
|
|||
elif action == "changed":
|
||||
context.changed += 1
|
||||
|
||||
db.recalculate_dir_sizes(conn)
|
||||
|
||||
return context
|
||||
|
||||
|
||||
|
|
@ -444,6 +450,8 @@ def _ingest_ls_add_only(
|
|||
elif action == "changed":
|
||||
context.changed += 1
|
||||
|
||||
db.recalculate_dir_sizes(conn)
|
||||
|
||||
return context
|
||||
|
||||
|
||||
|
|
@ -511,6 +519,8 @@ def _ingest_ls_remove_missing(
|
|||
|
||||
expected.discard(f.path.name)
|
||||
|
||||
db.recalculate_dir_sizes(conn)
|
||||
|
||||
return context
|
||||
|
||||
|
||||
|
|
@ -655,6 +665,8 @@ def rm(pathspec: str, *, include_children: bool = False) -> None:
|
|||
|
||||
db.remove_all(conn, location=path, hostname=host)
|
||||
|
||||
db.recalculate_dir_sizes(conn)
|
||||
|
||||
|
||||
def hosts() -> "set[str]":
|
||||
with db.transaction() as conn:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue