grepros 1.2.2
grep for ROS bag files and live topics
Loading...
Searching...
No Matches
api.py
Go to the documentation of this file.
1# -*- coding: utf-8 -*-
2"""
3ROS interface, shared facade for ROS1 and ROS2.
4
5------------------------------------------------------------------------------
6This file is part of grepros - grep for ROS1 bag files and live topics.
7Released under the BSD License.
8
9@author Erki Suurjaak
10@created 01.11.2021
11@modified 22.04.2024
12------------------------------------------------------------------------------
13"""
14## @namespace grepros.api
15import abc
16import collections
17import datetime
18import decimal
19import hashlib
20import inspect
21import os
22import re
23import sys
24import time
25
26import six
27
28from . common import ConsolePrinter, LenIterable, format_bytes, memoize
29#from . import ros1, ros2 # Imported conditionally
30
31
32
33NODE_NAME = "grepros"
34
35
36BAG_EXTENSIONS = ()
37
38
39SKIP_EXTENSIONS = ()
40
41
42ROS1 = None
43
44
45ROS2 = None
46
47
48ROS_VERSION = None
49
50
51ROS_FAMILY = None
52
53
54ROS_NUMERIC_TYPES = ["byte", "char", "int8", "int16", "int32", "int64", "uint8",
55 "uint16", "uint32", "uint64", "float32", "float64", "bool"]
56
57
58ROS_STRING_TYPES = ["string", "wstring"]
59
60
61ROS_BUILTIN_TYPES = ROS_NUMERIC_TYPES + ROS_STRING_TYPES
62
63
64ROS_BUILTIN_CTORS = {"byte": int, "char": int, "int8": int, "int16": int,
65 "int32": int, "int64": int, "uint8": int, "uint16": int,
66 "uint32": int, "uint64": int, "float32": float, "float64": float,
67 "bool": bool, "string": str, "wstring": str}
68
69
70ROS_TIME_TYPES = []
71
72
73ROS_TIME_CLASSES = {}
74
75
76ROS_COMMON_TYPES = []
77
78
79ROS_ALIAS_TYPES = {}
80
81
82realapi = None
83
84
85class BaseBag(object):
86 """
87 ROS bag interface.
88
89 %Bag can be used a context manager, is an iterable providing (topic, message, timestamp) tuples
90 and supporting `len(bag)`; and supports topic-based membership
91 (`if mytopic in bag`, `for t, m, s in bag[mytopic]`, `len(bag[mytopic])`).
92
93 Extra methods and properties compared with rosbag.Bag: Bag.get_message_class(),
94 Bag.get_message_definition(), Bag.get_message_type_hash(), Bag.get_topic_info();
95 Bag.closed and Bag.topics.
96 """
97
99 BagMessage = collections.namedtuple("BagMessage", "topic message timestamp")
100
103 TopicTuple = collections.namedtuple("TopicTuple", ["msg_type", "message_count",
104 "connections", "frequency"])
106
107 TypesAndTopicsTuple = collections.namedtuple("TypesAndTopicsTuple", ["msg_types", "topics"])
108
110 MODES = ("r", "w", "a")
111
113 STREAMABLE = True
114
115 def __iter__(self):
116 """Iterates over all messages in the bag."""
117 return self.read_messages()
118
119 def __enter__(self):
120 """Context manager entry, opens bag if not open."""
121 self.open()
122 return self
123
124 def __exit__(self, exc_type, exc_value, traceback):
125 """Context manager exit, closes bag."""
126 self.close()
127
128 def __len__(self):
129 """Returns the number of messages in the bag."""
130 return self.get_message_count()
131
132 def __next__(self):
133 """Retrieves next message from bag as (topic, message, timestamp)."""
134 raise NotImplementedError
135 if sys.version_info < (3, ): next = __next__
136
137 def __nonzero__(self): return True # Iterables by default use len() for bool() [Py2]
138
139 def __bool__ (self): return True # Iterables by default use len() for bool() [Py3]
140
141 def __contains__(self, key):
142 """Returns whether bag contains given topic."""
143 raise NotImplementedError
144
145 def __copy__(self): return self
146
147 def __deepcopy__(self, memo=None): return self
148
149 def __getitem__(self, key):
150 """Returns an iterator yielding messages from the bag in given topic, supporting len()."""
151 if key not in self: return LenIterable([], 0)
152 get_count = lambda: sum(c for (t, _, _), c in self.get_topic_info().items() if t == key)
153 return LenIterable(self.read_messages(key), get_count)
154
155 def __str__(self):
156 """Returns informative text for bag, with a full overview of topics and types."""
158 indent = lambda s, n: ("\n%s" % (" " * n)).join(s.splitlines())
159 # Returns UNIX timestamp as "Dec 22 2021 23:13:44.44"
160 fmttime = lambda x: datetime.datetime.fromtimestamp(x).strftime("%b %d %Y %H:%M:%S.%f")[:-4]
161 def fmtdur(secs):
162 """Returns duration seconds as text like "1hr 1:12s (3672s)" or "51.8s"."""
163 result = ""
164 hh, rem = divmod(secs, 3600)
165 mm, ss = divmod(rem, 60)
166 if hh: result += "%dhr " % hh
167 if mm: result += "%d:" % mm
168 result += "%ds" % ss
169 if hh or mm: result += " (%ds)" % secs
170 return result
171
172 entries = {}
173 counts = self.get_topic_info()
174 start, end = self.get_start_time(), self.get_end_time()
175
176 entries["path"] = self.filenamefilename or "<stream>"
177 if None not in (start, end):
178 entries["duration"] = fmtdur(end - start)
179 entries["start"] = "%s (%.2f)" % (fmttime(start), start)
180 entries["end"] = "%s (%.2f)" % (fmttime(end), end)
181 entries["size"] = format_bytes(self.sizesize)
182 if any(counts.values()):
183 entries["messages"] = str(sum(counts.values()))
184 if counts:
185 nhs = sorted(set((n, h) for _, n, h in counts))
186 namew = max(len(n) for n, _ in nhs)
187 # "pkg/Msg <PAD>[typehash]"
188 entries["types"] = "\n".join("%s%s [%s]" % (n, " " * (namew - len(n)), h) for n, h in nhs)
189 if counts:
190 topicw = max(len(t) for t, _, _ in counts)
191 typew = max(len(n) for _, n, _ in counts)
192 countw = max(len(str(c)) for c in counts.values())
193 lines = []
194 for (t, n, _), c in sorted(counts.items()):
195 qq = self.get_qoses(t, n) or []
196 # "/my/topic<PAD>"
197 line = "%s%s" % (t, " " * (topicw - len(t)))
198 # " <PAD>13 msgs" or " <PAD>1 msg "
199 line += " %s%s msg%s" % (" " * (countw - len(str(c))), c, " " if 1 == c else "s")
200 # " : pkg/Msg"
201 line += " : %s" % n
202 # "<PAD> (2 connections)" if >1 connections
203 line += "%s (%s connections)" % (" " * (typew - len(n)), len(qq)) if len(qq) > 1 else ""
204 lines.append(line)
205 entries["topics"] = "\n".join(lines)
206
207 labelw = max(map(len, entries))
208 return "\n".join("%s:%s %s" % (k, " " * (labelw - len(k)), indent(v, labelw + 2))
209 for k, v in entries.items())
210
211 def get_message_count(self, topic_filters=None):
212 """
213 Returns the number of messages in the bag.
214
215 @param topic_filters list of topics or a single topic to filter by, if any
216 """
217 raise NotImplementedError
219 def get_start_time(self):
220 """Returns the start time of the bag, as UNIX timestamp, or None if bag empty."""
221 raise NotImplementedError
222
223 def get_end_time(self):
224 """Returns the end time of the bag, as UNIX timestamp, or None if bag empty."""
225 raise NotImplementedError
226
227 def get_topic_info(self, counts=True):
228 """
229 Returns topic and message type metainfo as {(topic, typename, typehash): count}.
230
231 @param counts if false, counts may be returned as None if lookup is costly
232 """
233 raise NotImplementedError
235 def get_type_and_topic_info(self, topic_filters=None):
236 """
237 Returns thorough metainfo on topic and message types.
238
239 @param topic_filters list of topics or a single topic to filter returned topics-dict by,
240 if any
241 @return TypesAndTopicsTuple(msg_types, topics) namedtuple,
242 msg_types as dict of {typename: typehash},
243 topics as a dict of {topic: TopicTuple() namedtuple}.
244 """
245 raise NotImplementedError
246
247 def get_qoses(self, topic, typename):
248 """
249 Returns topic Quality-of-Service profiles as a list of dicts, or None if not available.
250
251 Functional only in ROS2.
252 """
253 return None
254
255 def get_message_class(self, typename, typehash=None):
256 """Returns ROS message type class, or None if unknown message type for bag."""
257 return None
258
259 def get_message_definition(self, msg_or_type):
260 """
261 Returns ROS message type definition full text, including subtype definitions.
262
263 Returns None if unknown message type for bag.
264 """
265 return None
266
267 def get_message_type_hash(self, msg_or_type):
268 """Returns ROS message type MD5 hash, or None if unknown message type for bag."""
269 return None
270
271 def read_messages(self, topics=None, start_time=None, end_time=None, raw=False, **__):
272 """
273 Yields messages from the bag, optionally filtered by topic and timestamp.
274
275 @param topics list of topics or a single topic to filter by, if any
276 @param start_time earliest timestamp of message to return, as ROS time or convertible
277 (int/float/duration/datetime/decimal)
278 @param end_time latest timestamp of message to return, as ROS time
279 (int/float/duration/datetime/decimal)
280 @param raw if true, then returned messages are tuples of
281 (typename, bytes, typehash, typeclass)
282 or (typename, bytes, typehash, position, typeclass),
283 depending on file format
284 @return BagMessage namedtuples of (topic, message, timestamp as ROS time)
285 """
286 raise NotImplementedError
287
288 def write(self, topic, msg, t=None, raw=False, **kwargs):
289 """
290 Writes a message to the bag.
292 @param topic name of topic
293 @param msg ROS message
294 @param t message timestamp if not using current wall time, as ROS time
295 or convertible (int/float/duration/datetime/decimal)
296 @param raw if true, `msg` is in raw format, (typename, bytes, typehash, typeclass)
297 @param kwargs ROS version-specific arguments,
298 like `connection_header` for ROS1 or `qoses` for ROS2
299 """
300 raise NotImplementedError
301
302 def open(self):
303 """Opens the bag file if not already open."""
304 raise NotImplementedError
305
306 def close(self):
307 """Closes the bag file."""
308 raise NotImplementedError
309
310 def flush(self):
311 """Ensures all changes are written to bag file."""
312
313 @property
314 def topics(self):
315 """Returns the list of topics in bag, in alphabetic order."""
316 raise NotImplementedError
317
318 @property
319 def filename(self):
320 """Returns bag file path."""
321 raise NotImplementedError
322
323 @property
324 def size(self):
325 """Returns current file size in bytes."""
326 raise NotImplementedError
328 @property
329 def mode(self):
330 """Returns file open mode."""
331 raise NotImplementedError
332
333 @property
334 def closed(self):
335 """Returns whether file is closed."""
336 raise NotImplementedError
337
338 @property
339 def stop_on_error(self):
340 """Whether raising read error on unknown message type (ROS2 SQLite .db3 specific)."""
341 return getattr(self, "_stop_on_error", None)
342
343 @stop_on_error.setter
344 def stop_on_error(self, flag):
345 """Sets whether to raise read error on unknown message type (ROS2 SQLite .db3 specific)."""
346 setattr(self, "_stop_on_error", flag)
347
349@six.add_metaclass(abc.ABCMeta)
350class Bag(BaseBag):
351 """
352 %Bag factory metaclass.
353
354 Result is a format-specific class instance, auto-detected from file extension or content:
355 an extended rosbag.Bag for ROS1 bags, otherwise an object with a conforming interface.
356
357 E.g. {@link grepros.plugins.mcap.McapBag McapBag} if {@link grepros.plugins.mcap mcap}
358 plugin loaded and file recognized as MCAP format.
359
360 User plugins can add their own format support to READER_CLASSES and WRITER_CLASSES.
361 Classes can have a static/class method `autodetect(filename)`
362 returning whether given file is in recognizable format for the plugin class.
363 """
364
365
366 READER_CLASSES = set()
367
368
369 WRITER_CLASSES = set()
370
371 def __new__(cls, f, mode="r", reindex=False, progress=False, **kwargs):
372 """
373 Returns an object for reading or writing ROS bags.
374
375 Suitable Bag class is auto-detected by file extension or content.
376
377 @param f bag file path, or a stream object
378 (streams not supported for ROS2 .db3 SQLite bags)
379 @param mode return reader if "r" else writer
380 @param reindex reindex unindexed bag (ROS1 only), making a backup if indexed format
381 @param progress show progress bar with reindexing status
382 @param kwargs additional keyword arguments for format-specific Bag constructor,
383 like `compression` for ROS1 bag
384 """
385 classes, errors = set(cls.READER_CLASSES if "r" == mode else cls.WRITER_CLASSES), []
386 for detect, bagcls in ((d, c) for d in (True, False) for c in list(classes)):
387 use, discard = not detect, False # Try auto-detecting suitable class first
388 try:
389 if detect and callable(getattr(bagcls, "autodetect", None)):
390 use, discard = bagcls.autodetect(f), True
391 if use:
392 return bagcls(f, mode, reindex=reindex, progress=progress, **kwargs)
393 except Exception as e:
394 discard = True
395 errors.append("Failed to open %r for %s with %s: %s." %
396 (f, "reading" if "r" == mode else "writing", bagcls, e))
397 discard and classes.discard(bagcls)
398 for err in errors: ConsolePrinter.warn(err)
399 raise Exception("No suitable %s class available" % ("reader" if "r" == mode else "writer"))
400
402 @classmethod
403 def autodetect(cls, f):
404 """Returns registered bag class auto-detected from file, or None."""
405 for bagcls in cls.READER_CLASSES | cls.WRITER_CLASSES:
406 if callable(vars(bagcls).get("autodetect")) and bagcls.autodetect(f):
407 return bagcls
408 return None
409
410
411 @classmethod
412 def __subclasshook__(cls, C):
413 return True if issubclass(C, BaseBag) else NotImplemented
414
415
416class TypeMeta(object):
417 """
418 Container for caching and retrieving message type metadata.
419
420 All property values are lazy-loaded upon request.
421 """
422
423
424 LIFETIME = 2
425
426
427 POPULATION = 0
428
430 _CACHE = {}
431
432
433 _CHILDREN = {}
434
435
436 _TIMINGS = {}
437
439 _LASTSWEEP = time.time()
440
441 def __init__(self, msg, topic=None, source=None, data=None):
442 self._msg = msg
443 self._topic = topic
444 self._source = source
445 self._data = data
446 self._type = None # Message typename as "pkg/MsgType"
447 self._def = None # Message type definition with full subtype definitions
448 self._hash = None # Message type definition MD5 hash
449 self._cls = None # Message class object
450 self._topickey = None # (topic, typename, typehash)
451 self._typekey = None # (typename, typehash)
452
453 def __enter__(self, *_, **__):
454 """Allows using instance as a context manager (no other effect)."""
455 return self
456
457 def __exit__(self, *_, **__): pass
458
459 @property
460 def typename(self):
461 """Returns message typename, as "pkg/MsgType"."""
462 if not self._type:
463 self._type = realapi.get_message_type(self._msg)
464 return self._type
465
466 @property
467 def typehash(self):
468 """Returns message type definition MD5 hash."""
469 if not self._hash:
470 hash = self._source and self._source.get_message_type_hash(self._msg)
471 self._hash = hash or realapi.get_message_type_hash(self._msg)
472 return self._hash
473
474 @property
475 def definition(self):
476 """Returns message type definition text with full subtype definitions."""
477 if not self._def:
478 typedef = self._source and self._source.get_message_definition(self._msg)
479 self._def = typedef or realapi.get_message_definition(self._msg)
480 return self._def
481
482 @property
483 def data(self):
484 """Returns message serialized binary, as bytes(), or None if not cached."""
485 return self._data
487 @property
488 def typeclass(self):
489 """Returns message class object."""
490 if not self._cls:
491 cls = type(self._msg) if realapi.is_ros_message(self._msg) else \
493 self._cls = cls or realapi.get_message_class(self.typenametypename)
494 return self._cls
495
496 @property
497 def topickey(self):
498 """Returns (topic, typename, typehash) for message."""
499 if not self._topickey:
500 self._topickey = (self._topic, self.typenametypename, self.typehashtypehash)
501 return self._topickey
502
503 @property
504 def typekey(self):
505 """Returns (typename, typehash) for message."""
506 if not self._typekey:
507 self._typekey = (self.typenametypename, self.typehashtypehash)
508 return self._typekey
510 @classmethod
511 def make(cls, msg, topic=None, source=None, root=None, data=None):
512 """
513 Returns TypeMeta instance, registering message in cache if not present.
514
515 Other parameters are only required for first registration.
516
517 @param topic topic the message is in if root message
518 @param source message source like LiveSource or Bag,
519 for looking up message type metadata
520 @param root root message that msg is a nested value of, if any
521 @param data message serialized binary, if any
522 """
523 msgid = id(msg)
524 if msgid not in cls._CACHE_CACHE:
525 cls._CACHE_CACHE[msgid] = TypeMeta(msg, topic, data)
526 if root and root is not msg:
527 cls._CHILDREN.setdefault(id(root), set()).add(msgid)
528 cls._TIMINGS[msgid] = time.time()
529 result = cls._CACHE_CACHE[msgid]
530 cls.sweep()
531 return result
532
533 @classmethod
534 def discard(cls, msg):
535 """Discards message metadata from cache, if any, including nested messages."""
536 msgid = id(msg)
537 cls._CACHE_CACHE.pop(msgid, None), cls._TIMINGS.pop(msgid, None)
538 for childid in cls._CHILDREN.pop(msgid, []):
539 cls._CACHE_CACHE.pop(childid, None), cls._TIMINGS.pop(childid, None)
540 cls.sweep()
541
542 @classmethod
543 def sweep(cls):
544 """Discards stale or surplus messages from cache."""
545 if cls.POPULATION > 0 and len(cls._CACHE_CACHE) > cls.POPULATION:
546 count = len(cls._CACHE_CACHE) - cls.POPULATION
547 for msgid, tm in sorted(x[::-1] for x in list(cls._TIMINGS.items()))[:count]:
548 cls._CACHE_CACHE.pop(msgid, None), cls._TIMINGS.pop(msgid, None)
549 for childid in cls._CHILDREN.pop(msgid, []):
550 cls._CACHE_CACHE.pop(childid, None), cls._TIMINGS.pop(childid, None)
551
552 now = time.time()
553 if cls.LIFETIME <= 0 or cls._LASTSWEEP < now - cls.LIFETIME: return
554
555 for msgid, tm in list(cls._TIMINGS.items()):
556 drop = (tm > now) or (tm < now - cls.LIFETIME)
557 drop and (cls._CACHE_CACHE.pop(msgid, None), cls._TIMINGS.pop(msgid, None))
558 for childid in cls._CHILDREN.pop(msgid, []) if drop else ():
559 cls._CACHE_CACHE.pop(childid, None), cls._TIMINGS.pop(childid, None)
560 cls._LASTSWEEP = now
561
562 @classmethod
563 def clear(cls):
564 """Clears entire cache."""
566 cls._CHILDREN.clear()
567 cls._TIMINGS.clear()
568
569
570
571def init_node(name=None):
572 """
573 Initializes a ROS1 or ROS2 node if not already initialized.
574
575 In ROS1, blocks until ROS master available.
576 """
577 validate() and realapi.init_node(name or NODE_NAME)
578
579
580def shutdown_node():
581 """Shuts down live ROS node."""
582 realapi and realapi.shutdown_node()
583
584
585def validate(live=False):
586 """
587 Initializes ROS bindings, returns whether ROS environment set, prints or raises error if not.
588
589 @param live whether environment must support launching a ROS node
590 """
591 global realapi, BAG_EXTENSIONS, SKIP_EXTENSIONS, ROS1, ROS2, ROS_VERSION, ROS_FAMILY, \
592 ROS_COMMON_TYPES, ROS_TIME_TYPES, ROS_TIME_CLASSES, ROS_ALIAS_TYPES
593 if realapi and not live:
594 return True
595
596 success, version = False, os.getenv("ROS_VERSION")
597 if "1" == version:
598 from . import ros1
599 realapi = ros1
600 success = realapi.validate(live)
601 ROS1, ROS2, ROS_VERSION, ROS_FAMILY = True, False, 1, "rospy"
602 elif "2" == version:
603 from . import ros2
604 realapi = ros2
605 success = realapi.validate(live)
606 ROS1, ROS2, ROS_VERSION, ROS_FAMILY = False, True, 2, "rclpy"
607 elif not version:
608 ConsolePrinter.error("ROS environment not set: missing ROS_VERSION.")
609 else:
610 ConsolePrinter.error("ROS environment not supported: unknown ROS_VERSION %r.", version)
611 if success:
612 BAG_EXTENSIONS, SKIP_EXTENSIONS = realapi.BAG_EXTENSIONS, realapi.SKIP_EXTENSIONS
613 ROS_COMMON_TYPES = ROS_BUILTIN_TYPES + realapi.ROS_TIME_TYPES
614 ROS_TIME_TYPES = realapi.ROS_TIME_TYPES
615 ROS_TIME_CLASSES = realapi.ROS_TIME_CLASSES
616 ROS_ALIAS_TYPES = realapi.ROS_ALIAS_TYPES
617 Bag.READER_CLASSES.add(realapi.Bag)
618 Bag.WRITER_CLASSES.add(realapi.Bag)
619 return success
620
621
622@memoize
623def calculate_definition_hash(typename, msgdef, extradefs=()):
624 """
625 Returns MD5 hash for message type definition.
626
627 @param extradefs additional subtype definitions as ((typename, msgdef), )
628 """
629 # "type name (= constvalue)?" or "type name (defaultvalue)?" (ROS2 format)
630 FIELD_RGX = re.compile(r"^\s*([a-z][^\s:]+)\s+([^\s=]+)(\s*=\s*([^\n]+))?(\s+([^\n]+))?", re.I)
631 STR_CONST_RGX = re.compile(r"^w?string\s+([^\s=#]+)\s*=")
632 lines, pkg = [], typename.rsplit("/", 1)[0]
633 subtypedefs = dict(extradefs, **parse_definition_subtypes(msgdef))
634 extradefs = tuple(subtypedefs.items())
635
636 # First pass: write constants
637 for line in msgdef.splitlines():
638 if set(line) == set("="): # Subtype separator
639 break # for line
640 # String constants cannot have line comments
641 if "#" in line and not STR_CONST_RGX.match(line): line = line[:line.index("#")]
642 match = FIELD_RGX.match(line)
643 if match and match.group(3):
644 lines.append("%s %s=%s" % (match.group(1), match.group(2), match.group(4).strip()))
645 # Second pass: write fields and subtype hashes
646 for line in msgdef.splitlines():
647 if set(line) == set("="): # Subtype separator
648 break # for line
649 if "#" in line and not STR_CONST_RGX.match(line): line = line[:line.index("#")]
650 match = FIELD_RGX.match(line)
651 if match and not match.group(3): # Not constant
652 scalartype, namestr = scalar(match.group(1)), match.group(2)
653 if scalartype in ROS_COMMON_TYPES:
654 typestr = match.group(1)
655 if match.group(5): namestr = (namestr + " " + match.group(6)).strip()
656 else:
657 subtype = scalartype if "/" in scalartype else "std_msgs/Header" \
658 if "Header" == scalartype else "%s/%s" % (pkg, scalartype)
659 typestr = calculate_definition_hash(subtype, subtypedefs[subtype], extradefs)
660 lines.append("%s %s" % (typestr, namestr))
661 return hashlib.md5("\n".join(lines).encode()).hexdigest()
662
663
664def canonical(typename, unbounded=False):
665 """
666 Returns "pkg/Type" for "pkg/subdir/Type", standardizes various ROS2 formats.
667
668 Converts ROS2 DDS types like "octet" to "byte", and "sequence<uint8, 100>" to "uint8[100]".
669
670 @param unbounded drop constraints like array bounds, and string bounds in ROS2,
671 e.g. returning "uint8[]" for "uint8[10]"
672 """
673 return realapi.canonical(typename, unbounded)
674
675
676def create_publisher(topic, cls_or_typename, queue_size):
677 """Returns a ROS publisher instance, with .get_num_connections() and .unregister()."""
678 return realapi.create_publisher(topic, cls_or_typename, queue_size)
679
680
681def create_subscriber(topic, cls_or_typename, handler, queue_size):
682 """
683 Returns a ROS subscriber instance.
684
685 Supplemented with .unregister(), .get_message_class(), .get_message_definition(),
686 .get_message_type_hash(), and .get_qoses().
687 """
688 return realapi.create_subscriber(topic, cls_or_typename, handler, queue_size)
689
690
691def filter_fields(fieldmap, top=(), include=(), exclude=()):
692 """
693 Returns fieldmap filtered by include and exclude patterns.
694
695 @param fieldmap {field name: field type name}
696 @param top parent path as (rootattr, ..)
697 @param include [((nested, path), re.Pattern())] to require in parent path
698 @param exclude [((nested, path), re.Pattern())] to reject in parent path
699 """
700 NESTED_RGX = re.compile(".+/.+|" + "|".join("^%s$" % re.escape(x) for x in ROS_TIME_TYPES))
701 result = type(fieldmap)() if include or exclude else fieldmap
702 for k, v in fieldmap.items() if not result else ():
703 trailstr = ".".join(map(str, top + (k, )))
704 for is_exclude, patterns in enumerate((include, exclude)):
705 # Nested fields need filtering on deeper level
706 matches = not is_exclude and NESTED_RGX.match(v) \
707 or any(r.match(trailstr) for _, r in patterns)
708 if patterns and (not matches if is_exclude else matches):
709 result[k] = v
710 elif patterns and is_exclude and matches:
711 result.pop(k, None)
712 if include and exclude and k not in result: # Failing to include takes precedence
713 break # for is_exclude
714 return result
715
716
717def format_message_value(msg, name, value):
718 """
719 Returns a message attribute value as string.
720
721 Result is at least 10 chars wide if message is a ROS time/duration
722 (aligning seconds and nanoseconds).
723 """
724 return realapi.format_message_value(msg, name, value)
725
726
727def get_message_class(typename):
728 """Returns ROS message class, or None if unavailable."""
729 return realapi.get_message_class(typename)
730
731
732def get_message_definition(msg_or_type):
733 """
734 Returns ROS message type definition full text, including subtype definitions.
735
736 Returns None if unknown type.
737 """
738 return realapi.get_message_definition(msg_or_type)
739
740
741def get_message_type_hash(msg_or_type):
742 """Returns ROS message type MD5 hash, or "" if unknown type."""
743 return realapi.get_message_type_hash(msg_or_type)
744
745
746def get_message_fields(val):
747 """
748 Returns OrderedDict({field name: field type name}) if ROS message, else {}.
749
750 @param val ROS message class or instance
751 """
752 return realapi.get_message_fields(val)
753
754
755def get_message_type(msg_or_cls):
756 """Returns ROS message type name, like "std_msgs/Header"."""
757 return realapi.get_message_type(msg_or_cls)
758
759
760def get_message_value(msg, name, typename=None, default=Ellipsis):
761 """
762 Returns object attribute value, with numeric arrays converted to lists.
763
764 @param name message attribute name
765 @param typename value ROS type name, for identifying byte arrays
766 @param default value to return if attribute does not exist; raises exception otherwise
767 """
768 return realapi.get_message_value(msg, name, typename, default)
770
771def get_rostime(fallback=False):
772 """
773 Returns current ROS time, as rospy.Time or rclpy.time.Time.
774
775 @param fallback use wall time if node not initialized
776 """
777 return realapi.get_rostime(fallback=fallback)
778
780def get_ros_time_category(msg_or_type):
781 """Returns "time" or "duration" for time/duration type or instance, else original argument."""
782 cls = msg_or_type if inspect.isclass(msg_or_type) else \
783 type(msg_or_type) if is_ros_time(msg_or_type) else None
784 if cls is None and isinstance(msg_or_type, (six.binary_type, six.text_type)):
785 cls = next((x for x in ROS_TIME_CLASSES if get_message_type(x) == msg_or_type), None)
786 if cls in ROS_TIME_CLASSES:
787 return "duration" if "duration" in ROS_TIME_CLASSES[cls].lower() else "time"
788 return msg_or_type
789
790
791def get_topic_types():
792 """
793 Returns currently available ROS topics, as [(topicname, typename)].
794
795 Omits topics that the current ROS node itself has published.
796 """
797 return realapi.get_topic_types()
798
799
800def get_type_alias(typename):
801 """
802 Returns alias like "char" for ROS built-in type, if any; reverse of get_type_alias().
803
804 In ROS1, byte and char are aliases for int8 and uint8; in ROS2 the reverse.
805 """
806 return next((k for k, v in ROS_ALIAS_TYPES.items() if v == typename), None)
807
808
809def get_alias_type(typename):
810 """
811 Returns ROS built-in type for alias like "char", if any; reverse of get_alias_type().
812
813 In ROS1, byte and char are aliases for int8 and uint8; in ROS2 the reverse.
814 """
815 return ROS_ALIAS_TYPES.get(typename)
816
817
818def is_ros_message(val, ignore_time=False):
819 """
820 Returns whether value is a ROS message or special like ROS time/duration class or instance.
821
822 @param ignore_time whether to ignore ROS time/duration types
823 """
824 return realapi.is_ros_message(val, ignore_time)
825
826
827def is_ros_time(val):
828 """Returns whether value is a ROS time/duration class or instance."""
829 return realapi.is_ros_time(val)
830
831
832def iter_message_fields(msg, messages_only=False, flat=False, scalars=(), include=(), exclude=(),
833 top=()):
834 """
835 Yields ((nested, path), value, typename) from ROS message.
836
837 @param messages_only whether to yield only values that are ROS messages themselves
838 or lists of ROS messages, else will yield scalar and scalar list values
839 @param flat recurse into lists of nested messages, ignored if `messages_only`
840 @param scalars sequence of ROS types to consider as scalars, like ("time", duration")
841 @param include [((nested, path), re.Pattern())] to require in field path, if any
842 @param exclude [((nested, path), re.Pattern())] to reject in field path, if any
843 @param top internal recursion helper
844 """
845 fieldmap = realapi.get_message_fields(msg)
846 if include or exclude: fieldmap = filter_fields(fieldmap, (), include, exclude)
847 if not fieldmap: return
848 if messages_only:
849 for k, t in fieldmap.items():
850 v, scalart = realapi.get_message_value(msg, k, t), realapi.scalar(t)
851 is_sublist = isinstance(v, (list, tuple)) and scalart not in ROS_COMMON_TYPES
852 is_forced_scalar = get_ros_time_category(scalart) in scalars
853 if not is_forced_scalar and realapi.is_ros_message(v):
854 for p2, v2, t2 in iter_message_fields(v, True, scalars=scalars, top=top + (k, )):
855 yield p2, v2, t2
856 if is_forced_scalar or is_sublist or realapi.is_ros_message(v, ignore_time=True):
857 yield top + (k, ), v, t
858 else:
859 for k, t in fieldmap.items():
860 v, scalart = realapi.get_message_value(msg, k, t), realapi.scalar(t)
861 is_forced_scalar = get_ros_time_category(scalart) in scalars
862 if not is_forced_scalar and realapi.is_ros_message(v):
863 for p2, v2, t2 in iter_message_fields(v, False, flat, scalars, top=top + (k, )):
864 yield p2, v2, t2
865 elif flat and v and isinstance(v, (list, tuple)) and scalart not in ROS_BUILTIN_TYPES:
866 for i, m in enumerate(v):
867 for p2, v2, t2 in iter_message_fields(m, False, True, scalars, top=top + (k, i)):
868 yield p2, v2, t2
869 else:
870 yield top + (k, ), v, t
871
872
873def make_full_typename(typename, category="msg"):
874 """
875 Returns "pkg/msg/Type" for "pkg/Type".
876
877 @param category type category like "msg" or "srv"
878 """
879 INTER, FAMILY = "/%s/" % category, "rospy" if ROS1 else "rclpy"
880 if INTER in typename or "/" not in typename or typename.startswith("%s/" % FAMILY):
881 return typename
882 return INTER.join(next((x[0], x[-1]) for x in [typename.split("/")]))
883
884
885def make_bag_time(stamp, bag):
886 """
887 Returns value as ROS timestamp, conditionally as relative to bag start/end time.
888
889 Stamp is interpreted as relative offset from bag start/end time
890 if numeric string with sign prefix, or timedelta, or ROS duration.
891
892 @param stamp converted to ROS timestamp if int/float/str/duration/datetime/timedelta/decimal
893 @param bag an open bag to use for relative start/end time
894 """
895 shift = 0
896 if is_ros_time(stamp):
897 if "duration" != get_ros_time_category(stamp): return stamp
898 stamp = realapi.to_sec(stamp)
899 shift = bag.get_start_time() if stamp >= 0 else bag.get_end_time()
900 elif isinstance(stamp, datetime.datetime):
901 stamp = time.mktime(stamp.timetuple()) + stamp.microsecond / 1E6
902 elif isinstance(stamp, datetime.timedelta):
903 stamp = stamp.total_seconds()
904 shift = bag.get_start_time() if stamp >= 0 else bag.get_end_time()
905 elif isinstance(stamp, (six.binary_type, six.text_type)):
906 sign = stamp[0] in ("+", b"+") if six.text_type(stamp[0]) in "+-" else None
907 shift = 0 if sign is None else bag.get_start_time() if sign else bag.get_end_time()
908 stamp = float(stamp)
909 return make_time(stamp + shift)
910
911
912def make_live_time(stamp):
913 """
914 Returns value as ROS timestamp, conditionally as relative to system time.
915
916 Stamp is interpreted as relative offset from system time
917 if numeric string with sign prefix, or timedelta, or ROS duration.
918
919 @param stamp converted to ROS timestamp if int/float/str/duration/datetime/timedelta/decimal
920 """
921 shift = 0
922 if is_ros_time(stamp):
923 if "duration" != get_ros_time_category(stamp): return stamp
924 stamp, shift = realapi.to_sec(stamp), time.time()
925 elif isinstance(stamp, datetime.datetime):
926 stamp = time.mktime(stamp.timetuple()) + stamp.microsecond / 1E6
927 elif isinstance(stamp, datetime.timedelta):
928 stamp, shift = stamp.total_seconds(), time.time()
929 elif isinstance(stamp, (six.binary_type, six.text_type)):
930 sign = stamp[0] in ("+", b"+") if six.text_type(stamp[0]) in "+-" else None
931 stamp, shift = float(stamp), (0 if sign is None else time.time())
932 return make_time(stamp + shift)
933
934
935def make_duration(secs=0, nsecs=0):
936 """Returns a ROS duration."""
937 return realapi.make_duration(secs=secs, nsecs=nsecs)
938
939
940def make_time(secs=0, nsecs=0):
941 """Returns a ROS time."""
942 return realapi.make_time(secs=secs, nsecs=nsecs)
943
944
945def make_message_hash(msg, include=(), exclude=()):
946 """
947 Returns hashcode for ROS message, as a hex digest.
948
949 @param include message fields to include if not all, as [((nested, path), re.Pattern())]
950 @param exclude message fields to exclude, as [((nested, path), re.Pattern())]
951 """
952 hasher = hashlib.md5()
953
954 def walk_message(obj, top=()):
955 fieldmap = get_message_fields(obj)
956 fieldmap = filter_fields(fieldmap, include=include, exclude=exclude)
957 for k, t in fieldmap.items():
958 v, path = get_message_value(obj, k, t), top + (k, )
959 if is_ros_message(v):
960 walk_message(v, path)
961 elif isinstance(v, (list, tuple)) and scalar(t) not in ROS_BUILTIN_TYPES:
962 for x in v: walk_message(x, path)
963 else:
964 s = "%s=%s" % (path, v)
965 hasher.update(s.encode("utf-8", errors="backslashreplace"))
966 if not hasattr(obj, "__slots__"):
967 s = "%s=%s" % (top, obj)
968 hasher.update(s.encode("utf-8", errors="backslashreplace"))
969
970 walk_message(msg)
971 return hasher.hexdigest()
972
973
974def message_to_dict(msg, replace=None):
975 """
976 Returns ROS message as nested Python dictionary.
978 @param replace mapping of {value: replaced value},
979 e.g. {math.nan: None, math.inf: None}
980 """
981 result = {} if realapi.is_ros_message(msg) else msg
982 for name, typename in realapi.get_message_fields(msg).items():
983 v = realapi.get_message_value(msg, name, typename)
984 if realapi.is_ros_time(v):
985 v = dict(zip(realapi.get_message_fields(v), realapi.to_sec_nsec(v)))
986 elif realapi.is_ros_message(v):
987 v = message_to_dict(v)
988 elif isinstance(v, (list, tuple)):
989 if realapi.scalar(typename) not in ROS_BUILTIN_TYPES:
990 v = [message_to_dict(x) for x in v]
991 elif replace:
992 v = [replace.get(x, x) for x in v]
993 elif replace:
994 v = replace.get(v, v)
995 result[name] = v
996 return result
997
998
999def dict_to_message(dct, msg):
1000 """
1001 Returns given ROS message populated from Python dictionary.
1002
1003 Raises TypeError on attribute value type mismatch.
1004 """
1005 for name, typename in realapi.get_message_fields(msg).items():
1006 if name not in dct:
1007 continue # for
1008 v, msgv = dct[name], realapi.get_message_value(msg, name, typename)
1009
1010 if realapi.is_ros_message(msgv):
1011 v = v if is_ros_message(v) else dict_to_message(v, msgv)
1012 elif isinstance(msgv, (list, tuple)):
1013 scalarname = realapi.scalar(typename)
1014 if scalarname in ROS_BUILTIN_TYPES:
1015 cls = ROS_BUILTIN_CTORS[scalarname]
1016 v = [x if isinstance(x, cls) else cls(x) for x in v]
1017 else:
1018 cls = realapi.get_message_class(scalarname)
1019 v = [x if realapi.is_ros_message(x) else dict_to_message(x, cls()) for x in v]
1020 else:
1021 v = type(msgv)(v)
1022
1023 setattr(msg, name, v)
1024 return msg
1025
1026
1027@memoize
1028def parse_definition_fields(typename, typedef):
1029 """
1030 Returns field names and type names from a message definition text.
1031
1032 Does not recurse into subtypes.
1033
1034 @param typename ROS message type name, like "my_pkg/MyCls"
1035 @param typedef ROS message definition, like "Header header\nbool a\nMyCls2 b"
1036 @return ordered {field name: type name},
1037 like {"header": "std_msgs/Header", "a": "bool", "b": "my_pkg/MyCls2"}
1038 """
1039 result = collections.OrderedDict() # {subtypename: subtypedef}
1040
1041 FIELD_RGX = re.compile(r"^\s*([a-z][^\s:]+)\s+([^\s=]+)(\s*=\s*([^\n]+))?(\s+([^\n]+))?", re.I)
1042 STR_CONST_RGX = re.compile(r"^w?string\s+([^\s=#]+)\s*=")
1043 pkg = typename.rsplit("/", 1)[0]
1044 for line in filter(bool, typedef.splitlines()):
1045 if set(line) == set("="): # Subtype separator
1046 break # for line
1047 if "#" in line and not STR_CONST_RGX.match(line): line = line[:line.index("#")]
1048 match = FIELD_RGX.match(line)
1049 if not match or match.group(3): # Constant or not field
1050 continue # for line
1051
1052 name, typename, scalartype = match.group(2), match.group(1), scalar(match.group(1))
1053 if scalartype not in ROS_COMMON_TYPES:
1054 pkg2 = "" if "/" in scalartype else "std_msgs" if "Header" == scalartype else pkg
1055 typename = "%s/%s" % (pkg2, typename) if pkg2 else typename
1056 result[name] = typename
1057 return result
1058
1059
1060@memoize
1061def parse_definition_subtypes(typedef, nesting=False):
1062 """
1063 Returns subtype names and type definitions from a full message definition.
1064
1065 @param typedef message type definition including all subtype definitions
1066 @param nesting whether to additionally return type nesting information as
1067 {typename: [typename contained in parent]}
1068 @return {"pkg/MsgType": "full definition for MsgType including subtypes"}
1069 or ({typedefs}, {nesting}) if nesting
1070 """
1071 result = collections.OrderedDict() # {subtypename: subtypedef}
1072 nesteds = collections.defaultdict(list) # {subtypename: [subtypename2, ]})
1073
1074 # Parse individual subtype definitions from full definition
1075 curtype, curlines = "", []
1076 # Separator line, and definition header like 'MSG: std_msgs/MultiArrayLayout'
1077 rgx = re.compile(r"^((=+)|(MSG: (.+)))$") # Group 2: separator, 4: new type
1078 for line in typedef.splitlines():
1079 m = rgx.match(line)
1080 if m and m.group(2) and curtype: # Separator line between nested definitions
1081 result[curtype] = "\n".join(curlines)
1082 curtype, curlines = "", []
1083 elif m and m.group(4): # Start of nested definition "MSG: pkg/MsgType"
1084 curtype, curlines = m.group(4), []
1085 elif not m and curtype: # Definition content
1086 curlines.append(line)
1087 if curtype:
1088 result[curtype] = "\n".join(curlines)
1089
1090 # "type name (= constvalue)?" or "type name (defaultvalue)?" (ROS2 format)
1091 FIELD_RGX = re.compile(r"^\s*([a-z][^\s]+)\s+([^\s=]+)(\s*=\s*([^\n]+))?(\s+([^\n]+))?", re.I)
1092 # Concatenate nested subtype definitions to parent subtype definitions
1093 for subtype, subdef in list(result.items()):
1094 pkg, seen = subtype.rsplit("/", 1)[0], set()
1095 for line in subdef.splitlines():
1096 m = FIELD_RGX.match(line)
1097 if m and m.group(1):
1098 scalartype, fulltype = realapi.scalar(m.group(1)), None
1099 if scalartype not in ROS_COMMON_TYPES:
1100 fulltype = scalartype if "/" in scalartype else "std_msgs/Header" \
1101 if "Header" == scalartype else "%s/%s" % (pkg, scalartype)
1102 if fulltype in result and fulltype not in seen:
1103 addendum = "%s\nMSG: %s\n%s" % ("=" * 80, fulltype, result[fulltype])
1104 result[subtype] = result[subtype].rstrip() + ("\n\n%s\n" % addendum)
1105 nesteds[subtype].append(fulltype)
1106 seen.add(fulltype)
1107 return (result, nesteds) if nesting else result
1108
1110def serialize_message(msg):
1111 """Returns ROS message as a serialized binary."""
1112 return realapi.serialize_message(msg)
1113
1114
1115def deserialize_message(msg, cls_or_typename):
1116 """Returns ROS message or service request/response instantiated from serialized binary."""
1117 return realapi.deserialize_message(msg, cls_or_typename)
1118
1119
1120def scalar(typename):
1121 """
1122 Returns scalar type from ROS message data type, like "uint8" from uint8-array.
1123
1124 Returns type unchanged if an ordinary type. In ROS2, returns unbounded type,
1125 e.g. "string" from "string<=10[<=5]".
1126 """
1127 return realapi.scalar(typename)
1128
1129
1130def set_message_value(obj, name, value):
1131 """Sets message or object attribute value."""
1132 realapi.set_message_value(obj, name, value)
1133
1134
1135def time_message(val, to_message=True, clock_type=None):
1136 """
1137 Converts ROS2 time/duration between `rclpy` and `builtin_interfaces` objects.
1138
1139 Returns input value as-is in ROS1.
1140
1141 @param val ROS2 time/duration object from `rclpy` or `builtin_interfaces`
1142 @param to_message whether to convert from `rclpy` to `builtin_interfaces` or vice versa
1143 @param clock_type ClockType for converting to `rclpy.Time`, defaults to `ROS_TIME`
1144 @return value converted to appropriate type, or original value if not convertible
1145 """
1146 if ROS1: return val
1147 return realapi.time_message(val, to_message, clock_type=clock_type)
1148
1150def to_datetime(val):
1151 """Returns value as datetime.datetime if value is ROS time/duration, else value."""
1152 sec = realapi.to_sec(val)
1153 return datetime.datetime.fromtimestamp(sec) if sec is not val else val
1155
1156def to_decimal(val):
1157 """Returns value as decimal.Decimal if value is ROS time/duration, else value."""
1158 if realapi.is_ros_time(val):
1159 return decimal.Decimal("%d.%09d" % realapi.to_sec_nsec(val))
1160 return val
1161
1162
1163def to_duration(val):
1164 """Returns value as ROS duration if convertible (int/float/time/datetime/decimal), else value."""
1165 return realapi.to_duration(val)
1166
1167
1168def to_nsec(val):
1169 """Returns value in nanoseconds if value is ROS time/duration, else value."""
1170 return realapi.to_nsec(val)
1171
1172
1173def to_sec(val):
1174 """Returns value in seconds if value is ROS time/duration, else value."""
1175 return realapi.to_sec(val)
1176
1177
1178def to_sec_nsec(val):
1179 """Returns value as (seconds, nanoseconds) if value is ROS time/duration, else value."""
1180 return realapi.to_sec_nsec(val)
1181
1182
1183def to_time(val):
1184 """Returns value as ROS time if convertible (int/float/duration/datetime/decimal), else value."""
1185 return realapi.to_time(val)
1186
1187
1188__all___ = [
1189 "BAG_EXTENSIONS", "NODE_NAME", "ROS_ALIAS_TYPES", "ROS_BUILTIN_CTORS", "ROS_BUILTIN_TYPES",
1190 "ROS_COMMON_TYPES", "ROS_FAMILY", "ROS_NUMERIC_TYPES", "ROS_STRING_TYPES", "ROS_TIME_CLASSES",
1191 "ROS_TIME_TYPES", "SKIP_EXTENSIONS", "Bag", "BaseBag", "TypeMeta",
1192 "calculate_definition_hash", "canonical", "create_publisher", "create_subscriber",
1193 "deserialize_message", "dict_to_message", "filter_fields", "format_message_value",
1194 "get_alias_type", "get_message_class", "get_message_definition", "get_message_fields",
1195 "get_message_type", "get_message_type_hash", "get_message_value", "get_ros_time_category",
1196 "get_rostime", "get_topic_types", "get_type_alias", "init_node", "is_ros_message",
1197 "is_ros_time", "iter_message_fields", "make_bag_time", "make_duration", "make_live_time",
1198 "make_message_hash", "make_time", "message_to_dict", "parse_definition_fields",
1199 "parse_definition_subtypes", "scalar", "deserialize_message", "set_message_value",
1200 "shutdown_node", "time_message", "to_datetime", "to_decimal", "to_duration", "to_nsec",
1201 "to_sec", "to_sec_nsec", "to_time", "validate",
1202]
Bag factory metaclass.
Definition api.py:381
autodetect(cls, f)
Returns registered bag class auto-detected from file, or None.
Definition api.py:420
WRITER_CLASSES
Bag writer classes, as {Cls, }.
Definition api.py:387
__subclasshook__(cls, C)
Definition api.py:429
READER_CLASSES
Bag reader classes, as {Cls, }.
Definition api.py:384
ROS bag interface.
Definition api.py:98
__nonzero__(self)
Definition api.py:139
get_type_and_topic_info(self, topic_filters=None)
Returns thorough metainfo on topic and message types.
Definition api.py:245
mode
Returns file open mode.
Definition api.py:341
open(self)
Opens the bag file if not already open.
Definition api.py:307
filename
Returns bag file path.
Definition api.py:327
__iter__(self)
Iterates over all messages in the bag.
Definition api.py:117
__str__(self)
Returns informative text for bag, with a full overview of topics and types.
Definition api.py:157
stop_on_error
Whether raising read error on unknown message type (ROS2 SQLite .db3 specific).
Definition api.py:355
TypesAndTopicsTuple
Returned from get_type_and_topic_info() as ({typename: typehash}, {topic name: TopicTuple}).
Definition api.py:109
__exit__(self, exc_type, exc_value, traceback)
Context manager exit, closes bag.
Definition api.py:126
get_start_time(self)
Returns the start time of the bag, as UNIX timestamp, or None if bag empty.
Definition api.py:221
get_end_time(self)
Returns the end time of the bag, as UNIX timestamp, or None if bag empty.
Definition api.py:225
__len__(self)
Returns the number of messages in the bag.
Definition api.py:130
get_message_class(self, typename, typehash=None)
Returns ROS message type class, or None if unknown message type for bag.
Definition api.py:256
get_qoses(self, topic, typename)
Returns topic Quality-of-Service profiles as a list of dicts, or None if not available.
Definition api.py:253
get_message_type_hash(self, msg_or_type)
Returns ROS message type MD5 hash, or None if unknown message type for bag.
Definition api.py:268
read_messages(self, topics=None, start_time=None, end_time=None, raw=False, **__)
Yields messages from the bag, optionally filtered by topic and timestamp.
Definition api.py:291
get_topic_info(self, counts=True)
Returns topic and message type metainfo as {(topic, typename, typehash): count}.
Definition api.py:234
flush(self)
Ensures all changes are written to bag file.
Definition api.py:315
__enter__(self)
Context manager entry, opens bag if not open.
Definition api.py:121
close(self)
Closes the bag file.
Definition api.py:311
__contains__(self, key)
Returns whether bag contains given topic.
Definition api.py:143
closed
Returns whether file is closed.
Definition api.py:348
size
Returns current file size in bytes.
Definition api.py:334
topics
Returns the list of topics in bag, in alphabetic order.
Definition api.py:320
TopicTuple
Returned from get_type_and_topic_info() as (typename, message count, connection count,...
Definition api.py:105
__next__(self)
Retrieves next message from bag as (topic, message, timestamp).
Definition api.py:134
__getitem__(self, key)
Returns an iterator yielding messages from the bag in given topic, supporting len().
Definition api.py:151
get_message_definition(self, msg_or_type)
Returns ROS message type definition full text, including subtype definitions.
Definition api.py:265
get_message_count(self, topic_filters=None)
Returns the number of messages in the bag.
Definition api.py:218
write(self, topic, msg, t=None, raw=False, **kwargs)
Writes a message to the bag.
Definition api.py:304
Container for caching and retrieving message type metadata.
Definition api.py:438
int LIFETIME
Seconds before auto-clearing message from cache, <=0 disables.
Definition api.py:441
typeclass
Returns message class object.
Definition api.py:526
__enter__(self, *_, **__)
Allows using instance as a context manager (no other effect).
Definition api.py:482
__exit__(self, *_, **__)
Definition api.py:486
data
Returns message serialized binary, as bytes(), or None if not cached.
Definition api.py:519
discard(cls, msg)
Discards message metadata from cache, if any, including nested messages.
Definition api.py:577
definition
Returns message type definition text with full subtype definitions.
Definition api.py:509
int POPULATION
Max size to constrain cache to, <=0 disables.
Definition api.py:444
topickey
Returns (topic, typename, typehash) for message.
Definition api.py:537
__init__(self, msg, topic=None, source=None, data=None)
Definition api.py:470
typename
Returns message typename, as "pkg/MsgType".
Definition api.py:490
typekey
Returns (typename, typehash) for message.
Definition api.py:546
clear(cls)
Clears entire cache.
Definition api.py:606
typehash
Returns message type definition MD5 hash.
Definition api.py:499
sweep(cls)
Discards stale or surplus messages from cache.
Definition api.py:586
make(cls, msg, topic=None, source=None, root=None, data=None)
Returns TypeMeta instance, registering message in cache if not present.
Definition api.py:565
Wrapper for iterable value with specified fixed length.
Definition common.py:666
MCAP bag interface, providing most of rosbag.Bag interface.
Definition mcap.py:47
time_message(val, to_message=True, clock_type=None)
Converts ROS2 time/duration between rclpy and builtin_interfaces objects.
Definition api.py:1183
canonical(typename, unbounded=False)
Returns "pkg/Type" for "pkg/subdir/Type", standardizes various ROS2 formats.
Definition api.py:714
make_live_time(stamp)
Returns value as ROS timestamp, conditionally as relative to system time.
Definition api.py:962
create_publisher(topic, cls_or_typename, queue_size)
Returns a ROS publisher instance, with .get_num_connections() and .unregister().
Definition api.py:718
make_message_hash(msg, include=(), exclude=())
Returns hashcode for ROS message, as a hex digest.
Definition api.py:993
serialize_message(msg)
Returns ROS message as a serialized binary.
Definition api.py:1149
to_sec(val)
Returns value in seconds if value is ROS time/duration, else value.
Definition api.py:1211
parse_definition_fields(typename, typedef)
Returns field names and type names from a message definition text.
Definition api.py:1078
get_topic_types()
Returns currently available ROS topics, as [(topicname, typename)].
Definition api.py:838
format_message_value(msg, name, value)
Returns a message attribute value as string.
Definition api.py:765
deserialize_message(msg, cls_or_typename)
Returns ROS message or service request/response instantiated from serialized binary.
Definition api.py:1154
iter_message_fields(msg, messages_only=False, flat=False, scalars=(), include=(), exclude=(), top=())
Yields ((nested, path), value, typename) from ROS message.
Definition api.py:886
dict_to_message(dct, msg)
Returns given ROS message populated from Python dictionary.
Definition api.py:1045
to_decimal(val)
Returns value as decimal.Decimal if value is ROS time/duration, else value.
Definition api.py:1194
get_message_value(msg, name, typename=None, default=Ellipsis)
Returns object attribute value, with numeric arrays converted to lists.
Definition api.py:809
get_message_class(typename)
Returns ROS message class, or None if unavailable.
Definition api.py:769
to_duration(val)
Returns value as ROS duration if convertible (int/float/time/datetime/decimal), else value.
Definition api.py:1201
make_time(secs=0, nsecs=0)
Returns a ROS time.
Definition api.py:982
get_alias_type(typename)
Returns ROS built-in type for alias like "char", if any; reverse of get_alias_type().
Definition api.py:856
make_bag_time(stamp, bag)
Returns value as ROS timestamp, conditionally as relative to bag start/end time.
Definition api.py:936
get_message_type(msg_or_cls)
Returns ROS message type name, like "std_msgs/Header".
Definition api.py:797
get_message_definition(msg_or_type)
Returns ROS message type definition full text, including subtype definitions.
Definition api.py:779
validate(live=False)
Initializes ROS bindings, returns whether ROS environment set, prints or raises error if not.
Definition api.py:633
scalar(typename)
Returns scalar type from ROS message data type, like "uint8" from uint8-array.
Definition api.py:1165
message_to_dict(msg, replace=None)
Returns ROS message as nested Python dictionary.
Definition api.py:1021
make_duration(secs=0, nsecs=0)
Returns a ROS duration.
Definition api.py:977
get_rostime(fallback=False)
Returns current ROS time, as rospy.Time or rclpy.time.Time.
Definition api.py:818
make_full_typename(typename, category="msg")
Returns "pkg/msg/Type" for "pkg/Type".
Definition api.py:920
to_nsec(val)
Returns value in nanoseconds if value is ROS time/duration, else value.
Definition api.py:1206
get_type_alias(typename)
Returns alias like "char" for ROS built-in type, if any; reverse of get_type_alias().
Definition api.py:847
to_time(val)
Returns value as ROS time if convertible (int/float/duration/datetime/decimal), else value.
Definition api.py:1221
shutdown_node()
Shuts down live ROS node.
Definition api.py:623
set_message_value(obj, name, value)
Sets message or object attribute value.
Definition api.py:1169
is_ros_time(val)
Returns whether value is a ROS time/duration class or instance.
Definition api.py:869
create_subscriber(topic, cls_or_typename, handler, queue_size)
Returns a ROS subscriber instance.
Definition api.py:729
get_message_type_hash(msg_or_type)
Returns ROS message type MD5 hash, or "" if unknown type.
Definition api.py:783
to_datetime(val)
Returns value as datetime.datetime if value is ROS time/duration, else value.
Definition api.py:1188
to_sec_nsec(val)
Returns value as (seconds, nanoseconds) if value is ROS time/duration, else value.
Definition api.py:1216
init_node(name=None)
Initializes a ROS1 or ROS2 node if not already initialized.
Definition api.py:619
get_ros_time_category(msg_or_type)
Returns "time" or "duration" for time/duration type or instance, else original argument.
Definition api.py:822
is_ros_message(val, ignore_time=False)
Returns whether value is a ROS message or special like ROS time/duration class or instance.
Definition api.py:865
filter_fields(fieldmap, top=(), include=(), exclude=())
Returns fieldmap filtered by include and exclude patterns.
Definition api.py:741
get_message_fields(val)
Returns OrderedDict({field name: field type name}) if ROS message, else {}.
Definition api.py:793
MCAP input and output.
Definition mcap.py:1