run reformatting

This commit is contained in:
ducklet 2023-02-08 23:00:57 +01:00
parent 6fac05d9d3
commit b6670191e5
5 changed files with 3 additions and 32 deletions

View file

@ -69,7 +69,6 @@ def getargs() -> argparse.Namespace:
# Command: scan
with command_parser("scan", help="Import from a local file system.") as subparser:
subparser.add_argument(
"basedir",
type=Path,
@ -97,7 +96,6 @@ def getargs() -> argparse.Namespace:
help="Import from `ls -lR`.",
description="When ingesting data from an external source, the hostname will not be set automatically.",
) as subparser:
subparser.add_argument(
"infile",
nargs="?",
@ -121,7 +119,6 @@ def getargs() -> argparse.Namespace:
with command_parser(
"ingest-db", help="Import from a metadex.sqlite file."
) as subparser:
subparser.add_argument(
"infile",
type=Path,
@ -138,7 +135,6 @@ def getargs() -> argparse.Namespace:
with command_parser(
"ingest-rclone-json", help="Import from `rclone lsjson`."
) as subparser:
subparser.add_argument(
"infile",
nargs="?",
@ -301,15 +297,12 @@ def cmd_ls(args: argparse.Namespace) -> int:
args.file = [f for f in args.file if f]
if not args.file:
# List all known hosts.
for host in sorted(metadex.hosts(), key=str.casefold):
print(f"{host}:")
else:
for pathspec in args.file:
is_match = False
for file in metadex.ls(pathspec, type=args.type, match=args.match):
is_match = True

View file

@ -249,7 +249,6 @@ def search(
hostname_like: "str | None" = None,
hostname_regex: "str | None" = None,
) -> "Iterable[Row]":
stmt = select(metadex)
if type:
@ -524,8 +523,7 @@ def reassign_parent_ids(conn: Connection):
stmt = select(
metadex.c.id, metadex.c.parent_id, metadex.c.location, metadex.c.hostname
)
for (m_id, p_id_old, loc, host) in conn.execute(stmt):
for m_id, p_id_old, loc, host in conn.execute(stmt):
parent_loc = str(Path(loc).parent)
if parent_loc == loc:
p_id = None

View file

@ -95,12 +95,10 @@ def parse_file(
def parse_lines(
lines: Iterable[str], *, ref_year: "int | None" = None
) -> Iterable["File | ChangeDir"]:
workdir = Path("/")
dirname: "Path | None" = None
for i, line in enumerate(lines, start=1):
if not line:
# empty line, reset context
if dirname is not None:

View file

@ -7,6 +7,7 @@ from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Iterable, Literal, TextIO
from typing_extensions import TypeAlias
from . import config, db, ignore, ls_parser, models, utils
@ -56,7 +57,6 @@ def _scan_add_only(
context = _LogContext()
with db.transaction() as conn:
context.seen += 1
d = models.File.dict_from_entry(path)
@ -80,7 +80,6 @@ def _scan_add_only(
dirs.append(path)
while dirs:
cwd = dirs.popleft()
try:
scan = os.scandir(cwd)
@ -91,7 +90,6 @@ def _scan_add_only(
subdirs: deque[Path] = deque()
with scan as files:
for f in files:
context.seen += 1
_log_context(f.path, context)
@ -139,7 +137,6 @@ def _scan_remove_missing(
context = _LogContext()
with db.transaction() as conn:
context.seen += 1
d = models.File.dict_from_entry(path)
@ -163,7 +160,6 @@ def _scan_remove_missing(
dirs.append(path)
while dirs:
cwd = dirs.popleft()
try:
scan = os.scandir(cwd)
@ -176,7 +172,6 @@ def _scan_remove_missing(
subdirs: deque[Path] = deque()
with scan as files:
for f in files:
context.seen += 1
_log_context(f.path, context)
@ -333,9 +328,7 @@ def ingest_db_file(
with db.transaction() as conn, other_db.transaction(
force_rollback=True
) as other_conn:
for row in db.iter_all(other_conn):
context.seen += 1
_log_context(row["location"], context)
@ -392,7 +385,6 @@ def ingest_rclone_json(
context = _LogContext()
with db.transaction() as conn:
for d in _parse_rclone_json(file, remote_base=remote_base):
context.seen += 1
@ -431,9 +423,7 @@ def _ingest_ls_add_only(
context = _LogContext()
with db.transaction() as conn:
for f in ls_parser.parse_file(file, ref_year=ref_year):
if isinstance(f, ls_parser.ChangeDir):
continue
@ -483,11 +473,8 @@ def _ingest_ls_remove_missing(
context = _LogContext()
with db.transaction() as conn:
for f in ls_parser.parse_file(file, ref_year=ref_year):
if isinstance(f, ls_parser.ChangeDir):
if f.to is not None:
expected = {name for name in db.files_in_dir(conn, str(f.to))}
@ -550,14 +537,12 @@ def _ls_files(
with db.transaction() as conn:
if match == "regex":
for f in db.search(
conn, type=type, hostname_regex=host, regex=f"(?i){path}"
):
yield models.File(**f) # type: ignore
elif match == "glob":
filters: dict[str, "str | None"] = {"type": type}
if host and _uses_glob(host):
filters["hostname_like"] = liketerm_from_glob(host)
@ -588,7 +573,6 @@ def _ls_files(
yield models.File(**f) # type: ignore
elif match == "fuzzy":
term = "%".join(db.escape(p) for p in path.split("/"))
for f in db.search(conn, like=f"%{term}%", type=type, hostname=host):
@ -598,9 +582,7 @@ def _ls_files(
def _ls_dir_contents(
*, host: str, path: str, type: "models.StatType | None" = None
) -> Iterable[models.File]:
with db.transaction() as conn:
row = db.get_file(conn, location=path, hostname=host)
if not row:
@ -660,7 +642,6 @@ def rm(pathspec: str, *, include_children: bool = False) -> None:
path = path[:-1]
with db.transaction() as conn:
row = db.get_file(conn, hostname=host, location=path)
if not row:

View file

@ -5,6 +5,7 @@ from os import DirEntry
from pathlib import Path
from stat import S_IFDIR, S_IFLNK, S_IFMT, S_IFREG
from typing import Literal
from typing_extensions import Self
from . import config