grepros 1.2.2
grep for ROS bag files and live topics
Loading...
Searching...
No Matches
common.py
Go to the documentation of this file.
1# -*- coding: utf-8 -*-
2"""
3Common utilities.
4
5------------------------------------------------------------------------------
6This file is part of grepros - grep for ROS1 bag files and live topics.
7Released under the BSD License.
8
9@author Erki Suurjaak
10@created 23.10.2021
11@modified 06.05.2024
12------------------------------------------------------------------------------
13"""
14## @namespace grepros.common
15from __future__ import print_function
16import argparse
17import copy
18import datetime
19import functools
20import glob
21import importlib
22import inspect
23import io
24import itertools
25import logging
26import math
27import os
28import re
29import shutil
30import sys
31import threading
32import time
33try: import curses
34except ImportError: curses = None
35
36import six
37try: import zstandard
38except ImportError: zstandard = None
39
40
## Python types for both byte strings and text strings
STRING_TYPES = (six.binary_type, six.text_type)
## Python types for text strings; under Python 2, byte strings count as text as well
TEXT_TYPES = STRING_TYPES if six.PY2 else (six.text_type, )
## Python types for filesystem paths; includes pathlib.Path if six reports Python 3.4+
PATH_TYPES = STRING_TYPES
if six.PY34:
    PATH_TYPES = PATH_TYPES + (importlib.import_module("pathlib").Path, )
48
49
class MatchMarkers(object):
    """Highlight markers for matches in message values."""

    ## Identifier text used inside the marker tags
    ID = "matching"
    ## Marker placed in front of a match
    START = "<%s>" % ID
    ## Marker placed after a match
    END = "</%s>" % ID
    ## Start and end marker with nothing in between
    EMPTY = START + END
    ## Start and end marker around an empty string literal ''
    EMPTY_REPL = "%s''%s" % (START, END)

    @classmethod
    def populate(cls, value):
        """
        Populates highlight markers with specified value.

        All derived markers are rebuilt, so that START, END, EMPTY and EMPTY_REPL
        stay consistent with the new ID.

        @param   value  new marker identifier, converted to string
        """
        cls.ID = str(value)
        cls.START = "<%s>" % cls.ID
        cls.END = "</%s>" % cls.ID
        cls.EMPTY = cls.START + cls.END
        cls.EMPTY_REPL = "%s''%s" % (cls.START, cls.END)
72
73
74
class ConsolePrinter(object):
    """
    Prints to console, supports color output.

    If configured with `apimode=True`, logs debugs and warnings to logger and raises errors.
    """

    STYLE_RESET     = "\x1b(B\x1b[m"            # Default color+weight
    STYLE_HIGHLIGHT = "\x1b[31m"                # Red
    STYLE_LOWLIGHT  = "\x1b[38;2;105;105;105m"  # Dim gray
    STYLE_SPECIAL   = "\x1b[35m"                # Purple
    STYLE_SPECIAL2  = "\x1b[36m"                # Cyan
    STYLE_WARN      = "\x1b[33m"                # Yellow
    STYLE_ERROR     = "\x1b[31m\x1b[2m"         # Dim red

    DEBUG_START, DEBUG_END = STYLE_LOWLIGHT, STYLE_RESET  # Metainfo wrappers
    WARN_START,  WARN_END  = STYLE_WARN,     STYLE_RESET  # Warning message wrappers
    ERROR_START, ERROR_END = STYLE_ERROR,    STYLE_RESET  # Error message wrappers

    ## Whether color output is in use; None until terminal has been initialized
    COLOR = None

    ## Console width in characters
    WIDTH = 80

    ## {file object: number of texts printed to it}
    PRINTS = {}

    ## Whether to log debugs and warnings to logger and raise errors, instead of printing
    APIMODE = False

    ## Color flag as given to configure()
    _COLORFLAG = None

    ## Whether the last print left its line unterminated
    _LINEOPEN = False

    ## Texts already printed or formatted with __once
    _UNIQUES = set()

    @classmethod
    def configure(cls, color=True, apimode=False):
        """
        Initializes printer, for terminal output or library mode.

        For terminal output, initializes terminal colors, or disables colors if unsupported.

        @param   color    True / False / None for auto-detect from TTY support;
                          will be disabled if terminal does not support colors
        @param   apimode  whether to log debugs and warnings to logger and raise errors,
                          instead of printing
        """
        cls.APIMODE = bool(apimode)
        cls._COLORFLAG = color
        if apimode:
            cls.DEBUG_START, cls.DEBUG_END = "", ""
            cls.WARN_START,  cls.WARN_END  = "", ""
            cls.ERROR_START, cls.ERROR_END = "", ""
        else: cls.init_terminal()


    @classmethod
    def init_terminal(cls):
        """Initializes terminal for color output, or disables color output if unsupported."""
        if cls.COLOR is not None: return

        try: cls.WIDTH = shutil.get_terminal_size().columns  # Py3
        except Exception: pass  # Py2
        cls.COLOR = (cls._COLORFLAG is not False)
        try:
            curses.setupterm()
            if cls.COLOR and not sys.stdout.isatty():
                raise Exception()
        except Exception:
            # No usable terminal: retain colors only if explicitly asked for
            cls.COLOR = bool(cls._COLORFLAG)
        try:
            if sys.stdout.isatty() or cls.COLOR:
                cls.WIDTH = curses.initscr().getmaxyx()[1]
            curses.endwin()
        except Exception: pass

        if cls.COLOR:
            # Restore all three wrapper pairs, as API mode may have blanked them
            cls.DEBUG_START, cls.DEBUG_END = cls.STYLE_LOWLIGHT, cls.STYLE_RESET
            cls.WARN_START,  cls.WARN_END  = cls.STYLE_WARN,     cls.STYLE_RESET
            cls.ERROR_START, cls.ERROR_END = cls.STYLE_ERROR,    cls.STYLE_RESET
        else:
            cls.DEBUG_START, cls.DEBUG_END = "", ""
            cls.WARN_START,  cls.WARN_END  = "", ""
            cls.ERROR_START, cls.ERROR_END = "", ""


    @classmethod
    def print(cls, text="", *args, **kwargs):
        """
        Prints text, formatted with args and kwargs.

        @param   __file    file object to print to if not sys.stdout
        @param   __end     line end to use if not linefeed "\\n"
        @param   __once    whether text should be printed only once
                           and discarded on any further calls (applies to unformatted text)
        @param   __prefix  text to print directly in front of formatted text
        @param   __suffix  text to print directly after formatted text
        """
        text = str(text)
        if kwargs.pop("__once", False):
            if text in cls._UNIQUES: return
            cls._UNIQUES.add(text)
        fileobj, end = kwargs.pop("__file", sys.stdout), kwargs.pop("__end", "\n")
        pref, suff = kwargs.pop("__prefix", ""), kwargs.pop("__suffix", "")
        if cls._LINEOPEN and "\n" in end: pref = "\n" + pref  # Add linefeed to end open line
        text = cls._format(text, *args, **kwargs)

        cls.PRINTS[fileobj] = cls.PRINTS.get(fileobj, 0) + 1
        cls._LINEOPEN = "\n" not in end
        cls.init_terminal()
        print(pref + text + suff, end=end, file=fileobj)
        not fileobj.isatty() and fileobj.flush()


    @classmethod
    def error(cls, text="", *args, **kwargs):
        """
        Prints error to stderr, formatted with args and kwargs, in error colors if supported.

        Raises exception instead if APIMODE.
        """
        if cls.APIMODE:
            # Override rather than duplicate any caller-given __once keyword
            raise Exception(cls._format(text, *args, **dict(kwargs, __once=False)))
        KWS = dict(__file=sys.stderr, __prefix=cls.ERROR_START, __suffix=cls.ERROR_END)
        cls.print(text, *args, **dict(kwargs, **KWS))


    @classmethod
    def warn(cls, text="", *args, **kwargs):
        """
        Prints warning to stderr, or logs to logger if APIMODE.

        Text is formatted with args and kwargs, in warning colors if supported.
        """
        if cls.APIMODE:
            text = cls._format(text, *args, **kwargs)
            if text: logging.getLogger(__name__).warning(text)
            return
        KWS = dict(__file=sys.stderr, __prefix=cls.WARN_START, __suffix=cls.WARN_END)
        cls.print(text, *args, **dict(kwargs, **KWS))


    @classmethod
    def debug(cls, text="", *args, **kwargs):
        """
        Prints debug text to stderr, or logs to logger if APIMODE.

        Text is formatted with args and kwargs, in debug colors if supported.
        """
        if cls.APIMODE:
            text = cls._format(text, *args, **kwargs)
            if text: logging.getLogger(__name__).debug(text)
            return
        KWS = dict(__file=sys.stderr, __prefix=cls.DEBUG_START, __suffix=cls.DEBUG_END)
        cls.print(text, *args, **dict(kwargs, **KWS))


    @classmethod
    def log(cls, level, text="", *args, **kwargs):
        """
        Prints text to stderr, or logs to logger if APIMODE.

        Text is formatted with args and kwargs, in level colors if supported.

        @param   level  logging level like `logging.ERROR` or "ERROR"
        """
        if cls.APIMODE:
            text = cls._format(text, *args, **kwargs)
            # Logger.log() accepts numeric levels only: map level name to number
            if not isinstance(level, int): level = logging.getLevelName(level)
            if text: logging.getLogger(__name__).log(level, text)
            return
        # getLevelName() maps number to name and name to number: ensure ending up with name
        level = logging.getLevelName(level)
        if not isinstance(level, TEXT_TYPES): level = logging.getLevelName(level)
        func = {"DEBUG": cls.debug, "WARNING": cls.warn, "ERROR": cls.error}.get(level, cls.print)
        func(text, *args, **dict(kwargs, __file=sys.stderr))


    @classmethod
    def flush(cls):
        """Ends current open line, if any."""
        if cls._LINEOPEN: print()
        cls._LINEOPEN = False


    @classmethod
    def _format(cls, text="", *args, **kwargs):
        """
        Returns text formatted with printf-style or format() arguments.

        Tries printf-style with args, then printf-style with kwargs,
        then str.format() if neither printf-style attempt succeeded.

        @param   __once  registers text, returns "" if text not unique
        """
        text, fmted = str(text), False
        if kwargs.get("__once"):
            if text in cls._UNIQUES: return ""
            cls._UNIQUES.add(text)
        for k in ("__file", "__end", "__once", "__prefix", "__suffix"): kwargs.pop(k, None)
        try: text, fmted = (text % args if args else text), bool(args)
        except Exception: pass
        try: text, fmted = (text % kwargs if kwargs else text), fmted or bool(kwargs)
        except Exception: pass
        try: text = text.format(*args, **kwargs) if not fmted and (args or kwargs) else text
        except Exception: pass
        return text
277
278
279
class ArgumentUtil(object):
    """Namespace for program argument handling."""

    ## Names of arguments cast to int and rejected if negative
    UNSIGNED_INTS = {"NTH_MESSAGE", "NTH_MATCH", "MAX_COUNT", "MAX_PER_TOPIC", "MAX_TOPICS",
                     "BEFORE", "AFTER", "CONTEXT", "LINES_AROUND_MATCH", "MAX_FIELD_LINES",
                     "MAX_MESSAGE_LINES", "WRAP_WIDTH", "QUEUE_SIZE_IN", "QUEUE_SIZE_OUT"}
    ## Names of arguments cast to float and rejected if negative
    UNSIGNED_FLOATS = {"NTH_INTERVAL", "TIME_SCALE"}
    ## Names of arguments cast to int
    SIGNED_INTS = {"START_INDEX", "END_INDEX", "START_LINE", "END_LINE"}
    ## Names of arguments cast to string
    STRINGS = {"PUBLISH_PREFIX", "PUBLISH_SUFFIX", "PUBLISH_FIXNAME"}
    ## Names of arguments cast to list of strings
    STRING_COLLECTIONS = {"TOPIC", "SKIP_TOPIC", "TYPE", "SKIP_TYPE", "SELECT_FIELD",
                          "NO_SELECT_FIELD", "EMIT_FIELD", "NO_EMIT_FIELD", "MATCH_WRAPPER"}
    ## Names of list arguments whose nested lists are retained as is
    NO_FLATTENS = {"WRITE"}
    ## {argument name: name of other argument that, when set, forbids negative values}
    UNSIGNED_WHEN = {"START_INDEX": "LIVE", "END_INDEX": "LIVE"}
    ## {argument name: converter applied to value before type casting}
    PRECASTS = {"NTH_INTERVAL": lambda v: import_item("grepros.api").to_sec(v)}
    ## {argument name: name of other argument that, when set, disables deduplication}
    DEDUPE_UNLESS = {"PATTERN": "EXPRESSION"}


    class HelpFormatter(argparse.RawTextHelpFormatter):
        """RawTextHelpFormatter returning custom metavar for non-flattenable list arguments."""

        def _format_action_invocation(self, action):
            """Returns formatted invocation."""
            if action.dest not in ArgumentUtil.NO_FLATTENS:
                return super(ArgumentUtil.HelpFormatter, self)._format_action_invocation(action)
            # Avoids oververbose duplicate output like:
            # --write TARGET [format=bag] [KEY=VALUE ...] [TARGET [format=bag] [KEY=VALUE ...] ...]
            return " ".join(action.option_strings + [action.metavar])


    @classmethod
    def make_parser(cls, arguments, formatter=HelpFormatter):
        """
        Returns a configured ArgumentParser instance for program arguments.

        @param   arguments  argparse options as {description, epilog, arguments: [], groups: []}
        @param   formatter  help formatter class to use
        """
        options = {"description": arguments["description"], "epilog": arguments["epilog"],
                   "formatter_class": formatter, "add_help": False}
        if sys.version_info >= (3, 5): options["allow_abbrev"] = False
        parser = argparse.ArgumentParser(**options)
        for spec in arguments["arguments"]:
            spec = dict(spec)  # Work on a copy, as option names get popped off
            parser.add_argument(*spec.pop("args"), **spec)
        for title, specs in arguments.get("groups", {}).items():
            group = parser.add_argument_group(title)
            for spec in specs:
                spec = dict(spec)
                group.add_argument(*spec.pop("args"), **spec)
        return parser


    @classmethod
    def validate(cls, args, cli=False):
        """
        Converts and validates program argument namespace, prints and raises on error.

        Returns new namespace with arguments in expected type and form.
        """
        result = cls.flatten(args)
        cls.transform(result, cli)
        if cls.verify(result):
            return result
        raise Exception("Invalid arguments")


    @classmethod
    def flatten(cls, args):
        """Returns new program argument namespace with list values flattened and deduplicated."""
        result = structcopy(args)
        for name, value in vars(result).items():
            if not isinstance(value, list): continue  # for name, value
            items = value
            if name not in cls.NO_FLATTENS:
                # Unpack one level of nested lists
                items = [x for xx in items for x in (xx if isinstance(xx, list) else [xx])]
            if name not in cls.DEDUPE_UNLESS \
            or not getattr(result, cls.DEDUPE_UNLESS[name], True):
                uniques = []  # Drop duplicates, retaining first occurrence order
                for item in items:
                    if item not in uniques: uniques.append(item)
                items = uniques
            if items != value: setattr(result, name, items)
        return result


    @classmethod
    def transform(cls, args, cli=False):
        """Sets command-line specific flag state to program argument namespace."""
        if not cli: return

        if args.CONTEXT:
            args.BEFORE = args.AFTER = args.CONTEXT

        # Show progress bar only if no console output
        args.PROGRESS = args.PROGRESS and not args.CONSOLE

        # Default to printing metadata for publish/write if no console output and no progress
        if args.SKIP_VERBOSE:
            args.VERBOSE = False
        else:
            args.VERBOSE = args.VERBOSE or \
                           (False if args.PROGRESS else cli and not args.CONSOLE)

        # Print filename prefix on each console message line if not single specific file
        args.LINE_PREFIX = args.LINE_PREFIX and (args.RECURSE or len(args.FILE) != 1
                                                 or args.PATH or any("*" in x for x in args.FILE))

        for name, value in [("START_TIME", args.START_TIME), ("END_TIME", args.END_TIME)]:
            if not isinstance(value, (six.binary_type, six.text_type)): continue  # for name, value
            try: float(value)  # If numeric, retain string for source to read as relative time
            except Exception:
                try: setattr(args, name, parse_datetime(value))
                except Exception: pass


    @classmethod
    def verify(cls, args):
        """
        Validates arguments, prints errors, returns success.

        @param   args  arguments object like argparse.Namespace
        """
        errors = []

        # Validate --write .. key=value
        for opts in getattr(args, "WRITE", []):  # List of lists, one for each --write
            invalids = []
            for opt in opts[1:]:
                try: dict([opt.split("=", 1)])
                except Exception: invalids.append(opt)
            if not invalids: continue  # for opts
            errors.append('Invalid KEY=VALUE in "--write %s": %s' %
                          (" ".join(opts), " ".join(invalids)))

        for name in ("START_TIME", "END_TIME"):
            value = getattr(args, name, None)
            if value is None: continue  # for name
            try: value = float(value)
            except Exception: pass
            if not isinstance(value, (six.binary_type, six.text_type)): continue  # for name
            try: parse_datetime(value)
            except Exception: errors.append("Invalid ISO datetime for %s: %s" %
                                            (name.lower().replace("_", " "), value))

        errors.extend(cls.process_types(args))

        for text in errors: ConsolePrinter.log(logging.ERROR, text)
        return not errors


    @classmethod
    def process_types(cls, args):
        """Converts and validates types in argument namespace, returns list of errors, if any."""
        def cast(value, ctor):
            """Returns (converted value, None) or (original value, exception)."""
            try: return ctor(value), None
            except Exception as e: return value, e

        int_names = cls.UNSIGNED_INTS | cls.SIGNED_INTS
        unsigned_names = cls.UNSIGNED_INTS | cls.UNSIGNED_FLOATS
        current, converted, failures = vars(args), {}, {}

        for name, precast in cls.PRECASTS.items():
            if current.get(name) is not None: current[name] = precast(current[name])
        for name, value in current.items():
            if value is None: continue  # for name, value
            err = None
            if name in int_names: value, err = cast(value, int)
            elif name in cls.UNSIGNED_FLOATS: value, err = cast(value, float)
            elif name in cls.STRINGS: value = str(value)
            elif name in cls.STRING_COLLECTIONS:
                many = value if isinstance(value, (dict, list, set, tuple)) else [value]
                value = [str(x) for x in many]
            if not err and name in unsigned_names and value < 0:
                err = "Cannot be negative."
            if not err and current.get(cls.UNSIGNED_WHEN.get(name)) and value < 0:
                label = cls.UNSIGNED_WHEN[name].lower().replace("_", " ")
                err = "Cannot be negative for %s." % label
            if err: failures[name] = err
            else: converted[name] = value

        error_texts = []
        for name, err in failures.items():
            text = "Invalid value for %s: %s" % (name.lower().replace("_", " "),
                                                 getattr(args, name))
            if isinstance(err, six.string_types): text += ". %s" % err
            error_texts.append(text)
        if not failures:  # Apply conversions only if everything was valid
            for name, value in converted.items():
                setattr(args, name, value)
        return error_texts
455
456
457
class Decompressor(object):
    """Decompresses zstandard archives."""

    ## Supported archive extensions
    EXTENSIONS = (".zst", ".zstd")

    ## Bytes expected at the start of a zstandard archive file
    ZSTD_MAGIC = b"\x28\xb5\x2f\xfd"


    @classmethod
    def decompress(cls, path, progress=False):
        """
        Decompresses file to same directory, showing optional progress bar.

        Output is written to the path without its last extension.
        On failure, any partially written output file is removed and the error re-raised.

        @param   path      path of compressed file
        @param   progress  whether to show a progress bar while decompressing
        @return            uncompressed file path
        """
        cls.validate()
        path2, bar, size, processed = os.path.splitext(path)[0], None, os.path.getsize(path), 0
        fmt = lambda s: format_bytes(s, strip=False)
        if progress:
            tpl = " Decompressing %s (%s): {afterword}" % (os.path.basename(path), fmt(size))
            bar = ProgressBar(pulse=True, aftertemplate=tpl)

        ConsolePrinter.warn("Compressed file %s (%s), decompressing to %s.", path, fmt(size), path2)
        bar and bar.update(0).start()  # Start progress pulse
        try:
            with open(path, "rb") as f, open(path2, "wb") as g:
                reader = zstandard.ZstdDecompressor().stream_reader(f)
                try:
                    while True:
                        chunk = reader.read(1048576)
                        if not chunk: break  # while

                        g.write(chunk)
                        processed += len(chunk)
                        bar and (setattr(bar, "afterword", fmt(processed)), bar.update(processed))
                finally: reader.close()  # Release decompressor also if reading or writing fails
        except Exception:
            # Output may never have been created: removing it blindly
            # would raise a new error and hide the one that actually occurred
            if os.path.isfile(path2): os.remove(path2)
            raise
        finally: bar and (setattr(bar, "pulse", False), bar.update(processed).stop())
        return path2


    @classmethod
    def is_compressed(cls, path):
        """Returns whether file is a recognized archive, by extension and leading bytes."""
        result = os.path.isfile(path)
        if result:
            result = any(str(path).lower().endswith(x) for x in cls.EXTENSIONS)
        if result:
            with open(path, "rb") as f:
                result = (f.read(len(cls.ZSTD_MAGIC)) == cls.ZSTD_MAGIC)
        return result


    @classmethod
    def make_decompressed_name(cls, path):
        """Returns the path without archive extension, if any."""
        return os.path.splitext(path)[0] if cls.is_compressed(path) else path


    @classmethod
    def validate(cls):
        """Raises error if decompression library not available."""
        if not zstandard: raise Exception("zstandard not installed, cannot decompress")
524
525
class ProgressBar(threading.Thread):
    """
    A simple ASCII progress bar with a ticker thread

    Drawn like
    '[---------/ 36% ] Progressing text..'.
    or for pulse mode
    '[ ---- ] Progressing text..'.
    """

    def __init__(self, max=100, value=0, min=0, width=30, forechar="-",
                 backchar=" ", foreword="", afterword="", interval=1,
                 pulse=False, aftertemplate=" {afterword}", **afterargs):
        """
        Creates a new progress bar, without drawing it yet.

        @param   max            progress bar maximum value, 100%
        @param   value          progress bar initial value
        @param   min            progress bar minimum value, for 0%
        @param   width          progress bar width (in characters)
        @param   forechar       character used for filling the progress bar
        @param   backchar       character used for filling the background
        @param   foreword       text in front of progress bar
        @param   afterword      text after progress bar
        @param   interval       ticker thread interval, in seconds
        @param   pulse          ignore value-min-max, use constant pulse instead
        @param   aftertemplate  afterword format() template, populated with vars(self) and afterargs
        @param   afterargs      additional keywords for aftertemplate formatting
        """
        threading.Thread.__init__(self)
        self.max           = max
        self.value         = value  # Current progress bar value
        self.min           = min
        self.width         = width
        self.forechar      = forechar
        self.backchar      = backchar
        self.foreword      = foreword
        self.afterword     = afterword
        self.interval      = interval
        self.pulse         = pulse
        self.aftertemplate = aftertemplate
        self.afterargs     = afterargs
        self.daemon        = True   # Daemon threads do not keep application running
        self.percent       = None   # Current progress ratio in per cent
        self.pause         = False  # Whether drawing is currently paused
        self.pulse_pos     = 0      # Current pulse position
        self.bar = "%s[%s%s]%s" % (foreword,
                                   backchar if pulse else forechar,
                                   backchar * (width - 3),
                                   aftertemplate.format(**dict(vars(self), **self.afterargs)))
        self.printbar = self.bar    # Printable text, with padding to clear previous
        self.progresschar = itertools.cycle("-\\|/")
        self.is_running = False


    def update(self, value=None, draw=True, flush=False):
        """
        Updates the progress bar value, and refreshes by default; returns self.

        @param   value  new progress value if any; clamped to min..max unless in pulse mode
        @param   draw   whether to print the bar if its text changed
        @param   flush  whether to force printing, and end the line after printing
        """
        if value is not None:
            self.value = value if self.pulse else min(self.max, max(self.min, value))
        args = dict(vars(self), **self.afterargs) if self.afterargs else vars(self)
        afterword = self.aftertemplate.format(**args)
        w_full = self.width - 2
        if self.pulse:
            if self.pulse_pos is None:
                bartext = "%s[%s]%s" % (self.foreword,
                                        self.forechar * (self.width - 2),
                                        afterword)
            else:
                dash = self.forechar * max(1, int((self.width - 2) / 7))
                pos = self.pulse_pos
                if pos < len(dash):
                    dash = dash[:pos]  # Pulse entering from bar start
                elif pos >= self.width - 1:
                    dash = dash[:-(pos - self.width - 2)]  # Pulse leaving at bar end

                bar = "[%s]" % (self.backchar * w_full)
                # Write pulse dash into the middle of the bar
                pos1 = min(self.width - 1, pos + 1)
                bar = bar[:pos1 - len(dash)] + dash + bar[pos1:]
                bartext = "%s%s%s" % (self.foreword, bar, afterword)
                self.pulse_pos = (self.pulse_pos + 1) % (self.width + 2)
        else:
            # Ratio is taken over the min..max span, so that value=min yields 0%
            span = (self.max - self.min) or 1
            percent = int(round(100.0 * (self.value - self.min) / span))
            percent = 99 if percent == 100 and self.value < self.max else percent
            w_done = max(1, int(round((percent / 100.0) * w_full)))
            # Build bar outline, animate by cycling last char from progress chars
            char_last = self.forechar
            if draw and w_done < w_full: char_last = next(self.progresschar)
            bartext = "%s[%s%s%s]%s" % (
                       self.foreword, self.forechar * (w_done - 1), char_last,
                       self.backchar * (w_full - w_done), afterword)
            # Write percentage into the middle of the bar
            centertxt = " %2d%% " % percent
            pos = len(self.foreword) + int(self.width / 2 - len(centertxt) / 2)
            bartext = bartext[:pos] + centertxt + bartext[pos + len(centertxt):]
            self.percent = percent
        # Pad with spaces to overwrite any longer text printed previously
        self.printbar = bartext + " " * max(0, len(self.bar) - len(bartext))
        self.bar, prevbar = bartext, self.bar
        if draw and (flush or prevbar != self.bar): self.draw(flush)
        return self


    def draw(self, flush=False):
        """
        Prints the progress bar, from the beginning of the current line.

        @param   flush  add linefeed to end, forcing a new line for any next print
        """
        ConsolePrinter.print("\r" + self.printbar, __end=" ")
        if len(self.printbar) != len(self.bar):  # Draw twice to position caret at true content end
            self.printbar = self.bar
            ConsolePrinter.print("\r" + self.printbar, __end=" ")
        if flush: ConsolePrinter.flush()


    def run(self):
        """Thread body: refreshes the bar at every interval until stopped, unless paused."""
        self.is_running = True
        while self.is_running:
            if not self.pause: self.update(self.value)
            time.sleep(self.interval)


    def stop(self):
        """Signals the ticker loop to end after its current interval."""
        self.is_running = False
652
653
class LenIterable(object):
    """Wrapper for iterable value with specified fixed length."""

    def __init__(self, iterable, count):
        """
        @param   iterable  any iterable value
        @param   count     value to return for len(self), or callable to return value from
        """
        self._iterer = iter(iterable)
        self._count = count

    def __iter__(self): return self

    def __next__(self): return next(self._iterer)

    next = __next__  # Iterator protocol method name under Python 2

    def __len__(self): return self._count() if callable(self._count) else self._count
671
672
673
class TextWrapper(object):
    """
    TextWrapper that supports custom substring widths in line width calculation.

    Intended for wrapping text containing ANSI control codes.
    Heavily refactored from Python standard library textwrap.TextWrapper.
    """

    ## Regex for splitting text into whitespace and non-whitespace chunks
    SPACE_RGX = re.compile(r"([%s]+)" % re.escape("\t\n\x0b\x0c\r "))

    ## Size of string length cache above which wrap() empties it
    LENCACHEMAX = 10000


    def __init__(self, width=80, subsequent_indent=" ", break_long_words=True,
                 drop_whitespace=False, max_lines=None, placeholder=" ...", custom_widths=None):
        """
        @param   width              default maximum width to wrap at, 0 disables
        @param   subsequent_indent  string prepended to all consecutive lines
        @param   break_long_words   break words longer than width
        @param   drop_whitespace    drop leading and trailing whitespace from lines
        @param   max_lines          count to truncate lines from
        @param   placeholder        appended to last retained line when truncating
        @param   custom_widths      {substring: len} to use in line width calculation
        """
        self.width             = width
        self.subsequent_indent = subsequent_indent
        self.break_long_words  = break_long_words
        self.drop_whitespace   = drop_whitespace
        self.max_lines         = max_lines
        self.placeholder       = placeholder

        self.lencache = {}  # {string: length with custom widths applied}
        self.customs = {}   # {substring: custom width}, empty substrings skipped
        for substring, length in (custom_widths or {}).items():
            if substring: self.customs[substring] = length
        # [(substring, amount its custom width is below its real length)]
        self.custom_lens = [(s, len(s) - l) for s, l in self.customs.items()]
        self.custom_rgx = re.compile("(%s)" % "|".join(map(re.escape, self.customs)))
        self.disabled = not self.width
        placeholder_text = self.placeholder if self.max_lines else ""
        self.minwidth = 1 + self.strlen(self.subsequent_indent) + self.strlen(placeholder_text)
        self.width = max(self.width, self.minwidth)
        self.realwidth = self.width  # Full width, basis for reserve_width()


    def wrap(self, text):
        """Returns a list of wrapped text lines, without linebreaks."""
        if self.disabled: return [text]

        limit, wrapped = self.max_lines, []
        for index, textline in enumerate(text.splitlines()):
            pieces = list(filter(None, self.SPACE_RGX.split(textline)))
            sublines = self._wrap_chunks(pieces)
            if index and sublines and self.subsequent_indent:
                # Lines after the first source line start indented as well
                sublines[0] = self.subsequent_indent + sublines[0]
            wrapped.extend(sublines)
            if limit and wrapped and len(wrapped) >= limit:
                break  # for index, textline

        if limit and wrapped:
            overflow = len(wrapped) > limit
            cut_short = len(wrapped) == limit and not text.endswith(wrapped[-1].strip())
            if overflow or cut_short:
                wrapped = wrapped[:limit]
                if not wrapped[-1].endswith(self.placeholder.lstrip()):
                    wrapped[-1] += self.placeholder
        if len(self.lencache) > self.LENCACHEMAX: self.lencache.clear()
        return wrapped


    def reserve_width(self, reserved=""):
        """Decreases the configured width by given amount (number or string)."""
        if isinstance(reserved, TEXT_TYPES): reserved = self.strlen(reserved)
        self.width = max(self.minwidth, self.realwidth - reserved)


    def strlen(self, v):
        """Returns length of string, using custom substring widths."""
        try: return self.lencache[v]
        except KeyError:
            shrink = sum(v.count(s) * ld for s, ld in self.custom_lens)
            self.lencache[v] = len(v) - shrink
            return self.lencache[v]


    def strip(self, v):
        """Returns string with custom substrings and whitespace stripped."""
        return self.custom_rgx.sub("", v).strip()


    def _wrap_chunks(self, chunks):
        """Returns a list of lines joined from text chunks, wrapped to width."""
        lines = []
        chunks.reverse()  # Consume from list end, as popping from end is cheap

        placeholder_len = self.strlen(self.placeholder)
        while chunks:
            current, current_len = [], 0  # Chunks of line being built, and their total length
            indent = self.subsequent_indent if lines else ""
            width = self.width - self.strlen(indent)

            if self.drop_whitespace and lines and not self.strip(chunks[-1]):
                del chunks[-1]  # Drop initial whitespace on subsequent lines

            while chunks:  # Take chunks for as long as they fit
                chunk_len = self.strlen(chunks[-1])
                if current_len + chunk_len > width:
                    break  # while chunks (inner-while)
                current.append(chunks.pop())
                current_len += chunk_len

            if chunks and self.strlen(chunks[-1]) > width:
                # Current line is full, and next chunk is too big to fit on any line
                self._handle_long_word(chunks, current, current_len, width)
                current_len = sum(map(self.strlen, current))

            if self.drop_whitespace and current and not self.strip(current[-1]):
                current_len -= len(current[-1])  # Drop line last whitespace chunk
                del current[-1]

            if not current:
                continue  # while chunks

            nothing_left = not chunks or (self.drop_whitespace and len(chunks) == 1
                                          and not self.strip(chunks[0]))
            if self.max_lines is None or len(lines) + 1 < self.max_lines \
            or (nothing_left and current_len <= width):  # Current line ok
                lines.append(indent + "".join(current))
                continue  # while chunks

            while current:  # Truncate for max_lines
                if self.strip(current[-1]):
                    if current_len + placeholder_len <= width:
                        lines.append(indent + "".join(current))
                        break  # while current
                    if len(current) == 1:
                        lines.append(indent + current[-1])
                current_len -= self.strlen(current[-1])
                del current[-1]
            else:
                # Nothing retained from current line: add placeholder line if previous has no room
                if not lines or self.strlen(lines[-1]) + placeholder_len > self.width:
                    lines.append(indent + self.placeholder.lstrip())
            break  # while chunks

        return lines


    def _handle_long_word(self, reversed_chunks, cur_line, cur_len, width):
        """
        Breaks last chunk if not only containing a custom-width string,
        else adds last chunk to current line if line still empty.
        """
        word = reversed_chunks[-1]
        cut = width - cur_len if width >= 1 else 1
        can_break = self.break_long_words and word not in self.customs
        if can_break:
            # Move cut position to the end of any custom substring it would fall inside of
            covering = [m.span() for m in self.custom_rgx.finditer(word)
                        if m.start() <= cut < m.end()]
            if covering:
                cut = sorted(covering, key=lambda x: -x[1])[0][1]
            can_break = 0 < cut < len(word)

        if can_break:
            cur_line.append(word[:cut])
            reversed_chunks[-1] = word[cut:]
        elif not cur_line:
            cur_line.append(reversed_chunks.pop())
835
836
def drop_zeros(v, replace=""):
    """
    Drops trailing zeros and empty decimal separator, if any.

    @param   v        value to process, converted to string
    @param   replace  text to put in place of each dropped zero;
                      decimal separator is retained if this is non-empty
    """
    def repl(match):
        """Returns replacement for decimal part: significant digits plus zero stand-ins."""
        # Uses group() as match subscripting is unavailable before Python 3.6
        digits, zeros = match.group(1) or "", match.group(2)
        return ("." if digits or replace else "") + digits + len(zeros) * replace
    return re.sub(r"\.(\d*[1-9])?(0+)$", repl, str(v))
841
842
def ellipsize(text, limit, ellipsis=".."):
    """
    Returns text ellipsized if beyond limit.

    @param   text      text to shorten
    @param   limit     maximum length of result; zero or negative disables shortening
    @param   ellipsis  text to end shortened result with, counted within limit
    """
    if limit <= 0 or len(text) <= limit:
        return text
    if limit <= len(ellipsis):
        return ellipsis[:limit]  # No room for any text: keep result within limit regardless
    return text[:limit - len(ellipsis)] + ellipsis
848
849
def ensure_namespace(val, defaults=None, dashify=("WRITE_OPTIONS", ), **kwargs):
    """
    Returns a copy of value as `argparse.Namespace`, with all keys uppercase.

    Arguments with list/tuple values in defaults are ensured to have list/tuple values.

    @param   val       `argparse.Namespace` or dictionary or `None`
    @param   defaults  additional arguments to set to namespace if missing
    @param   dashify   names of dictionary arguments where to replace
                       the first underscore in string keys with a dash
    @param   kwargs    any and all argument overrides as keyword overrides
    """
    if val is None or isinstance(val, dict):
        result = argparse.Namespace(**(val or {}))
    else:
        result = structcopy(val)

    # Rename all attributes to uppercase
    for name, value in list(vars(result).items()):
        if name.isupper(): continue  # for name, value
        delattr(result, name)
        setattr(result, name.upper(), value)

    fallbacks = [(k.upper(), v) for k, v in defaults.items()] if defaults else []
    for name, value in fallbacks:
        if not hasattr(result, name): setattr(result, name, structcopy(value))
    for name, value in kwargs.items():
        setattr(result, name.upper(), value)

    # Wrap single values into list where defaults have a list or tuple
    for name, value in fallbacks:
        if not isinstance(value, (tuple, list)): continue  # for name, value
        current = getattr(result, name)
        if not isinstance(current, (tuple, list)): setattr(result, name, [current])

    for optname in dashify or ():
        opts = getattr(result, optname.upper(), None)
        if not isinstance(opts, dict): continue  # for optname
        for key in list(opts):
            # Replace only an underscore that is neither the first nor the last character
            if isinstance(key, six.text_type) and "_" in key and 0 < key.index("_") < len(key) - 1:
                opts[key.replace("_", "-", 1)] = opts.pop(key)
    return result
879
880
def filter_dict(dct, keys=(), values=(), reverse=False):
    """
    Filters string dictionary by keys and values, supporting * wildcards.
    Dictionary values may be additional lists; keys with emptied lists are dropped.

    Retains only entries that find a match (supports * wildcards);
    if reverse, retains only entries that do not find a match.
    """
    key_rgxs = [wildcard_to_regex(x, end=True) for x in keys]
    val_rgxs = [wildcard_to_regex(x, end=True) for x in values]

    def retain(k, v):
        """Returns whether the key-value pair passes the filter."""
        if reverse:
            return (k not in keys and not any(p.match(k) for p in key_rgxs)) \
                   and (v not in values and not any(p.match(v) for p in val_rgxs))
        return (not keys or k in keys or any(p.match(k) for p in key_rgxs)) \
               and (not values or v in values or any(p.match(v) for p in val_rgxs))

    result = type(dct)()
    for k, vv in dct.items():
        is_array = isinstance(vv, (list, tuple))
        for v in (vv if is_array else [vv]):
            if not retain(k, v): continue  # for v
            if is_array: result.setdefault(k, []).append(v)
            else: result.update({k: v})
    return result
906
def find_files(names=(), paths=(), suffixes=(), skip_suffixes=(), recurse=False):
    """
    Yields filenames from current directory or given paths.

    Seeks only files with given extensions if names not given.
    Logs errors for names and paths not found.

    @param   names          list of specific files to return (supports * wildcards)
    @param   paths          list of paths to look under, if not using current directory
    @param   suffixes       list of suffixes to select if no wildcarded names, as (".ext1", ..)
    @param   skip_suffixes  list of suffixes to skip if no wildcarded names, as (".ext1", ..)
    @param   recurse        whether to recurse into subdirectories
    """
    # Names and paths that produced at least one result, for error reporting at the end
    namesfound, pathsfound = set(), set()
    # Whether filename passes the suffix whitelist and blacklist
    ok = lambda f: (not suffixes or any(map(f.endswith, suffixes))) \
                   and not any(map(f.endswith, skip_suffixes))
    def iter_files(directory):
        """Yields matching filenames from path."""
        if os.path.isfile(directory):
            ConsolePrinter.log(logging.ERROR, "%s: Is a file", directory)
            return
        for root in sorted(glob.glob(directory)):  # Expand * wildcards, if any
            pathsfound.add(directory)
            for path, _, files in os.walk(root):
                for n in names:
                    # Names are taken as given if absolute or if no paths specified
                    p = n if not paths or os.path.isabs(n) else os.path.join(path, n)
                    # Suffix filters apply to wildcarded names only
                    for f in (f for f in glob.glob(p) if "*" not in n or ok(f)):
                        if os.path.isdir(f):
                            ConsolePrinter.log(logging.ERROR, "%s: Is a directory", f)
                            continue  # for f
                        namesfound.add(n)
                        yield f
                # Without specific names, yield all suffix-matching files in directory
                for f in () if names else (os.path.join(path, f) for f in sorted(files) if ok(f)):
                    yield f
                if not recurse:
                    break  # for root

    # Absolute paths already yielded, to skip files reached more than once
    processed = set()
    for f in (f for p in paths or ["."] for f in iter_files(p)):
        if os.path.abspath(f) not in processed:
            processed.add(os.path.abspath(f))
            if not paths and f == os.path.join(".", os.path.basename(f)):
                f = os.path.basename(f)  # Strip leading "./"
            yield f

    for path in (p for p in paths if p not in pathsfound):
        ConsolePrinter.log(logging.ERROR, "%s: No such directory", path)
    for name in (n for n in names if n not in namesfound):
        ConsolePrinter.log(logging.ERROR, "%s: No such file", name)
956
957
def format_timedelta(delta):
    """
    Returns the datetime.timedelta formatted like "3d 4h 23min 23.1sec".

    Units with zero value are omitted; result is "0sec" if all units are zero.
    """
    remainder, parts = delta.total_seconds(), []
    for unit, size in ("d", 24*3600), ("h", 3600), ("min", 60):
        amount, remainder = divmod(remainder, size)
        text = "%d" % amount
        if text != "0": parts.append(text + unit)
    seconds = drop_zeros(round(remainder, 9))  # Seconds retain their fraction
    if seconds != "0": parts.append(seconds + "sec")
    return " ".join(parts) if parts else "0sec"
968
969
def format_bytes(size, precision=2, inter=" ", strip=True):
    """
    Returns a formatted byte size (like "421.40 MB"), in multiples of 1024.

    Returns empty string for infinite and NaN sizes, and "0 bytes" for zero size.

    @param   size       size in bytes, may be negative or fractional
    @param   precision  number of decimals in formatted value
    @param   inter      text between formatted value and unit
    @param   strip      whether to remove trailing zeros from formatted value
    """
    if math.isinf(size) or math.isnan(size): return ""
    if not size: return "0 bytes"

    UNITS = ["bytes"] + [x + "B" for x in "KMGTPEZY"]
    sign, scaled, exponent = ("-" if size < 0 else ""), abs(size), 0
    # Exact division steps instead of a float logarithm: exponent cannot go negative
    # for sizes below 1, nor be off by one from logarithm rounding, nor exceed units
    while scaled >= 1024 and exponent < len(UNITS) - 1:
        scaled, exponent = scaled / 1024., exponent + 1
    result = "%.*f" % (precision, scaled)
    if strip: result = drop_zeros(result)
    unit = UNITS[exponent] if result != "1" or exponent else "byte"  # Singular for exactly "1"
    return sign + result + inter + unit
981
982
def format_stamp(stamp):
    """Returns UNIX timestamp as ISO datetime text, date and time separated by a space."""
    moment = datetime.datetime.fromtimestamp(stamp)
    return moment.isoformat(sep=" ")
986
def get_name(obj):
    """
    Returns the fully namespaced name for a Python module, class, function or object.

    Modules yield "my.thing", classes "my.module.MyCls",
    functions and methods "my.module.MyCls.my_method",
    any other object "my.module.MyCls<0x1234abcd>",
    and methods bound to such an object "my.module.MyCls<0x1234abcd>.my_method".
    """
    def label(x):
        """Returns the qualified name of the item if available, else plain name or ""."""
        return getattr(x, "__qualname__", getattr(x, "__name__", ""))

    if inspect.ismodule(obj):
        return label(obj)
    if inspect.isclass(obj):
        return ".".join((obj.__module__, label(obj)))
    if not inspect.isroutine(obj):
        kind = type(obj)
        return "%s.%s<0x%x>" % (kind.__module__, label(kind), id(obj))

    try: owner = six.get_method_self(obj)
    except Exception: owner = None  # Not a bound method
    if owner is not None:
        chain = [get_name(owner), obj.__name__]
    elif hasattr(obj, "im_class"):  # Py2 unbound method
        chain = [get_name(obj.im_class), label(obj)]
    elif hasattr(obj, "__module__"):
        chain = [obj.__module__, label(obj)]
    else:
        chain = [label(obj)]
    return ".".join(chain)
1009
1010
def has_arg(func, name):
    """
    Returns whether function supports taking specified argument by name,
    as a named positional or keyword-only argument, or via **kwargs.
    """
    getspec = getattr(inspect, "getfullargspec", None)  # Py3
    if getspec is None:
        getspec = getattr(inspect, "getargspec", None)  # Py2
    spec = getspec(func)
    if name in spec.args or name in getattr(spec, "kwonlyargs", ()):
        return True
    # Any name is accepted if function takes arbitrary keyword arguments
    return bool(getattr(spec, "varkw", None) or getattr(spec, "keywords", None))
1016
1017
def import_item(name):
    """
    Returns imported module, or identifier from imported namespace; raises on error.

    Resolves the dotted name part by part: each prefix is first tried as a module,
    and if that fails, as an attribute of what the previous prefix resolved to.

    @param   name  Python module name like "my.module"
                   or module namespace identifier like "my.module.Class"
    """
    result, segments = None, name.split(".")
    for depth, segment in enumerate(segments, 1):
        path = ".".join(segments[:depth])
        try:
            result = importlib.import_module(path)
            continue  # for depth, segment
        except ImportError:
            pass
        if depth > 1:  # First part can only be a module
            try:
                result = getattr(result, segment)
                continue  # for depth, segment
            except AttributeError:
                pass
        raise ImportError("No module or identifier named %r" % path)
    return result
1036
1037
def is_iterable(value):
    """Returns whether an iterator can be obtained from value."""
    try:
        iter(value)
    except Exception:
        result = False
    else:
        result = True
    return result
1043
1044
def is_stream(value):
    """Returns whether value is a file-like object (`io.IOBase`, or `file` in Py2)."""
    stream_types = (io.IOBase, )
    try: stream_types = (file, ) + stream_types  # Py2 built-in file type
    except NameError: pass  # No `file` in Py3
    return isinstance(value, stream_types)
1049
1050
def makedirs(path):
    """Creates directory structure for path if not already existing, level by level."""
    segments = [x for x in os.path.realpath(path).split(os.sep) if x]
    for depth in range(1, len(segments) + 1):
        head, tail = segments[0], segments[1:depth]
        current = os.path.join(os.sep, head + os.sep, *tail)  # Windows drive letter thing
        if not os.path.exists(current):
            os.mkdir(current)
1059
1060
def structcopy(value):
    """
    Returns a deep copy of a standard data structure (dict, list, set, tuple),
    other object types reused instead of copied.

    `argparse.Namespace` instances are copied as well, with their attribute values
    handled like dictionary values. Self-referential structures are supported.

    @param   value  data structure to copy
    """
    COLLECTIONS = (dict, list, set, tuple)
    memo, seen = {}, set()  # {id(object): object to reuse}, {id(collection already walked)}
    def collect(x):  # Walk structure and collect objects to skip copying
        if isinstance(x, argparse.Namespace): x = vars(x)
        if not isinstance(x, COLLECTIONS):
            memo[id(x)] = x  # deepcopy returns memoized object instead of copying
            return
        if id(x) in seen: return  # Circular or repeated reference: avoid endless recursion
        seen.add(id(x))
        for y in itertools.chain.from_iterable(x.items()) if isinstance(x, dict) else x:
            collect(y)
    collect(value)
    return copy.deepcopy(value, memo)
1074
1075
def memoize(func):
    """
    Returns a results-caching wrapper for the function, cache used if arguments hashable.

    Function is invoked directly, bypassing the cache, if any argument is unhashable.
    Keyword arguments are cached regardless of the order they were given in.
    """
    cache = {}
    def inner(*args, **kwargs):
        # Positional and keyword arguments are kept apart in the key, so that
        # e.g. func("a", 1) and func(a=1) do not share a cache entry
        key = (args, tuple(sorted(kwargs.items())))
        try: hash(key)
        except Exception: return func(*args, **kwargs)
        if key not in cache:
            cache[key] = func(*args, **kwargs)
        return cache[key]
    return functools.update_wrapper(inner, func)
1087
1088
def merge_dicts(d1, d2):
    """
    Merges d2 into d1 in-place and returns d1.

    Where both have a dictionary under the same key, those are merged recursively;
    in all other cases the value from d2 overwrites the value in d1.
    """
    for key, incoming in d2.items():
        nested = key in d1 and isinstance(incoming, dict) and isinstance(d1[key], dict)
        if nested: merge_dicts(d1[key], incoming)
        else: d1[key] = incoming
    return d1
1097
1098
def merge_spans(spans, join_blanks=False):
    """
    Returns a sorted list of (start, end) spans, with overlapping or touching spans merged.

    @param   spans        list of (start, end) spans
    @param   join_blanks  whether to merge consecutive zero-length spans,
                          e.g. [(0, 0), (1, 1)] -> [(0, 1)]
    """
    ordered = sorted(spans)
    if ordered and join_blanks:
        empties, filled = [], []
        for lo, hi in ordered:
            (empties if lo == hi else filled).append((lo, hi))
        joined = filled + empties[:1]
        for lo, hi in empties[1:]:
            prev_lo, prev_hi = joined[-1]
            if lo == prev_hi + 1:  # Directly follows previous blank: extend it
                joined[-1] = (prev_lo, hi)
            else:
                joined.append((lo, hi))
        ordered = sorted(joined)

    merged = []
    for span in ordered:
        if merged and span[0] <= merged[-1][1]:
            merged[-1] = (merged[-1][0], max(span[1], merged[-1][1]))
        else:
            merged.append(span)
    return merged
1124
1125
def parse_datetime(text):
    """
    Returns datetime object from ISO datetime string (may be partial). Raises if invalid.

    All non-digit characters are ignored; missing trailing components are taken from
    the minimum datetime, e.g. "2021-10" parses as 2021-10-01 00:00:00.
    Digits beyond seconds are taken as fraction of a second, up to microsecond precision,
    e.g. "12:34:56.5" has 500000 microseconds.

    @param   text  datetime string like "2021-10-23 12:34:56.123456" or "2021-10"
    """
    BASE = re.sub(r"\D", "", datetime.datetime.min.isoformat())  # "00010101000000"
    digits = re.sub(r"\D", "", text)
    digits += BASE[len(digits):] if digits else ""  # Empty input is left to fail in strptime
    dt = datetime.datetime.strptime(digits[:len(BASE)], "%Y%m%d%H%M%S")
    # Right-pad fraction to microseconds: ".5" is half a second, not 5 microseconds
    fraction = digits[len(BASE):][:6].ljust(6, "0")
    return dt + datetime.timedelta(microseconds=int(fraction))
1133
1134
def parse_number(value, suffixes=None):
    """
    Returns an integer parsed from text, raises on error.

    The longest matching suffix is used if several match, e.g. "KB" over "B" for "12KB".
    Plain integer texts are parsed exactly, without passing through floating-point.

    @param   value     text or binary string to parse, may contain abbreviations like "12K"
    @param   suffixes  a dictionary of multipliers like {"K": 1024}, case-insensitive
    """
    if isinstance(value, bytes): value = value.decode()
    multiplier = 1
    if suffixes:
        lowered = value.lower()
        matches = [k for k in suffixes if k and lowered.endswith(k.lower())]
        if matches:
            suffix = max(matches, key=len)
            value, multiplier = value[:-len(suffix)], suffixes[suffix]
    try: number = int(value)  # Retains full precision for large integers
    except ValueError: number = float(value)
    return int(number * multiplier)
1147
1148
def path_to_regex(text, sep=".", wildcard="*", end=False, intify=False):
    r"""
    Returns re.Pattern for matching path strings with optional integer indexes.

    Pattern is case-insensitive; all characters other than wildcards are matched literally.

    @param   text      separated wildcarded path pattern like "foo*.bar"
    @param   sep       path parts separator, optional
    @param   wildcard  simple wildcard to convert to Python wildcard pattern, optional
    @param   end       whether pattern should match until end (terminates with $)
    @param   intify    whether path should match optional integer index between parts,
                       like "foo.bar" as "foo(\.\d+)?\.bar"
    """
    pattern, split_wild = "", lambda x: x.split(wildcard) if wildcard else [x]
    for i, part in enumerate(text.split(sep) if sep else [text]):
        # Optional integer index between path parts, like ".12"
        pattern += (r"(%s\d+)?" % re.escape(sep)) if i and intify else ""
        pattern += (re.escape(sep) if i else "") + ".*".join(map(re.escape, split_wild(part)))
    return re.compile(pattern + ("$" if end else ""), re.I)
1165
1166
def plural(word, items=None, numbers=True, single="1", sep=",", pref="", suf=""):
    """
    Returns the word as 'count words', or '1 word' if count is 1,
    or 'words' if count omitted.

    Words ending in s, x or z get suffix "es"; words ending in consonant + y
    get "ies" in place of "y"; all other words get suffix "s".
    Suffix is uppercase if the last letter of the word is uppercase.

    @param   items      item collection or count,
                        or None to get just the plural of the word
    @param   numbers    if False, count is omitted from final result
    @param   single     prefix to use for word if count is 1, e.g. "a"
    @param   sep        thousand-separator to use for count
    @param   pref       prefix to prepend to count, e.g. "~150"
    @param   suf        suffix to append to count, e.g. "150+"
    """
    count   = len(items) if hasattr(items, "__len__") else items or 0
    isupper = word[-1:].isupper()
    last, penult = word[-1:].lower(), word[-2:-1].lower()
    # Vowel + y takes a plain "s": "days", "keys", "boys"
    vowel_y = "y" == last and bool(penult) and penult in "aeiou"
    suffix = "es" if word and last in "sxyz" and not vowel_y \
             else "s" if word else ""
    if count != 1 and "es" == suffix and "y" == last:
        word = word[:-1] + ("I" if isupper else "i")  # "entry" -> "entries"
    if isupper: suffix = suffix.upper()
    result = word + ("" if 1 == count else suffix)
    if numbers and items is not None:
        if 1 == count: fmtcount = single
        elif not count: fmtcount = "0"
        elif sep: fmtcount = "".join([  # Insert separator after every third digit from the right
            x + (sep if i and not i % 3 else "") for i, x in enumerate(str(count)[::-1])
        ][::-1])
        else: fmtcount = str(count)

        fmtcount = pref + fmtcount + suf
        # Count of one is given as plain `single`, without prefix and suffix
        result = "%s %s" % (single if 1 == count else fmtcount, result)
    return result.strip()
1200
1201
def unique_path(pathname, empty_ok=False):
    """
    Returns a unique version of the path.

    If a file or directory with the same name already exists, returns a unique
    version (e.g. "/tmp/my.2.file" if "/tmp/my.file" already exists).
    Filenames longer than 255 characters are shortened, with ".." marking the cut.

    @param   empty_ok  whether to ignore existence if file is empty
    """
    MAXLEN = 255  # Filesystem limitation
    result = pathname
    needs_encoding = "linux2" == sys.platform and six.PY2 and isinstance(result, six.text_type) \
                     and "utf-8" != sys.getfilesystemencoding()
    if needs_encoding:
        result = result.encode("utf-8")  # Linux has trouble if locale not UTF-8
    if os.path.isfile(result) and empty_ok and not os.path.getsize(result):
        return result if isinstance(result, STRING_TYPES) else str(result)

    folder, filename = os.path.split(result)
    stem, ext = os.path.splitext(filename)
    if len(filename) > MAXLEN:
        result = os.path.join(folder, stem[:MAXLEN - len(ext) - 2] + ".." + ext)

    counter = 1
    while os.path.exists(result):
        counter += 1  # First alternative gets counter 2
        tail = ".%s%s" % (counter, ext)
        candidate = stem + tail
        if len(candidate) > MAXLEN:
            candidate = stem[:MAXLEN - len(tail) - 2] + ".." + tail
        result = os.path.join(folder, candidate)
    return result
1231
1232
def verify_io(f, mode):
    """
    Returns whether stream or file path can be read from and/or written to as binary.

    Prints or raises error if not.

    Tries to open file in append mode if verifying path writability,
    auto-creating missing directories if any, will delete any file or directory created.

    @param   f     file path, or stream
    @param   mode  "r" for readable, "w" for writable, "a" for readable and writable
    """
    result, op = True, ""  # op: description of current operation, for error message
    if is_stream(f):
        try:
            pos = f.tell()
            if mode in ("r", "a"):
                op = " reading from"
                result = isinstance(f.read(1), bytes)  # Text streams yield str instead
            if result and mode in ("w", "a"):
                op = " writing to"
                result, _ = True, f.write(b"")  # Zero-length write, raises if not writable
            f.seek(pos)  # Restore position from before the probe read
            return result
        except Exception as e:
            ConsolePrinter.log(logging.ERROR, "Error%s %s: %s", op, type(f).__name__, e)
            return False

    # Whether path existed beforehand, and directories created during verification
    present, paths_created = os.path.exists(f), []
    try:
        if not present and mode in ("w", "a"):
            op = " writing to"
            path = os.path.realpath(os.path.dirname(f))
            parts, accum = [x for x in path.split(os.sep) if x], []
            # Create missing directories level by level, remembering them for cleanup
            while parts:
                accum.append(parts.pop(0))
                curpath = os.path.join(os.sep, accum[0] + os.sep, *accum[1:])  # Windows drive letter thing
                if not os.path.exists(curpath):
                    os.mkdir(curpath)
                    paths_created.append(curpath)
        elif not present and "r" == mode:
            return False
        op = " opening"
        # Append mode for writing, to not truncate existing content
        with open(f, {"r": "rb", "w": "ab", "a": "ab+"}[mode]) as g:
            if mode in ("r", "a"):
                op = " reading from"
                result = isinstance(g.read(1), bytes)
            if result and mode in ("w", "a"):
                op = " writing to"
                result, _ = True, g.write(b"")
            return result
    except Exception as e:
        ConsolePrinter.log(logging.ERROR, "Error%s %s: %s", op, f, e)
        return False
    finally:
        # Remove the file and directories if created here, innermost directory first
        if not present:
            try: os.remove(f)
            except Exception: pass
            for path in paths_created[::-1]:
                try: os.rmdir(path)
                except Exception: pass
1294
1295
def wildcard_to_regex(text, end=False):
    """
    Returns plain wildcard like "foo*bar" as re.Pattern("foo.*bar", re.I).

    All characters other than the * wildcard are matched literally.

    @param   end  whether pattern should match until end (adds $)
    """
    fragments = [re.escape(x) for x in text.split("*")]
    pattern = ".*".join(fragments)
    if end:
        pattern += "$"
    return re.compile(pattern, re.I)
1304
1305
# Public names of the module, as exported by wildcard import.
# NOTE(review): structcopy, STRING_TYPES and TEXT_TYPES are defined in this module
# but not listed here - confirm whether the omission is intentional.
__all__ = [
    "PATH_TYPES", "ArgumentUtil", "ConsolePrinter", "Decompressor", "LenIterable", "MatchMarkers",
    "ProgressBar", "TextWrapper", "drop_zeros", "ellipsize", "ensure_namespace", "filter_dict",
    "find_files", "format_bytes", "format_stamp", "format_timedelta", "get_name", "has_arg",
    "import_item", "is_iterable", "is_stream", "makedirs", "memoize", "merge_dicts", "merge_spans",
    "parse_datetime", "parse_number", "path_to_regex", "plural", "unique_path", "verify_io",
    "wildcard_to_regex",
]
RawTextHelpFormatter returning custom metavar for non-flattenable list arguments.
Definition common.py:307
Namespace for program argument handling.
Definition common.py:289
verify(cls, args)
Validates arguments, prints errors, returns success.
Definition common.py:404
process_types(cls, args)
Converts and validates types in argument namespace, returns list of errors, if any.
Definition common.py:433
transform(cls, args, cli=False)
Sets command-line specific flag state to program argument namespace.
Definition common.py:372
make_parser(cls, arguments, formatter=HelpFormatter)
Returns a configured ArgumentParser instance for program arguments.
Definition common.py:328
flatten(cls, args)
Returns new program argument namespace with list values flattened and deduplicated.
Definition common.py:357
validate(cls, args, cli=False)
Converts and validates program argument namespace, prints and raises on error.
Definition common.py:348
Prints to console, supports color output.
Definition common.py:80
print(cls, text="", *args, **kwargs)
Prints text, formatted with args and kwargs.
Definition common.py:179
flush(cls)
Ends current open line, if any.
Definition common.py:259
bool APIMODE
Whether logging debugs and warnings and raising errors, instead of printing.
Definition common.py:104
log(cls, level, text="", *args, **kwargs)
Prints text to stderr, or logs to logger if APIMODE.
Definition common.py:247
int WIDTH
Console width in characters, updated from shutil and curses.
Definition common.py:98
error(cls, text="", *args, **kwargs)
Prints error to stderr, formatted with args and kwargs, in error colors if supported.
Definition common.py:202
init_terminal(cls)
Initializes terminal for color output, or disables color output if unsupported.
Definition common.py:142
debug(cls, text="", *args, **kwargs)
Prints debug text to stderr, or logs to logger if APIMODE.
Definition common.py:230
configure(cls, color=True, apimode=False)
Initializes printer, for terminal output or library mode.
Definition common.py:131
dict PRINTS
{sys.stdout: number of texts printed, sys.stderr: ..}
Definition common.py:101
warn(cls, text="", *args, **kwargs)
Prints warning to stderr, or logs to logger if APIMODE.
Definition common.py:215
COLOR
Whether using colors in output.
Definition common.py:95
Decompresses zstandard archives.
Definition common.py:469
decompress(cls, path, progress=False)
Decompresses file to same directory, showing optional progress bar.
Definition common.py:485
is_compressed(cls, path)
Returns whether file is a recognized archive.
Definition common.py:514
make_decompressed_name(cls, path)
Returns the path without archive extension, if any.
Definition common.py:526
validate(cls)
Raises error if decompression library not available.
Definition common.py:532
tuple EXTENSIONS
Supported archive extensions.
Definition common.py:473
str ZSTD_MAGIC
zstd file header magic start bytes
Definition common.py:476
Wrapper for iterable value with specified fixed length.
Definition common.py:666
__init__(self, iterable, count)
Definition common.py:674
populate(cls, value)
Populates highlight markers with specified value.
Definition common.py:65
str EMPTY_REPL
Replacement for empty string match.
Definition common.py:62
str END
Placeholder at end of match.
Definition common.py:58
str START
Placeholder in front of match.
Definition common.py:56
str ID
Unique marker for match highlight replacements.
Definition common.py:54
str EMPTY
Placeholder for empty string match.
Definition common.py:60
A simple ASCII progress bar with a ticker thread.
Definition common.py:546
__init__(self, max=100, value=0, min=0, width=30, forechar="-", backchar=" ", foreword="", afterword="", interval=1, pulse=False, aftertemplate=" {afterword}", **afterargs)
Creates a new progress bar, without drawing it yet.
Definition common.py:566
update(self, value=None, draw=True, flush=False)
Updates the progress bar value, and refreshes by default; returns self.
Definition common.py:594
draw(self, flush=False)
Prints the progress bar, from the beginning of the current line.
Definition common.py:646
TextWrapper that supports custom substring widths in line width calculation.
Definition common.py:692
strip(self, v)
Returns string with custom substrings and whitespace stripped.
Definition common.py:765
wrap(self, text)
Returns a list of wrapped text lines, without linebreaks.
Definition common.py:731
int LENCACHEMAX
Max length of strlen cache.
Definition common.py:698
reserve_width(self, reserved="")
Decreases the configured width by given amount (number or string).
Definition common.py:752
SPACE_RGX
Regex for breaking text at whitespace.
Definition common.py:695
strlen(self, v)
Returns length of string, using custom substring widths.
Definition common.py:758
path_to_regex(text, sep=".", wildcard="*", end=False, intify=False)
Returns re.Pattern for matching path strings with optional integer indexes.
Definition common.py:1173
format_stamp(stamp)
Returns ISO datetime from UNIX timestamp.
Definition common.py:1000
parse_datetime(text)
Returns datetime object from ISO datetime string (may be partial).
Definition common.py:1141
is_iterable(value)
Returns whether value is iterable.
Definition common.py:1054
wildcard_to_regex(text, end=False)
Returns plain wildcard like "foo*bar" as re.Pattern("foo.*bar", re.I).
Definition common.py:1315
merge_spans(spans, join_blanks=False)
Returns a sorted list of (start, end) spans with overlapping spans merged.
Definition common.py:1120
unique_path(pathname, empty_ok=False)
Returns a unique version of the path.
Definition common.py:1224
is_stream(value)
Returns whether value is a file-like object.
Definition common.py:1061
has_arg(func, name)
Returns whether function supports taking specified argument by name.
Definition common.py:1028
filter_dict(dct, keys=(), values=(), reverse=False)
Filters string dictionary by keys and values, supporting * wildcards.
Definition common.py:905
find_files(names=(), paths=(), suffixes=(), skip_suffixes=(), recurse=False)
Yields filenames from current directory or given paths.
Definition common.py:936
format_timedelta(delta)
Formats the datetime.timedelta as "3d 40h 23min 23.1sec".
Definition common.py:975
structcopy(value)
Returns a deep copy of a standard data structure (dict, list, set, tuple), other object types reused ...
Definition common.py:1081
parse_number(value, suffixes=None)
Returns an integer parsed from text, raises on error.
Definition common.py:1156