5------------------------------------------------------------------------------
6This file is part of grepros - grep for ROS bag files and live topics.
7Released under the BSD License.
12------------------------------------------------------------------------------
14## @namespace grepros.plugins.mcap
15from __future__ import absolute_import
25try: import mcap, mcap.reader
26except ImportError: mcap = None
27if "1" == os.getenv("ROS_VERSION"):
28 try:
import mcap_ros1
as mcap_ros, mcap_ros1.decoder, mcap_ros1.writer
29 except ImportError: mcap_ros =
None
30elif "2" == os.getenv(
"ROS_VERSION"):
31 try:
import mcap_ros2
as mcap_ros, mcap_ros2.decoder, mcap_ros2.writer
32 except ImportError: mcap_ros =
None
37from .. common
import ConsolePrinter
38from .. outputs
import RolloverSinkMixin, Sink
43 MCAP bag interface, providing most of rosbag.Bag interface.
45 Bag cannot be appended to, and cannot be read
and written at the same time
46 (MCAP API limitation).
53 MCAP_MAGIC = b
"\x89MCAP\x30\r\n"
55 def __init__(self, f, mode="r", **__):
57 Opens file and populates metadata.
59 @param f bag file path,
or a stream object
60 @param mode
return reader
if "r" or writer
if "w"
62 if mode
not in self.
MODESMODES:
raise ValueError(
"invalid mode %r" % mode)
85 if common.is_stream(f):
86 if not common.verify_io(f, mode):
87 raise io.UnsupportedOperation(
"read" if "r" == mode
else "write")
91 if not isinstance(f, common.PATH_TYPES):
92 raise ValueError(
"invalid filename %r" % type(f))
93 if "w" == mode: common.makedirs(os.path.dirname(f))
97 (t, c)
for c, t
in api.ROS_TIME_CLASSES.items()
if api.get_message_type(c) == t
105 Returns the number of messages in the bag.
107 @param topic_filters list of topics
or a single topic to filter by,
if any
110 topics = topic_filters
111 topics = topics
if isinstance(topics, (dict, list, set, tuple))
else [topics]
112 return sum(c
for (t, _, _), c
in self.
_topics.items()
if t
in topics)
113 return sum(self.
_topics.values())
117 """Returns the start time of the bag, as UNIX timestamp."""
122 """Returns the end time of the bag, as UNIX timestamp."""
128 Returns ROS message class for typename, or None if unknown type.
130 @param typehash message type definition hash,
if any
132 typehash = typehash or next((h
for t, h
in self.
_typedefs if t == typename),
None)
133 typekey = (typename, typehash)
136 name = typename.split(
"/")[-1]
137 fields = api.parse_definition_fields(typename, self.
_typedefs[typekey])
138 cls = type(name, (types.SimpleNamespace, ), {
139 "__name__": name,
"__slots__": list(fields),
140 "__repr__": message_repr,
"__str__": message_repr
144 typeclses = api.realapi.generate_message_classes(typename, self.
_typedefs[typekey])
145 self.
_types[typekey] = typeclses[typename]
147 return self.
_types.get(typekey)
151 """Returns ROS message type definition full text from bag, including subtype definitions."""
152 if api.is_ros_message(msg_or_type):
153 return self.
_typedefs.get((api.get_message_type(msg_or_type),
154 api.get_message_type_hash(msg_or_type)))
155 typename = msg_or_type
156 return next((d
for (n, h), d
in self.
_typedefs.items()
if n == typename),
None)
160 """Returns ROS message type MD5 hash."""
161 typename = msg_or_type
162 if api.is_ros_message(msg_or_type): typename = api.get_message_type(msg_or_type)
163 return next((h
for n, h
in self.
_typedefs if n == typename),
None)
167 """Returns topic Quality-of-Service profiles as a list of dicts, or None if not available."""
168 return self.
_qoses.get((topic, typename))
172 """Returns topic and message type metainfo as {(topic, typename, typehash): count}."""
178 Returns thorough metainfo on topic and message types.
180 @param topic_filters list of topics
or a single topic to filter returned topics-dict by,
183 msg_types
as dict of {typename: typehash},
186 topics = topic_filters
187 topics = topics if isinstance(topics, (list, set, tuple))
else [topics]
if topics
else []
188 if self.
_ttinfo and (
not topics
or set(topics) == set(t
for t, _, _
in self.
_topics)):
192 topics = topic_filters
193 topics = topics
if isinstance(topics, (list, set, tuple))
else [topics]
if topics
else []
194 msgtypes = {n: h
for t, n, h
in self.
_topics}
198 """Returns median value from given sorted numbers."""
200 return None if not vlen
else vals[vlen // 2]
if vlen % 2
else \
201 float(vals[vlen // 2 - 1] + vals[vlen // 2]) / 2
203 for (t, n, _), c
in sorted(self.
_topics.items()):
204 if topics
and t
not in topics:
continue
207 stamps = sorted(m.publish_time / 1E9
for _, _, m
in self.
_reader.iter_messages([t]))
208 mymedian = median(sorted(s1 - s0
for s1, s0
in zip(stamps[1:], stamps[:-1])))
209 freq = 1.0 / mymedian
if mymedian
else None
210 topicdict[t] = self.
TopicTuple(n, c, len(self.
_qoses.get((t, n))
or []), freq)
212 if not topics
or set(topics) == set(t
for t, _, _
in self.
_topics): self.
_ttinfo = result
216 def read_messages(self, topics=None, start_time=None, end_time=None, raw=False):
218 Yields messages from the bag, optionally filtered by topic
and timestamp.
220 @param topics list of topics
or a single topic to filter by,
if at all
221 @param start_time earliest timestamp of message to
return,
as ROS time
or convertible
222 (int/float/duration/datetime/decimal)
223 @param end_time latest timestamp of message to
return,
as ROS time
or convertible
224 (int/float/duration/datetime/decimal)
225 @param raw
if true, then returned messages are tuples of
226 (typename, bytes, typehash, typeclass)
227 @return BagMessage namedtuples of
228 (topic, message, timestamp
as rospy/rclpy.Time)
231 if "w" == self.
_mode:
raise io.UnsupportedOperation(
"read")
233 topics = topics
if isinstance(topics, list)
else [topics]
if topics
else None
234 start_ns, end_ns = (api.to_nsec(api.to_time(x))
for x
in (start_time, end_time))
235 for schema, channel, message
in self.
_reader.iter_messages(topics, start_ns, end_ns):
237 typekey = (typename, typehash) = self.
_schematypes[schema.id]
238 if typekey
not in self.
_types:
240 msg = (typename, message.data, typehash, self.
_types[typekey])
242 api.TypeMeta.make(msg, channel.topic, self)
243 yield self.
BagMessage(channel.topic, msg, api.make_time(nsecs=message.publish_time))
247 def write(self, topic, msg, t=None, raw=False, **__):
249 Writes out message to MCAP file.
251 @param topic name of topic
252 @param msg ROS1 message
253 @param t message timestamp
if not using current ROS time,
254 as ROS time type
or convertible (int/float/duration/datetime/decimal)
255 @param raw
if true, `msg`
is in raw format, (typename, bytes, typehash, typeclass)
258 if "r" == self.
_mode:
raise io.UnsupportedOperation(
"write")
261 typename, binary, typehash = msg[:3]
263 typedef = self.
_typedefs.get((typename, typehash))
or api.get_message_definition(cls)
264 msg = api.deserialize_message(binary, cls)
266 with api.TypeMeta.make(msg, topic)
as meta:
267 typename, typehash, typedef = meta.typename, meta.typehash, meta.definition
268 topickey, typekey = (topic, typename, typehash), (typename, typehash)
270 nanosec = (time.time_ns()
if hasattr(time,
"time_ns")
else int(time.time() * 10**9)) \
271 if t
is None else api.to_nsec(api.to_time(t))
274 fullname = api.make_full_typename(typename)
275 schema = self.
_writer.register_msgdef(fullname, typedef)
277 schema, data = self.
_schemas[typekey], api.message_to_dict(msg)
278 self.
_writer.write_message(topic, schema, data, publish_time=nanosec)
280 self.
_writer.write_message(topic, msg, publish_time=nanosec)
286 self.
_types.setdefault(typekey, type(msg))
287 self.
_typedefs.setdefault(typekey, typedef)
292 """Opens the bag file if not already open."""
295 raise io.UnsupportedOperation(
"Cannot reopen bag for writing.")
298 self.
_reader = mcap.reader.make_reader(self.
_file)
if "r" == self.
_mode else None
299 self.
_decoder = mcap_ros.decoder.Decoder()
if "r" == self.
_mode else None
302 except Exception
as e:
303 raise Exception(
"Error reading MCAP metadata: %r" % e)
308 """Closes the bag file."""
309 if self.
_file is not None:
317 """Returns whether file is closed."""
318 return self.
_file is None
323 """Returns the list of topics in bag, in alphabetic order."""
324 return sorted((t
for t, _, _
in self.
_topics), key=str.lower)
329 """Returns bag file path."""
335 """Returns current file size."""
338 size, _ = self.
_file.tell(), self.
_file.seek(pos)
345 """Returns file open mode."""
350 """Returns whether bag contains given topic."""
351 return any(key == t
for t, _, _
in self.
_topics)
355 """Retrieves next message from bag as (topic, message, timestamp)."""
361 def _decode_message(self, message, channel, schema):
363 Returns ROS message deserialized from binary data.
365 @param message mcap.records.Message instance
366 @param channel mcap.records.Channel instance
for message
367 @param schema mcap.records.Schema instance
for message type
370 if api.ROS2
and not issubclass(cls, types.SimpleNamespace):
371 msg = api.deserialize_message(message.data, cls)
373 msg = self.
_decoder.decode(schema=schema, message=message)
377 api.TypeMeta.make(msg, channel.topic, self, data=message.data)
379 if typekey
not in self.
_types: self.
_types[typekey] = type(msg)
383 def _make_message_class(self, schema, message, generate=True):
385 Returns message type class, generating
if not already available.
387 @param schema mcap.records.Schema instance
for message type
388 @param message mcap.records.Message instance
389 @param generate generate message
class dynamically if not available
391 typekey = (typename, typehash) = self._schematypes[schema.id]
392 if api.ROS2
and typekey
not in self.
_types:
394 cls = api.get_message_class(typename)
395 clshash = api.get_message_type_hash(cls)
396 if typehash == clshash: self.
_types[typekey] = cls
397 except Exception:
pass
398 if typekey
not in self.
_types and generate:
400 msg = self.
_decoder.decode(schema=schema, message=message)
403 typeclses = api.realapi.generate_message_classes(typename, schema.data.decode())
404 self.
_types[typekey] = typeclses[typename]
405 return self.
_types.get(typekey)
408 def _patch_message_class(self, cls, typename, typehash):
410 Patches MCAP ROS2 message class with expected attributes and methods, recursively.
412 @param cls ROS message
class as returned from mcap_ros2.decoder
413 @param typename ROS message type name, like
"std_msgs/Bool"
414 @param typehash ROS message type hash
415 @return patched
class
417 typekey = (typename, typehash)
419 fields = api.parse_definition_fields(typename, self.
_typedefs[typekey])
422 for s
in [api.scalar(t)]
423 if s
not in api.ROS_BUILTIN_TYPES
428 cls.
__module__ = typename.split(
"/", maxsplit=1)[0]
438 def _patch_message(self, message, typename, typehash):
440 Patches MCAP ROS2 message with expected attributes
and methods, recursively.
442 @param message ROS message instance
as returned
from mcap_ros2.decoder
443 @param typename ROS message type name, like
"std_msgs/Bool"
444 @param typehash ROS message type hash
445 @return original message patched,
or new instance
if ROS2 temporal type
449 stack = [(message, (typename, typehash),
None, ())]
451 msg, typekey, parent, path = stack.pop(0)
452 mycls, typename = type(msg), typekey[0]
456 msg2 = self.
_temporal_ctors[typename](sec=msg.sec, nanosec=msg.nanosec)
457 if msg
is message: result = msg2
458 elif len(path) == 1: setattr(parent, path[0], msg2)
459 else: getattr(parent, path[0])[path[1]] = msg2
465 v = getattr(msg, name)
466 if isinstance(v, list):
467 stack.extend((x, subtypekey, msg, (name, i))
for i, x
in enumerate(v))
469 stack.append((v, subtypekey, msg, (name, )))
474 def _populate_meta(self):
475 """Populates bag metainfo."""
476 summary = self.
_reader.get_summary()
477 self.
_start_time = summary.statistics.message_start_time / 1E9
478 self.
_end_time = summary.statistics.message_end_time / 1E9
480 def make_hash(typename, msgdef, extradefs):
481 """Returns MD5 hash calculated for type definition, or None on error."""
482 try:
return api.calculate_definition_hash(typename, msgdef, extradefs)
483 except Exception
as e:
484 ConsolePrinter.warn(
"Error calculating message type hash for %r: %r", typename, e)
488 for cid, channel
in summary.channels.items():
489 schema = summary.schemas[channel.schema_id]
490 topic, typename = channel.topic, api.canonical(schema.name)
492 typedef = schema.data.decode(
"utf-8")
493 subtypedefs, nesting = api.parse_definition_subtypes(typedef, nesting=
True)
494 typehash = channel.metadata.get(
"md5sum")
or \
495 make_hash(typename, typedef, tuple(subtypedefs.items()))
496 topickey, typekey = (topic, typename, typehash), (typename, typehash)
499 if channel.metadata.get(
"offered_qos_profiles"):
500 try: qoses = yaml.safe_load(channel.metadata[
"offered_qos_profiles"])
501 except Exception
as e:
502 ConsolePrinter.warn(
"Error parsing topic QoS profiles from %r: %s.",
503 channel.metadata[
"offered_qos_profiles"], e)
505 self.
_topics.setdefault(topickey, 0)
506 self.
_topics[topickey] += summary.statistics.channel_message_counts[cid]
508 defhashes[typedef] = typehash
509 for t, d
in subtypedefs.items():
510 h = defhashes[d]
if d
in defhashes
else make_hash(t, d, tuple(subtypedefs.items()))
514 for t, subtypes
in nesting.items():
520 if qoses: self.
_qoses[topickey] = qoses
527 """Returns whether file is recognizable as MCAP format."""
528 if common.is_stream(f):
529 pos, _ = f.tell(), f.seek(0)
531 elif os.path.isfile(f)
and os.path.getsize(f):
532 with open(f,
"rb")
as f:
535 ext = os.path.splitext(f
or "")[-1].lower()
536 result = ext
in McapSink.FILE_EXTENSIONS
542 """Returns a shallow copy, with slots populated manually."""
543 result = self.__class__.__new__(self.__class__)
544 for n
in self.__slots__:
545 setattr(result, n, copy.copy(getattr(self, n)))
550 """Returns a deep copy, with slots populated manually."""
551 result = self.__class__.__new__(self.__class__)
552 for n
in self.__slots__:
553 setattr(result, n, copy.deepcopy(getattr(self, n), memo))
558 """Returns a map of message field names and types (patch method for MCAP message classes)."""
563 """Returns a string representation of ROS message."""
564 fields =
", ".join(
"%s=%r" % (n, getattr(self, n))
for n
in self.__slots__)
565 return "%s(%s)" % (self.__name__, fields)
570 """Writes messages to MCAP file."""
573 FILE_EXTENSIONS = (
".mcap", )
576 DEFAULT_ARGS = dict(META=
False, WRITE_OPTIONS={}, VERBOSE=
False)
579 def __init__(self, args=None, **kwargs):
581 @param args arguments
as namespace
or dictionary, case-insensitive;
582 or a single path
as the file to write
583 @param args.write base name of MCAP files to write
584 @param args.write_options {
"overwrite": whether to overwrite existing file
586 "rollover-size": bytes limit
for individual output files,
587 "rollover-count": message limit
for individual output files,
588 "rollover-duration": time span limit
for individual output files,
589 as ROS duration
or convertible seconds,
590 "rollover-template": output filename template, supporting
591 strftime format codes like
"%H-%M-%S"
592 and "%(index)s" as output file index}
593 @param args.meta whether to
print metainfo
594 @param args.verbose whether to
print debug information
595 @param kwargs any
and all arguments
as keyword overrides, case-insensitive
597 args = {"WRITE": str(args)}
if isinstance(args, common.PATH_TYPES)
else args
598 args = common.ensure_namespace(args, McapSink.DEFAULT_ARGS, **kwargs)
599 super(McapSink, self).
__init__(args)
600 RolloverSinkMixin.__init__(self, args)
613 Returns whether required libraries are available (mcap, mcap_ros1/mcap_ros2)
614 and overwrite
is valid
and file
is writable.
617 ok, mcap_ok, mcap_ros_ok = RolloverSinkMixin.validate(self), bool(mcap), bool(mcap_ros)
618 if self.
args.WRITE_OPTIONS.get(
"overwrite")
not in (
None,
True,
False,
"true",
"false"):
619 ConsolePrinter.error(
"Invalid overwrite option for MCAP: %r. "
620 "Choose one of {true, false}.",
621 self.
args.WRITE_OPTIONS[
"overwrite"])
624 ConsolePrinter.error(
"mcap not available: cannot work with MCAP files.")
626 ConsolePrinter.error(
"mcap_ros%s not available: cannot work with MCAP files.",
627 api.ROS_VERSION
or "")
628 if not common.verify_io(self.
args.WRITE,
"w"):
630 self.
validvalid = ok
and mcap_ok
and mcap_ros_ok
632 self.
_overwrite = (self.
args.WRITE_OPTIONS.get(
"overwrite")
in (
True,
"true"))
636 def emit(self, topic, msg, stamp=None, match=None, index=None):
637 """Writes out message to MCAP file."""
640 RolloverSinkMixin.ensure_rollover(self, topic, msg, stamp)
642 kwargs = dict(publish_time=api.to_nsec(stamp), sequence=index)
644 with api.TypeMeta.make(msg, topic)
as m:
647 fullname = api.make_full_typename(m.typename)
648 self.
_schemas[typekey] = self.
_writer.register_msgdef(fullname, m.definition)
649 schema, data = self.
_schemas[typekey], api.message_to_dict(msg)
650 self.
_writer.write_message(topic, schema, data, **kwargs)
652 self.
_writer.write_message(topic, msg, **kwargs)
653 super(McapSink, self).
emit(topic, msg, stamp, match, index)
657 """Closes output file if open, emits metainfo."""
663 super(McapSink, self).
close()
667 """Closes output file, if any."""
674 def _ensure_open(self):
675 """Opens output file if not already open."""
676 if self.
_file:
return
680 if self.
args.VERBOSE:
682 action =
"Overwriting" if sz
and self.
_overwrite else "Creating"
690 """Adds MCAP support to reading and writing. Raises ImportWarning if libraries not available."""
691 if not mcap
or not mcap_ros:
692 ConsolePrinter.error(
"mcap libraries not available: cannot work with MCAP files.")
693 raise ImportWarning()
694 from ..
import plugins
695 plugins.add_write_format(
"mcap", McapSink,
"MCAP", [
696 (
"overwrite=true|false",
"overwrite existing file in MCAP output\n"
697 "instead of appending unique counter (default false)"),
698 ] + RolloverSinkMixin.get_write_options(
"MCAP"))
699 api.BAG_EXTENSIONS += McapSink.FILE_EXTENSIONS
700 api.Bag.READER_CLASSES.add(McapBag)
701 api.Bag.WRITER_CLASSES.add(McapBag)
704__all__ = [
"McapBag",
"McapSink",
"init"]
tuple MODES
Supported opening modes, overridden in subclasses.
BagMessage
Returned from read_messages() as (topic name, ROS message, ROS timestamp object).
open(self)
Opens the bag file if not already open.
TypesAndTopicsTuple
Returned from get_type_and_topic_info() as ({typename: typehash}, {topic name: TopicTuple}).
read_messages(self, topics=None, start_time=None, end_time=None, raw=False, **__)
Yields messages from the bag, optionally filtered by topic and timestamp.
closed
Returns whether file is closed.
TopicTuple
Returned from get_type_and_topic_info() as (typename, message count, connection count,...
__deepcopy__(self, memo=None)
Provides output file rollover by size, duration, or message count.
filename
Current output file path.
close_output(self)
Closes output file, if any.
validate(self)
Returns whether write options are valid, emits error if not, else populates options.
format_output_meta(self)
Returns output file metainfo string, with names and sizes and message/topic counts.
valid
Result of validate()
validate(self)
Returns whether sink prerequisites are met (like ROS environment set if LiveSink).
close(self)
Shuts down output, closing any files or connections.
MCAP bag interface, providing most of rosbag.Bag interface.
tuple MODES
Supported opening modes.
get_type_and_topic_info(self, topic_filters=None)
Returns thorough metainfo on topic and message types.
mode
Returns file open mode.
open(self)
Opens the bag file if not already open.
filename
Returns bag file path.
__init__(self, f, mode="r", **__)
Opens file and populates metadata.
autodetect(cls, f)
Returns whether file is recognizable as MCAP format.
write(self, topic, msg, t=None, raw=False, **__)
Writes out message to MCAP file.
get_start_time(self)
Returns the start time of the bag, as UNIX timestamp.
get_end_time(self)
Returns the end time of the bag, as UNIX timestamp.
get_message_class(self, typename, typehash=None)
Returns ROS message class for typename, or None if unknown type.
get_qoses(self, topic, typename)
Returns topic Quality-of-Service profiles as a list of dicts, or None if not available.
get_message_type_hash(self, msg_or_type)
Returns ROS message type MD5 hash.
read_messages(self, topics=None, start_time=None, end_time=None, raw=False)
Yields messages from the bag, optionally filtered by topic and timestamp.
get_topic_info(self, *_, **__)
Returns topic and message type metainfo as {(topic, typename, typehash): count}.
close(self)
Closes the bag file.
__contains__(self, key)
Returns whether bag contains given topic.
closed
Returns whether file is closed.
size
Returns current file size.
topics
Returns the list of topics in bag, in alphabetic order.
get_fields_and_field_types
__next__(self)
Retrieves next message from bag as (topic, message, timestamp).
str MCAP_MAGIC
MCAP file header magic start bytes.
get_message_definition(self, msg_or_type)
Returns ROS message type definition full text from bag, including subtype definitions.
get_message_count(self, topic_filters=None)
Returns the number of messages in the bag.
Writes messages to MCAP file.
__init__(self, args=None, **kwargs)
close_output(self)
Closes output file, if any.
emit(self, topic, msg, stamp=None, match=None, index=None)
Writes out message to MCAP file.
validate(self)
Returns whether required libraries are available (mcap, mcap_ros1/mcap_ros2) and overwrite is valid a...
close(self)
Closes output file if open, emits metainfo.
copy_with_slots(self)
Returns a shallow copy, with slots populated manually.
deepcopy_with_slots(self, memo)
Returns a deep copy, with slots populated manually.
init(*_, **__)
Adds MCAP support to reading and writing.
message_repr(self)
Returns a string representation of ROS message.
message_get_fields_and_field_types(self)
Returns a map of message field names and types (patch method for MCAP message classes).