5------------------------------------------------------------------------------
6This file is part of grepros - grep for ROS bag files and live topics.
7Released under the BSD License.
12------------------------------------------------------------------------------
28import builtin_interfaces.msg
30except Exception: numpy =
None
35import rclpy.serialization
37import rosidl_parser.parser
38import rosidl_parser.definition
39import rosidl_runtime_py.utilities
43from . common
import PATH_TYPES, ConsolePrinter, MatchMarkers, memoize
47BAG_EXTENSIONS = (
".db3", )
53ROS_TIME_TYPES = [
"builtin_interfaces/Time",
"builtin_interfaces/Duration"]
56ROS_TIME_CLASSES = {rclpy.time.Time:
"builtin_interfaces/Time",
57 builtin_interfaces.msg.Time:
"builtin_interfaces/Time",
58 rclpy.duration.Duration:
"builtin_interfaces/Duration",
59 builtin_interfaces.msg.Duration:
"builtin_interfaces/Duration"}
62ROS_TIME_MESSAGES = {rclpy.time.Time: builtin_interfaces.msg.Time,
63 rclpy.duration.Duration: builtin_interfaces.msg.Duration}
66ROS_ALIAS_TYPES = {
"byte":
"uint8",
"char":
"int8"}
69DDS_TYPES = {
"boolean":
"bool",
74 "unsigned short":
"uint16",
76 "unsigned long":
"uint32",
78 "unsigned long long":
"uint64", }
92 """ROS2 bag reader and writer (SQLite format), providing most of rosbag.Bag interface."""
99CREATE TABLE IF NOT EXISTS messages (
100 id INTEGER PRIMARY KEY,
101 topic_id INTEGER NOT NULL,
102 timestamp INTEGER NOT NULL,
106CREATE TABLE IF NOT EXISTS topics (
107 id INTEGER PRIMARY KEY,
110 serialization_format TEXT NOT NULL,
111 offered_qos_profiles TEXT NOT NULL
114CREATE INDEX IF NOT EXISTS timestamp_idx ON messages (timestamp ASC);
116PRAGMA journal_mode=WAL;
117PRAGMA synchronous=NORMAL;
121 SQLITE_MAGIC = b
"SQLite format 3\x00"
126 @param filename bag file path to open
127 @param mode file will be overwritten if "w"
129 if not isinstance(filename, PATH_TYPES):
130 raise ValueError(
"invalid filename %r" % type(filename))
131 if mode
not in self.
MODES:
raise ValueError(
"invalid mode %r" % mode)
148 Returns the number of messages in the bag.
150 @param topic_filters list of topics or a single topic to filter by, if any
153 sql, where =
"SELECT COUNT(*) AS count FROM messages",
""
156 topics = topic_filters
157 topics = topics
if isinstance(topics, (dict, list, set, tuple))
else [topics]
158 topic_ids = [x[
"id"]
for (topic, _), x
in self.
_topics.items()
if topic
in topics]
159 where =
" WHERE topic_id IN (%s)" %
", ".join(map(str, topic_ids))
160 return self.
_db.execute(sql + where).fetchone()[
"count"]
165 """Returns the start time of the bag, as UNIX timestamp, or None if bag empty."""
167 row = self.
_db.execute(
"SELECT MIN(timestamp) AS val FROM messages").fetchone()
168 if row[
"val"]
is None:
return None
169 secs, nsecs = divmod(row[
"val"], 10**9)
170 return secs + nsecs / 1E9
175 """Returns the end time of the bag, as UNIX timestamp, or None if bag empty."""
177 row = self.
_db.execute(
"SELECT MAX(timestamp) AS val FROM messages").fetchone()
178 if row[
"val"]
is None:
return None
179 secs, nsecs = divmod(row[
"val"], 10**9)
180 return secs + nsecs / 1E9
186 Returns topic and message type metainfo as {(topic, typename, typehash): count}.
188 Can skip retrieving message counts, as this requires a full table scan.
189 Can skip looking up message type classes, as those might be unavailable in ROS2 environment.
191 @param counts whether to return actual message counts instead of None
192 @param ensure_types whether to look up type classes instead of returning typehash as None
202 Returns thorough metainfo on topic and message types.
204 @param topic_filters list of topics or a single topic to filter returned topics-dict by,
206 @return TypesAndTopicsTuple(msg_types, topics) namedtuple,
207 msg_types as dict of {typename: typehash},
208 topics as a dict of {topic: TopicTuple() namedtuple}.
210 topics = topic_filters
211 topics = topics
if isinstance(topics, (list, set, tuple))
else [topics]
if topics
else []
212 if self.
_ttinfo and (
not topics
or set(topics) == set(t
for t, _, _
in self.
_counts)):
214 if self.
closedclosed:
raise ValueError(
"I/O operation on closed file.")
217 msgtypes = {n: h
for t, n, h
in counts}
221 """Returns median value from given sorted numbers."""
223 return None if not vlen
else vals[vlen // 2]
if vlen % 2
else \
224 float(vals[vlen // 2 - 1] + vals[vlen // 2]) / 2
226 for (t, n, _), c
in sorted(counts.items(), key=
lambda x: x[0][:2]):
227 if topics
and t
not in topics:
continue
230 args = (self.
_topics[(t, n)][
"id"], )
231 cursor = self.
_db.execute(
"SELECT timestamp FROM messages WHERE topic_id = ?", args)
232 stamps = sorted(x[
"timestamp"] / 1E9
for x
in cursor)
233 mymedian = median(sorted(s1 - s0
for s1, s0
in zip(stamps[1:], stamps[:-1])))
234 freq = 1.0 / mymedian
if mymedian
else None
237 if not topics
or set(topics) == set(t
for t, _, _
in self.
_counts): self.
_ttinfo = result
242 """Returns topic Quality-of-Service profiles as a list of dicts, or None if not available."""
243 topickey = (topic, typename)
245 topicrow = self.
_topics[topickey]
247 if topicrow.get(
"offered_qos_profiles"):
248 self.
_qoses[topickey] = yaml.safe_load(topicrow[
"offered_qos_profiles"])
249 except Exception
as e:
250 ConsolePrinter.warn(
"Error parsing quality of service for topic %r: %r", topic, e)
251 self.
_qoses.setdefault(topickey,
None)
252 return self.
_qoses[topickey]
256 """Returns ROS2 message type class, or None if unknown message type for bag."""
258 if any(n == typename
for _, n,
in self.
_topics)
and typehash
is not None \
259 and not any((n, h) == (typename, typehash)
for _, n, h
in self.
_counts):
261 if any((typename, typehash)
in [(n, h), (n,
None)]
for _, n, h
in self.
_counts):
268 Returns ROS2 message type definition full text, including subtype definitions.
270 Returns None if unknown message type for bag.
273 typename = msg_or_type
if isinstance(msg_or_type, str)
else get_message_type(msg_or_type)
274 if any(n == typename
for _, n, _
in self.
_counts):
280 """Returns ROS2 message type MD5 hash, or None if unknown message type for bag."""
281 typename = msg_or_type
if isinstance(msg_or_type, str)
else get_message_type(msg_or_type)
283 return next((h
for _, n, h
in self.
_counts if n == typename),
None)
286 def read_messages(self, topics=None, start_time=None, end_time=None, raw=False, **__):
288 Yields messages from the bag, optionally filtered by topic and timestamp.
290 @param topics list of topics or a single topic to filter by, if any
291 @param start_time earliest timestamp of message to return, as ROS time or convertible
292 (int/float/duration/datetime/decimal)
293 @param end_time latest timestamp of message to return, as ROS time or convertible
294 (int/float/duration/datetime/decimal)
295 @param raw if True, then returned messages are tuples of
296 (typename, bytes, typehash, typeclass).
297 If message type unavailable, returns None for hash and class.
298 @return BagMessage namedtuples of
299 (topic, message, timestamp as rclpy.time.Time)
301 if self.
closedclosed:
raise ValueError(
"I/O operation on closed file.")
302 if "w" == self.
_mode:
raise io.UnsupportedOperation(
"read")
305 if not self.
_topics or (topics
is not None and not topics):
308 sql, exprs, args =
"SELECT * FROM messages", [], ()
310 topics = topics
if isinstance(topics, (list, tuple))
else [topics]
311 topic_ids = [x[
"id"]
for (topic, _), x
in self.
_topics.items()
if topic
in topics]
312 exprs += [
"topic_id IN (%s)" %
", ".join(map(str, topic_ids))]
313 if start_time
is not None:
314 exprs += [
"timestamp >= ?"]
316 if end_time
is not None:
317 exprs += [
"timestamp <= ?"]
319 sql += ((
" WHERE " +
" AND ".join(exprs))
if exprs
else "")
320 sql +=
" ORDER BY timestamp"
322 topicmap = {v[
"id"]: v
for v
in self.
_topics.values()}
324 topicset = set(topics
or [t
for t, _
in self.
_topics])
325 typehashes = {n: h
for _, n, h
in self.
_counts}
326 for row
in self.
_db.execute(sql, args):
327 tdata = topicmap[row[
"topic_id"]]
328 topic, typename = tdata[
"name"],
canonical(tdata[
"type"])
329 if not raw
and msgtypes.get(typename, typename)
is None:
continue
330 if typehashes.get(typename)
is None:
332 selector = (h
for t, n, h
in self.
_counts if (t, n) == (topic, typename))
333 typehash = typehashes[typename] =
next(selector,
None)
334 else: typehash = typehashes[typename]
337 cls = msgtypes.get(typename)
or \
339 if raw: msg = (typename, row[
"data"], typehash
or None, cls)
340 else: msg = rclpy.serialization.deserialize_message(row[
"data"], cls)
341 except Exception
as e:
342 reportfunc = ConsolePrinter.error
if self.
_stop_on_error else ConsolePrinter.warn
343 reportfunc(
"Error loading type %s in topic %s: %%s" % (typename, topic),
344 "message class not found." if cls
is None else e,
347 if raw: msg = (typename, row[
"data"], typehash
or None, msgtypes.get(typename))
348 elif set(n
for n, c
in msgtypes.items()
if c
is None) == topicset:
351 stamp = rclpy.time.Time(nanoseconds=row[
"timestamp"])
353 api.TypeMeta.make(msg, topic, self)
359 def write(self, topic, msg, t=None, raw=False, qoses=None, **__):
361 Writes a message to the bag.
363 @param topic name of topic
364 @param msg ROS2 message
365 @param t message timestamp if not using wall time, as ROS time or convertible
366 (int/float/duration/datetime/decimal)
367 @param qoses topic Quality-of-Service settings, if any, as a list of dicts
369 if self.
closedclosed:
raise ValueError(
"I/O operation on closed file.")
370 if "r" == self.
_mode:
raise io.UnsupportedOperation(
"write")
374 typename, binary, typehash = msg[:3]
379 topickey = (topic, typename)
380 cursor = self.
_db.cursor()
381 if topickey
not in self.
_topics:
383 sql =
"INSERT INTO topics (name, type, serialization_format, offered_qos_profiles) " \
384 "VALUES (?, ?, ?, ?)"
385 qoses = yaml.safe_dump(qoses)
if isinstance(qoses, list)
else ""
386 args = (topic, full_typename,
"cdr", qoses)
387 cursor.execute(sql, args)
388 tdata = {
"id": cursor.lastrowid,
"name": topic,
"type": full_typename,
389 "serialization_format":
"cdr",
"offered_qos_profiles": qoses}
392 timestamp = (time.time_ns()
if hasattr(time,
"time_ns")
else int(time.time() * 10**9)) \
394 sql =
"INSERT INTO messages (topic_id, timestamp, data) VALUES (?, ?, ?)"
395 args = (self.
_topics[topickey][
"id"], timestamp, binary)
396 cursor.execute(sql, args)
397 countkey = (topic, typename, typehash)
398 if self.
_counts.get(countkey, self)
is not None:
404 """Opens the bag file if not already open."""
409 """Closes the bag file."""
419 """Returns whether file is closed."""
425 """Returns the list of topics in bag, in alphabetic order."""
426 return sorted((t
for t, _, _
in self.
_topics), key=str.lower)
431 """Returns bag file path."""
437 """Returns current file size in bytes (including journaling files)."""
439 for suffix
in (
"-journal",
"-wal")
if result
else ():
441 result += os.path.getsize(path)
if os.path.isfile(path)
else 0
447 """Returns file open mode."""
452 """Returns whether bag contains given topic."""
453 return any(key == t
for t, _, _
in self.
_topics)
457 """Retrieves next message from bag as (topic, message, timestamp)."""
458 if self.
closedclosed:
raise ValueError(
"I/O operation on closed file.")
463 def _ensure_open(self, populate=False):
464 """Opens bag database if not open, populates schema if specified."""
467 if "r" == self.
_mode and not exists:
468 raise IOError(
"No such file: %r" % self.
_filename)
469 if "w" == self.
_mode and exists:
471 self.
_db = sqlite3.connect(self.
_filename, detect_types=sqlite3.PARSE_DECLTYPES,
472 isolation_level=
None, check_same_thread=
False)
473 self.
_db.row_factory =
lambda cursor, row: dict(sqlite3.Row(cursor, row))
478 def _ensure_topics(self):
479 """Populates local topic struct from database, if not already available."""
481 for row
in self.
_db.execute(
"SELECT * FROM topics ORDER BY id"):
482 topickey = (topic, typename) = row[
"name"],
canonical(row[
"type"])
483 self.
_topics[(topic, typename)] = row
484 self.
_counts[(topic, typename,
None)] =
None
487 def _ensure_counts(self):
488 """Populates local counts values from database, if not already available."""
489 if not self.
_db or all(v
is not None for v
in self.
_counts.values()) \
492 topickeys = {self.
_topics[(t, n)][
"id"]: (t, n, h)
for (t, n, h)
in self.
_counts}
494 for row
in self.
_db.execute(
"SELECT topic_id, COUNT(*) AS count FROM messages "
495 "GROUP BY topic_id").fetchall():
496 if row[
"topic_id"]
in topickeys:
497 self.
_counts[topickeys[row[
"topic_id"]]] = row[
"count"]
500 def _ensure_types(self, topics=None):
502 Populates local type definitions and classes from database, if not already available.
504 @param topics selected topics to ensure types for, if not all
506 if not self.
_db or (
not topics
and topics
is not None)
or not self.
_has_table(
"topics") \
507 or not any(h
is None for t, _, h
in self.
_counts if topics
is None or t
in topics):
510 for countkey, count
in list(self.
_counts.items()):
511 (topic, typename, typehash) = countkey
512 if typehash
is None and (topics
is None or topic
in topics):
515 self.
_counts[(topic, typename, typehash)] = count
518 def _has_table(self, name):
519 """Returns whether specified table exists in database."""
520 sql =
"SELECT 1 FROM sqlite_master WHERE type = ? AND name = ?"
521 return bool(self.
_db.execute(sql, (
"table", name)).fetchone())
526 """Returns whether file is recognizable as SQLite format."""
527 if os.path.isfile(f)
and os.path.getsize(f):
528 with open(f,
"rb")
as f:
531 ext = os.path.splitext(f
or "")[-1].lower()
532 result = ext
in BAG_EXTENSIONS
539 """Initializes a ROS2 node if not already initialized."""
540 global node, context, executor
545 while context
and context.ok():
546 executor.spin_once(timeout_sec=1)
548 context = rclpy.Context()
549 try: rclpy.init(context=context)
550 except Exception:
pass
551 node_name =
"%s_%s_%s" % (name, os.getpid(), int(time.time() * 1000))
552 node = rclpy.create_node(node_name, context=context, use_global_arguments=
False,
553 enable_rosout=
False, start_parameter_services=
False)
554 executor = rclpy.executors.MultiThreadedExecutor(context=context)
555 executor.add_node(node)
556 spinner = threading.Thread(target=spin_loop)
557 spinner.daemon =
True
562 """Shuts down live ROS2 node."""
563 global node, context, executor
565 context_, executor_ = context, executor
566 context = executor = node =
None
573 Returns whether ROS2 environment is set, prints error if not.
575 @param live whether environment must support launching a ROS node
577 missing = [k
for k
in [
"ROS_VERSION"]
if not os.getenv(k)]
579 ConsolePrinter.error(
"ROS environment not sourced: missing %s.",
580 ", ".join(sorted(missing)))
581 if "2" != os.getenv(
"ROS_VERSION",
"2"):
582 ConsolePrinter.error(
"ROS environment not supported: need ROS_VERSION=2.")
590 Returns "pkg/Type" for "pkg/msg/Type", standardizes various ROS2 formats.
592 Converts DDS types like "octet" to "byte", and "sequence<uint8, 100>" to "uint8[100]".
594 @param unbounded drop constraints like array and string bounds,
595 e.g. returning "uint8[]" for "uint8[10]" and "string" for "string<=8"
597 if not typename:
return typename
598 is_array, bound, dimension =
False,
"",
""
601 match = re.match(
"sequence<(.+)>", typename)
604 typename = match.group(1)
605 match = re.match(
r"([^,]+)?,\s?(\d+)", typename)
607 typename = match.group(1)
608 if match.lastindex > 1: dimension = match.group(2)
610 match = re.match(
"(w?string)<(.+)>", typename)
612 typename, bound = match.groups()
615 dimension = typename[typename.index(
"[") + 1:typename.index(
"]")]
616 typename, is_array = typename[:typename.index(
"[")],
True
619 typename, bound = typename.split(
"<=")
621 if typename.count(
"/") > 1:
622 typename =
"%s/%s" % tuple((x[0], x[-1])
for x
in [typename.split(
"/")])[0]
624 if unbounded: suffix =
"[]" if is_array
else ""
625 else: suffix = (
"<=%s" % bound
if bound
else "") + (
"[%s]" % dimension
if is_array
else "")
626 return DDS_TYPES.get(typename, typename) + suffix
630 """Returns an rclpy.Publisher instance, with .get_num_connections() and .unregister()."""
631 cls = cls_or_typename
633 qos = rclpy.qos.QoSProfile(depth=queue_size)
634 pub = node.create_publisher(cls, topic, qos)
635 pub.get_num_connections = pub.get_subscription_count
636 pub.unregister = pub.destroy
642 Returns an rclpy.Subscription.
644 Supplemented with .get_message_class(), .get_message_definition(),
645 .get_message_type_hash(), .get_qoses(), and.unregister().
647 cls = typename = cls_or_typename
651 qos = rclpy.qos.QoSProfile(depth=queue_size)
652 qoses = [x.qos_profile
for x
in node.get_publishers_info_by_topic(topic)
654 rels, durs = zip(*[(x.reliability, x.durability)
for x
in qoses])
if qoses
else ([], [])
656 if rels: qos.reliability = max(rels)
657 if durs: qos.durability = max(durs)
660 sub = node.create_subscription(cls, topic, handler, qos)
661 sub.get_message_class =
lambda: cls
664 sub.get_qoses =
lambda: qosdicts
665 sub.unregister = sub.destroy
671 Returns a message attribute value as string.
673 Result is at least 10 chars wide if message is a ROS2 time/duration
674 (aligning seconds and nanoseconds).
676 LENS = {
"sec": 13,
"nanosec": 9}
678 if not isinstance(msg, tuple(ROS_TIME_CLASSES))
or name
not in LENS:
681 EXTRA = sum(v.count(x) * len(x)
for x
in (MatchMarkers.START, MatchMarkers.END))
682 return (
"%%%ds" % (LENS[name] + EXTRA)) % v
687 """Returns ROS2 message class, or None if unknown type."""
689 except Exception:
return None
694 Returns ROS2 message type definition full text, including subtype definitions.
696 Returns None if unknown type.
698 typename = msg_or_type
if isinstance(msg_or_type, str)
else get_message_type(msg_or_type)
699 return _get_message_definition(
canonical(typename))
703 """Returns ROS2 message type MD5 hash, or "" if unknown type."""
704 typename = msg_or_type
if isinstance(msg_or_type, str)
else get_message_type(msg_or_type)
705 return _get_message_type_hash(
canonical(typename))
709def _get_message_definition(typename):
710 """Returns ROS2 message type definition full text, or None on error (internal caching method)."""
712 texts, pkg = collections.OrderedDict(), typename.rsplit(
"/", 1)[0]
714 typepath = rosidl_runtime_py.get_interface_path(
make_full_typename(typename) +
".msg")
715 with open(typepath, encoding=
"utf-8")
as f:
716 texts[typename] = f.read()
719 for line
in texts[typename].splitlines():
721 if not line
or not line[0].isalpha():
723 linetype =
scalar(
canonical(re.sub(
r"^([a-zA-Z][^\s]+)(.+)",
r"\1", line)))
724 if linetype
in api.ROS_BUILTIN_TYPES:
726 linetype = linetype
if "/" in linetype
else "std_msgs/Header" \
727 if "Header" == linetype
else "%s/%s" % (pkg, linetype)
729 if linedef: texts[linetype] = linedef
731 basedef = texts.pop(next(iter(texts)))
732 subdefs = [
"%s\nMSG: %s\n%s" % (
"=" * 80, k, v)
for k, v
in texts.items()]
733 return basedef + (
"\n" if subdefs
else "") +
"\n".join(subdefs)
734 except Exception
as e:
735 ConsolePrinter.warn(
"Error collecting type definition of %s: %s", typename, e)
742 Returns ROS2 message type definition parsed from IDL file.
747 def format_comment(text):
748 """Returns annotation text formatted with comment prefixes and escapes."""
749 ESCAPES = {
"\n":
"\\n",
"\t":
"\\t",
"\x07":
"\\a",
750 "\x08":
"\\b",
"\x0b":
"\\v",
"\x0c":
"\\f"}
751 repl =
lambda m: ESCAPES[m.group(0)]
752 return "#" +
"\n#".join(re.sub(
"|".join(map(re.escape, ESCAPES)), repl, l)
753 for l
in text.split(
"\\n"))
755 def format_type(typeobj, msgpackage, constant=False):
756 """Returns canonical type name, like "uint8" or "string<=5" or "nav_msgs/Path"."""
758 if isinstance(typeobj, rosidl_parser.definition.AbstractNestedType):
760 valuetype = format_type(typeobj.value_type, msgpackage, constant)
761 size, bounding =
"",
""
762 if isinstance(typeobj, rosidl_parser.definition.Array):
764 elif typeobj.has_maximum_size():
765 size = typeobj.maximum_size
766 if isinstance(typeobj, rosidl_parser.definition.BoundedSequence):
768 result =
"%s[%s%s]" % (valuetype, bounding, size)
769 elif isinstance(typeobj, rosidl_parser.definition.AbstractWString):
771 elif isinstance(typeobj, rosidl_parser.definition.AbstractString):
773 elif isinstance(typeobj, rosidl_parser.definition.NamespacedType):
774 nameparts = typeobj.namespaced_name()
776 if nameparts[0].value == msgpackage
or "std_msgs/Header" == result:
777 result =
canonical(
"/".join(nameparts[-1:]))
779 result = DDS_TYPES.get(typeobj.typename, typeobj.typename)
781 if isinstance(typeobj, rosidl_parser.definition.AbstractGenericString) \
782 and typeobj.has_maximum_size()
and not constant:
783 result +=
"<=%s" % typeobj.maximum_size
787 def get_comments(obj):
788 """Returns all comments for annotatable object, as [text, ]."""
789 return [v.get(
"text",
"")
for v
in obj.get_annotation_values(
"verbatim")
790 if "comment" == v.get(
"language")]
792 typepath = rosidl_runtime_py.get_interface_path(
make_full_typename(typename) +
".idl")
793 with open(typepath, encoding=
"utf-8")
as f:
794 idlcontent = rosidl_parser.parser.parse_idl_string(f.read())
795 msgidl = idlcontent.get_elements_of_type(rosidl_parser.definition.Message)[0]
796 package = msgidl.structure.namespaced_type.namespaces[0]
797 DUMMY = rosidl_parser.definition.EMPTY_STRUCTURE_REQUIRED_MEMBER_NAME
801 lines.extend(map(format_comment, get_comments(msgidl.structure)))
803 if lines
and msgidl.constants: lines.append(
"")
805 for c
in msgidl.constants:
806 ctype = format_type(c.type, package, constant=
True)
807 lines.extend(map(format_comment, get_comments(c)))
808 lines.append(
"%s %s=%s" % (ctype, c.name, c.value))
810 if not (len(msgidl.structure.members) == 1
and DUMMY == msgidl.structure[0].name):
812 if msgidl.constants
and msgidl.structure.members: lines.append(
"")
814 for m
in msgidl.structure.members:
815 lines.extend(map(format_comment, get_comments(m)))
816 lines.append(
"%s %s" % (format_type(m.type, package), m.name))
817 return "\n".join(lines)
821def _get_message_type_hash(typename):
822 """Returns ROS2 message type MD5 hash (internal caching method)."""
824 return "" if msgdef
is None else api.calculate_definition_hash(typename, msgdef)
828 """Returns OrderedDict({field name: field type name}) if ROS2 message, else {}."""
830 fields = ((k,
canonical(v))
for k, v
in val.get_fields_and_field_types().items())
831 return collections.OrderedDict(fields)
835 """Returns ROS2 message type name, like "std_msgs/Header"."""
836 cls = msg_or_cls
if inspect.isclass(msg_or_cls)
else type(msg_or_cls)
837 return canonical(
"%s/%s" % (cls.__module__.split(
".")[0], cls.__name__))
842 Returns object attribute value, with numeric arrays converted to lists.
844 @param name message attribute name
845 @param typename value ROS type name, for identifying byte arrays
846 @param default value to return if attribute does not exist; raises exception otherwise
848 if default
is not Ellipsis
and not hasattr(msg, name):
return default
849 v, scalartype = getattr(msg, name),
scalar(typename)
850 if isinstance(v, (bytes, array.array)): v = list(v)
851 elif numpy
and isinstance(v, (numpy.generic, numpy.ndarray)):
853 if v
and isinstance(v, (list, tuple))
and scalartype
in (
"byte",
"uint8"):
854 if isinstance(v[0], bytes):
855 v = list(map(ord, v))
856 elif scalartype == typename:
858 return list(v)
if isinstance(v, tuple)
else v
863 Returns current ROS2 time, as rclpy.time.Time.
865 @param fallback use wall time if node not initialized
867 try:
return node.get_clock().now()
869 if fallback:
return make_time(time.time())
875 Returns currently available ROS2 topics, as [(topicname, typename)].
877 Omits topics that the current ROS2 node itself has published.
880 myname, myns = node.get_name(), node.get_namespace()
882 for topic, typenames
in node.get_publisher_names_and_types_by_node(myname, myns):
883 mytypes.setdefault(topic, []).extend(typenames)
884 for t
in (
"/parameter_events",
"/rosout"):
886 for topic, typenames
in node.get_topic_names_and_types():
887 for typename
in typenames:
888 if topic
not in mytypes
or typename
not in mytypes[topic]:
895 Returns whether value is a ROS2 message or special like ROS2 time/duration class or instance.
897 @param ignore_time whether to ignore ROS2 time/duration types
899 is_message = rosidl_runtime_py.utilities.is_message(val)
900 if is_message
and ignore_time: is_message =
not is_ros_time(val)
905 """Returns whether value is a ROS2 time/duration class or instance."""
906 if inspect.isclass(val):
return issubclass(val, tuple(ROS_TIME_CLASSES))
907 return isinstance(val, tuple(ROS_TIME_CLASSES))
911 """Returns an rclpy.duration.Duration."""
912 return rclpy.duration.Duration(seconds=secs, nanoseconds=nsecs)
916 """Returns a ROS2 time, as rclpy.time.Time."""
917 return rclpy.time.Time(seconds=secs, nanoseconds=nsecs)
921 """Returns "pkg/msg/Type" for "pkg/Type"."""
922 if "/msg/" in typename
or "/" not in typename:
924 return "%s/msg/%s" % tuple((x[0], x[-1])
for x
in [typename.split(
"/")])[0]
929 Returns rclpy.qos.QoSProfile that matches the most permissive publisher.
931 @param queue_size QoSProfile.depth
933 qos = rclpy.qos.QoSProfile(depth=queue_size)
934 infos = node.get_publishers_info_by_topic(topic)
935 rels, durs = zip(*[(x.qos_profile.reliability, x.qos_profile.durability)
936 for x
in infos
if canonical(x.topic_type) == typename])
938 if rels: qos.reliability = max(rels)
939 if durs: qos.durability = max(durs)
944 """Returns rclpy.qos.QoSProfile as dictionary."""
947 QOS_TYPES = (bool, int, enum.Enum) + tuple(ROS_TIME_CLASSES)
948 for name
in (n
for n
in dir(qos)
if not n.startswith(
"_")):
949 val = getattr(qos, name)
950 if name.startswith(
"_")
or not isinstance(val, QOS_TYPES):
952 if isinstance(val, enum.Enum):
954 elif isinstance(val, tuple(ROS_TIME_CLASSES)):
961 """Returns ROS2 message as a serialized binary."""
962 with api.TypeMeta.make(msg)
as m:
963 if m.data
is not None:
return m.data
964 return rclpy.serialization.serialize_message(msg)
968 """Returns ROS2 message or service request/response instantiated from serialized binary."""
969 cls = cls_or_typename
971 return rclpy.serialization.deserialize_message(raw, cls)
977 Returns unbounded scalar type from ROS2 message data type
979 Like "uint8" from "uint8[]", or "string" from "string<=10[<=5]".
980 Returns type unchanged if not a collection or bounded type.
982 if typename
and "[" in typename: typename = typename[:typename.index(
"[")]
983 if typename
and "<=" in typename: typename = typename[:typename.index(
"<=")]
988 """Sets message or object attribute value."""
991 fieldmap = obj.get_fields_and_field_types()
993 name = obj.__slots__[list(fieldmap).index(name)]
994 setattr(obj, name, value)
999 Converts ROS2 time/duration between `rclpy` and `builtin_interfaces` objects.
1001 @param val ROS2 time/duration object from `rclpy` or `builtin_interfaces`
1002 @param to_message whether to convert from `rclpy` to `builtin_interfaces` or vice versa
1003 @param clock_type ClockType for converting to `rclpy.Time`, defaults to `ROS_TIME`
1004 @return value converted to appropriate type, or original value if not convertible
1006 to_message, clock_type = bool(to_message), (clock_type
or rclpy.clock.ClockType.ROS_TIME)
1007 if isinstance(val, tuple(ROS_TIME_CLASSES)):
1008 rcl_cls = next(k
for k, v
in ROS_TIME_MESSAGES.items()
if isinstance(val, (k, v)))
1009 is_rcl = isinstance(val, tuple(ROS_TIME_MESSAGES))
1010 name =
"to_msg" if to_message
and is_rcl
else "from_msg" if to_message == is_rcl
else None
1011 args = [val] + ([clock_type]
if rcl_cls
is rclpy.time.Time
and "from_msg" == name
else [])
1012 return getattr(rcl_cls, name)(*args)
if name
else val
1018 Returns value as ROS2 duration if convertible (int/float/time/datetime/decimal), else value.
1020 Convertible types: int/float/time/datetime/decimal/builtin_interfaces.Time.
1023 if isinstance(val, decimal.Decimal):
1025 elif isinstance(val, datetime.datetime):
1026 result =
make_duration(int(val.timestamp()), 1000 * val.microsecond)
1027 elif isinstance(val, (float, int)):
1029 elif isinstance(val, rclpy.time.Time):
1031 elif isinstance(val, tuple(ROS_TIME_MESSAGES.values())):
1037 """Returns value in nanoseconds if value is ROS2 time/duration, else value."""
1038 if not isinstance(val, tuple(ROS_TIME_CLASSES)):
1040 if hasattr(val,
"nanoseconds"):
1041 return val.nanoseconds
1042 return val.sec * 10**9 + val.nanosec
1046 """Returns value in seconds if value is ROS2 time/duration, else value."""
1047 if not isinstance(val, tuple(ROS_TIME_CLASSES)):
1049 if hasattr(val,
"nanoseconds"):
1050 secs, nsecs = divmod(val.nanoseconds, 10**9)
1051 return secs + nsecs / 1E9
1052 return val.sec + val.nanosec / 1E9
1056 """Returns value as (seconds, nanoseconds) if value is ROS2 time/duration, else value."""
1057 if not isinstance(val, tuple(ROS_TIME_CLASSES)):
1059 if hasattr(val,
"seconds_nanoseconds"):
1060 return val.seconds_nanoseconds()
1061 if hasattr(val,
"nanoseconds"):
1062 return divmod(val.nanoseconds, 10**9)
1063 return (val.sec, val.nanosec)
1068 Returns value as ROS2 time if convertible, else value.
1070 Convertible types: int/float/duration/datetime/decimal/builtin_interfaces.Time.
1073 if isinstance(val, decimal.Decimal):
1074 result =
make_time(int(val), float(val % 1) * 10**9)
1075 elif isinstance(val, datetime.datetime):
1076 result =
make_time(int(val.timestamp()), 1000 * val.microsecond)
1077 elif isinstance(val, (float, int)):
1079 elif isinstance(val, rclpy.duration.Duration):
1080 result =
make_time(nsecs=val.nanoseconds)
1081 elif isinstance(val, tuple(ROS_TIME_MESSAGES.values())):
1082 result =
make_time(val.sec, val.nanosec)
1087 "BAG_EXTENSIONS",
"DDS_TYPES",
"ROS_ALIAS_TYPES",
"ROS_TIME_CLASSES",
"ROS_TIME_MESSAGES",
1088 "ROS_TIME_TYPES",
"SKIP_EXTENSIONS",
"Bag",
"ROS2Bag",
"context",
"executor",
"node",
1089 "canonical",
"create_publisher",
"create_subscriber",
"deserialize_message",
1090 "format_message_value",
"get_message_class",
"get_message_definition",
1091 "get_message_definition_idl",
"get_message_fields",
"get_message_type",
1092 "get_message_type_hash",
"get_message_value",
"get_rostime",
"get_topic_types",
"init_node",
1093 "is_ros_message",
"is_ros_time",
"make_duration",
"make_full_typename",
"make_subscriber_qos",
1094 "make_time",
"qos_to_dict",
"scalar",
"serialize_message",
"set_message_value",
"shutdown_node",
1095 "time_message",
"to_duration",
"to_nsec",
"to_sec",
"to_sec_nsec",
"to_time",
"validate",
tuple MODES
Supported opening modes, overridden in subclasses.
BagMessage
Returned from read_messages() as (topic name, ROS message, ROS timestamp object).
TypesAndTopicsTuple
Returned from get_type_and_topic_info() as ({typename: typehash}, {topic name: TopicTuple}).
get_qoses(self, topic, typename)
read_messages(self, topics=None, start_time=None, end_time=None, raw=False, **__)
get_topic_info(self, counts=True)
TopicTuple
Returned from get_type_and_topic_info() as (typename, message count, connection count,...
get_type_and_topic_info(self, topic_filters=None)
write(self, topic, msg, t=None, raw=False, qoses=None, **__)
get_message_class(self, typename, typehash=None)
get_qoses(self, topic, typename)
get_message_type_hash(self, msg_or_type)
read_messages(self, topics=None, start_time=None, end_time=None, raw=False, **__)
str SQLITE_MAGIC
SQLite file header magic start bytes.
__init__(self, filename, mode="a", *_, **__)
get_topic_info(self, counts=True, ensure_types=True)
get_message_definition(self, msg_or_type)
get_message_count(self, topic_filters=None)
str CREATE_SQL
ROS2 bag SQLite schema.
time_message(val, to_message=True, clock_type=None)
make_full_typename(typename)
canonical(typename, unbounded=False)
make_subscriber_qos(topic, typename, queue_size=10)
create_publisher(topic, cls_or_typename, queue_size)
format_message_value(msg, name, value)
get_message_value(msg, name, typename=None, default=Ellipsis)
get_message_class(typename)
make_time(secs=0, nsecs=0)
deserialize_message(raw, cls_or_typename)
get_message_type(msg_or_cls)
get_message_definition(msg_or_type)
get_message_definition_idl(typename)
make_duration(secs=0, nsecs=0)
get_rostime(fallback=False)
set_message_value(obj, name, value)
create_subscriber(topic, cls_or_typename, handler, queue_size)
get_message_type_hash(msg_or_type)
is_ros_message(val, ignore_time=False)