Input sources for ROS messages.
------------------------------------------------------------------------------
This file is part of grepros - grep for ROS bag files and live topics.
Released under the BSD License.
------------------------------------------------------------------------------
 
   14## @namespace grepros.inputs 
   15from __future__ import print_function 
   22except ImportError: 
import Queue 
as queue  
 
   32from . common 
import ArgumentUtil, ConsolePrinter, ensure_namespace, drop_zeros
 
   36    """Message producer base class.""" 
   42    MESSAGE_META_TEMPLATE = 
"{topic} #{index} ({type}  {dt}  {stamp})" 
   45    DEFAULT_ARGS = dict(START_TIME=
None, END_TIME=
None, START_INDEX=
None, END_INDEX=
None,
 
   46                        UNIQUE=
False, SELECT_FIELD=(), NOSELECT_FIELD=(),
 
   47                        NTH_MESSAGE=1, NTH_INTERVAL=0, PROGRESS=
False)
 
   49    def __init__(self, args=None, **kwargs):
 
   51        @param   args                   arguments 
as namespace 
or dictionary, case-insensitive
 
   52        @param   args.start_time        earliest timestamp of messages to read
 
   53        @param   args.end_time          latest timestamp of messages to read
 
   54        @param   args.unique            emit messages that are unique 
in topic
 
   55        @param   args.start_index       message index within topic to start 
from 
   56        @param   args.end_index         message index within topic to stop at
 
   57        @param   args.select_field      message fields to use 
for uniqueness 
if not all
 
   58        @param   args.noselect_field    message fields to skip 
for uniqueness
 
   59        @param   args.nth_message       read every Nth message 
in topic, starting 
from first
 
   60        @param   args.nth_interval      minimum time interval between messages 
in topic,
 
   61                                        as seconds 
or ROS duration
 
   62        @param   args.progress          whether to 
print progress bar
 
   63        @param   kwargs                 any 
and all arguments 
as keyword overrides, case-insensitive
 
   68        self.
_topics = collections.defaultdict(list)
 
   69        self.
_counts = collections.Counter()  
 
   71        self.
_hashes = collections.defaultdict(set)
 
   78        self.
args = ensure_namespace(args, Source.DEFAULT_ARGS, **kwargs)
 
   93        """Yields messages from source, as (topic, msg, ROS time).""" 
   97        """Context manager entry.""" 
  100    def __exit__(self, exc_type, exc_value, traceback):
 
  101        """Context manager exit, closes source.""" 
  105        """Yields messages from source, as (topic, msg, ROS time).""" 
  107    def bind(self, sink):
 
  108        """Attaches sink to source""" 
  111    def configure(self, args=None, **kwargs):
 
  113        Updates source configuration. 
  115        @param   args    arguments 
as namespace 
or dictionary, case-insensitive
 
  116        @param   kwargs  any 
and all arguments 
as keyword overrides, case-insensitive
 
  118        self.args = ensure_namespace(args, vars(self.args), **kwargs) 
  122        """Returns whether arguments are valid and source prerequisites are met.""" 
  124        try: self.
args, self.
valid = ArgumentUtil.validate(self.
args), 
True 
  125        except Exception: self.
valid = 
False 
  129        """Shuts down input, closing any files or connections.""" 
  137            self.
bar.pulse_pos = 
None 
  138            self.
bar.update(flush=
True).stop()
 
  142        """Shuts down input batch if any (like bagfile), else all input.""" 
  146        """Returns source metainfo string.""" 
  150        """Returns message metainfo string.""" 
  152        meta = {k: 
"" if v 
is None else v 
for k, v 
in meta.items()}
 
  156        """Returns source batch identifier if any (like bagfile name if BagSource).""" 
  159        """Returns source metainfo data dict.""" 
  163        """Returns message metainfo data dict.""" 
  164        with api.TypeMeta.make(msg, topic) 
as m:
 
  165            return dict(topic=topic, type=m.typename, stamp=drop_zeros(api.to_sec(stamp)),
 
  166                        index=index, dt=drop_zeros(common.format_stamp(api.to_sec(stamp)), 
" "),
 
  167                        hash=m.typehash, schema=m.definition)
 
  170        """Returns message type class.""" 
  171        return api.get_message_class(typename)
 
  174        """Returns ROS message type definition full text, including subtype definitions.""" 
  175        return api.get_message_definition(msg_or_type)
 
  178        """Returns ROS message type MD5 hash.""" 
  179        return api.get_message_type_hash(msg_or_type)
 
  182        """Returns whether message passes source filters; registers status.""" 
  183        if self.
args.START_TIME 
and stamp < self.
args.START_TIME:
 
  185        if self.
args.END_TIME 
and stamp > self.
args.END_TIME:
 
  187        if self.
args.START_INDEX 
or self.
args.END_INDEX \
 
  188        or self.
args.NTH_MESSAGE 
or self.
args.UNIQUE:
 
  189            topickey = api.TypeMeta.make(msg, topic).topickey
 
  190        if self.
args.START_INDEX 
and index 
is not None:
 
  193        if self.
args.END_INDEX 
and index 
is not None:
 
  196        if self.
args.NTH_MESSAGE > 1 
or self.
args.NTH_INTERVAL > 0:
 
  198        if self.
args.NTH_MESSAGE > 1 
and last_accepted 
and index 
is not None:
 
  199            shift = self.
args.START_INDEX 
if (self.
args.START_INDEX 
or 0) > 1 
else 1
 
  200            if (index - shift) % self.
args.NTH_MESSAGE:
 
  202        if self.
args.NTH_INTERVAL > 0 
and last_accepted 
and stamp 
is not None:
 
  203            if api.to_sec(stamp - last_accepted[1]) < self.
args.NTH_INTERVAL:
 
  207            msghash = api.make_message_hash(msg, include, exclude)
 
  208            if msghash 
in self.
_hashes[topickey]:
 
  210            self.
_hashes[topickey].add(msghash)
 
  215        """Reports match status of last produced message.""" 
  217        if self.
bar and self.
_bar_args.get(
"source_value") 
is not None:
 
  218            self.
bar.update(self.
bar.value + bool(status))
 
  221        """Configures progress bar options, updates current bar if any.""" 
  222        for k, v 
in kwargs.items():
 
  223            if isinstance(self.
_bar_args.get(k), dict) 
and isinstance(v, dict):
 
  227            bar_attrs = set(k 
for k 
in vars(self.
bar) 
if not k.startswith(
"_"))
 
  229                if k 
in bar_attrs: setattr(self.
bar, k, v)
 
  230                else: self.
bar.afterargs[k] = v
 
  233        """Initializes progress bar, if any.""" 
  234        if self.
args.PROGRESS 
and not self.
bar:
 
  236            self.
bar.start() 
if self.
bar.pulse 
else self.
bar.update(value=0)
 
  239        """Updates progress bar, if any, with source processed count, pauses bar if not running.""" 
  242                self.
bar.pause, self.
bar.pulse_pos = 
True, 
None 
  243            if self.
_bar_args.get(
"source_value") 
is not None:
 
  244                self.
bar.afterargs[
"source_value"] = count
 
  245            else: self.
bar.update(count)
 
  248        """Handles exception, used by background threads.""" 
  249        ConsolePrinter.error(text)
 
  251    def _parse_patterns(self):
 
  252        """Parses pattern arguments into re.Patterns.""" 
  253        selects, noselects = self.
args.SELECT_FIELD, self.
args.NOSELECT_FIELD
 
  254        for key, vals 
in [(
"select", selects), (
"noselect", noselects)]:
 
  255            self.
_patterns[key] = [(tuple(v.split(
".")), common.path_to_regex(v)) 
for v 
in vals]
 
  260    Provides topic conditions evaluation. 
  262    Evaluates a set of Python expressions, with a namespace of:
 
  263    - msg:                current message being checked
 
  264    - topic:              current topic being read
 
  265    - <topic /any/name>   messages 
in named 
or wildcarded topic
 
  267    <topic ..> gets replaced 
with an object 
with the following behavior:
 
  268    - len(obj)  -> number of messages processed 
in topic
 
  269    - bool(obj) -> whether there are any messages 
in topic
 
  270    - obj[pos]  -> topic message at position (
from latest 
if negative, first 
if positive)
 
  271    - obj.x     -> attribute x of last message
 
  273    All conditions need to evaluate 
as true 
for a message to be processable.
 
  274    If a condition tries to access attributes of a message 
not yet present,
 
  275    condition evaluates 
as false.
 
  277    If a condition topic matches more than one real topic (by wildcard 
or by
 
  278    different types 
in one topic), evaluation 
is done 
for each set of
 
  279    topics separately, condition passing 
if any set passes.
 
  281    Example condition: `<topic */control_enable>.data 
and <topic */cmd_vel>.linear.x > 0`
 
  282                       `
and <topic */cmd_vel>.angular.z < 0.02`.
 
  285    TOPIC_RGX = re.compile(r"<topic\s+([^\s><]+)\s*>")  
 
  288    DEFAULT_ARGS = dict(CONDITION=())
 
  295        Object for <topic x> replacements 
in condition expressions.
 
  297        - len(topic)        -> number of messages processed 
in topic
 
  298        - bool(topic)       -> whether there are any messages 
in topic
 
  299        - topic[x]          -> history at -1 -2 
for last 
and but one, 
or 0 1 
for first 
and second
 
  300        - topic.x           -> attribute x of last message
 
  301        - value 
in topic    -> whether any field of last message contains value
 
  302        - value 
in topic[x] -> whether any field of topic history at position contains value
 
  305        def __init__(self, count, firsts, lasts):
 
  312        def __len__(self):     
return self._count
 
  315            """Returns whether value exists in last message, or raises NoMessageException.""" 
  320            """Returns message from history at key, or Empty() if no such message.""" 
  325            """Returns attribute value of last message, or raises NoMessageException.""" 
  332        Object for current topic message 
in condition expressions.
 
  334        - value 
in msg -> whether any message field contains value
 
  335        - msg.x        -> attribute x of message
 
  343            """Returns whether value exists in any message field.""" 
  346                                           api.iter_message_fields(self.
_msg, flat=
True))
 
  347            value = item 
if isinstance(item, six.text_type) 
else \
 
  348                    item.decode() 
if isinstance(item, six.binary_type) 
else str(item)
 
  349            return re.search(re.escape(value), self.
_fulltext, re.I)
 
  352            """Returns attribute value of message.""" 
  353            return getattr(self.
_msg, name)
 
  357        """Placeholder falsy object that raises NoMessageException on attribute access.""" 
  360        def __nonzero__(self):        
return False 
  362        def __len__(self):            
return 0
 
  367        @param   args             arguments 
as namespace 
or dictionary, case-insensitive
 
  368        @param   args.condition   Python expressions that must evaluate 
as true
 
  369                                  for message to be processable, see ConditionMixin
 
  370        @param   kwargs           any 
and all arguments 
as keyword overrides, case-insensitive
 
  372        self._topic_states         = {}   
  373        self._topics_per_condition = []  
 
  374        self._wildcard_topics      = {}  
 
  376        self.
_firstmsgs = collections.defaultdict(collections.deque)
 
  378        self.
_lastmsgs  = collections.defaultdict(collections.deque)
 
  386        """Returns whether message passes passes current state conditions, if any.""" 
  390        for i, (expr, code) 
in enumerate(self.
_conditions.items()):
 
  393            realcarded = {wt: [(t, n, h) 
for (t, n, h) 
in self.
_lastmsgs if p.match(t)]
 
  395            variants = [[(wt, (t, n, h)) 
for (t, n, h) 
in tt] 
or [(wt, (wt, 
None))]
 
  396                        for wt, tt 
in realcarded.items()]
 
  397            variants = variants 
or [[
None]]  
 
  400            for remaps 
in itertools.product(*variants):  
 
  401                if remaps == (
None, ): remaps = ()
 
  404                try:   result = eval(code, ns)
 
  406                except Exception 
as e:
 
  407                    ConsolePrinter.error(
'Error evaluating condition "%s": %s', expr, e)
 
  414        """Returns whether conditions have valid syntax, sets options, prints errors.""" 
  416        for v 
in self.
args.CONDITION:
 
  418            try: compile(v, 
"", 
"eval")
 
  419            except SyntaxError 
as e:
 
  420                errors.append(
"'%s': %s at %schar %s" %
 
  421                              (v, e.msg, 
"line %s " % e.lineno 
if e.lineno > 1 
else "", e.offset))
 
  422            except Exception 
as e:
 
  423                errors.append(
"'%s': %s" % (v, e))
 
  425            ConsolePrinter.error(
"Invalid condition")
 
  427                ConsolePrinter.error(
"  %s" % err)
 
  433        """Clears cached messages.""" 
  438        """Returns whether there are any conditions configured.""" 
  442        """Returns a list of all topics used in conditions (may contain wildcards).""" 
  447        Returns whether topic is used 
for checking condition.
 
  449        @param   pure  whether use should be solely 
for condition, 
not for matching at all
 
  455        if not wildcarded: 
return False 
  456        return all(map(self.
_topic_states.get, wildcarded)) 
if pure 
else True 
  459        """Sets whether topic is purely used for conditions not matching.""" 
  464        """Retains message for condition evaluation if in condition topic.""" 
  466            topickey = api.TypeMeta.make(msg, topic).topickey
 
  473    def _get_topic_instance(self, topic, remap=None):
 
  475        Returns Topic() by name. 
  477        @param   remap  optional remap dictionary 
as {topic1: (topic2, typename, typehash)}
 
  479        if remap 
and topic 
in remap:
 
  480            topickey = remap[topic]
 
  482            topickey = next(((t, n, h) 
for (t, n, h) 
in self.
_lastmsgs if t == topic), 
None)
 
  483        if topickey 
not in self._counts:
 
  486        return self.
Topic(c, f, l)
 
  488    def _configure_conditions(self, args):
 
  489        """Parses condition expressions and populates local structures.""" 
  495        for v 
in args.CONDITION:
 
  496            topics = list(set(self.
TOPIC_RGX.findall(v)))
 
  499            for t 
in (t 
for t 
in topics 
if "*" in t):
 
  501            expr = self.
TOPIC_RGX.sub(
r'get_topic("\1")', v)
 
  504        for v 
in args.CONDITION:  
 
  505            indexexprs = re.findall(self.
TOPIC_RGX.pattern + 
r"\s*\[([^\]]+)\]", v)
 
  506            for topic, indexexpr 
in indexexprs:
 
  509                    index = eval(indexexpr)  
 
  510                    limits[index < 0] = max(limits[index < 0], abs(index) + (index >= 0))
 
  511                except Exception: 
continue   
  515    """Produces messages from ROS bagfiles.""" 
  518    MESSAGE_META_TEMPLATE = 
"{topic} {index}/{total} ({type}  {dt}  {stamp})" 
  521    META_TEMPLATE = 
"\nFile {file} ({size}), {tcount} topics, {mcount:,d} messages\n" \
 
  522                    "File period {startdt} - {enddt}\n" \
 
  523                    "File span {delta} ({start} - {end})" 
  526    DEFAULT_ARGS = dict(BAG=(), FILE=(), PATH=(), RECURSE=
False, TOPIC=(), TYPE=(),
 
  527                        SKIP_TOPIC=(), SKIP_TYPE=(), START_TIME=
None, END_TIME=
None,
 
  528                        START_INDEX=
None, END_INDEX=
None, CONDITION=(), AFTER=0, ORDERBY=
None,
 
  529                        DECOMPRESS=
False, REINDEX=
False, WRITE=(), PROGRESS=
False,
 
  530                        STOP_ON_ERROR=
False, TIMESCALE=0, TIMESCALE_EMISSION=
False, VERBOSE=
False)
 
  532    def __init__(self, args=None, **kwargs):
 
  534        @param   args                   arguments 
as namespace 
or dictionary, case-insensitive;
 
  535                                        or a single path 
as the ROS bagfile to read,
 
  536                                        or a stream to read 
from,
 
  540        Bag-specific arguments:
 
  541        @param   args.file              names of ROS bagfiles to read 
if not all 
in directory,
 
  542                                        or a stream to read 
from;
 
  544        @param   args.path              paths to scan 
if not current directory
 
  545        @param   args.recurse           recurse into subdirectories when looking 
for bagfiles
 
  546        @param   args.orderby           
"topic" or "type" if any to group results by
 
  547        @param   args.decompress        decompress archived bags to file directory
 
  548        @param   args.reindex           make a copy of unindexed bags 
and reindex them (ROS1 only)
 
  549        @param   args.timescale         emit messages on original timeline 
from first message
 
  550                                        at given rate, 0 disables
 
  551        @param   args.timescale_emission
 
  552                                        timeline 
from first matched message 
not first 
in bag,
 
  553                                        requires 
notify() 
for each message
 
  554        @param   args.write             outputs, to skip 
in input files
 
  559        @param   args.topic             ROS topics to read 
if not all
 
  560        @param   args.type              ROS message types to read 
if not all
 
  561        @param   args.skip_topic        ROS topics to skip
 
  562        @param   args.skip_type         ROS message types to skip
 
  563        @param   args.start_time        earliest timestamp of messages to read
 
  564        @param   args.end_time          latest timestamp of messages to read
 
  565        @param   args.start_index       message index within topic to start 
from 
  566        @param   args.end_index         message index within topic to stop at
 
  567        @param   args.unique            emit messages that are unique 
in topic
 
  568        @param   args.select_field      message fields to use 
for uniqueness 
if not all
 
  569        @param   args.noselect_field    message fields to skip 
for uniqueness
 
  570        @param   args.nth_message       read every Nth message 
in topic, starting 
from first
 
  571        @param   args.nth_interval      minimum time interval between messages 
in topic,
 
  572                                        as seconds 
or ROS duration
 
  573        @param   args.condition         Python expressions that must evaluate 
as true
 
  574                                        for message to be processable, see ConditionMixin
 
  575        @param   args.progress          whether to 
print progress bar
 
  576        @param   args.stop_on_error     stop execution on any error like unknown message type
 
  577        @param   args.verbose           whether to 
print error stacktraces
 
  578        @param   kwargs                 any 
and all arguments 
as keyword overrides, case-insensitive
 
  581        is_bag = isinstance(args, api.Bag) or \
 
  582                 common.is_iterable(args) 
and all(isinstance(x, 
api.Bag) 
for x 
in args)
 
  583        args = {
"FILE": str(args)} 
if isinstance(args, common.PATH_TYPES) 
else \
 
  584               {
"FILE": args} 
if common.is_stream(args) 
else {} 
if is_bag 
else args
 
  585        args = ensure_namespace(args, BagSource.DEFAULT_ARGS, **kwargs)
 
  586        super(BagSource, self).
__init__(args)
 
  587        ConditionMixin.__init__(self, args)
 
  595        self.
_bag0      = ([args0] 
if isinstance(args0, 
api.Bag) 
else args0) 
if is_bag 
else None 
  599        """Yields messages from ROS bagfiles, as (topic, msg, ROS time).""" 
  608            if "topic" == self.
argsargs.ORDERBY:  
 
  609                topicsets = [{n: tt} 
for n, tt 
in sorted(self.
_topics_topics.items())]
 
  610            elif "type" == self.
argsargs.ORDERBY:  
 
  613                    for t 
in tt: typetopics.setdefault(t, []).append(n)
 
  614                topicsets = [{n: [t] 
for n 
in nn} 
for t, nn 
in sorted(typetopics.items())]
 
  618            for topics 
in topicsets:
 
  619                for topic, msg, stamp, index 
in self.
_produce(topics) 
if topics 
else ():
 
  630    def configure(self, args=None, **kwargs):
 
  632        Updates source configuration. 
  634        @param   args    arguments 
as namespace 
or dictionary, case-insensitive
 
  635        @param   kwargs  any 
and all arguments 
as keyword overrides, case-insensitive
 
  637        super(BagSource, self).configure(args, **kwargs) 
  641        """Returns whether ROS environment is set and arguments valid, prints error if not.""" 
  644        if not api.validate():
 
  647        and not common.verify_io(self.
argsargs.FILE[0], 
"r"):
 
  648            ConsolePrinter.error(
"File not readable.")
 
  651        and not any(c.STREAMABLE 
for c 
in api.Bag.READER_CLASSES):
 
  652            ConsolePrinter.error(
"Bag format does not support reading streams.")
 
  654        if self.
_bag0 and not any(x.mode 
in (
"r", 
"a") 
for x 
in self.
_bag0):
 
  655            ConsolePrinter.error(
"Bag not in read mode.")
 
  658            ConsolePrinter.error(
"Cannot use topics in conditions and bag order by %s.",
 
  662            ConsolePrinter.error(
"Invalid timescale factor: %r.", self.
argsargs.TIMESCALE)
 
  664        if not ConditionMixin.validate(self):
 
  669        """Closes current bag, if any.""" 
  672        ConditionMixin.close_batch(self)
 
  673        super(BagSource, self).
close()
 
  676        """Closes current bag, if any.""" 
  681            self.
barbar.update(flush=
True)
 
  683            if self.
_bar_args.get(
"source_value") 
is not None:
 
  685        ConditionMixin.close_batch(self)
 
  688        """Returns bagfile metainfo string.""" 
  692        """Returns message metainfo string.""" 
  694        meta = {k: 
"" if v 
is None else v 
for k, v 
in meta.items()}
 
  698        """Returns name of current bagfile, or self if reading stream.""" 
  702        """Returns bagfile metainfo data dict.""" 
  703        if self.
_meta is not None:
 
  705        mcount = self.
_bag.get_message_count()
 
  706        start, end = (self.
_bag.get_start_time(), self.
_bag.get_end_time()) 
if mcount 
else (
"", 
"")
 
  707        delta = common.format_timedelta(datetime.timedelta(seconds=(end 
or 0) - (start 
or 0)))
 
  709                          mcount=mcount, tcount=len(self.
topicstopics), delta=delta,
 
  710                          start=drop_zeros(start), end=drop_zeros(end),
 
  711                          startdt=drop_zeros(common.format_stamp(start)) 
if start != 
"" else "",
 
  712                          enddt=drop_zeros(common.format_stamp(end)) 
if end != 
"" else "")
 
  716        """Returns message metainfo data dict.""" 
  719        result.update(total=self.
topicstopics[(topic, result[
"type"], result[
"hash"])])
 
  720        if callable(getattr(self.
_bag, 
"get_qoses", 
None)):
 
  721            result.update(qoses=self.
_bag.get_qoses(topic, result[
"type"]))
 
  725        """Returns ROS message type class.""" 
  727               api.get_message_class(typename)
 
  730        """Returns ROS message type definition full text, including subtype definitions.""" 
  732               api.get_message_definition(msg_or_type)
 
  735        """Returns ROS message type MD5 hash.""" 
  737               api.get_message_type_hash(msg_or_type)
 
  740        """Reports match status of last produced message.""" 
  741        super(BagSource, self).
notify(status)
 
  750        """Returns whether message passes source filters; registers status.""" 
  752        topickey = api.TypeMeta.make(msg, topic).topickey
 
  753        if self.
argsargs.START_INDEX 
and index 
is not None and self.
argsargs.START_INDEX < 0 \
 
  757        if self.
argsargs.END_INDEX 
and index 
is not None and self.
argsargs.END_INDEX < 0 \
 
  763        if not super(BagSource, self).
is_processable(topic, msg, stamp, index):
 
  765        if not ConditionMixin.is_processable(self, topic, msg, stamp, index):
 
  771        """Initializes progress bar, if any, for current bag.""" 
  777    def _produce(self, topics, start_time=None):
 
  779        Yields messages from current ROS bagfile, 
as (topic, msg, ROS time, index 
in topic).
 
  781        @param   topics  {topic: [typename, ]}
 
  784        do_predelay = self.
argsargs.TIMESCALE 
and not self.
argsargs.TIMESCALE_EMISSION
 
  787            self.
_delaystamps[
"read"] = getattr(time, 
"monotonic", time.time)()  
 
  788        counts = collections.Counter()
 
  790        nametypes = {(n, t) 
for n, tt 
in topics.items() 
for t 
in tt}
 
  791        for topic, msg, stamp 
in self.
_bag.read_messages(list(topics), start_time):
 
  794            typename = api.get_message_type(msg)
 
  795            if topics 
and typename 
not in topics[topic]:
 
  800            topickey = api.TypeMeta.make(msg, topic, self).topickey
 
  801            counts[topickey] += 1; self.
_counts[topickey] += 1
 
  803            if start_time 
is None and counts[topickey] != self.
_counts[topickey]:
 
  809            yield topic, msg, stamp, self.
_counts[topickey]
 
  817                for entry 
in self.
_produce({topic: typename}, stamp + api.make_duration(nsecs=1)):
 
  819            if not self.
_running or not self.
_bag or (start_time 
is None 
  823    def _produce_bags(self):
 
  824        """Yields Bag instances from configured arguments.""" 
  826            for bag 
in self.
_bag0:
 
  832        exts, skip_exts = api.BAG_EXTENSIONS, api.SKIP_EXTENSIONS
 
  833        exts = list(exts) + [
"%s%s" % (a, b) 
for a 
in exts 
for b 
in common.Decompressor.EXTENSIONS]
 
  836        for filename 
in common.find_files(names, paths, exts, skip_exts, self.
argsargs.RECURSE):
 
  840            fullname = os.path.realpath(os.path.abspath(filename))
 
  841            skip = common.Decompressor.make_decompressed_name(fullname) 
in encountereds
 
  842            encountereds.add(fullname)
 
  847            encountereds.add(self.
_bag.filename)
 
  850    def _make_progress_args(self):
 
  851        """Returns dictionary with progress bar options""" 
  852        total = sum(sum(c 
for (t, n, _), c 
in self.
topicstopics.items() 
if c 
and t == t_ 
and n 
in nn)
 
  854        result = dict(max=total, afterword=os.path.basename(self.
_filename or "<stream>"))
 
  856        instr, outstr = 
"{value:,d}/{max:,d}", 
"" 
  859            self.
_bar_args.setdefault(
"source_value", 0)  
 
  860        if self.
_bar_args.get(
"source_value") 
is not None \
 
  861        or self.
_bar_args.get(
"match_max") 
is not None:
 
  862            result.update(source_value=self.
_bar_args.get(
"source_value") 
or 0)
 
  863            instr, outstr = 
"{source_value:,d}/{max:,d}", 
"matched {value:,d}" 
  864            if self.
_bar_args.get(
"match_max") 
is not None:
 
  865                instr, outstr = 
"{source_value:,d}/{source_max:,d}", outstr + 
"/{match_max:,d}" 
  866                result.update(source_max=total, max=min(total, self.
_bar_args[
"match_max"]))
 
  867        result.update(aftertemplate=
" {afterword} (%s)" % 
"  ".join(filter(bool, (instr, outstr))))
 
  871    def _ensure_totals(self):
 
  872        """Retrieves total message counts if not retrieved.""" 
  874            has_ensure = common.has_arg(self.
_bag.get_topic_info, 
"ensure_types")
 
  875            kws = dict(ensure_types=
False) 
if has_ensure 
else {}
 
  876            for (t, n, h), c 
in self.
_bag.get_topic_info(**kws).items():
 
  880    def _delay_timeline(self):
 
  881        """Sleeps until message ought to be emitted in bag timeline.""" 
  882        curstamp, readstamp, startstamp = map(self.
_delaystamps.get, (
"current", 
"read", 
"first"))
 
  883        delta = max(0, api.to_sec(curstamp) - startstamp) / (self.
argsargs.TIMESCALE 
or 1)
 
  884        if delta: time.sleep(max(0, delta + readstamp - getattr(time, 
"monotonic", time.time)()))
 
  886    def _is_at_end_threshold(self, topickey, stamp, nametypes, endtime_indexes):
 
  888        Returns whether bag reading has reached END_INDEX or END_TIME 
in all given topics.
 
  890        @param   topickey         (topic, typename, typehash) of current message
 
  891        @param   stamp            ROS timestamp of current message
 
  892        @param   nametypes        {(topic, typename)} to account 
for 
  893        @param   endtime_indexes  {topickey: index at reaching END_TIME}, gets modified
 
  897            if self.
_counts[topickey] >= max_index:  
 
  898                mycounts = {k: v 
for k, v 
in self.
_counts.items() 
if k[:2] 
in nametypes}
 
  899                if nametypes == set(k[:2] 
for k 
in mycounts) \
 
  900                and all(v >= self.
_end_indexes.get(k, max_index) 
for k, v 
in mycounts.items()):
 
  904            if topickey 
not in endtime_indexes: endtime_indexes[topickey] = self.
_counts[topickey]
 
  906            if self.
_counts[topickey] >= max_index: 
 
  907                myindexes = {k: v 
for k, v 
in endtime_indexes.items() 
if k[:2] 
in nametypes}
 
  908                if nametypes == set(k[:2] 
for k 
in myindexes) \
 
  910                        for k, v 
in myindexes.items()):
 
  914    def _configure(self, filename=None, bag=None):
 
  915        """Opens bag and populates bag-specific argument state, returns success.""" 
  928        if bag 
is not None and bag.mode 
not in (
"r", 
"a"):
 
  929            ConsolePrinter.warn(
"Cannot read %s: bag in write mode.", bag)
 
  932        if filename 
and self.
argsargs.WRITE \
 
  933        and any(os.path.realpath(x[0]) == os.path.realpath(filename)
 
  937            if filename 
and common.Decompressor.is_compressed(filename):
 
  939                    filename = common.Decompressor.decompress(filename, self.
argsargs.PROGRESS)
 
  940                else: 
raise Exception(
"decompression not enabled")
 
  942                          progress=self.
argsargs.PROGRESS) 
if bag 
is None else bag
 
  943            bag.stop_on_error = self.
argsargs.STOP_ON_ERROR
 
  945        except Exception 
as e:
 
  946            ConsolePrinter.error(
"\nError opening %r: %s", filename 
or bag, e)
 
  947            if self.
argsargs.STOP_ON_ERROR: 
raise 
  948            if self.
argsargs.VERBOSE: traceback.print_exc()
 
  955        kws = dict(ensure_types=
False) 
if common.has_arg(bag.get_topic_info, 
"ensure_types") 
else {}
 
  956        for (t, n, h), c 
in bag.get_topic_info(counts=
False, **kws).items():
 
  957            dct.setdefault(t, []).append(n)
 
  964        dct = common.filter_dict(dct, self.
argsargs.SKIP_TOPIC, self.
argsargs.SKIP_TYPE, reverse=
True)
 
  966            matches = [t 
for p 
in [common.wildcard_to_regex(topic, end=
True)] 
for t 
in fulldct
 
  967                       if t == topic 
or "*" in topic 
and p.match(t)]
 
  968            for realtopic 
in matches:
 
  970                dct.setdefault(realtopic, fulldct[realtopic])
 
  975        if args.START_TIME 
is not None:
 
  976            args.START_TIME = api.make_bag_time(args.START_TIME, bag)
 
  977        if args.END_TIME 
is not None:
 
  978            args.END_TIME = api.make_bag_time(args.END_TIME, bag)
 
  983    """Produces messages from live ROS topics.""" 
  989    DEFAULT_ARGS = dict(TOPIC=(), TYPE=(), SKIP_TOPIC=(), SKIP_TYPE=(), START_TIME=
None,
 
  990                        END_TIME=
None, START_INDEX=
None, END_INDEX=
None, CONDITION=(),
 
  991                        QUEUE_SIZE_IN=10, ROS_TIME_IN=
False, PROGRESS=
False, STOP_ON_ERROR=
False,
 
  994    def __init__(self, args=None, **kwargs):
 
  996        @param   args                   arguments 
as namespace 
or dictionary, case-insensitive
 
  997        @param   args.topic             ROS topics to read 
if not all
 
  998        @param   args.type              ROS message types to read 
if not all
 
  999        @param   args.skip_topic        ROS topics to skip
 
 1000        @param   args.skip_type         ROS message types to skip
 
 1001        @param   args.start_time        earliest timestamp of messages to read
 
 1002        @param   args.end_time          latest timestamp of messages to read
 
 1003        @param   args.start_index       message index within topic to start 
from 
 1004        @param   args.end_index         message index within topic to stop at
 
 1005        @param   args.unique            emit messages that are unique 
in topic
 
 1006        @param   args.select_field      message fields to use 
for uniqueness 
if not all
 
 1007        @param   args.noselect_field    message fields to skip 
for uniqueness
 
 1008        @param   args.nth_message       read every Nth message 
in topic, starting 
from first
 
 1009        @param   args.nth_interval      minimum time interval between messages 
in topic,
 
 1010                                        as seconds 
or ROS duration
 
 1011        @param   args.condition         Python expressions that must evaluate 
as true
 
 1012                                        for message to be processable, see ConditionMixin
 
 1013        @param   args.queue_size_in     subscriber queue size (default 10)
 
 1014        @param   args.ros_time_in       stamp messages 
with ROS time instead of wall time
 
 1015        @param   args.progress          whether to 
print progress bar
 
 1016        @param   args.stop_on_error     stop execution on any error like unknown message type
 
 1017        @param   args.verbose           whether to 
print error stacktraces
 
 1018        @param   kwargs                 any 
and all arguments 
as keyword overrides, case-insensitive
 
 1020        args = ensure_namespace(args, LiveSource.DEFAULT_ARGS, **dict(kwargs, live=True))
 
 1021        super(LiveSource, self).
__init__(args)
 
 1022        ConditionMixin.__init__(self, args)
 
 1029        """Yields messages from subscribed ROS topics, as (topic, msg, ROS time).""" 
 1034            self.
_queue = queue.Queue()
 
 1048            topic, msg, stamp = self.
_queue.get()
 
 1049            total += bool(topic)
 
 1051            if not topic: 
continue   
 1053            topickey = api.TypeMeta.make(msg, topic, self).topickey
 
 1068    def bind(self, sink):
 
 1069        """Attaches sink to source and blocks until connected to ROS live.""" 
 1071        super(LiveSource, self).
bind(sink)
 
 1075        """Returns whether ROS environment is set and arguments valid, prints error if not.""" 
 1078        if not api.validate(live=
True):
 
 1080        if not ConditionMixin.validate(self):
 
 1087        """Shuts down subscribers and stops producing messages.""" 
 1089        for k 
in list(self.
_subs):
 
 1090            self.
_subs.pop(k).unregister()
 
 1093        ConditionMixin.close_batch(self)
 
 1094        super(LiveSource, self).
close()
 
 1097        """Returns source metainfo data dict.""" 
 1098        ENV = {k: os.getenv(k) 
for k 
in (
"ROS_MASTER_URI", 
"ROS_DOMAIN_ID") 
if os.getenv(k)}
 
 1099        return dict(ENV, tcount=len(self.
topics), scount=len(self.
_subs))
 
 1102        """Returns message metainfo data dict.""" 
 1103        result = super(LiveSource, self).
get_message_meta(topic, msg, stamp, index)
 
 1104        topickey = (topic, result[
"type"], result[
"hash"])
 
 1105        if topickey 
in self.
_subs:
 
 1106            result.update(qoses=self.
_subs[topickey].get_qoses())
 
 1110        """Returns message type class, from active subscription if available.""" 
 1111        sub = next((s 
for (t, n, h), s 
in self.
_subs.items()
 
 1112                    if n == typename 
and typehash 
in (s.get_message_type_hash(), 
None)), 
None)
 
 1113        return sub 
and sub.get_message_class() 
or api.get_message_class(typename)
 
 1116        """Returns ROS message type definition full text, including subtype definitions.""" 
 1117        if api.is_ros_message(msg_or_type):
 
 1118            return api.get_message_definition(msg_or_type)
 
 1119        sub = next((s 
for (t, n, h), s 
in self.
_subs.items() 
if n == msg_or_type), 
None)
 
 1120        return sub 
and sub.get_message_definition() 
or api.get_message_definition(msg_or_type)
 
 1123        """Returns ROS message type MD5 hash.""" 
 1124        if api.is_ros_message(msg_or_type):
 
 1125            return api.get_message_type_hash(msg_or_type)
 
 1126        sub = next((s 
for (t, n, h), s 
in self.
_subs.items() 
if n == msg_or_type), 
None)
 
 1127        return sub 
and sub.get_message_type_hash() 
or api.get_message_type_hash(msg_or_type)
 
 1130        """Returns source metainfo string.""" 
 1132        result = 
"\nROS%s live" % api.ROS_VERSION
 
 1133        if "ROS_MASTER_URI" in metadata:
 
 1134            result += 
", ROS master %s" % metadata[
"ROS_MASTER_URI"]
 
 1135        if "ROS_DOMAIN_ID" in metadata:
 
 1136            result += 
", ROS domain ID %s" % metadata[
"ROS_DOMAIN_ID"]
 
 1137        result += 
", %s initially" % common.plural(
"topic", metadata[
"tcount"])
 
 1138        result += 
", %s subscribed" % metadata[
"scount"]
 
 1142        """Returns whether message passes source filters; registers status.""" 
 1144        if not super(LiveSource, self).
is_processable(topic, msg, stamp, index):
 
 1146        if not ConditionMixin.is_processable(self, topic, msg, stamp, index):
 
 1152        """Refreshes topics and subscriptions from ROS live.""" 
 1153        for topic, typename 
in api.get_topic_types():
 
 1154            topickey = (topic, typename, 
None)
 
 1155            self.
topics[topickey] = 
None 
 1156            dct = common.filter_dict({topic: [typename]}, self.
argsargs.TOPIC, self.
argsargs.TYPE)
 
 1157            if not common.filter_dict(dct, self.
argsargs.SKIP_TOPIC, self.
argsargs.SKIP_TYPE, reverse=
True):
 
 1159            if api.ROS2 
and api.get_message_class(typename) 
is None:
 
 1160                msg = 
"Error loading type %s in topic %s." % (typename, topic)
 
 1162                ConsolePrinter.warn(msg, __once=
True)
 
 1164            if topickey 
in self.
_subs:
 
 1167            handler = functools.partial(self.
_on_message, topic)
 
 1169                sub = api.create_subscriber(topic, typename, handler,
 
 1170                                            queue_size=self.
argsargs.QUEUE_SIZE_IN)
 
 1171            except Exception 
as e:
 
 1172                ConsolePrinter.warn(
"Error subscribing to topic %s: %%r" % topic,
 
 1174                if self.
argsargs.STOP_ON_ERROR: 
raise 
 1177            self.
_subs[topickey] = sub
 
 1180        """Initializes progress bar, if any.""" 
 1186        """Updates progress bar, if any.""" 
 1192    def _configure(self):
 
 1193        """Adjusts start/end time filter values to current time.""" 
 1194        if self.
argsargs.START_TIME 
is not None:
 
 1196        if self.
argsargs.END_TIME 
is not None:
 
 1199    def _make_progress_args(self, count=None):
 
  1200        """Returns dictionary with progress bar options, for specific message index if any.""" 
 1201        result = dict(afterword = 
"ROS%s live" % api.ROS_VERSION, pulse=
True)
 
 1202        if self.
_bar_args.get(
"match_max") 
is not None:
 
 1205        instr, outstr = 
"{value:,d} message%s" % (
"" if count == 1 
else "s"), 
"" 
 1208            self.
_bar_args.setdefault(
"source_value", 0)  
 
 1210            instr = 
"{source_value:,d} message%s" % (
"" if count == 1 
else "s")
 
 1211            outstr = 
"matched {value:,d}" 
 1212            if self.
_bar_args.get(
"match_max") 
is not None: outstr += 
"/{match_max:,d}" 
 1213        elif self.
_bar_args.get(
"match_max") 
is not None:
 
 1214            instr = 
"{value:,d}/{max:,d}" 
 1215        result.update(aftertemplate=
" {afterword} (%s)" % 
"  ".join(filter(bool, (instr, outstr))))
 
 1219    def _run_refresh(self):
 
 1220        """Periodically refreshes topics and subscriptions from ROS live.""" 
 1224            except Exception 
as e: self.
thread_excepthook(
"Error refreshing live topics: %r" % e, e)
 
 1227    def _run_endtime_closer(self):
 
 1228        """Periodically checks whether END_TIME has been reached, closes source when so.""" 
 1236    def _on_message(self, topic, msg):
 
 1237        """Subscription callback handler, queues message for yielding.""" 
 1238        stamp = api.get_rostime() 
if self.
argsargs.ROS_TIME_IN 
else api.make_time(time.time())
 
 1243    """Produces messages from iterable or pushed data.""" 
 1246    DEFAULT_ARGS = dict(TOPIC=(), TYPE=(), SKIP_TOPIC=(), SKIP_TYPE=(), START_TIME=
None,
 
 1247                        END_TIME=
None, START_INDEX=
None, END_INDEX=
None, UNIQUE=
False,
 
 1248                        SELECT_FIELD=(), NOSELECT_FIELD=(), NTH_MESSAGE=1, NTH_INTERVAL=0,
 
 1249                        CONDITION=(), ITERABLE=
None)
 
 1253        @param   args                  arguments 
as namespace 
or dictionary, case-insensitive;
 
 1254                                       or iterable yielding messages
 
 1255        @param   args.topic            ROS topics to read 
if not all
 
 1256        @param   args.type             ROS message types to read 
if not all
 
 1257        @param   args.skip_topic       ROS topics to skip
 
 1258        @param   args.skip_type        ROS message types to skip
 
 1259        @param   args.start_time       earliest timestamp of messages to read
 
 1260        @param   args.end_time         latest timestamp of messages to read
 
 1261        @param   args.start_index      message index within topic to start 
from 
 1262        @param   args.end_index        message index within topic to stop at
 
 1263        @param   args.unique           emit messages that are unique 
in topic
 
 1264        @param   args.select_field     message fields to use 
for uniqueness 
if not all
 
 1265        @param   args.noselect_field   message fields to skip 
for uniqueness
 
 1266        @param   args.nth_message      read every Nth message 
in topic, starting 
from first
 
 1267        @param   args.nth_interval     minimum time interval between messages 
in topic,
 
 1268                                       as seconds 
or ROS duration
 
 1269        @param   args.condition        Python expressions that must evaluate 
as true
 
 1270                                       for message to be processable, see ConditionMixin
 
 1271        @param   args.iterable         iterable yielding (topic, msg, stamp) 
or (topic, msg);
 
 1272                                       yielding `
None` signals end of content
 
 1273        @param   kwargs                any 
and all arguments 
as keyword overrides, case-insensitive
 
 1275        if common.is_iterable(args) 
and not isinstance(args, dict):
 
 1276            args = ensure_namespace(
None, iterable=args)
 
 1277        args = ensure_namespace(args, AppSource.DEFAULT_ARGS, **kwargs)
 
 1278        super(AppSource, self).
__init__(args)
 
 1279        ConditionMixin.__init__(self, args)
 
 1285        Yields messages from iterable 
or pushed data, 
as (topic, msg, ROS timestamp).
 
 1287        Blocks until a message 
is available, 
or source 
is closed.
 
 1290        def generate(iterable):
 
 1291            for x 
in iterable: 
yield x
 
 1292        feeder = generate(self.
argsargs.ITERABLE) 
if self.
argsargs.ITERABLE 
else None 
 1295            item = self.
_queue.get() 
if not feeder 
or self.
_queue.qsize() 
else next(feeder, 
None)
 
 1296            if item 
is None: 
break   
 1298            if len(item) > 2: topic, msg, stamp = item[:3]
 
 1299            else: (topic, msg), stamp = item[:2], api.get_rostime(fallback=
True)
 
 1300            topickey = api.TypeMeta.make(msg, topic, self).topickey
 
 1314        """Closes current read() yielding, if any.""" 
 1321        Returns (topic, msg, stamp) from push queue, 
or `
None` 
if no queue
 
 1322        or message 
in queue 
is condition topic only.
 
 1326        try: item = self.
_queue.get(block=
False)
 
 1327        except queue.Empty: 
pass 
 1328        if item 
is None: 
return None 
 1330        topic, msg, stamp = item
 
 1331        topickey = api.TypeMeta.make(msg, topic, self).topickey
 
 1337        """Registers message produced from read_queue().""" 
 1340            topickey = api.TypeMeta.make(msg, topic).topickey
 
 1343    def push(self, topic, msg=None, stamp=None):
 
 1345        Pushes a message to be yielded from read().
 
 1347        @param   topic  topic name, 
or `
None` to signal end of content
 
 1348        @param   msg    ROS message
 
 1349        @param   stamp  message ROS timestamp, defaults to current wall time 
if `
None`
 
 1352        if topic 
is None: self.
_queue.put(
None)
 
 1353        else: self.
_queue.put((topic, msg, stamp 
or api.get_rostime(fallback=
True)))
 
 1356        """Returns whether message passes source filters; registers status.""" 
 1358        dct = common.filter_dict({topic: [api.get_message_type(msg)]},
 
 1360        if not common.filter_dict(dct, self.
argsargs.SKIP_TOPIC, self.
argsargs.SKIP_TYPE, reverse=
True):
 
 1362        if not super(AppSource, self).
is_processable(topic, msg, stamp, index):
 
 1364        if not ConditionMixin.is_processable(self, topic, msg, stamp, index):
 
 1370        """Returns whether configured arguments are valid, prints error if not.""" 
 1377    def _configure(self):
 
 1378        """Adjusts start/end time filter values to current time.""" 
 1379        if self.
argsargs.START_TIME 
is not None:
 
 1381        if self.
argsargs.END_TIME 
is not None:
 
 1385__all__ = [
"AppSource", 
"ConditionMixin", 
"BagSource", 
"LiveSource", 
"Source"]
 
A simple ASCII progress bar with a ticker thread.