From cc38046c1b7670b2b3b917aeb0c874af6b728d63 Mon Sep 17 00:00:00 2001 From: ducklet Date: Wed, 8 Feb 2023 22:38:11 +0100 Subject: [PATCH] move status line helper to utils module --- metadex/__main__.py | 6 +-- metadex/metadex.py | 72 +--------------------------------- metadex/utils.py | 94 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 99 insertions(+), 73 deletions(-) diff --git a/metadex/__main__.py b/metadex/__main__.py index 3ecd799..77ea545 100644 --- a/metadex/__main__.py +++ b/metadex/__main__.py @@ -231,7 +231,7 @@ def cmd_ingest_rclone_json(args: argparse.Namespace) -> None: metadex.close() msg = f"Checked {context.seen} files, {context.added} new, {context.changed} changed, {context.ignored} ignored, {context.removed} removed" - print(msg.ljust(metadex._terminal_width)) + print(msg.ljust(utils._terminal_width)) @command("ingest-ls") @@ -259,7 +259,7 @@ def cmd_ingest_db(args: argparse.Namespace) -> None: ) msg = f"Checked {context.seen} files, {context.added} new, {context.changed} changed, {context.ignored} ignored, {context.removed} removed" - print(msg.ljust(metadex._terminal_width)) + print(msg.ljust(utils._terminal_width)) metadex.close() @@ -278,7 +278,7 @@ def cmd_scan(args: argparse.Namespace) -> None: ) msg = f"{basedir}: Checked {context.seen} files, {context.added} new, {context.changed} changed, {context.ignored} ignored, {context.removed} removed" - print(msg.ljust(metadex._terminal_width)) + print(msg.ljust(utils._terminal_width)) metadex.close() diff --git a/metadex/metadex.py b/metadex/metadex.py index bb1ad3c..4dcfc27 100644 --- a/metadex/metadex.py +++ b/metadex/metadex.py @@ -2,19 +2,14 @@ import json import logging import os import re -import sys -import time from collections import deque from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path -from shutil import get_terminal_size from typing import Any, Iterable, Literal, TextIO from typing_extensions import TypeAlias -from wcwidth import wcwidth - -from . import config, db, ignore, ls_parser, models +from . import config, db, ignore, ls_parser, models, utils log = logging.getLogger(__name__) @@ -33,69 +28,6 @@ def scan( return f(path, ignore_file=ignore_file, map_pathspecs=map_pathspecs) -_terminal_width = get_terminal_size().columns -_last_log = 0 - - -def _cut_and_fill( - s: str, - *, - term_width: int = _terminal_width, - suffix: str = "...", - fill_char: str = " ", -): - """Return the given string cut off to fit into the given terminal width - and fill in the remaining cells of the line. - - Some (unicode) characters require more (or less) than a single cell when - printed to terminal. This function correctly takes these into account. - The given `suffix` is appended when a string is cut off. - """ - width = _wcswidth(s) - if width <= term_width: - return s + fill_char * (term_width - width) - - suffix_len = _wcswidth(suffix) - idx = suffix_len + 1 - term_width -= suffix_len - while True: - width = _wcswidth(s[:-idx]) - if width <= term_width: - break - idx += 1 - return s[:-idx] + suffix + fill_char * (term_width - width) - - -def _wcswidth(pwcs, /): - """ - Given a unicode string, return its printable length on a terminal. - :param str pwcs: Measure width of given unicode string. - :rtype: int - :returns: The width, in cells, necessary to display the first ``n`` - characters of the unicode string ``pwcs``. - """ - width = 0 - for char in pwcs: - wcw = wcwidth(char) - if wcw < 0: - wcw = 1 # Assume width 1 for "non-printables", seems to do the trick. - width += wcw - return width - - -def _log_ephemeral(msg: str, *, debounce_ms: "int | None" = 200): - global _last_log - - if debounce_ms is not None: - now = time.monotonic() - if _last_log + (debounce_ms / 1000) > now: - return - _last_log = now - - msg = msg.encode(errors="replace").decode() - sys.stderr.write(_cut_and_fill(msg) + "\r") - - @dataclass class _LogContext: seen: int = 0 @@ -109,7 +41,7 @@ def _log_context(path: "str | Path", context: _LogContext) -> None: if config.is_stdout_piped: return - _log_ephemeral( + utils.set_status_line( f"{context.seen} a:{context.added} c:{context.changed} i:{context.ignored} r:{context.removed} {path}" ) diff --git a/metadex/utils.py b/metadex/utils.py index 4fb9fdc..897bab9 100644 --- a/metadex/utils.py +++ b/metadex/utils.py @@ -1,7 +1,12 @@ import os +import sys +import time from pathlib import Path +from shutil import get_terminal_size from typing import Literal +from wcwidth import wcwidth + _size_quantifiers = "BKMGTP" _size_map: "dict[str, int]" = { _size_quantifiers[i]: 2 ** (10 * i) for i in range(len(_size_quantifiers)) @@ -53,3 +58,92 @@ def abspath(path: Path) -> Path: Similar to Path.resolve(strict=False), but doesn't resolve symlinks.""" return Path(os.path.abspath(path)) + + +_terminal_width = get_terminal_size().columns + + +def _cut_and_fill( + s: str, + *, + term_width: int = _terminal_width, + suffix: str = "...", + fill_char: str = " ", +) -> str: + """Return the given string cut off to fit into the given terminal width + and fill in the remaining cells of the line. + + Some (unicode) characters require more (or less) than a single cell when + printed to terminal. This function correctly takes these into account. + The given `suffix` is appended when a string is cut off. + """ + width = _wcswidth(s) + if width <= term_width: + return s + fill_char * (term_width - width) + + suffix_len = _wcswidth(suffix) + idx = suffix_len + 1 + term_width -= suffix_len + while True: + width = _wcswidth(s[:-idx]) + if width <= term_width: + break + idx += 1 + return s[:-idx] + suffix + fill_char * (term_width - width) + + +def _wcswidth(pwcs: str, /) -> int: + """ + Given a unicode string, return its printable length on a terminal. + :param str pwcs: Measure width of given unicode string. + :rtype: int + :returns: The width, in cells, necessary to display the first ``n`` + characters of the unicode string ``pwcs``. + """ + width = 0 + for char in pwcs: + wcw = wcwidth(char) + if wcw < 0: + wcw = 1 # Assume width 1 for "non-printables", seems to do the trick. + width += wcw + return width + + +_last_log = 0.0 + + +def set_status_line(msg: str, *, debounce_ms: "int | None" = 200) -> None: + """Report on the status of a running process. + + The status line is here is meant as a kind of ephemeral and shared single + line of logging output. It writes a full line of text to a terminal and will + cut off any text exceeding the line length, and will fill up any remaining + space of the line with whitespace. + It does not mix well with any other kind of logging output. When you want + to use this function, stop logging with any other logger, use the status + line logger exclusively, and finally call `clean_status_line()`. + + This function has a built-in debounce controlled via `debounce_ms` which + will make sure the performance of the application isn't affected too much + by excessive logging. + """ + global _last_log + + if debounce_ms is not None: + now = time.monotonic() + if _last_log + (debounce_ms / 1000) > now: + return + _last_log = now + + msg = msg.encode(errors="replace").decode() + sys.stderr.write(_cut_and_fill(msg) + "\r") + + +def clear_status_line() -> None: + """Clean up the status line. + + Call this function after you're done with whatever process you were + reporting status for. + This will make sure there's no garbage left on the status line. + """ + set_status_line("", debounce_ms=None)