43 MCAP bag interface, providing most of rosbag.Bag interface.
45 Bag cannot be appended to, and cannot be read and written at the same time
46 (MCAP API limitation).
53 MCAP_MAGIC = b
"\x89MCAP\x30\r\n"
57 Opens file and populates metadata.
59 @param f bag file path, or a stream object
60 @param mode return reader if "r" or writer if "w"
62 if mode
not in self.
MODESMODES:
raise ValueError(
"invalid mode %r" % mode)
85 if common.is_stream(f):
86 if not common.verify_io(f, mode):
87 raise io.UnsupportedOperation(
"read" if "r" == mode
else "write")
91 if not isinstance(f, common.PATH_TYPES):
92 raise ValueError(
"invalid filename %r" % type(f))
93 if "w" == mode: common.makedirs(os.path.dirname(f))
97 (t, c)
for c, t
in api.ROS_TIME_CLASSES.items()
if api.get_message_type(c) == t
105 Returns the number of messages in the bag.
107 @param topic_filters list of topics or a single topic to filter by, if any
110 topics = topic_filters
111 topics = topics
if isinstance(topics, (dict, list, set, tuple))
else [topics]
112 return sum(c
for (t, _, _), c
in self.
_topics.items()
if t
in topics)
113 return sum(self.
_topics.values())
117 """Returns the start time of the bag, as UNIX timestamp."""
122 """Returns the end time of the bag, as UNIX timestamp."""
128 Returns ROS message class for typename, or None if unknown type.
130 @param typehash message type definition hash, if any
132 typehash = typehash
or next((h
for t, h
in self.
_typedefs if t == typename),
None)
133 typekey = (typename, typehash)
136 name = typename.split(
"/")[-1]
137 fields = api.parse_definition_fields(typename, self.
_typedefs[typekey])
138 cls = type(name, (types.SimpleNamespace, ), {
139 "__name__": name,
"__slots__": list(fields),
140 "__repr__": message_repr,
"__str__": message_repr
144 typeclses = api.realapi.generate_message_classes(typename, self.
_typedefs[typekey])
145 self.
_types[typekey] = typeclses[typename]
147 return self.
_types.get(typekey)
151 """Returns ROS message type definition full text from bag, including subtype definitions."""
152 if api.is_ros_message(msg_or_type):
153 return self.
_typedefs.get((api.get_message_type(msg_or_type),
154 api.get_message_type_hash(msg_or_type)))
155 typename = msg_or_type
156 return next((d
for (n, h), d
in self.
_typedefs.items()
if n == typename),
None)
160 """Returns ROS message type MD5 hash."""
161 typename = msg_or_type
162 if api.is_ros_message(msg_or_type): typename = api.get_message_type(msg_or_type)
163 return next((h
for n, h
in self.
_typedefs if n == typename),
None)
167 """Returns topic Quality-of-Service profiles as a list of dicts, or None if not available."""
168 return self.
_qoses.get((topic, typename))
172 """Returns topic and message type metainfo as {(topic, typename, typehash): count}."""
178 Returns thorough metainfo on topic and message types.
180 @param topic_filters list of topics or a single topic to filter returned topics-dict by,
182 @return TypesAndTopicsTuple(msg_types, topics) namedtuple,
183 msg_types as dict of {typename: typehash},
184 topics as a dict of {topic: TopicTuple() namedtuple}.
186 topics = topic_filters
187 topics = topics
if isinstance(topics, (list, set, tuple))
else [topics]
if topics
else []
188 if self.
_ttinfo and (
not topics
or set(topics) == set(t
for t, _, _
in self.
_topics)):
190 if self.
closedclosed:
raise ValueError(
"I/O operation on closed file.")
192 topics = topic_filters
193 topics = topics
if isinstance(topics, (list, set, tuple))
else [topics]
if topics
else []
194 msgtypes = {n: h
for t, n, h
in self.
_topics}
198 """Returns median value from given sorted numbers."""
200 return None if not vlen
else vals[vlen // 2]
if vlen % 2
else \
201 float(vals[vlen // 2 - 1] + vals[vlen // 2]) / 2
203 for (t, n, _), c
in sorted(self.
_topics.items()):
204 if topics
and t
not in topics:
continue
207 stamps = sorted(m.publish_time / 1E9
for _, _, m
in self.
_reader.iter_messages([t]))
208 mymedian = median(sorted(s1 - s0
for s1, s0
in zip(stamps[1:], stamps[:-1])))
209 freq = 1.0 / mymedian
if mymedian
else None
210 topicdict[t] = self.
TopicTuple(n, c, len(self.
_qoses.get((t, n))
or []), freq)
212 if not topics
or set(topics) == set(t
for t, _, _
in self.
_topics): self.
_ttinfo = result
216 def read_messages(self, topics=None, start_time=None, end_time=None, raw=False):
218 Yields messages from the bag, optionally filtered by topic and timestamp.
220 @param topics list of topics or a single topic to filter by, if at all
221 @param start_time earliest timestamp of message to return, as ROS time or convertible
222 (int/float/duration/datetime/decimal)
223 @param end_time latest timestamp of message to return, as ROS time or convertible
224 (int/float/duration/datetime/decimal)
225 @param raw if true, then returned messages are tuples of
226 (typename, bytes, typehash, typeclass)
227 @return BagMessage namedtuples of
228 (topic, message, timestamp as rospy/rclpy.Time)
230 if self.
closedclosed:
raise ValueError(
"I/O operation on closed file.")
231 if "w" == self.
_mode:
raise io.UnsupportedOperation(
"read")
233 topics = topics
if isinstance(topics, list)
else [topics]
if topics
else None
234 start_ns, end_ns = (api.to_nsec(api.to_time(x))
for x
in (start_time, end_time))
235 for schema, channel, message
in self.
_reader.iter_messages(topics, start_ns, end_ns):
237 typekey = (typename, typehash) = self.
_schematypes[schema.id]
238 if typekey
not in self.
_types:
240 msg = (typename, message.data, typehash, self.
_types[typekey])
242 api.TypeMeta.make(msg, channel.topic, self)
243 yield self.
BagMessage(channel.topic, msg, api.make_time(nsecs=message.publish_time))
247 def write(self, topic, msg, t=None, raw=False, **__):
249 Writes out message to MCAP file.
251 @param topic name of topic
252 @param msg ROS1 message
253 @param t message timestamp if not using current ROS time,
254 as ROS time type or convertible (int/float/duration/datetime/decimal)
255 @param raw if true, `msg` is in raw format, (typename, bytes, typehash, typeclass)
257 if self.
closedclosed:
raise ValueError(
"I/O operation on closed file.")
258 if "r" == self.
_mode:
raise io.UnsupportedOperation(
"write")
261 typename, binary, typehash = msg[:3]
263 typedef = self.
_typedefs.get((typename, typehash))
or api.get_message_definition(cls)
264 msg = api.deserialize_message(binary, cls)
266 with api.TypeMeta.make(msg, topic)
as meta:
267 typename, typehash, typedef = meta.typename, meta.typehash, meta.definition
268 topickey, typekey = (topic, typename, typehash), (typename, typehash)
270 nanosec = (time.time_ns()
if hasattr(time,
"time_ns")
else int(time.time() * 10**9)) \
271 if t
is None else api.to_nsec(api.to_time(t))
274 fullname = api.make_full_typename(typename)
275 schema = self.
_writer.register_msgdef(fullname, typedef)
277 schema, data = self.
_schemas[typekey], api.message_to_dict(msg)
278 self.
_writer.write_message(topic, schema, data, publish_time=nanosec)
280 self.
_writer.write_message(topic, msg, publish_time=nanosec)
286 self.
_types.setdefault(typekey, type(msg))
287 self.
_typedefs.setdefault(typekey, typedef)
292 """Opens the bag file if not already open."""
295 raise io.UnsupportedOperation(
"Cannot reopen bag for writing.")
298 self.
_reader = mcap.reader.make_reader(self.
_file)
if "r" == self.
_mode else None
299 self.
_decoder = mcap_ros.decoder.Decoder()
if "r" == self.
_mode else None
302 except Exception
as e:
303 raise Exception(
"Error reading MCAP metadata: %r" % e)
308 """Closes the bag file."""
309 if self.
_file is not None:
317 """Returns whether file is closed."""
318 return self.
_file is None
323 """Returns the list of topics in bag, in alphabetic order."""
324 return sorted((t
for t, _, _
in self.
_topics), key=str.lower)
329 """Returns bag file path."""
335 """Returns current file size."""
337 pos, _ = self.
_file.tell(), self.
_file.seek(0, os.SEEK_END)
338 size, _ = self.
_file.tell(), self.
_file.seek(pos)
345 """Returns file open mode."""
350 """Returns whether bag contains given topic."""
351 return any(key == t
for t, _, _
in self.
_topics)
355 """Retrieves next message from bag as (topic, message, timestamp)."""
356 if self.
closedclosed:
raise ValueError(
"I/O operation on closed file.")
361 def _decode_message(self, message, channel, schema):
363 Returns ROS message deserialized from binary data.
365 @param message mcap.records.Message instance
366 @param channel mcap.records.Channel instance for message
367 @param schema mcap.records.Schema instance for message type
370 if api.ROS2
and not issubclass(cls, types.SimpleNamespace):
371 msg = api.deserialize_message(message.data, cls)
373 msg = self.
_decoder.decode(schema=schema, message=message)
377 api.TypeMeta.make(msg, channel.topic, self, data=message.data)
379 if typekey
not in self.
_types: self.
_types[typekey] = type(msg)
383 def _make_message_class(self, schema, message, generate=True):
385 Returns message type class, generating if not already available.
387 @param schema mcap.records.Schema instance for message type
388 @param message mcap.records.Message instance
389 @param generate generate message class dynamically if not available
391 typekey = (typename, typehash) = self.
_schematypes[schema.id]
392 if api.ROS2
and typekey
not in self.
_types:
394 cls = api.get_message_class(typename)
395 clshash = api.get_message_type_hash(cls)
396 if typehash == clshash: self.
_types[typekey] = cls
397 except Exception:
pass
398 if typekey
not in self.
_types and generate:
400 msg = self.
_decoder.decode(schema=schema, message=message)
403 typeclses = api.realapi.generate_message_classes(typename, schema.data.decode())
404 self.
_types[typekey] = typeclses[typename]
405 return self.
_types.get(typekey)
408 def _patch_message_class(self, cls, typename, typehash):
410 Patches MCAP ROS2 message class with expected attributes and methods, recursively.
412 @param cls ROS message class as returned from mcap_ros2.decoder
413 @param typename ROS message type name, like "std_msgs/Bool"
414 @param typehash ROS message type hash
415 @return patched class
417 typekey = (typename, typehash)
419 fields = api.parse_definition_fields(typename, self.
_typedefs[typekey])
422 for s
in [api.scalar(t)]
423 if s
not in api.ROS_BUILTIN_TYPES
428 cls.
__module__ = typename.split(
"/", maxsplit=1)[0]
438 def _patch_message(self, message, typename, typehash):
440 Patches MCAP ROS2 message with expected attributes and methods, recursively.
442 @param message ROS message instance as returned from mcap_ros2.decoder
443 @param typename ROS message type name, like "std_msgs/Bool"
444 @param typehash ROS message type hash
445 @return original message patched, or new instance if ROS2 temporal type
449 stack = [(message, (typename, typehash),
None, ())]
451 msg, typekey, parent, path = stack.pop(0)
452 mycls, typename = type(msg), typekey[0]
456 msg2 = self.
_temporal_ctors[typename](sec=msg.sec, nanosec=msg.nanosec)
457 if msg
is message: result = msg2
458 elif len(path) == 1: setattr(parent, path[0], msg2)
459 else: getattr(parent, path[0])[path[1]] = msg2
465 v = getattr(msg, name)
466 if isinstance(v, list):
467 stack.extend((x, subtypekey, msg, (name, i))
for i, x
in enumerate(v))
469 stack.append((v, subtypekey, msg, (name, )))
474 def _populate_meta(self):
475 """Populates bag metainfo."""
476 summary = self.
_reader.get_summary()
477 self.
_start_time = summary.statistics.message_start_time / 1E9
478 self.
_end_time = summary.statistics.message_end_time / 1E9
480 def make_hash(typename, msgdef, extradefs):
481 """Returns MD5 hash calculated for type definition, or None on error."""
482 try:
return api.calculate_definition_hash(typename, msgdef, extradefs)
483 except Exception
as e:
484 ConsolePrinter.warn(
"Error calculating message type hash for %r: %r", typename, e)
488 for cid, channel
in summary.channels.items():
489 schema = summary.schemas[channel.schema_id]
490 topic, typename = channel.topic, api.canonical(schema.name)
492 typedef = schema.data.decode(
"utf-8")
493 subtypedefs, nesting = api.parse_definition_subtypes(typedef, nesting=
True)
494 typehash = channel.metadata.get(
"md5sum")
or \
495 make_hash(typename, typedef, tuple(subtypedefs.items()))
496 topickey, typekey = (topic, typename, typehash), (typename, typehash)
499 if channel.metadata.get(
"offered_qos_profiles"):
500 try: qoses = yaml.safe_load(channel.metadata[
"offered_qos_profiles"])
501 except Exception
as e:
502 ConsolePrinter.warn(
"Error parsing topic QoS profiles from %r: %s.",
503 channel.metadata[
"offered_qos_profiles"], e)
505 self.
_topics.setdefault(topickey, 0)
506 self.
_topics[topickey] += summary.statistics.channel_message_counts[cid]
508 defhashes[typedef] = typehash
509 for t, d
in subtypedefs.items():
510 h = defhashes[d]
if d
in defhashes
else make_hash(t, d, tuple(subtypedefs.items()))
514 for t, subtypes
in nesting.items():
520 if qoses: self.
_qoses[topickey] = qoses
527 """Returns whether file is recognizable as MCAP format."""
528 if common.is_stream(f):
529 pos, _ = f.tell(), f.seek(0)
531 elif os.path.isfile(f)
and os.path.getsize(f):
532 with open(f,
"rb")
as f:
535 ext = os.path.splitext(f
or "")[-1].lower()
536 result = ext
in McapSink.FILE_EXTENSIONS