grepros 1.2.2
grep for ROS bag files and live topics
Loading...
Searching...
No Matches
embag.py
Go to the documentation of this file.
1# -*- coding: utf-8 -*-
2"""
3ROS1 bag reader plugin using the `embag` library.
4
5------------------------------------------------------------------------------
6This file is part of grepros - grep for ROS bag files and live topics.
7Released under the BSD License.
8
9@author Erki Suurjaak
10@created 19.11.2021
11@modified 30.08.2023
12------------------------------------------------------------------------------
13"""
14## @namespace grepros.plugins.embag
15from __future__ import absolute_import
16import os
17
18try: import embag
19except ImportError: embag = None
20
21from .. import api
22from .. common import PATH_TYPES, ConsolePrinter
23
24
25
26class EmbagReader(api.BaseBag):
27 """embag reader interface, providing most of rosbag.Bag interface."""
28
29
30 MODES = ("r", )
31
32
33 STREAMABLE = False
34
35
36 ROSBAG_MAGIC = b"#ROSBAG"
37
38 def __init__(self, filename, mode="r", **__):
39 if not isinstance(filename, PATH_TYPES):
40 raise ValueError("invalid filename %r" % type(filename))
41 if mode not in self.MODESMODES: raise ValueError("invalid mode %r" % mode)
42
43 self._topics = {} # {(topic, typename, typehash): message count}
44 self._types = {} # {(typename, typehash): message type class}
45 self._hashdefs = {} # {(topic, typehash): typename}
46 self._typedefs = {} # {(typename, typehash): type definition text}
47 self._iterer = None # Generator from read_messages() for next()
48 self._ttinfo = None # Cached result for get_type_and_topic_info()
49 self._view = embag.View(filename)
50 self._filename = str(filename)
51
52 self._populate_meta()
53
54
55 def get_message_count(self, topic_filters=None):
56 """
57 Returns the number of messages in the bag.
58
59 @param topic_filters list of topics or a single topic to filter by, if any
60 """
61 if topic_filters:
62 topics = topic_filters
63 topics = topics if isinstance(topics, (dict, list, set, tuple)) else [topics]
64 return sum(c for (t, _, _), c in self._topics.items() if t in topics)
65 return sum(self._topics.values())
66
67
68 def get_start_time(self):
69 """Returns the start time of the bag, as UNIX timestamp, or None if bag empty."""
70 if self.closedclosedclosedclosed or not self._topics: return None
71 return self._view.getStartTime().to_sec()
72
73
74 def get_end_time(self):
75 """Returns the end time of the bag, as UNIX timestamp, or None if bag empty."""
76 if self.closedclosedclosedclosed or not self._topics: return None
77 return self._view.getEndTime().to_sec()
78
79
80 def get_message_class(self, typename, typehash=None):
81 """
82 Returns rospy message class for typename, or None if unknown message type for bag.
83
84 Generates class dynamically if not already generated.
85
86 @param typehash message type definition hash, if any
87 """
88 typekey = (typename, typehash or next((h for n, h in self._types if n == typename), None))
89 if typekey not in self._types and typekey in self._typedefs:
90 for n, c in api.realapi.generate_message_classes(typename, self._typedefs[typekey]).items():
91 self._types[(n, c._md5sum)] = c
92 return self._types.get(typekey)
93
94
95 def get_message_definition(self, msg_or_type):
96 """
97 Returns ROS1 message type definition full text from bag, including subtype definitions.
98
99 Returns None if unknown message type for bag.
100 """
101 if api.is_ros_message(msg_or_type):
102 return self._typedefs.get((msg_or_type._type, msg_or_type._md5sum))
103 typename = msg_or_type
104 return next((d for (n, h), d in self._typedefs.items() if n == typename), None)
105
106
107 def get_message_type_hash(self, msg_or_type):
108 """Returns ROS1 message type MD5 hash, or None if unknown message type for bag."""
109 if api.is_ros_message(msg_or_type): return msg_or_type._md5sum
110 typename = msg_or_type
111 return next((h for n, h in self._typedefs if n == typename), None)
112
113
114 def get_topic_info(self, *_, **__):
115 """Returns topic and message type metainfo as {(topic, typename, typehash): count}."""
116 return dict(self._topics)
117
118
119 def get_type_and_topic_info(self, topic_filters=None):
120 """
121 Returns thorough metainfo on topic and message types.
122
123 @param topic_filters list of topics or a single topic to filter returned topics-dict by,
124 if any
125 @return TypesAndTopicsTuple(msg_types, topics) namedtuple,
126 msg_types as dict of {typename: typehash},
127 topics as a dict of {topic: TopicTuple() namedtuple}.
128 """
129 topics = topic_filters
130 topics = topics if isinstance(topics, (list, set, tuple)) else [topics] if topics else []
131 if self._ttinfo and (not topics or set(topics) == set(t for t, _, _ in self._topics)):
132 return self._ttinfo
133 if self.closedclosedclosedclosed: raise ValueError("I/O operation on closed file.")
134
135 msgtypes = {n: h for t, n, h in self._topics}
136 topicdict = {}
137
138 def median(vals):
139 """Returns median value from given sorted numbers."""
140 vlen = len(vals)
141 return None if not vlen else vals[vlen // 2] if vlen % 2 else \
142 float(vals[vlen // 2 - 1] + vals[vlen // 2]) / 2
143
144 conns = self._view.connectionsByTopic() # {topic: [embag.Connection, ]}
145 for (t, n, _), c in sorted(self._topics.items()):
146 if topics and t not in topics: continue # for
147 mymedian = None
148 if c > 1:
149 stamps = sorted(m.timestamp.secs + m.timestamp.nsecs / 1E9
150 for m in self._view.getMessages([t]))
151 mymedian = median(sorted(s1 - s0 for s1, s0 in zip(stamps[1:], stamps[:-1])))
152 freq = 1.0 / mymedian if mymedian else None
153 topicdict[t] = self.TopicTuple(n, c, len(conns.get(t, [])), freq)
154 if not topics or set(topics) == set(t for t, _, _ in self._topics):
155 self._ttinfo = self.TypesAndTopicsTuple(msgtypes, topicdict)
156 return self._ttinfo
157
158
159 def read_messages(self, topics=None, start_time=None, end_time=None, raw=False):
160 """
161 Yields messages from the bag, optionally filtered by topic and timestamp.
162
163 @param topics list of topics or a single topic to filter by, if at all
164 @param start_time earliest timestamp of message to return, as ROS time or convertible
165 (int/float/duration/datetime/decimal)
166 @param end_time latest timestamp of message to return, as ROS time or convertible
167 (int/float/duration/datetime/decimal)
168 @param raw if true, then returned messages are tuples of
169 (typename, bytes, typehash, typeclass)
170 @return BagMessage namedtuples of (topic, message, timestamp as rospy.Time)
171 """
172 if self.closedclosedclosedclosed: raise ValueError("I/O operation on closed file.")
173
174 topics = topics if isinstance(topics, list) else [topics] if topics else []
175 start_time, end_time = (api.to_sec(api.to_time(x)) for x in (start_time, end_time))
176 for m in self._view.getMessages(topics) if topics else self._view.getMessages():
177 if start_time is not None and start_time > m.timestamp.to_sec():
178 continue # for m
179 if end_time is not None and end_time < m.timestamp.to_sec():
180 continue # for m
181
182 typename = self._hashdefs[(m.topic, m.md5)]
183 stamp = api.make_time(m.timestamp.secs, m.timestamp.nsecs)
184 if raw: msg = (typename, m.data(), m.md5, self.get_message_classget_message_class(typename, m.md5))
185 else: msg = self._populate_message(self.get_message_classget_message_class(typename, m.md5)(), m.data())
186 api.TypeMeta.make(msg, m.topic, self)
187 yield self.BagMessage(m.topic, msg, stamp)
188 if self.closedclosedclosedclosed: break # for m
189
190
191 def open(self):
192 """Opens the bag file if not already open."""
193 if not self._view: self._view = embag.View(self._filename)
194
196 def close(self):
197 """Closes the bag file."""
198 if self._view:
199 del self._view
200 self._view = None
201 self._iterer = None
202
203
204 @property
205 def closed(self):
206 """Returns whether file is closed."""
207 return not self._view
208
209
210 @property
211 def topics(self):
212 """Returns the list of topics in bag, in alphabetic order."""
213 return sorted((t for t, _, _ in self._topics), key=str.lower)
214
215
216 @property
217 def filename(self):
218 """Returns bag file path."""
219 return self._filename
220
221
222 @property
223 def size(self):
224 """Returns current file size."""
225 return os.path.getsize(self._filename) if os.path.isfile(self._filename) else None
227
228 @property
229 def mode(self):
230 """Returns file open mode."""
231 return "r"
232
233
234 def __contains__(self, key):
235 """Returns whether bag contains given topic."""
236 return any(key == t for t, _, _ in self._topics)
237
238
239 def __next__(self):
240 """Retrieves next message from bag as (topic, message, timestamp)."""
241 if self.closedclosedclosedclosed: raise ValueError("I/O operation on closed file.")
242 if self._iterer is None: self._iterer = self.read_messagesread_messages()
243 return next(self._iterer)
244
245
246 def _populate_meta(self):
247 """Populates bag metainfo."""
248 connections = self._view.connectionsByTopic()
249 for topic in self._view.topics():
250 for conn in connections.get(topic, ()):
251 topickey, typekey = (topic, conn.type, conn.md5sum), (conn.type, conn.md5sum)
252 self._topics.setdefault(topickey, 0)
253 self._topics[topickey] += conn.message_count
254 self._hashdefs[(topic, conn.md5sum)] = conn.type
255 self._typedefs[typekey] = conn.message_definition
256 subtypedefs = api.parse_definition_subtypes(conn.message_definition)
257 for n, d in subtypedefs.items():
258 h = api.calculate_definition_hash(n, d, tuple(subtypedefs.items()))
259 self._typedefs.setdefault((n, h), d)
260
261
262 def _populate_message(self, msg, embagval):
263 """Returns the ROS1 message populated from a corresponding embag.RosValue."""
264 for name, typename in api.get_message_fields(msg).items():
265 v, scalarname = embagval.get(name), api.scalar(typename)
266 if typename in api.ROS_BUILTIN_TYPES: # Single built-in type
267 msgv = getattr(embagval, name)
268 elif scalarname in api.ROS_BUILTIN_TYPES: # List of built-in types
269 msgv = list(v)
270 elif typename in api.ROS_TIME_TYPES: # Single temporal type
271 cls = next(k for k, v in api.ROS_TIME_CLASSES.items() if v == typename)
272 msgv = cls(v.secs, v.nsecs)
273 elif scalarname in api.ROS_TIME_TYPES: # List of temporal types
274 cls = next(k for k, v in api.ROS_TIME_CLASSES.items() if v == scalarname)
275 msgv = [cls(x.secs, x.nsecs) for x in v]
276 elif typename == scalarname: # Single subtype
277 msgv = self._populate_message(self.get_message_classget_message_class(typename)(), v)
278 else: # List of subtypes
279 cls = self.get_message_classget_message_class(scalarname)
280 msgv = [self._populate_message(cls(), x) for x in v]
281 setattr(msg, name, msgv)
282 return msg
283
284
285 @classmethod
286 def autodetect(cls, filename):
287 """Returns whether file is readable as ROS1 bag."""
288 result = os.path.isfile(filename)
289 if result:
290 with open(filename, "rb") as f:
291 result = (f.read(len(cls.ROSBAG_MAGIC)) == cls.ROSBAG_MAGIC)
292 return result
293
294
295def init(*_, **__):
296 """Replaces ROS1 bag reader with EmbagReader. Raises ImportWarning if embag not available."""
297 if not embag:
298 ConsolePrinter.error("embag not available: cannot read bag files.")
299 raise ImportWarning()
300 api.Bag.READER_CLASSES.add(EmbagReader)
301
302
303__all__ = ["EmbagReader", "init"]
tuple MODES
Supported opening modes, overridden in subclasses.
Definition api.py:112
BagMessage
Returned from read_messages() as (topic name, ROS message, ROS timestamp object).
Definition api.py:101
TypesAndTopicsTuple
Returned from get_type_and_topic_info() as ({typename: typehash}, {topic name: TopicTuple}).
Definition api.py:109
get_message_class(self, typename, typehash=None)
Returns ROS message type class, or None if unknown message type for bag.
Definition api.py:256
read_messages(self, topics=None, start_time=None, end_time=None, raw=False, **__)
Yields messages from the bag, optionally filtered by topic and timestamp.
Definition api.py:291
closed
Returns whether file is closed.
Definition api.py:348
TopicTuple
Returned from get_type_and_topic_info() as (typename, message count, connection count,...
Definition api.py:105
tuple MODES
Supported opening modes.
Definition embag.py:30
get_type_and_topic_info(self, topic_filters=None)
Returns thorough metainfo on topic and message types.
Definition embag.py:127
mode
Returns file open mode.
Definition embag.py:242
open(self)
Opens the bag file if not already open.
Definition embag.py:195
filename
Returns bag file path.
Definition embag.py:226
str ROSBAG_MAGIC
ROS1 bag file header magic start bytes.
Definition embag.py:36
get_start_time(self)
Returns the start time of the bag, as UNIX timestamp, or None if bag empty.
Definition embag.py:68
get_end_time(self)
Returns the end time of the bag, as UNIX timestamp, or None if bag empty.
Definition embag.py:74
get_message_class(self, typename, typehash=None)
Returns rospy message class for typename, or None if unknown message type for bag.
Definition embag.py:87
get_message_type_hash(self, msg_or_type)
Returns ROS1 message type MD5 hash, or None if unknown message type for bag.
Definition embag.py:107
read_messages(self, topics=None, start_time=None, end_time=None, raw=False)
Yields messages from the bag, optionally filtered by topic and timestamp.
Definition embag.py:175
get_topic_info(self, *_, **__)
Returns topic and message type metainfo as {(topic, typename, typehash): count}.
Definition embag.py:114
close(self)
Closes the bag file.
Definition embag.py:200
__contains__(self, key)
Returns whether bag contains given topic.
Definition embag.py:248
closed
Returns whether file is closed.
Definition embag.py:210
size
Returns current file size.
Definition embag.py:234
topics
Returns the list of topics in bag, in alphabetic order.
Definition embag.py:218
__init__(self, filename, mode="r", **__)
Definition embag.py:38
__next__(self)
Retrieves next message from bag as (topic, message, timestamp).
Definition embag.py:253
autodetect(cls, filename)
Returns whether file is readable as ROS1 bag.
Definition embag.py:304
get_message_definition(self, msg_or_type)
Returns ROS1 message type definition full text from bag, including subtype definitions.
Definition embag.py:100
get_message_count(self, topic_filters=None)
Returns the number of messages in the bag.
Definition embag.py:60
init(*_, **__)
Replaces ROS1 bag reader with EmbagReader.
Definition embag.py:313