5------------------------------------------------------------------------------
6This file is part of grepros - grep for ROS bag files and live topics.
7Released under the BSD License.
12------------------------------------------------------------------------------
14## @namespace grepros.ros2
28import builtin_interfaces.msg
30except Exception: numpy = None
35import rclpy.serialization
37import rosidl_parser.parser
38import rosidl_parser.definition
39import rosidl_runtime_py.utilities
43from . common import PATH_TYPES, ConsolePrinter, MatchMarkers, memoize
46## Bagfile extensions to seek
47BAG_EXTENSIONS = (".db3", )
53ROS_TIME_TYPES = [
"builtin_interfaces/Time",
"builtin_interfaces/Duration"]
56ROS_TIME_CLASSES = {rclpy.time.Time:
"builtin_interfaces/Time",
57 builtin_interfaces.msg.Time:
"builtin_interfaces/Time",
58 rclpy.duration.Duration:
"builtin_interfaces/Duration",
59 builtin_interfaces.msg.Duration:
"builtin_interfaces/Duration"}
62ROS_TIME_MESSAGES = {rclpy.time.Time: builtin_interfaces.msg.Time,
63 rclpy.duration.Duration: builtin_interfaces.msg.Duration}
66ROS_ALIAS_TYPES = {
"byte":
"uint8",
"char":
"int8"}
69DDS_TYPES = {
"boolean":
"bool",
74 "unsigned short":
"uint16",
76 "unsigned long":
"uint32",
78 "unsigned long long":
"uint64", }
92 """ROS2 bag reader and writer (SQLite format), providing most of rosbag.Bag interface."""
99CREATE TABLE IF NOT EXISTS messages (
100 id INTEGER PRIMARY KEY,
101 topic_id INTEGER NOT NULL,
102 timestamp INTEGER NOT NULL,
106CREATE TABLE IF NOT EXISTS topics (
107 id INTEGER PRIMARY KEY,
110 serialization_format TEXT NOT NULL,
111 offered_qos_profiles TEXT NOT NULL
114CREATE INDEX IF NOT EXISTS timestamp_idx ON messages (timestamp ASC);
116PRAGMA journal_mode=WAL;
117PRAGMA synchronous=NORMAL;
121 SQLITE_MAGIC = b"SQLite format 3\x00"
124 def __init__(self, filename, mode="a", *_, **__):
126 @param filename bag file path to open
127 @param mode file will be overwritten
if "w"
129 if not isinstance(filename, PATH_TYPES):
130 raise ValueError(
"invalid filename %r" % type(filename))
131 if mode
not in self.
MODES:
raise ValueError(
"invalid mode %r" % mode)
148 Returns the number of messages in the bag.
150 @param topic_filters list of topics
or a single topic to filter by,
if any
153 sql, where =
"SELECT COUNT(*) AS count FROM messages",
""
156 topics = topic_filters
157 topics = topics
if isinstance(topics, (dict, list, set, tuple))
else [topics]
158 topic_ids = [x[
"id"]
for (topic, _), x
in self.
_topics.items()
if topic
in topics]
159 where =
" WHERE topic_id IN (%s)" %
", ".join(map(str, topic_ids))
160 return self.
_db.execute(sql + where).fetchone()[
"count"]
165 """Returns the start time of the bag, as UNIX timestamp, or None if bag empty."""
167 row = self.
_db.execute(
"SELECT MIN(timestamp) AS val FROM messages").fetchone()
168 if row[
"val"]
is None:
return None
169 secs, nsecs = divmod(row[
"val"], 10**9)
170 return secs + nsecs / 1E9
175 """Returns the end time of the bag, as UNIX timestamp, or None if bag empty."""
177 row = self.
_db.execute(
"SELECT MAX(timestamp) AS val FROM messages").fetchone()
178 if row[
"val"]
is None:
return None
179 secs, nsecs = divmod(row[
"val"], 10**9)
180 return secs + nsecs / 1E9
186 Returns topic and message type metainfo
as {(topic, typename, typehash): count}.
188 Can skip retrieving message counts,
as this requires a full table scan.
189 Can skip looking up message type classes,
as those might be unavailable
in ROS2 environment.
191 @param counts whether to
return actual message counts instead of
None
192 @param ensure_types whether to look up type classes instead of returning typehash
as None
202 Returns thorough metainfo on topic and message types.
204 @param topic_filters list of topics
or a single topic to filter returned topics-dict by,
207 msg_types
as dict of {typename: typehash},
208 topics
as a dict of {topic:
TopicTuple() namedtuple}.
210 topics = topic_filters
211 topics = topics if isinstance(topics, (list, set, tuple))
else [topics]
if topics
else []
212 if self.
_ttinfo and (
not topics
or set(topics) == set(t
for t, _, _
in self.
_counts)):
217 msgtypes = {n: h
for t, n, h
in counts}
221 """Returns median value from given sorted numbers."""
223 return None if not vlen
else vals[vlen // 2]
if vlen % 2
else \
224 float(vals[vlen // 2 - 1] + vals[vlen // 2]) / 2
226 for (t, n, _), c
in sorted(counts.items(), key=
lambda x: x[0][:2]):
227 if topics
and t
not in topics:
continue
230 args = (self.
_topics[(t, n)][
"id"], )
231 cursor = self.
_db.execute(
"SELECT timestamp FROM messages WHERE topic_id = ?", args)
232 stamps = sorted(x[
"timestamp"] / 1E9
for x
in cursor)
233 mymedian = median(sorted(s1 - s0
for s1, s0
in zip(stamps[1:], stamps[:-1])))
234 freq = 1.0 / mymedian
if mymedian
else None
237 if not topics
or set(topics) == set(t
for t, _, _
in self.
_counts): self.
_ttinfo = result
242 """Returns topic Quality-of-Service profiles as a list of dicts, or None if not available."""
243 topickey = (topic, typename)
245 topicrow = self.
_topics[topickey]
247 if topicrow.get(
"offered_qos_profiles"):
248 self.
_qoses[topickey] = yaml.safe_load(topicrow[
"offered_qos_profiles"])
249 except Exception
as e:
250 ConsolePrinter.warn(
"Error parsing quality of service for topic %r: %r", topic, e)
251 self.
_qoses.setdefault(topickey,
None)
252 return self.
_qoses[topickey]
256 """Returns ROS2 message type class, or None if unknown message type for bag."""
258 if any(n == typename
for _, n,
in self.
_topics)
and typehash
is not None \
259 and not any((n, h) == (typename, typehash)
for _, n, h
in self.
_counts):
261 if any((typename, typehash)
in [(n, h), (n,
None)]
for _, n, h
in self.
_counts):
268 Returns ROS2 message type definition full text, including subtype definitions.
270 Returns None if unknown message type
for bag.
273 typename = msg_or_type if isinstance(msg_or_type, str)
else get_message_type(msg_or_type)
274 if any(n == typename
for _, n, _
in self.
_counts):
280 """Returns ROS2 message type MD5 hash, or None if unknown message type for bag."""
281 typename = msg_or_type
if isinstance(msg_or_type, str)
else get_message_type(msg_or_type)
283 return next((h
for _, n, h
in self.
_counts if n == typename),
None)
286 def read_messages(self, topics=None, start_time=None, end_time=None, raw=False, **__):
288 Yields messages from the bag, optionally filtered by topic
and timestamp.
290 @param topics list of topics
or a single topic to filter by,
if any
291 @param start_time earliest timestamp of message to
return,
as ROS time
or convertible
292 (int/float/duration/datetime/decimal)
293 @param end_time latest timestamp of message to
return,
as ROS time
or convertible
294 (int/float/duration/datetime/decimal)
295 @param raw
if True, then returned messages are tuples of
296 (typename, bytes, typehash, typeclass).
297 If message type unavailable, returns
None for hash
and class.
298 @return BagMessage namedtuples of
299 (topic, message, timestamp
as rclpy.time.Time)
302 if "w" == self.
_mode:
raise io.UnsupportedOperation(
"read")
305 if not self.
_topics or (topics
is not None and not topics):
308 sql, exprs, args =
"SELECT * FROM messages", [], ()
310 topics = topics
if isinstance(topics, (list, tuple))
else [topics]
311 topic_ids = [x[
"id"]
for (topic, _), x
in self.
_topics.items()
if topic
in topics]
312 exprs += [
"topic_id IN (%s)" %
", ".join(map(str, topic_ids))]
313 if start_time
is not None:
314 exprs += [
"timestamp >= ?"]
316 if end_time
is not None:
317 exprs += [
"timestamp <= ?"]
319 sql += ((
" WHERE " +
" AND ".join(exprs))
if exprs
else "")
320 sql +=
" ORDER BY timestamp"
322 topicmap = {v[
"id"]: v
for v
in self.
_topics.values()}
324 topicset = set(topics
or [t
for t, _
in self.
_topics])
325 typehashes = {n: h
for _, n, h
in self.
_counts}
326 for row
in self.
_db.execute(sql, args):
327 tdata = topicmap[row[
"topic_id"]]
328 topic, typename = tdata[
"name"],
canonical(tdata[
"type"])
329 if not raw
and msgtypes.get(typename, typename)
is None:
continue
330 if typehashes.get(typename)
is None:
332 selector = (h
for t, n, h
in self.
_counts if (t, n) == (topic, typename))
333 typehash = typehashes[typename] =
next(selector,
None)
334 else: typehash = typehashes[typename]
337 cls = msgtypes.get(typename)
or \
339 if raw: msg = (typename, row[
"data"], typehash
or None, cls)
340 else: msg = rclpy.serialization.deserialize_message(row[
"data"], cls)
341 except Exception
as e:
342 reportfunc = ConsolePrinter.error
if self.
_stop_on_error else ConsolePrinter.warn
343 reportfunc(
"Error loading type %s in topic %s: %%s" % (typename, topic),
344 "message class not found." if cls
is None else e,
347 if raw: msg = (typename, row[
"data"], typehash
or None, msgtypes.get(typename))
348 elif set(n
for n, c
in msgtypes.items()
if c
is None) == topicset:
351 stamp = rclpy.time.Time(nanoseconds=row[
"timestamp"])
353 api.TypeMeta.make(msg, topic, self)
359 def write(self, topic, msg, t=None, raw=False, qoses=None, **__):
361 Writes a message to the bag.
363 @param topic name of topic
364 @param msg ROS2 message
365 @param t message timestamp
if not using wall time,
as ROS time
or convertible
366 (int/float/duration/datetime/decimal)
367 @param qoses topic Quality-of-Service settings,
if any,
as a list of dicts
370 if "r" == self.
_mode:
raise io.UnsupportedOperation(
"write")
374 typename, binary, typehash = msg[:3]
379 topickey = (topic, typename)
380 cursor = self.
_db.cursor()
381 if topickey
not in self.
_topics:
383 sql =
"INSERT INTO topics (name, type, serialization_format, offered_qos_profiles) " \
384 "VALUES (?, ?, ?, ?)"
385 qoses = yaml.safe_dump(qoses)
if isinstance(qoses, list)
else ""
386 args = (topic, full_typename,
"cdr", qoses)
387 cursor.execute(sql, args)
388 tdata = {
"id": cursor.lastrowid,
"name": topic,
"type": full_typename,
389 "serialization_format":
"cdr",
"offered_qos_profiles": qoses}
392 timestamp = (time.time_ns()
if hasattr(time,
"time_ns")
else int(time.time() * 10**9)) \
394 sql =
"INSERT INTO messages (topic_id, timestamp, data) VALUES (?, ?, ?)"
395 args = (self.
_topics[topickey][
"id"], timestamp, binary)
396 cursor.execute(sql, args)
397 countkey = (topic, typename, typehash)
398 if self.
_counts.get(countkey, self)
is not None:
404 """Opens the bag file if not already open."""
409 """Closes the bag file."""
419 """Returns whether file is closed."""
425 """Returns the list of topics in bag, in alphabetic order."""
426 return sorted((t
for t, _, _
in self.
_topics), key=str.lower)
431 """Returns bag file path."""
437 """Returns current file size in bytes (including journaling files)."""
439 for suffix
in (
"-journal",
"-wal")
if result
else ():
441 result += os.path.getsize(path)
if os.path.isfile(path)
else 0
447 """Returns file open mode."""
452 """Returns whether bag contains given topic."""
453 return any(key == t
for t, _, _
in self.
_topics)
457 """Retrieves next message from bag as (topic, message, timestamp)."""
463 def _ensure_open(self, populate=False):
464 """Opens bag database if not open, populates schema if specified."""
467 if "r" == self.
_mode and not exists:
468 raise IOError(
"No such file: %r" % self.
_filename)
469 if "w" == self.
_mode and exists:
471 self.
_db = sqlite3.connect(self.
_filename, detect_types=sqlite3.PARSE_DECLTYPES,
472 isolation_level=
None, check_same_thread=
False)
473 self.
_db.row_factory =
lambda cursor, row: dict(sqlite3.Row(cursor, row))
478 def _ensure_topics(self):
479 """Populates local topic struct from database, if not already available."""
481 for row
in self.
_db.execute(
"SELECT * FROM topics ORDER BY id"):
482 topickey = (topic, typename) = row[
"name"],
canonical(row[
"type"])
483 self.
_topics[(topic, typename)] = row
484 self.
_counts[(topic, typename,
None)] =
None
487 def _ensure_counts(self):
488 """Populates local counts values from database, if not already available."""
489 if not self.
_db or all(v
is not None for v
in self.
_counts.values()) \
492 topickeys = {self.
_topics[(t, n)][
"id"]: (t, n, h)
for (t, n, h)
in self.
_counts}
494 for row
in self.
_db.execute(
"SELECT topic_id, COUNT(*) AS count FROM messages "
495 "GROUP BY topic_id").fetchall():
496 if row[
"topic_id"]
in topickeys:
497 self.
_counts[topickeys[row[
"topic_id"]]] = row[
"count"]
500 def _ensure_types(self, topics=None):
502 Populates local type definitions and classes
from database,
if not already available.
504 @param topics selected topics to ensure types
for,
if not all
506 if not self.
_db or (
not topics
and topics
is not None)
or not self.
_has_table(
"topics") \
507 or not any(h
is None for t, _, h
in self.
_counts if topics
is None or t
in topics):
510 for countkey, count
in list(self.
_counts.items()):
511 (topic, typename, typehash) = countkey
512 if typehash
is None and (topics
is None or topic
in topics):
515 self.
_counts[(topic, typename, typehash)] = count
518 def _has_table(self, name):
519 """Returns whether specified table exists in database."""
520 sql =
"SELECT 1 FROM sqlite_master WHERE type = ? AND name = ?"
521 return bool(self.
_db.execute(sql, (
"table", name)).fetchone())
526 """Returns whether file is recognizable as SQLite format."""
527 if os.path.isfile(f)
and os.path.getsize(f):
528 with open(f,
"rb")
as f:
531 ext = os.path.splitext(f
or "")[-1].lower()
532 result = ext
in BAG_EXTENSIONS
539 """Initializes a ROS2 node if not already initialized."""
540 global node, context, executor
545 while context
and context.ok():
546 executor.spin_once(timeout_sec=1)
548 context = rclpy.Context()
549 try: rclpy.init(context=context)
550 except Exception:
pass
551 node_name =
"%s_%s_%s" % (name, os.getpid(), int(time.time() * 1000))
552 node = rclpy.create_node(node_name, context=context, use_global_arguments=
False,
553 enable_rosout=
False, start_parameter_services=
False)
554 executor = rclpy.executors.MultiThreadedExecutor(context=context)
555 executor.add_node(node)
556 spinner = threading.Thread(target=spin_loop)
557 spinner.daemon =
True
562 """Shuts down live ROS2 node."""
563 global node, context, executor
565 context_, executor_ = context, executor
566 context = executor = node =
None
573 Returns whether ROS2 environment is set, prints error
if not.
575 @param live whether environment must support launching a ROS node
577 missing = [k for k
in [
"ROS_VERSION"]
if not os.getenv(k)]
579 ConsolePrinter.error(
"ROS environment not sourced: missing %s.",
580 ", ".join(sorted(missing)))
581 if "2" != os.getenv(
"ROS_VERSION",
"2"):
582 ConsolePrinter.error(
"ROS environment not supported: need ROS_VERSION=2.")
590 Returns "pkg/Type" for "pkg/msg/Type", standardizes various ROS2 formats.
592 Converts DDS types like
"octet" to
"byte",
and "sequence<uint8, 100>" to
"uint8[100]".
594 @param unbounded drop constraints like array
and string bounds,
595 e.g. returning
"uint8[]" for "uint8[10]" and "string" for "string<=8"
597 if not typename:
return typename
598 is_array, bound, dimension =
False,
"",
""
601 match = re.match(
"sequence<(.+)>", typename)
604 typename = match.group(1)
605 match = re.match(
r"([^,]+)?,\s?(\d+)", typename)
607 typename = match.group(1)
608 if match.lastindex > 1: dimension = match.group(2)
610 match = re.match(
"(w?string)<(.+)>", typename)
612 typename, bound = match.groups()
615 dimension = typename[typename.index(
"[") + 1:typename.index(
"]")]
616 typename, is_array = typename[:typename.index(
"[")],
True
619 typename, bound = typename.split(
"<=")
621 if typename.count(
"/") > 1:
622 typename =
"%s/%s" % tuple((x[0], x[-1])
for x
in [typename.split(
"/")])[0]
624 if unbounded: suffix =
"[]" if is_array
else ""
625 else: suffix = (
"<=%s" % bound
if bound
else "") + (
"[%s]" % dimension
if is_array
else "")
626 return DDS_TYPES.get(typename, typename) + suffix
630 """Returns an rclpy.Publisher instance, with .get_num_connections() and .unregister()."""
631 cls = cls_or_typename
633 qos = rclpy.qos.QoSProfile(depth=queue_size)
634 pub = node.create_publisher(cls, topic, qos)
635 pub.get_num_connections = pub.get_subscription_count
636 pub.unregister = pub.destroy
642 Returns an rclpy.Subscription.
647 cls = typename = cls_or_typename
651 qos = rclpy.qos.QoSProfile(depth=queue_size)
652 qoses = [x.qos_profile
for x
in node.get_publishers_info_by_topic(topic)
654 rels, durs = zip(*[(x.reliability, x.durability)
for x
in qoses])
if qoses
else ([], [])
656 if rels: qos.reliability = max(rels)
657 if durs: qos.durability = max(durs)
660 sub = node.create_subscription(cls, topic, handler, qos)
661 sub.get_message_class =
lambda: cls
664 sub.get_qoses =
lambda: qosdicts
665 sub.unregister = sub.destroy
671 Returns a message attribute value as string.
673 Result
is at least 10 chars wide
if message
is a ROS2 time/duration
674 (aligning seconds
and nanoseconds).
676 LENS = {"sec": 13,
"nanosec": 9}
678 if not isinstance(msg, tuple(ROS_TIME_CLASSES))
or name
not in LENS:
681 EXTRA = sum(v.count(x) * len(x)
for x
in (MatchMarkers.START, MatchMarkers.END))
682 return (
"%%%ds" % (LENS[name] + EXTRA)) % v
687 """Returns ROS2 message class, or None if unknown type."""
689 except Exception:
return None
694 Returns ROS2 message type definition full text, including subtype definitions.
696 Returns None if unknown type.
698 typename = msg_or_type if isinstance(msg_or_type, str)
else get_message_type(msg_or_type)
703 """Returns ROS2 message type MD5 hash, or "" if unknown type."""
704 typename = msg_or_type
if isinstance(msg_or_type, str)
else get_message_type(msg_or_type)
705 return _get_message_type_hash(
canonical(typename))
709def _get_message_definition(typename):
710 """Returns ROS2 message type definition full text, or None on error (internal caching method)."""
712 texts, pkg = collections.OrderedDict(), typename.rsplit(
"/", 1)[0]
714 typepath = rosidl_runtime_py.get_interface_path(
make_full_typename(typename) +
".msg")
715 with open(typepath, encoding=
"utf-8")
as f:
716 texts[typename] = f.read()
719 for line
in texts[typename].splitlines():
720 if not line
or not line[0].isalpha():
722 linetype =
scalar(
canonical(re.sub(
r"^([a-zA-Z][^\s]+)(.+)",
r"\1", line)))
723 if linetype
in api.ROS_BUILTIN_TYPES:
725 linetype = linetype
if "/" in linetype
else "std_msgs/Header" \
726 if "Header" == linetype
else "%s/%s" % (pkg, linetype)
728 if linedef: texts[linetype] = linedef
730 basedef = texts.pop(next(iter(texts)))
731 subdefs = [
"%s\nMSG: %s\n%s" % (
"=" * 80, k, v)
for k, v
in texts.items()]
732 return basedef + (
"\n" if subdefs
else "") +
"\n".join(subdefs)
733 except Exception
as e:
734 ConsolePrinter.warn(
"Error collecting type definition of %s: %s", typename, e)
741 Returns ROS2 message type definition parsed from IDL file.
746 def format_comment(text):
747 """Returns annotation text formatted with comment prefixes and escapes."""
748 ESCAPES = {
"\n":
"\\n",
"\t":
"\\t",
"\x07":
"\\a",
749 "\x08":
"\\b",
"\x0b":
"\\v",
"\x0c":
"\\f"}
750 repl =
lambda m: ESCAPES[m.group(0)]
751 return "#" +
"\n#".join(re.sub(
"|".join(map(re.escape, ESCAPES)), repl, l)
752 for l
in text.split(
"\\n"))
754 def format_type(typeobj, msgpackage, constant=False):
755 """Returns canonical type name, like "uint8" or "string<=5" or "nav_msgs/Path"."""
757 if isinstance(typeobj, rosidl_parser.definition.AbstractNestedType):
759 valuetype = format_type(typeobj.value_type, msgpackage, constant)
760 size, bounding =
"",
""
761 if isinstance(typeobj, rosidl_parser.definition.Array):
763 elif typeobj.has_maximum_size():
764 size = typeobj.maximum_size
765 if isinstance(typeobj, rosidl_parser.definition.BoundedSequence):
767 result =
"%s[%s%s]" % (valuetype, bounding, size)
768 elif isinstance(typeobj, rosidl_parser.definition.AbstractWString):
770 elif isinstance(typeobj, rosidl_parser.definition.AbstractString):
772 elif isinstance(typeobj, rosidl_parser.definition.NamespacedType):
773 nameparts = typeobj.namespaced_name()
775 if nameparts[0].value == msgpackage
or "std_msgs/Header" == result:
776 result =
canonical(
"/".join(nameparts[-1:]))
778 result = DDS_TYPES.get(typeobj.typename, typeobj.typename)
780 if isinstance(typeobj, rosidl_parser.definition.AbstractGenericString) \
781 and typeobj.has_maximum_size()
and not constant:
782 result +=
"<=%s" % typeobj.maximum_size
786 def get_comments(obj):
787 """Returns all comments for annotatable object, as [text, ]."""
788 return [v.get(
"text",
"")
for v
in obj.get_annotation_values(
"verbatim")
789 if "comment" == v.get(
"language")]
791 typepath = rosidl_runtime_py.get_interface_path(
make_full_typename(typename) +
".idl")
792 with open(typepath, encoding=
"utf-8")
as f:
793 idlcontent = rosidl_parser.parser.parse_idl_string(f.read())
794 msgidl = idlcontent.get_elements_of_type(rosidl_parser.definition.Message)[0]
795 package = msgidl.structure.namespaced_type.namespaces[0]
796 DUMMY = rosidl_parser.definition.EMPTY_STRUCTURE_REQUIRED_MEMBER_NAME
800 lines.extend(map(format_comment, get_comments(msgidl.structure)))
802 if lines
and msgidl.constants: lines.append(
"")
804 for c
in msgidl.constants:
805 ctype = format_type(c.type, package, constant=
True)
806 lines.extend(map(format_comment, get_comments(c)))
807 lines.append(
"%s %s=%s" % (ctype, c.name, c.value))
809 if not (len(msgidl.structure.members) == 1
and DUMMY == msgidl.structure[0].name):
811 if msgidl.constants
and msgidl.structure.members: lines.append(
"")
813 for m
in msgidl.structure.members:
814 lines.extend(map(format_comment, get_comments(m)))
815 lines.append(
"%s %s" % (format_type(m.type, package), m.name))
816 return "\n".join(lines)
820def _get_message_type_hash(typename):
821 """Returns ROS2 message type MD5 hash (internal caching method)."""
823 return "" if msgdef
is None else api.calculate_definition_hash(typename, msgdef)
827 """Returns OrderedDict({field name: field type name}) if ROS2 message, else {}."""
829 fields = ((k,
canonical(v))
for k, v
in val.get_fields_and_field_types().items())
830 return collections.OrderedDict(fields)
834 """Returns ROS2 message type name, like "std_msgs/Header"."""
835 cls = msg_or_cls
if inspect.isclass(msg_or_cls)
else type(msg_or_cls)
836 return canonical(
"%s/%s" % (cls.__module__.split(
".")[0], cls.__name__))
841 Returns object attribute value, with numeric arrays converted to lists.
843 @param name message attribute name
844 @param typename value ROS type name,
for identifying byte arrays
845 @param default value to
return if attribute does
not exist; raises exception otherwise
847 if default
is not Ellipsis
and not hasattr(msg, name):
return default
848 v, scalartype = getattr(msg, name),
scalar(typename)
849 if isinstance(v, (bytes, array.array)): v = list(v)
850 elif numpy
and isinstance(v, (numpy.generic, numpy.ndarray)):
852 if v
and isinstance(v, (list, tuple))
and scalartype
in (
"byte",
"uint8"):
853 if isinstance(v[0], bytes):
854 v = list(map(ord, v))
855 elif scalartype == typename:
857 return list(v)
if isinstance(v, tuple)
else v
862 Returns current ROS2 time, as rclpy.time.Time.
864 @param fallback use wall time
if node
not initialized
866 try:
return node.get_clock().now()
868 if fallback:
return make_time(time.time())
874 Returns currently available ROS2 topics, as [(topicname, typename)].
876 Omits topics that the current ROS2 node itself has published.
879 myname, myns = node.get_name(), node.get_namespace()
881 for topic, typenames
in node.get_publisher_names_and_types_by_node(myname, myns):
882 mytypes.setdefault(topic, []).extend(typenames)
883 for t
in (
"/parameter_events",
"/rosout"):
885 for topic, typenames
in node.get_topic_names_and_types():
886 for typename
in typenames:
887 if topic
not in mytypes
or typename
not in mytypes[topic]:
894 Returns whether value is a ROS2 message
or special like ROS2 time/duration
class or instance.
896 @param ignore_time whether to ignore ROS2 time/duration types
898 is_message = rosidl_runtime_py.utilities.is_message(val)
899 if is_message
and ignore_time: is_message =
not is_ros_time(val)
904 """Returns whether value is a ROS2 time/duration class or instance."""
905 if inspect.isclass(val):
return issubclass(val, tuple(ROS_TIME_CLASSES))
906 return isinstance(val, tuple(ROS_TIME_CLASSES))
910 """Returns an rclpy.duration.Duration."""
911 return rclpy.duration.Duration(seconds=secs, nanoseconds=nsecs)
915 """Returns a ROS2 time, as rclpy.time.Time."""
916 return rclpy.time.Time(seconds=secs, nanoseconds=nsecs)
920 """Returns "pkg/msg/Type" for "pkg/Type"."""
921 if "/msg/" in typename
or "/" not in typename:
923 return "%s/msg/%s" % tuple((x[0], x[-1])
for x
in [typename.split(
"/")])[0]
928 Returns rclpy.qos.QoSProfile that matches the most permissive publisher.
930 @param queue_size QoSProfile.depth
932 qos = rclpy.qos.QoSProfile(depth=queue_size)
933 infos = node.get_publishers_info_by_topic(topic)
934 rels, durs = zip(*[(x.qos_profile.reliability, x.qos_profile.durability)
935 for x
in infos
if canonical(x.topic_type) == typename])
937 if rels: qos.reliability = max(rels)
938 if durs: qos.durability = max(durs)
943 """Returns rclpy.qos.QoSProfile as dictionary."""
946 QOS_TYPES = (bool, int, enum.Enum) + tuple(ROS_TIME_CLASSES)
947 for name
in (n
for n
in dir(qos)
if not n.startswith(
"_")):
948 val = getattr(qos, name)
949 if name.startswith(
"_")
or not isinstance(val, QOS_TYPES):
951 if isinstance(val, enum.Enum):
953 elif isinstance(val, tuple(ROS_TIME_CLASSES)):
960 """Returns ROS2 message as a serialized binary."""
961 with api.TypeMeta.make(msg)
as m:
962 if m.data
is not None:
return m.data
963 return rclpy.serialization.serialize_message(msg)
967 """Returns ROS2 message or service request/response instantiated from serialized binary."""
968 cls = cls_or_typename
970 return rclpy.serialization.deserialize_message(raw, cls)
976 Returns unbounded scalar type from ROS2 message data type
978 Like
"uint8" from "uint8[]",
or "string" from "string<=10[<=5]".
979 Returns type unchanged
if not a collection
or bounded type.
981 if typename
and "[" in typename: typename = typename[:typename.index(
"[")]
982 if typename
and "<=" in typename: typename = typename[:typename.index(
"<=")]
987 """Sets message or object attribute value."""
990 fieldmap = obj.get_fields_and_field_types()
992 name = obj.__slots__[list(fieldmap).index(name)]
993 setattr(obj, name, value)
998 Converts ROS2 time/duration between `rclpy` and `builtin_interfaces` objects.
1000 @param val ROS2 time/duration object
from `rclpy`
or `builtin_interfaces`
1001 @param to_message whether to convert
from `rclpy` to `builtin_interfaces`
or vice versa
1002 @param clock_type ClockType
for converting to `rclpy.Time`, defaults to `ROS_TIME`
1003 @return value converted to appropriate type,
or original value
if not convertible
1005 to_message, clock_type = bool(to_message), (clock_type or rclpy.clock.ClockType.ROS_TIME)
1006 if isinstance(val, tuple(ROS_TIME_CLASSES)):
1007 rcl_cls = next(k
for k, v
in ROS_TIME_MESSAGES.items()
if isinstance(val, (k, v)))
1008 is_rcl = isinstance(val, tuple(ROS_TIME_MESSAGES))
1009 name =
"to_msg" if to_message
and is_rcl
else "from_msg" if to_message == is_rcl
else None
1010 args = [val] + ([clock_type]
if rcl_cls
is rclpy.time.Time
and "from_msg" == name
else [])
1011 return getattr(rcl_cls, name)(*args)
if name
else val
1017 Returns value as ROS2 duration
if convertible (int/float/time/datetime/decimal),
else value.
1019 Convertible types: int/float/time/datetime/decimal/builtin_interfaces.Time.
1022 if isinstance(val, decimal.Decimal):
1024 elif isinstance(val, datetime.datetime):
1025 result =
make_duration(int(val.timestamp()), 1000 * val.microsecond)
1026 elif isinstance(val, (float, int)):
1028 elif isinstance(val, rclpy.time.Time):
1030 elif isinstance(val, tuple(ROS_TIME_MESSAGES.values())):
1036 """Returns value in nanoseconds if value is ROS2 time/duration, else value."""
1037 if not isinstance(val, tuple(ROS_TIME_CLASSES)):
1039 if hasattr(val,
"nanoseconds"):
1040 return val.nanoseconds
1041 return val.sec * 10**9 + val.nanosec
1045 """Returns value in seconds if value is ROS2 time/duration, else value."""
1046 if not isinstance(val, tuple(ROS_TIME_CLASSES)):
1048 if hasattr(val,
"nanoseconds"):
1049 secs, nsecs = divmod(val.nanoseconds, 10**9)
1050 return secs + nsecs / 1E9
1051 return val.sec + val.nanosec / 1E9
1055 """Returns value as (seconds, nanoseconds) if value is ROS2 time/duration, else value."""
1056 if not isinstance(val, tuple(ROS_TIME_CLASSES)):
1058 if hasattr(val,
"seconds_nanoseconds"):
1059 return val.seconds_nanoseconds()
1060 if hasattr(val,
"nanoseconds"):
1061 return divmod(val.nanoseconds, 10**9)
1062 return (val.sec, val.nanosec)
1067 Returns value as ROS2 time
if convertible,
else value.
1069 Convertible types: int/float/duration/datetime/decimal/builtin_interfaces.Time.
1072 if isinstance(val, decimal.Decimal):
1073 result =
make_time(int(val), float(val % 1) * 10**9)
1074 elif isinstance(val, datetime.datetime):
1075 result =
make_time(int(val.timestamp()), 1000 * val.microsecond)
1076 elif isinstance(val, (float, int)):
1078 elif isinstance(val, rclpy.duration.Duration):
1079 result =
make_time(nsecs=val.nanoseconds)
1080 elif isinstance(val, tuple(ROS_TIME_MESSAGES.values())):
1086 "BAG_EXTENSIONS",
"DDS_TYPES",
"ROS_ALIAS_TYPES",
"ROS_TIME_CLASSES",
"ROS_TIME_MESSAGES",
1087 "ROS_TIME_TYPES",
"SKIP_EXTENSIONS",
"Bag",
"ROS2Bag",
"context",
"executor",
"node",
1088 "canonical",
"create_publisher",
"create_subscriber",
"deserialize_message",
1089 "format_message_value",
"get_message_class",
"get_message_definition",
1090 "get_message_definition_idl",
"get_message_fields",
"get_message_type",
1091 "get_message_type_hash",
"get_message_value",
"get_rostime",
"get_topic_types",
"init_node",
1092 "is_ros_message",
"is_ros_time",
"make_duration",
"make_full_typename",
"make_subscriber_qos",
1093 "make_time",
"qos_to_dict",
"scalar",
"serialize_message",
"set_message_value",
"shutdown_node",
1094 "time_message",
"to_duration",
"to_nsec",
"to_sec",
"to_sec_nsec",
"to_time",
"validate",
tuple MODES
Supported opening modes, overridden in subclasses.
BagMessage
Returned from read_messages() as (topic name, ROS message, ROS timestamp object).
TypesAndTopicsTuple
Returned from get_type_and_topic_info() as ({typename: typehash}, {topic name: TopicTuple}).
get_qoses(self, topic, typename)
Returns topic Quality-of-Service profiles as a list of dicts, or None if not available.
read_messages(self, topics=None, start_time=None, end_time=None, raw=False, **__)
Yields messages from the bag, optionally filtered by topic and timestamp.
get_topic_info(self, counts=True)
Returns topic and message type metainfo as {(topic, typename, typehash): count}.
closed
Returns whether file is closed.
TopicTuple
Returned from get_type_and_topic_info() as (typename, message count, connection count,...
ROS2 bag reader and writer (SQLite format), providing most of rosbag.Bag interface.
get_type_and_topic_info(self, topic_filters=None)
Returns thorough metainfo on topic and message types.
mode
Returns file open mode.
open(self)
Opens the bag file if not already open.
filename
Returns bag file path.
autodetect(cls, f)
Returns whether file is recognizable as SQLite format.
get_start_time(self)
Returns the start time of the bag, as UNIX timestamp, or None if bag empty.
get_end_time(self)
Returns the end time of the bag, as UNIX timestamp, or None if bag empty.
write(self, topic, msg, t=None, raw=False, qoses=None, **__)
Writes a message to the bag.
get_message_class(self, typename, typehash=None)
Returns ROS2 message type class, or None if unknown message type for bag.
get_qoses(self, topic, typename)
Returns topic Quality-of-Service profiles as a list of dicts, or None if not available.
get_message_type_hash(self, msg_or_type)
Returns ROS2 message type MD5 hash, or None if unknown message type for bag.
read_messages(self, topics=None, start_time=None, end_time=None, raw=False, **__)
Yields messages from the bag, optionally filtered by topic and timestamp.
str SQLITE_MAGIC
SQLite file header magic start bytes.
close(self)
Closes the bag file.
__contains__(self, key)
Returns whether bag contains given topic.
__init__(self, filename, mode="a", *_, **__)
closed
Returns whether file is closed.
size
Returns current file size in bytes (including journaling files).
topics
Returns the list of topics in bag, in alphabetic order.
__next__(self)
Retrieves next message from bag as (topic, message, timestamp).
get_topic_info(self, counts=True, ensure_types=True)
Returns topic and message type metainfo as {(topic, typename, typehash): count}.
get_message_definition(self, msg_or_type)
Returns ROS2 message type definition full text, including subtype definitions.
get_message_count(self, topic_filters=None)
Returns the number of messages in the bag.
str CREATE_SQL
ROS2 bag SQLite schema.
time_message(val, to_message=True, clock_type=None)
Converts ROS2 time/duration between rclpy and builtin_interfaces objects.
make_full_typename(typename)
Returns "pkg/msg/Type" for "pkg/Type".
canonical(typename, unbounded=False)
Returns "pkg/Type" for "pkg/msg/Type", standardizes various ROS2 formats.
make_subscriber_qos(topic, typename, queue_size=10)
Returns rclpy.qos.QoSProfile that matches the most permissive publisher.
create_publisher(topic, cls_or_typename, queue_size)
Returns an rclpy.Publisher instance, with .get_num_connections() and .unregister().
serialize_message(msg)
Returns ROS2 message as a serialized binary.
to_sec(val)
Returns value in seconds if value is ROS2 time/duration, else value.
qos_to_dict(qos)
Returns rclpy.qos.QoSProfile as dictionary.
get_topic_types()
Returns currently available ROS2 topics, as [(topicname, typename)].
format_message_value(msg, name, value)
Returns a message attribute value as string.
get_message_value(msg, name, typename=None, default=Ellipsis)
Returns object attribute value, with numeric arrays converted to lists.
get_message_class(typename)
Returns ROS2 message class, or None if unknown type.
to_duration(val)
Returns value as ROS2 duration if convertible (int/float/time/datetime/decimal), else value.
make_time(secs=0, nsecs=0)
Returns a ROS2 time, as rclpy.time.Time.
deserialize_message(raw, cls_or_typename)
Returns ROS2 message or service request/response instantiated from serialized binary.
get_message_type(msg_or_cls)
Returns ROS2 message type name, like "std_msgs/Header".
get_message_definition(msg_or_type)
Returns ROS2 message type definition full text, including subtype definitions.
validate(live=False)
Returns whether ROS2 environment is set, prints error if not.
scalar(typename)
Returns unbounded scalar type from ROS2 message data type.
get_message_definition_idl(typename)
Returns ROS2 message type definition parsed from IDL file.
make_duration(secs=0, nsecs=0)
Returns an rclpy.duration.Duration.
get_rostime(fallback=False)
Returns current ROS2 time, as rclpy.time.Time.
to_nsec(val)
Returns value in nanoseconds if value is ROS2 time/duration, else value.
to_time(val)
Returns value as ROS2 time if convertible, else value.
shutdown_node()
Shuts down live ROS2 node.
set_message_value(obj, name, value)
Sets message or object attribute value.
is_ros_time(val)
Returns whether value is a ROS2 time/duration class or instance.
create_subscriber(topic, cls_or_typename, handler, queue_size)
Returns an rclpy.Subscription.
get_message_type_hash(msg_or_type)
Returns ROS2 message type MD5 hash, or "" if unknown type.
to_sec_nsec(val)
Returns value as (seconds, nanoseconds) if value is ROS2 time/duration, else value.
init_node(name)
Initializes a ROS2 node if not already initialized.
is_ros_message(val, ignore_time=False)
Returns whether value is a ROS2 message or special like ROS2 time/duration class or instance.
get_message_fields(val)
Returns OrderedDict({field name: field type name}) if ROS2 message, else {}.