grepros 1.2.2
grep for ROS bag files and live topics
Loading...
Searching...
No Matches
ros2.py
Go to the documentation of this file.
1# -*- coding: utf-8 -*-
2"""
3ROS2 interface.
4
5------------------------------------------------------------------------------
6This file is part of grepros - grep for ROS bag files and live topics.
7Released under the BSD License.
8
9@author Erki Suurjaak
10@created 02.11.2021
11@modified 06.05.2024
12------------------------------------------------------------------------------
13"""
14## @namespace grepros.ros2
15import array
16import collections
17import datetime
18import decimal
19import enum
20import inspect
21import io
22import os
23import re
24import sqlite3
25import threading
26import time
27
28import builtin_interfaces.msg
29try: import numpy
30except Exception: numpy = None
31import rclpy
32import rclpy.clock
33import rclpy.duration
34import rclpy.executors
35import rclpy.serialization
36import rclpy.time
37import rosidl_parser.parser
38import rosidl_parser.definition
39import rosidl_runtime_py.utilities
40import yaml
41
42from . import api
43from . common import PATH_TYPES, ConsolePrinter, MatchMarkers, memoize
44
45
46## Bagfile extensions to seek
47BAG_EXTENSIONS = (".db3", )
48
49
50SKIP_EXTENSIONS = ()
51
52
53ROS_TIME_TYPES = ["builtin_interfaces/Time", "builtin_interfaces/Duration"]
54
55
56ROS_TIME_CLASSES = {rclpy.time.Time: "builtin_interfaces/Time",
57 builtin_interfaces.msg.Time: "builtin_interfaces/Time",
58 rclpy.duration.Duration: "builtin_interfaces/Duration",
59 builtin_interfaces.msg.Duration: "builtin_interfaces/Duration"}
60
61
62ROS_TIME_MESSAGES = {rclpy.time.Time: builtin_interfaces.msg.Time,
63 rclpy.duration.Duration: builtin_interfaces.msg.Duration}
64
65
66ROS_ALIAS_TYPES = {"byte": "uint8", "char": "int8"}
67
68
69DDS_TYPES = {"boolean": "bool",
70 "float": "float32",
71 "double": "float64",
72 "octet": "byte",
73 "short": "int16",
74 "unsigned short": "uint16",
75 "long": "int32",
76 "unsigned long": "uint32",
77 "long long": "int64",
78 "unsigned long long": "uint64", }
79
80
81node = None
82
83
84context = None
85
86
87executor = None
88
89
90
92 """ROS2 bag reader and writer (SQLite format), providing most of rosbag.Bag interface."""
93
94
95 STREAMABLE = False
96
97
98 CREATE_SQL = """
99CREATE TABLE IF NOT EXISTS messages (
100 id INTEGER PRIMARY KEY,
101 topic_id INTEGER NOT NULL,
102 timestamp INTEGER NOT NULL,
103 data BLOB NOT NULL
104);
105
106CREATE TABLE IF NOT EXISTS topics (
107 id INTEGER PRIMARY KEY,
108 name TEXT NOT NULL,
109 type TEXT NOT NULL,
110 serialization_format TEXT NOT NULL,
111 offered_qos_profiles TEXT NOT NULL
112);
113
114CREATE INDEX IF NOT EXISTS timestamp_idx ON messages (timestamp ASC);
115
116PRAGMA journal_mode=WAL;
117PRAGMA synchronous=NORMAL;
118 """
119
120
121 SQLITE_MAGIC = b"SQLite format 3\x00"
122
123
124 def __init__(self, filename, mode="a", *_, **__):
125 """
126 @param filename bag file path to open
127 @param mode file will be overwritten if "w"
128 """
129 if not isinstance(filename, PATH_TYPES):
130 raise ValueError("invalid filename %r" % type(filename))
131 if mode not in self.MODES: raise ValueError("invalid mode %r" % mode)
132
133 self._db = None # sqlite3.Connection instance
134 self._mode = mode
135 self._topics = {} # {(topic, typename): {id, name, type}}
136 self._counts = {} # {(topic, typename, typehash): message count}
137 self._qoses = {} # {(topic, typename): [{qos profile dict}]}
138 self._iterer = None # Generator from read_messages() for next()
139 self._ttinfo = None # Cached result for get_type_and_topic_info()
140 self._filename = str(filename)
141 self._stop_on_error = True
142
143 self._ensure_open(populate=("r" != mode))
144
145
146 def get_message_count(self, topic_filters=None):
147 """
148 Returns the number of messages in the bag.
149
150 @param topic_filters list of topics or a single topic to filter by, if any
151 """
152 if self._db and self._has_table("messages"):
153 sql, where = "SELECT COUNT(*) AS count FROM messages", ""
154 if topic_filters:
155 self._ensure_topics()
156 topics = topic_filters
157 topics = topics if isinstance(topics, (dict, list, set, tuple)) else [topics]
158 topic_ids = [x["id"] for (topic, _), x in self._topics.items() if topic in topics]
159 where = " WHERE topic_id IN (%s)" % ", ".join(map(str, topic_ids))
160 return self._db.execute(sql + where).fetchone()["count"]
161 return None
162
163
164 def get_start_time(self):
165 """Returns the start time of the bag, as UNIX timestamp, or None if bag empty."""
166 if self._db and self._has_table("messages"):
167 row = self._db.execute("SELECT MIN(timestamp) AS val FROM messages").fetchone()
168 if row["val"] is None: return None
169 secs, nsecs = divmod(row["val"], 10**9)
170 return secs + nsecs / 1E9
171 return None
172
173
174 def get_end_time(self):
175 """Returns the end time of the bag, as UNIX timestamp, or None if bag empty."""
176 if self._db and self._has_table("messages"):
177 row = self._db.execute("SELECT MAX(timestamp) AS val FROM messages").fetchone()
178 if row["val"] is None: return None
179 secs, nsecs = divmod(row["val"], 10**9)
180 return secs + nsecs / 1E9
181 return None
182
183
184 def get_topic_info(self, counts=True, ensure_types=True):
185 """
186 Returns topic and message type metainfo as {(topic, typename, typehash): count}.
187
188 Can skip retrieving message counts, as this requires a full table scan.
189 Can skip looking up message type classes, as those might be unavailable in ROS2 environment.
190
191 @param counts whether to return actual message counts instead of None
192 @param ensure_types whether to look up type classes instead of returning typehash as None
193 """
195 if counts: self._ensure_counts()
196 if ensure_types: self._ensure_types()
197 return dict(self._counts)
198
199
200 def get_type_and_topic_info(self, topic_filters=None):
201 """
202 Returns thorough metainfo on topic and message types.
203
204 @param topic_filters list of topics or a single topic to filter returned topics-dict by,
205 if any
206 @return TypesAndTopicsTuple(msg_types, topics) namedtuple,
207 msg_types as dict of {typename: typehash},
208 topics as a dict of {topic: TopicTuple() namedtuple}.
209 """
210 topics = topic_filters
211 topics = topics if isinstance(topics, (list, set, tuple)) else [topics] if topics else []
212 if self._ttinfo and (not topics or set(topics) == set(t for t, _, _ in self._counts)):
213 return self._ttinfo
214 if self.closedclosedclosedclosed: raise ValueError("I/O operation on closed file.")
215
216 counts = self.get_topic_infoget_topic_info()
217 msgtypes = {n: h for t, n, h in counts}
218 topicdict = {}
219
220 def median(vals):
221 """Returns median value from given sorted numbers."""
222 vlen = len(vals)
223 return None if not vlen else vals[vlen // 2] if vlen % 2 else \
224 float(vals[vlen // 2 - 1] + vals[vlen // 2]) / 2
225
226 for (t, n, _), c in sorted(counts.items(), key=lambda x: x[0][:2]):
227 if topics and t not in topics: continue # for
228 mymedian = None
229 if c > 1:
230 args = (self._topics[(t, n)]["id"], )
231 cursor = self._db.execute("SELECT timestamp FROM messages WHERE topic_id = ?", args)
232 stamps = sorted(x["timestamp"] / 1E9 for x in cursor)
233 mymedian = median(sorted(s1 - s0 for s1, s0 in zip(stamps[1:], stamps[:-1])))
234 freq = 1.0 / mymedian if mymedian else None
235 topicdict[t] = self.TopicTuple(n, c, len(self.get_qosesget_qoses(t, n) or []), freq)
236 result = self.TypesAndTopicsTuple(msgtypes, topicdict)
237 if not topics or set(topics) == set(t for t, _, _ in self._counts): self._ttinfo = result
238 return result
239
240
241 def get_qoses(self, topic, typename):
242 """Returns topic Quality-of-Service profiles as a list of dicts, or None if not available."""
243 topickey = (topic, typename)
244 if topickey not in self._qoses and topickey in self._topics:
245 topicrow = self._topics[topickey]
246 try:
247 if topicrow.get("offered_qos_profiles"):
248 self._qoses[topickey] = yaml.safe_load(topicrow["offered_qos_profiles"])
249 except Exception as e:
250 ConsolePrinter.warn("Error parsing quality of service for topic %r: %r", topic, e)
251 self._qoses.setdefault(topickey, None)
252 return self._qoses[topickey]
253
254
255 def get_message_class(self, typename, typehash=None):
256 """Returns ROS2 message type class, or None if unknown message type for bag."""
257 self._ensure_topics()
258 if any(n == typename for _, n, in self._topics) and typehash is not None \
259 and not any((n, h) == (typename, typehash) for _, n, h in self._counts):
260 self._ensure_types([t for t, n in self._topics if n == typename])
261 if any((typename, typehash) in [(n, h), (n, None)] for _, n, h in self._counts):
262 return get_message_class(typename)
263 return None
264
265
266 def get_message_definition(self, msg_or_type):
267 """
268 Returns ROS2 message type definition full text, including subtype definitions.
269
270 Returns None if unknown message type for bag.
271 """
272 self._ensure_topics()
273 typename = msg_or_type if isinstance(msg_or_type, str) else get_message_type(msg_or_type)
274 if any(n == typename for _, n, _ in self._counts):
275 return get_message_definition(msg_or_type)
276 return None
277
278
279 def get_message_type_hash(self, msg_or_type):
280 """Returns ROS2 message type MD5 hash, or None if unknown message type for bag."""
281 typename = msg_or_type if isinstance(msg_or_type, str) else get_message_type(msg_or_type)
282 self._ensure_types([t for t, n in self._topics if n == typename])
283 return next((h for _, n, h in self._counts if n == typename), None)
284
285
286 def read_messages(self, topics=None, start_time=None, end_time=None, raw=False, **__):
287 """
288 Yields messages from the bag, optionally filtered by topic and timestamp.
289
290 @param topics list of topics or a single topic to filter by, if any
291 @param start_time earliest timestamp of message to return, as ROS time or convertible
292 (int/float/duration/datetime/decimal)
293 @param end_time latest timestamp of message to return, as ROS time or convertible
294 (int/float/duration/datetime/decimal)
295 @param raw if True, then returned messages are tuples of
296 (typename, bytes, typehash, typeclass).
297 If message type unavailable, returns None for hash and class.
298 @return BagMessage namedtuples of
299 (topic, message, timestamp as rclpy.time.Time)
300 """
301 if self.closedclosedclosedclosed: raise ValueError("I/O operation on closed file.")
302 if "w" == self._mode: raise io.UnsupportedOperation("read")
304 self._ensure_topics()
305 if not self._topics or (topics is not None and not topics):
306 return
307
308 sql, exprs, args = "SELECT * FROM messages", [], ()
309 if topics:
310 topics = topics if isinstance(topics, (list, tuple)) else [topics]
311 topic_ids = [x["id"] for (topic, _), x in self._topics.items() if topic in topics]
312 exprs += ["topic_id IN (%s)" % ", ".join(map(str, topic_ids))]
313 if start_time is not None:
314 exprs += ["timestamp >= ?"]
315 args += (to_nsec(to_time(start_time)), )
316 if end_time is not None:
317 exprs += ["timestamp <= ?"]
318 args += (to_nsec(to_time(end_time)), )
319 sql += ((" WHERE " + " AND ".join(exprs)) if exprs else "")
320 sql += " ORDER BY timestamp"
321
322 topicmap = {v["id"]: v for v in self._topics.values()}
323 msgtypes = {} # {typename: cls or None if unavailable}
324 topicset = set(topics or [t for t, _ in self._topics])
325 typehashes = {n: h for _, n, h in self._counts} # {typename: typehash or None or ""}
326 for row in self._db.execute(sql, args):
327 tdata = topicmap[row["topic_id"]]
328 topic, typename = tdata["name"], canonical(tdata["type"])
329 if not raw and msgtypes.get(typename, typename) is None: continue # for row
330 if typehashes.get(typename) is None:
331 self._ensure_types([topic])
332 selector = (h for t, n, h in self._counts if (t, n) == (topic, typename))
333 typehash = typehashes[typename] = next(selector, None)
334 else: typehash = typehashes[typename]
335
336 try:
337 cls = msgtypes.get(typename) or \
338 msgtypes.setdefault(typename, get_message_class(typename))
339 if raw: msg = (typename, row["data"], typehash or None, cls)
340 else: msg = rclpy.serialization.deserialize_message(row["data"], cls)
341 except Exception as e:
342 reportfunc = ConsolePrinter.error if self._stop_on_error else ConsolePrinter.warn
343 reportfunc("Error loading type %s in topic %s: %%s" % (typename, topic),
344 "message class not found." if cls is None else e,
345 __once=not self._stop_on_error)
346 if self._stop_on_error: raise
347 if raw: msg = (typename, row["data"], typehash or None, msgtypes.get(typename))
348 elif set(n for n, c in msgtypes.items() if c is None) == topicset:
349 break # for row
350 continue # for row
351 stamp = rclpy.time.Time(nanoseconds=row["timestamp"])
352
353 api.TypeMeta.make(msg, topic, self)
354 yield self.BagMessage(topic, msg, stamp)
355 if not self._db: # Bag has been closed in the meantime
356 break # for row
357
358
359 def write(self, topic, msg, t=None, raw=False, qoses=None, **__):
360 """
361 Writes a message to the bag.
362
363 @param topic name of topic
364 @param msg ROS2 message
365 @param t message timestamp if not using wall time, as ROS time or convertible
366 (int/float/duration/datetime/decimal)
367 @param qoses topic Quality-of-Service settings, if any, as a list of dicts
368 """
369 if self.closedclosedclosedclosed: raise ValueError("I/O operation on closed file.")
370 if "r" == self._mode: raise io.UnsupportedOperation("write")
371
372 self._ensure_topics()
373 if raw:
374 typename, binary, typehash = msg[:3]
375 else:
376 typename = get_message_type(msg)
377 typehash = get_message_type_hash(msg)
378 binary = serialize_message(msg)
379 topickey = (topic, typename)
380 cursor = self._db.cursor()
381 if topickey not in self._topics:
382 full_typename = make_full_typename(typename)
383 sql = "INSERT INTO topics (name, type, serialization_format, offered_qos_profiles) " \
384 "VALUES (?, ?, ?, ?)"
385 qoses = yaml.safe_dump(qoses) if isinstance(qoses, list) else ""
386 args = (topic, full_typename, "cdr", qoses)
387 cursor.execute(sql, args)
388 tdata = {"id": cursor.lastrowid, "name": topic, "type": full_typename,
389 "serialization_format": "cdr", "offered_qos_profiles": qoses}
390 self._topics[topickey] = tdata
391
392 timestamp = (time.time_ns() if hasattr(time, "time_ns") else int(time.time() * 10**9)) \
393 if t is None else to_nsec(to_time(t))
394 sql = "INSERT INTO messages (topic_id, timestamp, data) VALUES (?, ?, ?)"
395 args = (self._topics[topickey]["id"], timestamp, binary)
396 cursor.execute(sql, args)
397 countkey = (topic, typename, typehash)
398 if self._counts.get(countkey, self) is not None:
399 self._counts[countkey] = self._counts.get(countkey, 0) + 1
400 self._ttinfo = None
401
402
403 def open(self):
404 """Opens the bag file if not already open."""
405 self._ensure_open()
406
407
408 def close(self):
409 """Closes the bag file."""
410 if self._db:
411 self._db.close()
412 self._db = None
413 self._mode = None
414 self._iterer = None
415
416
417 @property
418 def closed(self):
419 """Returns whether file is closed."""
420 return not self._db
421
422
423 @property
424 def topics(self):
425 """Returns the list of topics in bag, in alphabetic order."""
426 return sorted((t for t, _, _ in self._topics), key=str.lower)
427
428
429 @property
430 def filename(self):
431 """Returns bag file path."""
432 return self._filename
433
434
435 @property
436 def size(self):
437 """Returns current file size in bytes (including journaling files)."""
438 result = os.path.getsize(self._filename) if os.path.isfile(self._filename) else None
439 for suffix in ("-journal", "-wal") if result else ():
440 path = "%s%s" % (self._filename, suffix)
441 result += os.path.getsize(path) if os.path.isfile(path) else 0
442 return result
443
444
445 @property
446 def mode(self):
447 """Returns file open mode."""
448 return self._mode
449
450
451 def __contains__(self, key):
452 """Returns whether bag contains given topic."""
453 return any(key == t for t, _, _ in self._topics)
454
455
456 def __next__(self):
457 """Retrieves next message from bag as (topic, message, timestamp)."""
458 if self.closedclosedclosedclosed: raise ValueError("I/O operation on closed file.")
459 if self._iterer is None: self._iterer = self.read_messagesread_messages()
460 return next(self._iterer)
461
462
463 def _ensure_open(self, populate=False):
464 """Opens bag database if not open, populates schema if specified."""
465 if not self._db:
466 exists = os.path.isfile(self._filename)
467 if "r" == self._mode and not exists:
468 raise IOError("No such file: %r" % self._filename)
469 if "w" == self._mode and exists:
470 os.remove(self._filename)
471 self._db = sqlite3.connect(self._filename, detect_types=sqlite3.PARSE_DECLTYPES,
472 isolation_level=None, check_same_thread=False)
473 self._db.row_factory = lambda cursor, row: dict(sqlite3.Row(cursor, row))
474 if populate:
475 self._db.executescript(self.CREATE_SQLCREATE_SQL)
476
477
478 def _ensure_topics(self):
479 """Populates local topic struct from database, if not already available."""
480 if not self._db or self._topics or not self._has_table("topics"): return
481 for row in self._db.execute("SELECT * FROM topics ORDER BY id"):
482 topickey = (topic, typename) = row["name"], canonical(row["type"])
483 self._topics[(topic, typename)] = row
484 self._counts[(topic, typename, None)] = None
485
486
487 def _ensure_counts(self):
488 """Populates local counts values from database, if not already available."""
489 if not self._db or all(v is not None for v in self._counts.values()) \
490 or not self._has_table("messages"): return
491 self._ensure_topics()
492 topickeys = {self._topics[(t, n)]["id"]: (t, n, h) for (t, n, h) in self._counts}
493 self._counts.clear()
494 for row in self._db.execute("SELECT topic_id, COUNT(*) AS count FROM messages "
495 "GROUP BY topic_id").fetchall():
496 if row["topic_id"] in topickeys:
497 self._counts[topickeys[row["topic_id"]]] = row["count"]
498
499
500 def _ensure_types(self, topics=None):
501 """
502 Populates local type definitions and classes from database, if not already available.
503
504 @param topics selected topics to ensure types for, if not all
505 """
506 if not self._db or (not topics and topics is not None) or not self._has_table("topics") \
507 or not any(h is None for t, _, h in self._counts if topics is None or t in topics):
508 return
509 self._ensure_topics()
510 for countkey, count in list(self._counts.items()):
511 (topic, typename, typehash) = countkey
512 if typehash is None and (topics is None or topic in topics):
513 typehash = get_message_type_hash(typename)
514 self._counts.pop(countkey)
515 self._counts[(topic, typename, typehash)] = count
516
517
518 def _has_table(self, name):
519 """Returns whether specified table exists in database."""
520 sql = "SELECT 1 FROM sqlite_master WHERE type = ? AND name = ?"
521 return bool(self._db.execute(sql, ("table", name)).fetchone())
522
523
524 @classmethod
525 def autodetect(cls, f):
526 """Returns whether file is recognizable as SQLite format."""
527 if os.path.isfile(f) and os.path.getsize(f):
528 with open(f, "rb") as f:
529 result = (f.read(len(cls.SQLITE_MAGIC)) == cls.SQLITE_MAGIC)
530 else:
531 ext = os.path.splitext(f or "")[-1].lower()
532 result = ext in BAG_EXTENSIONS
533 return result
534Bag = ROS2Bag
535
536
537
538def init_node(name):
539 """Initializes a ROS2 node if not already initialized."""
540 global node, context, executor
541 if node or not validate(live=True):
542 return
543
544 def spin_loop():
545 while context and context.ok():
546 executor.spin_once(timeout_sec=1)
547
548 context = rclpy.Context()
549 try: rclpy.init(context=context)
550 except Exception: pass # Must not be called twice at runtime
551 node_name = "%s_%s_%s" % (name, os.getpid(), int(time.time() * 1000))
552 node = rclpy.create_node(node_name, context=context, use_global_arguments=False,
553 enable_rosout=False, start_parameter_services=False)
554 executor = rclpy.executors.MultiThreadedExecutor(context=context)
555 executor.add_node(node)
556 spinner = threading.Thread(target=spin_loop)
557 spinner.daemon = True
558 spinner.start()
560
561def shutdown_node():
562 """Shuts down live ROS2 node."""
563 global node, context, executor
564 if context:
565 context_, executor_ = context, executor
566 context = executor = node = None
567 executor_.shutdown()
568 context_.shutdown()
569
570
571def validate(live=False):
572 """
573 Returns whether ROS2 environment is set, prints error if not.
574
575 @param live whether environment must support launching a ROS node
576 """
577 missing = [k for k in ["ROS_VERSION"] if not os.getenv(k)]
578 if missing:
579 ConsolePrinter.error("ROS environment not sourced: missing %s.",
580 ", ".join(sorted(missing)))
581 if "2" != os.getenv("ROS_VERSION", "2"):
582 ConsolePrinter.error("ROS environment not supported: need ROS_VERSION=2.")
583 missing = True
584 return not missing
585
587@memoize
588def canonical(typename, unbounded=False):
589 """
590 Returns "pkg/Type" for "pkg/msg/Type", standardizes various ROS2 formats.
591
592 Converts DDS types like "octet" to "byte", and "sequence<uint8, 100>" to "uint8[100]".
593
594 @param unbounded drop constraints like array and string bounds,
595 e.g. returning "uint8[]" for "uint8[10]" and "string" for "string<=8"
596 """
597 if not typename: return typename
598 is_array, bound, dimension = False, "", ""
599
600 if "<" in typename:
601 match = re.match("sequence<(.+)>", typename)
602 if match: # "sequence<uint8, 100>" or "sequence<uint8>"
603 is_array = True
604 typename = match.group(1)
605 match = re.match(r"([^,]+)?,\s?(\d+)", typename)
606 if match: # sequence<uint8, 10>
607 typename = match.group(1)
608 if match.lastindex > 1: dimension = match.group(2)
609
610 match = re.match("(w?string)<(.+)>", typename)
611 if match: # string<5>
612 typename, bound = match.groups()
613
614 if "[" in typename: # "string<=5[<=10]" or "string<=5[10]" or "byte[10]" or "byte[]"
615 dimension = typename[typename.index("[") + 1:typename.index("]")]
616 typename, is_array = typename[:typename.index("[")], True
617
618 if "<=" in typename: # "string<=5"
619 typename, bound = typename.split("<=")
621 if typename.count("/") > 1:
622 typename = "%s/%s" % tuple((x[0], x[-1]) for x in [typename.split("/")])[0]
623
624 if unbounded: suffix = "[]" if is_array else ""
625 else: suffix = ("<=%s" % bound if bound else "") + ("[%s]" % dimension if is_array else "")
626 return DDS_TYPES.get(typename, typename) + suffix
627
628
629def create_publisher(topic, cls_or_typename, queue_size):
630 """Returns an rclpy.Publisher instance, with .get_num_connections() and .unregister()."""
631 cls = cls_or_typename
632 if isinstance(cls, str): cls = get_message_class(cls)
633 qos = rclpy.qos.QoSProfile(depth=queue_size)
634 pub = node.create_publisher(cls, topic, qos)
635 pub.get_num_connections = pub.get_subscription_count
636 pub.unregister = pub.destroy
637 return pub
638
639
640def create_subscriber(topic, cls_or_typename, handler, queue_size):
641 """
642 Returns an rclpy.Subscription.
643
644 Supplemented with .get_message_class(), .get_message_definition(),
645 .get_message_type_hash(), .get_qoses(), and.unregister().
646 """
647 cls = typename = cls_or_typename
648 if isinstance(cls, str): cls = get_message_class(cls)
649 else: typename = get_message_type(cls)
650
651 qos = rclpy.qos.QoSProfile(depth=queue_size)
652 qoses = [x.qos_profile for x in node.get_publishers_info_by_topic(topic)
653 if canonical(x.topic_type) == typename]
654 rels, durs = zip(*[(x.reliability, x.durability) for x in qoses]) if qoses else ([], [])
655 # If subscription demands stricter QoS than publisher offers, no messages are received
656 if rels: qos.reliability = max(rels) # DEFAULT < RELIABLE < BEST_EFFORT
657 if durs: qos.durability = max(durs) # DEFAULT < TRANSIENT_LOCAL < VOLATILE
658
659 qosdicts = [qos_to_dict(x) for x in qoses] or None
660 sub = node.create_subscription(cls, topic, handler, qos)
661 sub.get_message_class = lambda: cls
662 sub.get_message_definition = lambda: get_message_definition(cls)
663 sub.get_message_type_hash = lambda: get_message_type_hash(cls)
664 sub.get_qoses = lambda: qosdicts
665 sub.unregister = sub.destroy
666 return sub
667
668
669def format_message_value(msg, name, value):
670 """
671 Returns a message attribute value as string.
672
673 Result is at least 10 chars wide if message is a ROS2 time/duration
674 (aligning seconds and nanoseconds).
675 """
676 LENS = {"sec": 13, "nanosec": 9}
677 v = "%s" % (value, )
678 if not isinstance(msg, tuple(ROS_TIME_CLASSES)) or name not in LENS:
679 return v
680
681 EXTRA = sum(v.count(x) * len(x) for x in (MatchMarkers.START, MatchMarkers.END))
682 return ("%%%ds" % (LENS[name] + EXTRA)) % v # Default %10s/%9s for secs/nanosecs
683
684
685@memoize
686def get_message_class(typename):
687 """Returns ROS2 message class, or None if unknown type."""
688 try: return rosidl_runtime_py.utilities.get_message(make_full_typename(typename))
689 except Exception: return None
690
691
692def get_message_definition(msg_or_type):
693 """
694 Returns ROS2 message type definition full text, including subtype definitions.
695
696 Returns None if unknown type.
697 """
698 typename = msg_or_type if isinstance(msg_or_type, str) else get_message_type(msg_or_type)
699 return _get_message_definition(canonical(typename))
700
701
702def get_message_type_hash(msg_or_type):
703 """Returns ROS2 message type MD5 hash, or "" if unknown type."""
704 typename = msg_or_type if isinstance(msg_or_type, str) else get_message_type(msg_or_type)
705 return _get_message_type_hash(canonical(typename))
706
707
708@memoize
709def _get_message_definition(typename):
710 """Returns ROS2 message type definition full text, or None on error (internal caching method)."""
711 try:
712 texts, pkg = collections.OrderedDict(), typename.rsplit("/", 1)[0]
713 try:
714 typepath = rosidl_runtime_py.get_interface_path(make_full_typename(typename) + ".msg")
715 with open(typepath, encoding="utf-8") as f:
716 texts[typename] = f.read()
717 except Exception: # .msg file unavailable: parse IDL
718 texts[typename] = get_message_definition_idl(typename)
719 for line in texts[typename].splitlines():
720 if not line or not line[0].isalpha():
721 continue # for line
722 linetype = scalar(canonical(re.sub(r"^([a-zA-Z][^\s]+)(.+)", r"\1", line)))
723 if linetype in api.ROS_BUILTIN_TYPES:
724 continue # for line
725 linetype = linetype if "/" in linetype else "std_msgs/Header" \
726 if "Header" == linetype else "%s/%s" % (pkg, linetype)
727 linedef = None if linetype in texts else get_message_definition(linetype)
728 if linedef: texts[linetype] = linedef
729
730 basedef = texts.pop(next(iter(texts)))
731 subdefs = ["%s\nMSG: %s\n%s" % ("=" * 80, k, v) for k, v in texts.items()]
732 return basedef + ("\n" if subdefs else "") + "\n".join(subdefs)
733 except Exception as e:
734 ConsolePrinter.warn("Error collecting type definition of %s: %s", typename, e)
735 return None
736
737
738@memoize
739def get_message_definition_idl(typename):
740 """
741 Returns ROS2 message type definition parsed from IDL file.
742
743 @since version 0.4.2
744 """
745
746 def format_comment(text):
747 """Returns annotation text formatted with comment prefixes and escapes."""
748 ESCAPES = {"\n": "\\n", "\t": "\\t", "\x07": "\\a",
749 "\x08": "\\b", "\x0b": "\\v", "\x0c": "\\f"}
750 repl = lambda m: ESCAPES[m.group(0)]
751 return "#" + "\n#".join(re.sub("|".join(map(re.escape, ESCAPES)), repl, l)
752 for l in text.split("\\n"))
753
754 def format_type(typeobj, msgpackage, constant=False):
755 """Returns canonical type name, like "uint8" or "string<=5" or "nav_msgs/Path"."""
756 result = None
757 if isinstance(typeobj, rosidl_parser.definition.AbstractNestedType):
758 # Array, BoundedSequence, UnboundedSequence
759 valuetype = format_type(typeobj.value_type, msgpackage, constant)
760 size, bounding = "", ""
761 if isinstance(typeobj, rosidl_parser.definition.Array):
762 size = typeobj.size
763 elif typeobj.has_maximum_size():
764 size = typeobj.maximum_size
765 if isinstance(typeobj, rosidl_parser.definition.BoundedSequence):
766 bounding = "<="
767 result = "%s[%s%s]" % (valuetype, bounding, size) # type[], type[N], type[<=N]
768 elif isinstance(typeobj, rosidl_parser.definition.AbstractWString):
769 result = "wstring"
770 elif isinstance(typeobj, rosidl_parser.definition.AbstractString):
771 result = "string"
772 elif isinstance(typeobj, rosidl_parser.definition.NamespacedType):
773 nameparts = typeobj.namespaced_name()
774 result = canonical("/".join(nameparts))
775 if nameparts[0].value == msgpackage or "std_msgs/Header" == result:
776 result = canonical("/".join(nameparts[-1:])) # Omit package if local or Header
777 else: # Primitive like int8
778 result = DDS_TYPES.get(typeobj.typename, typeobj.typename)
779
780 if isinstance(typeobj, rosidl_parser.definition.AbstractGenericString) \
781 and typeobj.has_maximum_size() and not constant: # Constants get parsed into "string<=N"
782 result += "<=%s" % typeobj.maximum_size
783
784 return result
785
786 def get_comments(obj):
787 """Returns all comments for annotatable object, as [text, ]."""
788 return [v.get("text", "") for v in obj.get_annotation_values("verbatim")
789 if "comment" == v.get("language")]
790
791 typepath = rosidl_runtime_py.get_interface_path(make_full_typename(typename) + ".idl")
792 with open(typepath, encoding="utf-8") as f:
793 idlcontent = rosidl_parser.parser.parse_idl_string(f.read())
794 msgidl = idlcontent.get_elements_of_type(rosidl_parser.definition.Message)[0]
795 package = msgidl.structure.namespaced_type.namespaces[0]
796 DUMMY = rosidl_parser.definition.EMPTY_STRUCTURE_REQUIRED_MEMBER_NAME
797
798 lines = []
799 # Add general comments
800 lines.extend(map(format_comment, get_comments(msgidl.structure)))
801 # Add blank line between general comments and constants
802 if lines and msgidl.constants: lines.append("")
803 # Add constants
804 for c in msgidl.constants:
805 ctype = format_type(c.type, package, constant=True)
806 lines.extend(map(format_comment, get_comments(c)))
807 lines.append("%s %s=%s" % (ctype, c.name, c.value))
808 # Parser adds dummy placeholder if constants-only message
809 if not (len(msgidl.structure.members) == 1 and DUMMY == msgidl.structure[0].name):
810 # Add blank line between constants and fields
811 if msgidl.constants and msgidl.structure.members: lines.append("")
812 # Add fields
813 for m in msgidl.structure.members:
814 lines.extend(map(format_comment, get_comments(m)))
815 lines.append("%s %s" % (format_type(m.type, package), m.name))
816 return "\n".join(lines)
817
818
819@memoize
820def _get_message_type_hash(typename):
821 """Returns ROS2 message type MD5 hash (internal caching method)."""
822 msgdef = get_message_definition(typename)
823 return "" if msgdef is None else api.calculate_definition_hash(typename, msgdef)
824
825
826def get_message_fields(val):
827 """Returns OrderedDict({field name: field type name}) if ROS2 message, else {}."""
828 if not is_ros_message(val): return {}
829 fields = ((k, canonical(v)) for k, v in val.get_fields_and_field_types().items())
830 return collections.OrderedDict(fields)
831
832
833def get_message_type(msg_or_cls):
834 """Returns ROS2 message type name, like "std_msgs/Header"."""
835 cls = msg_or_cls if inspect.isclass(msg_or_cls) else type(msg_or_cls)
836 return canonical("%s/%s" % (cls.__module__.split(".")[0], cls.__name__))
837
838
839def get_message_value(msg, name, typename=None, default=Ellipsis):
840 """
841 Returns object attribute value, with numeric arrays converted to lists.
842
843 @param name message attribute name
844 @param typename value ROS type name, for identifying byte arrays
845 @param default value to return if attribute does not exist; raises exception otherwise
846 """
847 if default is not Ellipsis and not hasattr(msg, name): return default
848 v, scalartype = getattr(msg, name), scalar(typename)
849 if isinstance(v, (bytes, array.array)): v = list(v)
850 elif numpy and isinstance(v, (numpy.generic, numpy.ndarray)):
851 v = v.tolist() # Returns value as Python type, either scalar or list
852 if v and isinstance(v, (list, tuple)) and scalartype in ("byte", "uint8"):
853 if isinstance(v[0], bytes):
854 v = list(map(ord, v)) # In ROS2, a byte array like [0, 1] is [b"\0", b"\1"]
855 elif scalartype == typename:
856 v = v[0] # In ROS2, single byte values are given as bytes()
857 return list(v) if isinstance(v, tuple) else v
858
859
860def get_rostime(fallback=False):
861 """
862 Returns current ROS2 time, as rclpy.time.Time.
863
864 @param fallback use wall time if node not initialized
865 """
866 try: return node.get_clock().now()
867 except Exception:
868 if fallback: return make_time(time.time())
869 raise
870
871
872def get_topic_types():
873 """
874 Returns currently available ROS2 topics, as [(topicname, typename)].
875
876 Omits topics that the current ROS2 node itself has published.
877 """
878 result = []
879 myname, myns = node.get_name(), node.get_namespace()
880 mytypes = {} # {topic: [typename, ]}
881 for topic, typenames in node.get_publisher_names_and_types_by_node(myname, myns):
882 mytypes.setdefault(topic, []).extend(typenames)
883 for t in ("/parameter_events", "/rosout"): # Published by all nodes
884 mytypes.pop(t, None)
885 for topic, typenames in node.get_topic_names_and_types(): # [(topicname, [typename, ])]
886 for typename in typenames:
887 if topic not in mytypes or typename not in mytypes[topic]:
888 result += [(topic, canonical(typename))]
889 return result
890
891
892def is_ros_message(val, ignore_time=False):
893 """
894 Returns whether value is a ROS2 message or special like ROS2 time/duration class or instance.
895
896 @param ignore_time whether to ignore ROS2 time/duration types
897 """
898 is_message = rosidl_runtime_py.utilities.is_message(val)
899 if is_message and ignore_time: is_message = not is_ros_time(val)
900 return is_message
901
902
903def is_ros_time(val):
904 """Returns whether value is a ROS2 time/duration class or instance."""
905 if inspect.isclass(val): return issubclass(val, tuple(ROS_TIME_CLASSES))
906 return isinstance(val, tuple(ROS_TIME_CLASSES))
907
908
909def make_duration(secs=0, nsecs=0):
910 """Returns an rclpy.duration.Duration."""
911 return rclpy.duration.Duration(seconds=secs, nanoseconds=nsecs)
912
913
914def make_time(secs=0, nsecs=0):
915 """Returns a ROS2 time, as rclpy.time.Time."""
916 return rclpy.time.Time(seconds=secs, nanoseconds=nsecs)
917
918
919def make_full_typename(typename):
920 """Returns "pkg/msg/Type" for "pkg/Type"."""
921 if "/msg/" in typename or "/" not in typename:
922 return typename
923 return "%s/msg/%s" % tuple((x[0], x[-1]) for x in [typename.split("/")])[0]
924
926def make_subscriber_qos(topic, typename, queue_size=10):
927 """
928 Returns rclpy.qos.QoSProfile that matches the most permissive publisher.
929
930 @param queue_size QoSProfile.depth
931 """
932 qos = rclpy.qos.QoSProfile(depth=queue_size)
933 infos = node.get_publishers_info_by_topic(topic)
934 rels, durs = zip(*[(x.qos_profile.reliability, x.qos_profile.durability)
935 for x in infos if canonical(x.topic_type) == typename])
936 # If subscription demands stricter QoS than publisher offers, no messages are received
937 if rels: qos.reliability = max(rels) # DEFAULT < RELIABLE < BEST_EFFORT
938 if durs: qos.durability = max(durs) # DEFAULT < TRANSIENT_LOCAL < VOLATILE
939 return qos
940
941
942def qos_to_dict(qos):
943 """Returns rclpy.qos.QoSProfile as dictionary."""
944 result = {}
945 if qos:
946 QOS_TYPES = (bool, int, enum.Enum) + tuple(ROS_TIME_CLASSES)
947 for name in (n for n in dir(qos) if not n.startswith("_")):
948 val = getattr(qos, name)
949 if name.startswith("_") or not isinstance(val, QOS_TYPES):
950 continue # for name
951 if isinstance(val, enum.Enum):
952 val = val.value
953 elif isinstance(val, tuple(ROS_TIME_CLASSES)):
954 val = dict(zip(["sec", "nsec"], to_sec_nsec(val)))
955 result[name] = val
956 return [result]
957
958
960 """Returns ROS2 message as a serialized binary."""
961 with api.TypeMeta.make(msg) as m:
962 if m.data is not None: return m.data
963 return rclpy.serialization.serialize_message(msg)
964
965
966def deserialize_message(raw, cls_or_typename):
967 """Returns ROS2 message or service request/response instantiated from serialized binary."""
968 cls = cls_or_typename
969 if isinstance(cls, str): cls = get_message_class(cls)
970 return rclpy.serialization.deserialize_message(raw, cls)
971
972
973@memoize
974def scalar(typename):
975 """
976 Returns unbounded scalar type from ROS2 message data type
977
978 Like "uint8" from "uint8[]", or "string" from "string<=10[<=5]".
979 Returns type unchanged if not a collection or bounded type.
980 """
981 if typename and "[" in typename: typename = typename[:typename.index("[")]
982 if typename and "<=" in typename: typename = typename[:typename.index("<=")]
983 return typename
984
985
986def set_message_value(obj, name, value):
987 """Sets message or object attribute value."""
988 if is_ros_message(obj):
989 # Bypass setter as it does type checking
990 fieldmap = obj.get_fields_and_field_types()
991 if name in fieldmap:
992 name = obj.__slots__[list(fieldmap).index(name)]
993 setattr(obj, name, value)
995
996def time_message(val, to_message=True, clock_type=None):
997 """
998 Converts ROS2 time/duration between `rclpy` and `builtin_interfaces` objects.
999
1000 @param val ROS2 time/duration object from `rclpy` or `builtin_interfaces`
1001 @param to_message whether to convert from `rclpy` to `builtin_interfaces` or vice versa
1002 @param clock_type ClockType for converting to `rclpy.Time`, defaults to `ROS_TIME`
1003 @return value converted to appropriate type, or original value if not convertible
1004 """
1005 to_message, clock_type = bool(to_message), (clock_type or rclpy.clock.ClockType.ROS_TIME)
1006 if isinstance(val, tuple(ROS_TIME_CLASSES)):
1007 rcl_cls = next(k for k, v in ROS_TIME_MESSAGES.items() if isinstance(val, (k, v)))
1008 is_rcl = isinstance(val, tuple(ROS_TIME_MESSAGES))
1009 name = "to_msg" if to_message and is_rcl else "from_msg" if to_message == is_rcl else None
1010 args = [val] + ([clock_type] if rcl_cls is rclpy.time.Time and "from_msg" == name else [])
1011 return getattr(rcl_cls, name)(*args) if name else val
1012 return val
1013
1015def to_duration(val):
1016 """
1017 Returns value as ROS2 duration if convertible (int/float/time/datetime/decimal), else value.
1018
1019 Convertible types: int/float/time/datetime/decimal/builtin_interfaces.Time.
1020 """
1021 result = val
1022 if isinstance(val, decimal.Decimal):
1023 result = make_duration(int(val), float(val % 1) * 10**9)
1024 elif isinstance(val, datetime.datetime):
1025 result = make_duration(int(val.timestamp()), 1000 * val.microsecond)
1026 elif isinstance(val, (float, int)):
1027 result = make_duration(val)
1028 elif isinstance(val, rclpy.time.Time):
1029 result = make_duration(nsecs=val.nanoseconds)
1030 elif isinstance(val, tuple(ROS_TIME_MESSAGES.values())):
1031 result = make_duration(val.sec, val.nanosec)
1032 return result
1033
1034
1035def to_nsec(val):
1036 """Returns value in nanoseconds if value is ROS2 time/duration, else value."""
1037 if not isinstance(val, tuple(ROS_TIME_CLASSES)):
1038 return val
1039 if hasattr(val, "nanoseconds"): # rclpy.Time/Duration
1040 return val.nanoseconds
1041 return val.sec * 10**9 + val.nanosec # builtin_interfaces.msg.Time/Duration
1042
1043
1044def to_sec(val):
1045 """Returns value in seconds if value is ROS2 time/duration, else value."""
1046 if not isinstance(val, tuple(ROS_TIME_CLASSES)):
1047 return val
1048 if hasattr(val, "nanoseconds"): # rclpy.Time/Duration
1049 secs, nsecs = divmod(val.nanoseconds, 10**9)
1050 return secs + nsecs / 1E9
1051 return val.sec + val.nanosec / 1E9 # builtin_interfaces.msg.Time/Duration
1052
1053
1054def to_sec_nsec(val):
1055 """Returns value as (seconds, nanoseconds) if value is ROS2 time/duration, else value."""
1056 if not isinstance(val, tuple(ROS_TIME_CLASSES)):
1057 return val
1058 if hasattr(val, "seconds_nanoseconds"): # rclpy.Time
1059 return val.seconds_nanoseconds()
1060 if hasattr(val, "nanoseconds"): # rclpy.Duration
1061 return divmod(val.nanoseconds, 10**9)
1062 return (val.sec, val.nanosec) # builtin_interfaces.msg.Time/Duration
1063
1064
1065def to_time(val):
1066 """
1067 Returns value as ROS2 time if convertible, else value.
1068
1069 Convertible types: int/float/duration/datetime/decimal/builtin_interfaces.Time.
1070 """
1071 result = val
1072 if isinstance(val, decimal.Decimal):
1073 result = make_time(int(val), float(val % 1) * 10**9)
1074 elif isinstance(val, datetime.datetime):
1075 result = make_time(int(val.timestamp()), 1000 * val.microsecond)
1076 elif isinstance(val, (float, int)):
1077 result = make_time(val)
1078 elif isinstance(val, rclpy.duration.Duration):
1079 result = make_time(nsecs=val.nanoseconds)
1080 elif isinstance(val, tuple(ROS_TIME_MESSAGES.values())):
1081 result = make_time(val.sec, val.nanosec)
1082 return result
1083
1084
1085__all__ = [
1086 "BAG_EXTENSIONS", "DDS_TYPES", "ROS_ALIAS_TYPES", "ROS_TIME_CLASSES", "ROS_TIME_MESSAGES",
1087 "ROS_TIME_TYPES", "SKIP_EXTENSIONS", "Bag", "ROS2Bag", "context", "executor", "node",
1088 "canonical", "create_publisher", "create_subscriber", "deserialize_message",
1089 "format_message_value", "get_message_class", "get_message_definition",
1090 "get_message_definition_idl", "get_message_fields", "get_message_type",
1091 "get_message_type_hash", "get_message_value", "get_rostime", "get_topic_types", "init_node",
1092 "is_ros_message", "is_ros_time", "make_duration", "make_full_typename", "make_subscriber_qos",
1093 "make_time", "qos_to_dict", "scalar", "serialize_message", "set_message_value", "shutdown_node",
1094 "time_message", "to_duration", "to_nsec", "to_sec", "to_sec_nsec", "to_time", "validate",
1095]
ROS bag interface.
Definition api.py:98
tuple MODES
Supported opening modes, overridden in subclasses.
Definition api.py:112
BagMessage
Returned from read_messages() as (topic name, ROS message, ROS timestamp object).
Definition api.py:101
TypesAndTopicsTuple
Returned from get_type_and_topic_info() as ({typename: typehash}, {topic name: TopicTuple}).
Definition api.py:109
get_qoses(self, topic, typename)
Returns topic Quality-of-Service profiles as a list of dicts, or None if not available.
Definition api.py:253
read_messages(self, topics=None, start_time=None, end_time=None, raw=False, **__)
Yields messages from the bag, optionally filtered by topic and timestamp.
Definition api.py:291
get_topic_info(self, counts=True)
Returns topic and message type metainfo as {(topic, typename, typehash): count}.
Definition api.py:234
closed
Returns whether file is closed.
Definition api.py:348
TopicTuple
Returned from get_type_and_topic_info() as (typename, message count, connection count,...
Definition api.py:105
ROS2 bag reader and writer (SQLite format), providing most of rosbag.Bag interface.
Definition ros2.py:91
get_type_and_topic_info(self, topic_filters=None)
Returns thorough metainfo on topic and message types.
Definition ros2.py:209
mode
Returns file open mode.
Definition ros2.py:460
open(self)
Opens the bag file if not already open.
Definition ros2.py:408
filename
Returns bag file path.
Definition ros2.py:440
autodetect(cls, f)
Returns whether file is recognizable as SQLite format.
Definition ros2.py:550
get_start_time(self)
Returns the start time of the bag, as UNIX timestamp, or None if bag empty.
Definition ros2.py:165
get_end_time(self)
Returns the end time of the bag, as UNIX timestamp, or None if bag empty.
Definition ros2.py:175
write(self, topic, msg, t=None, raw=False, qoses=None, **__)
Writes a message to the bag.
Definition ros2.py:373
get_message_class(self, typename, typehash=None)
Returns ROS2 message type class, or None if unknown message type for bag.
Definition ros2.py:255
get_qoses(self, topic, typename)
Returns topic Quality-of-Service profiles as a list of dicts, or None if not available.
Definition ros2.py:241
get_message_type_hash(self, msg_or_type)
Returns ROS2 message type MD5 hash, or None if unknown message type for bag.
Definition ros2.py:279
read_messages(self, topics=None, start_time=None, end_time=None, raw=False, **__)
Yields messages from the bag, optionally filtered by topic and timestamp.
Definition ros2.py:303
str SQLITE_MAGIC
SQLite file header magic start bytes.
Definition ros2.py:121
close(self)
Closes the bag file.
Definition ros2.py:413
__contains__(self, key)
Returns whether bag contains given topic.
Definition ros2.py:466
__init__(self, filename, mode="a", *_, **__)
Definition ros2.py:129
closed
Returns whether file is closed.
Definition ros2.py:424
size
Returns current file size in bytes (including journaling files).
Definition ros2.py:448
topics
Returns the list of topics in bag, in alphabetic order.
Definition ros2.py:432
__next__(self)
Retrieves next message from bag as (topic, message, timestamp).
Definition ros2.py:471
get_topic_info(self, counts=True, ensure_types=True)
Returns topic and message type metainfo as {(topic, typename, typehash): count}.
Definition ros2.py:194
get_message_definition(self, msg_or_type)
Returns ROS2 message type definition full text, including subtype definitions.
Definition ros2.py:271
get_message_count(self, topic_filters=None)
Returns the number of messages in the bag.
Definition ros2.py:152
str CREATE_SQL
ROS2 bag SQLite schema.
Definition ros2.py:98
time_message(val, to_message=True, clock_type=None)
Converts ROS2 time/duration between rclpy and builtin_interfaces objects.
Definition ros2.py:1031
make_full_typename(typename)
Returns "pkg/msg/Type" for "pkg/Type".
Definition ros2.py:947
canonical(typename, unbounded=False)
Returns "pkg/Type" for "pkg/msg/Type", standardizes various ROS2 formats.
Definition ros2.py:620
make_subscriber_qos(topic, typename, queue_size=10)
Returns rclpy.qos.QoSProfile that matches the most permissive publisher.
Definition ros2.py:959
create_publisher(topic, cls_or_typename, queue_size)
Returns an rclpy.Publisher instance, with .get_num_connections() and .unregister().
Definition ros2.py:653
serialize_message(msg)
Returns ROS2 message as a serialized binary.
Definition ros2.py:987
to_sec(val)
Returns value in seconds if value is ROS2 time/duration, else value.
Definition ros2.py:1071
qos_to_dict(qos)
Returns rclpy.qos.QoSProfile as dictionary.
Definition ros2.py:970
get_topic_types()
Returns currently available ROS2 topics, as [(topicname, typename)].
Definition ros2.py:905
format_message_value(msg, name, value)
Returns a message attribute value as string.
Definition ros2.py:699
get_message_value(msg, name, typename=None, default=Ellipsis)
Returns object attribute value, with numeric arrays converted to lists.
Definition ros2.py:874
get_message_class(typename)
Returns ROS2 message class, or None if unknown type.
Definition ros2.py:710
to_duration(val)
Returns value as ROS2 duration if convertible (int/float/time/datetime/decimal), else value.
Definition ros2.py:1047
make_time(secs=0, nsecs=0)
Returns a ROS2 time, as rclpy.time.Time.
Definition ros2.py:942
deserialize_message(raw, cls_or_typename)
Returns ROS2 message or service request/response instantiated from serialized binary.
Definition ros2.py:994
get_message_type(msg_or_cls)
Returns ROS2 message type name, like "std_msgs/Header".
Definition ros2.py:861
get_message_definition(msg_or_type)
Returns ROS2 message type definition full text, including subtype definitions.
Definition ros2.py:721
validate(live=False)
Returns whether ROS2 environment is set, prints error if not.
Definition ros2.py:601
scalar(typename)
Returns unbounded scalar type from ROS2 message data type.
Definition ros2.py:1008
get_message_definition_idl(typename)
Returns ROS2 message type definition parsed from IDL file.
Definition ros2.py:770
make_duration(secs=0, nsecs=0)
Returns an rclpy.duration.Duration.
Definition ros2.py:937
get_rostime(fallback=False)
Returns current ROS2 time, as rclpy.time.Time.
Definition ros2.py:893
to_nsec(val)
Returns value in nanoseconds if value is ROS2 time/duration, else value.
Definition ros2.py:1062
to_time(val)
Returns value as ROS2 time if convertible, else value.
Definition ros2.py:1097
shutdown_node()
Shuts down live ROS2 node.
Definition ros2.py:586
set_message_value(obj, name, value)
Sets message or object attribute value.
Definition ros2.py:1014
is_ros_time(val)
Returns whether value is a ROS2 time/duration class or instance.
Definition ros2.py:931
create_subscriber(topic, cls_or_typename, handler, queue_size)
Returns an rclpy.Subscription.
Definition ros2.py:670
get_message_type_hash(msg_or_type)
Returns ROS2 message type MD5 hash, or "" if unknown type.
Definition ros2.py:726
to_sec_nsec(val)
Returns value as (seconds, nanoseconds) if value is ROS2 time/duration, else value.
Definition ros2.py:1081
init_node(name)
Initializes a ROS2 node if not already initialized.
Definition ros2.py:563
is_ros_message(val, ignore_time=False)
Returns whether value is a ROS2 message or special like ROS2 time/duration class or instance.
Definition ros2.py:925
get_message_fields(val)
Returns OrderedDict({field name: field type name}) if ROS2 message, else {}.
Definition ros2.py:854