3Main outputs for emitting messages.
5------------------------------------------------------------------------------
6This file is part of grepros - grep for ROS bag files and live topics.
7Released under the BSD License.
12------------------------------------------------------------------------------
15from __future__
import print_function
28from . common
import ArgumentUtil, ConsolePrinter, MatchMarkers
29from . inputs
import Source
33 """Output base class."""
39 DEFAULT_ARGS = dict(META=
False)
43 @param args arguments as namespace or dictionary, case-insensitive
44 @param args.meta whether to emit metainfo
45 @param kwargs any and all arguments as keyword overrides, case-insensitive
50 self.
args = common.ensure_namespace(args, Sink.DEFAULT_ARGS, **kwargs)
57 """Context manager entry."""
60 def __exit__(self, exc_type, exc_value, traceback):
61 """Context manager exit, closes sink."""
65 """Outputs source metainfo like bag header as debug stream, if not already emitted."""
66 batch = self.
args.META
and self.
source.get_batch()
69 meta
and ConsolePrinter.debug(meta)
71 def emit(self, topic, msg, stamp=None, match=None, index=None):
75 @param topic full name of ROS topic the message is from
76 @param msg ROS message
77 @param stamp message ROS timestamp, if not current ROS time
78 @param match ROS message with values tagged with match markers if matched, else None
79 @param index message index in topic, if any
81 topickey = api.TypeMeta.make(msg, topic).topickey
85 """Attaches source to sink."""
90 Updates sink configuration.
92 @param args arguments as namespace or dictionary, case-insensitive
93 @param kwargs any and all arguments as keyword overrides, case-insensitive
95 self.
args = common.ensure_namespace(args, vars(self.
args), **kwargs)
99 """Returns whether sink prerequisites are met (like ROS environment set if LiveSink)."""
101 try: self.
args, self.
valid = ArgumentUtil.validate(self.
args),
True
102 except Exception: self.
valid =
False
106 """Shuts down output, closing any files or connections."""
111 """Writes out any pending data to disk."""
114 """Handles exception, used by background threads."""
115 ConsolePrinter.error(text)
118 """Returns whether this sink requires highlighted matches."""
123 """Returns true if target is recognizable as output for this sink class."""
124 ext = os.path.splitext(target
or "")[-1].lower()
127 def _ensure_stamp_index(self, topic, msg, stamp=None, index=None):
128 """Returns (stamp, index) populated with current ROS time and topic index if `None`."""
129 if stamp
is None: stamp = api.get_rostime(fallback=
True)
130 if index
is None: index = self.
_counts.get(api.TypeMeta.make(msg, topic).topickey, 0) + 1
135 """Provides message formatting as text."""
138 NOCOLOR_HIGHLIGHT_WRAPPERS =
"**",
"**"
141 DEFAULT_ARGS = dict(COLOR=
True, EMIT_FIELD=(), NOEMIT_FIELD=(), HIGHLIGHT=
True,
142 MAX_FIELD_LINES=
None, START_LINE=
None, END_LINE=
None,
143 MAX_MESSAGE_LINES=
None, LINES_AROUND_MATCH=
None, MATCHED_FIELDS_ONLY=
False,
144 WRAP_WIDTH=
None, MATCH_WRAPPER=
None)
148 @param args arguments as namespace or dictionary, case-insensitive
149 @param args.color False or "never" for not using colors in replacements
150 @param args.highlight highlight matched values (default true)
151 @param args.emit_field message fields to emit if not all
152 @param args.noemit_field message fields to skip in output
153 @param args.max_field_lines maximum number of lines to output per field
154 @param args.start_line message line number to start output from
155 @param args.end_line message line number to stop output at
156 @param args.max_message_lines maximum number of lines to output per message
157 @param args.lines_around_match number of message lines around matched fields to output
158 @param args.matched_fields_only output only the fields where match was found
159 @param args.wrap_width character width to wrap message YAML output at
160 @param args.match_wrapper string to wrap around matched values,
161 both sides if one value, start and end if more than one,
162 or no wrapping if zero values
163 @param kwargs any and all arguments as keyword overrides, case-insensitive
169 self.
_styles = collections.defaultdict(str)
173 """Returns whether arguments are valid, emits error if not, else populates options."""
174 args = common.ensure_namespace(self.args, TextSinkMixin.DEFAULT_ARGS)
175 try: args = ArgumentUtil.validate(args)
176 except Exception:
return False
182 """Returns message as formatted string, optionally highlighted for matches if configured."""
183 if self.args.MAX_MESSAGE_LINES == 0:
return ""
186 highlight = highlight
and self.args.HIGHLIGHT
187 if self.
_prefix or self.args.START_LINE
or self.args.END_LINE \
188 or self.args.MAX_MESSAGE_LINES
or (self.args.LINES_AROUND_MATCH
and highlight):
189 lines = text.splitlines()
191 if self.args.START_LINE
or self.args.END_LINE
or self.args.MAX_MESSAGE_LINES:
192 start = self.args.START_LINE
or 0
193 start = max(start, -len(lines)) - (start > 0)
194 end = self.args.END_LINE
or len(lines)
195 end = max(end, -len(lines)) - (end > 0)
196 if self.args.MAX_MESSAGE_LINES: end = min(end, start + self.args.MAX_MESSAGE_LINES)
197 lines = lines[start:end + 1]
198 lines = lines
and (lines[:-1] + [lines[-1] + self.
_styles[
"rst"]])
200 if self.args.LINES_AROUND_MATCH
and highlight:
201 spans, NUM = [], self.args.LINES_AROUND_MATCH
202 for i, l
in enumerate(lines):
203 if MatchMarkers.START
in l:
204 spans.append([max(0, i - NUM), min(i + NUM + 1, len(lines))])
205 if MatchMarkers.END
in l
and spans:
206 spans[-1][1] = min(i + NUM + 1, len(lines))
207 lines = sum((lines[a:b - 1] + [lines[b - 1] + self.
_styles[
"rst"]]
208 for a, b
in common.merge_spans(spans)), [])
211 lines = [self.
_prefix + l
for l
in lines]
213 text =
"\n".join(lines)
216 text = re.sub(
r"(%s)\1+" % re.escape(a),
r"\1", text)
217 text = text.replace(a, b)
223 """Returns ROS message or other value as YAML."""
225 unquote =
lambda v: v[1:-1]
if v[:1] == v[-1:] ==
'"' else v
227 def retag_match_lines(lines):
228 """Adds match tags to lines where wrapping separated start and end."""
230 for i, l
in enumerate(lines):
231 startpos0, endpos0 = l.find (MatchMarkers.START), l.find (MatchMarkers.END)
232 startpos1, endpos1 = l.rfind(MatchMarkers.START), l.rfind(MatchMarkers.END)
233 if endpos0 >= 0
and (startpos0 < 0
or startpos0 > endpos0):
234 lines[i] = l = re.sub(
r"^(\s*)",
r"\1" + MatchMarkers.START, l)
235 if startpos1 >= 0
and endpos1 < startpos1
and i + 1 < len(lines):
236 lines[i + 1] = re.sub(
r"^(\s*)",
r"\1" + MatchMarkers.START, lines[i + 1])
237 if startpos1 >= 0
and startpos1 > endpos1:
238 CUT, EXTRA = (-len(PH), PH)
if PH
and l.endswith(PH)
else (len(l),
"")
239 lines[i] = l[:CUT] + MatchMarkers.END + EXTRA
243 """Returns text or list/tuple truncated to length used in final output."""
244 if self.args.LINES_AROUND_MATCH \
245 or (
not self.args.MAX_MESSAGE_LINES
and (self.args.END_LINE
or 0) <= 0):
return v
247 MAX_CHAR_LEN = 1 + len(MatchMarkers.START) + len(MatchMarkers.END)
249 if isinstance(v, (list, tuple)): textlen = bytelen = 2 + len(v) * (2 + MAX_CHAR_LEN)
250 else: textlen, bytelen = self.
_wrapper.strlen(v), len(v)
251 if textlen < 10000:
return v
256 MIN_CHARS_PER_LINE = self.
_wrapper.width
257 if MAX_CHAR_LEN != 1:
258 MIN_CHARS_PER_LINE = self.
_wrapper.width // MAX_CHAR_LEN * 2
259 MAX_LINES = self.args.MAX_MESSAGE_LINES
or self.args.END_LINE
260 MAX_CHARS = MAX_LEN = MAX_LINES * MIN_CHARS_PER_LINE * self.
_wrapper.width + 100
261 if bytelen > MAX_CHARS:
262 if isinstance(v, (list, tuple)): MAX_LEN = MAX_CHARS // 3
266 indent =
" " * len(top)
267 if isinstance(val, six.integer_types + (float, bool)):
269 if isinstance(val, common.TEXT_TYPES):
270 if val
in (
"", MatchMarkers.EMPTY):
271 return MatchMarkers.EMPTY_REPL
if val
else "''"
273 return yaml.safe_dump(truncate(val), default_style=
'"', width=sys.maxsize).rstrip(
"\n")
274 if isinstance(val, (list, tuple)):
277 if api.scalar(typename)
in api.ROS_STRING_TYPES:
278 yaml_str = yaml.safe_dump(truncate(val)).rstrip(
'\n')
279 return "\n" +
"\n".join(indent + line
for line
in yaml_str.splitlines())
280 vals = [x
for i, v
in enumerate(truncate(val))
282 if api.scalar(typename)
in api.ROS_NUMERIC_TYPES:
283 return "[%s]" %
", ".join(unquote(str(v))
for v
in vals)
284 return (
"\n" +
"\n".join(indent +
"- " + v
for v
in vals))
if vals
else ""
285 if api.is_ros_message(val):
286 MATCHED_ONLY = self.args.MATCHED_FIELDS_ONLY
and not self.args.LINES_AROUND_MATCH
287 vals, fieldmap = [], api.get_message_fields(val)
289 fieldmap = api.filter_fields(fieldmap, top, include=prints, exclude=noprints)
290 for k, t
in fieldmap.items():
291 v = self.
message_to_yaml(api.get_message_value(val, k, t), top + (k, ), t)
292 if not v
or MATCHED_ONLY
and MatchMarkers.START
not in v:
295 if t
not in api.ROS_STRING_TYPES: v = unquote(v)
296 if api.scalar(t)
in api.ROS_BUILTIN_TYPES:
297 is_strlist = t.endswith(
"]")
and api.scalar(t)
in api.ROS_STRING_TYPES
298 is_num = api.scalar(t)
in api.ROS_NUMERIC_TYPES
299 extra_indent = indent
if is_strlist
else " " * len(indent + k +
": ")
301 self.
_wrapper.drop_whitespace = t.endswith(
"]")
and not is_strlist
302 self.
_wrapper.break_long_words =
not is_num
303 v = (
"\n" + extra_indent).join(retag_match_lines(self.
_wrapper.wrap(v)))
304 if is_strlist
and self.
_wrapper.strip(v) !=
"[]": v =
"\n" + v
305 vals.append(
"%s%s: %s" % (indent, k, api.format_message_value(val, k, v)))
306 return (
"\n" if indent
and vals
else "") +
"\n".join(vals)
311 def _configure(self, args):
312 """Initializes output settings."""
315 self.
_styles.default_factory = str
316 prints, noprints = args.EMIT_FIELD, args.NOEMIT_FIELD
317 for key, vals
in [(
"print", prints), (
"noprint", noprints)]:
318 self.
_patterns[key] = [(tuple(v.split(
".")), common.path_to_regex(v))
for v
in vals]
320 if args.COLOR
not in (
"never",
False):
321 self.
_styles.update({
"hl0": ConsolePrinter.STYLE_HIGHLIGHT
if self.args.HIGHLIGHT
323 "ll0": ConsolePrinter.STYLE_LOWLIGHT,
324 "pfx0": ConsolePrinter.STYLE_SPECIAL,
325 "sep0": ConsolePrinter.STYLE_SPECIAL2})
326 self.
_styles.default_factory =
lambda: ConsolePrinter.STYLE_RESET
328 WRAPS = args.MATCH_WRAPPER
if self.args.HIGHLIGHT
else ""
330 WRAPS = ((WRAPS
or [
""]) * 2)[:2]
334 custom_widths = {MatchMarkers.START: len(WRAPS[0]), MatchMarkers.END: len(WRAPS[1]),
338 wrapargs = dict(max_lines=args.MAX_FIELD_LINES,
339 placeholder=
"%s ...%s" % (self.
_styles[
"ll0"], self.
_styles[
"ll1"]))
340 if args.WRAP_WIDTH
is not None: wrapargs.update(width=args.WRAP_WIDTH)
343 MatchMarkers.END: self.
_styles[
"hl1"]}
348 """Provides output file rollover by size, duration, or message count."""
351 DEFAULT_ARGS = dict(WRITE=
None, WRITE_OPTIONS={}, VERBOSE=
False)
354 OPTIONS_TEMPLATES = [
355 (
"rollover-size=NUM",
"size limit for individual files\nin {label} output\n"
356 "as bytes (supports abbreviations like 1K or 2M or 3G)"),
357 (
"rollover-count=NUM",
"message limit for individual files\nin {label} output\n"
358 "(supports abbreviations like 1K or 2M or 3G)"),
359 (
"rollover-duration=INTERVAL",
"message time span limit for individual files\n"
360 "in {label} output\n"
361 "as seconds (supports abbreviations like 60m or 2h or 1d)"),
362 (
"rollover-template=STR",
"output filename template for individual files\n"
363 "in {label} output,\n"
364 'supporting strftime format codes like "%%H-%%M-%%S"\n'
365 'and "%%(index)s" as output file index'),
368 START_META_TEMPLATE =
"{mcount} in {tcount} to "
370 FILE_META_TEMPLATE =
"{name} ({size})"
372 MULTI_META_TEMPLATE =
"\n- {name} ({size}, {mcount}, {tcount})"
377 @param args arguments as namespace or dictionary, case-insensitive
378 @param args.write base name of output file to write if not using rollover-template
379 @param args.write_options {"rollover-size": bytes limit for individual output files,
380 "rollover-count": message limit for individual output files,
381 "rollover-duration": time span limit for individual output files,
382 as ROS duration or convertible seconds,
383 "rollover-template": output filename template, supporting
384 strftime format codes like "%H-%M-%S"
385 and "%(index)s" as output file index,
386 "overwrite": whether to overwrite existing file
388 @param args.verbose whether to emit debug information
389 @param kwargs any and all arguments as keyword overrides, case-insensitive
400 """Returns whether write options are valid, emits error if not, else populates options."""
404 for k
in (
"size",
"count",
"duration"):
405 value = value0 = self.args.WRITE_OPTIONS.get(
"rollover-%s" % k)
406 if value
is None:
continue
407 SUFFIXES = dict(zip(
"smhd", [1, 60, 3600, 24*3600]))
if "duration" == k
else \
408 dict(zip(
"KMGT", [2**10, 2**20, 2**30, 2**40]))
if "size" == k
else \
409 dict(zip(
"KMGT", [10**3, 10**6, 10**9, 10**12]))
411 if isinstance(value, (six.binary_type, six.text_type)):
412 value = common.parse_number(value, SUFFIXES)
413 value = (api.to_duration
if "duration" == k
else int)(value)
414 except Exception:
pass
415 if (value
is None or value < 0)
if "duration" != k \
416 else (k != api.get_ros_time_category(value)
or api.to_sec(value) < 0):
417 ConsolePrinter.error(
"Invalid rollover %s option: %r. "
418 "Value must be a non-negative %s.", k, value0, k)
422 if self.args.WRITE_OPTIONS.get(
"rollover-template"):
423 value = self.args.WRITE_OPTIONS[
"rollover-template"]
424 value = re.sub(
r"(^|[^%])%\(index\)",
r"\1%%(index)", value)
425 try: datetime.datetime.now().strftime(value)
427 ConsolePrinter.error(
"Invalid rollover template option: %r. "
428 "Value must contain valid strftime codes.", value)
434 ConsolePrinter.warn(
"Ignoring rollover template option: "
435 "no rollover limits given.")
441 Closes current output file and prepares new filename if rollover limit reached.
447 stamp = api.time_message(stamp, to_message=
False)
450 props[
"size"] = self.
size
453 do_rollover = (sum(props[
"counts"].values()) >= self.
_rollover_limits[
"count"])
455 stamps = [stamp, props[
"start"]]
456 do_rollover = (max(stamps) - min(stamps) >= self.
_rollover_limits[
"duration"])
459 props[
"size"] = self.
size
463 topickey = api.TypeMeta.make(msg, topic).topickey
464 if not props: props.update({
"counts": {},
"start": stamp,
"size":
None})
465 props[
"start"] = min((props[
"start"], stamp))
466 props[
"counts"][topickey] = props[
"counts"].get(topickey, 0) + 1
470 """Closes output file, if any."""
471 raise NotImplementedError
475 """Returns new filename for output, accounting for rollover template and overwrite."""
476 result = self.args.WRITE
480 except Exception:
pass
481 if self.args.WRITE_OPTIONS.get(
"overwrite")
not in (
True,
"true"):
482 result = common.unique_path(result, empty_ok=
True)
487 """Returns output file metainfo string, with names and sizes and message/topic counts."""
488 if not self._counts:
return ""
489 SIZE_ERROR =
"error getting size"
491 mcount=common.plural(
"message", sum(self._counts.values())),
492 tcount=common.plural(
"topic", self._counts)
496 sizestr = SIZE_ERROR
if sz
is None else common.format_bytes(sz)
500 if props[
"size"]
is None:
501 try: props[
"size"] = os.path.getsize(path)
502 except Exception
as e:
503 ConsolePrinter.warn(
"Error getting size of %s: %s", path, e)
504 sizesum = sum(x[
"size"]
for x
in self.
_rollover_files.values()
if x[
"size"]
is not None)
507 size=common.format_bytes(sizesum)
508 ) + (
":" if self.args.VERBOSE
else ".")
509 for path, props
in self.
_rollover_files.items()
if self.args.VERBOSE
else ():
510 sizestr = SIZE_ERROR
if props[
"size"]
is None else common.format_bytes(props[
"size"])
512 mcount=common.plural(
"message", sum(props[
"counts"].values())),
513 tcount=common.plural(
"topic", props[
"counts"])
520 """Returns current file size in bytes, or None if size lookup failed."""
521 try:
return os.path.getsize(self.
filename)
522 except Exception
as e:
523 ConsolePrinter.warn(
"Error getting size of %s: %s", self.
filename, e)
529 """Returns command-line help texts for rollover options, as [(name, help)]."""
535 """Prints messages to console."""
537 META_LINE_TEMPLATE =
"{ll0}{sep} {line}{ll1}"
538 MESSAGE_SEP_TEMPLATE =
"{ll0}{sep}{ll1}"
539 PREFIX_TEMPLATE =
"{pfx0}{batch}{pfx1}{sep0}{sep}{sep1}"
540 MATCH_PREFIX_SEP =
":"
541 CONTEXT_PREFIX_SEP =
"-"
545 DEFAULT_ARGS = dict(COLOR=
True, EMIT_FIELD=(), NOEMIT_FIELD=(), HIGHLIGHT=
True, META=
False,
546 LINE_PREFIX=
True, MAX_FIELD_LINES=
None, START_LINE=
None,
547 END_LINE=
None, MAX_MESSAGE_LINES=
None, LINES_AROUND_MATCH=
None,
548 MATCHED_FIELDS_ONLY=
False, WRAP_WIDTH=
None, MATCH_WRAPPER=
None)
553 @param args arguments as namespace or dictionary, case-insensitive
554 @param args.color False or "never" for not using colors in replacements
555 @param args.highlight highlight matched values (default true)
556 @param args.meta whether to print metainfo
557 @param args.emit_field message fields to emit if not all
558 @param args.noemit_field message fields to skip in output
559 @param args.line_prefix print source prefix like bag filename on each message line
560 @param args.max_field_lines maximum number of lines to print per field
561 @param args.start_line message line number to start output from
562 @param args.end_line message line number to stop output at
563 @param args.max_message_lines maximum number of lines to output per message
564 @param args.lines_around_match number of message lines around matched fields to output
565 @param args.matched_fields_only output only the fields where match was found
566 @param args.wrap_width character width to wrap message YAML output at
567 @param args.match_wrapper string to wrap around matched values,
568 both sides if one value, start and end if more than one,
569 or no wrapping if zero values
570 @param kwargs any and all arguments as keyword overrides, case-insensitive
572 args = common.ensure_namespace(args, ConsoleSink.DEFAULT_ARGS, **kwargs)
573 super(ConsoleSink, self).
__init__(args)
574 TextSinkMixin.__init__(self, args)
578 """Prints source metainfo like bag header, if not already printed."""
580 batch = self.
args.META
and self.
source.get_batch()
585 for x
in meta.splitlines())
586 meta
and ConsolePrinter.print(meta)
589 def emit(self, topic, msg, stamp=None, match=None, index=None):
590 """Prints separator line and message text."""
594 if self.
args.LINE_PREFIX
and self.
source.get_batch():
596 kws = dict(self.
_styles, sep=sep, batch=self.
source.get_batch())
600 meta = self.
source.format_message_meta(topic, msg, stamp, index)
602 for x
in meta.splitlines())
603 meta
and ConsolePrinter.print(meta)
606 sep
and ConsolePrinter.print(sep)
607 ConsolePrinter.print(self.
format_message(match
or msg, highlight=bool(match)))
608 super(ConsoleSink, self).
emit(topic, msg, stamp, match, index)
612 """Returns True if sink is configured to highlight matched values."""
613 return bool(self.
args.HIGHLIGHT)
617 """Returns whether arguments environment set, populates options, emits error if not."""
619 if self.
args.WRAP_WIDTH
is None:
620 self.
args.WRAP_WIDTH = ConsolePrinter.WIDTH
621 self.
valid = Sink.validate(self)
and TextSinkMixin.validate(self)
627 """Writes messages to bagfile."""
630 DEFAULT_ARGS = dict(META=
False, WRITE_OPTIONS={}, VERBOSE=
False)
634 @param args arguments as namespace or dictionary, case-insensitive;
635 or a single path as the ROS bagfile to write,
636 or a stream or {@link grepros.api.Bag Bag} instance to write to
637 @param args.write name of ROS bagfile to create or append to,
638 or a stream to write to
639 @param args.write_options {"overwrite": whether to overwrite existing file
641 "rollover-size": bytes limit for individual output files,
642 "rollover-count": message limit for individual output files,
643 "rollover-duration": time span limit for individual output files,
644 as ROS duration or convertible seconds,
645 "rollover-template": output filename template, supporting
646 strftime format codes like "%H-%M-%S"
647 and "%(index)s" as output file index}
648 @param args.meta whether to emit metainfo
649 @param args.verbose whether to emit debug information
650 @param kwargs any and all arguments as keyword overrides, case-insensitive
654 args = {
"WRITE": str(args)}
if isinstance(args, common.PATH_TYPES)
else \
655 {
"WRITE": args}
if common.is_stream(args)
else \
656 {}
if isinstance(args,
api.Bag)
else args
657 args = common.ensure_namespace(args, BagSink.DEFAULT_ARGS, **kwargs)
659 RolloverSinkMixin.__init__(self, args)
660 self.
_bag = args0
if isinstance(args0,
api.Bag)
else None
661 self.
_overwrite = (args.WRITE_OPTIONS.get(
"overwrite")
in (
"true",
True))
667 def emit(self, topic, msg, stamp=None, match=None, index=None):
668 """Writes message to output bagfile."""
671 if self.
_is_pathed: RolloverSinkMixin.ensure_rollover(self, topic, msg, stamp)
673 topickey = api.TypeMeta.make(msg, topic).topickey
674 if self.
args.VERBOSE
and topickey
not in self.
_counts:
675 ConsolePrinter.debug(
"Adding topic %s in bag output.", topic)
677 qoses = self.
source.get_message_meta(topic, msg, stamp).get(
"qoses")
678 self.
_bag.write(topic, msg, stamp, qoses=qoses)
679 super(BagSink, self).
emit(topic, msg, stamp, match, index)
682 """Returns whether write options are valid and ROS environment set, emits error if not."""
684 result = Sink.validate(self)
685 if not RolloverSinkMixin.validate(self):
687 if self.
args.WRITE_OPTIONS.get(
"overwrite")
not in (
None,
True,
False,
"true",
"false"):
688 ConsolePrinter.error(
"Invalid overwrite option for bag: %r. "
689 "Choose one of {true, false}.",
690 self.
args.WRITE_OPTIONS[
"overwrite"])
694 ConsolePrinter.error(
"File not writable.")
696 if not self.
_bag and common.is_stream(self.
args.WRITE) \
697 and not any(c.STREAMABLE
for c
in api.Bag.WRITER_CLASSES):
698 ConsolePrinter.error(
"Bag format does not support writing streams.")
700 if self.
_bag and self.
_bag.mode
not in (
"a",
"w"):
701 ConsolePrinter.error(
"Bag not in write mode.")
707 """Closes output bag, if any, emits metainfo."""
712 super(BagSink, self).
close()
715 """Closes output bag, if any."""
721 """Returns current file size in bytes, or None if size lookup failed."""
723 if not self.
_bag or (self.
_bag.filename
and api.ROS1)
else self.
_bag.size
724 except Exception
as e:
725 ConsolePrinter.warn(
"Error getting size of %s: %s", self.
filename, e)
728 def _ensure_open(self):
729 """Opens output file if not already open."""
730 if self.
_bag is not None:
736 if common.is_stream(self.
args.WRITE):
742 if not self.
_overwrite and os.path.isfile(filename)
and os.path.getsize(filename):
743 cls = api.Bag.autodetect(filename)
744 if cls
and "a" not in getattr(cls,
"MODES", (
"a", )):
746 if self.
args.VERBOSE:
747 ConsolePrinter.debug(
"Making unique filename %r, as %s does not support "
749 if self.
args.VERBOSE:
750 sz = os.path.isfile(filename)
and os.path.getsize(filename)
751 ConsolePrinter.debug(
"%s bag output %s%s.",
753 "Appending to" if sz
else "Creating",
754 filename, (
" (%s)" % common.format_bytes(sz))
if sz
else "")
755 common.makedirs(os.path.dirname(filename))
760 """Returns true if target is recognizable as a ROS bag."""
761 ext = os.path.splitext(target
or "")[-1].lower()
762 return ext
in api.BAG_EXTENSIONS
766 """Publishes messages to ROS topics."""
769 DEFAULT_ARGS = dict(LIVE=
False, META=
False, QUEUE_SIZE_OUT=10, PUBLISH_PREFIX=
"",
770 PUBLISH_SUFFIX=
"", PUBLISH_FIXNAME=
"", VERBOSE=
False)
774 @param args arguments as namespace or dictionary, case-insensitive
775 @param args.live whether reading messages from live ROS topics
776 @param args.queue_size_out publisher queue size (default 10)
777 @param args.publish_prefix output topic prefix, prepended to input topic
778 @param args.publish_suffix output topic suffix, appended to output topic
779 @param args.publish_fixname single output topic name to publish to,
780 overrides prefix and suffix if given
781 @param args.meta whether to emit metainfo
782 @param args.verbose whether to emit debug information
783 @param kwargs any and all arguments as keyword overrides, case-insensitive
785 args = common.ensure_namespace(args, LiveSink.DEFAULT_ARGS, **kwargs)
786 super(LiveSink, self).
__init__(args)
790 def emit(self, topic, msg, stamp=None, match=None, index=None):
791 """Publishes message to output topic."""
794 with api.TypeMeta.make(msg, topic)
as m:
795 topickey, cls = (m.topickey, m.typeclass)
796 if topickey
not in self.
_pubs:
797 topic2 = self.
args.PUBLISH_PREFIX + topic + self.
args.PUBLISH_SUFFIX
798 topic2 = self.
args.PUBLISH_FIXNAME
or topic2
799 if self.
args.VERBOSE:
800 ConsolePrinter.debug(
"Publishing from %s to %s.", topic, topic2)
803 if self.
args.PUBLISH_FIXNAME:
804 pub = next((v
for (_, c), v
in self.
_pubs.items()
if c == cls),
None)
805 pub = pub
or api.create_publisher(topic2, cls, queue_size=self.
args.QUEUE_SIZE_OUT)
806 self.
_pubs[topickey] = pub
808 self.
_pubs[topickey].publish(msg)
810 super(LiveSink, self).
emit(topic, msg, stamp, match, index)
813 """Attaches source to sink and blocks until connected to ROS."""
815 super(LiveSink, self).
bind(source)
820 Returns whether ROS environment is set for publishing,
821 and output topic configuration is valid, emits error if not.
824 result = Sink.validate(self)
825 if not api.validate(live=
True):
828 if self.
args.LIVE
and not any((self.
args.PUBLISH_PREFIX, self.
args.PUBLISH_SUFFIX,
829 self.
args.PUBLISH_FIXNAME)):
830 ConsolePrinter.error(
"Need topic prefix or suffix or fixname "
831 "when republishing messages from live ROS topics.")
837 """Shuts down publishers."""
840 ConsolePrinter.debug(
"Published %s to %s.",
841 common.plural(
"message", sum(self.
_counts.values())),
842 common.plural(
"topic", self.
_pubs))
843 for k
in list(self.
_pubs):
844 try: self.
_pubs.pop(k).unregister()
845 except Exception
as e:
846 if self.
args.VERBOSE:
847 ConsolePrinter.warn(
"Error closing publisher on topic %r: %s", k[0], e)
848 super(LiveSink, self).
close()
852 """Provides messages to callback function."""
855 DEFAULT_ARGS = dict(EMIT=
None, METAEMIT=
None, HIGHLIGHT=
False)
859 @param args arguments as namespace or dictionary, case-insensitive;
861 @param args.emit callback(topic, msg, stamp, highlighted msg, index in topic), if any
862 @param args.metaemit callback(metadata dict) if any, invoked before first emit from source batch
863 @param args.highlight whether to expect highlighted matching fields from source messages
864 @param kwargs any and all arguments as keyword overrides, case-insensitive
866 if callable(args): args = common.ensure_namespace(
None, emit=args)
867 args = common.ensure_namespace(args, AppSink.DEFAULT_ARGS, **kwargs)
871 """Invokes registered metaemit callback, if any, and not already invoked."""
873 if not self.
source:
return
874 batch = self.
source.get_batch()
if self.
args.METAEMIT
else None
879 def emit(self, topic, msg, stamp=None, match=None, index=None):
880 """Registers message and invokes registered emit callback, if any."""
883 super(AppSink, self).
emit(topic, msg, stamp, match, index)
887 """Returns whether emitted matches are highlighted."""
891 """Returns whether callbacks are valid, emits error if not."""
894 for key
in (
"EMIT",
"METAEMIT"):
896 ConsolePrinter.error(
"Invalid callback for %s: %r", key, getattr(self.
args, key))
902 """Combines any number of sinks."""
905 FLAG_CLASSES = {
"PUBLISH": LiveSink,
"CONSOLE": ConsoleSink,
"APP": AppSink}
908 FORMAT_CLASSES = {
"bag": BagSink}
912 Accepts more arguments, given to the real sinks constructed.
914 @param args arguments as namespace or dictionary, case-insensitive
915 @param args.console print matches to console
916 @param args.write [[target, format=FORMAT, key=value, ], ]
917 @param args.publish publish matches to live topics
918 @param args.app provide messages to given callback function
919 @param sinks pre-created sinks, arguments will be ignored
920 @param kwargs any and all arguments as keyword overrides, case-insensitive
922 args = common.ensure_namespace(args, **kwargs)
923 super(MultiSink, self).
__init__(args)
928 if getattr(args, flag,
None)]
if not sinks
else list(sinks)
930 for dumpopts
in getattr(args,
"WRITE", [])
if not sinks
else ():
931 kwargs = dict(x.split(
"=", 1)
for x
in dumpopts[1:]
if isinstance(x, common.TEXT_TYPES))
932 kwargs.update(kv
for x
in dumpopts[1:]
if isinstance(x, dict)
for kv
in x.items())
933 target, cls = dumpopts[0], self.
FORMAT_CLASSES.get(kwargs.pop(
"format",
None))
936 key=
lambda x: x
is BagSink)
937 if callable(getattr(c,
"autodetect",
None))
938 and c.autodetect(target)),
None)
940 ConsolePrinter.error(
'Unknown output format in "%s"' %
" ".join(map(str, dumpopts)))
943 clsargs = common.structcopy(args)
944 clsargs.WRITE, clsargs.WRITE_OPTIONS = target, kwargs
945 self.
sinks += [cls(clsargs)]
948 """Outputs source metainfo in one sink, if not already emitted."""
950 sink = next((s
for s
in self.
sinks if isinstance(s, ConsoleSink)),
None)
952 sink = sink
or self.
sinks[0]
if self.
sinks else None
953 sink
and sink.emit_meta()
955 def emit(self, topic, msg, stamp=None, match=None, index=None):
956 """Outputs ROS message to all sinks."""
959 for sink
in self.
sinks:
960 sink.emit(topic, msg, stamp, match, index)
961 super(MultiSink, self).
emit(topic, msg, stamp, match, index)
964 """Attaches source to all sinks, sets thread_excepthook on all sinks."""
965 super(MultiSink, self).
bind(source)
966 for sink
in self.
sinks:
972 Updates sinks configuration.
974 @param args arguments as namespace or dictionary, case-insensitive
975 @param kwargs any and all arguments as keyword overrides, case-insensitive
977 args = common.ensure_namespace(args, **kwargs)
978 hasattr(args,
"WRITE")
and delattr(args,
"WRITE")
979 for sink
in self.
sinks: sink.configure(args, **kwargs)
982 """Returns whether prerequisites are met for all sinks."""
984 ConsolePrinter.error(
"No output configured.")
988 """Closes all sinks."""
989 for sink
in self.
sinks:
993 """Flushes all sinks."""
994 for sink
in self.
sinks:
998 """Returns whether any sink requires highlighted matches."""
999 return any(s.is_highlighting()
for s
in self.
sinks)
1003 "AppSink",
"BagSink",
"ConsoleSink",
"LiveSink",
"MultiSink",
"RolloverSinkMixin",
"Sink",
__init__(self, args=None, **kwargs)
emit(self, topic, msg, stamp=None, match=None, index=None)
__init__(self, args=None, **kwargs)
emit(self, topic, msg, stamp=None, match=None, index=None)
__init__(self, args=None, **kwargs)
emit(self, topic, msg, stamp=None, match=None, index=None)
__init__(self, args=None, **kwargs)
emit(self, topic, msg, stamp=None, match=None, index=None)
configure(self, args=None, **kwargs)
dict FORMAT_CLASSES
Autobinding between `--write TARGET format=FORMAT` and sink classes.
emit(self, topic, msg, stamp=None, match=None, index=None)
sinks
List of all combined sinks.
__init__(self, args=None, sinks=(), **kwargs)
dict FLAG_CLASSES
Autobinding between argument flags and sink classes.
__init__(self, args=None, **kwargs)
filename
Current output file path.
ensure_rollover(self, topic, msg, stamp)
get_write_options(cls, label)
list OPTIONS_TEMPLATES
Command-line help templates for rollover options, as [(name, text with {label} placeholder)].
configure(self, args=None, **kwargs)
thread_excepthook(self, text, exc)
source
inputs.Source instance bound to this sink
__init__(self, args=None, **kwargs)
tuple FILE_EXTENSIONS
Auto-detection file extensions for subclasses, as (".ext", )
valid
Result of validate()
emit(self, topic, msg, stamp=None, match=None, index=None)
__exit__(self, exc_type, exc_value, traceback)
__init__(self, args=None, **kwargs)
format_message(self, msg, highlight=False)
str NOCOLOR_HIGHLIGHT_WRAPPERS
Default highlight wrappers if not color output.
message_to_yaml(self, val, top=(), typename=None)