# metadex/metadex/metadex.py
import logging
import os
import re
import sys
import time
from collections import deque
from dataclasses import dataclass
from pathlib import Path
from shutil import get_terminal_size
from typing import Iterable, Literal, TextIO
from . import config, db, ignore, ls_parser, models
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Re-export the DB lifecycle helpers so callers can do `metadex.init()` /
# `metadex.close()` without importing `db` directly.
init = db.init
close = db.close
def scan(
    path: Path,
    *,
    ignore_file: Path,
    remove_missing: bool = False,
    map_pathspecs: "list[str] | None" = None,
) -> "_LogContext":
    """Recursively scan `path` and upsert file metadata into the DB.

    Args:
        path: base directory (or single file) to scan.
        ignore_file: file with ignore rules, parsed by `ignore.parse`.
        remove_missing: also delete DB entries for files gone from disk.
        map_pathspecs: optional `src=...,dest=...` path mapping specs.

    Returns:
        The `_LogContext` with counters for seen/added/changed/ignored/removed.
    """
    # `None` instead of a mutable `[]` default avoids the shared-default-
    # argument pitfall; an explicit `[]` from callers still works unchanged.
    if map_pathspecs is None:
        map_pathspecs = []
    impl = _scan_remove_missing if remove_missing else _scan_add_only
    return impl(path, ignore_file=ignore_file, map_pathspecs=map_pathspecs)
# Opportunistically compensate for wide chars on the terminal.
_terminal_width = int(get_terminal_size().columns * 0.9)
_last_log = 0


def _log_ephemeral(msg: str, *, debounce_ms: "int | None" = 200):
    """Write a transient, self-overwriting status line to stderr.

    The line is padded to the terminal width and terminated with `\\r` so the
    next call overwrites it. With `debounce_ms` set (the default), calls that
    arrive within the debounce window are silently dropped; pass `None` to
    always write.
    """
    global _last_log
    if debounce_ms is not None:
        now = time.monotonic()
        if now < _last_log + debounce_ms / 1000:
            return
        _last_log = now
    # Replace characters stderr could not encode, then clip to the terminal.
    text = msg.encode(errors="replace").decode()
    if len(text) > _terminal_width:
        text = text[: _terminal_width - 3] + "..."
    sys.stderr.write(text.ljust(_terminal_width) + "\r")
@dataclass
class _LogContext:
    """Running counters for a scan/ingest pass; also used as the return value."""
    seen: int = 0  # entries visited (including ignored ones)
    ignored: int = 0  # entries skipped by an ignore rule
    added: int = 0  # entries newly inserted into the DB
    changed: int = 0  # entries whose stored record was updated
    removed: int = 0  # entries deleted from the DB (remove-missing modes only)
def _log_context(path, context: _LogContext):
    """Show a one-line ephemeral progress summary for the current entry.

    Suppressed when stdout is piped, so machine-readable output stays clean.
    """
    if config.is_stdout_piped:
        return
    counters = (
        f"{context.seen} a:{context.added} c:{context.changed}"
        f" i:{context.ignored} r:{context.removed}"
    )
    _log_ephemeral(f"{counters} {path}")
def _scan_add_only(path: Path, *, ignore_file: Path, map_pathspecs: "list[str]"):
    """Scan `path` breadth-first and upsert entries into the DB (no removals)."""
    is_ignored = ignore.parse(ignore_file)
    maps = _parse_pathspec_mapping(map_pathspecs)
    context = _LogContext()
    with db.transaction() as conn:
        # Handle the base path itself first; if it is ignored there is
        # nothing to traverse.
        context.seen += 1
        d = models.File.dict_from_entry(path)
        _apply_mapping(maps, d)
        if is_ignored(d["location"]):
            log.warning(
                "Skipping ignored basedir: %a:%a",
                d["hostname"],
                d["location"],
            )
            return context
        if (action := db.upsert_if_changed(conn, d)) == "added":
            context.added += 1
        elif action == "changed":
            context.changed += 1
        # Breadth-first traversal queue of directories still to visit.
        dirs: deque[Path] = deque()
        if d["stat_type"] == "d":
            dirs.append(path)
        while dirs:
            cwd = dirs.popleft()
            try:
                # NOTE(review): local `scan` shadows the module-level `scan`
                # function within this loop body.
                scan = os.scandir(cwd)
            except Exception as err:
                # Unreadable directory (permissions, races): log and move on.
                log.error(err)
                continue
            subdirs: deque[Path] = deque()
            with scan as files:
                for f in files:
                    context.seen += 1
                    _log_context(f.path, context)
                    d = models.File.dict_from_entry(f)
                    _apply_mapping(maps, d)
                    if is_ignored(d["location"]):
                        log.debug(
                            "Skipping ignored entry: %a:%a",
                            d["hostname"],
                            d["location"],
                        )
                        context.ignored += 1
                        continue
                    # Added/changed subdirs go to the right end of `subdirs`,
                    # unchanged ones to the left (see comment below).
                    if (action := db.upsert_if_changed(conn, d)) == "added":
                        context.added += 1
                        append = subdirs.append
                    elif action == "changed":
                        context.changed += 1
                        append = subdirs.append
                    else:
                        append = subdirs.appendleft
                    if f.is_dir(follow_symlinks=False):
                        append(Path(f.path))
            # `subdirs` sorts all changed dirs to the right, which means when we
            # extend `dirs` using `extendleft` it'll put them all left-most.
            # Or put more simply: new stuff on the left, old on the right.
            dirs.extendleft(subdirs)
    return context
def _scan_remove_missing(path: Path, *, ignore_file: Path, map_pathspecs: "list[str]"):
    """Like `_scan_add_only`, but also removes DB entries for files that no
    longer exist on disk, counting removals in `context.removed`."""
    is_ignored = ignore.parse(ignore_file)
    maps = _parse_pathspec_mapping(map_pathspecs)
    context = _LogContext()
    with db.transaction() as conn:
        # Handle the base path itself first; if it is ignored there is
        # nothing to traverse.
        context.seen += 1
        d = models.File.dict_from_entry(path)
        _apply_mapping(maps, d)
        if is_ignored(d["location"]):
            log.warning(
                "Skipping ignored basedir: %a:%a",
                d["hostname"],
                d["location"],
            )
            return context
        if (action := db.upsert_if_changed(conn, d)) == "added":
            context.added += 1
        elif action == "changed":
            context.changed += 1
        # Breadth-first traversal queue of directories still to visit.
        dirs: deque[Path] = deque()
        if d["stat_type"] == "d":
            dirs.append(path)
        while dirs:
            cwd = dirs.popleft()
            try:
                # Renamed from `scan` to avoid shadowing the module-level
                # `scan` function.
                entries = os.scandir(cwd)
            except Exception as err:
                # Unreadable directory (permissions, races): log and move on.
                log.error(err)
                continue
            # Names the DB currently knows for this directory; whatever is
            # still in here after the listing below is gone from disk.
            expected = set(db.files_in_dir(conn, str(cwd)))
            subdirs: deque[Path] = deque()
            with entries as files:
                for f in files:
                    context.seen += 1
                    _log_context(f.path, context)
                    d = models.File.dict_from_entry(f)
                    _apply_mapping(maps, d)
                    if is_ignored(d["location"]):
                        log.debug(
                            "Skipping ignored entry: %a:%a",
                            d["hostname"],
                            d["location"],
                        )
                        context.ignored += 1
                        continue
                    # Added/changed subdirs go to the right end of `subdirs`,
                    # unchanged ones to the left (see comment below).
                    if (action := db.upsert_if_changed(conn, d)) == "added":
                        context.added += 1
                        append = subdirs.append
                    elif action == "changed":
                        context.changed += 1
                        append = subdirs.append
                    else:
                        append = subdirs.appendleft
                    if f.is_dir(follow_symlinks=False):
                        append(Path(f.path))
                    # Present on disk, so not missing.
                    expected.discard(f.name)
            # `subdirs` sorts all changed dirs to the right, which means when we
            # extend `dirs` using `extendleft` it'll put them all left-most.
            # Or put more simply: new stuff on the left, old on the right.
            dirs.extendleft(subdirs)
            for name in expected:
                loc = str(cwd / name)
                if is_ignored(loc):
                    continue
                log.info("File removed: %a", loc)
                # BUGFIX: count removals (the return value was previously
                # dropped, leaving `context.removed` at 0), consistent with
                # `_ingest_ls_remove_missing`.
                context.removed += db.remove_all(conn, loc)
    return context
# `HOST:PATH` — the host part is optional and may not contain `:` or `/`.
_pathspec_re = re.compile(r"((?P<host>[^:/]*):)?(?P<path>.*)")
# `src=PATHSPEC,dest=PATHSPEC` mapping specs (see `_parse_pathspec_mapping`).
_src_dest_re = re.compile(r"src=(?P<src>.*),dest=(?P<dest>.*)")


def _parse_pathspec(pathspec: str):
    """Split a `HOST:PATH` spec into `(host, path)`.

    `host` is `None` when no `HOST:` prefix is present; an empty path
    defaults to `/`.
    """
    parsed = _pathspec_re.fullmatch(pathspec)
    # The pattern accepts any string, so a fullmatch should never fail.
    assert parsed
    host: "str | None" = parsed.group("host")
    path = parsed.group("path")
    return host, (path or "/")
def _clean_dirname(loc: str, *, force_absolute=True):
    """Normalize a directory path so it always ends with `/`.

    With `force_absolute` (the default) a leading `/` is prepended when
    missing, so mapping prefixes compare consistently.
    """
    result = loc if loc.startswith("/") or not force_absolute else "/" + loc
    return result if result.endswith("/") else result + "/"
def _parse_pathspec_mapping(map_pathspecs: "list[str]"):
    """Parse `src=HOST:PATH,dest=HOST:PATH` mapping specs.

    Returns:
        Nested dict `{src_host: {src_dir: (dest_host, dest_dir)}}`; directory
        paths are normalized via `_clean_dirname` (leading + trailing `/`).
        A missing hostname falls back to the local `config.hostname`, with a
        warning.

    Raises:
        ValueError: if a spec does not match the `src=...,dest=...` form.
    """
    Hostname = str
    Location = str
    maps: dict[Hostname, dict[Location, tuple[Hostname, Location]]] = {}
    for pathspec_mapping in map_pathspecs:
        match = _src_dest_re.fullmatch(pathspec_mapping)
        if not match:
            log.error("Invalid mapping: %a", pathspec_mapping)
            raise ValueError("Could not parse mapping.")
        src_host, src_path = _parse_pathspec(match["src"])
        if not src_host:
            src_host = config.hostname
            log.warning("Using default hostname for mapping source: %a", src_host)
        src_path = _clean_dirname(src_path)
        dest_host, dest_path = _parse_pathspec(match["dest"])
        if not dest_host:
            dest_host = config.hostname
            log.warning("Using default hostname for mapping dest: %a", dest_host)
        dest_path = _clean_dirname(dest_path)
        # `setdefault` replaces the explicit `if src_host not in maps` dance.
        maps.setdefault(src_host, {})[src_path] = dest_host, dest_path
        log.info("Mapping %a:%a -> %a:%a", src_host, src_path, dest_host, dest_path)
    return maps
def _apply_mapping(maps: dict, d: dict):
    """Rewrite `d["hostname"]`/`d["location"]` in place via the first matching map.

    Directory locations are compared in their `_clean_dirname` form (trailing
    slash) so prefix matching lines up with the normalized map keys.
    """
    src_host = d["hostname"]
    if d["stat_type"] == "d":
        probe = _clean_dirname(d["location"], force_absolute=False)
    else:
        probe = d["location"]
    for src_loc, (dest_host, dest_loc) in maps.get(src_host, {}).items():
        if not probe.startswith(src_loc):
            continue
        d["hostname"] = dest_host
        # Splice the destination prefix onto the *original* location tail.
        d["location"] = dest_loc + d["location"][len(src_loc) :]
        log.debug(
            "Mapping %a -> %a",
            f"{src_host}:{probe}",
            f'{d["hostname"]}:{d["location"]}',
        )
        break
def ingest_db_file(
    db_file: Path,
    *,
    ignore_file: Path,
    map_pathspecs: "list[str] | None" = None,
    select_pathspecs: "list[str] | None" = None,
) -> _LogContext:
    """Merge every row of another metadex DB file into the current DB.

    Args:
        db_file: path to the foreign metadex database.
        ignore_file: ignore rules applied to (mapped) locations.
        map_pathspecs: optional `src=...,dest=...` path mapping specs.
        select_pathspecs: NOTE(review) — accepted but currently unused below;
            confirm whether row filtering was intended here.

    Returns:
        The `_LogContext` with counters for this ingest pass.
    """
    # `None` defaults avoid the mutable-default-argument pitfall; explicit
    # lists from callers behave exactly as before.
    if map_pathspecs is None:
        map_pathspecs = []
    is_ignored = ignore.parse(ignore_file)
    maps = _parse_pathspec_mapping(map_pathspecs)
    context = _LogContext()
    other_db = db.Db(db_file)
    # `force_rollback=True` on the foreign DB — presumably so it is never
    # modified by this read; confirm against `db.Db.transaction`.
    with db.transaction() as conn, other_db.transaction(
        force_rollback=True
    ) as other_conn:
        for row in db.iter_all(other_conn):
            context.seen += 1
            _log_context(row["location"], context)
            d = dict(row)
            _apply_mapping(maps, d)
            if is_ignored(d["location"]):
                log.debug("Skipping ignored entry: %a:%a", d["hostname"], d["location"])
                context.ignored += 1
                continue
            if (action := db.upsert_if_changed(conn, d)) == "added":
                context.added += 1
            elif action == "changed":
                context.changed += 1
    return context
def ingest_ls(
    file: TextIO,
    *,
    ignore_file: Path,
    ref_year: "int | None",
    remove_missing: bool = False,
) -> _LogContext:
    """Ingest `ls`-style listing output from `file` into the DB.

    Dispatches to the remove-missing variant when requested; otherwise only
    adds/updates entries.
    """
    if remove_missing:
        return _ingest_ls_remove_missing(file, ignore_file=ignore_file, ref_year=ref_year)
    return _ingest_ls_add_only(file, ignore_file=ignore_file, ref_year=ref_year)
def _ingest_ls_add_only(file: TextIO, *, ignore_file: Path, ref_year: "int | None"):
    """Ingest parsed `ls` listing entries, adding/updating only (no removals)."""
    is_ignored = ignore.parse(ignore_file)
    context = _LogContext()
    with db.transaction() as conn:
        for f in ls_parser.parse_file(file, ref_year=ref_year):
            # Directory-change markers carry no file data in add-only mode.
            if isinstance(f, ls_parser.ChangeDir):
                continue
            context.seen += 1
            _log_context(f.path, context)
            d = _dict_from_lsfile(f)
            # _apply_mapping(maps, d)
            if is_ignored(d["location"]):
                log.debug("Skipping ignored entry: %a:%a", d["hostname"], d["location"])
                context.ignored += 1
                continue
            if (action := db.upsert_if_changed(conn, d)) == "added":
                context.added += 1
            elif action == "changed":
                context.changed += 1
    return context
def _dict_from_lsfile(f: ls_parser.File) -> dict:
    """Convert a parsed `ls` entry into the flat dict shape the DB expects.

    The mode's first character is normalized: `-` becomes `f` (regular file),
    `d`/`l` pass through, anything else collapses to `-` (unknown).
    """
    kind = f.mode[0]
    if kind == "-":
        kind = "f"
    elif kind not in ("d", "l"):
        kind = "-"
    return {
        "location": str(f.path),
        "hostname": config.hostname,
        "stat_bytes": f.size_bytes,
        "stat_modified": f.date,
        "stat_type": kind,
    }
def _ingest_ls_remove_missing(
    file: TextIO, *, ignore_file: Path, ref_year: "int | None"
):
    """Like `_ingest_ls_add_only`, but also removes DB entries absent from the listing."""
    is_ignored = ignore.parse(ignore_file)
    # Names the DB knows for the directory currently being listed.
    expected: set[str] = set()
    context = _LogContext()
    with db.transaction() as conn:
        for f in ls_parser.parse_file(file, ref_year=ref_year):
            if isinstance(f, ls_parser.ChangeDir):
                if f.to is not None:
                    # Entering a directory: preload the names we expect to see.
                    expected = {name for name in db.files_in_dir(conn, str(f.to))}
                elif f.from_:
                    # Leaving a directory: anything still in `expected` did not
                    # appear in the listing, so drop it from the DB.
                    for name in expected:
                        loc = str(f.from_ / name)
                        if is_ignored(loc):
                            log.info("Ignoring file (for removal): %a", loc)
                            continue
                        log.info("File removed: %a", loc)
                        context.removed += db.remove_all(conn, loc)
                continue
            context.seen += 1
            _log_context(f.path, context)
            d = _dict_from_lsfile(f)
            # _apply_mapping(maps, d)
            if is_ignored(d["location"]):
                log.debug("Skipping ignored entry: %a:%a", d["hostname"], d["location"])
                context.ignored += 1
                continue
            if (action := db.upsert_if_changed(conn, d)) == "added":
                context.added += 1
            elif action == "changed":
                context.changed += 1
            # Present in the listing, so not missing.
            expected.discard(f.path.name)
    return context
def _ls_files(
    *,
    host: "str | None",
    path: str,
    type: "models.StatType | None" = None,
    match: Literal["regex", "glob", "fuzzy"] = "glob",
) -> Iterable[models.File]:
    """Yield DB files matching `path` on `host` using the given match mode.

    - "regex": `path` is a case-insensitive regex over the location.
    - "glob": `*`/`?` stop at `/`, `**` crosses `/`; matched against the tail
      of the location as a full name.
    - "fuzzy": `/`-separated fragments matched loosely via SQL LIKE.
    """
    def map_replace(mapping: dict, string: str):
        # Replace every key of `mapping` found in `string` with its value,
        # in a single regex pass.
        pattern = "|".join(re.escape(k) for k in mapping.keys())
        return re.sub(pattern, lambda m: mapping[m[0]], string)
    def liketerm_from_glob(glob: str) -> str:
        # Translate glob wildcards to SQL LIKE wildcards (`%`, `_`),
        # LIKE-escaping everything else first.
        s = db.escape(glob)
        s = map_replace({"*": "%", "?": "_"}, s)
        return s
    def regex_from_glob(glob: str) -> str:
        # Translate glob wildcards to regex: `**` -> `.*` (crosses `/`),
        # `*`/`?` stay within one path component.
        s = re.escape(glob)
        s = map_replace({r"\*\*": ".*", r"\*": "[^/]*", r"\?": "[^/]"}, s)
        return s
    with db.transaction() as conn:
        if match == "regex":
            for f in db.search(
                conn, type=type, hostname_regex=host, regex=f"(?i){path}"
            ):
                yield models.File(**f)  # type: ignore
        elif match == "glob":
            filters = {"type": type}
            # Only glob the hostname when it actually contains wildcards.
            if host and _uses_glob(host):
                filters["hostname_like"] = liketerm_from_glob(host)
            else:
                filters["hostname"] = host
            if not _uses_glob(path):
                # Literal path: match it verbatim as a full trailing name.
                rterm = re.escape(path)
                lterm = path  # no `db.escape`, `endswith` does autoescape
                result = db.search(
                    conn,
                    endswith=lterm,
                    regex=f"(?i)(^|/){rterm}$",  # ensure a full name match
                    **filters,
                )
            else:
                rterm = regex_from_glob(path)
                lterm = liketerm_from_glob(path)
                result = db.search(
                    conn,
                    regex=f"(?i)(^|/){rterm}$",
                    like=f"%{lterm}",  # helps to drastically speed up the regex match
                    **filters,
                )
            for f in result:
                yield models.File(**f)  # type: ignore
        elif match == "fuzzy":
            # "a/b" becomes LIKE "%a%b%": fragments in order, anything between.
            term = "%".join(db.escape(p) for p in path.split("/"))
            for f in db.search(conn, like=f"%{term}%", type=type, hostname=host):
                yield models.File(**f)  # type: ignore
def _ls_dir_contents(*, host: str, path: str) -> Iterable[models.File]:
    """Yield the entry at `path`, or its direct children if it is a directory."""
    with db.transaction() as conn:
        row = db.get_file(conn, location=path, hostname=host)
        if not row:
            log.warning("No match: %a:%a", host, path)
            return
        if row["stat_type"] == "d":
            for child in db.get_files(conn, parent_id=row["id"]):
                yield models.File(**child)  # type: ignore
        else:
            # Not a directory: yield the entry itself.
            yield models.File(**row)  # type: ignore
def _uses_glob(string: str) -> bool:
    """Return True when `string` contains any glob wildcard (`*` or `?`)."""
    return any(wildcard in string for wildcard in "*?")
def ls(
    pathspec: str,
    *,
    type: "models.StatType | None" = None,
    match: Literal["regex", "glob", "fuzzy"] = "glob",
) -> Iterable[models.File]:
    """Yield DB entries matching `pathspec` (`HOST:PATH`).

    A wildcard-free absolute spec lists a directory's contents; anything else
    falls back to a search using the given `match` mode.
    """
    host, path = _parse_pathspec(pathspec)
    if host == "":
        # Allow ":foo" as a shortcut for searching the local host.
        host = config.hostname
    log.info("Using path spec: %a:%a", host, path)
    if path.endswith("/") and path != "/":
        # In our DB no path except root (`/`) ends with `/`.
        path = path.rstrip("/")
    is_exact_dir = host and path.startswith("/") and not _uses_glob(host + path)
    if is_exact_dir:
        yield from _ls_dir_contents(host=host, path=path)
        return
    yield from _ls_files(host=host, path=path, type=type, match=match)
def rm(pathspec: str, *, include_children: bool = False):
    """Remove the entry at `pathspec` (and, with `include_children`, its descendants).

    Args:
        pathspec: full `host:/absolute/path` specification.
        include_children: allow removal even when the entry has children.

    Raises:
        ValueError: pathspec lacks a hostname/absolute path, or matches nothing.
        RuntimeError: the entry has children and `include_children` is False.
    """
    host, path = _parse_pathspec(pathspec)
    if not host or not path.startswith("/"):
        log.error(
            "A full absolute path including hostname is required when removing files: %a",
            pathspec,
        )
        raise ValueError("Incomplete path specification.")
    if path != "/" and path.endswith("/"):
        # Strip *all* trailing slashes, consistent with `ls` (previously only
        # one was removed); stored locations (except root) never end with `/`.
        path = path.rstrip("/")
    with db.transaction() as conn:
        row = db.get_file(conn, hostname=host, location=path)
        if not row:
            log.error("No matching file found: %a", pathspec)
            raise ValueError("Path not found.")
        children = db.get_files(conn, parent_id=row["id"])
        if children and not include_children:
            log.error("File has children: %a", pathspec)
            raise RuntimeError("Path has children.")
        db.remove_all(conn, location=path, hostname=host)
def hosts() -> "set[str]":
    """Return the set of distinct hostnames known to the database."""
    with db.transaction() as conn:
        names = db.all_hostnames(conn)
        return set(names)