5------------------------------------------------------------------------------ 
    6This file is part of grepros - grep for ROS bag files and live topics. 
    7Released under the BSD License. 
   12------------------------------------------------------------------------------
 
   14## @namespace grepros.plugins.mcap 
   15from __future__ import absolute_import 
   25try: import mcap, mcap.reader 
   26except ImportError: mcap = None 
   27if "1" == os.getenv("ROS_VERSION"): 
   28    try: 
import mcap_ros1 
as mcap_ros, mcap_ros1.decoder, mcap_ros1.writer
 
   29    except ImportError: mcap_ros = 
None 
   30elif "2" == os.getenv(
"ROS_VERSION"):
 
   31    try: 
import mcap_ros2 
as mcap_ros, mcap_ros2.decoder, mcap_ros2.writer
 
   32    except ImportError: mcap_ros = 
None 
   37from .. common 
import ConsolePrinter
 
   38from .. outputs 
import RolloverSinkMixin, Sink
 
   43    MCAP bag interface, providing most of rosbag.Bag interface. 
   45    Bag cannot be appended to, and cannot be read 
and written at the same time
 
   46    (MCAP API limitation).
 
   53    MCAP_MAGIC = b
"\x89MCAP\x30\r\n" 
   55    def __init__(self, f, mode="r", **__):
 
   57        Opens file and populates metadata.
 
   59        @param   f         bag file path, 
or a stream object
 
   60        @param   mode      
return reader 
if "r" or writer 
if "w" 
   62        if mode 
not in self.
MODESMODES: 
raise ValueError(
"invalid mode %r" % mode)
 
   85        if common.is_stream(f):
 
   86            if not common.verify_io(f, mode):
 
   87                raise io.UnsupportedOperation(
"read" if "r" == mode 
else "write")
 
   91            if not isinstance(f, common.PATH_TYPES):
 
   92                raise ValueError(
"invalid filename %r" % type(f))
 
   93            if "w" == mode: common.makedirs(os.path.dirname(f))
 
   97            (t, c) 
for c, t 
in api.ROS_TIME_CLASSES.items() 
if api.get_message_type(c) == t
 
  105        Returns the number of messages in the bag.
 
  107        @param   topic_filters  list of topics 
or a single topic to filter by, 
if any
 
  110            topics = topic_filters
 
  111            topics = topics 
if isinstance(topics, (dict, list, set, tuple)) 
else [topics]
 
  112            return sum(c 
for (t, _, _), c 
in self.
_topics.items() 
if t 
in topics)
 
  113        return sum(self.
_topics.values())
 
  117        """Returns the start time of the bag, as UNIX timestamp.""" 
  122        """Returns the end time of the bag, as UNIX timestamp.""" 
  128        Returns ROS message class for typename, or None if unknown type.
 
  130        @param   typehash  message type definition hash, 
if any
 
  132        typehash = typehash or next((h 
for t, h 
in self.
_typedefs if t == typename), 
None)
 
  133        typekey = (typename, typehash)
 
  136                name = typename.split(
"/")[-1]
 
  137                fields = api.parse_definition_fields(typename, self.
_typedefs[typekey])
 
  138                cls = type(name, (types.SimpleNamespace, ), {
 
  139                    "__name__": name, 
"__slots__": list(fields),
 
  140                    "__repr__": message_repr, 
"__str__": message_repr
 
  144                typeclses = api.realapi.generate_message_classes(typename, self.
_typedefs[typekey])
 
  145                self.
_types[typekey] = typeclses[typename]
 
  147        return self.
_types.get(typekey)
 
  151        """Returns ROS message type definition full text from bag, including subtype definitions.""" 
  152        if api.is_ros_message(msg_or_type):
 
  153            return self.
_typedefs.get((api.get_message_type(msg_or_type),
 
  154                                       api.get_message_type_hash(msg_or_type)))
 
  155        typename = msg_or_type
 
  156        return next((d 
for (n, h), d 
in self.
_typedefs.items() 
if n == typename), 
None)
 
  160        """Returns ROS message type MD5 hash.""" 
  161        typename = msg_or_type
 
  162        if api.is_ros_message(msg_or_type): typename = api.get_message_type(msg_or_type)
 
  163        return next((h 
for n, h 
in self.
_typedefs if n == typename), 
None)
 
  167        """Returns topic Quality-of-Service profiles as a list of dicts, or None if not available.""" 
  168        return self.
_qoses.get((topic, typename))
 
  172        """Returns topic and message type metainfo as {(topic, typename, typehash): count}.""" 
  178        Returns thorough metainfo on topic and message types.
 
  180        @param   topic_filters  list of topics 
or a single topic to filter returned topics-dict by,
 
  183                                msg_types 
as dict of {typename: typehash},
 
  186        topics = topic_filters 
  187        topics = topics if isinstance(topics, (list, set, tuple)) 
else [topics] 
if topics 
else []
 
  188        if self.
_ttinfo and (
not topics 
or set(topics) == set(t 
for t, _, _ 
in self.
_topics)):
 
  192        topics = topic_filters
 
  193        topics = topics 
if isinstance(topics, (list, set, tuple)) 
else [topics] 
if topics 
else []
 
  194        msgtypes = {n: h 
for t, n, h 
in self.
_topics}
 
  198            """Returns median value from given sorted numbers.""" 
  200            return None if not vlen 
else vals[vlen // 2] 
if vlen % 2 
else \
 
  201                   float(vals[vlen // 2 - 1] + vals[vlen // 2]) / 2
 
  203        for (t, n, _), c 
in sorted(self.
_topics.items()):
 
  204            if topics 
and t 
not in topics: 
continue   
  207                stamps = sorted(m.publish_time / 1E9 
for _, _, m 
in self.
_reader.iter_messages([t]))
 
  208                mymedian = median(sorted(s1 - s0 
for s1, s0 
in zip(stamps[1:], stamps[:-1])))
 
  209            freq = 1.0 / mymedian 
if mymedian 
else None 
  210            topicdict[t] = self.
TopicTuple(n, c, len(self.
_qoses.get((t, n)) 
or []), freq)
 
  212        if not topics 
or set(topics) == set(t 
for t, _, _ 
in self.
_topics): self.
_ttinfo = result
 
  216    def read_messages(self, topics=None, start_time=None, end_time=None, raw=False):
 
  218        Yields messages from the bag, optionally filtered by topic 
and timestamp.
 
  220        @param   topics      list of topics 
or a single topic to filter by, 
if at all
 
  221        @param   start_time  earliest timestamp of message to 
return, 
as ROS time 
or convertible
 
  222                             (int/float/duration/datetime/decimal)
 
  223        @param   end_time    latest timestamp of message to 
return, 
as ROS time 
or convertible
 
  224                             (int/float/duration/datetime/decimal)
 
  225        @param   raw         
if true, then returned messages are tuples of
 
  226                             (typename, bytes, typehash, typeclass)
 
  227        @return              BagMessage namedtuples of 
 
  228                             (topic, message, timestamp 
as rospy/rclpy.Time)
 
  231        if "w" == self.
_mode: 
raise io.UnsupportedOperation(
"read")
 
  233        topics = topics 
if isinstance(topics, list) 
else [topics] 
if topics 
else None 
  234        start_ns, end_ns = (api.to_nsec(api.to_time(x)) 
for x 
in (start_time, end_time))
 
  235        for schema, channel, message 
in self.
_reader.iter_messages(topics, start_ns, end_ns):
 
  237                typekey = (typename, typehash) = self.
_schematypes[schema.id]
 
  238                if typekey 
not in self.
_types:
 
  240                msg = (typename, message.data, typehash, self.
_types[typekey])
 
  242            api.TypeMeta.make(msg, channel.topic, self)
 
  243            yield self.
BagMessage(channel.topic, msg, api.make_time(nsecs=message.publish_time))
 
  247    def write(self, topic, msg, t=None, raw=False, **__):
 
  249        Writes out message to MCAP file. 
  251        @param   topic   name of topic
 
  252        @param   msg     ROS1 message
 
  253        @param   t       message timestamp 
if not using current ROS time,
 
  254                         as ROS time type 
or convertible (int/float/duration/datetime/decimal)
 
  255        @param   raw     
if true, `msg` 
is in raw format, (typename, bytes, typehash, typeclass)
 
  258        if "r" == self.
_mode: 
raise io.UnsupportedOperation(
"write")
 
  261            typename, binary, typehash = msg[:3]
 
  263            typedef = self.
_typedefs.get((typename, typehash)) 
or api.get_message_definition(cls)
 
  264            msg = api.deserialize_message(binary, cls)
 
  266            with api.TypeMeta.make(msg, topic) 
as meta:
 
  267                typename, typehash, typedef = meta.typename, meta.typehash, meta.definition
 
  268        topickey, typekey = (topic, typename, typehash), (typename, typehash)
 
  270        nanosec = (time.time_ns() 
if hasattr(time, 
"time_ns") 
else int(time.time() * 10**9)) \
 
  271                  if t 
is None else api.to_nsec(api.to_time(t))
 
  274                fullname = api.make_full_typename(typename)
 
  275                schema = self.
_writer.register_msgdef(fullname, typedef)
 
  277            schema, data = self.
_schemas[typekey], api.message_to_dict(msg)
 
  278            self.
_writer.write_message(topic, schema, data, publish_time=nanosec)
 
  280            self.
_writer.write_message(topic, msg, publish_time=nanosec)
 
  286        self.
_types.setdefault(typekey, type(msg))
 
  287        self.
_typedefs.setdefault(typekey, typedef)
 
  292        """Opens the bag file if not already open.""" 
  295            raise io.UnsupportedOperation(
"Cannot reopen bag for writing.")
 
  298        self.
_reader  = mcap.reader.make_reader(self.
_file) 
if "r" == self.
_mode else None 
  299        self.
_decoder = mcap_ros.decoder.Decoder()          
if "r" == self.
_mode else None 
  302        except Exception 
as e:
 
  303            raise Exception(
"Error reading MCAP metadata: %r" % e)
 
  308        """Closes the bag file.""" 
  309        if self.
_file is not None:
 
  317        """Returns whether file is closed.""" 
  318        return self.
_file is None 
  323        """Returns the list of topics in bag, in alphabetic order.""" 
  324        return sorted((t 
for t, _, _ 
in self.
_topics), key=str.lower)
 
  329        """Returns bag file path.""" 
  335        """Returns current file size.""" 
  338            size, _ = self.
_file.tell(), self.
_file.seek(pos)
 
  345        """Returns file open mode.""" 
  350        """Returns whether bag contains given topic.""" 
  351        return any(key == t 
for t, _, _ 
in self.
_topics)
 
  355        """Retrieves next message from bag as (topic, message, timestamp).""" 
  361    def _decode_message(self, message, channel, schema):
 
  363        Returns ROS message deserialized from binary data.
 
  365        @param   message  mcap.records.Message instance
 
  366        @param   channel  mcap.records.Channel instance 
for message
 
  367        @param   schema   mcap.records.Schema instance 
for message type
 
  370        if api.ROS2 
and not issubclass(cls, types.SimpleNamespace):
 
  371            msg = api.deserialize_message(message.data, cls)
 
  373            msg = self.
_decoder.decode(schema=schema, message=message)
 
  377                api.TypeMeta.make(msg, channel.topic, self, data=message.data)
 
  379        if typekey 
not in self.
_types: self.
_types[typekey] = type(msg)
 
  383    def _make_message_class(self, schema, message, generate=True):
 
  385        Returns message type class, generating 
if not already available.
 
  387        @param   schema    mcap.records.Schema instance 
for message type
 
  388        @param   message   mcap.records.Message instance
 
  389        @param   generate  generate message 
class dynamically if not available
 
  391        typekey = (typename, typehash) = self._schematypes[schema.id] 
  392        if api.ROS2 
and typekey 
not in self.
_types:
 
  394                cls = api.get_message_class(typename)
 
  395                clshash = api.get_message_type_hash(cls)
 
  396                if typehash == clshash: self.
_types[typekey] = cls
 
  397            except Exception: 
pass   
  398        if typekey 
not in self.
_types and generate:
 
  400                msg = self.
_decoder.decode(schema=schema, message=message)
 
  403                typeclses = api.realapi.generate_message_classes(typename, schema.data.decode())
 
  404                self.
_types[typekey] = typeclses[typename]
 
  405        return self.
_types.get(typekey)
 
  408    def _patch_message_class(self, cls, typename, typehash):
 
  410        Patches MCAP ROS2 message class with expected attributes and methods, recursively.
 
  412        @param   cls       ROS message 
class as returned from mcap_ros2.decoder
 
  413        @param   typename  ROS message type name, like 
"std_msgs/Bool" 
  414        @param   typehash  ROS message type hash
 
  415        @return            patched 
class 
  417        typekey = (typename, typehash) 
  419            fields = api.parse_definition_fields(typename, self.
_typedefs[typekey])
 
  422                                             for s 
in [api.scalar(t)]
 
  423                                             if s 
not in api.ROS_BUILTIN_TYPES
 
  428        cls.
__module__ = typename.split(
"/", maxsplit=1)[0]
 
  438    def _patch_message(self, message, typename, typehash):
 
  440        Patches MCAP ROS2 message with expected attributes 
and methods, recursively.
 
  442        @param   message   ROS message instance 
as returned 
from mcap_ros2.decoder
 
  443        @param   typename  ROS message type name, like 
"std_msgs/Bool" 
  444        @param   typehash  ROS message type hash
 
  445        @return            original message patched, 
or new instance 
if ROS2 temporal type
 
  449        stack = [(message, (typename, typehash), 
None, ())]
 
  451            msg, typekey, parent, path = stack.pop(0)
 
  452            mycls, typename = type(msg), typekey[0]
 
  456                msg2 = self.
_temporal_ctors[typename](sec=msg.sec, nanosec=msg.nanosec)
 
  457                if msg 
is message: result = msg2                     
 
  458                elif len(path) == 1: setattr(parent, path[0], msg2)  
 
  459                else: getattr(parent, path[0])[path[1]] = msg2       
 
  465                v = getattr(msg, name)
 
  466                if isinstance(v, list):  
 
  467                    stack.extend((x, subtypekey, msg, (name, i)) 
for i, x 
in enumerate(v))
 
  469                    stack.append((v, subtypekey, msg, (name, )))
 
  474    def _populate_meta(self):
 
  475        """Populates bag metainfo.""" 
  476        summary = self.
_reader.get_summary()
 
  477        self.
_start_time = summary.statistics.message_start_time / 1E9
 
  478        self.
_end_time   = summary.statistics.message_end_time   / 1E9
 
  480        def make_hash(typename, msgdef, extradefs):
 
  481            """Returns MD5 hash calculated for type definition, or None on error.""" 
  482            try: 
return api.calculate_definition_hash(typename, msgdef, extradefs)
 
  483            except Exception 
as e:
 
  484                ConsolePrinter.warn(
"Error calculating message type hash for %r: %r", typename, e)
 
  488        for cid, channel 
in summary.channels.items():
 
  489            schema = summary.schemas[channel.schema_id]
 
  490            topic, typename = channel.topic, api.canonical(schema.name)
 
  492            typedef = schema.data.decode(
"utf-8")  
 
  493            subtypedefs, nesting = api.parse_definition_subtypes(typedef, nesting=
True)
 
  494            typehash = channel.metadata.get(
"md5sum") 
or \
 
  495                       make_hash(typename, typedef, tuple(subtypedefs.items()))
 
  496            topickey, typekey = (topic, typename, typehash), (typename, typehash)
 
  499            if channel.metadata.get(
"offered_qos_profiles"):
 
  500                try: qoses = yaml.safe_load(channel.metadata[
"offered_qos_profiles"])
 
  501                except Exception 
as e:
 
  502                    ConsolePrinter.warn(
"Error parsing topic QoS profiles from %r: %s.",
 
  503                                        channel.metadata[
"offered_qos_profiles"], e)
 
  505            self.
_topics.setdefault(topickey, 0)
 
  506            self.
_topics[topickey] += summary.statistics.channel_message_counts[cid]
 
  508            defhashes[typedef] = typehash
 
  509            for t, d 
in subtypedefs.items():  
 
  510                h = defhashes[d] 
if d 
in defhashes 
else make_hash(t, d, tuple(subtypedefs.items()))
 
  514            for t, subtypes 
in nesting.items():  
 
  520            if qoses: self.
_qoses[topickey] = qoses
 
  527        """Returns whether file is recognizable as MCAP format.""" 
  528        if common.is_stream(f):
 
  529            pos, _ = f.tell(), f.seek(0)
 
  531        elif os.path.isfile(f) 
and os.path.getsize(f):
 
  532            with open(f, 
"rb") 
as f:
 
  535            ext = os.path.splitext(f 
or "")[-1].lower()
 
  536            result = ext 
in McapSink.FILE_EXTENSIONS
 
  542    """Returns a shallow copy, with slots populated manually.""" 
  543    result = self.__class__.__new__(self.__class__)
 
  544    for n 
in self.__slots__:
 
  545        setattr(result, n, copy.copy(getattr(self, n)))
 
  550    """Returns a deep copy, with slots populated manually.""" 
  551    result = self.__class__.__new__(self.__class__)
 
  552    for n 
in self.__slots__:
 
  553        setattr(result, n, copy.deepcopy(getattr(self, n), memo))
 
  558    """Returns a map of message field names and types (patch method for MCAP message classes).""" 
  563    """Returns a string representation of ROS message.""" 
  564    fields = 
", ".join(
"%s=%r" % (n, getattr(self, n)) 
for n 
in self.__slots__)
 
  565    return "%s(%s)" % (self.__name__, fields)
 
  570    """Writes messages to MCAP file.""" 
  573    FILE_EXTENSIONS = (
".mcap", )
 
  576    DEFAULT_ARGS = dict(META=
False, WRITE_OPTIONS={}, VERBOSE=
False)
 
  579    def __init__(self, args=None, **kwargs):
 
  581        @param   args                 arguments 
as namespace 
or dictionary, case-insensitive;
 
  582                                      or a single path 
as the file to write
 
  583        @param   args.write           base name of MCAP files to write
 
  584        @param   args.write_options   {
"overwrite": whether to overwrite existing file
 
  586                                       "rollover-size": bytes limit 
for individual output files,
 
  587                                       "rollover-count": message limit 
for individual output files,
 
  588                                       "rollover-duration": time span limit 
for individual output files,
 
  589                                                            as ROS duration 
or convertible seconds,
 
  590                                       "rollover-template": output filename template, supporting
 
  591                                                            strftime format codes like 
"%H-%M-%S" 
  592                                                            and "%(index)s" as output file index}
 
  593        @param   args.meta            whether to 
print metainfo
 
  594        @param   args.verbose         whether to 
print debug information
 
  595        @param   kwargs               any 
and all arguments 
as keyword overrides, case-insensitive
 
  597        args = {"WRITE": str(args)} 
if isinstance(args, common.PATH_TYPES) 
else args
 
  598        args = common.ensure_namespace(args, McapSink.DEFAULT_ARGS, **kwargs)
 
  599        super(McapSink, self).
__init__(args)
 
  600        RolloverSinkMixin.__init__(self, args)
 
  613        Returns whether required libraries are available (mcap, mcap_ros1/mcap_ros2) 
  614        and overwrite 
is valid 
and file 
is writable.
 
  617        ok, mcap_ok, mcap_ros_ok = RolloverSinkMixin.validate(self), bool(mcap), bool(mcap_ros)
 
  618        if self.
args.WRITE_OPTIONS.get(
"overwrite") 
not in (
None, 
True, 
False, 
"true", 
"false"):
 
  619            ConsolePrinter.error(
"Invalid overwrite option for MCAP: %r. " 
  620                                 "Choose one of {true, false}.",
 
  621                                 self.
args.WRITE_OPTIONS[
"overwrite"])
 
  624            ConsolePrinter.error(
"mcap not available: cannot work with MCAP files.")
 
  626            ConsolePrinter.error(
"mcap_ros%s not available: cannot work with MCAP files.",
 
  627                                 api.ROS_VERSION 
or "")
 
  628        if not common.verify_io(self.
args.WRITE, 
"w"):
 
  630        self.
validvalid = ok 
and mcap_ok 
and mcap_ros_ok
 
  632            self.
_overwrite = (self.
args.WRITE_OPTIONS.get(
"overwrite") 
in (
True, 
"true"))
 
  636    def emit(self, topic, msg, stamp=None, match=None, index=None):
 
  637        """Writes out message to MCAP file.""" 
  640        RolloverSinkMixin.ensure_rollover(self, topic, msg, stamp)
 
  642        kwargs = dict(publish_time=api.to_nsec(stamp), sequence=index)
 
  644            with api.TypeMeta.make(msg, topic) 
as m:
 
  647                    fullname = api.make_full_typename(m.typename)
 
  648                    self.
_schemas[typekey] = self.
_writer.register_msgdef(fullname, m.definition)
 
  649            schema, data = self.
_schemas[typekey], api.message_to_dict(msg)
 
  650            self.
_writer.write_message(topic, schema, data, **kwargs)
 
  652            self.
_writer.write_message(topic, msg, **kwargs)
 
  653        super(McapSink, self).
emit(topic, msg, stamp, match, index)
 
  657        """Closes output file if open, emits metainfo.""" 
  663            super(McapSink, self).
close()
 
  667        """Closes output file, if any.""" 
  674    def _ensure_open(self):
 
  675        """Opens output file if not already open.""" 
  676        if self.
_file: 
return 
  680        if self.
args.VERBOSE:
 
  682            action = 
"Overwriting" if sz 
and self.
_overwrite else "Creating" 
  690    """Adds MCAP support to reading and writing. Raises ImportWarning if libraries not available.""" 
  691    if not mcap 
or not mcap_ros:
 
  692        ConsolePrinter.error(
"mcap libraries not available: cannot work with MCAP files.")
 
  693        raise ImportWarning()
 
  694    from .. 
import plugins  
 
  695    plugins.add_write_format(
"mcap", McapSink, 
"MCAP", [
 
  696        (
"overwrite=true|false",  
"overwrite existing file in MCAP output\n" 
  697                                  "instead of appending unique counter (default false)"),
 
  698    ] + RolloverSinkMixin.get_write_options(
"MCAP"))
 
  699    api.BAG_EXTENSIONS += McapSink.FILE_EXTENSIONS
 
  700    api.Bag.READER_CLASSES.add(McapBag)
 
  701    api.Bag.WRITER_CLASSES.add(McapBag)
 
  704__all__ = [
"McapBag", 
"McapSink", 
"init"]
 
tuple MODES
Supported opening modes, overridden in subclasses.
 
BagMessage
Returned from read_messages() as (topic name, ROS message, ROS timestamp object).
 
open(self)
Opens the bag file if not already open.
 
TypesAndTopicsTuple
Returned from get_type_and_topic_info() as ({typename: typehash}, {topic name: TopicTuple}).
 
read_messages(self, topics=None, start_time=None, end_time=None, raw=False, **__)
Yields messages from the bag, optionally filtered by topic and timestamp.
 
closed
Returns whether file is closed.
 
TopicTuple
Returned from get_type_and_topic_info() as (typename, message count, connection count,...
 
__deepcopy__(self, memo=None)
 
Provides output file rollover by size, duration, or message count.
 
filename
Current output file path.
 
close_output(self)
Closes output file, if any.
 
validate(self)
Returns whether write options are valid, emits error if not, else populates options.
 
format_output_meta(self)
Returns output file metainfo string, with names and sizes and message/topic counts.
 
valid
Result of validate()
 
validate(self)
Returns whether sink prerequisites are met (like ROS environment set if LiveSink).
 
close(self)
Shuts down output, closing any files or connections.
 
MCAP bag interface, providing most of rosbag.Bag interface.
 
tuple MODES
Supported opening modes.
 
get_type_and_topic_info(self, topic_filters=None)
Returns thorough metainfo on topic and message types.
 
mode
Returns file open mode.
 
open(self)
Opens the bag file if not already open.
 
filename
Returns bag file path.
 
__init__(self, f, mode="r", **__)
Opens file and populates metadata.
 
autodetect(cls, f)
Returns whether file is recognizable as MCAP format.
 
write(self, topic, msg, t=None, raw=False, **__)
Writes out message to MCAP file.
 
get_start_time(self)
Returns the start time of the bag, as UNIX timestamp.
 
get_end_time(self)
Returns the end time of the bag, as UNIX timestamp.
 
get_message_class(self, typename, typehash=None)
Returns ROS message class for typename, or None if unknown type.
 
get_qoses(self, topic, typename)
Returns topic Quality-of-Service profiles as a list of dicts, or None if not available.
 
get_message_type_hash(self, msg_or_type)
Returns ROS message type MD5 hash.
 
read_messages(self, topics=None, start_time=None, end_time=None, raw=False)
Yields messages from the bag, optionally filtered by topic and timestamp.
 
get_topic_info(self, *_, **__)
Returns topic and message type metainfo as {(topic, typename, typehash): count}.
 
close(self)
Closes the bag file.
 
__contains__(self, key)
Returns whether bag contains given topic.
 
closed
Returns whether file is closed.
 
size
Returns current file size.
 
topics
Returns the list of topics in bag, in alphabetic order.
 
get_fields_and_field_types
 
__next__(self)
Retrieves next message from bag as (topic, message, timestamp).
 
str MCAP_MAGIC
MCAP file header magic start bytes.
 
get_message_definition(self, msg_or_type)
Returns ROS message type definition full text from bag, including subtype definitions.
 
get_message_count(self, topic_filters=None)
Returns the number of messages in the bag.
 
Writes messages to MCAP file.
 
__init__(self, args=None, **kwargs)
 
close_output(self)
Closes output file, if any.
 
emit(self, topic, msg, stamp=None, match=None, index=None)
Writes out message to MCAP file.
 
validate(self)
Returns whether required libraries are available (mcap, mcap_ros1/mcap_ros2) and overwrite is valid a...
 
close(self)
Closes output file if open, emits metainfo.
 
copy_with_slots(self)
Returns a shallow copy, with slots populated manually.
 
deepcopy_with_slots(self, memo)
Returns a deep copy, with slots populated manually.
 
init(*_, **__)
Adds MCAP support to reading and writing.
 
message_repr(self)
Returns a string representation of ROS message.
 
message_get_fields_and_field_types(self)
Returns a map of message field names and types (patch method for MCAP message classes).