3Input sources for ROS messages.
5------------------------------------------------------------------------------
6This file is part of grepros - grep for ROS bag files and live topics.
7Released under the BSD License.
12------------------------------------------------------------------------------
14## @namespace grepros.inputs
15from __future__ import print_function
22except ImportError:
import Queue
as queue
32from . common
import ArgumentUtil, ConsolePrinter, ensure_namespace, drop_zeros
36 """Message producer base class."""
42 MESSAGE_META_TEMPLATE =
"{topic} #{index} ({type} {dt} {stamp})"
45 DEFAULT_ARGS = dict(START_TIME=
None, END_TIME=
None, START_INDEX=
None, END_INDEX=
None,
46 UNIQUE=
False, SELECT_FIELD=(), NOSELECT_FIELD=(),
47 NTH_MESSAGE=1, NTH_INTERVAL=0, PROGRESS=
False)
49 def __init__(self, args=None, **kwargs):
51 @param args arguments
as namespace
or dictionary, case-insensitive
52 @param args.start_time earliest timestamp of messages to read
53 @param args.end_time latest timestamp of messages to read
54 @param args.unique emit messages that are unique
in topic
55 @param args.start_index message index within topic to start
from
56 @param args.end_index message index within topic to stop at
57 @param args.select_field message fields to use
for uniqueness
if not all
58 @param args.noselect_field message fields to skip
for uniqueness
59 @param args.nth_message read every Nth message
in topic, starting
from first
60 @param args.nth_interval minimum time interval between messages
in topic,
61 as seconds
or ROS duration
62 @param args.progress whether to
print progress bar
63 @param kwargs any
and all arguments
as keyword overrides, case-insensitive
68 self.
_topics = collections.defaultdict(list)
69 self.
_counts = collections.Counter()
71 self.
_hashes = collections.defaultdict(set)
78 self.
args = ensure_namespace(args, Source.DEFAULT_ARGS, **kwargs)
93 """Yields messages from source, as (topic, msg, ROS time)."""
97 """Context manager entry."""
100 def __exit__(self, exc_type, exc_value, traceback):
101 """Context manager exit, closes source."""
105 """Yields messages from source, as (topic, msg, ROS time)."""
107 def bind(self, sink):
108 """Attaches sink to source"""
111 def configure(self, args=None, **kwargs):
113 Updates source configuration.
115 @param args arguments
as namespace
or dictionary, case-insensitive
116 @param kwargs any
and all arguments
as keyword overrides, case-insensitive
118 self.args = ensure_namespace(args, vars(self.args), **kwargs)
122 """Returns whether arguments are valid and source prerequisites are met."""
124 try: self.
args, self.
valid = ArgumentUtil.validate(self.
args),
True
125 except Exception: self.
valid =
False
129 """Shuts down input, closing any files or connections."""
137 self.
bar.pulse_pos =
None
138 self.
bar.update(flush=
True).stop()
142 """Shuts down input batch if any (like bagfile), else all input."""
146 """Returns source metainfo string."""
150 """Returns message metainfo string."""
152 meta = {k:
"" if v
is None else v
for k, v
in meta.items()}
156 """Returns source batch identifier if any (like bagfile name if BagSource)."""
159 """Returns source metainfo data dict."""
163 """Returns message metainfo data dict."""
164 with api.TypeMeta.make(msg, topic)
as m:
165 return dict(topic=topic, type=m.typename, stamp=drop_zeros(api.to_sec(stamp)),
166 index=index, dt=drop_zeros(common.format_stamp(api.to_sec(stamp)),
" "),
167 hash=m.typehash, schema=m.definition)
170 """Returns message type class."""
171 return api.get_message_class(typename)
174 """Returns ROS message type definition full text, including subtype definitions."""
175 return api.get_message_definition(msg_or_type)
178 """Returns ROS message type MD5 hash."""
179 return api.get_message_type_hash(msg_or_type)
182 """Returns whether message passes source filters; registers status."""
183 if self.
args.START_TIME
and stamp < self.
args.START_TIME:
185 if self.
args.END_TIME
and stamp > self.
args.END_TIME:
187 if self.
args.START_INDEX
or self.
args.END_INDEX \
188 or self.
args.NTH_MESSAGE
or self.
args.UNIQUE:
189 topickey = api.TypeMeta.make(msg, topic).topickey
190 if self.
args.START_INDEX
and index
is not None:
193 if self.
args.END_INDEX
and index
is not None:
196 if self.
args.NTH_MESSAGE > 1
or self.
args.NTH_INTERVAL > 0:
198 if self.
args.NTH_MESSAGE > 1
and last_accepted
and index
is not None:
199 shift = self.
args.START_INDEX
if (self.
args.START_INDEX
or 0) > 1
else 1
200 if (index - shift) % self.
args.NTH_MESSAGE:
202 if self.
args.NTH_INTERVAL > 0
and last_accepted
and stamp
is not None:
203 if api.to_sec(stamp - last_accepted[1]) < self.
args.NTH_INTERVAL:
207 msghash = api.make_message_hash(msg, include, exclude)
208 if msghash
in self.
_hashes[topickey]:
210 self.
_hashes[topickey].add(msghash)
215 """Reports match status of last produced message."""
217 if self.
bar and self.
_bar_args.get(
"source_value")
is not None:
218 self.
bar.update(self.
bar.value + bool(status))
221 """Configures progress bar options, updates current bar if any."""
222 for k, v
in kwargs.items():
223 if isinstance(self.
_bar_args.get(k), dict)
and isinstance(v, dict):
227 bar_attrs = set(k
for k
in vars(self.
bar)
if not k.startswith(
"_"))
229 if k
in bar_attrs: setattr(self.
bar, k, v)
230 else: self.
bar.afterargs[k] = v
233 """Initializes progress bar, if any."""
234 if self.
args.PROGRESS
and not self.
bar:
236 self.
bar.start()
if self.
bar.pulse
else self.
bar.update(value=0)
239 """Updates progress bar, if any, with source processed count, pauses bar if not running."""
242 self.
bar.pause, self.
bar.pulse_pos =
True,
None
243 if self.
_bar_args.get(
"source_value")
is not None:
244 self.
bar.afterargs[
"source_value"] = count
245 else: self.
bar.update(count)
248 """Handles exception, used by background threads."""
249 ConsolePrinter.error(text)
251 def _parse_patterns(self):
252 """Parses pattern arguments into re.Patterns."""
253 selects, noselects = self.
args.SELECT_FIELD, self.
args.NOSELECT_FIELD
254 for key, vals
in [(
"select", selects), (
"noselect", noselects)]:
255 self.
_patterns[key] = [(tuple(v.split(
".")), common.path_to_regex(v))
for v
in vals]
260 Provides topic conditions evaluation.
262 Evaluates a set of Python expressions, with a namespace of:
263 - msg: current message being checked
264 - topic: current topic being read
265 - <topic /any/name> messages
in named
or wildcarded topic
267 <topic ..> gets replaced
with an object
with the following behavior:
268 - len(obj) -> number of messages processed
in topic
269 - bool(obj) -> whether there are any messages
in topic
270 - obj[pos] -> topic message at position (
from latest
if negative, first
if positive)
271 - obj.x -> attribute x of last message
273 All conditions need to evaluate
as true
for a message to be processable.
274 If a condition tries to access attributes of a message
not yet present,
275 condition evaluates
as false.
277 If a condition topic matches more than one real topic (by wildcard
or by
278 different types
in one topic), evaluation
is done
for each set of
279 topics separately, condition passing
if any set passes.
281 Example condition: `<topic */control_enable>.data
and <topic */cmd_vel>.linear.x > 0`
282 `
and <topic */cmd_vel>.angular.z < 0.02`.
285 TOPIC_RGX = re.compile(r"<topic\s+([^\s><]+)\s*>")
288 DEFAULT_ARGS = dict(CONDITION=())
295 Object for <topic x> replacements
in condition expressions.
297 - len(topic) -> number of messages processed
in topic
298 - bool(topic) -> whether there are any messages
in topic
299 - topic[x] -> history at -1 -2
for last
and but one,
or 0 1
for first
and second
300 - topic.x -> attribute x of last message
301 - value
in topic -> whether any field of last message contains value
302 - value
in topic[x] -> whether any field of topic history at position contains value
305 def __init__(self, count, firsts, lasts):
312 def __len__(self):
return self._count
315 """Returns whether value exists in last message, or raises NoMessageException."""
320 """Returns message from history at key, or Empty() if no such message."""
325 """Returns attribute value of last message, or raises NoMessageException."""
332 Object for current topic message
in condition expressions.
334 - value
in msg -> whether any message field contains value
335 - msg.x -> attribute x of message
343 """Returns whether value exists in any message field."""
346 api.iter_message_fields(self.
_msg, flat=
True))
347 value = item
if isinstance(item, six.text_type)
else \
348 item.decode()
if isinstance(item, six.binary_type)
else str(item)
349 return re.search(re.escape(value), self.
_fulltext, re.I)
352 """Returns attribute value of message."""
353 return getattr(self.
_msg, name)
357 """Placeholder falsy object that raises NoMessageException on attribute access."""
360 def __nonzero__(self):
return False
362 def __len__(self):
return 0
367 @param args arguments
as namespace
or dictionary, case-insensitive
368 @param args.condition Python expressions that must evaluate
as true
369 for message to be processable, see ConditionMixin
370 @param kwargs any
and all arguments
as keyword overrides, case-insensitive
372 self._topic_states = {}
373 self._topics_per_condition = []
374 self._wildcard_topics = {}
376 self.
_firstmsgs = collections.defaultdict(collections.deque)
378 self.
_lastmsgs = collections.defaultdict(collections.deque)
386 """Returns whether message passes passes current state conditions, if any."""
390 for i, (expr, code)
in enumerate(self.
_conditions.items()):
393 realcarded = {wt: [(t, n, h)
for (t, n, h)
in self.
_lastmsgs if p.match(t)]
395 variants = [[(wt, (t, n, h))
for (t, n, h)
in tt]
or [(wt, (wt,
None))]
396 for wt, tt
in realcarded.items()]
397 variants = variants
or [[
None]]
400 for remaps
in itertools.product(*variants):
401 if remaps == (
None, ): remaps = ()
404 try: result = eval(code, ns)
406 except Exception
as e:
407 ConsolePrinter.error(
'Error evaluating condition "%s": %s', expr, e)
414 """Returns whether conditions have valid syntax, sets options, prints errors."""
416 for v
in self.
args.CONDITION:
418 try: compile(v,
"",
"eval")
419 except SyntaxError
as e:
420 errors.append(
"'%s': %s at %schar %s" %
421 (v, e.msg,
"line %s " % e.lineno
if e.lineno > 1
else "", e.offset))
422 except Exception
as e:
423 errors.append(
"'%s': %s" % (v, e))
425 ConsolePrinter.error(
"Invalid condition")
427 ConsolePrinter.error(
" %s" % err)
433 """Clears cached messages."""
438 """Returns whether there are any conditions configured."""
442 """Returns a list of all topics used in conditions (may contain wildcards)."""
447 Returns whether topic is used
for checking condition.
449 @param pure whether use should be solely
for condition,
not for matching at all
455 if not wildcarded:
return False
456 return all(map(self.
_topic_states.get, wildcarded))
if pure
else True
459 """Sets whether topic is purely used for conditions not matching."""
464 """Retains message for condition evaluation if in condition topic."""
466 topickey = api.TypeMeta.make(msg, topic).topickey
473 def _get_topic_instance(self, topic, remap=None):
475 Returns Topic() by name.
477 @param remap optional remap dictionary
as {topic1: (topic2, typename, typehash)}
479 if remap
and topic
in remap:
480 topickey = remap[topic]
482 topickey = next(((t, n, h)
for (t, n, h)
in self.
_lastmsgs if t == topic),
None)
483 if topickey
not in self._counts:
486 return self.
Topic(c, f, l)
488 def _configure_conditions(self, args):
489 """Parses condition expressions and populates local structures."""
495 for v
in args.CONDITION:
496 topics = list(set(self.
TOPIC_RGX.findall(v)))
499 for t
in (t
for t
in topics
if "*" in t):
501 expr = self.
TOPIC_RGX.sub(
r'get_topic("\1")', v)
504 for v
in args.CONDITION:
505 indexexprs = re.findall(self.
TOPIC_RGX.pattern +
r"\s*\[([^\]]+)\]", v)
506 for topic, indexexpr
in indexexprs:
509 index = eval(indexexpr)
510 limits[index < 0] = max(limits[index < 0], abs(index) + (index >= 0))
511 except Exception:
continue
515 """Produces messages from ROS bagfiles."""
518 MESSAGE_META_TEMPLATE =
"{topic} {index}/{total} ({type} {dt} {stamp})"
521 META_TEMPLATE =
"\nFile {file} ({size}), {tcount} topics, {mcount:,d} messages\n" \
522 "File period {startdt} - {enddt}\n" \
523 "File span {delta} ({start} - {end})"
526 DEFAULT_ARGS = dict(BAG=(), FILE=(), PATH=(), RECURSE=
False, TOPIC=(), TYPE=(),
527 SKIP_TOPIC=(), SKIP_TYPE=(), START_TIME=
None, END_TIME=
None,
528 START_INDEX=
None, END_INDEX=
None, CONDITION=(), AFTER=0, ORDERBY=
None,
529 DECOMPRESS=
False, REINDEX=
False, WRITE=(), PROGRESS=
False,
530 STOP_ON_ERROR=
False, TIMESCALE=0, TIMESCALE_EMISSION=
False, VERBOSE=
False)
532 def __init__(self, args=None, **kwargs):
534 @param args arguments
as namespace
or dictionary, case-insensitive;
535 or a single path
as the ROS bagfile to read,
536 or a stream to read
from,
540 Bag-specific arguments:
541 @param args.file names of ROS bagfiles to read
if not all
in directory,
542 or a stream to read
from;
544 @param args.path paths to scan
if not current directory
545 @param args.recurse recurse into subdirectories when looking
for bagfiles
546 @param args.orderby
"topic" or "type" if any to group results by
547 @param args.decompress decompress archived bags to file directory
548 @param args.reindex make a copy of unindexed bags
and reindex them (ROS1 only)
549 @param args.timescale emit messages on original timeline
from first message
550 at given rate, 0 disables
551 @param args.timescale_emission
552 timeline
from first matched message
not first
in bag,
553 requires
notify()
for each message
554 @param args.write outputs, to skip
in input files
559 @param args.topic ROS topics to read
if not all
560 @param args.type ROS message types to read
if not all
561 @param args.skip_topic ROS topics to skip
562 @param args.skip_type ROS message types to skip
563 @param args.start_time earliest timestamp of messages to read
564 @param args.end_time latest timestamp of messages to read
565 @param args.start_index message index within topic to start
from
566 @param args.end_index message index within topic to stop at
567 @param args.unique emit messages that are unique
in topic
568 @param args.select_field message fields to use
for uniqueness
if not all
569 @param args.noselect_field message fields to skip
for uniqueness
570 @param args.nth_message read every Nth message
in topic, starting
from first
571 @param args.nth_interval minimum time interval between messages
in topic,
572 as seconds
or ROS duration
573 @param args.condition Python expressions that must evaluate
as true
574 for message to be processable, see ConditionMixin
575 @param args.progress whether to
print progress bar
576 @param args.stop_on_error stop execution on any error like unknown message type
577 @param args.verbose whether to
print error stacktraces
578 @param kwargs any
and all arguments
as keyword overrides, case-insensitive
581 is_bag = isinstance(args, api.Bag) or \
582 common.is_iterable(args)
and all(isinstance(x,
api.Bag)
for x
in args)
583 args = {
"FILE": str(args)}
if isinstance(args, common.PATH_TYPES)
else \
584 {
"FILE": args}
if common.is_stream(args)
else {}
if is_bag
else args
585 args = ensure_namespace(args, BagSource.DEFAULT_ARGS, **kwargs)
586 super(BagSource, self).
__init__(args)
587 ConditionMixin.__init__(self, args)
595 self.
_bag0 = ([args0]
if isinstance(args0,
api.Bag)
else args0)
if is_bag
else None
599 """Yields messages from ROS bagfiles, as (topic, msg, ROS time)."""
608 if "topic" == self.
argsargs.ORDERBY:
609 topicsets = [{n: tt}
for n, tt
in sorted(self.
_topics_topics.items())]
610 elif "type" == self.
argsargs.ORDERBY:
613 for t
in tt: typetopics.setdefault(t, []).append(n)
614 topicsets = [{n: [t]
for n
in nn}
for t, nn
in sorted(typetopics.items())]
618 for topics
in topicsets:
619 for topic, msg, stamp, index
in self.
_produce(topics)
if topics
else ():
630 def configure(self, args=None, **kwargs):
632 Updates source configuration.
634 @param args arguments
as namespace
or dictionary, case-insensitive
635 @param kwargs any
and all arguments
as keyword overrides, case-insensitive
637 super(BagSource, self).configure(args, **kwargs)
641 """Returns whether ROS environment is set and arguments valid, prints error if not."""
644 if not api.validate():
647 and not common.verify_io(self.
argsargs.FILE[0],
"r"):
648 ConsolePrinter.error(
"File not readable.")
651 and not any(c.STREAMABLE
for c
in api.Bag.READER_CLASSES):
652 ConsolePrinter.error(
"Bag format does not support reading streams.")
654 if self.
_bag0 and not any(x.mode
in (
"r",
"a")
for x
in self.
_bag0):
655 ConsolePrinter.error(
"Bag not in read mode.")
658 ConsolePrinter.error(
"Cannot use topics in conditions and bag order by %s.",
662 ConsolePrinter.error(
"Invalid timescale factor: %r.", self.
argsargs.TIMESCALE)
664 if not ConditionMixin.validate(self):
669 """Closes current bag, if any."""
672 ConditionMixin.close_batch(self)
673 super(BagSource, self).
close()
676 """Closes current bag, if any."""
681 self.
barbar.update(flush=
True)
683 if self.
_bar_args.get(
"source_value")
is not None:
685 ConditionMixin.close_batch(self)
688 """Returns bagfile metainfo string."""
692 """Returns message metainfo string."""
694 meta = {k:
"" if v
is None else v
for k, v
in meta.items()}
698 """Returns name of current bagfile, or self if reading stream."""
702 """Returns bagfile metainfo data dict."""
703 if self.
_meta is not None:
705 mcount = self.
_bag.get_message_count()
706 start, end = (self.
_bag.get_start_time(), self.
_bag.get_end_time())
if mcount
else (
"",
"")
707 delta = common.format_timedelta(datetime.timedelta(seconds=(end
or 0) - (start
or 0)))
709 mcount=mcount, tcount=len(self.
topicstopics), delta=delta,
710 start=drop_zeros(start), end=drop_zeros(end),
711 startdt=drop_zeros(common.format_stamp(start))
if start !=
"" else "",
712 enddt=drop_zeros(common.format_stamp(end))
if end !=
"" else "")
716 """Returns message metainfo data dict."""
719 result.update(total=self.
topicstopics[(topic, result[
"type"], result[
"hash"])])
720 if callable(getattr(self.
_bag,
"get_qoses",
None)):
721 result.update(qoses=self.
_bag.get_qoses(topic, result[
"type"]))
725 """Returns ROS message type class."""
727 api.get_message_class(typename)
730 """Returns ROS message type definition full text, including subtype definitions."""
732 api.get_message_definition(msg_or_type)
735 """Returns ROS message type MD5 hash."""
737 api.get_message_type_hash(msg_or_type)
740 """Reports match status of last produced message."""
741 super(BagSource, self).
notify(status)
750 """Returns whether message passes source filters; registers status."""
752 topickey = api.TypeMeta.make(msg, topic).topickey
753 if self.
argsargs.START_INDEX
and index
is not None and self.
argsargs.START_INDEX < 0 \
757 if self.
argsargs.END_INDEX
and index
is not None and self.
argsargs.END_INDEX < 0 \
763 if not super(BagSource, self).
is_processable(topic, msg, stamp, index):
765 if not ConditionMixin.is_processable(self, topic, msg, stamp, index):
771 """Initializes progress bar, if any, for current bag."""
777 def _produce(self, topics, start_time=None):
779 Yields messages from current ROS bagfile,
as (topic, msg, ROS time, index
in topic).
781 @param topics {topic: [typename, ]}
784 do_predelay = self.
argsargs.TIMESCALE
and not self.
argsargs.TIMESCALE_EMISSION
787 self.
_delaystamps[
"read"] = getattr(time,
"monotonic", time.time)()
788 counts = collections.Counter()
790 nametypes = {(n, t)
for n, tt
in topics.items()
for t
in tt}
791 for topic, msg, stamp
in self.
_bag.read_messages(list(topics), start_time):
794 typename = api.get_message_type(msg)
795 if topics
and typename
not in topics[topic]:
800 topickey = api.TypeMeta.make(msg, topic, self).topickey
801 counts[topickey] += 1; self.
_counts[topickey] += 1
803 if start_time
is None and counts[topickey] != self.
_counts[topickey]:
809 yield topic, msg, stamp, self.
_counts[topickey]
817 for entry
in self.
_produce({topic: typename}, stamp + api.make_duration(nsecs=1)):
819 if not self.
_running or not self.
_bag or (start_time
is None
823 def _produce_bags(self):
824 """Yields Bag instances from configured arguments."""
826 for bag
in self.
_bag0:
832 exts, skip_exts = api.BAG_EXTENSIONS, api.SKIP_EXTENSIONS
833 exts = list(exts) + [
"%s%s" % (a, b)
for a
in exts
for b
in common.Decompressor.EXTENSIONS]
836 for filename
in common.find_files(names, paths, exts, skip_exts, self.
argsargs.RECURSE):
840 fullname = os.path.realpath(os.path.abspath(filename))
841 skip = common.Decompressor.make_decompressed_name(fullname)
in encountereds
842 encountereds.add(fullname)
847 encountereds.add(self.
_bag.filename)
850 def _make_progress_args(self):
851 """Returns dictionary with progress bar options"""
852 total = sum(sum(c
for (t, n, _), c
in self.
topicstopics.items()
if c
and t == t_
and n
in nn)
854 result = dict(max=total, afterword=os.path.basename(self.
_filename or "<stream>"))
856 instr, outstr =
"{value:,d}/{max:,d}",
""
859 self.
_bar_args.setdefault(
"source_value", 0)
860 if self.
_bar_args.get(
"source_value")
is not None \
861 or self.
_bar_args.get(
"match_max")
is not None:
862 result.update(source_value=self.
_bar_args.get(
"source_value")
or 0)
863 instr, outstr =
"{source_value:,d}/{max:,d}",
"matched {value:,d}"
864 if self.
_bar_args.get(
"match_max")
is not None:
865 instr, outstr =
"{source_value:,d}/{source_max:,d}", outstr +
"/{match_max:,d}"
866 result.update(source_max=total, max=min(total, self.
_bar_args[
"match_max"]))
867 result.update(aftertemplate=
" {afterword} (%s)" %
" ".join(filter(bool, (instr, outstr))))
871 def _ensure_totals(self):
872 """Retrieves total message counts if not retrieved."""
874 has_ensure = common.has_arg(self.
_bag.get_topic_info,
"ensure_types")
875 kws = dict(ensure_types=
False)
if has_ensure
else {}
876 for (t, n, h), c
in self.
_bag.get_topic_info(**kws).items():
880 def _delay_timeline(self):
881 """Sleeps until message ought to be emitted in bag timeline."""
882 curstamp, readstamp, startstamp = map(self.
_delaystamps.get, (
"current",
"read",
"first"))
883 delta = max(0, api.to_sec(curstamp) - startstamp) / (self.
argsargs.TIMESCALE
or 1)
884 if delta: time.sleep(max(0, delta + readstamp - getattr(time,
"monotonic", time.time)()))
886 def _is_at_end_threshold(self, topickey, stamp, nametypes, endtime_indexes):
888 Returns whether bag reading has reached END_INDEX or END_TIME
in all given topics.
890 @param topickey (topic, typename, typehash) of current message
891 @param stamp ROS timestamp of current message
892 @param nametypes {(topic, typename)} to account
for
893 @param endtime_indexes {topickey: index at reaching END_TIME}, gets modified
897 if self.
_counts[topickey] >= max_index:
898 mycounts = {k: v
for k, v
in self.
_counts.items()
if k[:2]
in nametypes}
899 if nametypes == set(k[:2]
for k
in mycounts) \
900 and all(v >= self.
_end_indexes.get(k, max_index)
for k, v
in mycounts.items()):
904 if topickey
not in endtime_indexes: endtime_indexes[topickey] = self.
_counts[topickey]
906 if self.
_counts[topickey] >= max_index:
907 myindexes = {k: v
for k, v
in endtime_indexes.items()
if k[:2]
in nametypes}
908 if nametypes == set(k[:2]
for k
in myindexes) \
910 for k, v
in myindexes.items()):
914 def _configure(self, filename=None, bag=None):
915 """Opens bag and populates bag-specific argument state, returns success."""
928 if bag
is not None and bag.mode
not in (
"r",
"a"):
929 ConsolePrinter.warn(
"Cannot read %s: bag in write mode.", bag)
932 if filename
and self.
argsargs.WRITE \
933 and any(os.path.realpath(x[0]) == os.path.realpath(filename)
937 if filename
and common.Decompressor.is_compressed(filename):
939 filename = common.Decompressor.decompress(filename, self.
argsargs.PROGRESS)
940 else:
raise Exception(
"decompression not enabled")
942 progress=self.
argsargs.PROGRESS)
if bag
is None else bag
943 bag.stop_on_error = self.
argsargs.STOP_ON_ERROR
945 except Exception
as e:
946 ConsolePrinter.error(
"\nError opening %r: %s", filename
or bag, e)
947 if self.
argsargs.STOP_ON_ERROR:
raise
948 if self.
argsargs.VERBOSE: traceback.print_exc()
955 kws = dict(ensure_types=
False)
if common.has_arg(bag.get_topic_info,
"ensure_types")
else {}
956 for (t, n, h), c
in bag.get_topic_info(counts=
False, **kws).items():
957 dct.setdefault(t, []).append(n)
964 dct = common.filter_dict(dct, self.
argsargs.SKIP_TOPIC, self.
argsargs.SKIP_TYPE, reverse=
True)
966 matches = [t
for p
in [common.wildcard_to_regex(topic, end=
True)]
for t
in fulldct
967 if t == topic
or "*" in topic
and p.match(t)]
968 for realtopic
in matches:
970 dct.setdefault(realtopic, fulldct[realtopic])
975 if args.START_TIME
is not None:
976 args.START_TIME = api.make_bag_time(args.START_TIME, bag)
977 if args.END_TIME
is not None:
978 args.END_TIME = api.make_bag_time(args.END_TIME, bag)
983 """Produces messages from live ROS topics."""
989 DEFAULT_ARGS = dict(TOPIC=(), TYPE=(), SKIP_TOPIC=(), SKIP_TYPE=(), START_TIME=
None,
990 END_TIME=
None, START_INDEX=
None, END_INDEX=
None, CONDITION=(),
991 QUEUE_SIZE_IN=10, ROS_TIME_IN=
False, PROGRESS=
False, STOP_ON_ERROR=
False,
994 def __init__(self, args=None, **kwargs):
996 @param args arguments
as namespace
or dictionary, case-insensitive
997 @param args.topic ROS topics to read
if not all
998 @param args.type ROS message types to read
if not all
999 @param args.skip_topic ROS topics to skip
1000 @param args.skip_type ROS message types to skip
1001 @param args.start_time earliest timestamp of messages to read
1002 @param args.end_time latest timestamp of messages to read
1003 @param args.start_index message index within topic to start
from
1004 @param args.end_index message index within topic to stop at
1005 @param args.unique emit messages that are unique
in topic
1006 @param args.select_field message fields to use
for uniqueness
if not all
1007 @param args.noselect_field message fields to skip
for uniqueness
1008 @param args.nth_message read every Nth message
in topic, starting
from first
1009 @param args.nth_interval minimum time interval between messages
in topic,
1010 as seconds
or ROS duration
1011 @param args.condition Python expressions that must evaluate
as true
1012 for message to be processable, see ConditionMixin
1013 @param args.queue_size_in subscriber queue size (default 10)
1014 @param args.ros_time_in stamp messages
with ROS time instead of wall time
1015 @param args.progress whether to
print progress bar
1016 @param args.stop_on_error stop execution on any error like unknown message type
1017 @param args.verbose whether to
print error stacktraces
1018 @param kwargs any
and all arguments
as keyword overrides, case-insensitive
1020 args = ensure_namespace(args, LiveSource.DEFAULT_ARGS, **dict(kwargs, live=True))
1021 super(LiveSource, self).
__init__(args)
1022 ConditionMixin.__init__(self, args)
1029 """Yields messages from subscribed ROS topics, as (topic, msg, ROS time)."""
1034 self.
_queue = queue.Queue()
1048 topic, msg, stamp = self.
_queue.get()
1049 total += bool(topic)
1051 if not topic:
continue
1053 topickey = api.TypeMeta.make(msg, topic, self).topickey
1068 def bind(self, sink):
1069 """Attaches sink to source and blocks until connected to ROS live."""
1071 super(LiveSource, self).
bind(sink)
1075 """Returns whether ROS environment is set and arguments valid, prints error if not."""
1078 if not api.validate(live=
True):
1080 if not ConditionMixin.validate(self):
1087 """Shuts down subscribers and stops producing messages."""
1089 for k
in list(self.
_subs):
1090 self.
_subs.pop(k).unregister()
1093 ConditionMixin.close_batch(self)
1094 super(LiveSource, self).
close()
1097 """Returns source metainfo data dict."""
1098 ENV = {k: os.getenv(k)
for k
in (
"ROS_MASTER_URI",
"ROS_DOMAIN_ID")
if os.getenv(k)}
1099 return dict(ENV, tcount=len(self.
topics), scount=len(self.
_subs))
1102 """Returns message metainfo data dict."""
1103 result = super(LiveSource, self).
get_message_meta(topic, msg, stamp, index)
1104 topickey = (topic, result[
"type"], result[
"hash"])
1105 if topickey
in self.
_subs:
1106 result.update(qoses=self.
_subs[topickey].get_qoses())
1110 """Returns message type class, from active subscription if available."""
1111 sub = next((s
for (t, n, h), s
in self.
_subs.items()
1112 if n == typename
and typehash
in (s.get_message_type_hash(),
None)),
None)
1113 return sub
and sub.get_message_class()
or api.get_message_class(typename)
1116 """Returns ROS message type definition full text, including subtype definitions."""
1117 if api.is_ros_message(msg_or_type):
1118 return api.get_message_definition(msg_or_type)
1119 sub = next((s
for (t, n, h), s
in self.
_subs.items()
if n == msg_or_type),
None)
1120 return sub
and sub.get_message_definition()
or api.get_message_definition(msg_or_type)
1123 """Returns ROS message type MD5 hash."""
1124 if api.is_ros_message(msg_or_type):
1125 return api.get_message_type_hash(msg_or_type)
1126 sub = next((s
for (t, n, h), s
in self.
_subs.items()
if n == msg_or_type),
None)
1127 return sub
and sub.get_message_type_hash()
or api.get_message_type_hash(msg_or_type)
1130 """Returns source metainfo string."""
1132 result =
"\nROS%s live" % api.ROS_VERSION
1133 if "ROS_MASTER_URI" in metadata:
1134 result +=
", ROS master %s" % metadata[
"ROS_MASTER_URI"]
1135 if "ROS_DOMAIN_ID" in metadata:
1136 result +=
", ROS domain ID %s" % metadata[
"ROS_DOMAIN_ID"]
1137 result +=
", %s initially" % common.plural(
"topic", metadata[
"tcount"])
1138 result +=
", %s subscribed" % metadata[
"scount"]
1142 """Returns whether message passes source filters; registers status."""
1144 if not super(LiveSource, self).
is_processable(topic, msg, stamp, index):
1146 if not ConditionMixin.is_processable(self, topic, msg, stamp, index):
1152 """Refreshes topics and subscriptions from ROS live."""
1153 for topic, typename
in api.get_topic_types():
1154 topickey = (topic, typename,
None)
1155 self.
topics[topickey] =
None
1156 dct = common.filter_dict({topic: [typename]}, self.
argsargs.TOPIC, self.
argsargs.TYPE)
1157 if not common.filter_dict(dct, self.
argsargs.SKIP_TOPIC, self.
argsargs.SKIP_TYPE, reverse=
True):
1159 if api.ROS2
and api.get_message_class(typename)
is None:
1160 msg =
"Error loading type %s in topic %s." % (typename, topic)
1162 ConsolePrinter.warn(msg, __once=
True)
1164 if topickey
in self.
_subs:
1167 handler = functools.partial(self.
_on_message, topic)
1169 sub = api.create_subscriber(topic, typename, handler,
1170 queue_size=self.
argsargs.QUEUE_SIZE_IN)
1171 except Exception
as e:
1172 ConsolePrinter.warn(
"Error subscribing to topic %s: %%r" % topic,
1174 if self.
argsargs.STOP_ON_ERROR:
raise
1177 self.
_subs[topickey] = sub
1180 """Initializes progress bar, if any."""
1186 """Updates progress bar, if any."""
1192 def _configure(self):
1193 """Adjusts start/end time filter values to current time."""
1194 if self.
argsargs.START_TIME
is not None:
1196 if self.
argsargs.END_TIME
is not None:
1199 def _make_progress_args(self, count=None):
1200 """Returns dictionary with progress bar options, for specific nessage index if any."""
1201 result = dict(afterword =
"ROS%s live" % api.ROS_VERSION, pulse=
True)
1202 if self.
_bar_args.get(
"match_max")
is not None:
1205 instr, outstr =
"{value:,d} message%s" % (
"" if count == 1
else "s"),
""
1208 self.
_bar_args.setdefault(
"source_value", 0)
1210 instr =
"{source_value:,d} message%s" % (
"" if count == 1
else "s")
1211 outstr =
"matched {value:,d}"
1212 if self.
_bar_args.get(
"match_max")
is not None: outstr +=
"/{match_max:,d}"
1213 elif self.
_bar_args.get(
"match_max")
is not None:
1214 instr =
"{value:,d}/{max:,d}"
1215 result.update(aftertemplate=
" {afterword} (%s)" %
" ".join(filter(bool, (instr, outstr))))
1219 def _run_refresh(self):
1220 """Periodically refreshes topics and subscriptions from ROS live."""
1224 except Exception
as e: self.
thread_excepthook(
"Error refreshing live topics: %r" % e, e)
1227 def _run_endtime_closer(self):
1228 """Periodically checks whether END_TIME has been reached, closes source when so."""
1236 def _on_message(self, topic, msg):
1237 """Subscription callback handler, queues message for yielding."""
1238 stamp = api.get_rostime()
if self.
argsargs.ROS_TIME_IN
else api.make_time(time.time())
1243 """Produces messages from iterable or pushed data."""
1246 DEFAULT_ARGS = dict(TOPIC=(), TYPE=(), SKIP_TOPIC=(), SKIP_TYPE=(), START_TIME=
None,
1247 END_TIME=
None, START_INDEX=
None, END_INDEX=
None, UNIQUE=
False,
1248 SELECT_FIELD=(), NOSELECT_FIELD=(), NTH_MESSAGE=1, NTH_INTERVAL=0,
1249 CONDITION=(), ITERABLE=
None)
1253 @param args arguments
as namespace
or dictionary, case-insensitive;
1254 or iterable yielding messages
1255 @param args.topic ROS topics to read
if not all
1256 @param args.type ROS message types to read
if not all
1257 @param args.skip_topic ROS topics to skip
1258 @param args.skip_type ROS message types to skip
1259 @param args.start_time earliest timestamp of messages to read
1260 @param args.end_time latest timestamp of messages to read
1261 @param args.start_index message index within topic to start
from
1262 @param args.end_index message index within topic to stop at
1263 @param args.unique emit messages that are unique
in topic
1264 @param args.select_field message fields to use
for uniqueness
if not all
1265 @param args.noselect_field message fields to skip
for uniqueness
1266 @param args.nth_message read every Nth message
in topic, starting
from first
1267 @param args.nth_interval minimum time interval between messages
in topic,
1268 as seconds
or ROS duration
1269 @param args.condition Python expressions that must evaluate
as true
1270 for message to be processable, see ConditionMixin
1271 @param args.iterable iterable yielding (topic, msg, stamp)
or (topic, msg);
1272 yielding `
None` signals end of content
1273 @param kwargs any
and all arguments
as keyword overrides, case-insensitive
1275 if common.is_iterable(args)
and not isinstance(args, dict):
1276 args = ensure_namespace(
None, iterable=args)
1277 args = ensure_namespace(args, AppSource.DEFAULT_ARGS, **kwargs)
1278 super(AppSource, self).
__init__(args)
1279 ConditionMixin.__init__(self, args)
1285 Yields messages from iterable
or pushed data,
as (topic, msg, ROS timestamp).
1287 Blocks until a message
is available,
or source
is closed.
1290 def generate(iterable):
1291 for x
in iterable:
yield x
1292 feeder = generate(self.
argsargs.ITERABLE)
if self.
argsargs.ITERABLE
else None
1295 item = self.
_queue.get()
if not feeder
or self.
_queue.qsize()
else next(feeder,
None)
1296 if item
is None:
break
1298 if len(item) > 2: topic, msg, stamp = item[:3]
1299 else: (topic, msg), stamp = item[:2], api.get_rostime(fallback=
True)
1300 topickey = api.TypeMeta.make(msg, topic, self).topickey
1314 """Closes current read() yielding, if any."""
1321 Returns (topic, msg, stamp) from push queue,
or `
None`
if no queue
1322 or message
in queue
is condition topic only.
1326 try: item = self.
_queue.get(block=
False)
1327 except queue.Empty:
pass
1328 if item
is None:
return None
1330 topic, msg, stamp = item
1331 topickey = api.TypeMeta.make(msg, topic, self).topickey
1337 """Registers message produced from read_queue()."""
1340 topickey = api.TypeMeta.make(msg, topic).topickey
1343 def push(self, topic, msg=None, stamp=None):
1345 Pushes a message to be yielded from read().
1347 @param topic topic name,
or `
None` to signal end of content
1348 @param msg ROS message
1349 @param stamp message ROS timestamp, defaults to current wall time
if `
None`
1352 if topic
is None: self.
_queue.put(
None)
1353 else: self.
_queue.put((topic, msg, stamp
or api.get_rostime(fallback=
True)))
1356 """Returns whether message passes source filters; registers status."""
1358 dct = common.filter_dict({topic: [api.get_message_type(msg)]},
1360 if not common.filter_dict(dct, self.
argsargs.SKIP_TOPIC, self.
argsargs.SKIP_TYPE, reverse=
True):
1362 if not super(AppSource, self).
is_processable(topic, msg, stamp, index):
1364 if not ConditionMixin.is_processable(self, topic, msg, stamp, index):
1370 """Returns whether configured arguments are valid, prints error if not."""
1377 def _configure(self):
1378 """Adjusts start/end time filter values to current time."""
1379 if self.
argsargs.START_TIME
is not None:
1381 if self.
argsargs.END_TIME
is not None:
1385__all__ = [
"AppSource",
"ConditionMixin",
"BagSource",
"LiveSource",
"Source"]
A simple ASCII progress bar with a ticker thread.