3ROS1 bag reader plugin using the `embag` library.
5------------------------------------------------------------------------------
6This file is part of grepros - grep for ROS bag files and live topics.
7Released under the BSD License.
12------------------------------------------------------------------------------
14## @namespace grepros.plugins.embag
15from __future__ import absolute_import
19except ImportError: embag = None
22from .. common import PATH_TYPES, ConsolePrinter
26class EmbagReader(api.BaseBag):
27 """embag reader interface, providing most of rosbag.Bag interface."""
36 ROSBAG_MAGIC = b
"#ROSBAG"
39 if not isinstance(filename, PATH_TYPES):
40 raise ValueError(
"invalid filename %r" % type(filename))
41 if mode
not in self.
MODESMODES:
raise ValueError(
"invalid mode %r" % mode)
49 self.
_view = embag.View(filename)
57 Returns the number of messages in the bag.
59 @param topic_filters list of topics
or a single topic to filter by,
if any
62 topics = topic_filters
63 topics = topics
if isinstance(topics, (dict, list, set, tuple))
else [topics]
64 return sum(c
for (t, _, _), c
in self.
_topics.items()
if t
in topics)
65 return sum(self.
_topics.values())
69 """Returns the start time of the bag, as UNIX timestamp, or None if bag empty."""
71 return self.
_view.getStartTime().to_sec()
75 """Returns the end time of the bag, as UNIX timestamp, or None if bag empty."""
77 return self.
_view.getEndTime().to_sec()
82 Returns rospy message class for typename, or None if unknown message type for bag.
84 Generates
class dynamically if not already generated.
86 @param typehash message type definition hash,
if any
88 typekey = (typename, typehash or next((h
for n, h
in self.
_types if n == typename),
None))
90 for n, c
in api.realapi.generate_message_classes(typename, self.
_typedefs[typekey]).items():
91 self.
_types[(n, c._md5sum)] = c
92 return self.
_types.get(typekey)
97 Returns ROS1 message type definition full text from bag, including subtype definitions.
99 Returns
None if unknown message type
for bag.
101 if api.is_ros_message(msg_or_type):
102 return self.
_typedefs.get((msg_or_type._type, msg_or_type._md5sum))
103 typename = msg_or_type
104 return next((d
for (n, h), d
in self.
_typedefs.items()
if n == typename),
None)
108 """Returns ROS1 message type MD5 hash, or None if unknown message type for bag."""
109 if api.is_ros_message(msg_or_type):
return msg_or_type._md5sum
110 typename = msg_or_type
111 return next((h
for n, h
in self.
_typedefs if n == typename),
None)
115 """Returns topic and message type metainfo as {(topic, typename, typehash): count}."""
121 Returns thorough metainfo on topic and message types.
123 @param topic_filters list of topics
or a single topic to filter returned topics-dict by,
126 msg_types
as dict of {typename: typehash},
129 topics = topic_filters
130 topics = topics if isinstance(topics, (list, set, tuple))
else [topics]
if topics
else []
131 if self.
_ttinfo and (
not topics
or set(topics) == set(t
for t, _, _
in self.
_topics)):
135 msgtypes = {n: h
for t, n, h
in self.
_topics}
139 """Returns median value from given sorted numbers."""
141 return None if not vlen
else vals[vlen // 2]
if vlen % 2
else \
142 float(vals[vlen // 2 - 1] + vals[vlen // 2]) / 2
144 conns = self.
_view.connectionsByTopic()
145 for (t, n, _), c
in sorted(self.
_topics.items()):
146 if topics
and t
not in topics:
continue
149 stamps = sorted(m.timestamp.secs + m.timestamp.nsecs / 1E9
150 for m
in self.
_view.getMessages([t]))
151 mymedian = median(sorted(s1 - s0
for s1, s0
in zip(stamps[1:], stamps[:-1])))
152 freq = 1.0 / mymedian
if mymedian
else None
153 topicdict[t] = self.
TopicTuple(n, c, len(conns.get(t, [])), freq)
154 if not topics
or set(topics) == set(t
for t, _, _
in self.
_topics):
159 def read_messages(self, topics=None, start_time=None, end_time=None, raw=False):
161 Yields messages from the bag, optionally filtered by topic
and timestamp.
163 @param topics list of topics
or a single topic to filter by,
if at all
164 @param start_time earliest timestamp of message to
return,
as ROS time
or convertible
165 (int/float/duration/datetime/decimal)
166 @param end_time latest timestamp of message to
return,
as ROS time
or convertible
167 (int/float/duration/datetime/decimal)
168 @param raw
if true, then returned messages are tuples of
169 (typename, bytes, typehash, typeclass)
170 @return BagMessage namedtuples of (topic, message, timestamp
as rospy.Time)
174 topics = topics
if isinstance(topics, list)
else [topics]
if topics
else []
175 start_time, end_time = (api.to_sec(api.to_time(x))
for x
in (start_time, end_time))
176 for m
in self.
_view.getMessages(topics)
if topics
else self.
_view.getMessages():
177 if start_time
is not None and start_time > m.timestamp.to_sec():
179 if end_time
is not None and end_time < m.timestamp.to_sec():
182 typename = self.
_hashdefs[(m.topic, m.md5)]
183 stamp = api.make_time(m.timestamp.secs, m.timestamp.nsecs)
186 api.TypeMeta.make(msg, m.topic, self)
192 """Opens the bag file if not already open."""
197 """Closes the bag file."""
206 """Returns whether file is closed."""
207 return not self.
_view
212 """Returns the list of topics in bag, in alphabetic order."""
213 return sorted((t
for t, _, _
in self.
_topics), key=str.lower)
218 """Returns bag file path."""
224 """Returns current file size."""
230 """Returns file open mode."""
235 """Returns whether bag contains given topic."""
236 return any(key == t
for t, _, _
in self.
_topics)
240 """Retrieves next message from bag as (topic, message, timestamp)."""
246 def _populate_meta(self):
247 """Populates bag metainfo."""
248 connections = self.
_view.connectionsByTopic()
250 for conn
in connections.get(topic, ()):
251 topickey, typekey = (topic, conn.type, conn.md5sum), (conn.type, conn.md5sum)
252 self.
_topics.setdefault(topickey, 0)
254 self.
_hashdefs[(topic, conn.md5sum)] = conn.type
255 self.
_typedefs[typekey] = conn.message_definition
256 subtypedefs = api.parse_definition_subtypes(conn.message_definition)
257 for n, d
in subtypedefs.items():
258 h = api.calculate_definition_hash(n, d, tuple(subtypedefs.items()))
262 def _populate_message(self, msg, embagval):
263 """Returns the ROS1 message populated from a corresponding embag.RosValue."""
264 for name, typename
in api.get_message_fields(msg).items():
265 v, scalarname = embagval.get(name), api.scalar(typename)
266 if typename
in api.ROS_BUILTIN_TYPES:
267 msgv = getattr(embagval, name)
268 elif scalarname
in api.ROS_BUILTIN_TYPES:
270 elif typename
in api.ROS_TIME_TYPES:
271 cls =
next(k
for k, v
in api.ROS_TIME_CLASSES.items()
if v == typename)
272 msgv = cls(v.secs, v.nsecs)
273 elif scalarname
in api.ROS_TIME_TYPES:
274 cls =
next(k
for k, v
in api.ROS_TIME_CLASSES.items()
if v == scalarname)
275 msgv = [cls(x.secs, x.nsecs)
for x
in v]
276 elif typename == scalarname:
281 setattr(msg, name, msgv)
287 """Returns whether file is readable as ROS1 bag."""
288 result = os.path.isfile(filename)
290 with open(filename,
"rb")
as f:
296 """Replaces ROS1 bag reader with EmbagReader. Raises ImportWarning if embag not available."""
298 ConsolePrinter.error(
"embag not available: cannot read bag files.")
299 raise ImportWarning()
300 api.Bag.READER_CLASSES.add(EmbagReader)
303__all__ = [
"EmbagReader",
"init"]
tuple MODES
Supported opening modes, overridden in subclasses.
BagMessage
Returned from read_messages() as (topic name, ROS message, ROS timestamp object).
TypesAndTopicsTuple
Returned from get_type_and_topic_info() as ({typename: typehash}, {topic name: TopicTuple}).
get_message_class(self, typename, typehash=None)
Returns ROS message type class, or None if unknown message type for bag.
read_messages(self, topics=None, start_time=None, end_time=None, raw=False, **__)
Yields messages from the bag, optionally filtered by topic and timestamp.
closed
Returns whether file is closed.
TopicTuple
Returned from get_type_and_topic_info() as (typename, message count, connection count,...
tuple MODES
Supported opening modes.
get_type_and_topic_info(self, topic_filters=None)
Returns thorough metainfo on topic and message types.
mode
Returns file open mode.
open(self)
Opens the bag file if not already open.
filename
Returns bag file path.
str ROSBAG_MAGIC
ROS1 bag file header magic start bytes.
get_start_time(self)
Returns the start time of the bag, as UNIX timestamp, or None if bag empty.
get_end_time(self)
Returns the end time of the bag, as UNIX timestamp, or None if bag empty.
get_message_class(self, typename, typehash=None)
Returns rospy message class for typename, or None if unknown message type for bag.
get_message_type_hash(self, msg_or_type)
Returns ROS1 message type MD5 hash, or None if unknown message type for bag.
read_messages(self, topics=None, start_time=None, end_time=None, raw=False)
Yields messages from the bag, optionally filtered by topic and timestamp.
get_topic_info(self, *_, **__)
Returns topic and message type metainfo as {(topic, typename, typehash): count}.
close(self)
Closes the bag file.
__contains__(self, key)
Returns whether bag contains given topic.
closed
Returns whether file is closed.
size
Returns current file size.
topics
Returns the list of topics in bag, in alphabetic order.
__init__(self, filename, mode="r", **__)
__next__(self)
Retrieves next message from bag as (topic, message, timestamp).
autodetect(cls, filename)
Returns whether file is readable as ROS1 bag.
get_message_definition(self, msg_or_type)
Returns ROS1 message type definition full text from bag, including subtype definitions.
get_message_count(self, topic_filters=None)
Returns the number of messages in the bag.
init(*_, **__)
Replaces ROS1 bag reader with EmbagReader.