grepros 1.2.2
grep for ROS bag files and live topics
Loading...
Searching...
No Matches
mcap.py
Go to the documentation of this file.
1# -*- coding: utf-8 -*-
2"""
3MCAP input and output.
4
5------------------------------------------------------------------------------
6This file is part of grepros - grep for ROS bag files and live topics.
7Released under the BSD License.
8
9@author Erki Suurjaak
10@created 14.10.2022
11@modified 21.04.2024
12------------------------------------------------------------------------------
13"""
14## @namespace grepros.plugins.mcap
15from __future__ import absolute_import
16import atexit
17import copy
18import io
19import os
20import time
21import types
22
23from .. import api
24
25try: import mcap, mcap.reader
26except ImportError: mcap = None
27if "1" == os.getenv("ROS_VERSION"):
28 try: import mcap_ros1 as mcap_ros, mcap_ros1.decoder, mcap_ros1.writer
29 except ImportError: mcap_ros = None
30elif "2" == os.getenv("ROS_VERSION"):
31 try: import mcap_ros2 as mcap_ros, mcap_ros2.decoder, mcap_ros2.writer
32 except ImportError: mcap_ros = None
33else: mcap_ros = None
34import yaml
35
36from .. import common
37from .. common import ConsolePrinter
38from .. outputs import RolloverSinkMixin, Sink
39
40
41class McapBag(api.BaseBag):
42 """
43 MCAP bag interface, providing most of rosbag.Bag interface.
44
45 Bag cannot be appended to, and cannot be read and written at the same time
46 (MCAP API limitation).
47 """
48
49
50 MODES = ("r", "w")
51
52
53 MCAP_MAGIC = b"\x89MCAP\x30\r\n"
54
55 def __init__(self, f, mode="r", **__):
56 """
57 Opens file and populates metadata.
58
59 @param f bag file path, or a stream object
60 @param mode return reader if "r" or writer if "w"
61 """
62 if mode not in self.MODESMODES: raise ValueError("invalid mode %r" % mode)
63 self._mode = mode
64 self._topics = {} # {(topic, typename, typehash): message count}
65 self._types = {} # {(typename, typehash): message type class}
66 self._typedefs = {} # {(typename, typehash): type definition text}
67 self._schemas = {} # {(typename, typehash): mcap.records.Schema}
68 self._schematypes = {} # {mcap.records.Schema.id: (typename, typehash)}
69 self._qoses = {} # {(topic, typename): [{qos profile dict}]}
70 self._typefields = {} # {(typename, typehash): {field name: type name}}
71 self._type_subtypes = {} # {(typename, typehash): {typename: typehash}}
72 self._field_subtypes = {} # {(typename, typehash): {field name: (typename, typehash)}}
73 self._temporal_ctors = {} # {typename: time/duration constructor}
74 self._start_time = None # Bag start time, as UNIX timestamp
75 self._end_time = None # Bag end time, as UNIX timestamp
76 self._file = None # File handle
77 self._reader = None # mcap.McapReader
78 self._decoder = None # mcap_ros.Decoder
79 self._writer = None # mcap_ros.Writer
80 self._iterer = None # Generator from read_messages() for next()
81 self._ttinfo = None # Cached result for get_type_and_topic_info()
82 self._opened = False # Whether file has been opened at least once
83 self._filename = None # File path, or None if stream
84
85 if common.is_stream(f):
86 if not common.verify_io(f, mode):
87 raise io.UnsupportedOperation("read" if "r" == mode else "write")
88 self._file, self._filename = f, None
89 f.seek(0)
90 else:
91 if not isinstance(f, common.PATH_TYPES):
92 raise ValueError("invalid filename %r" % type(f))
93 if "w" == mode: common.makedirs(os.path.dirname(f))
94 self._filename = str(f)
95
96 if api.ROS2 and "r" == mode: self._temporal_ctors.update(
97 (t, c) for c, t in api.ROS_TIME_CLASSES.items() if api.get_message_type(c) == t
98 )
99
100 self.openopen()
101
102
103 def get_message_count(self, topic_filters=None):
104 """
105 Returns the number of messages in the bag.
106
107 @param topic_filters list of topics or a single topic to filter by, if any
108 """
109 if topic_filters:
110 topics = topic_filters
111 topics = topics if isinstance(topics, (dict, list, set, tuple)) else [topics]
112 return sum(c for (t, _, _), c in self._topics.items() if t in topics)
113 return sum(self._topics.values())
114
115
116 def get_start_time(self):
117 """Returns the start time of the bag, as UNIX timestamp."""
118 return self._start_time
119
120
121 def get_end_time(self):
122 """Returns the end time of the bag, as UNIX timestamp."""
123 return self._end_time
124
125
126 def get_message_class(self, typename, typehash=None):
127 """
128 Returns ROS message class for typename, or None if unknown type.
129
130 @param typehash message type definition hash, if any
131 """
132 typehash = typehash or next((h for t, h in self._typedefs if t == typename), None)
133 typekey = (typename, typehash)
134 if typekey not in self._types and typekey in self._typedefs:
135 if api.ROS2:
136 name = typename.split("/")[-1]
137 fields = api.parse_definition_fields(typename, self._typedefs[typekey])
138 cls = type(name, (types.SimpleNamespace, ), {
139 "__name__": name, "__slots__": list(fields),
140 "__repr__": message_repr, "__str__": message_repr
141 })
142 self._types[typekey] = self._patch_message_class(cls, typename, typehash)
143 else:
144 typeclses = api.realapi.generate_message_classes(typename, self._typedefs[typekey])
145 self._types[typekey] = typeclses[typename]
146
147 return self._types.get(typekey)
148
149
150 def get_message_definition(self, msg_or_type):
151 """Returns ROS message type definition full text from bag, including subtype definitions."""
152 if api.is_ros_message(msg_or_type):
153 return self._typedefs.get((api.get_message_type(msg_or_type),
154 api.get_message_type_hash(msg_or_type)))
155 typename = msg_or_type
156 return next((d for (n, h), d in self._typedefs.items() if n == typename), None)
157
158
159 def get_message_type_hash(self, msg_or_type):
160 """Returns ROS message type MD5 hash."""
161 typename = msg_or_type
162 if api.is_ros_message(msg_or_type): typename = api.get_message_type(msg_or_type)
163 return next((h for n, h in self._typedefs if n == typename), None)
164
165
166 def get_qoses(self, topic, typename):
167 """Returns topic Quality-of-Service profiles as a list of dicts, or None if not available."""
168 return self._qoses.get((topic, typename))
169
170
171 def get_topic_info(self, *_, **__):
172 """Returns topic and message type metainfo as {(topic, typename, typehash): count}."""
173 return dict(self._topics)
174
175
176 def get_type_and_topic_info(self, topic_filters=None):
177 """
178 Returns thorough metainfo on topic and message types.
179
180 @param topic_filters list of topics or a single topic to filter returned topics-dict by,
181 if any
182 @return TypesAndTopicsTuple(msg_types, topics) namedtuple,
183 msg_types as dict of {typename: typehash},
184 topics as a dict of {topic: TopicTuple() namedtuple}.
185 """
186 topics = topic_filters
187 topics = topics if isinstance(topics, (list, set, tuple)) else [topics] if topics else []
188 if self._ttinfo and (not topics or set(topics) == set(t for t, _, _ in self._topics)):
189 return self._ttinfo
190 if self.closedclosedclosedclosed: raise ValueError("I/O operation on closed file.")
191
192 topics = topic_filters
193 topics = topics if isinstance(topics, (list, set, tuple)) else [topics] if topics else []
194 msgtypes = {n: h for t, n, h in self._topics}
195 topicdict = {}
196
197 def median(vals):
198 """Returns median value from given sorted numbers."""
199 vlen = len(vals)
200 return None if not vlen else vals[vlen // 2] if vlen % 2 else \
201 float(vals[vlen // 2 - 1] + vals[vlen // 2]) / 2
202
203 for (t, n, _), c in sorted(self._topics.items()):
204 if topics and t not in topics: continue # for
205 mymedian = None
206 if c > 1 and self._reader:
207 stamps = sorted(m.publish_time / 1E9 for _, _, m in self._reader.iter_messages([t]))
208 mymedian = median(sorted(s1 - s0 for s1, s0 in zip(stamps[1:], stamps[:-1])))
209 freq = 1.0 / mymedian if mymedian else None
210 topicdict[t] = self.TopicTuple(n, c, len(self._qoses.get((t, n)) or []), freq)
211 result = self.TypesAndTopicsTuple(msgtypes, topicdict)
212 if not topics or set(topics) == set(t for t, _, _ in self._topics): self._ttinfo = result
213 return result
214
215
216 def read_messages(self, topics=None, start_time=None, end_time=None, raw=False):
217 """
218 Yields messages from the bag, optionally filtered by topic and timestamp.
219
220 @param topics list of topics or a single topic to filter by, if at all
221 @param start_time earliest timestamp of message to return, as ROS time or convertible
222 (int/float/duration/datetime/decimal)
223 @param end_time latest timestamp of message to return, as ROS time or convertible
224 (int/float/duration/datetime/decimal)
225 @param raw if true, then returned messages are tuples of
226 (typename, bytes, typehash, typeclass)
227 @return BagMessage namedtuples of
228 (topic, message, timestamp as rospy/rclpy.Time)
229 """
230 if self.closedclosedclosedclosed: raise ValueError("I/O operation on closed file.")
231 if "w" == self._mode: raise io.UnsupportedOperation("read")
232
233 topics = topics if isinstance(topics, list) else [topics] if topics else None
234 start_ns, end_ns = (api.to_nsec(api.to_time(x)) for x in (start_time, end_time))
235 for schema, channel, message in self._reader.iter_messages(topics, start_ns, end_ns):
236 if raw:
237 typekey = (typename, typehash) = self._schematypes[schema.id]
238 if typekey not in self._types:
239 self._types[typekey] = self._make_message_class(schema, message)
240 msg = (typename, message.data, typehash, self._types[typekey])
241 else: msg = self._decode_message(message, channel, schema)
242 api.TypeMeta.make(msg, channel.topic, self)
243 yield self.BagMessage(channel.topic, msg, api.make_time(nsecs=message.publish_time))
244 if self.closedclosedclosedclosed: break # for schema
245
246
247 def write(self, topic, msg, t=None, raw=False, **__):
248 """
249 Writes out message to MCAP file.
250
251 @param topic name of topic
252 @param msg ROS1 message
253 @param t message timestamp if not using current ROS time,
254 as ROS time type or convertible (int/float/duration/datetime/decimal)
255 @param raw if true, `msg` is in raw format, (typename, bytes, typehash, typeclass)
256 """
257 if self.closedclosedclosedclosed: raise ValueError("I/O operation on closed file.")
258 if "r" == self._mode: raise io.UnsupportedOperation("write")
259
260 if raw:
261 typename, binary, typehash = msg[:3]
262 cls = msg[-1]
263 typedef = self._typedefs.get((typename, typehash)) or api.get_message_definition(cls)
264 msg = api.deserialize_message(binary, cls)
265 else:
266 with api.TypeMeta.make(msg, topic) as meta:
267 typename, typehash, typedef = meta.typename, meta.typehash, meta.definition
268 topickey, typekey = (topic, typename, typehash), (typename, typehash)
269
270 nanosec = (time.time_ns() if hasattr(time, "time_ns") else int(time.time() * 10**9)) \
271 if t is None else api.to_nsec(api.to_time(t))
272 if api.ROS2:
273 if typekey not in self._schemas:
274 fullname = api.make_full_typename(typename)
275 schema = self._writer.register_msgdef(fullname, typedef)
276 self._schemas[typekey] = schema
277 schema, data = self._schemas[typekey], api.message_to_dict(msg)
278 self._writer.write_message(topic, schema, data, publish_time=nanosec)
279 else:
280 self._writer.write_message(topic, msg, publish_time=nanosec)
281
282 sec = nanosec / 1E9
283 self._start_time = sec if self._start_time is None else min(sec, self._start_time)
284 self._end_time = sec if self._end_time is None else min(sec, self._end_time)
285 self._topics[topickey] = self._topics.get(topickey, 0) + 1
286 self._types.setdefault(typekey, type(msg))
287 self._typedefs.setdefault(typekey, typedef)
288 self._ttinfo = None
289
290
291 def open(self):
292 """Opens the bag file if not already open."""
293 if self._reader is not None or self._writer is not None: return
294 if self._opened and "w" == self._mode:
295 raise io.UnsupportedOperation("Cannot reopen bag for writing.")
296
297 if self._file is None: self._file = open(self._filename, "%sb" % self._mode)
298 self._reader = mcap.reader.make_reader(self._file) if "r" == self._mode else None
299 self._decoder = mcap_ros.decoder.Decoder() if "r" == self._mode else None
300 self._writer = mcap_ros.writer.Writer(self._file) if "w" == self._mode else None
301 try: "r" == self._mode and self._populate_meta()
302 except Exception as e:
303 raise Exception("Error reading MCAP metadata: %r" % e)
304 self._opened = True
305
306
307 def close(self):
308 """Closes the bag file."""
309 if self._file is not None:
310 if self._writer: self._writer.finish()
312 self._file, self._reader, self._writer, self._iterer = None, None, None, None
313
314
315 @property
316 def closed(self):
317 """Returns whether file is closed."""
318 return self._file is None
319
320
321 @property
322 def topics(self):
323 """Returns the list of topics in bag, in alphabetic order."""
324 return sorted((t for t, _, _ in self._topics), key=str.lower)
325
326
327 @property
328 def filename(self):
329 """Returns bag file path."""
330 return self._filename
331
332
333 @property
334 def size(self):
335 """Returns current file size."""
336 if self._filename is None and self._file is not None:
337 pos, _ = self._file.tell(), self._file.seek(0, os.SEEK_END)
338 size, _ = self._file.tell(), self._file.seek(pos)
339 return size
340 return os.path.getsize(self._filename) if os.path.isfile(self._filename) else None
341
342
343 @property
344 def mode(self):
345 """Returns file open mode."""
346 return self._mode
347
348
349 def __contains__(self, key):
350 """Returns whether bag contains given topic."""
351 return any(key == t for t, _, _ in self._topics)
352
353
354 def __next__(self):
355 """Retrieves next message from bag as (topic, message, timestamp)."""
356 if self.closedclosedclosedclosed: raise ValueError("I/O operation on closed file.")
357 if self._iterer is None: self._iterer = self.read_messagesread_messages()
358 return next(self._iterer)
359
360
361 def _decode_message(self, message, channel, schema):
362 """
363 Returns ROS message deserialized from binary data.
364
365 @param message mcap.records.Message instance
366 @param channel mcap.records.Channel instance for message
367 @param schema mcap.records.Schema instance for message type
368 """
369 cls = self._make_message_class(schema, message, generate=False)
370 if api.ROS2 and not issubclass(cls, types.SimpleNamespace):
371 msg = api.deserialize_message(message.data, cls)
372 else:
373 msg = self._decoder.decode(schema=schema, message=message)
374 if api.ROS2: # MCAP ROS2 message classes need monkey-patching with expected API
375 msg = self._patch_message(msg, *self._schematypes[schema.id])
376 # Register serialized binary, as MCAP does not support serializing its own creations
377 api.TypeMeta.make(msg, channel.topic, self, data=message.data)
378 typekey = self._schematypes[schema.id]
379 if typekey not in self._types: self._types[typekey] = type(msg)
380 return msg
381
382
383 def _make_message_class(self, schema, message, generate=True):
384 """
385 Returns message type class, generating if not already available.
386
387 @param schema mcap.records.Schema instance for message type
388 @param message mcap.records.Message instance
389 @param generate generate message class dynamically if not available
390 """
391 typekey = (typename, typehash) = self._schematypes[schema.id]
392 if api.ROS2 and typekey not in self._types:
393 try: # Try loading class from disk for full compatibility
394 cls = api.get_message_class(typename)
395 clshash = api.get_message_type_hash(cls)
396 if typehash == clshash: self._types[typekey] = cls
397 except Exception: pass # ModuleNotFoundError, AttributeError etc
398 if typekey not in self._types and generate:
399 if api.ROS2: # MCAP ROS2 message classes need monkey-patching with expected API
400 msg = self._decoder.decode(schema=schema, message=message)
401 self._types[typekey] = self._patch_message_class(type(msg), typename, typehash)
402 else:
403 typeclses = api.realapi.generate_message_classes(typename, schema.data.decode())
404 self._types[typekey] = typeclses[typename]
405 return self._types.get(typekey)
406
407
408 def _patch_message_class(self, cls, typename, typehash):
409 """
410 Patches MCAP ROS2 message class with expected attributes and methods, recursively.
411
412 @param cls ROS message class as returned from mcap_ros2.decoder
413 @param typename ROS message type name, like "std_msgs/Bool"
414 @param typehash ROS message type hash
415 @return patched class
416 """
417 typekey = (typename, typehash)
418 if typekey not in self._typefields:
419 fields = api.parse_definition_fields(typename, self._typedefs[typekey])
420 self._typefields[typekey] = fields
421 self._field_subtypes[typekey] = {n: (s, h) for n, t in fields.items()
422 for s in [api.scalar(t)]
423 if s not in api.ROS_BUILTIN_TYPES
424 for h in [self._type_subtypes[typekey][s]]}
425
426 # mcap_ros2 creates a dynamic class for each message, having __slots__
427 # but no other ROS2 API hooks; even the class module is "mcap_ros2.dynamic".
428 cls.__module__ = typename.split("/", maxsplit=1)[0]
429 cls.SLOT_TYPES = () # rosidl_runtime_py.utilities.is_message() checks for presence
430 cls._fields_and_field_types = dict(self._typefields[typekey])
431 cls.get_fields_and_field_types = message_get_fields_and_field_types
432 cls.__copy____copy__ = copy_with_slots # MCAP message classes lack support for copy
433 cls.__deepcopy____deepcopy__ = deepcopy_with_slots
434
435 return cls
436
437
438 def _patch_message(self, message, typename, typehash):
439 """
440 Patches MCAP ROS2 message with expected attributes and methods, recursively.
441
442 @param message ROS message instance as returned from mcap_ros2.decoder
443 @param typename ROS message type name, like "std_msgs/Bool"
444 @param typehash ROS message type hash
445 @return original message patched, or new instance if ROS2 temporal type
446 """
447 result = message
448 # [(field value, (field type name, field type hash), parent, (field name, ?array index))]
449 stack = [(message, (typename, typehash), None, ())]
450 while stack:
451 msg, typekey, parent, path = stack.pop(0)
452 mycls, typename = type(msg), typekey[0]
453
454 if typename in self._temporal_ctors:
455 # Convert temporal types to ROS2 temporal types for grepros to recognize
456 msg2 = self._temporal_ctors[typename](sec=msg.sec, nanosec=msg.nanosec)
457 if msg is message: result = msg2 # Replace input message
458 elif len(path) == 1: setattr(parent, path[0], msg2) # Set scalar field
459 else: getattr(parent, path[0])[path[1]] = msg2 # Set array field element
460 continue # while stack
461
462 self._patch_message_class(mycls, *typekey)
463
464 for name, subtypekey in self._field_subtypes[typekey].items():
465 v = getattr(msg, name)
466 if isinstance(v, list): # Queue each array element for patching
467 stack.extend((x, subtypekey, msg, (name, i)) for i, x in enumerate(v))
468 else: # Queue scalar field for patching
469 stack.append((v, subtypekey, msg, (name, )))
470
471 return result
472
473
474 def _populate_meta(self):
475 """Populates bag metainfo."""
476 summary = self._reader.get_summary()
477 self._start_time = summary.statistics.message_start_time / 1E9
478 self._end_time = summary.statistics.message_end_time / 1E9
479
480 def make_hash(typename, msgdef, extradefs):
481 """Returns MD5 hash calculated for type definition, or None on error."""
482 try: return api.calculate_definition_hash(typename, msgdef, extradefs)
483 except Exception as e:
484 ConsolePrinter.warn("Error calculating message type hash for %r: %r", typename, e)
485 return None
486
487 defhashes = {} # Cached {type definition full text: type hash}
488 for cid, channel in summary.channels.items():
489 schema = summary.schemas[channel.schema_id]
490 topic, typename = channel.topic, api.canonical(schema.name)
491
492 typedef = schema.data.decode("utf-8") # Full definition including subtype definitions
493 subtypedefs, nesting = api.parse_definition_subtypes(typedef, nesting=True)
494 typehash = channel.metadata.get("md5sum") or \
495 make_hash(typename, typedef, tuple(subtypedefs.items()))
496 topickey, typekey = (topic, typename, typehash), (typename, typehash)
497
498 qoses = None
499 if channel.metadata.get("offered_qos_profiles"):
500 try: qoses = yaml.safe_load(channel.metadata["offered_qos_profiles"])
501 except Exception as e:
502 ConsolePrinter.warn("Error parsing topic QoS profiles from %r: %s.",
503 channel.metadata["offered_qos_profiles"], e)
504
505 self._topics.setdefault(topickey, 0)
506 self._topics[topickey] += summary.statistics.channel_message_counts[cid]
507 self._typedefs[typekey] = typedef
508 defhashes[typedef] = typehash
509 for t, d in subtypedefs.items(): # Populate subtype definitions and hashes
510 h = defhashes[d] if d in defhashes else make_hash(t, d, tuple(subtypedefs.items()))
511 self._typedefs.setdefault((t, h), d)
512 self._type_subtypes.setdefault(typekey, {})[t] = h
513 defhashes[d] = h
514 for t, subtypes in nesting.items(): # Populate all nested type references
515 h = self._type_subtypes[typekey][t]
516 for t2 in subtypes:
517 h2 = self._type_subtypes[typekey][t2]
518 self._type_subtypes.setdefault((t, h), {})[t2] = h2
519
520 if qoses: self._qoses[topickey] = qoses
521 self._schemas[typekey] = schema
522 self._schematypes[schema.id] = typekey
523
524
525 @classmethod
526 def autodetect(cls, f):
527 """Returns whether file is recognizable as MCAP format."""
528 if common.is_stream(f):
529 pos, _ = f.tell(), f.seek(0)
530 result, _ = (f.read(len(cls.MCAP_MAGIC)) == cls.MCAP_MAGIC), f.seek(pos)
531 elif os.path.isfile(f) and os.path.getsize(f):
532 with open(f, "rb") as f:
533 result = (f.read(len(cls.MCAP_MAGIC)) == cls.MCAP_MAGIC)
534 else:
535 ext = os.path.splitext(f or "")[-1].lower()
536 result = ext in McapSink.FILE_EXTENSIONS
537 return result
538
539
540
541def copy_with_slots(self):
542 """Returns a shallow copy, with slots populated manually."""
543 result = self.__class__.__new__(self.__class__)
544 for n in self.__slots__:
545 setattr(result, n, copy.copy(getattr(self, n)))
546 return result
547
549def deepcopy_with_slots(self, memo):
550 """Returns a deep copy, with slots populated manually."""
551 result = self.__class__.__new__(self.__class__)
552 for n in self.__slots__:
553 setattr(result, n, copy.deepcopy(getattr(self, n), memo))
554 return result
555
556
558 """Returns a map of message field names and types (patch method for MCAP message classes)."""
559 return self._fields_and_field_types.copy()
560
561
562def message_repr(self):
563 """Returns a string representation of ROS message."""
564 fields = ", ".join("%s=%r" % (n, getattr(self, n)) for n in self.__slots__)
565 return "%s(%s)" % (self.__name__, fields)
566
567
568
570 """Writes messages to MCAP file."""
572
573 FILE_EXTENSIONS = (".mcap", )
574
575
576 DEFAULT_ARGS = dict(META=False, WRITE_OPTIONS={}, VERBOSE=False)
577
578
579 def __init__(self, args=None, **kwargs):
580 """
581 @param args arguments as namespace or dictionary, case-insensitive;
582 or a single path as the file to write
583 @param args.write base name of MCAP files to write
584 @param args.write_options {"overwrite": whether to overwrite existing file
585 (default false),
586 "rollover-size": bytes limit for individual output files,
587 "rollover-count": message limit for individual output files,
588 "rollover-duration": time span limit for individual output files,
589 as ROS duration or convertible seconds,
590 "rollover-template": output filename template, supporting
591 strftime format codes like "%H-%M-%S"
592 and "%(index)s" as output file index}
593 @param args.meta whether to print metainfo
594 @param args.verbose whether to print debug information
595 @param kwargs any and all arguments as keyword overrides, case-insensitive
596 """
597 args = {"WRITE": str(args)} if isinstance(args, common.PATH_TYPES) else args
598 args = common.ensure_namespace(args, McapSink.DEFAULT_ARGS, **kwargs)
599 super(McapSink, self).__init__(args)
600 RolloverSinkMixin.__init__(self, args)
601
602 self._file = None # Open file() object
603 self._writer = None # mcap_ros.writer.Writer object
604 self._schemas = {} # {(typename, typehash): mcap.records.Schema}
605 self._overwrite = None
606 self._close_printed = False
607
608 atexit.register(self.closecloseclose)
609
610
611 def validate(self):
612 """
613 Returns whether required libraries are available (mcap, mcap_ros1/mcap_ros2)
614 and overwrite is valid and file is writable.
615 """
616 if self.validvalid is not None: return self.validvalid
617 ok, mcap_ok, mcap_ros_ok = RolloverSinkMixin.validate(self), bool(mcap), bool(mcap_ros)
618 if self.args.WRITE_OPTIONS.get("overwrite") not in (None, True, False, "true", "false"):
619 ConsolePrinter.error("Invalid overwrite option for MCAP: %r. "
620 "Choose one of {true, false}.",
621 self.args.WRITE_OPTIONS["overwrite"])
622 ok = False
623 if not mcap_ok:
624 ConsolePrinter.error("mcap not available: cannot work with MCAP files.")
625 if not mcap_ros_ok:
626 ConsolePrinter.error("mcap_ros%s not available: cannot work with MCAP files.",
627 api.ROS_VERSION or "")
628 if not common.verify_io(self.args.WRITE, "w"):
629 ok = False
630 self.validvalid = ok and mcap_ok and mcap_ros_ok
631 if self.validvalid:
632 self._overwrite = (self.args.WRITE_OPTIONS.get("overwrite") in (True, "true"))
633 return self.validvalid
634
635
636 def emit(self, topic, msg, stamp=None, match=None, index=None):
637 """Writes out message to MCAP file."""
638 if not self.validatevalidatevalidate(): raise Exception("invalid")
639 stamp, index = self._ensure_stamp_index(topic, msg, stamp, index)
640 RolloverSinkMixin.ensure_rollover(self, topic, msg, stamp)
641 self._ensure_open()
642 kwargs = dict(publish_time=api.to_nsec(stamp), sequence=index)
643 if api.ROS2:
644 with api.TypeMeta.make(msg, topic) as m:
645 typekey = m.typekey
646 if typekey not in self._schemas:
647 fullname = api.make_full_typename(m.typename)
648 self._schemas[typekey] = self._writer.register_msgdef(fullname, m.definition)
649 schema, data = self._schemas[typekey], api.message_to_dict(msg)
650 self._writer.write_message(topic, schema, data, **kwargs)
651 else:
652 self._writer.write_message(topic, msg, **kwargs)
653 super(McapSink, self).emit(topic, msg, stamp, match, index)
654
655
656 def close(self):
657 """Closes output file if open, emits metainfo."""
658 try: self.close_outputclose_output()
659 finally:
660 if not self._close_printed and self._counts:
661 self._close_printed = True
662 ConsolePrinter.debug("Wrote MCAP for %s", self.format_output_meta())
663 super(McapSink, self).close()
664
665
666 def close_output(self):
667 """Closes output file, if any."""
668 if self._writer:
669 self._writer.finish()
670 self._file.close()
671 self._file, self._writer = None, None
672
673
674 def _ensure_open(self):
675 """Opens output file if not already open."""
676 if self._file: return
677
678 self.filenamefilename = self.filenamefilename or RolloverSinkMixin.make_filename(self)
679 common.makedirs(os.path.dirname(self.filenamefilename))
680 if self.args.VERBOSE:
681 sz = os.path.exists(self.filenamefilename) and os.path.getsize(self.filenamefilename)
682 action = "Overwriting" if sz and self._overwrite else "Creating"
683 ConsolePrinter.debug("%s MCAP output %s.", action, self.filenamefilename)
684 self._file = open(self.filenamefilename, "wb")
685 self._writer = mcap_ros.writer.Writer(self._file)
686 self._close_printed = False
687
688
689def init(*_, **__):
690 """Adds MCAP support to reading and writing. Raises ImportWarning if libraries not available."""
691 if not mcap or not mcap_ros:
692 ConsolePrinter.error("mcap libraries not available: cannot work with MCAP files.")
693 raise ImportWarning()
694 from .. import plugins # Late import to avoid circular
695 plugins.add_write_format("mcap", McapSink, "MCAP", [
696 ("overwrite=true|false", "overwrite existing file in MCAP output\n"
697 "instead of appending unique counter (default false)"),
698 ] + RolloverSinkMixin.get_write_options("MCAP"))
699 api.BAG_EXTENSIONS += McapSink.FILE_EXTENSIONS
700 api.Bag.READER_CLASSES.add(McapBag)
701 api.Bag.WRITER_CLASSES.add(McapBag)
702
704__all__ = ["McapBag", "McapSink", "init"]
ROS bag interface.
Definition api.py:98
tuple MODES
Supported opening modes, overridden in subclasses.
Definition api.py:112
BagMessage
Returned from read_messages() as (topic name, ROS message, ROS timestamp object).
Definition api.py:101
open(self)
Opens the bag file if not already open.
Definition api.py:307
TypesAndTopicsTuple
Returned from get_type_and_topic_info() as ({typename: typehash}, {topic name: TopicTuple}).
Definition api.py:109
read_messages(self, topics=None, start_time=None, end_time=None, raw=False, **__)
Yields messages from the bag, optionally filtered by topic and timestamp.
Definition api.py:291
closed
Returns whether file is closed.
Definition api.py:348
TopicTuple
Returned from get_type_and_topic_info() as (typename, message count, connection count,...
Definition api.py:105
__deepcopy__(self, memo=None)
Definition api.py:149
Provides output file rollover by size, duration, or message count.
Definition outputs.py:353
filename
Current output file path.
Definition outputs.py:403
close_output(self)
Closes output file, if any.
Definition outputs.py:476
validate(self)
Returns whether write options are valid, emits error if not, else populates options.
Definition outputs.py:406
format_output_meta(self)
Returns output file metainfo string, with names and sizes and message/topic counts.
Definition outputs.py:493
Output base class.
Definition outputs.py:32
valid
Result of validate()
Definition outputs.py:53
validate(self)
Returns whether sink prerequisites are met (like ROS environment set if LiveSink).
Definition outputs.py:99
close(self)
Shuts down output, closing any files or connections.
Definition outputs.py:106
MCAP bag interface, providing most of rosbag.Bag interface.
Definition mcap.py:47
tuple MODES
Supported opening modes.
Definition mcap.py:50
get_type_and_topic_info(self, topic_filters=None)
Returns thorough metainfo on topic and message types.
Definition mcap.py:184
mode
Returns file open mode.
Definition mcap.py:357
open(self)
Opens the bag file if not already open.
Definition mcap.py:295
filename
Returns bag file path.
Definition mcap.py:337
__init__(self, f, mode="r", **__)
Opens file and populates metadata.
Definition mcap.py:61
autodetect(cls, f)
Returns whether file is recognizable as MCAP format.
Definition mcap.py:548
write(self, topic, msg, t=None, raw=False, **__)
Writes out message to MCAP file.
Definition mcap.py:260
get_start_time(self)
Returns the start time of the bag, as UNIX timestamp.
Definition mcap.py:116
get_end_time(self)
Returns the end time of the bag, as UNIX timestamp.
Definition mcap.py:121
get_message_class(self, typename, typehash=None)
Returns ROS message class for typename, or None if unknown type.
Definition mcap.py:131
get_qoses(self, topic, typename)
Returns topic Quality-of-Service profiles as a list of dicts, or None if not available.
Definition mcap.py:166
get_message_type_hash(self, msg_or_type)
Returns ROS message type MD5 hash.
Definition mcap.py:159
read_messages(self, topics=None, start_time=None, end_time=None, raw=False)
Yields messages from the bag, optionally filtered by topic and timestamp.
Definition mcap.py:233
get_topic_info(self, *_, **__)
Returns topic and message type metainfo as {(topic, typename, typehash): count}.
Definition mcap.py:171
close(self)
Closes the bag file.
Definition mcap.py:311
__contains__(self, key)
Returns whether bag contains given topic.
Definition mcap.py:363
closed
Returns whether file is closed.
Definition mcap.py:321
size
Returns current file size.
Definition mcap.py:345
topics
Returns the list of topics in bag, in alphabetic order.
Definition mcap.py:329
__next__(self)
Retrieves next message from bag as (topic, message, timestamp).
Definition mcap.py:368
str MCAP_MAGIC
MCAP file header magic start bytes.
Definition mcap.py:53
get_message_definition(self, msg_or_type)
Returns ROS message type definition full text from bag, including subtype definitions.
Definition mcap.py:150
get_message_count(self, topic_filters=None)
Returns the number of messages in the bag.
Definition mcap.py:108
Writes messages to MCAP file.
Definition mcap.py:591
__init__(self, args=None, **kwargs)
Definition mcap.py:619
close_output(self)
Closes output file, if any.
Definition mcap.py:689
emit(self, topic, msg, stamp=None, match=None, index=None)
Writes out message to MCAP file.
Definition mcap.py:659
validate(self)
Returns whether required libraries are available (mcap, mcap_ros1/mcap_ros2) and overwrite is valid a...
Definition mcap.py:638
close(self)
Closes output file if open, emits metainfo.
Definition mcap.py:679
copy_with_slots(self)
Returns a shallow copy, with slots populated manually.
Definition mcap.py:563
deepcopy_with_slots(self, memo)
Returns a deep copy, with slots populated manually.
Definition mcap.py:571
init(*_, **__)
Adds MCAP support to reading and writing.
Definition mcap.py:714
message_repr(self)
Returns a string representation of ROS message.
Definition mcap.py:584
message_get_fields_and_field_types(self)
Returns a map of message field names and types (patch method for MCAP message classes).
Definition mcap.py:579