grepros 1.2.2
grep for ROS bag files and live topics
Loading...
Searching...
No Matches
outputs.py
Go to the documentation of this file.
1# -*- coding: utf-8 -*-
2"""
3Main outputs for emitting messages.
4
5------------------------------------------------------------------------------
6This file is part of grepros - grep for ROS bag files and live topics.
7Released under the BSD License.
8
9@author Erki Suurjaak
10@created 23.10.2021
11@modified 21.04.2024
12------------------------------------------------------------------------------
13"""
14## @namespace grepros.outputs
15from __future__ import print_function
16import atexit
17import collections
18import datetime
19import os
20import re
21import sys
22
23import six
24import yaml
25
26from . import api
27from . import common
28from . common import ArgumentUtil, ConsolePrinter, MatchMarkers
29from . inputs import Source
30
31
class Sink(object):
    """Output base class."""

    # File extensions recognized by autodetect(), as (".ext", )
    FILE_EXTENSIONS = ()

    # Constructor argument defaults
    DEFAULT_ARGS = dict(META=False)

    def __init__(self, args=None, **kwargs):
        """
        @param   args        arguments as namespace or dictionary, case-insensitive
        @param   args.meta   whether to emit metainfo
        @param   kwargs      any and all arguments as keyword overrides, case-insensitive
        """
        self._batch_meta = {}  # {source batch: "source metadata"}
        self._counts     = {}  # {(topic, typename, typehash): count}

        self.args = common.ensure_namespace(args, Sink.DEFAULT_ARGS, **kwargs)
        # Cached result of validate(); None until validate() has run
        self.valid = None
        # Message source this sink reads metainfo from; replaceable via bind()
        self.source = Source(self.args)

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Context manager exit, closes sink."""
        self.close()

    def emit_meta(self):
        """Outputs source metainfo like bag header as debug stream, if not already emitted."""
        if not self.args.META:
            return
        batch = self.source.get_batch()
        if batch in self._batch_meta:
            return  # Already emitted for this batch
        meta = self._batch_meta[batch] = self.source.format_meta()
        if meta:
            ConsolePrinter.debug(meta)

    def emit(self, topic, msg, stamp=None, match=None, index=None):
        """
        Outputs ROS message.

        The base implementation only increments the per-topic message counter.

        @param   topic  full name of ROS topic the message is from
        @param   msg    ROS message
        @param   stamp  message ROS timestamp, if not current ROS time
        @param   match  ROS message with values tagged with match markers if matched, else None
        @param   index  message index in topic, if any
        """
        topickey = api.TypeMeta.make(msg, topic).topickey
        self._counts[topickey] = self._counts.get(topickey, 0) + 1

    def bind(self, source):
        """Attaches source to sink."""
        self.source = source

    def configure(self, args=None, **kwargs):
        """
        Updates sink configuration, and resets the cached validation result.

        @param   args    arguments as namespace or dictionary, case-insensitive
        @param   kwargs  any and all arguments as keyword overrides, case-insensitive
        """
        self.args = common.ensure_namespace(args, vars(self.args), **kwargs)
        self.valid = None

    def validate(self):
        """Returns whether sink prerequisites are met (like ROS environment set if LiveSink)."""
        if self.valid is None:
            try:
                self.args = ArgumentUtil.validate(self.args)
            except Exception:
                # Any validation failure marks the sink invalid instead of propagating
                self.valid = False
            else:
                self.valid = True
        return self.valid

    def close(self):
        """Shuts down output, closing any files or connections."""
        self._batch_meta.clear()
        self._counts.clear()

    def flush(self):
        """Writes out any pending data to disk."""

    def thread_excepthook(self, text, exc):
        """Handles exception, used by background threads."""
        ConsolePrinter.error(text)

    def is_highlighting(self):
        """Returns whether this sink requires highlighted matches."""
        return False

    @classmethod
    def autodetect(cls, target):
        """Returns true if target is recognizable as output for this sink class."""
        ext = os.path.splitext(target or "")[-1].lower()
        return ext in cls.FILE_EXTENSIONS

    def _ensure_stamp_index(self, topic, msg, stamp=None, index=None):
        """Returns (stamp, index) populated with current ROS time and topic index if `None`."""
        if stamp is None:
            stamp = api.get_rostime(fallback=True)
        if index is None:
            # Next index is one past the number of messages already counted for this topic
            index = self._counts.get(api.TypeMeta.make(msg, topic).topickey, 0) + 1
        return stamp, index
132
133
class TextSinkMixin(object):
    """Provides message formatting as text."""

    # Wrappers put around matched values if no custom wrapper given and color disabled
    NOCOLOR_HIGHLIGHT_WRAPPERS = "**", "**"

    # Constructor argument defaults
    DEFAULT_ARGS = dict(COLOR=True, EMIT_FIELD=(), NOEMIT_FIELD=(), HIGHLIGHT=True,
                        MAX_FIELD_LINES=None, START_LINE=None, END_LINE=None,
                        MAX_MESSAGE_LINES=None, LINES_AROUND_MATCH=None, MATCHED_FIELDS_ONLY=False,
                        WRAP_WIDTH=None, MATCH_WRAPPER=None)

    def __init__(self, args=None, **kwargs):
        """
        Initializes formatting state; arguments are not stored here,
        formatting reads `self.args` as provided by the host class.

        @param   args                       arguments as namespace or dictionary, case-insensitive
        @param   args.color                 False or "never" for not using colors in replacements
        @param   args.highlight             highlight matched values (default true)
        @param   args.emit_field            message fields to emit if not all
        @param   args.noemit_field          message fields to skip in output
        @param   args.max_field_lines       maximum number of lines to output per field
        @param   args.start_line            message line number to start output from
        @param   args.end_line              message line number to stop output at
        @param   args.max_message_lines     maximum number of lines to output per message
        @param   args.lines_around_match    number of message lines around matched fields to output
        @param   args.matched_fields_only   output only the fields where match was found
        @param   args.wrap_width            character width to wrap message YAML output at
        @param   args.match_wrapper         string to wrap around matched values,
                                            both sides if one value, start and end if more than one,
                                            or no wrapping if zero values
        @param   kwargs                     any and all arguments as keyword overrides, case-insensitive
        """
        self._prefix       = ""    # Put before each message line (filename if grepping 1+ files)
        self._wrapper      = None  # TextWrapper instance
        self._patterns     = {}    # {key: [(() if any field else ('path', ), re.Pattern), ]}
        self._format_repls = {}    # {text to replace if highlight: replacement text}
        self._styles = collections.defaultdict(str)  # {label: ANSI code string}

    def validate(self):
        """Returns whether arguments are valid, emits error if not, else populates options."""
        args = common.ensure_namespace(self.args, TextSinkMixin.DEFAULT_ARGS)
        try:
            args = ArgumentUtil.validate(args)
        except Exception:
            return False
        self._configure(args)
        return True

    def format_message(self, msg, highlight=False):
        """Returns message as formatted string, optionally highlighted for matches if configured."""
        if self.args.MAX_MESSAGE_LINES == 0: return ""
        text = self.message_to_yaml(msg).rstrip("\n")
        highlight = highlight and self.args.HIGHLIGHT

        # Split to lines only if some option actually operates on individual lines
        if (self._prefix or self.args.START_LINE or self.args.END_LINE
        or self.args.MAX_MESSAGE_LINES or (self.args.LINES_AROUND_MATCH and highlight)):
            lines = text.splitlines()

            if self.args.START_LINE or self.args.END_LINE or self.args.MAX_MESSAGE_LINES:
                start = self.args.START_LINE or 0
                start = max(start, -len(lines)) - (start > 0)  # <0 to sanity, >0 to 0-base
                end = self.args.END_LINE or len(lines)
                end = max(end, -len(lines)) - (end > 0)  # <0 to sanity, >0 to 0-base
                if self.args.MAX_MESSAGE_LINES:
                    end = min(end, start + self.args.MAX_MESSAGE_LINES)
                lines = lines[start:end + 1]
                # Append style reset to last line, as the cut may have dropped a closing style
                lines = lines and (lines[:-1] + [lines[-1] + self._styles["rst"]])

            if self.args.LINES_AROUND_MATCH and highlight:
                spans, NUM = [], self.args.LINES_AROUND_MATCH
                for i, line in enumerate(lines):
                    if MatchMarkers.START in line:
                        spans.append([max(0, i - NUM), min(i + NUM + 1, len(lines))])
                    if MatchMarkers.END in line and spans:
                        # Match may end on a later line than it started: extend latest span
                        spans[-1][1] = min(i + NUM + 1, len(lines))
                lines = sum((lines[a:b - 1] + [lines[b - 1] + self._styles["rst"]]
                             for a, b in common.merge_spans(spans)), [])

            if self._prefix:
                lines = [self._prefix + line for line in lines]

            text = "\n".join(lines)

        for a, b in self._format_repls.items() if highlight else ():
            text = re.sub(r"(%s)\1+" % re.escape(a), r"\1", text)  # Remove consecutive duplicates
            text = text.replace(a, b)

        return text

    def message_to_yaml(self, val, top=(), typename=None):
        """
        Returns ROS message or other value as YAML.

        @param   val       ROS message, or list or tuple, or text or number or other value
        @param   top       path of field names and list indexes leading to this value
        @param   typename  ROS type name of the value, if any
        """
        # Refactored from genpy.message.strify_message().
        unquote = lambda v: v[1:-1] if v[:1] == v[-1:] == '"' else v

        def retag_match_lines(lines):
            """Adds match tags to lines where wrapping separated start and end."""
            PH = self._wrapper.placeholder
            for i, line in enumerate(lines):
                startpos0, endpos0 = line.find(MatchMarkers.START), line.find(MatchMarkers.END)
                startpos1, endpos1 = line.rfind(MatchMarkers.START), line.rfind(MatchMarkers.END)
                if endpos0 >= 0 and (startpos0 < 0 or startpos0 > endpos0):
                    # Line closes a match it did not open: open it at line start
                    lines[i] = line = re.sub(r"^(\s*)", r"\1" + MatchMarkers.START, line)
                if startpos1 >= 0 and endpos1 < startpos1 and i + 1 < len(lines):
                    # Line opens a match it does not close: reopen it on next line
                    lines[i + 1] = re.sub(r"^(\s*)", r"\1" + MatchMarkers.START, lines[i + 1])
                if startpos1 >= 0 and startpos1 > endpos1:
                    # Close the open match at line end, before truncation placeholder if any
                    CUT, EXTRA = (-len(PH), PH) if PH and line.endswith(PH) else (len(line), "")
                    lines[i] = line[:CUT] + MatchMarkers.END + EXTRA
            return lines

        def truncate(v):
            """Returns text or list/tuple truncated to length used in final output."""
            if self.args.LINES_AROUND_MATCH \
            or (not self.args.MAX_MESSAGE_LINES and (self.args.END_LINE or 0) <= 0): return v

            MAX_CHAR_LEN = 1 + len(MatchMarkers.START) + len(MatchMarkers.END)
            # For list/tuple, account for comma and space
            if isinstance(v, (list, tuple)): textlen = bytelen = 2 + len(v) * (2 + MAX_CHAR_LEN)
            else: textlen, bytelen = self._wrapper.strlen(v), len(v)
            if textlen < 10000: return v

            # Heuristic optimization: shorten superlong texts before converting to YAML
            # if outputting a maximum number of lines per message
            # (e.g. a lidar pointcloud can be 10+MB of text and take 10+ seconds to format).
            MIN_CHARS_PER_LINE = self._wrapper.width
            if MAX_CHAR_LEN != 1:
                MIN_CHARS_PER_LINE = self._wrapper.width // MAX_CHAR_LEN * 2
            MAX_LINES = self.args.MAX_MESSAGE_LINES or self.args.END_LINE
            MAX_CHARS = MAX_LEN = MAX_LINES * MIN_CHARS_PER_LINE * self._wrapper.width + 100
            if bytelen > MAX_CHARS:  # Use worst-case max length plus some extra
                if isinstance(v, (list, tuple)): MAX_LEN = MAX_CHARS // 3
                v = v[:MAX_LEN]
            return v

        # NOTE(review): the scraped listing shows a single space per nesting level here;
        # the listing collapsed whitespace elsewhere, so confirm this literal against the repository.
        indent = " " * len(top)
        if isinstance(val, six.integer_types + (float, bool)):
            return str(val)
        if isinstance(val, common.TEXT_TYPES):
            if val in ("", MatchMarkers.EMPTY):
                return MatchMarkers.EMPTY_REPL if val else "''"
            # default_style='"' avoids trailing "...\n"
            return yaml.safe_dump(truncate(val), default_style='"', width=sys.maxsize).rstrip("\n")
        if isinstance(val, (list, tuple)):
            if not val:
                return "[]"
            if api.scalar(typename) in api.ROS_STRING_TYPES:
                yaml_str = yaml.safe_dump(truncate(val)).rstrip('\n')
                return "\n" + "\n".join(indent + line for line in yaml_str.splitlines())
            vals = [x for i, v in enumerate(truncate(val))
                    for x in [self.message_to_yaml(v, top + (i, ), typename)] if x]
            if api.scalar(typename) in api.ROS_NUMERIC_TYPES:
                return "[%s]" % ", ".join(unquote(str(v)) for v in vals)
            return ("\n" + "\n".join(indent + "- " + v for v in vals)) if vals else ""
        if api.is_ros_message(val):
            MATCHED_ONLY = self.args.MATCHED_FIELDS_ONLY and not self.args.LINES_AROUND_MATCH
            vals, fieldmap = [], api.get_message_fields(val)
            prints, noprints = self._patterns["print"], self._patterns["noprint"]
            fieldmap = api.filter_fields(fieldmap, top, include=prints, exclude=noprints)
            for k, t in fieldmap.items():
                v = self.message_to_yaml(api.get_message_value(val, k, t), top + (k, ), t)
                if not v or MATCHED_ONLY and MatchMarkers.START not in v:
                    continue  # for k, t

                if t not in api.ROS_STRING_TYPES: v = unquote(v)
                if api.scalar(t) in api.ROS_BUILTIN_TYPES:
                    is_strlist = t.endswith("]") and api.scalar(t) in api.ROS_STRING_TYPES
                    is_num = api.scalar(t) in api.ROS_NUMERIC_TYPES
                    extra_indent = indent if is_strlist else " " * len(indent + k + ": ")
                    self._wrapper.reserve_width(self._prefix + extra_indent)
                    self._wrapper.drop_whitespace = t.endswith("]") and not is_strlist
                    self._wrapper.break_long_words = not is_num
                    v = ("\n" + extra_indent).join(retag_match_lines(self._wrapper.wrap(v)))
                    if is_strlist and self._wrapper.strip(v) != "[]": v = "\n" + v
                vals.append("%s%s: %s" % (indent, k, api.format_message_value(val, k, v)))
            return ("\n" if indent and vals else "") + "\n".join(vals)

        return str(val)

    def _configure(self, args):
        """Initializes output settings: field patterns, styles, text wrapper and replacements."""
        self._patterns.clear()
        self._styles.clear()
        self._styles.default_factory = str
        prints, noprints = args.EMIT_FIELD, args.NOEMIT_FIELD
        for key, vals in [("print", prints), ("noprint", noprints)]:
            self._patterns[key] = [(tuple(v.split(".")), common.path_to_regex(v)) for v in vals]

        if args.COLOR not in ("never", False):
            self._styles.update({"hl0":  ConsolePrinter.STYLE_HIGHLIGHT if self.args.HIGHLIGHT
                                         else "",
                                 "ll0":  ConsolePrinter.STYLE_LOWLIGHT,
                                 "pfx0": ConsolePrinter.STYLE_SPECIAL,  # Content line prefix start
                                 "sep0": ConsolePrinter.STYLE_SPECIAL2})
            # Any style label not set above yields the reset code
            self._styles.default_factory = lambda: ConsolePrinter.STYLE_RESET

        WRAPS = args.MATCH_WRAPPER if self.args.HIGHLIGHT else ""
        if WRAPS is None and args.COLOR in ("never", False): WRAPS = self.NOCOLOR_HIGHLIGHT_WRAPPERS
        WRAPS = ((WRAPS or [""]) * 2)[:2]  # Ensure exactly two items: start and end
        self._styles["hl0"] = self._styles["hl0"] + WRAPS[0]
        self._styles["hl1"] = WRAPS[1] + self._styles["hl1"]

        # Match markers take up wrapper width in final output; style codes take up none
        custom_widths = {MatchMarkers.START: len(WRAPS[0]), MatchMarkers.END: len(WRAPS[1]),
                         self._styles["ll0"]:  0, self._styles["ll1"]:  0,
                         self._styles["pfx0"]: 0, self._styles["pfx1"]: 0,
                         self._styles["sep0"]: 0, self._styles["sep1"]: 0}
        wrapargs = dict(max_lines=args.MAX_FIELD_LINES,
                        placeholder="%s ...%s" % (self._styles["ll0"], self._styles["ll1"]))
        if args.WRAP_WIDTH is not None: wrapargs.update(width=args.WRAP_WIDTH)
        self._wrapper = common.TextWrapper(custom_widths=custom_widths, **wrapargs)
        self._format_repls = {MatchMarkers.START: self._styles["hl0"],
                              MatchMarkers.END:   self._styles["hl1"]}
344
345
346
class RolloverSinkMixin(object):
    """Provides output file rollover by size, duration, or message count."""

    # Constructor argument defaults
    DEFAULT_ARGS = dict(WRITE=None, WRITE_OPTIONS={}, VERBOSE=False)

    # Command-line help templates for rollover options, as [(name, text with {label})]
    OPTIONS_TEMPLATES = [
        ("rollover-size=NUM", "size limit for individual files\nin {label} output\n"
                              "as bytes (supports abbreviations like 1K or 2M or 3G)"),
        ("rollover-count=NUM", "message limit for individual files\nin {label} output\n"
                               "(supports abbreviations like 1K or 2M or 3G)"),
        ("rollover-duration=INTERVAL", "message time span limit for individual files\n"
                                       "in {label} output\n"
                                       "as seconds (supports abbreviations like 60m or 2h or 1d)"),
        ("rollover-template=STR", "output filename template for individual files\n"
                                  "in {label} output,\n"
                                  'supporting strftime format codes like "%%H-%%M-%%S"\n'
                                  'and "%%(index)s" as output file index'),
    ]

    # Start of output metainfo text, with total message and topic counts
    START_META_TEMPLATE = "{mcount} in {tcount} to "

    # Output metainfo text for a file name and size
    FILE_META_TEMPLATE = "{name} ({size})"

    # Output metainfo line for each individual file if several files written
    MULTI_META_TEMPLATE = "\n- {name} ({size}, {mcount}, {tcount})"

    def __init__(self, args=None, **kwargs):
        """
        Initializes rollover state; arguments are not stored here,
        rollover reads `self.args` as provided by the host class.

        @param   args                 arguments as namespace or dictionary, case-insensitive
        @param   args.write           base name of output file to write if not using rollover-template
        @param   args.write_options   {"rollover-size": bytes limit for individual output files,
                                       "rollover-count": message limit for individual output files,
                                       "rollover-duration": time span limit for individual output files,
                                                            as ROS duration or convertible seconds,
                                       "rollover-template": output filename template, supporting
                                                            strftime format codes like "%H-%M-%S"
                                                            and "%(index)s" as output file index,
                                       "overwrite": whether to overwrite existing file
                                                    (default false)}
        @param   args.verbose         whether to emit debug information
        @param   kwargs               any and all arguments as keyword overrides, case-insensitive
        """
        self._rollover_limits = {}  # {?"size": int, ?"count": int, ?"duration": ROS duration}
        self._rollover_template = None
        self._rollover_files = collections.OrderedDict()  # {path: {"counts", "start", "size"}}

        # Current output file path
        self.filename = None

    def validate(self):
        """Returns whether write options are valid, emits error if not, else populates options."""
        ok = True
        self._rollover_limits.clear()
        for k in ("size", "count", "duration"):
            value = value0 = self.args.WRITE_OPTIONS.get("rollover-%s" % k)
            if value is None: continue  # for k
            if "duration" == k:
                SUFFIXES = dict(zip("smhd", [1, 60, 3600, 24*3600]))
            elif "size" == k:
                SUFFIXES = dict(zip("KMGT", [2**10, 2**20, 2**30, 2**40]))
            else:
                SUFFIXES = dict(zip("KMGT", [10**3, 10**6, 10**9, 10**12]))
            try:
                if isinstance(value, (six.binary_type, six.text_type)):
                    value = common.parse_number(value, SUFFIXES)
                value = (api.to_duration if "duration" == k else int)(value)
            except Exception:
                # Unparseable value: flag as invalid below, instead of comparing
                # a leftover string to a number (TypeError in Python 3)
                value = None
            if value is None:
                invalid = True
            elif "duration" == k:
                invalid = k != api.get_ros_time_category(value) or api.to_sec(value) < 0
            else:
                invalid = value < 0
            if invalid:
                ConsolePrinter.error("Invalid rollover %s option: %r. "
                                     "Value must be a non-negative %s.", k, value0, k)
                ok = False
            elif value:  # Zero means no limit
                self._rollover_limits[k] = value
        if self.args.WRITE_OPTIONS.get("rollover-template"):
            value = self.args.WRITE_OPTIONS["rollover-template"]
            # Double the percent sign of "%(index)", so that strftime() yields it as
            # literal "%(index)", to be filled in by make_filename()
            value = re.sub(r"(^|[^%])%\(index\)", r"\1%%(index)", value)
            try:
                datetime.datetime.now().strftime(value)
            except Exception:
                ConsolePrinter.error("Invalid rollover template option: %r. "
                                     "Value must contain valid strftime codes.", value)
                ok = False
            else:
                if ok:
                    self._rollover_template = value
                    if not self._rollover_limits:
                        ConsolePrinter.warn("Ignoring rollover template option: "
                                            "no rollover limits given.")
        return ok

    def ensure_rollover(self, topic, msg, stamp):
        """
        Closes current output file and prepares new filename if rollover limit reached.

        Also updates message count and start time statistics of the current output file.

        @param   topic  full name of ROS topic the message is from
        @param   msg    ROS message about to be written
        @param   stamp  message ROS timestamp
        """
        if not self._rollover_limits: return

        self.filename = self.filename or self.make_filename()
        do_rollover, props = False, self._rollover_files.setdefault(self.filename, {})
        stamp = api.time_message(stamp, to_message=False)  # Ensure rclpy stamp in ROS2

        # Empty props means nothing written to current file yet: no limit can be reached
        if self._rollover_limits.get("size") and props:
            props["size"] = self.size
            do_rollover = (props["size"] or 0) >= self._rollover_limits["size"]
        if not do_rollover and self._rollover_limits.get("count") and props:
            do_rollover = (sum(props["counts"].values()) >= self._rollover_limits["count"])
        if not do_rollover and self._rollover_limits.get("duration") and props:
            stamps = [stamp, props["start"]]
            do_rollover = (max(stamps) - min(stamps) >= self._rollover_limits["duration"])
        if do_rollover:
            self.close_output()
            props["size"] = self.size  # Final size of closed file
            self.filename = self.make_filename()
            props = self._rollover_files[self.filename] = {}

        topickey = api.TypeMeta.make(msg, topic).topickey
        if not props: props.update({"counts": {}, "start": stamp, "size": None})
        props["start"] = min((props["start"], stamp))
        props["counts"][topickey] = props["counts"].get(topickey, 0) + 1

    def close_output(self):
        """Closes output file, if any; must be implemented by the host class."""
        raise NotImplementedError

    def make_filename(self):
        """Returns new filename for output, accounting for rollover template and overwrite."""
        result = self.args.WRITE
        if self._rollover_template and self._rollover_limits:
            result = datetime.datetime.now().strftime(self._rollover_template)
            try:
                result %= {"index": len(self._rollover_files)}
            except Exception:
                pass  # Best-effort: leave name as is if template has other %-placeholders
        if self.args.WRITE_OPTIONS.get("overwrite") not in (True, "true"):
            result = common.unique_path(result, empty_ok=True)
        return result

    def format_output_meta(self):
        """Returns output file metainfo string, with names and sizes and message/topic counts."""
        if not self._counts: return ""
        SIZE_ERROR = "error getting size"
        result = self.START_META_TEMPLATE.format(
            mcount=common.plural("message", sum(self._counts.values())),
            tcount=common.plural("topic", self._counts)
        )
        if len(self._rollover_files) < 2:
            sz = self.size
            sizestr = SIZE_ERROR if sz is None else common.format_bytes(sz)
            result += self.FILE_META_TEMPLATE.format(name=self.filename, size=sizestr) + "."
        else:
            # Populate sizes not yet known, like that of the latest file
            for path, props in self._rollover_files.items():
                if props["size"] is None:
                    try:
                        props["size"] = os.path.getsize(path)
                    except Exception as e:
                        ConsolePrinter.warn("Error getting size of %s: %s", path, e)
            sizesum = sum(x["size"] for x in self._rollover_files.values()
                          if x["size"] is not None)
            result += self.FILE_META_TEMPLATE.format(
                name=common.plural("file", self._rollover_files),
                size=common.format_bytes(sizesum)
            ) + (":" if self.args.VERBOSE else ".")
            for path, props in self._rollover_files.items() if self.args.VERBOSE else ():
                sizestr = SIZE_ERROR if props["size"] is None \
                          else common.format_bytes(props["size"])
                result += self.MULTI_META_TEMPLATE.format(
                    name=path, size=sizestr,
                    mcount=common.plural("message", sum(props["counts"].values())),
                    tcount=common.plural("topic", props["counts"])
                )
        return result

    @property
    def size(self):
        """Returns current file size in bytes, or None if size lookup failed."""
        try:
            return os.path.getsize(self.filename)
        except Exception as e:
            ConsolePrinter.warn("Error getting size of %s: %s", self.filename, e)
            return None

    @classmethod
    def get_write_options(cls, label):
        """Returns command-line help texts for rollover options, as [(name, help)]."""
        return [(k, v.format(label=label)) for k, v in cls.OPTIONS_TEMPLATES]
531
532
533
class ConsoleSink(Sink, TextSinkMixin):
    """Prints messages to console."""

    META_LINE_TEMPLATE   = "{ll0}{sep} {line}{ll1}"
    MESSAGE_SEP_TEMPLATE = "{ll0}{sep}{ll1}"
    PREFIX_TEMPLATE      = "{pfx0}{batch}{pfx1}{sep0}{sep}{sep1}"
    MATCH_PREFIX_SEP     = ":"    # Printed after bag filename for matched message lines
    CONTEXT_PREFIX_SEP   = "-"    # Printed after bag filename for context message lines
    SEP                  = "---"  # Prefix of message separators and metainfo lines

    # Constructor argument defaults
    DEFAULT_ARGS = dict(COLOR=True, EMIT_FIELD=(), NOEMIT_FIELD=(), HIGHLIGHT=True, META=False,
                        LINE_PREFIX=True, MAX_FIELD_LINES=None, START_LINE=None,
                        END_LINE=None, MAX_MESSAGE_LINES=None, LINES_AROUND_MATCH=None,
                        MATCHED_FIELDS_ONLY=False, WRAP_WIDTH=None, MATCH_WRAPPER=None)

    def __init__(self, args=None, **kwargs):
        """
        @param   args                       arguments as namespace or dictionary, case-insensitive
        @param   args.color                 False or "never" for not using colors in replacements
        @param   args.highlight             highlight matched values (default true)
        @param   args.meta                  whether to print metainfo
        @param   args.emit_field            message fields to emit if not all
        @param   args.noemit_field          message fields to skip in output
        @param   args.line_prefix           print source prefix like bag filename on each message line
        @param   args.max_field_lines       maximum number of lines to print per field
        @param   args.start_line            message line number to start output from
        @param   args.end_line              message line number to stop output at
        @param   args.max_message_lines     maximum number of lines to output per message
        @param   args.lines_around_match    number of message lines around matched fields to output
        @param   args.matched_fields_only   output only the fields where match was found
        @param   args.wrap_width            character width to wrap message YAML output at
        @param   args.match_wrapper         string to wrap around matched values,
                                            both sides if one value, start and end if more than one,
                                            or no wrapping if zero values
        @param   kwargs                     any and all arguments as keyword overrides, case-insensitive
        """
        args = common.ensure_namespace(args, ConsoleSink.DEFAULT_ARGS, **kwargs)
        super(ConsoleSink, self).__init__(args)
        TextSinkMixin.__init__(self, args)

    def emit_meta(self):
        """
        Prints source metainfo like bag header, if not already printed.

        Raises Exception if sink arguments are invalid.
        """
        if not self.validate(): raise Exception("invalid")
        if not self.args.META:
            return
        batch = self.source.get_batch()
        if batch in self._batch_meta:
            return  # Already printed for this batch
        meta = self._batch_meta[batch] = self.source.format_meta()
        kws = dict(self._styles, sep=self.SEP)
        # Empty metainfo lines are left empty, others get separator prefix and styling
        meta = "\n".join(x and self.META_LINE_TEMPLATE.format(**dict(kws, line=x))
                         for x in meta.splitlines())
        if meta:
            ConsolePrinter.print(meta)

    def emit(self, topic, msg, stamp=None, match=None, index=None):
        """
        Prints separator line and message text.

        Raises Exception if sink arguments are invalid.

        @param   topic  full name of ROS topic the message is from
        @param   msg    ROS message
        @param   stamp  message ROS timestamp, if not current ROS time
        @param   match  ROS message with values tagged with match markers if matched, else None
        @param   index  message index in topic, if any
        """
        if not self.validate(): raise Exception("invalid")
        # Line prefix is what format_message() puts before each message line
        self._prefix = ""
        stamp, index = self._ensure_stamp_index(topic, msg, stamp, index)
        if self.args.LINE_PREFIX and self.source.get_batch():
            sep = self.MATCH_PREFIX_SEP if match else self.CONTEXT_PREFIX_SEP
            kws = dict(self._styles, sep=sep, batch=self.source.get_batch())
            self._prefix = self.PREFIX_TEMPLATE.format(**kws)
        kws = dict(self._styles, sep=self.SEP)
        if self.args.META:
            meta = self.source.format_message_meta(topic, msg, stamp, index)
            meta = "\n".join(x and self.META_LINE_TEMPLATE.format(**dict(kws, line=x))
                             for x in meta.splitlines())
            if meta:
                ConsolePrinter.print(meta)
        elif self._counts:
            # No separator before the very first message
            sep = self.MESSAGE_SEP_TEMPLATE.format(**kws)
            if sep:
                ConsolePrinter.print(sep)
        ConsolePrinter.print(self.format_message(match or msg, highlight=bool(match)))
        super(ConsoleSink, self).emit(topic, msg, stamp, match, index)

    def is_highlighting(self):
        """Returns True if sink is configured to highlight matched values."""
        return bool(self.args.HIGHLIGHT)

    def validate(self):
        """Returns whether arguments environment set, populates options, emits error if not."""
        if self.valid is not None: return self.valid
        if self.args.WRAP_WIDTH is None:
            # Default to console width if no explicit wrap width given
            self.args.WRAP_WIDTH = ConsolePrinter.WIDTH
        self.valid = Sink.validate(self) and TextSinkMixin.validate(self)
        return self.valid
623
624
625
627 """Writes messages to bagfile."""
628
629
630 DEFAULT_ARGS = dict(META=False, WRITE_OPTIONS={}, VERBOSE=False)
632 def __init__(self, args=None, **kwargs):
633 """
634 @param args arguments as namespace or dictionary, case-insensitive;
635 or a single path as the ROS bagfile to write,
636 or a stream or {@link grepros.api.Bag Bag} instance to write to
637 @param args.write name of ROS bagfile to create or append to,
638 or a stream to write to
639 @param args.write_options {"overwrite": whether to overwrite existing file
640 (default false),
641 "rollover-size": bytes limit for individual output files,
642 "rollover-count": message limit for individual output files,
643 "rollover-duration": time span limit for individual output files,
644 as ROS duration or convertible seconds,
645 "rollover-template": output filename template, supporting
646 strftime format codes like "%H-%M-%S"
647 and "%(index)s" as output file index}
648 @param args.meta whether to emit metainfo
649 @param args.verbose whether to emit debug information
650 @param kwargs any and all arguments as keyword overrides, case-insensitive
651 """
652
653 args0 = args
654 args = {"WRITE": str(args)} if isinstance(args, common.PATH_TYPES) else \
655 {"WRITE": args} if common.is_stream(args) else \
656 {} if isinstance(args, api.Bag) else args
657 args = common.ensure_namespace(args, BagSink.DEFAULT_ARGS, **kwargs)
658 super(BagSink, self).__init__(args)
659 RolloverSinkMixin.__init__(self, args)
660 self._bag = args0 if isinstance(args0, api.Bag) else None
661 self._overwrite = (args.WRITE_OPTIONS.get("overwrite") in ("true", True))
662 self._close_printed = False
663 self._is_pathed = self._bag is None and not common.is_stream(self.args.WRITE)
664
665 atexit.register(self.closecloseclose)
666
667 def emit(self, topic, msg, stamp=None, match=None, index=None):
668 """Writes message to output bagfile."""
669 if not self.validatevalidatevalidate(): raise Exception("invalid")
670 stamp, index = self._ensure_stamp_index(topic, msg, stamp, index)
671 if self._is_pathed: RolloverSinkMixin.ensure_rollover(self, topic, msg, stamp)
672 self._ensure_open()
673 topickey = api.TypeMeta.make(msg, topic).topickey
674 if self.args.VERBOSE and topickey not in self._counts:
675 ConsolePrinter.debug("Adding topic %s in bag output.", topic)
677 qoses = self.source.get_message_meta(topic, msg, stamp).get("qoses")
678 self._bag.write(topic, msg, stamp, qoses=qoses)
679 super(BagSink, self).emit(topic, msg, stamp, match, index)
680
681 def validate(self):
682 """Returns whether write options are valid and ROS environment set, emits error if not."""
683 if self.validvalid is not None: return self.validvalid
684 result = Sink.validate(self)
685 if not RolloverSinkMixin.validate(self):
686 result = False
687 if self.args.WRITE_OPTIONS.get("overwrite") not in (None, True, False, "true", "false"):
688 ConsolePrinter.error("Invalid overwrite option for bag: %r. "
689 "Choose one of {true, false}.",
690 self.args.WRITE_OPTIONS["overwrite"])
691 result = False
692 if not self._bag \
693 and not common.verify_io(self.make_filename() if self._is_pathed else self.args.WRITE, "w"):
694 ConsolePrinter.error("File not writable.")
695 result = False
696 if not self._bag and common.is_stream(self.args.WRITE) \
697 and not any(c.STREAMABLE for c in api.Bag.WRITER_CLASSES):
698 ConsolePrinter.error("Bag format does not support writing streams.")
699 result = False
700 if self._bag and self._bag.mode not in ("a", "w"):
701 ConsolePrinter.error("Bag not in write mode.")
702 result = False
703 self.validvalid = api.validate() and result
704 return self.validvalid
705
706 def close(self):
707 """Closes output bag, if any, emits metainfo."""
708 self._bag and self._bag.close()
709 if not self._close_printed and self._counts and self._bag:
710 self._close_printed = True
711 ConsolePrinter.debug("Wrote bag output for %s", self.format_output_meta())
712 super(BagSink, self).close()
713
714 def close_output(self):
715 """Closes output bag, if any."""
716 self._bag and self._bag.close()
717 self._bag = None
718
719 @property
720 def size(self):
721 """Returns current file size in bytes, or None if size lookup failed."""
722 try: return os.path.getsize(self._bag.filename if self._bag else self.filenamefilename) \
723 if not self._bag or (self._bag.filename and api.ROS1) else self._bag.size
724 except Exception as e:
725 ConsolePrinter.warn("Error getting size of %s: %s", self.filenamefilename, e)
726 return None
727
def _ensure_open(self):
    """
    Opens output file if not already open.

    An existing bag object is reused, and reopened if it reports being closed.
    Otherwise a new bag is created: directly on the stream if `args.WRITE` is a stream,
    else on `self.filename` (or a newly made filename), in write mode if overwriting
    and in append mode if not. If an existing non-empty file should be appended to
    but its detected bag class does not list "a" in MODES, a unique filename is used instead.
    """
    if self._bag is not None:
        if self._bag.closed:
            self._bag.open()
            self._close_printed = False
        return
    self._close_printed = False
    if common.is_stream(self.args.WRITE):
        self._bag = api.Bag(self.args.WRITE, mode=getattr(self.args.WRITE, "mode", "w"))
        self.filename = "<stream>"
        return

    filename = self.filename = self.filename or self.make_filename()
    if not self._overwrite and os.path.isfile(filename) and os.path.getsize(filename):
        cls = api.Bag.autodetect(filename)
        # Classes without a MODES attribute are taken to support appending
        if cls and "a" not in getattr(cls, "MODES", ("a", )):
            filename = self.filename = common.unique_path(filename)
            if self.args.VERBOSE:
                ConsolePrinter.debug("Making unique filename %r, as %s does not support "
                                     "appending.", filename, cls.__name__)
    if self.args.VERBOSE:
        sz = os.path.isfile(filename) and os.path.getsize(filename)
        ConsolePrinter.debug("%s bag output %s%s.",
                             "Overwriting" if sz and self._overwrite else
                             "Appending to" if sz else "Creating",
                             filename, (" (%s)" % common.format_bytes(sz)) if sz else "")
    common.makedirs(os.path.dirname(filename))
    self._bag = api.Bag(filename, mode="w" if self._overwrite else "a")
757
@classmethod
def autodetect(cls, target):
    """
    Returns true if target is recognizable as a ROS bag.

    @param   target  output path; recognized if its lower-cased file extension
                     is in api.BAG_EXTENSIONS. A falsy target is never recognized
                     (assuming BAG_EXTENSIONS holds no empty extension - TODO confirm).
    """
    ext = os.path.splitext(target or "")[-1].lower()
    return ext in api.BAG_EXTENSIONS
763
764
class LiveSink(Sink):
    """Publishes messages to ROS topics."""

    ## Constructor argument defaults
    DEFAULT_ARGS = dict(LIVE=False, META=False, QUEUE_SIZE_OUT=10, PUBLISH_PREFIX="",
                        PUBLISH_SUFFIX="", PUBLISH_FIXNAME="", VERBOSE=False)

    def __init__(self, args=None, **kwargs):
        """
        @param   args                   arguments as namespace or dictionary, case-insensitive
        @param   args.live              whether reading messages from live ROS topics
        @param   args.queue_size_out    publisher queue size (default 10)
        @param   args.publish_prefix    output topic prefix, prepended to input topic
        @param   args.publish_suffix    output topic suffix, appended to output topic
        @param   args.publish_fixname   single output topic name to publish to,
                                        overrides prefix and suffix if given
        @param   args.meta              whether to emit metainfo
        @param   args.verbose           whether to emit debug information
        @param   kwargs                 any and all arguments as keyword overrides, case-insensitive
        """
        args = common.ensure_namespace(args, LiveSink.DEFAULT_ARGS, **kwargs)
        super(LiveSink, self).__init__(args)
        self._pubs = {}  # {(intopic, typename, typehash): ROS publisher}
        self._close_printed = False

    def emit(self, topic, msg, stamp=None, match=None, index=None):
        """
        Publishes message to output topic.

        Creates a publisher on first message from each input topic and type;
        with a fixed output name, one publisher is shared among all inputs of the same type.
        """
        if not self.validate(): raise Exception("invalid")
        api.init_node()
        with api.TypeMeta.make(msg, topic) as m:
            topickey, cls = (m.topickey, m.typeclass)
        if topickey not in self._pubs:
            topic2 = self.args.PUBLISH_PREFIX + topic + self.args.PUBLISH_SUFFIX
            topic2 = self.args.PUBLISH_FIXNAME or topic2
            if self.args.VERBOSE:
                ConsolePrinter.debug("Publishing from %s to %s.", topic, topic2)

            pub = None
            if self.args.PUBLISH_FIXNAME:
                # All inputs go to one output topic: reuse the publisher already made
                # for this message type. Keys start with the input topic name,
                # so types are matched on everything after the first element.
                pub = next((p for key, p in self._pubs.items()
                            if key[1:] == topickey[1:]), None)
            pub = pub or api.create_publisher(topic2, cls, queue_size=self.args.QUEUE_SIZE_OUT)
            self._pubs[topickey] = pub

        self._pubs[topickey].publish(msg)
        self._close_printed = False
        super(LiveSink, self).emit(topic, msg, stamp, match, index)

    def bind(self, source):
        """Attaches source to sink and blocks until connected to ROS."""
        if not self.validate(): raise Exception("invalid")
        super(LiveSink, self).bind(source)
        api.init_node()

    def validate(self):
        """
        Returns whether ROS environment is set for publishing,
        and output topic configuration is valid, emits error if not.
        """
        if self.valid is not None: return self.valid
        result = Sink.validate(self)
        if not api.validate(live=True):
            result = False
        config_ok = True
        # Republishing live topics needs some change to the topic name
        if self.args.LIVE and not any((self.args.PUBLISH_PREFIX, self.args.PUBLISH_SUFFIX,
                                       self.args.PUBLISH_FIXNAME)):
            ConsolePrinter.error("Need topic prefix or suffix or fixname "
                                 "when republishing messages from live ROS topics.")
            config_ok = False
        self.valid = result and config_ok
        return self.valid

    def close(self):
        """Shuts down publishers."""
        if not self._close_printed and self._counts:
            self._close_printed = True
            ConsolePrinter.debug("Published %s to %s.",
                                 common.plural("message", sum(self._counts.values())),
                                 common.plural("topic", self._pubs))
        # Iterate over a copy of keys, as entries are popped during the loop
        for k in list(self._pubs):
            try: self._pubs.pop(k).unregister()
            except Exception as e:
                if self.args.VERBOSE:
                    ConsolePrinter.warn("Error closing publisher on topic %r: %s", k[0], e)
        super(LiveSink, self).close()
850
class AppSink(Sink):
    """Provides messages to callback function."""

    ## Constructor argument defaults
    DEFAULT_ARGS = dict(EMIT=None, METAEMIT=None, HIGHLIGHT=False)

    def __init__(self, args=None, **kwargs):
        """
        @param   args            arguments as namespace or dictionary, case-insensitive;
                                 or emit callback
        @param   args.emit       callback(topic, msg, stamp, highlighted msg, index in topic), if any
        @param   args.metaemit   callback(metadata dict) if any,
                                 invoked before first emit from source batch
        @param   args.highlight  whether to expect highlighted matching fields from source messages
        @param   kwargs          any and all arguments as keyword overrides, case-insensitive
        """
        # A bare callable is shorthand for the emit callback
        if callable(args): args = common.ensure_namespace(None, emit=args)
        args = common.ensure_namespace(args, AppSink.DEFAULT_ARGS, **kwargs)
        super(AppSink, self).__init__(args)

    def emit_meta(self):
        """Invokes registered metaemit callback, if any, and not already invoked."""
        if not self.validate(): raise Exception("invalid")
        if not self.source: return
        batch = self.source.get_batch() if self.args.METAEMIT else None
        if self.args.METAEMIT and batch not in self._batch_meta:
            meta = self._batch_meta[batch] = self.source.get_meta()
            self.args.METAEMIT(meta)

    def emit(self, topic, msg, stamp=None, match=None, index=None):
        """Registers message and invokes registered emit callback, if any."""
        if not self.validate(): raise Exception("invalid")
        stamp, index = self._ensure_stamp_index(topic, msg, stamp, index)
        super(AppSink, self).emit(topic, msg, stamp, match, index)
        if self.args.EMIT: self.args.EMIT(topic, msg, stamp, match, index)

    def is_highlighting(self):
        """Returns whether emitted matches are highlighted."""
        return self.args.HIGHLIGHT

    def validate(self):
        """Returns whether callbacks are valid, emits error if not."""
        if self.valid is not None: return self.valid
        self.valid = True
        for key in ("EMIT", "METAEMIT"):
            callback = getattr(self.args, key)
            if callback and not callable(callback):
                ConsolePrinter.error("Invalid callback for %s: %r", key, callback)
                self.valid = False
        return self.valid
899
900
class MultiSink(Sink):
    """Combines any number of sinks."""

    ## Autobinding between argument flags and sink classes
    FLAG_CLASSES = {"PUBLISH": LiveSink, "CONSOLE": ConsoleSink, "APP": AppSink}

    ## Autobinding between `--write TARGET format=FORMAT` and sink classes
    FORMAT_CLASSES = {"bag": BagSink}

    def __init__(self, args=None, sinks=(), **kwargs):
        """
        Accepts more arguments, given to the real sinks constructed.

        @param   args           arguments as namespace or dictionary, case-insensitive
        @param   args.console   print matches to console
        @param   args.write     [[target, format=FORMAT, key=value, ], ]
        @param   args.publish   publish matches to live topics
        @param   args.app       provide messages to given callback function
        @param   sinks          pre-created sinks, arguments will be ignored
        @param   kwargs         any and all arguments as keyword overrides, case-insensitive
        """
        args = common.ensure_namespace(args, **kwargs)
        super(MultiSink, self).__init__(args)
        self.valid = True

        ## List of all combined sinks
        self.sinks = [cls(args) for flag, cls in self.FLAG_CLASSES.items()
                      if getattr(args, flag, None)] if not sinks else list(sinks)

        for dumpopts in getattr(args, "WRITE", []) if not sinks else ():
            # Write options come as "key=value" strings and/or ready dictionaries
            opts = dict(x.split("=", 1) for x in dumpopts[1:]
                        if isinstance(x, common.TEXT_TYPES))
            opts.update(kv for x in dumpopts[1:] if isinstance(x, dict) for kv in x.items())
            target, cls = dumpopts[0], self.FORMAT_CLASSES.get(opts.pop("format", None))
            if not cls:
                # No explicit format: let sink classes auto-detect target, trying BagSink last
                cls = next((c for c in sorted(self.FORMAT_CLASSES.values(),
                                              key=lambda x: x is BagSink)
                            if callable(getattr(c, "autodetect", None))
                            and c.autodetect(target)), None)
            if not cls:
                ConsolePrinter.error('Unknown output format in "%s"' % " ".join(map(str, dumpopts)))
                self.valid = False
                continue  # for dumpopts
            clsargs = common.structcopy(args)
            clsargs.WRITE, clsargs.WRITE_OPTIONS = target, opts
            self.sinks += [cls(clsargs)]

    def emit_meta(self):
        """Outputs source metainfo in one sink, if not already emitted."""
        if not self.validate(): raise Exception("invalid")
        sink = next((s for s in self.sinks if isinstance(s, ConsoleSink)), None)
        # Emit meta in one sink only, prefer console
        sink = (sink or self.sinks[0]) if self.sinks else None
        if sink:
            sink.emit_meta()

    def emit(self, topic, msg, stamp=None, match=None, index=None):
        """Outputs ROS message to all sinks."""
        if not self.validate(): raise Exception("invalid")
        # Resolved once up front, so that every sink is given the same stamp and index
        stamp, index = self._ensure_stamp_index(topic, msg, stamp, index)
        for sink in self.sinks:
            sink.emit(topic, msg, stamp, match, index)
        super(MultiSink, self).emit(topic, msg, stamp, match, index)

    def bind(self, source):
        """Attaches source to all sinks, sets thread_excepthook on all sinks."""
        super(MultiSink, self).bind(source)
        for sink in self.sinks:
            sink.bind(source)
            sink.thread_excepthook = self.thread_excepthook

    def configure(self, args=None, **kwargs):
        """
        Updates sinks configuration.

        @param   args    arguments as namespace or dictionary, case-insensitive
        @param   kwargs  any and all arguments as keyword overrides, case-insensitive
        """
        args = common.ensure_namespace(args, **kwargs)
        hasattr(args, "WRITE") and delattr(args, "WRITE")  # Special arg for MultiSink
        for sink in self.sinks: sink.configure(args, **kwargs)

    def validate(self):
        """Returns whether prerequisites are met for all sinks."""
        if not self.sinks:
            ConsolePrinter.error("No output configured.")
        # List instead of generator: validates every sink, without stopping at first failure
        return bool(self.sinks) and all([sink.validate() for sink in self.sinks]) and self.valid

    def close(self):
        """Closes all sinks."""
        for sink in self.sinks:
            sink.close()

    def flush(self):
        """Flushes all sinks."""
        for sink in self.sinks:
            sink.flush()

    def is_highlighting(self):
        """Returns whether any sink requires highlighted matches."""
        return any(s.is_highlighting() for s in self.sinks)
1000
1001
# Public API of the module
__all__ = [
    "AppSink", "BagSink", "ConsoleSink", "LiveSink", "MultiSink", "RolloverSinkMixin", "Sink",
    "TextSinkMixin",
]
Bag factory metaclass.
Definition api.py:381
TextWrapper that supports custom substring widths in line width calculation.
Definition common.py:692
Message producer base class.
Definition inputs.py:35
Provides messages to callback function.
Definition outputs.py:867
__init__(self, args=None, **kwargs)
Definition outputs.py:882
emit(self, topic, msg, stamp=None, match=None, index=None)
Registers message and invokes registered emit callback, if any.
Definition outputs.py:896
is_highlighting(self)
Returns whether emitted matches are highlighted.
Definition outputs.py:903
validate(self)
Returns whether callbacks are valid, emits error if not.
Definition outputs.py:907
emit_meta(self)
Invokes registered metaemit callback, if any, and not already invoked.
Definition outputs.py:887
Writes messages to bagfile.
Definition outputs.py:636
__init__(self, args=None, **kwargs)
Definition outputs.py:662
autodetect(cls, target)
Returns true if target is recognizable as a ROS bag.
Definition outputs.py:774
close_output(self)
Closes output bag, if any.
Definition outputs.py:725
emit(self, topic, msg, stamp=None, match=None, index=None)
Writes message to output bagfile.
Definition outputs.py:678
validate(self)
Returns whether write options are valid and ROS environment set, emits error if not.
Definition outputs.py:692
close(self)
Closes output bag, if any, emits metainfo.
Definition outputs.py:717
size
Returns current file size in bytes, or None if size lookup failed.
Definition outputs.py:732
Prints messages to console.
Definition outputs.py:543
__init__(self, args=None, **kwargs)
Definition outputs.py:581
emit(self, topic, msg, stamp=None, match=None, index=None)
Prints separator line and message text.
Definition outputs.py:599
is_highlighting(self)
Returns True if sink is configured to highlight matched values.
Definition outputs.py:621
validate(self)
Returns whether arguments environment set, populates options, emits error if not.
Definition outputs.py:626
emit_meta(self)
Prints source metainfo like bag header, if not already printed.
Definition outputs.py:587
Publishes messages to ROS topics.
Definition outputs.py:780
__init__(self, args=None, **kwargs)
Definition outputs.py:800
emit(self, topic, msg, stamp=None, match=None, index=None)
Publishes message to output topic.
Definition outputs.py:806
validate(self)
Returns whether ROS environment is set for publishing, and output topic configuration is valid, emits error if not.
Definition outputs.py:838
close(self)
Shuts down publishers.
Definition outputs.py:852
bind(self, source)
Attaches source to sink and blocks until connected to ROS.
Definition outputs.py:828
Combines any number of sinks.
Definition outputs.py:918
configure(self, args=None, **kwargs)
Updates sinks configuration.
Definition outputs.py:993
dict FORMAT_CLASSES
Autobinding between --write TARGET format=FORMAT and sink classes.
Definition outputs.py:925
emit(self, topic, msg, stamp=None, match=None, index=None)
Outputs ROS message to all sinks.
Definition outputs.py:972
sinks
List of all combined sinks.
Definition outputs.py:944
is_highlighting(self)
Returns whether any sink requires highlighted matches.
Definition outputs.py:1014
flush(self)
Flushes all sinks.
Definition outputs.py:1009
validate(self)
Returns whether prerequisites are met for all sinks.
Definition outputs.py:998
close(self)
Closes all sinks.
Definition outputs.py:1004
__init__(self, args=None, sinks=(), **kwargs)
Accepts more arguments, given to the real sinks constructed.
Definition outputs.py:938
dict FLAG_CLASSES
Autobinding between argument flags and sink classes.
Definition outputs.py:922
bind(self, source)
Attaches source to all sinks, sets thread_excepthook on all sinks.
Definition outputs.py:980
emit_meta(self)
Outputs source metainfo in one sink, if not already emitted.
Definition outputs.py:964
Provides output file rollover by size, duration, or message count.
Definition outputs.py:353
__init__(self, args=None, **kwargs)
Definition outputs.py:397
filename
Current output file path.
Definition outputs.py:403
close_output(self)
Closes output file, if any.
Definition outputs.py:476
validate(self)
Returns whether write options are valid, emits error if not, else populates options.
Definition outputs.py:406
size
Returns current file size in bytes, or None if size lookup failed.
Definition outputs.py:527
ensure_rollover(self, topic, msg, stamp)
Closes current output file and prepares new filename if rollover limit reached.
Definition outputs.py:449
format_output_meta(self)
Returns output file metainfo string, with names and sizes and message/topic counts.
Definition outputs.py:493
get_write_options(cls, label)
Returns command-line help texts for rollover options, as [(name, help)].
Definition outputs.py:537
list OPTIONS_TEMPLATES
Command-line help templates for rollover options, as [(name, text with %s label placeholder)].
Definition outputs.py:360
make_filename(self)
Returns new filename for output, accounting for rollover template and overwrite.
Definition outputs.py:481
configure(self, args=None, **kwargs)
Updates sink configuration.
Definition outputs.py:95
thread_excepthook(self, text, exc)
Handles exception, used by background threads.
Definition outputs.py:115
source
inputs.Source instance bound to this sink
Definition outputs.py:55
__init__(self, args=None, **kwargs)
Definition outputs.py:47
autodetect(cls, target)
Returns true if target is recognizable as output for this sink class.
Definition outputs.py:123
tuple FILE_EXTENSIONS
Auto-detection file extensions for subclasses, as (".ext", )
Definition outputs.py:36
valid
Result of validate()
Definition outputs.py:53
emit(self, topic, msg, stamp=None, match=None, index=None)
Outputs ROS message.
Definition outputs.py:81
__exit__(self, exc_type, exc_value, traceback)
Context manager exit, closes sink.
Definition outputs.py:61
is_highlighting(self)
Returns whether this sink requires highlighted matches.
Definition outputs.py:118
flush(self)
Writes out any pending data to disk.
Definition outputs.py:111
validate(self)
Returns whether sink prerequisites are met (like ROS environment set if LiveSink).
Definition outputs.py:99
__enter__(self)
Context manager entry.
Definition outputs.py:57
close(self)
Shuts down output, closing any files or connections.
Definition outputs.py:106
bind(self, source)
Attaches source to sink.
Definition outputs.py:85
emit_meta(self)
Outputs source metainfo like bag header as debug stream, if not already emitted.
Definition outputs.py:65
Provides message formatting as text.
Definition outputs.py:137
__init__(self, args=None, **kwargs)
Definition outputs.py:168
format_message(self, msg, highlight=False)
Returns message as formatted string, optionally highlighted for matches if configured.
Definition outputs.py:185
str NOCOLOR_HIGHLIGHT_WRAPPERS
Default highlight wrappers if not color output.
Definition outputs.py:141
validate(self)
Returns whether arguments are valid, emits error if not, else populates options.
Definition outputs.py:176
message_to_yaml(self, val, top=(), typename=None)
Returns ROS message or other value as YAML.
Definition outputs.py:226