grepros 1.2.2
grep for ROS bag files and live topics
Loading...
Searching...
No Matches
inputs.py
Go to the documentation of this file.
1# -*- coding: utf-8 -*-
2"""
3Input sources for ROS messages.
4
5------------------------------------------------------------------------------
6This file is part of grepros - grep for ROS bag files and live topics.
7Released under the BSD License.
8
9@author Erki Suurjaak
10@created 23.10.2021
11@modified 20.04.2024
12------------------------------------------------------------------------------
13"""
14## @namespace grepros.inputs
15from __future__ import print_function
16import collections
17import datetime
18import functools
19import itertools
20import os
21try: import queue # Py3
22except ImportError: import Queue as queue # Py2
23import re
24import threading
25import time
26import traceback
27
28import six
29
30from . import api
31from . import common
32from . common import ArgumentUtil, ConsolePrinter, ensure_namespace, drop_zeros
33
34
35class Source(object):
36 """Message producer base class."""
37
38
39 class SourceMessage(api.Bag.BagMessage): pass
40
41
42 MESSAGE_META_TEMPLATE = "{topic} #{index} ({type} {dt} {stamp})"
43
44
45 DEFAULT_ARGS = dict(START_TIME=None, END_TIME=None, START_INDEX=None, END_INDEX=None,
46 UNIQUE=False, SELECT_FIELD=(), NOSELECT_FIELD=(),
47 NTH_MESSAGE=1, NTH_INTERVAL=0, PROGRESS=False)
48
49 def __init__(self, args=None, **kwargs):
50 """
51 @param args arguments as namespace or dictionary, case-insensitive
52 @param args.start_time earliest timestamp of messages to read
53 @param args.end_time latest timestamp of messages to read
54 @param args.unique emit messages that are unique in topic
55 @param args.start_index message index within topic to start from
56 @param args.end_index message index within topic to stop at
57 @param args.select_field message fields to use for uniqueness if not all
58 @param args.noselect_field message fields to skip for uniqueness
59 @param args.nth_message read every Nth message in topic, starting from first
60 @param args.nth_interval minimum time interval between messages in topic,
61 as seconds or ROS duration
62 @param args.progress whether to print progress bar
63 @param kwargs any and all arguments as keyword overrides, case-insensitive
64 """
65 # {key: [(() if any field else ('nested', 'path') or re.Pattern, re.Pattern), ]}
66 self._patterns = {}
67 # {topic: ["pkg/MsgType", ]} searched in current source
68 self._topics = collections.defaultdict(list)
69 self._counts = collections.Counter() # {(topic, typename, typehash): count processed}
70 # {(topic, typename, typehash): (message hash over all fields used in matching)}
71 self._hashes = collections.defaultdict(set)
72 self._processables = {} # {(topic, typename, typehash): (index, stamp) of last processable}
73 self._start_indexes = {} # {(topic, typename, typehash): index to start producing from}
74 self._end_indexes = {} # {(topic, typename, typehash): index to stop producing at}
75 self._bar_args = {} # Progress bar options
76 self._status = None # Processable/match status of last produced message
77
78 self.args = ensure_namespace(args, Source.DEFAULT_ARGS, **kwargs)
80 self.sink = None
82 self.topics = {}
84 self.bar = None
86 self.valid = None
88 self.preprocess = True
90 self._parse_patterns()
91
92 def __iter__(self):
93 """Yields messages from source, as (topic, msg, ROS time)."""
94 return self.read()
95
96 def __enter__(self):
97 """Context manager entry."""
98 return self
99
100 def __exit__(self, exc_type, exc_value, traceback):
101 """Context manager exit, closes source."""
102 self.close()
103
104 def read(self):
105 """Yields messages from source, as (topic, msg, ROS time)."""
106
107 def bind(self, sink):
108 """Attaches sink to source"""
109 self.sink = sink
110
111 def configure(self, args=None, **kwargs):
112 """
113 Updates source configuration.
114
115 @param args arguments as namespace or dictionary, case-insensitive
116 @param kwargs any and all arguments as keyword overrides, case-insensitive
117 """
118 self.args = ensure_namespace(args, vars(self.args), **kwargs)
119 self.valid = None
120
121 def validate(self):
122 """Returns whether arguments are valid and source prerequisites are met."""
123 if self.valid is not None: return self.valid
124 try: self.args, self.valid = ArgumentUtil.validate(self.args), True
125 except Exception: self.valid = False
126 return self.valid
127
128 def close(self):
129 """Shuts down input, closing any files or connections."""
130 self.topics.clear()
131 self._topics.clear()
132 self._counts.clear()
133 self._hashes.clear()
134 self._processables.clear()
135 self._status = None
136 if self.bar:
137 self.bar.pulse_pos = None
138 self.bar.update(flush=True).stop()
139 self.bar = None
140
141 def close_batch(self):
142 """Shuts down input batch if any (like bagfile), else all input."""
143 self.close()
144
145 def format_meta(self):
146 """Returns source metainfo string."""
147 return ""
148
149 def format_message_meta(self, topic, msg, stamp, index=None):
150 """Returns message metainfo string."""
151 meta = self.get_message_meta(topic, msg, stamp, index)
152 meta = {k: "" if v is None else v for k, v in meta.items()}
153 return self.MESSAGE_META_TEMPLATE.format(**meta)
154
155 def get_batch(self):
156 """Returns source batch identifier if any (like bagfile name if BagSource)."""
157
158 def get_meta(self):
159 """Returns source metainfo data dict."""
160 return {}
161
162 def get_message_meta(self, topic, msg, stamp, index=None):
163 """Returns message metainfo data dict."""
164 with api.TypeMeta.make(msg, topic) as m:
165 return dict(topic=topic, type=m.typename, stamp=drop_zeros(api.to_sec(stamp)),
166 index=index, dt=drop_zeros(common.format_stamp(api.to_sec(stamp)), " "),
167 hash=m.typehash, schema=m.definition)
168
169 def get_message_class(self, typename, typehash=None):
170 """Returns message type class."""
171 return api.get_message_class(typename)
172
173 def get_message_definition(self, msg_or_type):
174 """Returns ROS message type definition full text, including subtype definitions."""
175 return api.get_message_definition(msg_or_type)
176
177 def get_message_type_hash(self, msg_or_type):
178 """Returns ROS message type MD5 hash."""
179 return api.get_message_type_hash(msg_or_type)
180
181 def is_processable(self, topic, msg, stamp, index=None):
182 """Returns whether message passes source filters; registers status."""
183 if self.args.START_TIME and stamp < self.args.START_TIME:
184 return False
185 if self.args.END_TIME and stamp > self.args.END_TIME:
186 return False
187 if self.args.START_INDEX or self.args.END_INDEX \
188 or self.args.NTH_MESSAGE or self.args.UNIQUE:
189 topickey = api.TypeMeta.make(msg, topic).topickey
190 if self.args.START_INDEX and index is not None:
191 if max(0, self._start_indexes.get(topickey, self.args.START_INDEX)) > index:
192 return False
193 if self.args.END_INDEX and index is not None:
194 if self._end_indexes.get(topickey, self.args.END_INDEX) < index:
195 return False
196 if self.args.NTH_MESSAGE > 1 or self.args.NTH_INTERVAL > 0:
197 last_accepted = self._processables.get(topickey)
198 if self.args.NTH_MESSAGE > 1 and last_accepted and index is not None:
199 shift = self.args.START_INDEX if (self.args.START_INDEX or 0) > 1 else 1
200 if (index - shift) % self.args.NTH_MESSAGE:
201 return False
202 if self.args.NTH_INTERVAL > 0 and last_accepted and stamp is not None:
203 if api.to_sec(stamp - last_accepted[1]) < self.args.NTH_INTERVAL:
204 return False
205 if self.args.UNIQUE:
206 include, exclude = self._patterns["select"], self._patterns["noselect"]
207 msghash = api.make_message_hash(msg, include, exclude)
208 if msghash in self._hashes[topickey]:
209 return False
210 self._hashes[topickey].add(msghash)
211 self._status = True
212 return True
213
214 def notify(self, status):
215 """Reports match status of last produced message."""
216 self._status = bool(status)
217 if self.bar and self._bar_args.get("source_value") is not None:
218 self.bar.update(self.bar.value + bool(status))
219
220 def configure_progress(self, **kwargs):
221 """Configures progress bar options, updates current bar if any."""
222 for k, v in kwargs.items():
223 if isinstance(self._bar_args.get(k), dict) and isinstance(v, dict):
224 self._bar_args[k].update(v)
225 else: self._bar_args[k] = v
226 if self.bar:
227 bar_attrs = set(k for k in vars(self.bar) if not k.startswith("_"))
228 for k, v in self._bar_args.items():
229 if k in bar_attrs: setattr(self.bar, k, v)
230 else: self.bar.afterargs[k] = v
231
232 def init_progress(self):
233 """Initializes progress bar, if any."""
234 if self.args.PROGRESS and not self.bar:
235 self.bar = common.ProgressBar(**self._bar_args)
236 self.bar.start() if self.bar.pulse else self.bar.update(value=0)
237
238 def update_progress(self, count, running=True):
239 """Updates progress bar, if any, with source processed count, pauses bar if not running."""
240 if self.bar:
241 if not running:
242 self.bar.pause, self.bar.pulse_pos = True, None
243 if self._bar_args.get("source_value") is not None:
244 self.bar.afterargs["source_value"] = count
245 else: self.bar.update(count)
246
247 def thread_excepthook(self, text, exc):
248 """Handles exception, used by background threads."""
249 ConsolePrinter.error(text)
250
251 def _parse_patterns(self):
252 """Parses pattern arguments into re.Patterns."""
253 selects, noselects = self.args.SELECT_FIELD, self.args.NOSELECT_FIELD
254 for key, vals in [("select", selects), ("noselect", noselects)]:
255 self._patterns[key] = [(tuple(v.split(".")), common.path_to_regex(v)) for v in vals]
256
257
258class ConditionMixin(object):
259 """
260 Provides topic conditions evaluation.
261
262 Evaluates a set of Python expressions, with a namespace of:
263 - msg: current message being checked
264 - topic: current topic being read
265 - <topic /any/name> messages in named or wildcarded topic
266
267 <topic ..> gets replaced with an object with the following behavior:
268 - len(obj) -> number of messages processed in topic
269 - bool(obj) -> whether there are any messages in topic
270 - obj[pos] -> topic message at position (from latest if negative, first if positive)
271 - obj.x -> attribute x of last message
272
273 All conditions need to evaluate as true for a message to be processable.
274 If a condition tries to access attributes of a message not yet present,
275 condition evaluates as false.
276
277 If a condition topic matches more than one real topic (by wildcard or by
278 different types in one topic), evaluation is done for each set of
279 topics separately, condition passing if any set passes.
280
281 Example condition: `<topic */control_enable>.data and <topic */cmd_vel>.linear.x > 0`
282 `and <topic */cmd_vel>.angular.z < 0.02`.
283 """
284
285 TOPIC_RGX = re.compile(r"<topic\s+([^\s><]+)\s*>") # "<topic /some/thing>"
287
288 DEFAULT_ARGS = dict(CONDITION=())
289
290 class NoMessageException(Exception): pass
292
293 class Topic(object):
294 """
295 Object for <topic x> replacements in condition expressions.
296
297 - len(topic) -> number of messages processed in topic
298 - bool(topic) -> whether there are any messages in topic
299 - topic[x] -> history at -1 -2 for last and but one, or 0 1 for first and second
300 - topic.x -> attribute x of last message
301 - value in topic -> whether any field of last message contains value
302 - value in topic[x] -> whether any field of topic history at position contains value
303 """
304
305 def __init__(self, count, firsts, lasts):
306 self._count = count
307 self._firsts = firsts
308 self._lasts = lasts
309
310 def __bool__(self): return bool(self._count)
311 def __nonzero__(self): return bool(self._count)
312 def __len__(self): return self._count
314 def __contains__(self, item):
315 """Returns whether value exists in last message, or raises NoMessageException."""
316 if not self._lasts: raise ConditionMixin.NoMessageException()
317 return item in ConditionMixin.Message(self._lasts[-1])
318
319 def __getitem__(self, key):
320 """Returns message from history at key, or Empty() if no such message."""
321 try: return ConditionMixin.Message((self._lasts if key < 0 else self._firsts)[key])
322 except IndexError: return ConditionMixin.Empty()
323
324 def __getattr__(self, name):
325 """Returns attribute value of last message, or raises NoMessageException."""
326 if not self._lasts: raise ConditionMixin.NoMessageException()
327 return getattr(self._lasts[-1], name)
328
329
330 class Message(object):
331 """
332 Object for current topic message in condition expressions.
333
334 - value in msg -> whether any message field contains value
335 - msg.x -> attribute x of message
336 """
337
338 def __init__(self, msg):
339 self._msg = msg
340 self._fulltext = None
342 def __contains__(self, item):
343 """Returns whether value exists in any message field."""
344 if not self._fulltext:
345 self._fulltext = "\n".join("%s" % (v, ) for _, v, _ in
346 api.iter_message_fields(self._msg, flat=True))
347 value = item if isinstance(item, six.text_type) else \
348 item.decode() if isinstance(item, six.binary_type) else str(item)
349 return re.search(re.escape(value), self._fulltext, re.I)
350
351 def __getattr__(self, name):
352 """Returns attribute value of message."""
353 return getattr(self._msg, name)
355
356 class Empty(object):
357 """Placeholder falsy object that raises NoMessageException on attribute access."""
358 def __getattr__(self, name): raise ConditionMixin.NoMessageException()
359 def __bool__(self): return False
360 def __nonzero__(self): return False
361 def __contains__(self, item): return False
362 def __len__(self): return 0
365 def __init__(self, args=None, **kwargs):
366 """
367 @param args arguments as namespace or dictionary, case-insensitive
368 @param args.condition Python expressions that must evaluate as true
369 for message to be processable, see ConditionMixin
370 @param kwargs any and all arguments as keyword overrides, case-insensitive
371 """
372 self._topic_states = {} # {topic: whether only used for condition, not matching}
373 self._topics_per_condition = [] # [[topics in 1st condition], ]
374 self._wildcard_topics = {} # {"/my/*/topic": re.Pattern}
375 # {(topic, typename, typehash): [1st, 2nd, ..]}
376 self._firstmsgs = collections.defaultdict(collections.deque)
377 # {(topic, typename, typehash): [.., last]}
378 self._lastmsgs = collections.defaultdict(collections.deque)
379 # {topic: (max positive index + 1, max abs(negative index) or 1)}
380 self._topic_limits = collections.defaultdict(lambda: [1, 1])
381
382
383 self._conditions = collections.OrderedDict()
384
385 def is_processable(self, topic, msg, stamp, index=None):
386 """Returns whether message passes passes current state conditions, if any."""
387 result = True
388 if not self._conditions:
389 return result
390 for i, (expr, code) in enumerate(self._conditions.items()):
391 topics = self._topics_per_condition[i]
392 wildcarded = [t for t in topics if t in self._wildcard_topics]
393 realcarded = {wt: [(t, n, h) for (t, n, h) in self._lastmsgs if p.match(t)]
394 for wt in wildcarded for p in [self._wildcard_topics[wt]]}
395 variants = [[(wt, (t, n, h)) for (t, n, h) in tt] or [(wt, (wt, None))]
396 for wt, tt in realcarded.items()]
397 variants = variants or [[None]] # Ensure one iteration if no wildcards to combine
398
399 result = False
400 for remaps in itertools.product(*variants): # [(wildcard1, realname1), (wildcard2, ..]
401 if remaps == (None, ): remaps = ()
402 getter = functools.partial(self._get_topic_instance, remap=dict(remaps))
403 ns = {"topic": topic, "msg": ConditionMixin.Message(msg), "get_topic": getter}
404 try: result = eval(code, ns)
405 except self.NoMessageException: pass
406 except Exception as e:
407 ConsolePrinter.error('Error evaluating condition "%s": %s', expr, e)
408 raise
409 if result: break # for remaps
410 if not result: break # for i,
411 return result
412
413 def validate(self):
414 """Returns whether conditions have valid syntax, sets options, prints errors."""
415 errors = []
416 for v in self.args.CONDITION:
417 v = self.TOPIC_RGX.sub("dummy", v)
418 try: compile(v, "", "eval")
419 except SyntaxError as e:
420 errors.append("'%s': %s at %schar %s" %
421 (v, e.msg, "line %s " % e.lineno if e.lineno > 1 else "", e.offset))
422 except Exception as e:
423 errors.append("'%s': %s" % (v, e))
424 if errors:
425 ConsolePrinter.error("Invalid condition")
426 for err in errors:
427 ConsolePrinter.error(" %s" % err)
428 else:
429 self._configure_conditions(ensure_namespace(self.args, ConditionMixin.DEFAULT_ARGS))
430 return not errors
431
432 def close_batch(self):
433 """Clears cached messages."""
434 self._firstmsgs.clear()
435 self._lastmsgs.clear()
437 def has_conditions(self):
438 """Returns whether there are any conditions configured."""
439 return bool(self._conditions)
440
442 """Returns a list of all topics used in conditions (may contain wildcards)."""
443 return list(self._topic_states)
444
445 def is_conditions_topic(self, topic, pure=True):
446 """
447 Returns whether topic is used for checking condition.
448
449 @param pure whether use should be solely for condition, not for matching at all
450 """
451 if not self._conditions: return False
452 if topic in self._topic_states:
453 return self._topic_states[topic] if pure else True
454 wildcarded = [t for t, p in self._wildcard_topics.items() if p.match(topic)]
455 if not wildcarded: return False
456 return all(map(self._topic_states.get, wildcarded)) if pure else True
457
458 def conditions_set_topic_state(self, topic, pure):
459 """Sets whether topic is purely used for conditions not matching."""
460 if topic in self._topic_states:
461 self._topic_states[topic] = pure
463 def conditions_register_message(self, topic, msg):
464 """Retains message for condition evaluation if in condition topic."""
465 if self.is_conditions_topic(topic, pure=False):
466 topickey = api.TypeMeta.make(msg, topic).topickey
467 self._lastmsgs[topickey].append(msg)
468 if len(self._lastmsgs[topickey]) > self._topic_limits[topic][-1]:
469 self._lastmsgs[topickey].popleft()
470 if len(self._firstmsgs[topickey]) < self._topic_limits[topic][0]:
471 self._firstmsgs[topickey].append(msg)
472
473 def _get_topic_instance(self, topic, remap=None):
474 """
475 Returns Topic() by name.
476
477 @param remap optional remap dictionary as {topic1: (topic2, typename, typehash)}
478 """
479 if remap and topic in remap:
480 topickey = remap[topic]
481 else:
482 topickey = next(((t, n, h) for (t, n, h) in self._lastmsgs if t == topic), None)
483 if topickey not in self._counts:
484 return self.Empty()
485 c, f, l = (d[topickey] for d in (self._counts, self._firstmsgs, self._lastmsgs))
486 return self.Topic(c, f, l)
487
488 def _configure_conditions(self, args):
489 """Parses condition expressions and populates local structures."""
490 self._conditions.clear()
491 self._topic_limits.clear()
492 self._topic_states.clear()
493 self._wildcard_topics.clear()
494 del self._topics_per_condition[:]
495 for v in args.CONDITION:
496 topics = list(set(self.TOPIC_RGX.findall(v)))
497 self._topic_states.update({t: True for t in topics})
498 self._topics_per_condition.append(topics)
499 for t in (t for t in topics if "*" in t):
500 self._wildcard_topics[t] = common.wildcard_to_regex(t, end=True)
501 expr = self.TOPIC_RGX.sub(r'get_topic("\1")', v)
502 self._conditions[expr] = compile(expr, "", "eval")
503
504 for v in args.CONDITION: # Set history length from <topic x>[index]
505 indexexprs = re.findall(self.TOPIC_RGX.pattern + r"\s*\[([^\]]+)\]", v)
506 for topic, indexexpr in indexexprs:
507 limits = self._topic_limits[topic]
508 try:
509 index = eval(indexexpr) # If integer, set history limits
510 limits[index < 0] = max(limits[index < 0], abs(index) + (index >= 0))
511 except Exception: continue # for topic
512
513
515 """Produces messages from ROS bagfiles."""
516
517
518 MESSAGE_META_TEMPLATE = "{topic} {index}/{total} ({type} {dt} {stamp})"
519
520
521 META_TEMPLATE = "\nFile {file} ({size}), {tcount} topics, {mcount:,d} messages\n" \
522 "File period {startdt} - {enddt}\n" \
523 "File span {delta} ({start} - {end})"
524
525
526 DEFAULT_ARGS = dict(BAG=(), FILE=(), PATH=(), RECURSE=False, TOPIC=(), TYPE=(),
527 SKIP_TOPIC=(), SKIP_TYPE=(), START_TIME=None, END_TIME=None,
528 START_INDEX=None, END_INDEX=None, CONDITION=(), AFTER=0, ORDERBY=None,
529 DECOMPRESS=False, REINDEX=False, WRITE=(), PROGRESS=False,
530 STOP_ON_ERROR=False, TIMESCALE=0, TIMESCALE_EMISSION=False, VERBOSE=False)
531
532 def __init__(self, args=None, **kwargs):
533 """
534 @param args arguments as namespace or dictionary, case-insensitive;
535 or a single path as the ROS bagfile to read,
536 or a stream to read from,
537 or one or more {@link grepros.api.Bag Bag} instances
538 <!--sep-->
539
540 Bag-specific arguments:
541 @param args.file names of ROS bagfiles to read if not all in directory,
542 or a stream to read from;
543 or one or more {@link grepros.api.Bag Bag} instances
544 @param args.path paths to scan if not current directory
545 @param args.recurse recurse into subdirectories when looking for bagfiles
546 @param args.orderby "topic" or "type" if any to group results by
547 @param args.decompress decompress archived bags to file directory
548 @param args.reindex make a copy of unindexed bags and reindex them (ROS1 only)
549 @param args.timescale emit messages on original timeline from first message
550 at given rate, 0 disables
551 @param args.timescale_emission
552 timeline from first matched message not first in bag,
553 requires notify() for each message
554 @param args.write outputs, to skip in input files
555 @param args.bag one or more {@link grepros.api.Bag Bag} instances
556 <!--sep-->
557
558 General arguments:
559 @param args.topic ROS topics to read if not all
560 @param args.type ROS message types to read if not all
561 @param args.skip_topic ROS topics to skip
562 @param args.skip_type ROS message types to skip
563 @param args.start_time earliest timestamp of messages to read
564 @param args.end_time latest timestamp of messages to read
565 @param args.start_index message index within topic to start from
566 @param args.end_index message index within topic to stop at
567 @param args.unique emit messages that are unique in topic
568 @param args.select_field message fields to use for uniqueness if not all
569 @param args.noselect_field message fields to skip for uniqueness
570 @param args.nth_message read every Nth message in topic, starting from first
571 @param args.nth_interval minimum time interval between messages in topic,
572 as seconds or ROS duration
573 @param args.condition Python expressions that must evaluate as true
574 for message to be processable, see ConditionMixin
575 @param args.progress whether to print progress bar
576 @param args.stop_on_error stop execution on any error like unknown message type
577 @param args.verbose whether to print error stacktraces
578 @param kwargs any and all arguments as keyword overrides, case-insensitive
579 """
580 args0 = args
581 is_bag = isinstance(args, api.Bag) or \
582 common.is_iterable(args) and all(isinstance(x, api.Bag) for x in args)
583 args = {"FILE": str(args)} if isinstance(args, common.PATH_TYPES) else \
584 {"FILE": args} if common.is_stream(args) else {} if is_bag else args
585 args = ensure_namespace(args, BagSource.DEFAULT_ARGS, **kwargs)
586 super(BagSource, self).__init__(args)
587 ConditionMixin.__init__(self, args)
588 self._args0 = common.structcopy(self.argsargs) # Original arguments
589 self._totals_ok = False # Whether message count totals have been retrieved (ROS2 optimize)
590 self._types_ok = False # Whether type definitions have been retrieved (ROS2 optimize)
591 self._running = False
592 self._bag = None # Current bag object instance
593 self._filename = None # Current bagfile path
594 self._meta = None # Cached get_meta()
595 self._bag0 = ([args0] if isinstance(args0, api.Bag) else args0) if is_bag else None
596 self._delaystamps = collections.defaultdict(int) # Tracked timestamps for timeline emission
597
598 def read(self):
599 """Yields messages from ROS bagfiles, as (topic, msg, ROS time)."""
600 if not self.validatevalidatevalidate(): raise Exception("invalid")
601 self._running = True
602
603 for _ in self._produce_bags():
604 if not self._running:
605 break # for _
606
607 topicsets = [self._topics_topics]
608 if "topic" == self.argsargs.ORDERBY: # Group output by sorted topic names
609 topicsets = [{n: tt} for n, tt in sorted(self._topics_topics.items())]
610 elif "type" == self.argsargs.ORDERBY: # Group output by sorted type names
611 typetopics = {}
612 for n, tt in self._topics_topics.items():
613 for t in tt: typetopics.setdefault(t, []).append(n)
614 topicsets = [{n: [t] for n in nn} for t, nn in sorted(typetopics.items())]
615
616 self._types_ok = False
618 for topics in topicsets:
619 for topic, msg, stamp, index in self._produce(topics) if topics else ():
620 self.conditions_register_message(topic, msg)
621 if not self.is_conditions_topic(topic, pure=True) \
622 and (not self.preprocess or self.is_processableis_processableis_processable(topic, msg, stamp, index)):
623 yield self.SourceMessage(topic, msg, stamp)
624 if not self._running:
625 break # for topics
626 self._counts and self.sink and self.sink.flush()
628 self._running = False
629
630 def configure(self, args=None, **kwargs):
631 """
632 Updates source configuration.
633
634 @param args arguments as namespace or dictionary, case-insensitive
635 @param kwargs any and all arguments as keyword overrides, case-insensitive
636 """
637 super(BagSource, self).configure(args, **kwargs)
638 self._args0 = common.structcopy(self.argsargs)
639
640 def validate(self):
641 """Returns whether ROS environment is set and arguments valid, prints error if not."""
642 if self.validvalid is not None: return self.validvalid
643 self.validvalid = Source.validate(self)
644 if not api.validate():
645 self.validvalid = False
646 if not self._bag0 and self.argsargs.FILE and os.path.isfile(self.argsargs.FILE[0]) \
647 and not common.verify_io(self.argsargs.FILE[0], "r"):
648 ConsolePrinter.error("File not readable.")
649 self.validvalid = False
650 if not self._bag0 and common.is_stream(self.argsargs.FILE) \
651 and not any(c.STREAMABLE for c in api.Bag.READER_CLASSES):
652 ConsolePrinter.error("Bag format does not support reading streams.")
653 self.validvalid = False
654 if self._bag0 and not any(x.mode in ("r", "a") for x in self._bag0):
655 ConsolePrinter.error("Bag not in read mode.")
656 self.validvalid = False
657 if self.argsargs.ORDERBY and self.conditions_get_topics():
658 ConsolePrinter.error("Cannot use topics in conditions and bag order by %s.",
659 self.argsargs.ORDERBY)
660 self.validvalid = False
661 if self.argsargs.TIMESCALE and self.argsargs.TIMESCALE < 0:
662 ConsolePrinter.error("Invalid timescale factor: %r.", self.argsargs.TIMESCALE)
663 self.validvalid = False
664 if not ConditionMixin.validate(self):
665 self.validvalid = False
666 return self.validvalid
667
668 def close(self):
669 """Closes current bag, if any."""
670 self._running = False
671 if self._bag and not self._bag0: self._bag.close()
672 ConditionMixin.close_batch(self)
673 super(BagSource, self).close()
674
675 def close_batch(self):
676 """Closes current bag, if any."""
677 if self._bag0: self._running = False
678 elif self._bag: self._bag.close()
679 self._bag = None
680 if self.barbar:
681 self.barbar.update(flush=True)
682 self.barbar = None
683 if self._bar_args.get("source_value") is not None:
684 self._bar_args["source_value"] = 0
685 ConditionMixin.close_batch(self)
686
687 def format_meta(self):
688 """Returns bagfile metainfo string."""
689 return self.META_TEMPLATE.format(**self.get_metaget_meta())
690
691 def format_message_meta(self, topic, msg, stamp, index=None):
692 """Returns message metainfo string."""
693 meta = self.get_message_metaget_message_meta(topic, msg, stamp, index)
694 meta = {k: "" if v is None else v for k, v in meta.items()}
695 return self.MESSAGE_META_TEMPLATEMESSAGE_META_TEMPLATE.format(**meta)
697 def get_batch(self):
698 """Returns name of current bagfile, or self if reading stream."""
699 return self._filename if self._filename is not None else self
701 def get_meta(self):
702 """Returns bagfile metainfo data dict."""
703 if self._meta is not None:
704 return self._meta
705 mcount = self._bag.get_message_count()
706 start, end = (self._bag.get_start_time(), self._bag.get_end_time()) if mcount else ("", "")
707 delta = common.format_timedelta(datetime.timedelta(seconds=(end or 0) - (start or 0)))
708 self._meta = dict(file=self._filename, size=common.format_bytes(self._bag.size),
709 mcount=mcount, tcount=len(self.topicstopics), delta=delta,
710 start=drop_zeros(start), end=drop_zeros(end),
711 startdt=drop_zeros(common.format_stamp(start)) if start != "" else "",
712 enddt=drop_zeros(common.format_stamp(end)) if end != "" else "")
713 return self._meta
714
715 def get_message_meta(self, topic, msg, stamp, index=None):
716 """Returns message metainfo data dict."""
717 self._ensure_totals()
718 result = super(BagSource, self).get_message_meta(topic, msg, stamp, index)
719 result.update(total=self.topicstopics[(topic, result["type"], result["hash"])])
720 if callable(getattr(self._bag, "get_qoses", None)):
721 result.update(qoses=self._bag.get_qoses(topic, result["type"]))
722 return result
723
724 def get_message_class(self, typename, typehash=None):
725 """Returns ROS message type class."""
726 return self._bag.get_message_class(typename, typehash) or \
727 api.get_message_class(typename)
728
729 def get_message_definition(self, msg_or_type):
730 """Returns ROS message type definition full text, including subtype definitions."""
731 return self._bag.get_message_definition(msg_or_type) or \
732 api.get_message_definition(msg_or_type)
734 def get_message_type_hash(self, msg_or_type):
735 """Returns ROS message type MD5 hash."""
736 return self._bag.get_message_type_hash(msg_or_type) or \
737 api.get_message_type_hash(msg_or_type)
739 def notify(self, status):
740 """Reports match status of last produced message."""
741 super(BagSource, self).notify(status)
742 if status and not self._totals_ok:
744 if status and self.argsargs.TIMESCALE and self.argsargs.TIMESCALE_EMISSION:
745 if "first" not in self._delaystamps:
746 self._delaystamps["first"] = self._delaystamps["current"]
747 else: self._delay_timeline() # Delay until time met
749 def is_processable(self, topic, msg, stamp, index=None):
750 """Returns whether message passes source filters; registers status."""
751 self._status_status = False
752 topickey = api.TypeMeta.make(msg, topic).topickey
753 if self.argsargs.START_INDEX and index is not None and self.argsargs.START_INDEX < 0 \
754 and topickey not in self._start_indexes: # Populate topic in _start_indexes
755 self._ensure_totals()
756 self._start_indexes[topickey] = max(0, self.argsargs.START_INDEX + self.topicstopics[topickey])
757 if self.argsargs.END_INDEX and index is not None and self.argsargs.END_INDEX < 0 \
758 and topickey not in self._end_indexes: # Populate topic in _end_indexes
759 self._ensure_totals()
760 self._end_indexes[topickey] = (self.argsargs.END_INDEX + self.topicstopics[topickey])
761 if not self._end_indexes[topickey]: self._end_indexes[topickey] = -1
762
763 if not super(BagSource, self).is_processable(topic, msg, stamp, index):
764 return False
765 if not ConditionMixin.is_processable(self, topic, msg, stamp, index):
766 return False
767 self._status_status = True
768 return True
769
770 def init_progress(self):
771 """Initializes progress bar, if any, for current bag."""
772 if self.argsargs.PROGRESS and not self.barbar:
773 self._ensure_totals()
775 super(BagSource, self).init_progress()
776
777 def _produce(self, topics, start_time=None):
778 """
779 Yields messages from current ROS bagfile, as (topic, msg, ROS time, index in topic).
780
781 @param topics {topic: [typename, ]}
782 """
783 if not self._running or not self._bag: return
784 do_predelay = self.argsargs.TIMESCALE and not self.argsargs.TIMESCALE_EMISSION
785 if do_predelay: self._delaystamps["first"] = self._bag.get_start_time()
786 if self.argsargs.TIMESCALE and "read" not in self._delaystamps:
787 self._delaystamps["read"] = getattr(time, "monotonic", time.time)() # Py3 / Py2
788 counts = collections.Counter()
789 endtime_indexes = {} # {topickey: index at reaching END_TIME}
790 nametypes = {(n, t) for n, tt in topics.items() for t in tt}
791 for topic, msg, stamp in self._bag.read_messages(list(topics), start_time):
792 if not self._running or not self._bag:
793 break # for topic,
794 typename = api.get_message_type(msg)
795 if topics and typename not in topics[topic]:
796 continue # for topic,
797 if api.ROS2 and not self._types_ok:
798 self.topicstopics, self._types_ok = self._bag.get_topic_info(counts=False), True
799
800 topickey = api.TypeMeta.make(msg, topic, self).topickey
801 counts[topickey] += 1; self._counts[topickey] += 1
802 # Skip messages already processed during sticky
803 if start_time is None and counts[topickey] != self._counts[topickey]:
804 continue # for topic,
805
806 self._status_status, self._delaystamps["current"] = None, api.to_sec(stamp)
807 if do_predelay: self._delay_timeline() # Delay emission until time
808 if self.barbar: self.update_progress(sum(self._counts.values()))
809 yield topic, msg, stamp, self._counts[topickey]
810
811 if self._status_status:
812 self._processables[topickey] = (self._counts[topickey], stamp)
813 if self._status_status and not self.preprocess and self.argsargs.AFTER and start_time is None \
814 and not self.has_conditions() \
815 and (len(self._topics_topics) > 1 or len(next(iter(self._topics_topics.values()))) > 1):
816 # Stick to one topic until trailing messages have been emitted
817 for entry in self._produce({topic: typename}, stamp + api.make_duration(nsecs=1)):
818 yield entry
819 if not self._running or not self._bag or (start_time is None
820 and self._is_at_end_threshold(topickey, stamp, nametypes, endtime_indexes)):
821 break # for topic,
822
823 def _produce_bags(self):
824 """Yields Bag instances from configured arguments."""
825 if self._bag0:
826 for bag in self._bag0:
827 if self._configure(bag=bag):
828 yield self._bag
829 return
830
831 names, paths = self.argsargs.FILE, self.argsargs.PATH
832 exts, skip_exts = api.BAG_EXTENSIONS, api.SKIP_EXTENSIONS
833 exts = list(exts) + ["%s%s" % (a, b) for a in exts for b in common.Decompressor.EXTENSIONS]
834
835 encountereds = set()
836 for filename in common.find_files(names, paths, exts, skip_exts, self.argsargs.RECURSE):
837 if not self._running:
838 break # for filename
839
840 fullname = os.path.realpath(os.path.abspath(filename))
841 skip = common.Decompressor.make_decompressed_name(fullname) in encountereds
842 encountereds.add(fullname)
843
844 if skip or not self._configure(filename):
845 continue # for filename
846
847 encountereds.add(self._bag.filename)
848 yield self._bag
849
850 def _make_progress_args(self):
851 """Returns dictionary with progress bar options"""
852 total = sum(sum(c for (t, n, _), c in self.topicstopics.items() if c and t == t_ and n in nn)
853 for t_, nn in self._topics_topics.items())
854 result = dict(max=total, afterword=os.path.basename(self._filename or "<stream>"))
855
856 instr, outstr = "{value:,d}/{max:,d}", ""
857 if any([self.argsargs.CONDITION, self.argsargs.UNIQUE, self.argsargs.NTH_INTERVAL,
858 self.argsargs.START_TIME, self.argsargs.END_TIME]) or self.argsargs.NTH_MESSAGE > 1:
859 self._bar_args.setdefault("source_value", 0) # Separate counts if not all messages
860 if self._bar_args.get("source_value") is not None \
861 or self._bar_args.get("match_max") is not None:
862 result.update(source_value=self._bar_args.get("source_value") or 0)
863 instr, outstr = "{source_value:,d}/{max:,d}", "matched {value:,d}"
864 if self._bar_args.get("match_max") is not None:
865 instr, outstr = "{source_value:,d}/{source_max:,d}", outstr + "/{match_max:,d}"
866 result.update(source_max=total, max=min(total, self._bar_args["match_max"]))
867 result.update(aftertemplate=" {afterword} (%s)" % " ".join(filter(bool, (instr, outstr))))
868
869 return result
870
871 def _ensure_totals(self):
872 """Retrieves total message counts if not retrieved."""
873 if not self._totals_ok: # ROS2 bag probably
874 has_ensure = common.has_arg(self._bag.get_topic_info, "ensure_types")
875 kws = dict(ensure_types=False) if has_ensure else {}
876 for (t, n, h), c in self._bag.get_topic_info(**kws).items():
877 self.topicstopics[(t, n, h)] = c
878 self._totals_ok = True
879
880 def _delay_timeline(self):
881 """Sleeps until message ought to be emitted in bag timeline."""
882 curstamp, readstamp, startstamp = map(self._delaystamps.get, ("current", "read", "first"))
883 delta = max(0, api.to_sec(curstamp) - startstamp) / (self.argsargs.TIMESCALE or 1)
884 if delta: time.sleep(max(0, delta + readstamp - getattr(time, "monotonic", time.time)()))
885
886 def _is_at_end_threshold(self, topickey, stamp, nametypes, endtime_indexes):
887 """
888 Returns whether bag reading has reached END_INDEX or END_TIME in all given topics.
889
890 @param topickey (topic, typename, typehash) of current message
891 @param stamp ROS timestamp of current message
892 @param nametypes {(topic, typename)} to account for
893 @param endtime_indexes {topickey: index at reaching END_TIME}, gets modified
894 """
895 if self.argsargs.END_INDEX:
896 max_index = self.argsargs.END_INDEX + self.argsargs.AFTER
897 if self._counts[topickey] >= max_index: # Stop reading when reaching max in all topics
898 mycounts = {k: v for k, v in self._counts.items() if k[:2] in nametypes}
899 if nametypes == set(k[:2] for k in mycounts) \
900 and all(v >= self._end_indexes.get(k, max_index) for k, v in mycounts.items()):
901 return True # Early break if all topics at max index
902 if self.argsargs.END_TIME and stamp > self.argsargs.END_TIME:
903 self._ensure_totals()
904 if topickey not in endtime_indexes: endtime_indexes[topickey] = self._counts[topickey]
905 max_index = min(self.topicstopics[topickey], endtime_indexes[topickey] + self.argsargs.AFTER)
906 if self._counts[topickey] >= max_index: # One topic reached end: check all topics
907 myindexes = {k: v for k, v in endtime_indexes.items() if k[:2] in nametypes}
908 if nametypes == set(k[:2] for k in myindexes) \
909 and all(self._counts[k] >= min(self.topicstopics[k], v + self.argsargs.AFTER)
910 for k, v in myindexes.items()):
911 return True # Early break if all topics at max time
912 return False
913
914 def _configure(self, filename=None, bag=None):
915 """Opens bag and populates bag-specific argument state, returns success."""
916 self._meta = None
917 self._bag = None
918 self._filename = None
919 self._totals_ok = False
920 self._delaystamps.clear()
921 self._counts.clear()
922 self._start_indexes.clear()
923 self._end_indexes.clear()
924 self._processables.clear()
925 self._hashes.clear()
926 self.topicstopics.clear()
927
928 if bag is not None and bag.mode not in ("r", "a"):
929 ConsolePrinter.warn("Cannot read %s: bag in write mode.", bag)
930 return False
931
932 if filename and self.argsargs.WRITE \
933 and any(os.path.realpath(x[0]) == os.path.realpath(filename)
934 for x in self.argsargs.WRITE):
935 return False
936 try:
937 if filename and common.Decompressor.is_compressed(filename):
938 if self.argsargs.DECOMPRESS:
939 filename = common.Decompressor.decompress(filename, self.argsargs.PROGRESS)
940 else: raise Exception("decompression not enabled")
941 bag = api.Bag(filename, mode="r", reindex=self.argsargs.REINDEX,
942 progress=self.argsargs.PROGRESS) if bag is None else bag
943 bag.stop_on_error = self.argsargs.STOP_ON_ERROR
944 bag.open()
945 except Exception as e:
946 ConsolePrinter.error("\nError opening %r: %s", filename or bag, e)
947 if self.argsargs.STOP_ON_ERROR: raise
948 if self.argsargs.VERBOSE: traceback.print_exc()
949 return False
950
951 self._bag = bag
952 self._filename = bag.filename
953
954 dct = fulldct = {} # {topic: [typename, ]}
955 kws = dict(ensure_types=False) if common.has_arg(bag.get_topic_info, "ensure_types") else {}
956 for (t, n, h), c in bag.get_topic_info(counts=False, **kws).items():
957 dct.setdefault(t, []).append(n)
958 self.topicstopics[(t, n, h)] = c
959 self._totals_ok = not any(v is None for v in self.topicstopics.values())
960 for topic in self.conditions_get_topics():
961 self.conditions_set_topic_state(topic, True)
962
963 dct = common.filter_dict(dct, self.argsargs.TOPIC, self.argsargs.TYPE)
964 dct = common.filter_dict(dct, self.argsargs.SKIP_TOPIC, self.argsargs.SKIP_TYPE, reverse=True)
965 for topic in self.conditions_get_topics(): # Add topics used in conditions
966 matches = [t for p in [common.wildcard_to_regex(topic, end=True)] for t in fulldct
967 if t == topic or "*" in topic and p.match(t)]
968 for realtopic in matches:
969 self.conditions_set_topic_state(realtopic, realtopic not in dct)
970 dct.setdefault(realtopic, fulldct[realtopic])
971 self._topics_topics = dct
972 self._meta = self.get_metaget_meta()
973
974 args = self.argsargs = common.structcopy(self._args0)
975 if args.START_TIME is not None:
976 args.START_TIME = api.make_bag_time(args.START_TIME, bag)
977 if args.END_TIME is not None:
978 args.END_TIME = api.make_bag_time(args.END_TIME, bag)
979 return True
980
981
983 """Produces messages from live ROS topics."""
984
985
986 MASTER_INTERVAL = 2
987
988
989 DEFAULT_ARGS = dict(TOPIC=(), TYPE=(), SKIP_TOPIC=(), SKIP_TYPE=(), START_TIME=None,
990 END_TIME=None, START_INDEX=None, END_INDEX=None, CONDITION=(),
991 QUEUE_SIZE_IN=10, ROS_TIME_IN=False, PROGRESS=False, STOP_ON_ERROR=False,
992 VERBOSE=False)
993
994 def __init__(self, args=None, **kwargs):
995 """
996 @param args arguments as namespace or dictionary, case-insensitive
997 @param args.topic ROS topics to read if not all
998 @param args.type ROS message types to read if not all
999 @param args.skip_topic ROS topics to skip
1000 @param args.skip_type ROS message types to skip
1001 @param args.start_time earliest timestamp of messages to read
1002 @param args.end_time latest timestamp of messages to read
1003 @param args.start_index message index within topic to start from
1004 @param args.end_index message index within topic to stop at
1005 @param args.unique emit messages that are unique in topic
1006 @param args.select_field message fields to use for uniqueness if not all
1007 @param args.noselect_field message fields to skip for uniqueness
1008 @param args.nth_message read every Nth message in topic, starting from first
1009 @param args.nth_interval minimum time interval between messages in topic,
1010 as seconds or ROS duration
1011 @param args.condition Python expressions that must evaluate as true
1012 for message to be processable, see ConditionMixin
1013 @param args.queue_size_in subscriber queue size (default 10)
1014 @param args.ros_time_in stamp messages with ROS time instead of wall time
1015 @param args.progress whether to print progress bar
1016 @param args.stop_on_error stop execution on any error like unknown message type
1017 @param args.verbose whether to print error stacktraces
1018 @param kwargs any and all arguments as keyword overrides, case-insensitive
1019 """
1020 args = ensure_namespace(args, LiveSource.DEFAULT_ARGS, **dict(kwargs, live=True))
1021 super(LiveSource, self).__init__(args)
1022 ConditionMixin.__init__(self, args)
1023 self._running = False # Whether is in process of yielding messages from topics
1024 self._queue = None # [(topic, msg, ROS time)]
1025 self._last_stamp = None # ROS stamp of last message
1026 self._subs = {} # {(topic, typename, typehash): ROS subscriber}
1027
1028 def read(self):
1029 """Yields messages from subscribed ROS topics, as (topic, msg, ROS time)."""
1030 if not self._running:
1031 if not self.validatevalidatevalidate(): raise Exception("invalid")
1032 api.init_node()
1033 self._running = True
1034 self._queue = queue.Queue()
1035 self.refresh_topics()
1036 t = threading.Thread(target=self._run_refresh)
1037 t.daemon = True
1038 t.start()
1039 if self.argsargs.END_TIME:
1040 self._last_stamp = None
1041 t = threading.Thread(target=self._run_endtime_closer)
1042 t.daemon = True
1043 t.start()
1044
1045 total = 0
1047 while self._running:
1048 topic, msg, stamp = self._queue.get()
1049 total += bool(topic)
1050 self.update_progressupdate_progress(total, running=self._running and bool(topic))
1051 if not topic: continue # while
1053 topickey = api.TypeMeta.make(msg, topic, self).topickey
1054 self._counts[topickey] += 1
1055 self._last_stamp = stamp
1056 self.conditions_register_message(topic, msg)
1057 if self.is_conditions_topic(topic, pure=True): continue # while
1058
1059 self._status_status = None
1060 if not self.preprocess \
1061 or self.is_processableis_processableis_processable(topic, msg, stamp, self._counts[topickey]):
1062 yield self.SourceMessage(topic, msg, stamp)
1063 if self._status_status and (self.argsargs.NTH_MESSAGE > 1 or self.argsargs.NTH_INTERVAL > 0):
1064 self._processables[topickey] = (self._counts[topickey], stamp)
1065 self._queue = None
1066 self._running = False
1067
1068 def bind(self, sink):
1069 """Attaches sink to source and blocks until connected to ROS live."""
1070 if not self.validatevalidatevalidate(): raise Exception("invalid")
1071 super(LiveSource, self).bind(sink)
1072 api.init_node()
1073
1074 def validate(self):
1075 """Returns whether ROS environment is set and arguments valid, prints error if not."""
1076 if self.validvalid is not None: return self.validvalid
1077 self.validvalid = Source.validate(self)
1078 if not api.validate(live=True):
1079 self.validvalid = False
1080 if not ConditionMixin.validate(self):
1081 self.validvalid = False
1082 if self.validvalid:
1083 self._configure()
1084 return self.validvalid
1085
1086 def close(self):
1087 """Shuts down subscribers and stops producing messages."""
1088 self._running = False
1089 for k in list(self._subs):
1090 self._subs.pop(k).unregister()
1091 self._queue and self._queue.put((None, None, None)) # Wake up iterator
1092 self._queue = None
1093 ConditionMixin.close_batch(self)
1094 super(LiveSource, self).close()
1095
1096 def get_meta(self):
1097 """Returns source metainfo data dict."""
1098 ENV = {k: os.getenv(k) for k in ("ROS_MASTER_URI", "ROS_DOMAIN_ID") if os.getenv(k)}
1099 return dict(ENV, tcount=len(self.topics), scount=len(self._subs))
1100
1101 def get_message_meta(self, topic, msg, stamp, index=None):
1102 """Returns message metainfo data dict."""
1103 result = super(LiveSource, self).get_message_meta(topic, msg, stamp, index)
1104 topickey = (topic, result["type"], result["hash"])
1105 if topickey in self._subs:
1106 result.update(qoses=self._subs[topickey].get_qoses())
1107 return result
1108
1109 def get_message_class(self, typename, typehash=None):
1110 """Returns message type class, from active subscription if available."""
1111 sub = next((s for (t, n, h), s in self._subs.items()
1112 if n == typename and typehash in (s.get_message_type_hash(), None)), None)
1113 return sub and sub.get_message_class() or api.get_message_class(typename)
1114
1115 def get_message_definition(self, msg_or_type):
1116 """Returns ROS message type definition full text, including subtype definitions."""
1117 if api.is_ros_message(msg_or_type):
1118 return api.get_message_definition(msg_or_type)
1119 sub = next((s for (t, n, h), s in self._subs.items() if n == msg_or_type), None)
1120 return sub and sub.get_message_definition() or api.get_message_definition(msg_or_type)
1121
1122 def get_message_type_hash(self, msg_or_type):
1123 """Returns ROS message type MD5 hash."""
1124 if api.is_ros_message(msg_or_type):
1125 return api.get_message_type_hash(msg_or_type)
1126 sub = next((s for (t, n, h), s in self._subs.items() if n == msg_or_type), None)
1127 return sub and sub.get_message_type_hash() or api.get_message_type_hash(msg_or_type)
1128
1129 def format_meta(self):
1130 """Returns source metainfo string."""
1131 metadata = self.get_metaget_meta()
1132 result = "\nROS%s live" % api.ROS_VERSION
1133 if "ROS_MASTER_URI" in metadata:
1134 result += ", ROS master %s" % metadata["ROS_MASTER_URI"]
1135 if "ROS_DOMAIN_ID" in metadata:
1136 result += ", ROS domain ID %s" % metadata["ROS_DOMAIN_ID"]
1137 result += ", %s initially" % common.plural("topic", metadata["tcount"])
1138 result += ", %s subscribed" % metadata["scount"]
1139 return result
1140
1141 def is_processable(self, topic, msg, stamp, index=None):
1142 """Returns whether message passes source filters; registers status."""
1143 self._status_status = False
1144 if not super(LiveSource, self).is_processable(topic, msg, stamp, index):
1145 return False
1146 if not ConditionMixin.is_processable(self, topic, msg, stamp, index):
1147 return False
1148 self._status_status = True
1149 return True
1150
1151 def refresh_topics(self):
1152 """Refreshes topics and subscriptions from ROS live."""
1153 for topic, typename in api.get_topic_types():
1154 topickey = (topic, typename, None)
1155 self.topics[topickey] = None
1156 dct = common.filter_dict({topic: [typename]}, self.argsargs.TOPIC, self.argsargs.TYPE)
1157 if not common.filter_dict(dct, self.argsargs.SKIP_TOPIC, self.argsargs.SKIP_TYPE, reverse=True):
1158 continue # for topic, typename
1159 if api.ROS2 and api.get_message_class(typename) is None:
1160 msg = "Error loading type %s in topic %s." % (typename, topic)
1161 if self.argsargs.STOP_ON_ERROR: raise Exception(msg)
1162 ConsolePrinter.warn(msg, __once=True)
1163 continue # for topic, typename
1164 if topickey in self._subs:
1165 continue # for topic, typename
1166
1167 handler = functools.partial(self._on_message, topic)
1168 try:
1169 sub = api.create_subscriber(topic, typename, handler,
1170 queue_size=self.argsargs.QUEUE_SIZE_IN)
1171 except Exception as e:
1172 ConsolePrinter.warn("Error subscribing to topic %s: %%r" % topic,
1173 e, __once=True)
1174 if self.argsargs.STOP_ON_ERROR: raise
1175 if self.argsargs.VERBOSE: traceback.print_exc()
1176 continue # for topic, typename
1177 self._subs[topickey] = sub
1178
1179 def init_progress(self):
1180 """Initializes progress bar, if any."""
1181 if self.argsargs.PROGRESS and not self.bar:
1183 super(LiveSource, self).init_progress()
1184
1185 def update_progress(self, count, running=True):
1186 """Updates progress bar, if any."""
1187 if self.bar:
1188 if count in (1, 2): # Change plurality
1189 self.configure_progress(**self._make_progress_args(count))
1190 super(LiveSource, self).update_progress(count, running)
1191
1192 def _configure(self):
1193 """Adjusts start/end time filter values to current time."""
1194 if self.argsargs.START_TIME is not None:
1195 self.argsargs.START_TIME = api.make_live_time(self.argsargs.START_TIME)
1196 if self.argsargs.END_TIME is not None:
1197 self.argsargs.END_TIME = api.make_live_time(self.argsargs.END_TIME)
1198
1199 def _make_progress_args(self, count=None):
1200 """Returns dictionary with progress bar options, for specific nessage index if any."""
1201 result = dict(afterword = "ROS%s live" % api.ROS_VERSION, pulse=True)
1202 if self._bar_args.get("match_max") is not None:
1203 result.update(max=self._bar_args["match_max"], pulse=False)
1204
1205 instr, outstr = "{value:,d} message%s" % ("" if count == 1 else "s"), ""
1206 if any([self.argsargs.CONDITION, self.argsargs.UNIQUE, self.argsargs.NTH_INTERVAL,
1207 self.argsargs.START_TIME, self.argsargs.END_TIME]) or self.argsargs.NTH_MESSAGE > 1:
1208 self._bar_args.setdefault("source_value", 0) # Separate counts if not all messages
1209 if self._bar_args.get("source_value") is not None:
1210 instr = "{source_value:,d} message%s" % ("" if count == 1 else "s")
1211 outstr = "matched {value:,d}"
1212 if self._bar_args.get("match_max") is not None: outstr += "/{match_max:,d}"
1213 elif self._bar_args.get("match_max") is not None:
1214 instr = "{value:,d}/{max:,d}"
1215 result.update(aftertemplate=" {afterword} (%s)" % " ".join(filter(bool, (instr, outstr))))
1216
1217 return result
1218
1219 def _run_refresh(self):
1220 """Periodically refreshes topics and subscriptions from ROS live."""
1221 time.sleep(self.MASTER_INTERVALMASTER_INTERVAL)
1222 while self._running:
1223 try: self.refresh_topics()
1224 except Exception as e: self.thread_excepthook("Error refreshing live topics: %r" % e, e)
1225 time.sleep(self.MASTER_INTERVALMASTER_INTERVAL)
1226
1227 def _run_endtime_closer(self):
1228 """Periodically checks whether END_TIME has been reached, closes source when so."""
1229 time.sleep(self.MASTER_INTERVALMASTER_INTERVAL)
1230 while self._running and self.argsargs.END_TIME:
1231 if self._last_stamp and self._last_stamp > self.argsargs.END_TIME:
1232 time.sleep(self.MASTER_INTERVALMASTER_INTERVAL) # Allow some more arrivals just in case
1233 self.closeclose()
1234 else: time.sleep(self.MASTER_INTERVALMASTER_INTERVAL)
1235
1236 def _on_message(self, topic, msg):
1237 """Subscription callback handler, queues message for yielding."""
1238 stamp = api.get_rostime() if self.argsargs.ROS_TIME_IN else api.make_time(time.time())
1239 self._queue and self._queue.put((topic, msg, stamp))
1240
1241
1243 """Produces messages from iterable or pushed data."""
1244
1245
1246 DEFAULT_ARGS = dict(TOPIC=(), TYPE=(), SKIP_TOPIC=(), SKIP_TYPE=(), START_TIME=None,
1247 END_TIME=None, START_INDEX=None, END_INDEX=None, UNIQUE=False,
1248 SELECT_FIELD=(), NOSELECT_FIELD=(), NTH_MESSAGE=1, NTH_INTERVAL=0,
1249 CONDITION=(), ITERABLE=None)
1250
1251 def __init__(self, args=None, **kwargs):
1252 """
1253 @param args arguments as namespace or dictionary, case-insensitive;
1254 or iterable yielding messages
1255 @param args.topic ROS topics to read if not all
1256 @param args.type ROS message types to read if not all
1257 @param args.skip_topic ROS topics to skip
1258 @param args.skip_type ROS message types to skip
1259 @param args.start_time earliest timestamp of messages to read
1260 @param args.end_time latest timestamp of messages to read
1261 @param args.start_index message index within topic to start from
1262 @param args.end_index message index within topic to stop at
1263 @param args.unique emit messages that are unique in topic
1264 @param args.select_field message fields to use for uniqueness if not all
1265 @param args.noselect_field message fields to skip for uniqueness
1266 @param args.nth_message read every Nth message in topic, starting from first
1267 @param args.nth_interval minimum time interval between messages in topic,
1268 as seconds or ROS duration
1269 @param args.condition Python expressions that must evaluate as true
1270 for message to be processable, see ConditionMixin
1271 @param args.iterable iterable yielding (topic, msg, stamp) or (topic, msg);
1272 yielding `None` signals end of content
1273 @param kwargs any and all arguments as keyword overrides, case-insensitive
1274 """
1275 if common.is_iterable(args) and not isinstance(args, dict):
1276 args = ensure_namespace(None, iterable=args)
1277 args = ensure_namespace(args, AppSource.DEFAULT_ARGS, **kwargs)
1278 super(AppSource, self).__init__(args)
1279 ConditionMixin.__init__(self, args)
1280 self._queue = queue.Queue() # [(topic, msg, ROS time)]
1281 self._reading = False
1282
1283 def read(self):
1284 """
1285 Yields messages from iterable or pushed data, as (topic, msg, ROS timestamp).
1286
1287 Blocks until a message is available, or source is closed.
1288 """
1289 if not self.validatevalidatevalidate(): raise Exception("invalid")
1290 def generate(iterable):
1291 for x in iterable: yield x
1292 feeder = generate(self.argsargs.ITERABLE) if self.argsargs.ITERABLE else None
1293 self._reading = True
1294 while self._reading:
1295 item = self._queue.get() if not feeder or self._queue.qsize() else next(feeder, None)
1296 if item is None: break # while
1297
1298 if len(item) > 2: topic, msg, stamp = item[:3]
1299 else: (topic, msg), stamp = item[:2], api.get_rostime(fallback=True)
1300 topickey = api.TypeMeta.make(msg, topic, self).topickey
1301 self._counts[topickey] += 1
1302 self.conditions_register_message(topic, msg)
1303 if self.is_conditions_topic(topic, pure=True): continue # while
1304
1305 self._status_status = None
1306 if not self.preprocess \
1307 or self.is_processableis_processableis_processable(topic, msg, stamp, self._counts[topickey]):
1308 yield self.SourceMessage(topic, msg, stamp)
1309 if self._status_status and (self.argsargs.NTH_MESSAGE > 1 or self.argsargs.NTH_INTERVAL > 0):
1310 self._processables[topickey] = (self._counts[topickey], stamp)
1311 self._reading = False
1312
1313 def close(self):
1314 """Closes current read() yielding, if any."""
1315 if self._reading:
1316 self._reading = False
1317 self._queue.put(None)
1318
1319 def read_queue(self):
1320 """
1321 Returns (topic, msg, stamp) from push queue, or `None` if no queue
1322 or message in queue is condition topic only.
1323 """
1324 if not self.validatevalidatevalidate(): raise Exception("invalid")
1325 item = None
1326 try: item = self._queue.get(block=False)
1327 except queue.Empty: pass
1328 if item is None: return None
1329
1330 topic, msg, stamp = item
1331 topickey = api.TypeMeta.make(msg, topic, self).topickey
1332 self._counts[topickey] += 1
1333 self.conditions_register_message(topic, msg)
1334 return None if self.is_conditions_topic(topic, pure=True) else (topic, msg, stamp)
1335
1336 def mark_queue(self, topic, msg, stamp):
1337 """Registers message produced from read_queue()."""
1338 if not self.validatevalidatevalidate(): raise Exception("invalid")
1339 if self.argsargs.NTH_MESSAGE > 1 or self.argsargs.NTH_INTERVAL > 0:
1340 topickey = api.TypeMeta.make(msg, topic).topickey
1341 self._processables[topickey] = (self._counts[topickey], stamp)
1342
1343 def push(self, topic, msg=None, stamp=None):
1344 """
1345 Pushes a message to be yielded from read().
1346
1347 @param topic topic name, or `None` to signal end of content
1348 @param msg ROS message
1349 @param stamp message ROS timestamp, defaults to current wall time if `None`
1350 """
1351 if not self.validatevalidatevalidate(): raise Exception("invalid")
1352 if topic is None: self._queue.put(None)
1353 else: self._queue.put((topic, msg, stamp or api.get_rostime(fallback=True)))
1354
1355 def is_processable(self, topic, msg, stamp, index=None):
1356 """Returns whether message passes source filters; registers status."""
1357 self._status_status = False
1358 dct = common.filter_dict({topic: [api.get_message_type(msg)]},
1359 self.argsargs.TOPIC, self.argsargs.TYPE)
1360 if not common.filter_dict(dct, self.argsargs.SKIP_TOPIC, self.argsargs.SKIP_TYPE, reverse=True):
1361 return False
1362 if not super(AppSource, self).is_processable(topic, msg, stamp, index):
1363 return False
1364 if not ConditionMixin.is_processable(self, topic, msg, stamp, index):
1365 return False
1366 self._status_status = True
1367 return True
1368
1369 def validate(self):
1370 """Returns whether configured arguments are valid, prints error if not."""
1371 if self.validvalid is not None: return self.validvalid
1372 self.validvalid = Source.validate(self)
1373 if self.validvalid:
1374 self._configure()
1375 return self.validvalid
1376
1377 def _configure(self):
1378 """Adjusts start/end time filter values to current time."""
1379 if self.argsargs.START_TIME is not None:
1380 self.argsargs.START_TIME = api.make_live_time(self.argsargs.START_TIME)
1381 if self.argsargs.END_TIME is not None:
1382 self.argsargs.END_TIME = api.make_live_time(self.argsargs.END_TIME)
1383
1384
1385__all__ = ["AppSource", "ConditionMixin", "BagSource", "LiveSource", "Source"]
Bag factory metaclass.
Definition api.py:381
A simple ASCII progress bar with a ticker thread.
Definition common.py:546
Produces messages from iterable or pushed data.
Definition inputs.py:1276
mark_queue(self, topic, msg, stamp)
Registers message produced from read_queue().
Definition inputs.py:1371
__init__(self, args=None, **kwargs)
Definition inputs.py:1309
push(self, topic, msg=None, stamp=None)
Pushes a message to be yielded from read().
Definition inputs.py:1385
read_queue(self)
Returns (topic, msg, stamp) from push queue, or None if no queue or message in queue is condition top...
Definition inputs.py:1358
is_processable(self, topic, msg, stamp, index=None)
Returns whether message passes source filters; registers status.
Definition inputs.py:1390
validate(self)
Returns whether configured arguments are valid, prints error if not.
Definition inputs.py:1404
close(self)
Closes current read() yielding, if any.
Definition inputs.py:1348
read(self)
Yields messages from iterable or pushed data, as (topic, msg, ROS timestamp).
Definition inputs.py:1323
Produces messages from ROS bagfiles.
Definition inputs.py:522
init_progress(self)
Initializes progress bar, if any, for current bag.
Definition inputs.py:779
notify(self, status)
Reports match status of last produced message.
Definition inputs.py:748
get_meta(self)
Returns bagfile metainfo data dict.
Definition inputs.py:710
close_batch(self)
Closes current bag, if any.
Definition inputs.py:684
configure(self, args=None, **kwargs)
Updates source configuration.
Definition inputs.py:645
__init__(self, args=None, **kwargs)
Definition inputs.py:588
get_batch(self)
Returns name of current bagfile, or self if reading stream.
Definition inputs.py:706
get_message_meta(self, topic, msg, stamp, index=None)
Returns message metainfo data dict.
Definition inputs.py:724
is_processable(self, topic, msg, stamp, index=None)
Returns whether message passes source filters; registers status.
Definition inputs.py:758
get_message_class(self, typename, typehash=None)
Returns ROS message type class.
Definition inputs.py:733
format_message_meta(self, topic, msg, stamp, index=None)
Returns message metainfo string.
Definition inputs.py:700
get_message_type_hash(self, msg_or_type)
Returns ROS message type MD5 hash.
Definition inputs.py:743
validate(self)
Returns whether ROS environment is set and arguments valid, prints error if not.
Definition inputs.py:649
close(self)
Closes current bag, if any.
Definition inputs.py:677
read(self)
Yields messages from ROS bagfiles, as (topic, msg, ROS time).
Definition inputs.py:607
str MESSAGE_META_TEMPLATE
Template for message metainfo line.
Definition inputs.py:526
get_message_definition(self, msg_or_type)
Returns ROS message type definition full text, including subtype definitions.
Definition inputs.py:738
str META_TEMPLATE
Template for bag metainfo header.
Definition inputs.py:529
format_meta(self)
Returns bagfile metainfo string.
Definition inputs.py:696
Placeholder falsy object that raises NoMessageException on attribute access.
Definition inputs.py:359
Object for current topic message in condition expressions.
Definition inputs.py:339
__getattr__(self, name)
Returns attribute value of message.
Definition inputs.py:354
__contains__(self, item)
Returns whether value exists in any message field.
Definition inputs.py:345
Object for <topic x> replacements in condition expressions.
Definition inputs.py:306
__getattr__(self, name)
Returns attribute value of last message, or raises NoMessageException.
Definition inputs.py:327
__init__(self, count, firsts, lasts)
Definition inputs.py:308
__getitem__(self, key)
Returns message from history at key, or Empty() if no such message.
Definition inputs.py:322
__contains__(self, item)
Returns whether value exists in last message, or raises NoMessageException.
Definition inputs.py:317
Provides topic conditions evaluation.
Definition inputs.py:286
close_batch(self)
Clears cached messages.
Definition inputs.py:436
__init__(self, args=None, **kwargs)
Definition inputs.py:375
conditions_register_message(self, topic, msg)
Retains message for condition evaluation if in condition topic.
Definition inputs.py:467
is_processable(self, topic, msg, stamp, index=None)
Returns whether message passes passes current state conditions, if any.
Definition inputs.py:389
validate(self)
Returns whether conditions have valid syntax, sets options, prints errors.
Definition inputs.py:417
is_conditions_topic(self, topic, pure=True)
Returns whether topic is used for checking condition.
Definition inputs.py:454
has_conditions(self)
Returns whether there are any conditions configured.
Definition inputs.py:441
conditions_get_topics(self)
Returns a list of all topics used in conditions (may contain wildcards).
Definition inputs.py:445
conditions_set_topic_state(self, topic, pure)
Sets whether topic is purely used for conditions not matching.
Definition inputs.py:462
Produces messages from live ROS topics.
Definition inputs.py:1005
init_progress(self)
Initializes progress bar, if any.
Definition inputs.py:1203
get_meta(self)
Returns source metainfo data dict.
Definition inputs.py:1120
bind(self, sink)
Attaches sink to source and blocks until connected to ROS live.
Definition inputs.py:1092
__init__(self, args=None, **kwargs)
Definition inputs.py:1043
get_message_meta(self, topic, msg, stamp, index=None)
Returns message metainfo data dict.
Definition inputs.py:1125
update_progress(self, count, running=True)
Updates progress bar, if any.
Definition inputs.py:1209
is_processable(self, topic, msg, stamp, index=None)
Returns whether message passes source filters; registers status.
Definition inputs.py:1165
get_message_class(self, typename, typehash=None)
Returns message type class, from active subscription if available.
Definition inputs.py:1133
get_message_type_hash(self, msg_or_type)
Returns ROS message type MD5 hash.
Definition inputs.py:1146
int MASTER_INTERVAL
Seconds between refreshing available topics from ROS master.
Definition inputs.py:1009
validate(self)
Returns whether ROS environment is set and arguments valid, prints error if not.
Definition inputs.py:1098
close(self)
Shuts down subscribers and stops producing messages.
Definition inputs.py:1110
read(self)
Yields messages from subscribed ROS topics, as (topic, msg, ROS time).
Definition inputs.py:1052
refresh_topics(self)
Refreshes topics and subscriptions from ROS live.
Definition inputs.py:1175
get_message_definition(self, msg_or_type)
Returns ROS message type definition full text, including subtype definitions.
Definition inputs.py:1139
format_meta(self)
Returns source metainfo string.
Definition inputs.py:1153
Returned from read() as (topic name, ROS message, ROS timestamp object).
Definition inputs.py:39
Message producer base class.
Definition inputs.py:35
init_progress(self)
Initializes progress bar, if any.
Definition inputs.py:233
notify(self, status)
Reports match status of last produced message.
Definition inputs.py:215
get_meta(self)
Returns source metainfo data dict.
Definition inputs.py:160
close_batch(self)
Shuts down input batch if any (like bagfile), else all input.
Definition inputs.py:142
configure(self, args=None, **kwargs)
Updates source configuration.
Definition inputs.py:118
thread_excepthook(self, text, exc)
Handles exception, used by background threads.
Definition inputs.py:248
bind(self, sink)
Attaches sink to source.
Definition inputs.py:109
__init__(self, args=None, **kwargs)
Definition inputs.py:65
get_batch(self)
Returns source batch identifier if any (like bagfile name if BagSource).
Definition inputs.py:156
get_message_meta(self, topic, msg, stamp, index=None)
Returns message metainfo data dict.
Definition inputs.py:163
__iter__(self)
Yields messages from source, as (topic, msg, ROS time).
Definition inputs.py:93
update_progress(self, count, running=True)
Updates progress bar, if any, with source processed count, pauses bar if not running.
Definition inputs.py:239
valid
Result of validate()
Definition inputs.py:87
bar
ProgressBar instance, if any.
Definition inputs.py:85
__exit__(self, exc_type, exc_value, traceback)
Context manager exit, closes source.
Definition inputs.py:101
is_processable(self, topic, msg, stamp, index=None)
Returns whether message passes source filters; registers status.
Definition inputs.py:182
get_message_class(self, typename, typehash=None)
Returns message type class.
Definition inputs.py:170
format_message_meta(self, topic, msg, stamp, index=None)
Returns message metainfo string.
Definition inputs.py:150
get_message_type_hash(self, msg_or_type)
Returns ROS message type MD5 hash.
Definition inputs.py:178
preprocess
Apply all filter arguments when reading, not only topic and type.
Definition inputs.py:89
validate(self)
Returns whether arguments are valid and source prerequisites are met.
Definition inputs.py:122
__enter__(self)
Context manager entry.
Definition inputs.py:97
close(self)
Shuts down input, closing any files or connections.
Definition inputs.py:129
configure_progress(self, **kwargs)
Configures progress bar options, updates current bar if any.
Definition inputs.py:221
read(self)
Yields messages from source, as (topic, msg, ROS time).
Definition inputs.py:105
topics
All topics in source, as {(topic, typenane, typehash): total message count or None}.
Definition inputs.py:83
sink
outputs.Sink instance bound to this source
Definition inputs.py:81
str MESSAGE_META_TEMPLATE
Template for message metainfo line.
Definition inputs.py:42
get_message_definition(self, msg_or_type)
Returns ROS message type definition full text, including subtype definitions.
Definition inputs.py:174
format_meta(self)
Returns source metainfo string.
Definition inputs.py:146