3ROS1 bag reader plugin using the `embag` library.
5------------------------------------------------------------------------------
6This file is part of grepros - grep for ROS bag files and live topics.
7Released under the BSD License.
12------------------------------------------------------------------------------
15from __future__
import absolute_import
19except ImportError: embag =
None
22from .. common
import PATH_TYPES, ConsolePrinter
27 """embag reader interface, providing most of rosbag.Bag interface."""
36 ROSBAG_MAGIC = b
"#ROSBAG"
39 if not isinstance(filename, PATH_TYPES):
40 raise ValueError(
"invalid filename %r" % type(filename))
41 if mode
not in self.
MODESMODES:
raise ValueError(
"invalid mode %r" % mode)
49 self.
_view = embag.View(filename)
57 Returns the number of messages in the bag.
59 @param topic_filters list of topics or a single topic to filter by, if any
62 topics = topic_filters
63 topics = topics
if isinstance(topics, (dict, list, set, tuple))
else [topics]
64 return sum(c
for (t, _, _), c
in self.
_topics.items()
if t
in topics)
65 return sum(self.
_topics.values())
69 """Returns the start time of the bag, as UNIX timestamp, or None if bag empty."""
71 return self.
_view.getStartTime().to_sec()
75 """Returns the end time of the bag, as UNIX timestamp, or None if bag empty."""
77 return self.
_view.getEndTime().to_sec()
82 Returns rospy message class for typename, or None if unknown message type for bag.
84 Generates class dynamically if not already generated.
86 @param typehash message type definition hash, if any
88 typekey = (typename, typehash
or next((h
for n, h
in self.
_types if n == typename),
None))
90 for n, c
in api.realapi.generate_message_classes(typename, self.
_typedefs[typekey]).items():
91 self.
_types[(n, c._md5sum)] = c
92 return self.
_types.get(typekey)
97 Returns ROS1 message type definition full text from bag, including subtype definitions.
99 Returns None if unknown message type for bag.
101 if api.is_ros_message(msg_or_type):
102 return self.
_typedefs.get((msg_or_type._type, msg_or_type._md5sum))
103 typename = msg_or_type
104 return next((d
for (n, h), d
in self.
_typedefs.items()
if n == typename),
None)
108 """Returns ROS1 message type MD5 hash, or None if unknown message type for bag."""
109 if api.is_ros_message(msg_or_type):
return msg_or_type._md5sum
110 typename = msg_or_type
111 return next((h
for n, h
in self.
_typedefs if n == typename),
None)
115 """Returns topic and message type metainfo as {(topic, typename, typehash): count}."""
121 Returns thorough metainfo on topic and message types.
123 @param topic_filters list of topics or a single topic to filter returned topics-dict by,
125 @return TypesAndTopicsTuple(msg_types, topics) namedtuple,
126 msg_types as dict of {typename: typehash},
127 topics as a dict of {topic: TopicTuple() namedtuple}.
129 topics = topic_filters
130 topics = topics
if isinstance(topics, (list, set, tuple))
else [topics]
if topics
else []
131 if self.
_ttinfo and (
not topics
or set(topics) == set(t
for t, _, _
in self.
_topics)):
133 if self.
closedclosed:
raise ValueError(
"I/O operation on closed file.")
135 msgtypes = {n: h
for t, n, h
in self.
_topics}
139 """Returns median value from given sorted numbers."""
141 return None if not vlen
else vals[vlen // 2]
if vlen % 2
else \
142 float(vals[vlen // 2 - 1] + vals[vlen // 2]) / 2
144 conns = self.
_view.connectionsByTopic()
145 for (t, n, _), c
in sorted(self.
_topics.items()):
146 if topics
and t
not in topics:
continue
149 stamps = sorted(m.timestamp.secs + m.timestamp.nsecs / 1E9
150 for m
in self.
_view.getMessages([t]))
151 mymedian = median(sorted(s1 - s0
for s1, s0
in zip(stamps[1:], stamps[:-1])))
152 freq = 1.0 / mymedian
if mymedian
else None
153 topicdict[t] = self.
TopicTuple(n, c, len(conns.get(t, [])), freq)
154 if not topics
or set(topics) == set(t
for t, _, _
in self.
_topics):
159 def read_messages(self, topics=None, start_time=None, end_time=None, raw=False):
161 Yields messages from the bag, optionally filtered by topic and timestamp.
163 @param topics list of topics or a single topic to filter by, if at all
164 @param start_time earliest timestamp of message to return, as ROS time or convertible
165 (int/float/duration/datetime/decimal)
166 @param end_time latest timestamp of message to return, as ROS time or convertible
167 (int/float/duration/datetime/decimal)
168 @param raw if true, then returned messages are tuples of
169 (typename, bytes, typehash, typeclass)
170 @return BagMessage namedtuples of (topic, message, timestamp as rospy.Time)
172 if self.
closedclosed:
raise ValueError(
"I/O operation on closed file.")
174 topics = topics
if isinstance(topics, list)
else [topics]
if topics
else []
175 start_time, end_time = (api.to_sec(api.to_time(x))
for x
in (start_time, end_time))
176 for m
in self.
_view.getMessages(topics)
if topics
else self.
_view.getMessages():
177 if start_time
is not None and start_time > m.timestamp.to_sec():
179 if end_time
is not None and end_time < m.timestamp.to_sec():
182 typename = self.
_hashdefs[(m.topic, m.md5)]
183 stamp = api.make_time(m.timestamp.secs, m.timestamp.nsecs)
186 api.TypeMeta.make(msg, m.topic, self)
192 """Opens the bag file if not already open."""
197 """Closes the bag file."""
206 """Returns whether file is closed."""
207 return not self.
_view
212 """Returns the list of topics in bag, in alphabetic order."""
213 return sorted((t
for t, _, _
in self.
_topics), key=str.lower)
218 """Returns bag file path."""
224 """Returns current file size."""
230 """Returns file open mode."""
235 """Returns whether bag contains given topic."""
236 return any(key == t
for t, _, _
in self.
_topics)
240 """Retrieves next message from bag as (topic, message, timestamp)."""
241 if self.
closedclosed:
raise ValueError(
"I/O operation on closed file.")
246 def _populate_meta(self):
247 """Populates bag metainfo."""
248 connections = self.
_view.connectionsByTopic()
250 for conn
in connections.get(topic, ()):
251 topickey, typekey = (topic, conn.type, conn.md5sum), (conn.type, conn.md5sum)
252 self.
_topics.setdefault(topickey, 0)
253 self.
_topics[topickey] += conn.message_count
254 self.
_hashdefs[(topic, conn.md5sum)] = conn.type
255 self.
_typedefs[typekey] = conn.message_definition
256 subtypedefs = api.parse_definition_subtypes(conn.message_definition)
257 for n, d
in subtypedefs.items():
258 h = api.calculate_definition_hash(n, d, tuple(subtypedefs.items()))
262 def _populate_message(self, msg, embagval):
263 """Returns the ROS1 message populated from a corresponding embag.RosValue."""
264 for name, typename
in api.get_message_fields(msg).items():
265 v, scalarname = embagval.get(name), api.scalar(typename)
266 if typename
in api.ROS_BUILTIN_TYPES:
267 msgv = getattr(embagval, name)
268 elif scalarname
in api.ROS_BUILTIN_TYPES:
270 elif typename
in api.ROS_TIME_TYPES:
271 cls =
next(k
for k, v
in api.ROS_TIME_CLASSES.items()
if v == typename)
272 msgv = cls(v.secs, v.nsecs)
273 elif scalarname
in api.ROS_TIME_TYPES:
274 cls =
next(k
for k, v
in api.ROS_TIME_CLASSES.items()
if v == scalarname)
275 msgv = [cls(x.secs, x.nsecs)
for x
in v]
276 elif typename == scalarname:
281 setattr(msg, name, msgv)
287 """Returns whether file is readable as ROS1 bag."""
288 result = os.path.isfile(filename)
290 with open(filename,
"rb")
as f:
296 """Replaces ROS1 bag reader with EmbagReader. Raises ImportWarning if embag not available."""
298 ConsolePrinter.error(
"embag not available: cannot read bag files.")
299 raise ImportWarning()
300 api.Bag.READER_CLASSES.add(EmbagReader)
303__all__ = [
"EmbagReader",
"init"]
tuple MODES
Supported opening modes, overridden in subclasses.
BagMessage
Returned from read_messages() as (topic name, ROS message, ROS timestamp object).
TypesAndTopicsTuple
Returned from get_type_and_topic_info() as ({typename: typehash}, {topic name: TopicTuple}).
get_message_class(self, typename, typehash=None)
read_messages(self, topics=None, start_time=None, end_time=None, raw=False, **__)
TopicTuple
Returned from get_type_and_topic_info() as (typename, message count, connection count,...
tuple MODES
Supported opening modes.
get_type_and_topic_info(self, topic_filters=None)
str ROSBAG_MAGIC
ROS1 bag file header magic start bytes.
get_message_class(self, typename, typehash=None)
get_message_type_hash(self, msg_or_type)
read_messages(self, topics=None, start_time=None, end_time=None, raw=False)
get_topic_info(self, *_, **__)
__init__(self, filename, mode="r", **__)
autodetect(cls, filename)
get_message_definition(self, msg_or_type)
get_message_count(self, topic_filters=None)