5------------------------------------------------------------------------------
6This file is part of grepros - grep for ROS1 bag files and live topics.
7Released under the BSD License.
12------------------------------------------------------------------------------
14## @namespace grepros.common
15from __future__ import print_function
34except ImportError: curses = None
38except ImportError: zstandard = None
41## Python types for filesystem paths
42PATH_TYPES = (six.binary_type, six.text_type)
43if six.PY34: PATH_TYPES += (importlib.import_module("pathlib").Path, )
44## Python types for both byte strings and text strings
45STRING_TYPES = (six.binary_type, six.text_type)
46## Python types for text strings
47TEXT_TYPES = (six.binary_type, six.text_type) if six.PY2 else (six.text_type, )
50class MatchMarkers(object):
51 """Highlight markers for matches in message values."""
62 EMPTY_REPL =
"%s''%s" % (START, END)
66 """Populates highlight markers with specified value."""
77 Prints to console, supports color output.
79 If configured with `apimode=
True`, logs debugs
and warnings to logger
and raises errors.
82 STYLE_RESET = "\x1b(B\x1b[m"
83 STYLE_HIGHLIGHT =
"\x1b[31m"
84 STYLE_LOWLIGHT =
"\x1b[38;2;105;105;105m"
85 STYLE_SPECIAL =
"\x1b[35m"
86 STYLE_SPECIAL2 =
"\x1b[36m"
87 STYLE_WARN =
"\x1b[33m"
88 STYLE_ERROR =
"\x1b[31m\x1b[2m"
90 DEBUG_START, DEBUG_END = STYLE_LOWLIGHT, STYLE_RESET
91 WARN_START, WARN_END = STYLE_WARN, STYLE_RESET
92 ERROR_START, ERROR_END = STYLE_ERROR, STYLE_RESET
113 def configure(cls, color=True, apimode=False):
115 Initializes printer, for terminal output
or library mode.
117 For terminal output, initializes terminal colors,
or disables colors
if unsupported.
119 @param color
True /
False /
None for auto-detect
from TTY support;
120 will be disabled
if terminal does
not support colors
121 @param apimode whether to log debugs
and warnings to logger
and raise errors,
135 """Initializes terminal for color output, or disables color output if unsupported."""
136 if cls.
COLOR is not None:
return
138 try: cls.
WIDTHWIDTH = shutil.get_terminal_size().columns
139 except Exception:
pass
143 if cls.
COLOR and not sys.stdout.isatty():
148 if sys.stdout.isatty()
or cls.
COLOR:
149 cls.
WIDTHWIDTH = curses.initscr().getmaxyx()[1]
151 except Exception:
pass
164 def print(cls, text="", *args, **kwargs):
166 Prints text, formatted with args
and kwargs.
168 @param __file file object to
print to
if not sys.stdout
169 @param __end line end to use
if not linefeed
"\n"
170 @param __once whether text should be printed only once
171 and discarded on any further calls (applies to unformatted text)
174 if kwargs.pop(
"__once",
False):
177 fileobj, end = kwargs.pop(
"__file", sys.stdout), kwargs.pop(
"__end",
"\n")
178 pref, suff = kwargs.pop(
"__prefix",
""), kwargs.pop(
"__suffix",
"")
180 text = cls.
_format(text, *args, **kwargs)
185 print(pref + text + suff, end=end, file=fileobj)
186 not fileobj.isatty()
and fileobj.flush()
190 def error(cls, text="", *args, **kwargs):
192 Prints error to stderr, formatted with args
and kwargs,
in error colors
if supported.
194 Raises exception instead
if APIMODE.
199 cls.
print(text, *args, **dict(kwargs, **KWS))
203 def warn(cls, text="", *args, **kwargs):
205 Prints warning to stderr, or logs to logger
if APIMODE.
207 Text
is formatted
with args
and kwargs,
in warning colors
if supported.
210 text = cls.
_format(text, *args, **kwargs)
211 if text: logging.getLogger(__name__).warning(text)
214 cls.
print(text, *args, **dict(kwargs, **KWS))
218 def debug(cls, text="", *args, **kwargs):
220 Prints debug text to stderr, or logs to logger
if APIMODE.
222 Text
is formatted
with args
and kwargs,
in warning colors
if supported.
225 text = cls.
_format(text, *args, **kwargs)
226 if text: logging.getLogger(__name__).
debug(text)
229 cls.
print(text, *args, **dict(kwargs, **KWS))
233 def log(cls, level, text="", *args, **kwargs):
235 Prints text to stderr, or logs to logger
if APIMODE.
237 Text
is formatted
with args
and kwargs,
in level colors
if supported.
239 @param level logging level like `logging.ERROR`
or "ERROR"
242 text = cls.
_format(text, *args, **kwargs)
243 if text: logging.getLogger(__name__).
log(level, text)
245 level = logging.getLevelName(level)
246 if not isinstance(level, TEXT_TYPES): level = logging.getLevelName(level)
248 func(text, *args, **dict(kwargs, __file=sys.stderr))
253 """Ends current open line, if any."""
259 def _format(cls, text="", *args, **kwargs):
261 Returns text formatted with printf-style
or format() arguments.
263 @param __once registers text, returns
"" if text
not unique
265 text, fmted = str(text), False
266 if kwargs.get(
"__once"):
269 for k
in (
"__file",
"__end",
"__once",
"__prefix",
"__suffix"): kwargs.pop(k,
None)
270 try: text, fmted = (text % args
if args
else text), bool(args)
271 except Exception:
pass
272 try: text, fmted = (text % kwargs
if kwargs
else text), fmted
or bool(kwargs)
273 except Exception:
pass
274 try: text = text.format(*args, **kwargs)
if not fmted
and (args
or kwargs)
else text
275 except Exception:
pass
281 """Namespace for program argument handling."""
283 UNSIGNED_INTS = {
"NTH_MESSAGE",
"NTH_MATCH",
"MAX_COUNT",
"MAX_PER_TOPIC",
"MAX_TOPICS",
284 "BEFORE",
"AFTER",
"CONTEXT",
"LINES_AROUND_MATCH",
"MAX_FIELD_LINES",
285 "MAX_MESSAGE_LINES",
"WRAP_WIDTH",
"QUEUE_SIZE_IN",
"QUEUE_SIZE_OUT"}
286 UNSIGNED_FLOATS = {
"NTH_INTERVAL",
"TIME_SCALE"}
287 SIGNED_INTS = {
"START_INDEX",
"END_INDEX",
"START_LINE",
"END_LINE"}
288 STRINGS = {
"PUBLISH_PREFIX",
"PUBLISH_SUFFIX",
"PUBLISH_FIXNAME"}
289 STRING_COLLECTIONS = {
"TOPIC",
"SKIP_TOPIC",
"TYPE",
"SKIP_TYPE",
"SELECT_FIELD",
290 "NO_SELECT_FIELD",
"EMIT_FIELD",
"NO_EMIT_FIELD",
"MATCH_WRAPPER"}
291 NO_FLATTENS = {
"WRITE"}
293 UNSIGNED_WHEN = {
"START_INDEX":
"LIVE",
"END_INDEX":
"LIVE"}
294 PRECASTS = {
"NTH_INTERVAL":
lambda v: import_item(
"grepros.api").to_sec(v)}
295 DEDUPE_UNLESS = {
"PATTERN":
"EXPRESSION"}
299 """RawTextHelpFormatter returning custom metavar for non-flattenable list arguments."""
301 def _format_action_invocation(self, action):
302 """Returns formatted invocation."""
305 if action.dest
in ArgumentUtil.NO_FLATTENS:
306 return " ".join(action.option_strings + [action.metavar])
311 def make_parser(cls, arguments, formatter=HelpFormatter):
313 Returns a configured ArgumentParser instance for program arguments.
315 @param arguments argparse options
as {description, epilog, arguments: [], groups: []}
316 @param formatter help formatter
class to use
318 kws = dict(description=arguments["description"], epilog=arguments[
"epilog"],
319 formatter_class=formatter, add_help=
False)
320 if sys.version_info >= (3, 5): kws.update(allow_abbrev=
False)
321 argparser = argparse.ArgumentParser(**kws)
322 for arg
in map(dict, arguments[
"arguments"]):
323 argparser.add_argument(*arg.pop(
"args"), **arg)
324 for group, groupargs
in arguments.get(
"groups", {}).items():
325 grouper = argparser.add_argument_group(group)
326 for arg
in map(dict, groupargs):
327 grouper.add_argument(*arg.pop(
"args"), **arg)
334 Converts and validates program argument namespace, prints
and raises on error.
336 Returns new namespace
with arguments
in expected type
and form.
347 """Returns new program argument namespace with list values flattened and deduplicated."""
349 for k, v
in vars(args).items():
350 if not isinstance(v, list):
continue
353 v2 = [x
for xx
in v2
for x
in (xx
if isinstance(xx, list)
else [xx])]
355 v2 = [here.append(x)
or x
for here
in ([],)
for x
in v2
if x
not in here]
356 if v2 != v: setattr(args, k, v2)
362 """Sets command-line specific flag state to program argument namespace."""
366 args.BEFORE = args.AFTER = args.CONTEXT
369 args.PROGRESS = args.PROGRESS
and not args.CONSOLE
372 args.VERBOSE =
False if args.SKIP_VERBOSE
else args.VERBOSE
or \
373 (
False if args.PROGRESS
else cli
and not args.CONSOLE)
376 args.LINE_PREFIX = args.LINE_PREFIX
and (args.RECURSE
or len(args.FILE) != 1
377 or args.PATH
or any(
"*" in x
for x
in args.FILE))
379 for k, v
in [(
"START_TIME", args.START_TIME), (
"END_TIME", args.END_TIME)]:
380 if not isinstance(v, (six.binary_type, six.text_type)):
continue
382 except Exception:
pass
383 try: v
if isinstance(v, float)
else setattr(args, k,
parse_datetime(v))
384 except Exception:
pass
390 Validates arguments, prints errors, returns success.
392 @param args arguments object like argparse.Namespace
397 for opts
in getattr(args,
"WRITE", []):
400 try: dict([opt.split(
"=", 1)])
401 except Exception: erropts.append(opt)
403 errors.append(
'Invalid KEY=VALUE in "--write %s": %s' %
404 (
" ".join(opts),
" ".join(erropts)))
406 for n
in (
"START_TIME",
"END_TIME"):
407 v = getattr(args, n,
None)
408 if v
is None:
continue
410 except Exception:
pass
411 try: isinstance(v, (six.binary_type, six.text_type))
and parse_datetime(v)
412 except Exception: errors.append(
"Invalid ISO datetime for %s: %s" %
413 (n.lower().replace(
"_",
" "), v))
417 for err
in errors: ConsolePrinter.log(logging.ERROR, err)
423 """Converts and validates types in argument namespace, returns list of errors, if any."""
425 try:
return ctor(v),
None
426 except Exception
as e:
return v, e
428 vals1, vals2, errors = vars(args), {}, {}
431 if vals1.get(k)
is not None: vals1[k] = f(vals1[k])
432 for k, v
in vals1.items():
433 if v
is None:
continue
437 elif k
in cls.
STRINGS: v = str(v)
439 v = [str(x)
for x
in (v
if isinstance(v, (dict, list, set, tuple))
else [v])]
441 err =
"Cannot be negative."
442 if not err
and vals1.get(cls.
UNSIGNED_WHEN.get(k))
and v < 0:
444 err =
"Cannot be negative for %s." % label
445 (errors
if err
else vals2)[k] = err
or v
448 for k, err
in errors.items():
449 text =
"Invalid value for %s: %s" % (k.lower().replace(
"_",
" "), getattr(args, k))
450 if isinstance(err, six.string_types): text +=
". %s" % err
451 error_texts.append(text)
452 for k, v
in vals2.items()
if not errors
else ():
459 """Decompresses zstandard archives."""
462 EXTENSIONS = (
".zst",
".zstd")
465 ZSTD_MAGIC = b
"\x28\xb5\x2f\xfd"
471 Decompresses file to same directory, showing optional progress bar.
473 @return uncompressed file path
476 path2, bar, size, processed = os.path.splitext(path)[0], None, os.path.getsize(path), 0
477 fmt =
lambda s: format_bytes(s, strip=
False)
479 tpl =
" Decompressing %s (%s): {afterword}" % (os.path.basename(path), fmt(size))
482 ConsolePrinter.warn(
"Compressed file %s (%s), decompressing to %s.", path, fmt(size), path2)
483 bar
and bar.update(0).start()
485 with open(path,
"rb")
as f, open(path2,
"wb")
as g:
486 reader = zstandard.ZstdDecompressor().stream_reader(f)
488 chunk = reader.read(1048576)
492 processed += len(chunk)
493 bar
and (setattr(bar,
"afterword", fmt(processed)), bar.update(processed))
498 finally: bar
and (setattr(bar,
"pulse",
False), bar.update(processed).stop())
504 """Returns whether file is a recognized archive."""
505 result = os.path.isfile(path)
507 result = any(str(path).lower().endswith(x)
for x
in cls.
EXTENSIONS)
509 with open(path,
"rb")
as f:
516 """Returns the path without archive extension, if any."""
517 return os.path.splitext(path)[0]
if cls.
is_compressed(path)
else path
522 """Raises error if decompression library not available."""
523 if not zstandard:
raise Exception(
"zstandard not installed, cannot decompress")
529 A simple ASCII progress bar with a ticker thread
532 '[---------/ 36% ] Progressing text..'.
534 '[ ---- ] Progressing text..'.
537 def __init__(self, max=100, value=0, min=0, width=30, forechar="-",
538 backchar=" ", foreword="", afterword="", interval=1,
539 pulse=False, aftertemplate=" {afterword}", **afterargs):
541 Creates a new progress bar, without drawing it yet.
543 @param max progress bar maximum value, 100%
544 @param value progress bar initial value
545 @param min progress bar minimum value,
for 0%
546 @param width progress bar width (
in characters)
547 @param forechar character used
for filling the progress bar
548 @param backchar character used
for filling the background
549 @param foreword text
in front of progress bar
550 @param afterword text after progress bar
551 @param interval ticker thread interval,
in seconds
552 @param pulse ignore value-min-max, use constant pulse instead
553 @param aftertemplate afterword format() template, populated
with vars(self)
and afterargs
554 @param afterargs additional keywords
for aftertemplate formatting
556 threading.Thread.__init__(self)
574 self.
bar =
"%s[%s%s]%s" % (foreword,
575 backchar
if pulse
else forechar,
576 backchar * (width - 3),
577 aftertemplate.format(**dict(vars(self), **self.
afterargs)))
583 def update(self, value=None, draw=True, flush=False):
584 """Updates the progress bar value, and refreshes by default; returns self."""
585 if value
is not None:
592 bartext =
"%s[%s]%s" % (self.
foreword,
600 elif pos >= self.
width - 1:
601 dash = dash[:-(pos - self.
width - 2)]
603 bar =
"[%s]" % (self.
backchar * w_full)
606 bar = bar[:pos1 - len(dash)] + dash + bar[pos1:]
607 bartext =
"%s%s%s" % (self.
foreword, bar, afterword)
610 percent = int(round(100.0 * self.
value / (self.
max or 1)))
611 percent = 99
if percent == 100
and self.
value < self.
max else percent
612 w_done =
max(1, int(round((percent / 100.0) * w_full)))
615 if draw
and w_done < w_full: char_last = next(self.
progresschar)
616 bartext =
"%s[%s%s%s]%s" % (
618 self.
backchar * (w_full - w_done), afterword)
620 centertxt =
" %2d%% " % percent
621 pos = len(self.
foreword) + int(self.
width / 2 - len(centertxt) / 2)
622 bartext = bartext[:pos] + centertxt + bartext[pos + len(centertxt):]
624 self.
printbar = bartext +
" " *
max(0, len(self.
bar) - len(bartext))
625 self.
bar, prevbar = bartext, self.
bar
626 if draw
and (flush
or prevbar != self.
bar): self.
draw(flush)
630 def draw(self, flush=False):
632 Prints the progress bar, from the beginning of the current line.
634 @param flush add linefeed to end, forcing a new line
for any next
print
636 ConsolePrinter.print("\r" + self.
printbar, __end=
" ")
639 ConsolePrinter.print(
"\r" + self.
printbar, __end=
" ")
640 if flush: ConsolePrinter.flush()
656 """Wrapper for iterable value with specified fixed length."""
658 def __init__(self, iterable, count):
660 @param iterable any iterable value
661 @param count value to
return for len(self),
or callable to
return value
from
663 self._iterer = iter(iterable)
668 def __next__(self):
return next(self.
_iterer)
676 TextWrapper that supports custom substring widths in line width calculation.
678 Intended
for wrapping text containing ANSI control codes.
679 Heavily refactored
from Python standard library textwrap.TextWrapper.
683 SPACE_RGX = re.compile(r"([%s]+)" % re.escape(
"\t\n\x0b\x0c\r "))
689 def __init__(self, width=80, subsequent_indent=" ", break_long_words=True,
690 drop_whitespace=False, max_lines=None, placeholder=" ...", custom_widths=None):
692 @param width default maximum width to wrap at, 0 disables
693 @param subsequent_indent string prepended to all consecutive lines
694 @param break_long_words
break words longer than width
695 @param drop_whitespace drop leading
and trailing whitespace
from lines
696 @param max_lines count to truncate lines
from
697 @param placeholder appended to last retained line when truncating
698 @param custom_widths {substring: len} to use
in line width calculation
708 self.customs = {s: l for s, l
in (custom_widths
or {}).items()
if s}
710 self.
custom_rgx = re.compile(
"(%s)" %
"|".join(re.escape(s)
for s
in self.
customs))
719 """Returns a list of wrapped text lines, without linebreaks."""
722 for i, line
in enumerate(text.splitlines()):
733 if not result[-1].endswith(self.
placeholder.lstrip()):
740 """Decreases the configured width by given amount (number or string)."""
741 reserved = self.
strlen(reserved)
if isinstance(reserved, TEXT_TYPES)
else reserved
746 """Returns length of string, using custom substring widths."""
753 """Returns string with custom substrings and whitespace stripped."""
757 def _wrap_chunks(self, chunks):
758 """Returns a list of lines joined from text chunks, wrapped to width."""
764 cur_line, cur_len = [], 0
772 l = self.
strlen(chunks[-1])
773 if cur_len + l <= width:
774 cur_line.append(chunks.pop())
779 if chunks
and self.
strlen(chunks[-1]) > width:
782 cur_len = sum(map(self.
strlen, cur_line))
785 cur_len -= len(cur_line[-1])
791 and len(chunks) == 1
and not self.
strip(chunks[0])) \
792 and cur_len <= width):
793 lines.append(indent +
"".join(cur_line))
799 if self.
strip(cur_line[-1]):
800 if cur_len + placeholder_len <= width:
801 lines.append(indent +
"".join(cur_line))
803 if len(cur_line) == 1:
804 lines.append(indent + cur_line[-1])
805 cur_len -= self.
strlen(cur_line[-1])
808 if not lines
or self.
strlen(lines[-1]) + placeholder_len > self.
width:
815 def _handle_long_word(self, reversed_chunks, cur_line, cur_len, width):
817 Breaks last chunk if not only containing a custom-width string,
818 else adds last chunk to current line
if line still empty.
820 text = reversed_chunks[-1]
821 break_pos = 1 if width < 1
else width - cur_len
824 unbreakable_spans = [m.span()
for m
in self.
custom_rgx.finditer(text)]
825 text_in_spans = [x
for x
in unbreakable_spans
if x[0] <= break_pos < x[1]]
826 last_span = text_in_spans
and sorted(text_in_spans, key=
lambda x: -x[1])[0]
827 break_pos = last_span[1]
if last_span
else break_pos
828 breakable = 0 < break_pos < len(text)
831 cur_line.append(text[:break_pos])
832 reversed_chunks[-1] = text[break_pos:]
834 cur_line.append(reversed_chunks.pop())
837def drop_zeros(v, replace=""):
838 """Drops trailing zeros and empty decimal separator, if any."""
839 repl =
lambda m: (
"." if m[1]
or replace
else "") + (m[1]
or "") + len(m[2]) * replace
840 return re.sub(
r"\.(\d*[1-9])?(0+)$", repl, str(v))
843def ellipsize(text, limit, ellipsis=".."):
844 """Returns text ellipsized if beyond limit."""
845 if limit <= 0
or len(text) <= limit:
847 return text[:max(0, limit - len(ellipsis))] + ellipsis
850def ensure_namespace(val, defaults=None, dashify=(
"WRITE_OPTIONS", ), **kwargs):
852 Returns a copy of value as `argparse.Namespace`,
with all keys uppercase.
854 Arguments
with list/tuple values
in defaults are ensured to have list/tuple values.
856 @param val `argparse.Namespace`
or dictionary
or `
None`
857 @param defaults additional arguments to set to namespace
if missing
858 @param dashify names of dictionary arguments where to replace
859 the first underscore
in string keys
with a dash
860 @param kwargs any
and all argument overrides
as keyword overrides
862 if val
is None or isinstance(val, dict): val = argparse.Namespace(**val
or {})
864 for k, v
in list(vars(val).items()):
867 setattr(val, k.upper(), v)
868 for k, v
in ((k.upper(), v)
for k, v
in (defaults.items()
if defaults
else ())):
869 if not hasattr(val, k): setattr(val, k,
structcopy(v))
870 for k, v
in ((k.upper(), v)
for k, v
in kwargs.items()): setattr(val, k, v)
871 for k, v
in ((k.upper(), v)
for k, v
in (defaults.items()
if defaults
else ())):
872 if isinstance(v, (tuple, list))
and not isinstance(getattr(val, k), (tuple, list)):
873 setattr(val, k, [getattr(val, k)])
874 for arg
in (getattr(val, n.upper(),
None)
for n
in dashify
or ()):
875 for k
in (list(arg)
if isinstance(arg, dict)
else []):
876 if isinstance(k, six.text_type)
and "_" in k
and 0 < k.index(
"_") < len(k) - 1:
877 arg[k.replace(
"_",
"-", 1)] = arg.pop(k)
881def filter_dict(dct, keys=(), values=(), reverse=
False):
883 Filters string dictionary by keys and values, supporting * wildcards.
884 Dictionary values may be additional lists; keys
with emptied lists are dropped.
886 Retains only entries that find a match (supports * wildcards);
887 if reverse, retains only entries that do
not find a match.
892 for k, vv
in dct.items()
if not reverse
else ():
893 is_array = isinstance(vv, (list, tuple))
894 for v
in (vv
if is_array
else [vv]):
895 if (
not keys
or k
in keys
or any(p.match(k)
for p
in kpatterns)) \
896 and (
not values
or v
in values
or any(p.match(v)
for p
in vpatterns)):
897 result.setdefault(k, []).append(v)
if is_array
else result.update({k: v})
898 for k, vv
in dct.items()
if reverse
else ():
899 is_array = isinstance(vv, (list, tuple))
900 for v
in (vv
if is_array
else [vv]):
901 if (k
not in keys
and not any(p.match(k)
for p
in kpatterns)) \
902 and (v
not in values
and not any(p.match(v)
for p
in vpatterns)):
903 result.setdefault(k, []).append(v)
if is_array
else result.update({k: v})
907def find_files(names=(), paths=(), suffixes=(), skip_suffixes=(), recurse=
False):
909 Yields filenames from current directory
or given paths.
911 Seeks only files
with given extensions
if names
not given.
912 Logs errors
for names
and paths
not found.
914 @param names list of specific files to
return (supports * wildcards)
915 @param paths list of paths to look under,
if not using current directory
916 @param suffixes list of suffixes to select
if no wilcarded names,
as (
".ext1", ..)
917 @param skip_suffixes list of suffixes to skip
if no wildcarded names,
as (
".ext1", ..)
918 @param recurse whether to recurse into subdirectories
920 namesfound, pathsfound = set(), set()
921 ok = lambda f: (
not suffixes
or any(map(f.endswith, suffixes))) \
922 and not any(map(f.endswith, skip_suffixes))
923 def iter_files(directory):
924 """Yields matching filenames from path."""
925 if os.path.isfile(directory):
926 ConsolePrinter.log(logging.ERROR,
"%s: Is a file", directory)
928 for root
in sorted(glob.glob(directory)):
929 pathsfound.add(directory)
930 for path, _, files
in os.walk(root):
932 p = n
if not paths
or os.path.isabs(n)
else os.path.join(path, n)
933 for f
in (f
for f
in glob.glob(p)
if "*" not in n
or ok(f)):
935 ConsolePrinter.log(logging.ERROR,
"%s: Is a directory", f)
939 for f
in ()
if names
else (os.path.join(path, f)
for f
in sorted(files)
if ok(f)):
945 for f
in (f
for p
in paths
or [
"."]
for f
in iter_files(p)):
946 if os.path.abspath(f)
not in processed:
947 processed.add(os.path.abspath(f))
948 if not paths
and f == os.path.join(
".", os.path.basename(f)):
949 f = os.path.basename(f)
952 for path
in (p
for p
in paths
if p
not in pathsfound):
953 ConsolePrinter.log(logging.ERROR,
"%s: No such directory", path)
954 for name
in (n
for n
in names
if n
not in namesfound):
955 ConsolePrinter.log(logging.ERROR,
"%s: No such file", name)
959 """Formats the datetime.timedelta as "3d 40h 23min 23.1sec"."""
960 dd, rem = divmod(delta.total_seconds(), 24*3600)
961 hh, rem = divmod(rem, 3600)
962 mm, ss = divmod(rem, 60)
964 for c, n
in (dd,
"d"), (hh,
"h"), (mm,
"min"), (ss,
"sec"):
965 f =
"%d" % c
if "sec" != n
else drop_zeros(round(c, 9))
966 if f !=
"0": items += [f + n]
967 return " ".join(items
or [
"0sec"])
970def format_bytes(size, precision=2, inter=" ", strip=True):
971 """Returns a formatted byte size (like 421.40 MB), trailing zeros optionally removed."""
972 result =
"" if math.isinf(size)
or math.isnan(size)
else "0 bytes"
974 UNITS = [
"bytes"] + [x +
"B" for x
in "KMGTPEZY"]
975 size, sign = abs(size), (
"-" if size < 0
else "")
976 exponent = min(int(math.log(size, 1024)), len(UNITS) - 1)
977 result =
"%.*f" % (precision, size / (1024. ** exponent))
978 if strip: result = drop_zeros(result)
979 result = sign + result + inter + (UNITS[exponent]
if result !=
"1" or exponent
else "byte")
984 """Returns ISO datetime from UNIX timestamp."""
985 return datetime.datetime.fromtimestamp(stamp).isoformat(sep=
" ")
990 Returns the fully namespaced name for a Python module,
class, function
or object.
992 E.g.
"my.thing" or "my.module.MyCls" or "my.module.MyCls.my_method"
993 or "my.module.MyCls<0x1234abcd>" or "my.module.MyCls<0x1234abcd>.my_method".
995 namer = lambda x: getattr(x,
"__qualname__", getattr(x,
"__name__",
""))
996 if inspect.ismodule(obj):
return namer(obj)
997 if inspect.isclass(obj):
return ".".join((obj.__module__, namer(obj)))
998 if inspect.isroutine(obj):
1000 try: self = six.get_method_self(obj)
1001 except Exception: self =
None
1002 if self
is not None: parts.extend((get_name(self), obj.__name__))
1003 elif hasattr(obj,
"im_class"): parts.extend((get_name(obj.im_class), namer(obj)))
1004 elif hasattr(obj,
"__module__"): parts.extend((obj.__module__, namer(obj)))
1005 else: parts.append(namer(obj))
1006 return ".".join(parts)
1008 return "%s.%s<0x%x>" % (cls.__module__, namer(cls), id(obj))
1012 """Returns whether function supports taking specified argument by name."""
1013 spec = getattr(inspect,
"getfullargspec", getattr(inspect,
"getargspec",
None))(func)
1014 return name
in spec.args
or name
in getattr(spec,
"kwonlyargs", ())
or \
1015 bool(getattr(spec,
"varkw",
None)
or getattr(spec,
"keywords",
None))
1018def import_item(name):
1020 Returns imported module, or identifier
from imported namespace; raises on error.
1022 @param name Python module name like
"my.module"
1023 or module namespace identifier like
"my.module.Class"
1025 result, parts = None, name.split(
".")
1026 for i, item
in enumerate(parts):
1027 path, success =
".".join(parts[:i + 1]),
False
1028 try: result, success = importlib.import_module(path),
True
1029 except ImportError:
pass
1030 if not success
and i:
1031 try: result, success = getattr(result, item),
True
1032 except AttributeError:
pass
1034 raise ImportError(
"No module or identifier named %r" % path)
1039 """Returns whether value is iterable."""
1041 except Exception:
return False
1046 """Returns whether value is a file-like object."""
1047 try:
return isinstance(value, (file, io.IOBase))
1048 except NameError:
return isinstance(value, io.IOBase)
1052 """Creates directory structure for path if not already existing."""
1053 parts, accum = list(filter(bool, os.path.realpath(path).split(os.sep))), []
1055 accum.append(parts.pop(0))
1056 curpath = os.path.join(os.sep, accum[0] + os.sep, *accum[1:])
1057 if not os.path.exists(curpath):
1063 Returns a deep copy of a standard data structure (dict, list, set, tuple),
1064 other object types reused instead of copied.
1066 COLLECTIONS = (dict, list, set, tuple)
1069 if isinstance(x, argparse.Namespace): x = vars(x)
1070 if not isinstance(x, COLLECTIONS):
return memo.update([(id(x), x)])
1071 for y
in sum(map(list, x.items()), [])
if isinstance(x, dict)
else x: collect(y)
1073 return copy.deepcopy(value, memo)
1077 """Returns a results-caching wrapper for the function, cache used if arguments hashable."""
1079 def inner(*args, **kwargs):
1080 key = args + sum(kwargs.items(), ())
1082 except Exception:
return func(*args, **kwargs)
1083 if key
not in cache:
1084 cache[key] = func(*args, **kwargs)
1086 return functools.update_wrapper(inner, func)
1089def merge_dicts(d1, d2):
1090 """Merges d2 into d1, recursively for nested dicts, returns d1."""
1091 for k, v
in d2.items():
1092 if k
in d1
and isinstance(v, dict)
and isinstance(d1[k], dict):
1093 merge_dicts(d1[k], v)
1101 Returns a sorted list of (start, end) spans with overlapping spans merged.
1103 @param join_blanks whether to merge consecutive zero-length spans,
1104 e.g. [(0, 0), (1, 1)] -> [(0, 1)]
1106 result = sorted(spans)
1107 if result
and join_blanks:
1108 blanks = [(a, b)
for a, b
in result
if a == b]
1109 others = [(a, b)
for a, b
in result
if a != b]
1110 others.extend(blanks[:1])
1111 for span
in blanks[1:]:
1112 if span[0] == others[-1][1] + 1:
1113 others[-1] = (others[-1][0], span[1])
1116 result = sorted(others)
1117 result, rest = result[:1], result[1:]
1119 if span[0] <= result[-1][1]:
1120 result[-1] = (result[-1][0], max(span[1], result[-1][1]))
1127 """Returns datetime object from ISO datetime string (may be partial). Raises if invalid."""
1128 BASE = re.sub(
r"\D",
"", datetime.datetime.min.isoformat())
1129 text = re.sub(
r"\D",
"", text)
1130 text += BASE[len(text):]
if text
else ""
1131 dt = datetime.datetime.strptime(text[:len(BASE)],
"%Y%m%d%H%M%S")
1132 return dt + datetime.timedelta(microseconds=int(text[len(BASE):][:6]
or "0"))
1137 Returns an integer parsed from text, raises on error.
1139 @param value text
or binary string to parse, may contain abbrevations like
"12K"
1140 @param suffixes a dictionary of multipliers like {
"K": 1024}, case-insensitive
1142 value, suffix = value.decode() if isinstance(value, six.binary_type)
else value,
None
1144 suffix = next((k
for k, v
in suffixes.items()
if value.lower().endswith(k.lower())),
None)
1145 value = value[:-len(suffix)]
if suffix
else value
1146 return int(float(value) * (suffixes[suffix]
if suffix
else 1))
1149def path_to_regex(text, sep=".", wildcard="*", end=False, intify=False):
1151 Returns re.Pattern for matching path strings
with optional integer indexes.
1153 @param text separated wildcarded path pattern like
"foo*.bar"
1154 @param sep path parts separator, optional
1155 @param wildcard simple wildcard to convert to Python wildcard pattern, optional
1156 @param end whether pattern should match until end (terminates
with $)
1157 @param intify whether path should match optional integer index between parts,
1158 like
"foo.bar" as "foo(\.\d+)?\.bar"
1160 pattern, split_wild = "",
lambda x: x.split(wildcard)
if wildcard
else [x]
1161 for i, part
in enumerate(text.split(sep)
if sep
else [text]):
1162 pattern += (
r"(%s\d+)?" % re.escape(sep))
if i
and intify
else ""
1163 pattern += (re.escape(sep)
if i
else "") +
".*".join(map(re.escape, split_wild(part)))
1164 return re.compile(pattern + (
"$" if end
else ""), re.I)
1167def plural(word, items=None, numbers=True, single="1", sep=",", pref="", suf=""):
1169 Returns the word as 'count words',
or '1 word' if count
is 1,
1170 or 'words' if count omitted.
1172 @param items item collection
or count,
1173 or None to get just the plural of the word
1174 @param numbers
if False, count
is omitted
from final result
1175 @param single prefix to use
for word
if count
is 1, e.g.
"a"
1176 @param sep thousand-separator to use
for count
1177 @param pref prefix to prepend to count, e.g.
"~150"
1178 @param suf suffix to append to count, e.g.
"150+"
1180 count = len(items) if hasattr(items,
"__len__")
else items
or 0
1181 isupper = word[-1:].isupper()
1182 suffix =
"es" if word
and word[-1:].lower()
in "sxyz" \
1183 and not word[-2:].lower().endswith(
"ay") \
1184 else "s" if word
else ""
1185 if count != 1
and "es" == suffix
and "y" == word[-1:].lower():
1186 word = word[:-1] + (
"I" if isupper
else "i")
1187 if isupper: suffix = suffix.upper()
1188 result = word + (
"" if 1 == count
else suffix)
1189 if numbers
and items
is not None:
1190 if 1 == count: fmtcount = single
1191 elif not count: fmtcount =
"0"
1192 elif sep: fmtcount =
"".join([
1193 x + (sep
if i
and not i % 3
else "")
for i, x
in enumerate(str(count)[::-1])
1195 else: fmtcount = str(count)
1197 fmtcount = pref + fmtcount + suf
1198 result =
"%s %s" % (single
if 1 == count
else fmtcount, result)
1199 return result.strip()
1204 Returns a unique version of the path.
1206 If a file or directory
with the same name already exists, returns a unique
1207 version (e.g.
"/tmp/my.2.file" if ""/tmp/my.file
" already exists).
1209 @param empty_ok whether to ignore existence
if file
is empty
1212 if "linux2" == sys.platform
and six.PY2
and isinstance(result, six.text_type) \
1213 and "utf-8" != sys.getfilesystemencoding():
1214 result = result.encode(
"utf-8")
1215 if os.path.isfile(result)
and empty_ok
and not os.path.getsize(result):
1216 return result
if isinstance(result, STRING_TYPES)
else str(result)
1217 path, name = os.path.split(result)
1218 base, ext = os.path.splitext(name)
1220 name = base[:255 - len(ext) - 2] +
".." + ext
1221 result = os.path.join(path, name)
1223 while os.path.exists(result):
1224 suffix =
".%s%s" % (counter, ext)
1225 name = base + suffix
1227 name = base[:255 - len(suffix) - 2] +
".." + suffix
1228 result = os.path.join(path, name)
1233def verify_io(f, mode):
1235 Returns whether stream or file path can be read
from and/
or written to
as binary.
1237 Prints
or raises error
if not.
1239 Tries to open file
in append mode
if verifying path writability,
1240 auto-creating missing directories
if any, will delete any file
or directory created.
1242 @param f file path,
or stream
1243 @param mode
"r" for readable,
"w" for writable,
"a" for readable
and writable
1245 result, op = True,
""
1249 if mode
in (
"r",
"a"):
1250 op =
" reading from"
1251 result = isinstance(f.read(1), bytes)
1252 if result
and mode
in (
"w",
"a"):
1254 result, _ =
True, f.write(b
"")
1257 except Exception
as e:
1258 ConsolePrinter.log(logging.ERROR,
"Error%s %s: %s", op, type(f).__name__, e)
1261 present, paths_created = os.path.exists(f), []
1263 if not present
and mode
in (
"w",
"a"):
1265 path = os.path.realpath(os.path.dirname(f))
1266 parts, accum = [x
for x
in path.split(os.sep)
if x], []
1268 accum.append(parts.pop(0))
1269 curpath = os.path.join(os.sep, accum[0] + os.sep, *accum[1:])
1270 if not os.path.exists(curpath):
1272 paths_created.append(curpath)
1273 elif not present
and "r" == mode:
1276 with open(f, {
"r":
"rb",
"w":
"ab",
"a":
"ab+"}[mode])
as g:
1277 if mode
in (
"r",
"a"):
1278 op =
" reading from"
1279 result = isinstance(g.read(1), bytes)
1280 if result
and mode
in (
"w",
"a"):
1282 result, _ =
True, g.write(b
"")
1284 except Exception
as e:
1285 ConsolePrinter.log(logging.ERROR,
"Error%s %s: %s", op, f, e)
1290 except Exception:
pass
1291 for path
in paths_created[::-1]:
1293 except Exception:
pass
1298 Returns plain wildcard like "foo*bar" as re.Pattern(
"foo.*bar", re.I).
1300 @param end whether pattern should match until end (adds $)
1302 suff = "$" if end
else ""
1303 return re.compile(
".*".join(map(re.escape, text.split(
"*"))) + suff, re.I)
1307 "PATH_TYPES",
"ArgumentUtil",
"ConsolePrinter",
"Decompressor",
"LenIterable",
"MatchMarkers",
1308 "ProgressBar",
"TextWrapper",
"drop_zeros",
"ellipsize",
"ensure_namespace",
"filter_dict",
1309 "find_files",
"format_bytes",
"format_stamp",
"format_timedelta",
"get_name",
"has_arg",
1310 "import_item",
"is_iterable",
"is_stream",
"makedirs",
"memoize",
"merge_dicts",
"merge_spans",
1311 "parse_datetime",
"parse_number",
"path_to_regex",
"plural",
"unique_path",
"verify_io",
1312 "wildcard_to_regex",
Namespace for program argument handling.
verify(cls, args)
Validates arguments, prints errors, returns success.
process_types(cls, args)
Converts and validates types in argument namespace, returns list of errors, if any.
transform(cls, args, cli=False)
Sets command-line specific flag state to program argument namespace.
make_parser(cls, arguments, formatter=HelpFormatter)
Returns a configured ArgumentParser instance for program arguments.
flatten(cls, args)
Returns new program argument namespace with list values flattened and deduplicated.
validate(cls, args, cli=False)
Converts and validates program argument namespace, prints and raises on error.
Prints to console, supports color output.
print(cls, text="", *args, **kwargs)
Prints text, formatted with args and kwargs.
flush(cls)
Ends current open line, if any.
bool APIMODE
Whether logging debugs and warnings and raising errors, instead of printing.
log(cls, level, text="", *args, **kwargs)
Prints text to stderr, or logs to logger if APIMODE.
int WIDTH
Console width in characters, updated from shutil and curses.
error(cls, text="", *args, **kwargs)
Prints error to stderr, formatted with args and kwargs, in error colors if supported.
init_terminal(cls)
Initializes terminal for color output, or disables color output if unsupported.
debug(cls, text="", *args, **kwargs)
Prints debug text to stderr, or logs to logger if APIMODE.
configure(cls, color=True, apimode=False)
Initializes printer, for terminal output or library mode.
dict PRINTS
{sys.stdout: number of texts printed, sys.stderr: ..}
warn(cls, text="", *args, **kwargs)
Prints warning to stderr, or logs to logger if APIMODE.
COLOR
Whether using colors in output.
Decompresses zstandard archives.
decompress(cls, path, progress=False)
Decompresses file to same directory, showing optional progress bar.
is_compressed(cls, path)
Returns whether file is a recognized archive.
make_decompressed_name(cls, path)
Returns the path without archive extension, if any.
validate(cls)
Raises error if decompression library not available.
tuple EXTENSIONS
Supported archive extensions.
str ZSTD_MAGIC
zstd file header magic start bytes
Wrapper for iterable value with specified fixed length.
__init__(self, iterable, count)
populate(cls, value)
Populates highlight markers with specified value.
str EMPTY_REPL
Replacement for empty string match.
str END
Placeholder at end of match.
str START
Placeholder in front of match.
str ID
Unique marker for match highlight replacements.
str EMPTY
Placeholder for empty string match.
A simple ASCII progress bar with a ticker thread.
__init__(self, max=100, value=0, min=0, width=30, forechar="-", backchar=" ", foreword="", afterword="", interval=1, pulse=False, aftertemplate=" {afterword}", **afterargs)
Creates a new progress bar, without drawing it yet.
update(self, value=None, draw=True, flush=False)
Updates the progress bar value, and refreshes by default; returns self.
draw(self, flush=False)
Prints the progress bar, from the beginning of the current line.
TextWrapper that supports custom substring widths in line width calculation.
strip(self, v)
Returns string with custom substrings and whitespace stripped.
wrap(self, text)
Returns a list of wrapped text lines, without linebreaks.
int LENCACHEMAX
Max length of strlen cache.
reserve_width(self, reserved="")
Decreases the configured width by given amount (number or string).
SPACE_RGX
Regex for breaking text at whitespace.
strlen(self, v)
Returns length of string, using custom substring widths.
path_to_regex(text, sep=".", wildcard="*", end=False, intify=False)
Returns re.Pattern for matching path strings with optional integer indexes.
format_stamp(stamp)
Returns ISO datetime from UNIX timestamp.
parse_datetime(text)
Returns datetime object from ISO datetime string (may be partial).
is_iterable(value)
Returns whether value is iterable.
wildcard_to_regex(text, end=False)
Returns plain wildcard like "foo*bar" as re.Pattern("foo.*bar", re.I).
merge_spans(spans, join_blanks=False)
Returns a sorted list of (start, end) spans with overlapping spans merged.
unique_path(pathname, empty_ok=False)
Returns a unique version of the path.
is_stream(value)
Returns whether value is a file-like object.
has_arg(func, name)
Returns whether function supports taking specified argument by name.
filter_dict(dct, keys=(), values=(), reverse=False)
Filters string dictionary by keys and values, supporting * wildcards.
find_files(names=(), paths=(), suffixes=(), skip_suffixes=(), recurse=False)
Yields filenames from current directory or given paths.
format_timedelta(delta)
Formats the datetime.timedelta as "3d 40h 23min 23.1sec".
structcopy(value)
Returns a deep copy of a standard data structure (dict, list, set, tuple), other object types reused ...
parse_number(value, suffixes=None)
Returns an integer parsed from text, raises on error.