3Main outputs for emitting messages.
5------------------------------------------------------------------------------
6This file is part of grepros - grep for ROS bag files and live topics.
7Released under the BSD License.
12------------------------------------------------------------------------------
14## @namespace grepros.outputs
15from __future__ import print_function
28from . common import ArgumentUtil, ConsolePrinter, MatchMarkers
29from . inputs import Source
33 """Output base class."""
39 DEFAULT_ARGS = dict(META=
False)
41 def __init__(self, args=None, **kwargs):
43 @param args arguments
as namespace
or dictionary, case-insensitive
44 @param args.meta whether to emit metainfo
45 @param kwargs any
and all arguments
as keyword overrides, case-insensitive
50 self.
args = common.ensure_namespace(args, Sink.DEFAULT_ARGS, **kwargs)
57 """Context manager entry."""
60 def __exit__(self, exc_type, exc_value, traceback):
61 """Context manager exit, closes sink."""
65 """Outputs source metainfo like bag header as debug stream, if not already emitted."""
66 batch = self.
args.META
and self.
source.get_batch()
69 meta
and ConsolePrinter.debug(meta)
71 def emit(self, topic, msg, stamp=None, match=None, index=None):
75 @param topic full name of ROS topic the message
is from
76 @param msg ROS message
77 @param stamp message ROS timestamp,
if not current ROS time
78 @param match ROS message
with values tagged
with match markers
if matched,
else None
79 @param index message index
in topic,
if any
81 topickey = api.TypeMeta.make(msg, topic).topickey
84 def bind(self, source):
85 """Attaches source to sink."""
90 Updates sink configuration.
92 @param args arguments
as namespace
or dictionary, case-insensitive
93 @param kwargs any
and all arguments
as keyword overrides, case-insensitive
95 self.args = common.ensure_namespace(args, vars(self.args), **kwargs)
99 """Returns whether sink prerequisites are met (like ROS environment set if LiveSink)."""
101 try: self.
args, self.
valid = ArgumentUtil.validate(self.
args),
True
102 except Exception: self.
valid =
False
106 """Shuts down output, closing any files or connections."""
111 """Writes out any pending data to disk."""
114 """Handles exception, used by background threads."""
115 ConsolePrinter.error(text)
118 """Returns whether this sink requires highlighted matches."""
123 """Returns true if target is recognizable as output for this sink class."""
124 ext = os.path.splitext(target
or "")[-1].lower()
127 def _ensure_stamp_index(self, topic, msg, stamp=None, index=None):
128 """Returns (stamp, index) populated with current ROS time and topic index if `None`."""
129 if stamp
is None: stamp = api.get_rostime(fallback=
True)
130 if index
is None: index = self.
_counts.get(api.TypeMeta.make(msg, topic).topickey, 0) + 1
135 """Provides message formatting as text."""
138 NOCOLOR_HIGHLIGHT_WRAPPERS =
"**",
"**"
141 DEFAULT_ARGS = dict(COLOR=
True, EMIT_FIELD=(), NOEMIT_FIELD=(), HIGHLIGHT=
True,
142 MAX_FIELD_LINES=
None, START_LINE=
None, END_LINE=
None,
143 MAX_MESSAGE_LINES=
None, LINES_AROUND_MATCH=
None, MATCHED_FIELDS_ONLY=
False,
144 WRAP_WIDTH=
None, MATCH_WRAPPER=
None)
146 def __init__(self, args=None, **kwargs):
148 @param args arguments
as namespace
or dictionary, case-insensitive
149 @param args.color
False or "never" for not using colors
in replacements
150 @param args.highlight highlight matched values (default true)
151 @param args.emit_field message fields to emit
if not all
152 @param args.noemit_field message fields to skip
in output
153 @param args.max_field_lines maximum number of lines to output per field
154 @param args.start_line message line number to start output
from
155 @param args.end_line message line number to stop output at
156 @param args.max_message_lines maximum number of lines to output per message
157 @param args.lines_around_match number of message lines around matched fields to output
158 @param args.matched_fields_only output only the fields where match was found
159 @param args.wrap_width character width to wrap message YAML output at
160 @param args.match_wrapper string to wrap around matched values,
161 both sides
if one value, start
and end
if more than one,
162 or no wrapping
if zero values
163 @param kwargs any
and all arguments
as keyword overrides, case-insensitive
169 self.
_styles = collections.defaultdict(str)
173 """Returns whether arguments are valid, emits error if not, else populates options."""
174 args = common.ensure_namespace(self.args, TextSinkMixin.DEFAULT_ARGS)
175 try: args = ArgumentUtil.validate(args)
176 except Exception:
return False
182 """Returns message as formatted string, optionally highlighted for matches if configured."""
183 if self.args.MAX_MESSAGE_LINES == 0:
return ""
186 highlight = highlight
and self.args.HIGHLIGHT
187 if self.
_prefix or self.args.START_LINE
or self.args.END_LINE \
188 or self.args.MAX_MESSAGE_LINES
or (self.args.LINES_AROUND_MATCH
and highlight):
189 lines = text.splitlines()
191 if self.args.START_LINE
or self.args.END_LINE
or self.args.MAX_MESSAGE_LINES:
192 start = self.args.START_LINE
or 0
193 start = max(start, -len(lines)) - (start > 0)
194 end = self.args.END_LINE
or len(lines)
195 end = max(end, -len(lines)) - (end > 0)
196 if self.args.MAX_MESSAGE_LINES: end = min(end, start + self.args.MAX_MESSAGE_LINES)
197 lines = lines[start:end + 1]
198 lines = lines
and (lines[:-1] + [lines[-1] + self.
_styles[
"rst"]])
200 if self.args.LINES_AROUND_MATCH
and highlight:
201 spans, NUM = [], self.args.LINES_AROUND_MATCH
202 for i, l
in enumerate(lines):
203 if MatchMarkers.START
in l:
204 spans.append([max(0, i - NUM), min(i + NUM + 1, len(lines))])
205 if MatchMarkers.END
in l
and spans:
206 spans[-1][1] = min(i + NUM + 1, len(lines))
207 lines = sum((lines[a:b - 1] + [lines[b - 1] + self.
_styles[
"rst"]]
208 for a, b
in common.merge_spans(spans)), [])
211 lines = [self.
_prefix + l
for l
in lines]
213 text =
"\n".join(lines)
216 text = re.sub(
r"(%s)\1+" % re.escape(a),
r"\1", text)
217 text = text.replace(a, b)
223 """Returns ROS message or other value as YAML."""
225 unquote =
lambda v: v[1:-1]
if v[:1] == v[-1:] ==
'"' else v
227 def retag_match_lines(lines):
228 """Adds match tags to lines where wrapping separated start and end."""
230 for i, l
in enumerate(lines):
231 startpos0, endpos0 = l.find (MatchMarkers.START), l.find (MatchMarkers.END)
232 startpos1, endpos1 = l.rfind(MatchMarkers.START), l.rfind(MatchMarkers.END)
233 if endpos0 >= 0
and (startpos0 < 0
or startpos0 > endpos0):
234 lines[i] = l = re.sub(
r"^(\s*)",
r"\1" + MatchMarkers.START, l)
235 if startpos1 >= 0
and endpos1 < startpos1
and i + 1 < len(lines):
236 lines[i + 1] = re.sub(
r"^(\s*)",
r"\1" + MatchMarkers.START, lines[i + 1])
237 if startpos1 >= 0
and startpos1 > endpos1:
238 CUT, EXTRA = (-len(PH), PH)
if PH
and l.endswith(PH)
else (len(l),
"")
239 lines[i] = l[:CUT] + MatchMarkers.END + EXTRA
243 """Returns text or list/tuple truncated to length used in final output."""
244 if self.args.LINES_AROUND_MATCH \
245 or (
not self.args.MAX_MESSAGE_LINES
and (self.args.END_LINE
or 0) <= 0):
return v
247 MAX_CHAR_LEN = 1 + len(MatchMarkers.START) + len(MatchMarkers.END)
249 if isinstance(v, (list, tuple)): textlen = bytelen = 2 + len(v) * (2 + MAX_CHAR_LEN)
250 else: textlen, bytelen = self.
_wrapper.strlen(v), len(v)
251 if textlen < 10000:
return v
256 MIN_CHARS_PER_LINE = self.
_wrapper.width
257 if MAX_CHAR_LEN != 1:
258 MIN_CHARS_PER_LINE = self.
_wrapper.width // MAX_CHAR_LEN * 2
259 MAX_LINES = self.args.MAX_MESSAGE_LINES
or self.args.END_LINE
260 MAX_CHARS = MAX_LEN = MAX_LINES * MIN_CHARS_PER_LINE * self.
_wrapper.width + 100
261 if bytelen > MAX_CHARS:
262 if isinstance(v, (list, tuple)): MAX_LEN = MAX_CHARS // 3
266 indent =
" " * len(top)
267 if isinstance(val, six.integer_types + (float, bool)):
269 if isinstance(val, common.TEXT_TYPES):
270 if val
in (
"", MatchMarkers.EMPTY):
271 return MatchMarkers.EMPTY_REPL
if val
else "''"
273 return yaml.safe_dump(truncate(val), default_style=
'"', width=sys.maxsize).rstrip(
"\n")
274 if isinstance(val, (list, tuple)):
277 if api.scalar(typename)
in api.ROS_STRING_TYPES:
278 yaml_str = yaml.safe_dump(truncate(val)).rstrip(
'\n')
279 return "\n" +
"\n".join(indent + line
for line
in yaml_str.splitlines())
280 vals = [x
for i, v
in enumerate(truncate(val))
282 if api.scalar(typename)
in api.ROS_NUMERIC_TYPES:
283 return "[%s]" %
", ".join(unquote(str(v))
for v
in vals)
284 return (
"\n" +
"\n".join(indent +
"- " + v
for v
in vals))
if vals
else ""
285 if api.is_ros_message(val):
286 MATCHED_ONLY = self.args.MATCHED_FIELDS_ONLY
and not self.args.LINES_AROUND_MATCH
287 vals, fieldmap = [], api.get_message_fields(val)
289 fieldmap = api.filter_fields(fieldmap, top, include=prints, exclude=noprints)
290 for k, t
in fieldmap.items():
291 v = self.
message_to_yaml(api.get_message_value(val, k, t), top + (k, ), t)
292 if not v
or MATCHED_ONLY
and MatchMarkers.START
not in v:
295 if t
not in api.ROS_STRING_TYPES: v = unquote(v)
296 if api.scalar(t)
in api.ROS_BUILTIN_TYPES:
297 is_strlist = t.endswith(
"]")
and api.scalar(t)
in api.ROS_STRING_TYPES
298 is_num = api.scalar(t)
in api.ROS_NUMERIC_TYPES
299 extra_indent = indent
if is_strlist
else " " * len(indent + k +
": ")
301 self.
_wrapper.drop_whitespace = t.endswith(
"]")
and not is_strlist
302 self.
_wrapper.break_long_words =
not is_num
303 v = (
"\n" + extra_indent).join(retag_match_lines(self.
_wrapper.wrap(v)))
304 if is_strlist
and self.
_wrapper.strip(v) !=
"[]": v =
"\n" + v
305 vals.append(
"%s%s: %s" % (indent, k, api.format_message_value(val, k, v)))
306 return (
"\n" if indent
and vals
else "") +
"\n".join(vals)
311 def _configure(self, args):
312 """Initializes output settings."""
315 self.
_styles.default_factory = str
316 prints, noprints = args.EMIT_FIELD, args.NOEMIT_FIELD
317 for key, vals
in [(
"print", prints), (
"noprint", noprints)]:
318 self.
_patterns[key] = [(tuple(v.split(
".")), common.path_to_regex(v))
for v
in vals]
320 if args.COLOR
not in (
"never",
False):
321 self.
_styles.update({
"hl0": ConsolePrinter.STYLE_HIGHLIGHT
if self.args.HIGHLIGHT
323 "ll0": ConsolePrinter.STYLE_LOWLIGHT,
324 "pfx0": ConsolePrinter.STYLE_SPECIAL,
325 "sep0": ConsolePrinter.STYLE_SPECIAL2})
326 self.
_styles.default_factory =
lambda: ConsolePrinter.STYLE_RESET
328 WRAPS = args.MATCH_WRAPPER
if self.args.HIGHLIGHT
else ""
330 WRAPS = ((WRAPS
or [
""]) * 2)[:2]
334 custom_widths = {MatchMarkers.START: len(WRAPS[0]), MatchMarkers.END: len(WRAPS[1]),
338 wrapargs = dict(max_lines=args.MAX_FIELD_LINES,
339 placeholder=
"%s ...%s" % (self.
_styles[
"ll0"], self.
_styles[
"ll1"]))
340 if args.WRAP_WIDTH
is not None: wrapargs.update(width=args.WRAP_WIDTH)
343 MatchMarkers.END: self.
_styles[
"hl1"]}
348 """Provides output file rollover by size, duration, or message count."""
351 DEFAULT_ARGS = dict(WRITE=
None, WRITE_OPTIONS={}, VERBOSE=
False)
354 OPTIONS_TEMPLATES = [
355 (
"rollover-size=NUM",
"size limit for individual files\nin {label} output\n"
356 "as bytes (supports abbreviations like 1K or 2M or 3G)"),
357 (
"rollover-count=NUM",
"message limit for individual files\nin {label} output\n"
358 "(supports abbreviations like 1K or 2M or 3G)"),
359 (
"rollover-duration=INTERVAL",
"message time span limit for individual files\n"
360 "in {label} output\n"
361 "as seconds (supports abbreviations like 60m or 2h or 1d)"),
362 (
"rollover-template=STR",
"output filename template for individual files\n"
363 "in {label} output,\n"
364 'supporting strftime format codes like "%%H-%%M-%%S"\n'
365 'and "%%(index)s" as output file index'),
368 START_META_TEMPLATE =
"{mcount} in {tcount} to "
370 FILE_META_TEMPLATE =
"{name} ({size})"
372 MULTI_META_TEMPLATE =
"\n- {name} ({size}, {mcount}, {tcount})"
375 def __init__(self, args=None, **kwargs):
377 @param args arguments
as namespace
or dictionary, case-insensitive
378 @param args.write base name of output file to write
if not using rollover-template
379 @param args.write_options {
"rollover-size": bytes limit
for individual output files,
380 "rollover-count": message limit
for individual output files,
381 "rollover-duration": time span limit
for individual output files,
382 as ROS duration
or convertible seconds,
383 "rollover-template": output filename template, supporting
384 strftime format codes like
"%H-%M-%S"
385 and "%(index)s" as output file index,
386 "overwrite": whether to overwrite existing file
388 @param args.verbose whether to emit debug information
389 @param kwargs any
and all arguments
as keyword overrides, case-insensitive
400 """Returns whether write options are valid, emits error if not, else populates options."""
404 for k
in (
"size",
"count",
"duration"):
405 value = value0 = self.args.WRITE_OPTIONS.get(
"rollover-%s" % k)
406 if value
is None:
continue
407 SUFFIXES = dict(zip(
"smhd", [1, 60, 3600, 24*3600]))
if "duration" == k
else \
408 dict(zip(
"KMGT", [2**10, 2**20, 2**30, 2**40]))
if "size" == k
else \
409 dict(zip(
"KMGT", [10**3, 10**6, 10**9, 10**12]))
411 if isinstance(value, (six.binary_type, six.text_type)):
412 value = common.parse_number(value, SUFFIXES)
413 value = (api.to_duration
if "duration" == k
else int)(value)
414 except Exception:
pass
415 if (value
is None or value < 0)
if "duration" != k \
416 else (k != api.get_ros_time_category(value)
or api.to_sec(value) < 0):
417 ConsolePrinter.error(
"Invalid rollover %s option: %r. "
418 "Value must be a non-negative %s.", k, value0, k)
422 if self.args.WRITE_OPTIONS.get(
"rollover-template"):
423 value = self.args.WRITE_OPTIONS[
"rollover-template"]
424 value = re.sub(
r"(^|[^%])%\(index\)",
r"\1%%(index)", value)
425 try: datetime.datetime.now().strftime(value)
427 ConsolePrinter.error(
"Invalid rollover template option: %r. "
428 "Value must contain valid strftime codes.", value)
434 ConsolePrinter.warn(
"Ignoring rollover template option: "
435 "no rollover limits given.")
441 Closes current output file and prepares new filename
if rollover limit reached.
447 stamp = api.time_message(stamp, to_message=
False)
453 do_rollover = (sum(props[
"counts"].values()) >= self.
_rollover_limits[
"count"])
455 stamps = [stamp, props[
"start"]]
456 do_rollover = (max(stamps) - min(stamps) >= self.
_rollover_limits[
"duration"])
463 topickey = api.TypeMeta.make(msg, topic).topickey
464 if not props: props.update({
"counts": {},
"start": stamp,
"size":
None})
465 props[
"start"] = min((props[
"start"], stamp))
466 props[
"counts"][topickey] = props[
"counts"].get(topickey, 0) + 1
470 """Closes output file, if any."""
471 raise NotImplementedError
475 """Returns new filename for output, accounting for rollover template and overwrite."""
476 result = self.args.WRITE
480 except Exception:
pass
481 if self.args.WRITE_OPTIONS.get(
"overwrite")
not in (
True,
"true"):
482 result = common.unique_path(result, empty_ok=
True)
487 """Returns output file metainfo string, with names and sizes and message/topic counts."""
488 if not self._counts:
return ""
489 SIZE_ERROR =
"error getting size"
491 mcount=common.plural(
"message", sum(self._counts.values())),
492 tcount=common.plural(
"topic", self._counts)
496 sizestr = SIZE_ERROR
if sz
is None else common.format_bytes(sz)
500 if props[
"size"]
is None:
501 try: props[
"size"] = os.path.getsize(path)
502 except Exception
as e:
503 ConsolePrinter.warn(
"Error getting size of %s: %s", path, e)
504 sizesum = sum(x[
"size"]
for x
in self.
_rollover_files.values()
if x[
"size"]
is not None)
507 size=common.format_bytes(sizesum)
508 ) + (
":" if self.args.VERBOSE
else ".")
509 for path, props
in self.
_rollover_files.items()
if self.args.VERBOSE
else ():
510 sizestr = SIZE_ERROR
if props[
"size"]
is None else common.format_bytes(props[
"size"])
512 mcount=common.plural(
"message", sum(props[
"counts"].values())),
513 tcount=common.plural(
"topic", props[
"counts"])
520 """Returns current file size in bytes, or None if size lookup failed."""
521 try:
return os.path.getsize(self.
filename)
522 except Exception
as e:
523 ConsolePrinter.warn(
"Error getting size of %s: %s", self.
filename, e)
529 """Returns command-line help texts for rollover options, as [(name, help)]."""
535 """Prints messages to console."""
537 META_LINE_TEMPLATE =
"{ll0}{sep} {line}{ll1}"
538 MESSAGE_SEP_TEMPLATE =
"{ll0}{sep}{ll1}"
539 PREFIX_TEMPLATE =
"{pfx0}{batch}{pfx1}{sep0}{sep}{sep1}"
540 MATCH_PREFIX_SEP =
":"
541 CONTEXT_PREFIX_SEP =
"-"
545 DEFAULT_ARGS = dict(COLOR=
True, EMIT_FIELD=(), NOEMIT_FIELD=(), HIGHLIGHT=
True, META=
False,
546 LINE_PREFIX=
True, MAX_FIELD_LINES=
None, START_LINE=
None,
547 END_LINE=
None, MAX_MESSAGE_LINES=
None, LINES_AROUND_MATCH=
None,
548 MATCHED_FIELDS_ONLY=
False, WRAP_WIDTH=
None, MATCH_WRAPPER=
None)
553 @param args arguments
as namespace
or dictionary, case-insensitive
554 @param args.color
False or "never" for not using colors
in replacements
555 @param args.highlight highlight matched values (default true)
556 @param args.meta whether to
print metainfo
557 @param args.emit_field message fields to emit
if not all
558 @param args.noemit_field message fields to skip
in output
559 @param args.line_prefix
print source prefix like bag filename on each message line
560 @param args.max_field_lines maximum number of lines to
print per field
561 @param args.start_line message line number to start output
from
562 @param args.end_line message line number to stop output at
563 @param args.max_message_lines maximum number of lines to output per message
564 @param args.lines_around_match number of message lines around matched fields to output
565 @param args.matched_fields_only output only the fields where match was found
566 @param args.wrap_width character width to wrap message YAML output at
567 @param args.match_wrapper string to wrap around matched values,
568 both sides
if one value, start
and end
if more than one,
569 or no wrapping
if zero values
570 @param kwargs any
and all arguments
as keyword overrides, case-insensitive
572 args = common.ensure_namespace(args, ConsoleSink.DEFAULT_ARGS, **kwargs)
573 super(ConsoleSink, self).__init__(args)
574 TextSinkMixin.__init__(self, args)
578 """Prints source metainfo like bag header, if not already printed."""
580 batch = self.
args.META
and self.
source.get_batch()
585 for x
in meta.splitlines())
586 meta
and ConsolePrinter.print(meta)
589 def emit(self, topic, msg, stamp=None, match=None, index=None):
590 """Prints separator line and message text."""
594 if self.
args.LINE_PREFIX
and self.
source.get_batch():
596 kws = dict(self.
_styles, sep=sep, batch=self.
source.get_batch())
600 meta = self.
source.format_message_meta(topic, msg, stamp, index)
602 for x
in meta.splitlines())
603 meta
and ConsolePrinter.print(meta)
606 sep
and ConsolePrinter.print(sep)
607 ConsolePrinter.print(self.
format_message(match
or msg, highlight=bool(match)))
608 super(ConsoleSink, self).
emit(topic, msg, stamp, match, index)
612 """Returns True if sink is configured to highlight matched values."""
613 return bool(self.
args.HIGHLIGHT)
617 """Returns whether arguments environment set, populates options, emits error if not."""
619 if self.
args.WRAP_WIDTH
is None:
620 self.
args.WRAP_WIDTH = ConsolePrinter.WIDTH
621 self.
validvalid = Sink.validate(self)
and TextSinkMixin.validate(self)
627 """Writes messages to bagfile."""
630 DEFAULT_ARGS = dict(META=
False, WRITE_OPTIONS={}, VERBOSE=
False)
632 def __init__(self, args=None, **kwargs):
634 @param args arguments
as namespace
or dictionary, case-insensitive;
635 or a single path
as the ROS bagfile to write,
637 @param args.write name of ROS bagfile to create
or append to,
638 or a stream to write to
639 @param args.write_options {
"overwrite": whether to overwrite existing file
641 "rollover-size": bytes limit
for individual output files,
642 "rollover-count": message limit
for individual output files,
643 "rollover-duration": time span limit
for individual output files,
644 as ROS duration
or convertible seconds,
645 "rollover-template": output filename template, supporting
646 strftime format codes like
"%H-%M-%S"
647 and "%(index)s" as output file index}
648 @param args.meta whether to emit metainfo
649 @param args.verbose whether to emit debug information
650 @param kwargs any
and all arguments
as keyword overrides, case-insensitive
654 args = {"WRITE": str(args)}
if isinstance(args, common.PATH_TYPES)
else \
655 {
"WRITE": args}
if common.is_stream(args)
else \
656 {}
if isinstance(args,
api.Bag)
else args
657 args = common.ensure_namespace(args, BagSink.DEFAULT_ARGS, **kwargs)
659 RolloverSinkMixin.__init__(self, args)
660 self.
_bag = args0
if isinstance(args0,
api.Bag)
else None
661 self.
_overwrite = (args.WRITE_OPTIONS.get(
"overwrite")
in (
"true",
True))
667 def emit(self, topic, msg, stamp=None, match=None, index=None):
668 """Writes message to output bagfile."""
671 if self.
_is_pathed: RolloverSinkMixin.ensure_rollover(self, topic, msg, stamp)
673 topickey = api.TypeMeta.make(msg, topic).topickey
674 if self.
args.VERBOSE
and topickey
not in self.
_counts:
675 ConsolePrinter.debug(
"Adding topic %s in bag output.", topic)
677 qoses = self.
source.get_message_meta(topic, msg, stamp).get(
"qoses")
678 self.
_bag.write(topic, msg, stamp, qoses=qoses)
679 super(BagSink, self).
emit(topic, msg, stamp, match, index)
682 """Returns whether write options are valid and ROS environment set, emits error if not."""
684 result = Sink.validate(self)
685 if not RolloverSinkMixin.validate(self):
687 if self.
args.WRITE_OPTIONS.get(
"overwrite")
not in (
None,
True,
False,
"true",
"false"):
688 ConsolePrinter.error(
"Invalid overwrite option for bag: %r. "
689 "Choose one of {true, false}.",
690 self.
args.WRITE_OPTIONS[
"overwrite"])
694 ConsolePrinter.error(
"File not writable.")
696 if not self.
_bag and common.is_stream(self.
args.WRITE) \
697 and not any(c.STREAMABLE
for c
in api.Bag.WRITER_CLASSES):
698 ConsolePrinter.error(
"Bag format does not support writing streams.")
700 if self.
_bag and self.
_bag.mode
not in (
"a",
"w"):
701 ConsolePrinter.error(
"Bag not in write mode.")
707 """Closes output bag, if any, emits metainfo."""
712 super(BagSink, self).
close()
715 """Closes output bag, if any."""
721 """Returns current file size in bytes, or None if size lookup failed."""
723 if not self.
_bag or (self.
_bag.filename
and api.ROS1)
else self.
_bag.size
724 except Exception
as e:
728 def _ensure_open(self):
729 """Opens output file if not already open."""
730 if self.
_bag is not None:
736 if common.is_stream(self.
args.WRITE):
742 if not self.
_overwrite and os.path.isfile(filename)
and os.path.getsize(filename):
743 cls = api.Bag.autodetect(filename)
744 if cls
and "a" not in getattr(cls,
"MODES", (
"a", )):
746 if self.
args.VERBOSE:
747 ConsolePrinter.debug(
"Making unique filename %r, as %s does not support "
749 if self.
args.VERBOSE:
750 sz = os.path.isfile(filename)
and os.path.getsize(filename)
751 ConsolePrinter.debug(
"%s bag output %s%s.",
753 "Appending to" if sz
else "Creating",
754 filename, (
" (%s)" % common.format_bytes(sz))
if sz
else "")
755 common.makedirs(os.path.dirname(filename))
760 """Returns true if target is recognizable as a ROS bag."""
761 ext = os.path.splitext(target
or "")[-1].lower()
762 return ext
in api.BAG_EXTENSIONS
766 """Publishes messages to ROS topics."""
769 DEFAULT_ARGS = dict(LIVE=
False, META=
False, QUEUE_SIZE_OUT=10, PUBLISH_PREFIX=
"",
770 PUBLISH_SUFFIX=
"", PUBLISH_FIXNAME=
"", VERBOSE=
False)
772 def __init__(self, args=None, **kwargs):
774 @param args arguments
as namespace
or dictionary, case-insensitive
775 @param args.live whether reading messages
from live ROS topics
776 @param args.queue_size_out publisher queue size (default 10)
777 @param args.publish_prefix output topic prefix, prepended to input topic
778 @param args.publish_suffix output topic suffix, appended to output topic
779 @param args.publish_fixname single output topic name to publish to,
780 overrides prefix
and suffix
if given
781 @param args.meta whether to emit metainfo
782 @param args.verbose whether to emit debug information
783 @param kwargs any
and all arguments
as keyword overrides, case-insensitive
785 args = common.ensure_namespace(args, LiveSink.DEFAULT_ARGS, **kwargs)
786 super(LiveSink, self).__init__(args)
790 def emit(self, topic, msg, stamp=None, match=None, index=None):
791 """Publishes message to output topic."""
794 with api.TypeMeta.make(msg, topic)
as m:
795 topickey, cls = (m.topickey, m.typeclass)
796 if topickey
not in self.
_pubs:
797 topic2 = self.
args.PUBLISH_PREFIX + topic + self.
args.PUBLISH_SUFFIX
798 topic2 = self.
args.PUBLISH_FIXNAME
or topic2
799 if self.
args.VERBOSE:
800 ConsolePrinter.debug(
"Publishing from %s to %s.", topic, topic2)
803 if self.
args.PUBLISH_FIXNAME:
804 pub = next((v
for (_, c), v
in self.
_pubs.items()
if c == cls),
None)
805 pub = pub
or api.create_publisher(topic2, cls, queue_size=self.
args.QUEUE_SIZE_OUT)
808 self.
_pubs[topickey].publish(msg)
810 super(LiveSink, self).
emit(topic, msg, stamp, match, index)
812 def bind(self, source):
813 """Attaches source to sink and blocks until connected to ROS."""
815 super(LiveSink, self).
bind(source)
820 Returns whether ROS environment is set
for publishing,
821 and output topic configuration
is valid, emits error
if not.
824 result = Sink.validate(self)
825 if not api.validate(live=
True):
828 if self.
args.LIVE
and not any((self.
args.PUBLISH_PREFIX, self.
args.PUBLISH_SUFFIX,
829 self.
args.PUBLISH_FIXNAME)):
830 ConsolePrinter.error(
"Need topic prefix or suffix or fixname "
831 "when republishing messages from live ROS topics.")
837 """Shuts down publishers."""
840 ConsolePrinter.debug(
"Published %s to %s.",
841 common.plural(
"message", sum(self.
_counts.values())),
842 common.plural(
"topic", self.
_pubs))
843 for k
in list(self.
_pubs):
844 try: self.
_pubs.pop(k).unregister()
845 except Exception
as e:
846 if self.
args.VERBOSE:
847 ConsolePrinter.warn(
"Error closing publisher on topic %r: %s", k[0], e)
848 super(LiveSink, self).
close()
852 """Provides messages to callback function."""
855 DEFAULT_ARGS = dict(EMIT=
None, METAEMIT=
None, HIGHLIGHT=
False)
857 def __init__(self, args=None, **kwargs):
859 @param args arguments
as namespace
or dictionary, case-insensitive;
861 @param args.emit callback(topic, msg, stamp, highlighted msg, index
in topic),
if any
862 @param args.metaemit callback(metadata dict)
if any, invoked before first emit
from source batch
863 @param args.highlight whether to expect highlighted matching fields
from source messages
864 @param kwargs any
and all arguments
as keyword overrides, case-insensitive
866 if callable(args): args = common.ensure_namespace(
None, emit=args)
867 args = common.ensure_namespace(args, AppSink.DEFAULT_ARGS, **kwargs)
871 """Invokes registered metaemit callback, if any, and not already invoked."""
873 if not self.
source:
return
874 batch = self.
source.get_batch()
if self.
argsargs.METAEMIT
else None
879 def emit(self, topic, msg, stamp=None, match=None, index=None):
880 """Registers message and invokes registered emit callback, if any."""
883 super(AppSink, self).
emit(topic, msg, stamp, match, index)
887 """Returns whether emitted matches are highlighted."""
891 """Returns whether callbacks are valid, emits error if not."""
894 for key
in (
"EMIT",
"METAEMIT"):
895 if getattr(self.
argsargs, key)
and not callable(getattr(self.
argsargs, key)):
896 ConsolePrinter.error(
"Invalid callback for %s: %r", key, getattr(self.
argsargs, key))
902 """Combines any number of sinks."""
905 FLAG_CLASSES = {
"PUBLISH": LiveSink,
"CONSOLE": ConsoleSink,
"APP": AppSink}
908 FORMAT_CLASSES = {
"bag": BagSink}
912 Accepts more arguments, given to the real sinks constructed.
914 @param args arguments
as namespace
or dictionary, case-insensitive
915 @param args.console
print matches to console
916 @param args.write [[target, format=FORMAT, key=value, ], ]
917 @param args.publish publish matches to live topics
918 @param args.app provide messages to given callback function
919 @param sinks pre-created sinks, arguments will be ignored
920 @param kwargs any
and all arguments
as keyword overrides, case-insensitive
922 args = common.ensure_namespace(args, **kwargs)
923 super(MultiSink, self).__init__(args)
928 if getattr(args, flag,
None)]
if not sinks
else list(sinks)
930 for dumpopts
in getattr(args,
"WRITE", [])
if not sinks
else ():
931 kwargs = dict(x.split(
"=", 1)
for x
in dumpopts[1:]
if isinstance(x, common.TEXT_TYPES))
932 kwargs.update(kv
for x
in dumpopts[1:]
if isinstance(x, dict)
for kv
in x.items())
933 target, cls = dumpopts[0], self.
FORMAT_CLASSES.get(kwargs.pop(
"format",
None))
936 key=
lambda x: x
is BagSink)
937 if callable(getattr(c,
"autodetect",
None))
938 and c.autodetect(target)),
None)
940 ConsolePrinter.error(
'Unknown output format in "%s"' %
" ".join(map(str, dumpopts)))
943 clsargs = common.structcopy(args)
944 clsargs.WRITE, clsargs.WRITE_OPTIONS = target, kwargs
945 self.
sinks += [cls(clsargs)]
948 """Outputs source metainfo in one sink, if not already emitted."""
950 sink = next((s
for s
in self.
sinks if isinstance(s, ConsoleSink)),
None)
952 sink = sink
or self.
sinks[0]
if self.
sinks else None
953 sink
and sink.emit_meta()
955 def emit(self, topic, msg, stamp=None, match=None, index=None):
956 """Outputs ROS message to all sinks."""
959 for sink
in self.
sinks:
960 sink.emit(topic, msg, stamp, match, index)
961 super(MultiSink, self).
emit(topic, msg, stamp, match, index)
963 def bind(self, source):
964 """Attaches source to all sinks, sets thread_excepthook on all sinks."""
965 super(MultiSink, self).
bind(source)
966 for sink
in self.
sinks:
970 def configure(self, args=None, **kwargs):
972 Updates sinks configuration.
974 @param args arguments
as namespace
or dictionary, case-insensitive
975 @param kwargs any
and all arguments
as keyword overrides, case-insensitive
977 args = common.ensure_namespace(args, **kwargs)
978 hasattr(args, "WRITE")
and delattr(args,
"WRITE")
979 for sink
in self.
sinks: sink.configure(args, **kwargs)
982 """Returns whether prerequisites are met for all sinks."""
984 ConsolePrinter.error(
"No output configured.")
988 """Closes all sinks."""
989 for sink
in self.
sinks:
993 """Flushes all sinks."""
994 for sink
in self.
sinks:
998 """Returns whether any sink requires highlighted matches."""
999 return any(s.is_highlighting()
for s
in self.
sinks)
1003 "AppSink",
"BagSink",
"ConsoleSink",
"LiveSink",
"MultiSink",
"RolloverSinkMixin",
"Sink",
TextWrapper that supports custom substring widths in line width calculation.
Provides messages to callback function.
__init__(self, args=None, **kwargs)
emit(self, topic, msg, stamp=None, match=None, index=None)
Registers message and invokes registered emit callback, if any.
is_highlighting(self)
Returns whether emitted matches are highlighted.
validate(self)
Returns whether callbacks are valid, emits error if not.
emit_meta(self)
Invokes registered metaemit callback, if any, and not already invoked.
Writes messages to bagfile.
__init__(self, args=None, **kwargs)
autodetect(cls, target)
Returns true if target is recognizable as a ROS bag.
close_output(self)
Closes output bag, if any.
emit(self, topic, msg, stamp=None, match=None, index=None)
Writes message to output bagfile.
validate(self)
Returns whether write options are valid and ROS environment set, emits error if not.
close(self)
Closes output bag, if any, emits metainfo.
size
Returns current file size in bytes, or None if size lookup failed.
Prints messages to console.
__init__(self, args=None, **kwargs)
emit(self, topic, msg, stamp=None, match=None, index=None)
Prints separator line and message text.
is_highlighting(self)
Returns True if sink is configured to highlight matched values.
validate(self)
Returns whether arguments environment set, populates options, emits error if not.
emit_meta(self)
Prints source metainfo like bag header, if not already printed.
Publishes messages to ROS topics.
__init__(self, args=None, **kwargs)
emit(self, topic, msg, stamp=None, match=None, index=None)
Publishes message to output topic.
validate(self)
Returns whether ROS environment is set for publishing, and output topic configuration is valid,...
close(self)
Shuts down publishers.
bind(self, source)
Attaches source to sink and blocks until connected to ROS.
Combines any number of sinks.
configure(self, args=None, **kwargs)
Updates sinks configuration.
dict FORMAT_CLASSES
Autobinding between –write TARGET format=FORMAT and sink classes.
emit(self, topic, msg, stamp=None, match=None, index=None)
Outputs ROS message to all sinks.
sinks
List of all combined sinks.
is_highlighting(self)
Returns whether any sink requires highlighted matches.
flush(self)
Flushes all sinks.
validate(self)
Returns whether prerequisites are met for all sinks.
close(self)
Closes all sinks.
__init__(self, args=None, sinks=(), **kwargs)
Accepts more arguments, given to the real sinks constructed.
dict FLAG_CLASSES
Autobinding between argument flags and sink classes.
bind(self, source)
Attaches source to all sinks, sets thread_excepthook on all sinks.
emit_meta(self)
Outputs source metainfo in one sink, if not already emitted.
Provides output file rollover by size, duration, or message count.
__init__(self, args=None, **kwargs)
filename
Current output file path.
close_output(self)
Closes output file, if any.
validate(self)
Returns whether write options are valid, emits error if not, else populates options.
size
Returns current file size in bytes, or None if size lookup failed.
ensure_rollover(self, topic, msg, stamp)
Closes current output file and prepares new filename if rollover limit reached.
format_output_meta(self)
Returns output file metainfo string, with names and sizes and message/topic counts.
get_write_options(cls, label)
Returns command-line help texts for rollover options, as [(name, help)].
list OPTIONS_TEMPLATES
Command-line help templates for rollover options, as [(name, text with s label placeholder)].
make_filename(self)
Returns new filename for output, accounting for rollover template and overwrite.
configure(self, args=None, **kwargs)
Updates sink configuration.
thread_excepthook(self, text, exc)
Handles exception, used by background threads.
source
inputs.Source instance bound to this sink
__init__(self, args=None, **kwargs)
autodetect(cls, target)
Returns true if target is recognizable as output for this sink class.
tuple FILE_EXTENSIONS
Auto-detection file extensions for subclasses, as (".ext", )
valid
Result of validate()
emit(self, topic, msg, stamp=None, match=None, index=None)
Outputs ROS message.
__exit__(self, exc_type, exc_value, traceback)
Context manager exit, closes sink.
is_highlighting(self)
Returns whether this sink requires highlighted matches.
flush(self)
Writes out any pending data to disk.
validate(self)
Returns whether sink prerequisites are met (like ROS environment set if LiveSink).
__enter__(self)
Context manager entry.
close(self)
Shuts down output, closing any files or connections.
bind(self, source)
Attaches source to sink.
emit_meta(self)
Outputs source metainfo like bag header as debug stream, if not already emitted.
Provides message formatting as text.
__init__(self, args=None, **kwargs)
format_message(self, msg, highlight=False)
Returns message as formatted string, optionally highlighted for matches if configured.
str NOCOLOR_HIGHLIGHT_WRAPPERS
Default highlight wrappers if not color output.
validate(self)
Returns whether arguments are valid, emits error if not, else populates options.
message_to_yaml(self, val, top=(), typename=None)
Returns ROS message or other value as YAML.