3ROS interface, shared facade for ROS1 and ROS2.
5------------------------------------------------------------------------------
6This file is part of grepros - grep for ROS1 bag files and live topics.
7Released under the BSD License.
12------------------------------------------------------------------------------
14## @namespace grepros.api
28from . common import ConsolePrinter, LenIterable, format_bytes, memoize
29#from . import ros1, ros2
54ROS_NUMERIC_TYPES = [
"byte",
"char",
"int8",
"int16",
"int32",
"int64",
"uint8",
55 "uint16",
"uint32",
"uint64",
"float32",
"float64",
"bool"]
58ROS_STRING_TYPES = [
"string",
"wstring"]
61ROS_BUILTIN_TYPES = ROS_NUMERIC_TYPES + ROS_STRING_TYPES
64ROS_BUILTIN_CTORS = {
"byte": int,
"char": int,
"int8": int,
"int16": int,
65 "int32": int,
"int64": int,
"uint8": int,
"uint16": int,
66 "uint32": int,
"uint64": int,
"float32": float,
"float64": float,
67 "bool": bool,
"string": str,
"wstring": str}
89 %Bag can be used a context manager, is an iterable providing (topic, message, timestamp) tuples
90 and supporting `len(bag)`;
and supports topic-based membership
91 (`
if mytopic
in bag`, `
for t, m, s
in bag[mytopic]`, `len(bag[mytopic])`).
93 Extra methods
and properties compared
with rosbag.Bag: Bag.get_message_class(),
94 Bag.get_message_definition(), Bag.get_message_type_hash(), Bag.get_topic_info();
95 Bag.closed
and Bag.topics.
99 BagMessage = collections.namedtuple("BagMessage",
"topic message timestamp")
103 TopicTuple = collections.namedtuple(
"TopicTuple", [
"msg_type",
"message_count",
104 "connections",
"frequency"])
107 TypesAndTopicsTuple = collections.namedtuple(
"TypesAndTopicsTuple", [
"msg_types",
"topics"])
110 MODES = (
"r",
"w",
"a")
116 """Iterates over all messages in the bag."""
120 """Context manager entry, opens bag if not open."""
124 def __exit__(self, exc_type, exc_value, traceback):
125 """Context manager exit, closes bag."""
129 """Returns the number of messages in the bag."""
133 """Retrieves next message from bag as (topic, message, timestamp)."""
134 raise NotImplementedError
135 if sys.version_info < (3, ): next = __next__
139 def __bool__ (self):
return True
142 """Returns whether bag contains given topic."""
143 raise NotImplementedError
147 def __deepcopy__(self, memo=
None):
return self
150 """Returns an iterator yielding messages from the bag in given topic, supporting len()."""
152 get_count =
lambda: sum(c
for (t, _, _), c
in self.
get_topic_info().items()
if t == key)
156 """Returns informative text for bag, with a full overview of topics and types."""
158 indent =
lambda s, n: (
"\n%s" % (
" " * n)).join(s.splitlines())
160 fmttime =
lambda x: datetime.datetime.fromtimestamp(x).strftime(
"%b %d %Y %H:%M:%S.%f")[:-4]
162 """Returns duration seconds as text like "1hr 1:12s (3672s)" or "51.8s"."""
164 hh, rem = divmod(secs, 3600)
165 mm, ss = divmod(rem, 60)
166 if hh: result +=
"%dhr " % hh
167 if mm: result +=
"%d:" % mm
169 if hh
or mm: result +=
" (%ds)" % secs
177 if None not in (start, end):
178 entries[
"duration"] = fmtdur(end - start)
179 entries[
"start"] =
"%s (%.2f)" % (fmttime(start), start)
180 entries[
"end"] =
"%s (%.2f)" % (fmttime(end), end)
181 entries[
"size"] = format_bytes(self.
sizesize)
182 if any(counts.values()):
183 entries[
"messages"] = str(sum(counts.values()))
185 nhs = sorted(set((n, h)
for _, n, h
in counts))
186 namew = max(len(n)
for n, _
in nhs)
188 entries[
"types"] =
"\n".join(
"%s%s [%s]" % (n,
" " * (namew - len(n)), h)
for n, h
in nhs)
190 topicw = max(len(t)
for t, _, _
in counts)
191 typew = max(len(n)
for _, n, _
in counts)
192 countw = max(len(str(c))
for c
in counts.values())
194 for (t, n, _), c
in sorted(counts.items()):
197 line =
"%s%s" % (t,
" " * (topicw - len(t)))
199 line +=
" %s%s msg%s" % (
" " * (countw - len(str(c))), c,
" " if 1 == c
else "s")
203 line +=
"%s (%s connections)" % (
" " * (typew - len(n)), len(qq))
if len(qq) > 1
else ""
205 entries[
"topics"] =
"\n".join(lines)
207 labelw = max(map(len, entries))
208 return "\n".join(
"%s:%s %s" % (k,
" " * (labelw - len(k)), indent(v, labelw + 2))
209 for k, v
in entries.items())
213 Returns the number of messages in the bag.
215 @param topic_filters list of topics
or a single topic to filter by,
if any
217 raise NotImplementedError
220 """Returns the start time of the bag, as UNIX timestamp, or None if bag empty."""
221 raise NotImplementedError
224 """Returns the end time of the bag, as UNIX timestamp, or None if bag empty."""
225 raise NotImplementedError
229 Returns topic and message type metainfo
as {(topic, typename, typehash): count}.
231 @param counts
if false, counts may be returned
as None if lookup
is costly
233 raise NotImplementedError
237 Returns thorough metainfo on topic and message types.
239 @param topic_filters list of topics
or a single topic to filter returned topics-dict by,
242 msg_types
as dict of {typename: typehash},
243 topics
as a dict of {topic:
TopicTuple() namedtuple}.
245 raise NotImplementedError
249 Returns topic Quality-of-Service profiles as a list of dicts,
or None if not available.
251 Functional only
in ROS2.
256 """Returns ROS message type class, or None if unknown message type for bag."""
261 Returns ROS message type definition full text, including subtype definitions.
263 Returns None if unknown message type
for bag.
268 """Returns ROS message type MD5 hash, or None if unknown message type for bag."""
271 def read_messages(self, topics=None, start_time=None, end_time=None, raw=False, **__):
273 Yields messages from the bag, optionally filtered by topic
and timestamp.
275 @param topics list of topics
or a single topic to filter by,
if any
276 @param start_time earliest timestamp of message to
return,
as ROS time
or convertible
277 (int/float/duration/datetime/decimal)
278 @param end_time latest timestamp of message to
return,
as ROS time
279 (int/float/duration/datetime/decimal)
280 @param raw
if true, then returned messages are tuples of
281 (typename, bytes, typehash, typeclass)
282 or (typename, bytes, typehash, position, typeclass),
283 depending on file format
284 @return BagMessage namedtuples of (topic, message, timestamp
as ROS time)
286 raise NotImplementedError
288 def write(self, topic, msg, t=None, raw=False, **kwargs):
290 Writes a message to the bag.
292 @param topic name of topic
293 @param msg ROS message
294 @param t message timestamp
if not using current wall time,
as ROS time
295 or convertible (int/float/duration/datetime/decimal)
296 @param raw
if true, `msg`
is in raw format, (typename, bytes, typehash, typeclass)
297 @param kwargs ROS version-specific arguments,
298 like `connection_header`
for ROS1
or `qoses`
for ROS2
300 raise NotImplementedError
303 """Opens the bag file if not already open."""
304 raise NotImplementedError
307 """Closes the bag file."""
308 raise NotImplementedError
311 """Ensures all changes are written to bag file."""
315 """Returns the list of topics in bag, in alphabetic order."""
316 raise NotImplementedError
320 """Returns bag file path."""
321 raise NotImplementedError
325 """Returns current file size in bytes."""
326 raise NotImplementedError
330 """Returns file open mode."""
331 raise NotImplementedError
335 """Returns whether file is closed."""
336 raise NotImplementedError
340 """Whether raising read error on unknown message type (ROS2 SQLite .db3 specific)."""
341 return getattr(self,
"_stop_on_error",
None)
343 @stop_on_error.setter
345 """Sets whether to raise read error on unknown message type (ROS2 SQLite .db3 specific)."""
346 setattr(self,
"_stop_on_error", flag)
349@six.add_metaclass(abc.ABCMeta)
352 %Bag factory metaclass.
354 Result is a format-specific
class instance, auto-detected
from file extension
or content:
355 an extended rosbag.Bag
for ROS1 bags, otherwise an object
with a conforming interface.
358 plugin loaded
and file recognized
as MCAP format.
360 User plugins can add their own format support to READER_CLASSES
and WRITER_CLASSES.
361 Classes can have a static/
class method `autodetect(filename)`
362 returning whether given file
is in recognizable format
for the plugin
class.
366 READER_CLASSES = set()
369 WRITER_CLASSES = set()
371 def __new__(cls, f, mode="r", reindex=False, progress=False, **kwargs):
373 Returns an object for reading
or writing ROS bags.
375 Suitable Bag
class is auto-detected by file extension
or content.
377 @param f bag file path,
or a stream object
378 (streams
not supported
for ROS2 .db3 SQLite bags)
379 @param mode
return reader
if "r" else writer
380 @param reindex reindex unindexed bag (ROS1 only), making a backup
if indexed format
381 @param progress show progress bar
with reindexing status
382 @param kwargs additional keyword arguments
for format-specific Bag constructor,
383 like `compression`
for ROS1 bag
386 for detect, bagcls
in ((d, c)
for d
in (
True,
False)
for c
in list(classes)):
387 use, discard =
not detect,
False
389 if detect
and callable(getattr(bagcls,
"autodetect",
None)):
390 use, discard = bagcls.autodetect(f),
True
392 return bagcls(f, mode, reindex=reindex, progress=progress, **kwargs)
393 except Exception
as e:
395 errors.append(
"Failed to open %r for %s with %s: %s." %
396 (f,
"reading" if "r" == mode
else "writing", bagcls, e))
397 discard
and classes.discard(bagcls)
398 for err
in errors: ConsolePrinter.warn(err)
399 raise Exception(
"No suitable %s class available" % (
"reader" if "r" == mode
else "writer"))
404 """Returns registered bag class auto-detected from file, or None."""
406 if callable(vars(bagcls).get(
"autodetect"))
and bagcls.autodetect(f):
413 return True if issubclass(C, BaseBag)
else NotImplemented
418 Container for caching
and retrieving message type metadata.
420 All property values are lazy-loaded upon request.
439 _LASTSWEEP = time.time()
441 def __init__(self, msg, topic=None, source=None, data=None):
454 """Allows using instance as a context manager (no other effect)."""
461 """Returns message typename, as "pkg/MsgType"."""
463 self.
_type = realapi.get_message_type(self.
_msg)
468 """Returns message type definition MD5 hash."""
471 self.
_hash = hash
or realapi.get_message_type_hash(self.
_msg)
476 """Returns message type definition text with full subtype definitions."""
479 self.
_def = typedef
or realapi.get_message_definition(self.
_msg)
484 """Returns message serialized binary, as bytes(), or None if not cached."""
489 """Returns message class object."""
491 cls = type(self.
_msg)
if realapi.is_ros_message(self.
_msg)
else \
498 """Returns (topic, typename, typehash) for message."""
505 """Returns (typename, typehash) for message."""
511 def make(cls, msg, topic=None, source=None, root=None, data=None):
513 Returns TypeMeta instance, registering message in cache
if not present.
515 Other parameters are only required
for first registration.
517 @param topic topic the message
is in if root message
518 @param source message source like LiveSource
or Bag,
519 for looking up message type metadata
520 @param root root message that msg
is a nested value of,
if any
521 @param data message serialized binary,
if any
526 if root
and root
is not msg:
527 cls.
_CHILDREN.setdefault(id(root), set()).add(msgid)
535 """Discards message metadata from cache, if any, including nested messages."""
538 for childid
in cls.
_CHILDREN.pop(msgid, []):
544 """Discards stale or surplus messages from cache."""
547 for msgid, tm
in sorted(x[::-1]
for x
in list(cls.
_TIMINGS.items()))[:count]:
549 for childid
in cls.
_CHILDREN.pop(msgid, []):
555 for msgid, tm
in list(cls.
_TIMINGS.items()):
556 drop = (tm > now)
or (tm < now - cls.
LIFETIME)
558 for childid
in cls.
_CHILDREN.pop(msgid, [])
if drop
else ():
564 """Clears entire cache."""
573 Initializes a ROS1 or ROS2 node
if not already initialized.
575 In ROS1, blocks until ROS master available.
577 validate() and realapi.init_node(name
or NODE_NAME)
581 """Shuts down live ROS node."""
582 realapi
and realapi.shutdown_node()
587 Initializes ROS bindings, returns whether ROS environment set, prints or raises error
if not.
589 @param live whether environment must support launching a ROS node
591 global realapi, BAG_EXTENSIONS, SKIP_EXTENSIONS, ROS1, ROS2, ROS_VERSION, ROS_FAMILY, \
592 ROS_COMMON_TYPES, ROS_TIME_TYPES, ROS_TIME_CLASSES, ROS_ALIAS_TYPES
593 if realapi
and not live:
596 success, version =
False, os.getenv(
"ROS_VERSION")
600 success = realapi.validate(live)
601 ROS1, ROS2, ROS_VERSION, ROS_FAMILY =
True,
False, 1,
"rospy"
605 success = realapi.validate(live)
606 ROS1, ROS2, ROS_VERSION, ROS_FAMILY =
False,
True, 2,
"rclpy"
608 ConsolePrinter.error(
"ROS environment not set: missing ROS_VERSION.")
610 ConsolePrinter.error(
"ROS environment not supported: unknown ROS_VERSION %r.", version)
612 BAG_EXTENSIONS, SKIP_EXTENSIONS = realapi.BAG_EXTENSIONS, realapi.SKIP_EXTENSIONS
613 ROS_COMMON_TYPES = ROS_BUILTIN_TYPES + realapi.ROS_TIME_TYPES
614 ROS_TIME_TYPES = realapi.ROS_TIME_TYPES
615 ROS_TIME_CLASSES = realapi.ROS_TIME_CLASSES
616 ROS_ALIAS_TYPES = realapi.ROS_ALIAS_TYPES
617 Bag.READER_CLASSES.add(realapi.Bag)
618 Bag.WRITER_CLASSES.add(realapi.Bag)
623def calculate_definition_hash(typename, msgdef, extradefs=()):
625 Returns MD5 hash for message type definition.
627 @param extradefs additional subtype definitions
as ((typename, msgdef), )
630 FIELD_RGX = re.compile(
r"^\s*([a-z][^\s:]+)\s+([^\s=]+)(\s*=\s*([^\n]+))?(\s+([^\n]+))?", re.I)
631 STR_CONST_RGX = re.compile(
r"^w?string\s+([^\s=#]+)\s*=")
632 lines, pkg = [], typename.rsplit(
"/", 1)[0]
633 subtypedefs = dict(extradefs, **parse_definition_subtypes(msgdef))
634 extradefs = tuple(subtypedefs.items())
637 for line
in msgdef.splitlines():
638 if set(line) == set(
"="):
641 if "#" in line
and not STR_CONST_RGX.match(line): line = line[:line.index(
"#")]
642 match = FIELD_RGX.match(line)
643 if match
and match.group(3):
644 lines.append(
"%s %s=%s" % (match.group(1), match.group(2), match.group(4).strip()))
646 for line
in msgdef.splitlines():
647 if set(line) == set(
"="):
649 if "#" in line
and not STR_CONST_RGX.match(line): line = line[:line.index(
"#")]
650 match = FIELD_RGX.match(line)
651 if match
and not match.group(3):
652 scalartype, namestr =
scalar(match.group(1)), match.group(2)
653 if scalartype
in ROS_COMMON_TYPES:
654 typestr = match.group(1)
655 if match.group(5): namestr = (namestr +
" " + match.group(6)).strip()
657 subtype = scalartype
if "/" in scalartype
else "std_msgs/Header" \
658 if "Header" == scalartype
else "%s/%s" % (pkg, scalartype)
659 typestr = calculate_definition_hash(subtype, subtypedefs[subtype], extradefs)
660 lines.append(
"%s %s" % (typestr, namestr))
661 return hashlib.md5(
"\n".join(lines).encode()).hexdigest()
666 Returns "pkg/Type" for "pkg/subdir/Type", standardizes various ROS2 formats.
668 Converts ROS2 DDS types like
"octet" to
"byte",
and "sequence<uint8, 100>" to
"uint8[100]".
670 @param unbounded drop constraints like array bounds,
and string bounds
in ROS2,
671 e.g. returning
"uint8[]" for "uint8[10]"
673 return realapi.canonical(typename, unbounded)
677 """Returns a ROS publisher instance, with .get_num_connections() and .unregister()."""
678 return realapi.create_publisher(topic, cls_or_typename, queue_size)
683 Returns a ROS subscriber instance.
688 return realapi.create_subscriber(topic, cls_or_typename, handler, queue_size)
693 Returns fieldmap filtered by include and exclude patterns.
695 @param fieldmap {field name: field type name}
696 @param top parent path
as (rootattr, ..)
697 @param include [((nested, path), re.Pattern())] to require
in parent path
698 @param exclude [((nested, path), re.Pattern())] to reject
in parent path
700 NESTED_RGX = re.compile(".+/.+|" +
"|".join(
"^%s$" % re.escape(x)
for x
in ROS_TIME_TYPES))
701 result = type(fieldmap)()
if include
or exclude
else fieldmap
702 for k, v
in fieldmap.items()
if not result
else ():
703 trailstr =
".".join(map(str, top + (k, )))
704 for is_exclude, patterns
in enumerate((include, exclude)):
706 matches =
not is_exclude
and NESTED_RGX.match(v) \
707 or any(r.match(trailstr)
for _, r
in patterns)
708 if patterns
and (
not matches
if is_exclude
else matches):
710 elif patterns
and is_exclude
and matches:
712 if include
and exclude
and k
not in result:
719 Returns a message attribute value as string.
721 Result
is at least 10 chars wide
if message
is a ROS time/duration
722 (aligning seconds
and nanoseconds).
724 return realapi.format_message_value(msg, name, value)
728 """Returns ROS message class, or None if unavailable."""
729 return realapi.get_message_class(typename)
734 Returns ROS message type definition full text, including subtype definitions.
736 Returns None if unknown type.
738 return realapi.get_message_definition(msg_or_type)
742 """Returns ROS message type MD5 hash, or "" if unknown type."""
743 return realapi.get_message_type_hash(msg_or_type)
748 Returns OrderedDict({field name: field type name}) if ROS message,
else {}.
750 @param val ROS message
class or instance
752 return realapi.get_message_fields(val)
756 """Returns ROS message type name, like "std_msgs/Header"."""
757 return realapi.get_message_type(msg_or_cls)
762 Returns object attribute value, with numeric arrays converted to lists.
764 @param name message attribute name
765 @param typename value ROS type name,
for identifying byte arrays
766 @param default value to
return if attribute does
not exist; raises exception otherwise
768 return realapi.get_message_value(msg, name, typename, default)
773 Returns current ROS time, as rospy.Time
or rclpy.time.Time.
775 @param fallback use wall time
if node
not initialized
777 return realapi.get_rostime(fallback=fallback)
781 """Returns "time" or "duration" for time/duration type or instance, else original argument."""
782 cls = msg_or_type
if inspect.isclass(msg_or_type)
else \
784 if cls
is None and isinstance(msg_or_type, (six.binary_type, six.text_type)):
785 cls = next((x
for x
in ROS_TIME_CLASSES
if get_message_type(x) == msg_or_type),
None)
786 if cls
in ROS_TIME_CLASSES:
787 return "duration" if "duration" in ROS_TIME_CLASSES[cls].lower()
else "time"
793 Returns currently available ROS topics, as [(topicname, typename)].
795 Omits topics that the current ROS node itself has published.
797 return realapi.get_topic_types()
802 Returns alias like "char" for ROS built-
in type,
if any; reverse of
get_type_alias().
804 In ROS1, byte
and char are aliases
for int8
and uint8;
in ROS2 the reverse.
806 return next((k
for k, v
in ROS_ALIAS_TYPES.items()
if v == typename),
None)
811 Returns ROS built-in type
for alias like
"char",
if any; reverse of
get_alias_type().
813 In ROS1, byte
and char are aliases
for int8
and uint8;
in ROS2 the reverse.
815 return ROS_ALIAS_TYPES.get(typename)
820 Returns whether value is a ROS message
or special like ROS time/duration
class or instance.
822 @param ignore_time whether to ignore ROS time/duration types
824 return realapi.is_ros_message(val, ignore_time)
828 """Returns whether value is a ROS time/duration class or instance."""
829 return realapi.is_ros_time(val)
832def iter_message_fields(msg, messages_only=False, flat=False, scalars=(), include=(), exclude=(),
835 Yields ((nested, path), value, typename) from ROS message.
837 @param messages_only whether to
yield only values that are ROS messages themselves
838 or lists of ROS messages,
else will
yield scalar
and scalar list values
839 @param flat recurse into lists of nested messages, ignored
if `messages_only`
840 @param scalars sequence of ROS types to consider
as scalars, like (
"time", duration
")
841 @param include [((nested, path), re.Pattern())] to require
in field path,
if any
842 @param exclude [((nested, path), re.Pattern())] to reject
in field path,
if any
843 @param top internal recursion helper
845 fieldmap = realapi.get_message_fields(msg)
846 if include
or exclude: fieldmap =
filter_fields(fieldmap, (), include, exclude)
847 if not fieldmap:
return
849 for k, t
in fieldmap.items():
850 v, scalart = realapi.get_message_value(msg, k, t), realapi.scalar(t)
851 is_sublist = isinstance(v, (list, tuple))
and scalart
not in ROS_COMMON_TYPES
853 if not is_forced_scalar
and realapi.is_ros_message(v):
856 if is_forced_scalar
or is_sublist
or realapi.is_ros_message(v, ignore_time=
True):
857 yield top + (k, ), v, t
859 for k, t
in fieldmap.items():
860 v, scalart = realapi.get_message_value(msg, k, t), realapi.scalar(t)
862 if not is_forced_scalar
and realapi.is_ros_message(v):
865 elif flat
and v
and isinstance(v, (list, tuple))
and scalart
not in ROS_BUILTIN_TYPES:
866 for i, m
in enumerate(v):
870 yield top + (k, ), v, t
875 Returns "pkg/msg/Type" for "pkg/Type".
877 @param category type category like
"msg" or "srv"
879 INTER, FAMILY = "/%s/" % category,
"rospy" if ROS1
else "rclpy"
880 if INTER
in typename
or "/" not in typename
or typename.startswith(
"%s/" % FAMILY):
882 return INTER.join(next((x[0], x[-1])
for x
in [typename.split(
"/")]))
887 Returns value as ROS timestamp, conditionally
as relative to bag start/end time.
889 Stamp
is interpreted
as relative offset
from bag start/end time
890 if numeric string
with sign prefix,
or timedelta,
or ROS duration.
892 @param stamp converted to ROS timestamp
if int/float/str/duration/datetime/timedelta/decimal
893 @param bag an open bag to use
for relative start/end time
898 stamp = realapi.to_sec(stamp)
899 shift = bag.get_start_time()
if stamp >= 0
else bag.get_end_time()
900 elif isinstance(stamp, datetime.datetime):
901 stamp = time.mktime(stamp.timetuple()) + stamp.microsecond / 1E6
902 elif isinstance(stamp, datetime.timedelta):
903 stamp = stamp.total_seconds()
904 shift = bag.get_start_time()
if stamp >= 0
else bag.get_end_time()
905 elif isinstance(stamp, (six.binary_type, six.text_type)):
906 sign = stamp[0]
in (
"+", b
"+")
if six.text_type(stamp[0])
in "+-" else None
907 shift = 0
if sign
is None else bag.get_start_time()
if sign
else bag.get_end_time()
914 Returns value as ROS timestamp, conditionally
as relative to system time.
916 Stamp
is interpreted
as relative offset
from system time
917 if numeric string
with sign prefix,
or timedelta,
or ROS duration.
919 @param stamp converted to ROS timestamp
if int/float/str/duration/datetime/timedelta/decimal
924 stamp, shift = realapi.to_sec(stamp), time.time()
925 elif isinstance(stamp, datetime.datetime):
926 stamp = time.mktime(stamp.timetuple()) + stamp.microsecond / 1E6
927 elif isinstance(stamp, datetime.timedelta):
928 stamp, shift = stamp.total_seconds(), time.time()
929 elif isinstance(stamp, (six.binary_type, six.text_type)):
930 sign = stamp[0]
in (
"+", b
"+")
if six.text_type(stamp[0])
in "+-" else None
931 stamp, shift = float(stamp), (0
if sign
is None else time.time())
936 """Returns a ROS duration."""
937 return realapi.make_duration(secs=secs, nsecs=nsecs)
941 """Returns a ROS time."""
942 return realapi.make_time(secs=secs, nsecs=nsecs)
947 Returns hashcode for ROS message,
as a hex digest.
949 @param include message fields to include
if not all,
as [((nested, path), re.Pattern())]
950 @param exclude message fields to exclude,
as [((nested, path), re.Pattern())]
952 hasher = hashlib.md5()
954 def walk_message(obj, top=()):
956 fieldmap =
filter_fields(fieldmap, include=include, exclude=exclude)
957 for k, t
in fieldmap.items():
960 walk_message(v, path)
961 elif isinstance(v, (list, tuple))
and scalar(t)
not in ROS_BUILTIN_TYPES:
962 for x
in v: walk_message(x, path)
964 s =
"%s=%s" % (path, v)
965 hasher.update(s.encode(
"utf-8", errors=
"backslashreplace"))
966 if not hasattr(obj,
"__slots__"):
967 s =
"%s=%s" % (top, obj)
968 hasher.update(s.encode(
"utf-8", errors=
"backslashreplace"))
971 return hasher.hexdigest()
976 Returns ROS message as nested Python dictionary.
978 @param replace mapping of {value: replaced value},
979 e.g. {math.nan:
None, math.inf:
None}
981 result = {} if realapi.is_ros_message(msg)
else msg
982 for name, typename
in realapi.get_message_fields(msg).items():
983 v = realapi.get_message_value(msg, name, typename)
984 if realapi.is_ros_time(v):
985 v = dict(zip(realapi.get_message_fields(v), realapi.to_sec_nsec(v)))
986 elif realapi.is_ros_message(v):
988 elif isinstance(v, (list, tuple)):
989 if realapi.scalar(typename)
not in ROS_BUILTIN_TYPES:
992 v = [replace.get(x, x)
for x
in v]
994 v = replace.get(v, v)
1001 Returns given ROS message populated from Python dictionary.
1003 Raises TypeError on attribute value type mismatch.
1005 for name, typename
in realapi.get_message_fields(msg).items():
1008 v, msgv = dct[name], realapi.get_message_value(msg, name, typename)
1010 if realapi.is_ros_message(msgv):
1012 elif isinstance(msgv, (list, tuple)):
1013 scalarname = realapi.scalar(typename)
1014 if scalarname
in ROS_BUILTIN_TYPES:
1015 cls = ROS_BUILTIN_CTORS[scalarname]
1016 v = [x
if isinstance(x, cls)
else cls(x)
for x
in v]
1018 cls = realapi.get_message_class(scalarname)
1019 v = [x
if realapi.is_ros_message(x)
else dict_to_message(x, cls())
for x
in v]
1023 setattr(msg, name, v)
1030 Returns field names and type names
from a message definition text.
1032 Does
not recurse into subtypes.
1034 @param typename ROS message type name, like
"my_pkg/MyCls"
1035 @param typedef ROS message definition, like
"Header header\nbool a\nMyCls2 b"
1036 @return ordered {field name: type name},
1037 like {
"header":
"std_msgs/Header",
"a":
"bool",
"b":
"my_pkg/MyCls2"}
1039 result = collections.OrderedDict()
1041 FIELD_RGX = re.compile(
r"^\s*([a-z][^\s:]+)\s+([^\s=]+)(\s*=\s*([^\n]+))?(\s+([^\n]+))?", re.I)
1042 STR_CONST_RGX = re.compile(
r"^w?string\s+([^\s=#]+)\s*=")
1043 pkg = typename.rsplit(
"/", 1)[0]
1044 for line
in filter(bool, typedef.splitlines()):
1045 if set(line) == set(
"="):
1047 if "#" in line
and not STR_CONST_RGX.match(line): line = line[:line.index(
"#")]
1048 match = FIELD_RGX.match(line)
1049 if not match
or match.group(3):
1052 name, typename, scalartype = match.group(2), match.group(1),
scalar(match.group(1))
1053 if scalartype
not in ROS_COMMON_TYPES:
1054 pkg2 =
"" if "/" in scalartype
else "std_msgs" if "Header" == scalartype
else pkg
1055 typename =
"%s/%s" % (pkg2, typename)
if pkg2
else typename
1056 result[name] = typename
1061def parse_definition_subtypes(typedef, nesting=False):
1063 Returns subtype names and type definitions
from a full message definition.
1065 @param typedef message type definition including all subtype definitions
1066 @param nesting whether to additionally
return type nesting information
as
1067 {typename: [typename contained
in parent]}
1068 @return {
"pkg/MsgType":
"full definition for MsgType including subtypes"}
1069 or ({typedefs}, {nesting})
if nesting
1071 result = collections.OrderedDict()
1072 nesteds = collections.defaultdict(list)
1075 curtype, curlines =
"", []
1077 rgx = re.compile(
r"^((=+)|(MSG: (.+)))$")
1078 for line
in typedef.splitlines():
1080 if m
and m.group(2)
and curtype:
1081 result[curtype] =
"\n".join(curlines)
1082 curtype, curlines =
"", []
1083 elif m
and m.group(4):
1084 curtype, curlines = m.group(4), []
1085 elif not m
and curtype:
1086 curlines.append(line)
1088 result[curtype] =
"\n".join(curlines)
1091 FIELD_RGX = re.compile(
r"^\s*([a-z][^\s]+)\s+([^\s=]+)(\s*=\s*([^\n]+))?(\s+([^\n]+))?", re.I)
1093 for subtype, subdef
in list(result.items()):
1094 pkg, seen = subtype.rsplit(
"/", 1)[0], set()
1095 for line
in subdef.splitlines():
1096 m = FIELD_RGX.match(line)
1097 if m
and m.group(1):
1098 scalartype, fulltype = realapi.scalar(m.group(1)),
None
1099 if scalartype
not in ROS_COMMON_TYPES:
1100 fulltype = scalartype
if "/" in scalartype
else "std_msgs/Header" \
1101 if "Header" == scalartype
else "%s/%s" % (pkg, scalartype)
1102 if fulltype
in result
and fulltype
not in seen:
1103 addendum =
"%s\nMSG: %s\n%s" % (
"=" * 80, fulltype, result[fulltype])
1104 result[subtype] = result[subtype].rstrip() + (
"\n\n%s\n" % addendum)
1105 nesteds[subtype].append(fulltype)
1107 return (result, nesteds)
if nesting
else result
1111 """Returns ROS message as a serialized binary."""
1112 return realapi.serialize_message(msg)
1116 """Returns ROS message or service request/response instantiated from serialized binary."""
1117 return realapi.deserialize_message(msg, cls_or_typename)
1122 Returns scalar type from ROS message data type, like
"uint8" from uint8-array.
1124 Returns type unchanged
if an ordinary type. In ROS2, returns unbounded type,
1125 e.g.
"string" from "string<=10[<=5]".
1127 return realapi.scalar(typename)
1131 """Sets message or object attribute value."""
1132 realapi.set_message_value(obj, name, value)
1135def time_message(val, to_message=True, clock_type=None):
1137 Converts ROS2 time/duration between `rclpy` and `builtin_interfaces` objects.
1139 Returns input value
as-
is in ROS1.
1141 @param val ROS2 time/duration object
from `rclpy`
or `builtin_interfaces`
1142 @param to_message whether to convert
from `rclpy` to `builtin_interfaces`
or vice versa
1143 @param clock_type ClockType
for converting to `rclpy.Time`, defaults to `ROS_TIME`
1144 @return value converted to appropriate type,
or original value
if not convertible
1147 return realapi.time_message(val, to_message, clock_type=clock_type)
1151 """Returns value as datetime.datetime if value is ROS time/duration, else value."""
1152 sec = realapi.to_sec(val)
1153 return datetime.datetime.fromtimestamp(sec)
if sec
is not val
else val
1157 """Returns value as decimal.Decimal if value is ROS time/duration, else value."""
1158 if realapi.is_ros_time(val):
1159 return decimal.Decimal(
"%d.%09d" % realapi.to_sec_nsec(val))
1164 """Returns value as ROS duration if convertible (int/float/time/datetime/decimal), else value."""
1165 return realapi.to_duration(val)
1169 """Returns value in nanoseconds if value is ROS time/duration, else value."""
1170 return realapi.to_nsec(val)
1174 """Returns value in seconds if value is ROS time/duration, else value."""
1175 return realapi.to_sec(val)
1179 """Returns value as (seconds, nanoseconds) if value is ROS time/duration, else value."""
1180 return realapi.to_sec_nsec(val)
1184 """Returns value as ROS time if convertible (int/float/duration/datetime/decimal), else value."""
1185 return realapi.to_time(val)
1189 "BAG_EXTENSIONS",
"NODE_NAME",
"ROS_ALIAS_TYPES",
"ROS_BUILTIN_CTORS",
"ROS_BUILTIN_TYPES",
1190 "ROS_COMMON_TYPES",
"ROS_FAMILY",
"ROS_NUMERIC_TYPES",
"ROS_STRING_TYPES",
"ROS_TIME_CLASSES",
1191 "ROS_TIME_TYPES",
"SKIP_EXTENSIONS",
"Bag",
"BaseBag",
"TypeMeta",
1192 "calculate_definition_hash",
"canonical",
"create_publisher",
"create_subscriber",
1193 "deserialize_message",
"dict_to_message",
"filter_fields",
"format_message_value",
1194 "get_alias_type",
"get_message_class",
"get_message_definition",
"get_message_fields",
1195 "get_message_type",
"get_message_type_hash",
"get_message_value",
"get_ros_time_category",
1196 "get_rostime",
"get_topic_types",
"get_type_alias",
"init_node",
"is_ros_message",
1197 "is_ros_time",
"iter_message_fields",
"make_bag_time",
"make_duration",
"make_live_time",
1198 "make_message_hash",
"make_time",
"message_to_dict",
"parse_definition_fields",
1199 "parse_definition_subtypes",
"scalar",
"deserialize_message",
"set_message_value",
1200 "shutdown_node",
"time_message",
"to_datetime",
"to_decimal",
"to_duration",
"to_nsec",
1201 "to_sec",
"to_sec_nsec",
"to_time",
"validate",
autodetect(cls, f)
Returns registered bag class auto-detected from file, or None.
WRITER_CLASSES
Bag writer classes, as {Cls, }.
READER_CLASSES
Bag reader classes, as {Cls, }.
get_type_and_topic_info(self, topic_filters=None)
Returns thorough metainfo on topic and message types.
mode
Returns file open mode.
open(self)
Opens the bag file if not already open.
filename
Returns bag file path.
__iter__(self)
Iterates over all messages in the bag.
__str__(self)
Returns informative text for bag, with a full overview of topics and types.
stop_on_error
Whether raising read error on unknown message type (ROS2 SQLite .db3 specific).
TypesAndTopicsTuple
Returned from get_type_and_topic_info() as ({typename: typehash}, {topic name: TopicTuple}).
__exit__(self, exc_type, exc_value, traceback)
Context manager exit, closes bag.
get_start_time(self)
Returns the start time of the bag, as UNIX timestamp, or None if bag empty.
get_end_time(self)
Returns the end time of the bag, as UNIX timestamp, or None if bag empty.
__len__(self)
Returns the number of messages in the bag.
get_message_class(self, typename, typehash=None)
Returns ROS message type class, or None if unknown message type for bag.
get_qoses(self, topic, typename)
Returns topic Quality-of-Service profiles as a list of dicts, or None if not available.
get_message_type_hash(self, msg_or_type)
Returns ROS message type MD5 hash, or None if unknown message type for bag.
read_messages(self, topics=None, start_time=None, end_time=None, raw=False, **__)
Yields messages from the bag, optionally filtered by topic and timestamp.
get_topic_info(self, counts=True)
Returns topic and message type metainfo as {(topic, typename, typehash): count}.
flush(self)
Ensures all changes are written to bag file.
__enter__(self)
Context manager entry, opens bag if not open.
close(self)
Closes the bag file.
__contains__(self, key)
Returns whether bag contains given topic.
closed
Returns whether file is closed.
size
Returns current file size in bytes.
topics
Returns the list of topics in bag, in alphabetic order.
TopicTuple
Returned from get_type_and_topic_info() as (typename, message count, connection count,...
__next__(self)
Retrieves next message from bag as (topic, message, timestamp).
__getitem__(self, key)
Returns an iterator yielding messages from the bag in given topic, supporting len().
get_message_definition(self, msg_or_type)
Returns ROS message type definition full text, including subtype definitions.
get_message_count(self, topic_filters=None)
Returns the number of messages in the bag.
write(self, topic, msg, t=None, raw=False, **kwargs)
Writes a message to the bag.
Wrapper for iterable value with specified fixed length.
MCAP bag interface, providing most of rosbag.Bag interface.
time_message(val, to_message=True, clock_type=None)
Converts ROS2 time/duration between rclpy and builtin_interfaces objects.
canonical(typename, unbounded=False)
Returns "pkg/Type" for "pkg/subdir/Type", standardizes various ROS2 formats.
make_live_time(stamp)
Returns value as ROS timestamp, conditionally as relative to system time.
create_publisher(topic, cls_or_typename, queue_size)
Returns a ROS publisher instance, with .get_num_connections() and .unregister().
make_message_hash(msg, include=(), exclude=())
Returns hashcode for ROS message, as a hex digest.
serialize_message(msg)
Returns ROS message as a serialized binary.
to_sec(val)
Returns value in seconds if value is ROS time/duration, else value.
parse_definition_fields(typename, typedef)
Returns field names and type names from a message definition text.
get_topic_types()
Returns currently available ROS topics, as [(topicname, typename)].
format_message_value(msg, name, value)
Returns a message attribute value as string.
deserialize_message(msg, cls_or_typename)
Returns ROS message or service request/response instantiated from serialized binary.
iter_message_fields(msg, messages_only=False, flat=False, scalars=(), include=(), exclude=(), top=())
Yields ((nested, path), value, typename) from ROS message.
dict_to_message(dct, msg)
Returns given ROS message populated from Python dictionary.
to_decimal(val)
Returns value as decimal.Decimal if value is ROS time/duration, else value.
get_message_value(msg, name, typename=None, default=Ellipsis)
Returns object attribute value, with numeric arrays converted to lists.
get_message_class(typename)
Returns ROS message class, or None if unavailable.
to_duration(val)
Returns value as ROS duration if convertible (int/float/time/datetime/decimal), else value.
make_time(secs=0, nsecs=0)
Returns a ROS time.
get_alias_type(typename)
Returns ROS built-in type for alias like "char", if any; reverse of get_alias_type().
make_bag_time(stamp, bag)
Returns value as ROS timestamp, conditionally as relative to bag start/end time.
get_message_type(msg_or_cls)
Returns ROS message type name, like "std_msgs/Header".
get_message_definition(msg_or_type)
Returns ROS message type definition full text, including subtype definitions.
validate(live=False)
Initializes ROS bindings, returns whether ROS environment set, prints or raises error if not.
scalar(typename)
Returns scalar type from ROS message data type, like "uint8" from uint8-array.
message_to_dict(msg, replace=None)
Returns ROS message as nested Python dictionary.
make_duration(secs=0, nsecs=0)
Returns a ROS duration.
get_rostime(fallback=False)
Returns current ROS time, as rospy.Time or rclpy.time.Time.
make_full_typename(typename, category="msg")
Returns "pkg/msg/Type" for "pkg/Type".
to_nsec(val)
Returns value in nanoseconds if value is ROS time/duration, else value.
get_type_alias(typename)
Returns alias like "char" for ROS built-in type, if any; reverse of get_type_alias().
to_time(val)
Returns value as ROS time if convertible (int/float/duration/datetime/decimal), else value.
shutdown_node()
Shuts down live ROS node.
set_message_value(obj, name, value)
Sets message or object attribute value.
is_ros_time(val)
Returns whether value is a ROS time/duration class or instance.
create_subscriber(topic, cls_or_typename, handler, queue_size)
Returns a ROS subscriber instance.
get_message_type_hash(msg_or_type)
Returns ROS message type MD5 hash, or "" if unknown type.
to_datetime(val)
Returns value as datetime.datetime if value is ROS time/duration, else value.
to_sec_nsec(val)
Returns value as (seconds, nanoseconds) if value is ROS time/duration, else value.
init_node(name=None)
Initializes a ROS1 or ROS2 node if not already initialized.
get_ros_time_category(msg_or_type)
Returns "time" or "duration" for time/duration type or instance, else original argument.
is_ros_message(val, ignore_time=False)
Returns whether value is a ROS message or special like ROS time/duration class or instance.
filter_fields(fieldmap, top=(), include=(), exclude=())
Returns fieldmap filtered by include and exclude patterns.
get_message_fields(val)
Returns OrderedDict({field name: field type name}) if ROS message, else {}.