move status line helper to utils module

This commit is contained in:
ducklet 2023-02-08 22:38:11 +01:00
parent 72bd33660b
commit cc38046c1b
3 changed files with 99 additions and 73 deletions

View file

@ -231,7 +231,7 @@ def cmd_ingest_rclone_json(args: argparse.Namespace) -> None:
metadex.close()
msg = f"Checked {context.seen} files, {context.added} new, {context.changed} changed, {context.ignored} ignored, {context.removed} removed"
print(msg.ljust(metadex._terminal_width))
print(msg.ljust(utils._terminal_width))
@command("ingest-ls")
@ -259,7 +259,7 @@ def cmd_ingest_db(args: argparse.Namespace) -> None:
)
msg = f"Checked {context.seen} files, {context.added} new, {context.changed} changed, {context.ignored} ignored, {context.removed} removed"
print(msg.ljust(metadex._terminal_width))
print(msg.ljust(utils._terminal_width))
metadex.close()
@ -278,7 +278,7 @@ def cmd_scan(args: argparse.Namespace) -> None:
)
msg = f"{basedir}: Checked {context.seen} files, {context.added} new, {context.changed} changed, {context.ignored} ignored, {context.removed} removed"
print(msg.ljust(metadex._terminal_width))
print(msg.ljust(utils._terminal_width))
metadex.close()

View file

@ -2,19 +2,14 @@ import json
import logging
import os
import re
import sys
import time
from collections import deque
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from shutil import get_terminal_size
from typing import Any, Iterable, Literal, TextIO
from typing_extensions import TypeAlias
from wcwidth import wcwidth
from . import config, db, ignore, ls_parser, models
from . import config, db, ignore, ls_parser, models, utils
log = logging.getLogger(__name__)
@ -33,69 +28,6 @@ def scan(
return f(path, ignore_file=ignore_file, map_pathspecs=map_pathspecs)
_terminal_width = get_terminal_size().columns
_last_log = 0
def _cut_and_fill(
s: str,
*,
term_width: int = _terminal_width,
suffix: str = "...",
fill_char: str = " ",
):
"""Return the given string cut off to fit into the given terminal width
and fill in the remaining cells of the line.
Some (unicode) characters require more (or less) than a single cell when
printed to terminal. This function correctly takes these into account.
The given `suffix` is appended when a string is cut off.
"""
width = _wcswidth(s)
if width <= term_width:
return s + fill_char * (term_width - width)
suffix_len = _wcswidth(suffix)
idx = suffix_len + 1
term_width -= suffix_len
while True:
width = _wcswidth(s[:-idx])
if width <= term_width:
break
idx += 1
return s[:-idx] + suffix + fill_char * (term_width - width)
def _wcswidth(pwcs, /):
"""
Given a unicode string, return its printable length on a terminal.
:param str pwcs: Measure width of given unicode string.
:rtype: int
:returns: The width, in cells, necessary to display the first ``n``
characters of the unicode string ``pwcs``.
"""
width = 0
for char in pwcs:
wcw = wcwidth(char)
if wcw < 0:
wcw = 1 # Assume width 1 for "non-printables", seems to do the trick.
width += wcw
return width
def _log_ephemeral(msg: str, *, debounce_ms: "int | None" = 200):
global _last_log
if debounce_ms is not None:
now = time.monotonic()
if _last_log + (debounce_ms / 1000) > now:
return
_last_log = now
msg = msg.encode(errors="replace").decode()
sys.stderr.write(_cut_and_fill(msg) + "\r")
@dataclass
class _LogContext:
seen: int = 0
@ -109,7 +41,7 @@ def _log_context(path: "str | Path", context: _LogContext) -> None:
if config.is_stdout_piped:
return
_log_ephemeral(
utils.set_status_line(
f"{context.seen} a:{context.added} c:{context.changed} i:{context.ignored} r:{context.removed} {path}"
)

View file

@ -1,7 +1,12 @@
import os
import sys
import time
from pathlib import Path
from shutil import get_terminal_size
from typing import Literal
from wcwidth import wcwidth
_size_quantifiers = "BKMGTP"
_size_map: "dict[str, int]" = {
_size_quantifiers[i]: 2 ** (10 * i) for i in range(len(_size_quantifiers))
@ -53,3 +58,92 @@ def abspath(path: Path) -> Path:
Similar to Path.resolve(strict=False), but doesn't resolve symlinks."""
return Path(os.path.abspath(path))
_terminal_width = get_terminal_size().columns
def _cut_and_fill(
s: str,
*,
term_width: int = _terminal_width,
suffix: str = "...",
fill_char: str = " ",
) -> str:
"""Return the given string cut off to fit into the given terminal width
and fill in the remaining cells of the line.
Some (unicode) characters require more (or less) than a single cell when
printed to terminal. This function correctly takes these into account.
The given `suffix` is appended when a string is cut off.
"""
width = _wcswidth(s)
if width <= term_width:
return s + fill_char * (term_width - width)
suffix_len = _wcswidth(suffix)
idx = suffix_len + 1
term_width -= suffix_len
while True:
width = _wcswidth(s[:-idx])
if width <= term_width:
break
idx += 1
return s[:-idx] + suffix + fill_char * (term_width - width)
def _wcswidth(pwcs: str, /) -> int:
"""
Given a unicode string, return its printable length on a terminal.
:param str pwcs: Measure width of given unicode string.
:rtype: int
:returns: The width, in cells, necessary to display the first ``n``
characters of the unicode string ``pwcs``.
"""
width = 0
for char in pwcs:
wcw = wcwidth(char)
if wcw < 0:
wcw = 1 # Assume width 1 for "non-printables", seems to do the trick.
width += wcw
return width
_last_log = 0.0
def set_status_line(msg: str, *, debounce_ms: "int | None" = 200) -> None:
"""Report on the status of a running process.
The status line is here is meant as a kind of ephemeral and shared single
line of logging output. It writes a full line of text to a terminal and will
cut off any text exceeding the line length, and will fill up any remaining
space of the line with whitespace.
It does not mix well with any other kind of logging output. When you want
to use this function, stop logging with any other logger, use the status
line logger exclusively, and finally call `clean_status_line()`.
This function has a built-in debounce controlled via `debounce_ms` which
will make sure the performance of the application isn't affected too much
by excessive logging.
"""
global _last_log
if debounce_ms is not None:
now = time.monotonic()
if _last_log + (debounce_ms / 1000) > now:
return
_last_log = now
msg = msg.encode(errors="replace").decode()
sys.stderr.write(_cut_and_fill(msg) + "\r")
def clear_status_line() -> None:
"""Clean up the status line.
Call this function after you're done with whatever process you were
reporting status for.
This will make sure there's no garbage left on the status line.
"""
set_status_line("", debounce_ms=None)