5------------------------------------------------------------------------------
6This file is part of rosros - simple unified interface to ROS1 / ROS2.
7Released under the BSD License.
12------------------------------------------------------------------------------
14## @namespace rosros.ros2
30import ament_index_python
31import builtin_interfaces.msg
33import rclpy.callback_groups
37import rclpy.exceptions
42import rclpy.serialization
44import rclpy.subscription
47import rosidl_runtime_py.utilities
48from rclpy.impl.implementation_singleton import rclpy_implementation as rclpy_extension
56## Stand-in for `rospy.AnyMsg` with equivalent interface
59## ROS2 time/duration message types
60ROS_TIME_TYPES = ["builtin_interfaces/Time", "builtin_interfaces/Duration"]
62## ROS2 time/duration types and message types mapped to type names
63ROS_TIME_CLASSES = {rclpy.time.Time: "builtin_interfaces/Time",
64 builtin_interfaces.msg.Time:
"builtin_interfaces/Time",
65 rclpy.duration.Duration:
"builtin_interfaces/Duration",
66 builtin_interfaces.msg.Duration:
"builtin_interfaces/Duration"}
69ROS_TIME_MESSAGES = {rclpy.time.Time: builtin_interfaces.msg.Time,
70 rclpy.duration.Duration: builtin_interfaces.msg.Duration}
73ROS_ALIAS_TYPES = {
"byte":
"uint8",
"char":
"int8"}
76DDS_TYPES = {
"boolean":
"bool",
81 "unsigned short":
"uint16",
83 "unsigned long":
"uint32",
85 "unsigned long long":
"uint64", }
112SHUTDOWN_CALLBACKS = []
119 """Logging handler that forwards logging messages to ROS2 logger."""
127 """Adds message to ROS2 logger."""
128 if not self.
_logger.is_enabled_for(record.levelno):
131 text = record.msg % record.args
if record.args
else record.msg
135 text +=
"\n\n" +
"".join(traceback.format_exception(*record.exc_info))
138 if logging.DEBUG == record.levelno:
140 elif logging.INFO == record.levelno:
142 elif logging.WARN == record.levelno:
144 elif logging.ERROR == record.levelno:
146 elif logging.FATAL == record.levelno:
154 """ROS2 bag interface, partially mimicking rosbag.Bag."""
160 BagMessage = collections.namedtuple(
"BagMessage",
"topic message timestamp")
164 TopicTuple = collections.namedtuple(
"TopicTuple", [
"msg_type",
"message_count",
165 "connections",
"frequency"])
168 TypesAndTopicsTuple = collections.namedtuple(
"TypesAndTopicsTuple", [
"msg_types",
"topics"])
172CREATE TABLE IF NOT EXISTS messages (
173 id INTEGER PRIMARY KEY,
174 topic_id INTEGER NOT NULL,
175 timestamp INTEGER NOT NULL,
179CREATE TABLE IF NOT EXISTS topics (
180 id INTEGER PRIMARY KEY,
183 serialization_format TEXT NOT NULL,
184 offered_qos_profiles TEXT NOT NULL
187CREATE INDEX IF NOT EXISTS timestamp_idx ON messages (timestamp ASC);
189PRAGMA journal_mode=WAL;
190PRAGMA synchronous=NORMAL;
193 def __init__(self, filename, mode="a", *_, **__):
195 @param filename bag file path to open
196 @param mode mode to open file
in, one of
"r",
"w",
"a";
197 file will be overwritten
if "w",
and unwriteable
if "r"
209 if "w" == mode
and os.path.exists(filename):
211 self.
_db = sqlite3.connect(filename, isolation_level=
None, check_same_thread=
False)
212 self.
_db.row_factory =
lambda cursor, row: dict(sqlite3.Row(cursor, row))
217 """Iterates over all messages in the bag."""
222 """Context manager entry."""
226 def __exit__(self, exc_type, exc_value, traceback):
227 """Context manager exit, closes bag."""
233 Returns the number of messages in the bag.
235 @param topic_filters list of topics
or a single topic to filter by,
if at all
238 sql, args, topics =
"SELECT COUNT(*) AS count FROM messages", (), topic_filters
240 args = tuple(topics)
if isinstance(topics, (list, set, tuple))
else (topics, )
241 argstr =
", ".join(
"?" * len(args))
242 sql +=
" WHERE topic_id IN (SELECT id FROM topics WHERE name IN (%s))" % argstr
243 row = self.
_db.execute(sql, args).fetchone()
249 """Returns the start time of the bag, as UNIX timestamp."""
251 row = self.
_db.execute(
"SELECT MIN(timestamp) AS val FROM messages").fetchone()
252 if row[
"val"]
is None:
return None
253 secs, nsecs = divmod(row[
"val"], 10**9)
254 return secs + nsecs / 1E9
259 """Returns the end time of the bag, as UNIX timestamp."""
261 row = self.
_db.execute(
"SELECT MAX(timestamp) AS val FROM messages").fetchone()
262 if row[
"val"]
is None:
return None
263 secs, nsecs = divmod(row[
"val"], 10**9)
264 return secs + nsecs / 1E9
270 Returns topic and message type metainfo
as {(topic, typename, typehash): count}.
272 @param counts whether to
return actual message counts instead of
None
274 DEFAULTCOUNT = 0 if counts
else None
276 for row
in self.
_db.execute(
"SELECT * FROM topics ORDER BY name"):
277 topic, typename, typehash = row[
"name"],
canonical(row[
"type"]),
None
279 typehash = self.
_types[typename]
if typename
in self.
_types else \
282 logger.warning(
"Error getting message type MD5 hash for %r.",
283 typename, exc_info=
True)
285 self.
_counts[(topic, typename, typehash)] = DEFAULTCOUNT
286 self.
_types.setdefault(typename, typehash)
289 countkeys = {v[
"id"]: (t, n, self.
_types.get(n))
291 for row
in self.
_db.execute(
"SELECT topic_id, COUNT(*) AS count FROM messages "
292 "GROUP BY topic_id"):
293 if row[
"topic_id"]
in countkeys:
294 self.
_counts[countkeys[row[
"topic_id"]]] = row[
"count"]
301 Returns thorough metainfo on topic and message types.
303 @param topic_filters list of topics
or a single topic to filter by,
if at all
305 msg_types
as dict of {typename: typehash},
309 topics = topic_filters
310 topics = topics
if isinstance(topics, (list, set, tuple))
else [topics]
if topics
else []
311 msgtypes = {n: h
for t, n, h
in counts
if not topics
or t
in topics}
315 """Returns median value from given sorted numbers."""
317 return None if not vlen
else vals[vlen // 2]
if vlen % 2
else \
318 float(vals[vlen // 2 - 1] + vals[vlen // 2]) / 2
320 for (t, n, _), c
in sorted(counts.items()):
321 if topics
and t
not in topics:
continue
325 cursor = self.
_db.execute(
"SELECT timestamp FROM messages WHERE topic_id = ?", args)
326 stamps = sorted(x[
"timestamp"] / 1E9
for x
in cursor)
327 mymedian = median(sorted(s1 - s0
for s1, s0
in zip(stamps[1:], stamps[:-1])))
328 freq = 1.0 / mymedian
if mymedian
else None
335 """Returns topic Quality-of-Service profiles as a list of dicts, or None if not available."""
336 topickey = (topic, typename)
340 if topicrow.get(
"offered_qos_profiles"):
341 self.
_qoses[topickey] = yaml.safe_load(topicrow[
"offered_qos_profiles"])
342 except Exception
as e:
343 logger.warning(
"Error parsing quality of service for topic %r: %r", topic, e)
344 self.
_qoses.setdefault(topickey,
None)
345 return self.
_qoses[topickey]
349 """Returns ROS2 message type class."""
354 """Returns ROS2 message type definition full text, including subtype definitions."""
359 """Returns ROS2 message type MD5 hash."""
363 def read_messages(self, topics=None, start_time=None, end_time=None, raw=False, *_, **__):
365 Yields messages from the bag
in chronological order.
367 @param topics list of topics
or a single topic to filter by,
if at all
368 @param start_time earliest timestamp of message to
return,
369 as `rclpy.Time`
or convertible
370 (int/float/duration/datetime/decimal/builtin_interfaces.Time)
371 @param end_time latest timestamp of message to
return,
372 as `rclpy.Time`
or convertible
373 (int/float/duration/datetime/decimal/builtin_interfaces.Time)
374 @param raw
if True, then returned messages are tuples of
375 (typename, bytes, typehash, typeclass)
376 @return generator of
BagMessage(topic, message, rclpy.time.Time) namedtuples
379 if not self.
_dbtopics or (topics
is not None and not topics):
382 sql, exprs, args =
"SELECT * FROM messages", [], ()
384 topics = topics
if isinstance(topics, (list, set, tuple))
else [topics]
385 topic_ids = [x[
"id"]
for (topic, _), x
in self.
_dbtopics.items()
if topic
in topics]
386 exprs += [
"topic_id IN (%s)" %
", ".join(map(str, topic_ids))]
387 if start_time
is not None:
388 exprs += [
"timestamp >= ?"]
390 if end_time
is not None:
391 exprs += [
"timestamp <= ?"]
393 sql += ((
" WHERE " +
" AND ".join(exprs))
if exprs
else "")
394 sql +=
" ORDER BY timestamp"
396 topicmap = {v[
"id"]: v
for v
in self.
_dbtopics.values()}
398 topicset = set(topics
or [t
for t, _
in self.
_dbtopics])
399 for row
in self.
_db.execute(sql, args):
400 tdata = topicmap[row[
"topic_id"]]
401 topic, typename = tdata[
"name"],
canonical(tdata[
"type"])
402 if not raw
and msgtypes.get(typename, typename)
is None:
continue
403 typehash = self.
_types.get(typename)
406 cls = msgtypes[typename]
if typename
in msgtypes
else \
408 if raw: msg = (typename, row[
"data"], typehash, cls)
409 else: msg = rclpy.serialization.deserialize_message(row[
"data"], cls)
410 except Exception
as e:
411 msgtypes.setdefault(typename,
None)
412 logger.warning(
"Error loading type %r in topic %r: %s", typename, topic, e)
413 if raw: msg = (typename, row[
"data"], typehash,
None)
414 elif set(n
for n, c
in msgtypes.items()
if c
is None) == topicset:
417 stamp = rclpy.time.Time(nanoseconds=row[
"timestamp"])
424 def write(self, topic, msg, t=None, raw=False, qoses=None, **__):
426 Writes a message to the bag.
428 @param topic name of topic
429 @param msg ROS2 message
430 @param t message timestamp
if not using current wall time,
431 as `rclpy.Time`
or convertible
432 (int/float/duration/datetime/decimal/builtin_interfaces.Time)
433 @param raw
if true, msg
is expected
as a tuple starting
with
434 (typename, bytes, typehash, )
435 @param qoses list of Quality-of-Service profile dictionaries
for topic,
if any;
436 inserted to topics-table only
if first message
for topic
in bag
438 if "r" == self.
_mode:
439 raise io.UnsupportedOperation(
"write")
443 typename, binary, typehash = msg[:3]
448 topickey = (topic, typename)
449 cursor = self.
_db.cursor()
452 full_typename = api.make_full_typename(typename)
453 sql =
"INSERT INTO topics (name, type, serialization_format, offered_qos_profiles) " \
454 "VALUES (?, ?, ?, ?)"
455 qoses = yaml.safe_dump(qoses)
if isinstance(qoses, (list, tuple))
else ""
456 args = (topic, full_typename,
"cdr", qoses)
457 cursor.execute(sql, args)
458 tdata = {
"id": cursor.lastrowid,
"name": topic,
"type": full_typename,
459 "serialization_format":
"cdr",
"offered_qos_profiles": qoses}
463 sql =
"INSERT INTO messages (topic_id, timestamp, data) VALUES (?, ?, ?)"
464 args = (self.
_dbtopics[topickey][
"id"], timestamp, binary)
465 cursor.execute(sql, args)
466 countkey = (topic, typename, typehash)
467 if isinstance(self.
_counts.get(countkey), int): self.
_counts[countkey] += 1
471 """Closes the bag file."""
478 """Does nothing (ROS1 API conformity stand-in)."""
483 """Returns current file size in bytes (including journaling files)."""
485 for suffix
in (
"-journal",
"-wal")
if result
else ():
487 result += os.path.getsize(path)
if os.path.isfile(path)
else 0
493 """Returns file open mode."""
498 """Returns whether specified table exists in database."""
499 sql =
"SELECT 1 FROM sqlite_master WHERE type = ? AND name = ?"
500 return bool(self.
_db.execute(sql, (
"table", name)).fetchone())
504 """Returns informative text for bag, with a full overview of topics and types."""
506 indent =
lambda s, n: (
"\n%s" % (
" " * n)).join(s.splitlines())
508 fmttime =
lambda x: datetime.datetime.fromtimestamp(x).strftime(
"%b %d %Y %H:%M:%S.%f")[:-4]
510 """Returns duration seconds as text like "1hr 1:12s (3672s)" or "51.8s"."""
512 hh, rem = divmod(secs, 3600)
513 mm, ss = divmod(rem, 60)
514 if hh: result +=
"%dhr " % hh
515 if mm: result +=
"%d:" % mm
517 if hh
or mm: result +=
" (%ds)" % secs
525 if None not in (start, end):
526 entries[
"duration"] = fmtdur(end - start)
527 entries[
"start"] =
"%s (%.2f)" % (fmttime(start), start)
528 entries[
"end"] =
"%s (%.2f)" % (fmttime(end), end)
529 entries[
"size"] = util.format_bytes(self.
sizesize)
530 if any(counts.values()):
531 entries[
"messages"] = str(sum(counts.values()))
533 nhs = sorted(set((n, h)
for _, n, h
in counts))
534 namew = max(len(n)
for n, _
in nhs)
536 entries[
"types"] =
"\n".join(
"%s%s [%s]" % (n,
" " * (namew - len(n)), h)
for n, h
in nhs)
538 topicw = max(len(t)
for t, _, _
in counts)
539 typew = max(len(n)
for _, n, _
in counts)
540 countw = max(len(str(c))
for c
in counts.values())
542 for (t, n, _), c
in sorted(counts.items()):
545 line =
"%s%s" % (t,
" " * (topicw - len(t)))
547 line +=
" %s%s msg%s" % (
" " * (countw - len(str(c))), c,
" " if 1 == c
else "s")
551 line +=
"%s (%s connections)" % (
" " * (typew - len(n)), len(qq))
if len(qq) > 1
else ""
553 entries[
"topics"] =
"\n".join(lines)
555 labelw = max(map(len, entries))
556 return "\n".join(
"%s:%s %s" % (k,
" " * (labelw - len(k)), indent(v, labelw + 2))
557 for k, v
in entries.items())
562 """Container for local mutexes."""
565 NODE = threading.Lock()
568 SPIN = threading.RLock()
571 SPIN_ONCE = threading.RLock()
574 SPIN_START = threading.RLock()
579 """Raises if ROS2 node not inited."""
580 if not NODE:
raise Exception(
"Node not initialized")
583def init_node(name, args=None, namespace=None, anonymous=False, log_level=None, enable_rosout=True,
584 multithreaded=True, reentrant=False):
586 Initializes rclpy and creates ROS2 node.
588 @param name node name, without namespace
589 @param args list of command-line arguments
for the node
590 @param namespace node namespace override
591 @param anonymous whether to auto-generate a unique name
for the node,
592 using the given name
as base
593 @param log_level level to set
for ROS logging
594 (name like
"DEBUG" or one of `logging` constants like `logging.DEBUG`)
595 @param enable_rosout `
False` to suppress auto-publication of rosout
596 @param multithreaded use `MultiThreadedExecutor` instead of `SingleThreadedExecutor`
597 @param reentrant use `ReentrantCallbackGroup` instead of `MutuallyExclusiveCallbackGroup`
598 @return `rclpy.node.Node`
600 global NODE, CALLBACK_GROUP, EXECUTOR, SHUTDOWN
604 name =
"%s_%s_%s" % (name, os.getpid(), int(time.time_ns() * 1000))
609 logger.debug(
"Initializing ROS node %r.", name)
610 try: rclpy.init(args=args)
611 except RuntimeError:
pass
612 NODE = rclpy.create_node(name, namespace=namespace, enable_rosout=enable_rosout,
613 automatically_declare_parameters_from_overrides=
True,
614 allow_undeclared_parameters=
True)
615 execls = rclpy.executors.SingleThreadedExecutor
616 grpcls = rclpy.callback_groups.MutuallyExclusiveCallbackGroup
617 if multithreaded: execls = rclpy.executors.MultiThreadedExecutor
618 if reentrant: grpcls = rclpy.callback_groups.ReentrantCallbackGroup
620 EXECUTOR.add_node(NODE)
621 defgrp = NODE.default_callback_group
622 CALLBACK_GROUP = defgrp
if grpcls
is type(defgrp)
else grpcls()
625 if not any(isinstance(x, ROSLogHandler)
for x
in logger.handlers):
627 if log_level
is not None:
628 if not isinstance(log_level, str): log_level = logging.getLevelName(log_level)
629 logger.setLevel(log_level)
630 ros_level = rclpy.logging.get_logging_severity_from_string(log_level)
631 NODE.get_logger().set_level(ros_level)
637 Informs `rosros` of ROS2 having been initialized outside of `init_node()`.
639 @param node `rclpy.node.Node` instance to use
for ROS operations
641 if not isinstance(node, rclpy.node.Node):
642 raise TypeError(
"register_init() argument must be rclpy Node, not %r" %
643 node.__class__.__name__)
644 global NODE, EXECUTOR
645 NODE, EXECUTOR = node, node.executor
or rclpy.get_global_executor()
647 if not any(isinstance(x, ROSLogHandler)
for x
in logger.handlers):
653 Declares all parameters on node from defaults dictionary.
655 @param defaults nested dictionary
with suitable keys
and values
for ROS
656 (keys must be valid namespace names,
657 list values must
not contain nested values)
658 @param defaultkws parameters
as key=value
659 @return full nested parameters dictionary, combined
from given defaults
660 and externally set parameters on the node
664 stack = list(util.merge_dicts(defaults or {}, defaultkws).items())
667 if isinstance(v, dict):
669 util.set_value(result, k.split(PARAM_SEPARATOR), v)
670 stack.extend((
"%s%s%s" % (k, PARAM_SEPARATOR, k2), v2)
for k2, v2
in v.items())
671 elif not NODE.has_parameter(k):
674 for path, param
in NODE.get_parameters_by_prefix(
"").items():
675 if param.value
is not None:
676 util.set_value(result, [x
for x
in path.split(PARAM_SEPARATOR)
if x], param.value)
677 logger.debug(
"Initialized node parameters %r.", result)
683 Returns whether the parameter exists.
685 @param name full name of the parameter
in node namespace
691def get_param(name, default=None, autoset=True):
693 Returns parameter value from current ROS2 node.
695 @param default optional default to
return if parameter unknown
696 @param autoset set default value to node parameter
if unknown
697 @return parameter value,
or default
if parameter was unknown
699 @throws KeyError
if parameter
not set
and default
not given
703 if not NODE.has_parameter(name):
704 if not autoset
or default
is None:
705 raise KeyError(
"Parameter %r is not set" % name)
706 NODE.declare_parameter(name, default)
707 return NODE.get_parameter(name).value
711 """Returns the names of all current ROS2 node parameters."""
713 return list(NODE.get_parameters_by_prefix(
""))
718 Returns the current ROS2 node parameters, by default as nested dictionary.
720 @param nested
return a nested dictionary,
721 like `{
"my": {
"name": 1}}` vs {
"my.name": 1}
725 for path, param
in NODE.get_parameters_by_prefix(
"").items():
726 key = [x
for x
in path.split(PARAM_SEPARATOR)
if x]
if nested
else path
727 util.set_value(result, key, param.value)
731def set_param(name, value, descriptor=None):
733 Sets a parameter on the node, auto-declaring it if unknown so far.
735 @param name parameter full name
736 @param value parameter value to set
737 @param descriptor optional `rcl_interfaces.msg.ParameterDescriptor`
738 @return the set value
742 if not NODE.has_parameter(name):
743 NODE.declare_parameter(name, **dict(descriptor=descriptor)
if descriptor
else {})
744 result = NODE.set_parameters([rclpy.Parameter(name, value=value)])[0]
745 if not result.successful:
750def delete_param(name):
752 Deletes parameter from the node.
754 @param name full name of the parameter
in node namespace
755 @throws KeyError
if parameter
not set
760 NODE.undeclare_parameter(name)
761 except rclpy.exceptions.ParameterNotDeclaredException:
766 """Returns whether ROS2 has been initialized and is not shut down."""
770def on_shutdown(callback, *args, **kwargs):
772 Registers function to be called on shutdown, after node has been torn down.
774 Function is called
with given arguments.
777 def __init__(self, func, *args, **kwargs):
778 self.func, self.args, self.kwargs = func, args, kwargs
781 if self.called:
return
783 return self.func(*self.args, **self.kwargs)
785 wrapper = CallWrapper(callback, *args, **kwargs)
788 if not SHUTDOWN_CALLBACKS: atexit.register(shutdown)
789 SHUTDOWN_CALLBACKS.append(wrapper)
791 rclpy.get_default_context().on_shutdown(wrapper.__call__)
795 """Sets ROS2 node spinning forever in a background thread."""
800 while SPINNER
and EXECUTOR
and ok(): EXECUTOR.spin_once(0.5)
801 finally: SPINNER =
None
804 with Mutex.SPIN_START:
805 if SPINNER
or Mutex.SPIN._is_owned():
return
806 SPINNER = threading.Thread(target=do_spin)
811 """Spins ROS2 node forever."""
817 except KeyboardInterrupt: SPINNER =
None
820 if Mutex.SPIN._is_owned():
821 with Mutex.SPIN:
return
828 Executes one pending ROS2 operation or waits until timeout.
830 Waits forever
if timeout
not given
or negative.
832 @param timeout time to wait at most,
as seconds
or ROS2 duration;
833 None or <0 waits forever
838 if SPINNER
or Mutex.SPIN_ONCE._is_owned():
839 timeout = 2**31 - 1
if timeout
is None or timeout < 0
else timeout
840 group = rclpy.callback_groups.ReentrantCallbackGroup()
841 timer = NODE.create_timer(timeout, callback=
None, callback_group=group)
842 rclpy.timer.Rate(timer, context=NODE.context).sleep()
843 NODE.destroy_timer(timer)
845 with Mutex.SPIN_ONCE:
846 EXECUTOR.spin_once(timeout_sec=timeout)
851 Spins ROS2 until future complete or timeout reached
or ROS shut down.
853 @param future object
with `concurrent.futures.Future`-conforming interface to complete
854 @param timeout time to wait,
as seconds
or ROS2 duration
861 if not future.done():
865 now = time.monotonic()
866 deadline = (now + timeout)
if timeout
is not None and timeout >= 0
else -1
867 while ok()
and (deadline < 0
or now < deadline)
and not future.done():
869 now = time.monotonic()
870 if not future.done()
and not ok():
871 future.cancel(
"ROS shut down")
876 Shuts down ROS2 execution, if any.
878 @param reason shutdown reason to log,
if any
882 cbs, SHUTDOWN_CALLBACKS[:] = SHUTDOWN_CALLBACKS[:], []
885 except Exception: logger.exception(
"Error calling shutdown callback %r.", cb)
887 global NODE, EXECUTOR, SPINNER, SHUTDOWN
894 if reason: logger.info(
"shutdown [%s]", reason)
895 node, executor = NODE, EXECUTOR
896 rclpy.ok()
and rclpy.shutdown()
897 NODE, EXECUTOR, SPINNER, SHUTDOWN =
None,
None,
None,
True
898 executor
and executor.shutdown()
899 node
and node.destroy_node()
900 finally: run_callbacks()
905 Returns a ROS2 service client instance, for invoking a service.
907 @param service name of service to invoke
908 @param cls_or_typename ROS2 service
class object like `std_srvs.srv.SetBool`
909 or service type name like `
"std_srvs/SetBool"
910 @param qosargs additional key-value arguments
for ROS2
911 `QoSProfile`, like `reliability`
912 @return `rclpy.client.Client`, will support keyword arguments
in calls
913 and be callable itself
917 if qosargs: qos = rclpy.qos.QoSProfile(**qosargs)
918 else: qos = rclpy.qos.QoSPresetProfiles.SERVICES_DEFAULT.value
919 return NODE.create_client(cls, service, qos_profile=qos,
920 callback_group=CALLBACK_GROUP)
925 Returns a ROS2 service server instance, for providing a service.
927 @param service name of service to provide
928 @param cls_or_typename ROS2 service
class object like `std_srvs.srv.SetBool`
929 or service type name like `
"std_srvs/SetBool"
930 @param callback callback function, invoked
with `(request, response)`,
931 expected to
return the populated response,
932 or a list/tuple/dictionary
for populating the response.
933 If the function only takes one argument,
934 it
is invoked
with `(request)`.
935 @param qosargs additional key-value arguments
for ROS2
936 `QoSProfile`, like `reliability`
937 @return `rclpy.service.Service`
941 if qosargs: qos = rclpy.qos.QoSProfile(**qosargs)
942 else: qos = rclpy.qos.QoSPresetProfiles.SERVICES_DEFAULT.value
943 return NODE.create_service(cls, service, util.wrap_arity(callback),
944 qos_profile=qos, callback_group=CALLBACK_GROUP)
947def create_publisher(topic, cls_or_typename, latch=False, queue_size=0, **qosargs):
949 Returns a ROS2 publisher instance.
951 @param topic name of topic to open
952 @param cls_or_typename ROS2 message
class object like `std_msgs.msg.Bool`
953 or message type name like
"std_msgs/Bool"
954 @param latch provide last published message to new subscribers
955 (sets `DurabilityPolicy.TRANSIENT_LOCAL`)
956 @param queue_size queue size of outgoing messages (0
or None: infinite)
957 @param qosargs additional key-value arguments
for ROS2
958 `QoSProfile`, like `reliability`
959 @return `rclpy.publisher.Publisher`,
960 will support keyword arguments
in `publish()`
961 and have `get_num_connections()`
965 qosargs.setdefault("depth", queue_size
or 0)
966 if latch: qosargs[
"durability"] = rclpy.qos.DurabilityPolicy.TRANSIENT_LOCAL
967 qos = rclpy.qos.QoSProfile(**qosargs)
968 return NODE.create_publisher(cls, topic, qos, callback_group=CALLBACK_GROUP)
972 queue_size=0, raw=False, **qosargs):
974 Returns a ROS2 subscriber instance.
976 @param topic name of topic to listen to
977 @param cls_or_typename ROS2 message
class object like `std_msgs.msg.Bool`
978 or message type name like
"std_msgs/Bool"
979 @param callback callback function, invoked
with received message,
980 and with additional arguments
if given
981 @param callback_args additional arguments to
pass to the callback,
if any,
982 invoked
as `callback(msg, callback_args)´
983 @param queue_size queue size of incoming messages (0
or None: infinite)
984 @param raw invoke callback
with serialized message bytes
985 @param qosargs additional key-value arguments
for ROS2
986 `QoSProfile`, like `reliability`.
987 `__autodetect` will look up current publishers on the topic
988 and create a compatible QoS.
989 @return `rclpy.subscription.Subscription`
992 qosargs.setdefault("depth", queue_size
or 0)
993 if qosargs.pop(
"__autodetect",
False):
994 qosargs.update(
get_topic_qos(topic, cls_or_typename, queue_size
or 0))
996 qos = rclpy.qos.QoSProfile(**qosargs)
997 sub = NODE.create_subscription(cls, topic, callback, qos, raw=raw,
998 callback_group=CALLBACK_GROUP)
999 if callback_args
is not None:
1000 sub._callbacks = [(callback, callback_args)]
1004def create_timer(period, callback, oneshot=False, immediate=False):
1006 Returns a ROS2 timer instance.
1008 @param period desired period between callbacks,
as seconds
or ROS duration
1009 @param callback callback function invoked on timer,
with no arguments
1010 @param oneshot whether to fire only once
1011 @param immediate whether to fire once immediately instead of waiting one period
1012 @return `rclpy.timer.Timer`
1015 oneshot_timer = None
1016 def oneshot_callback():
1017 oneshot_timer.destroy()
1018 callback
and callback()
1020 period, kws =
to_sec(period), dict(callback_group=CALLBACK_GROUP)
1022 timer = oneshot_timer = NODE.create_timer(1e-9, oneshot_callback, **kws)
1023 if not immediate
or not oneshot:
1025 timer = oneshot_timer = NODE.create_timer(period, oneshot_callback, **kws)
1027 timer = NODE.create_timer(period, callback, **kws)
1033 Returns a ROS2 rate instance, for sleeping at a fixed rate.
1035 @param frequency rate to sleep at,
in Hz
1036 @return `rclpy.timer.Rate`
1039 return NODE.create_rate(frequency)
1043 """Closes the given publisher, subscriber, service client, service server, timer, or rate instance."""
1045 if isinstance(item, rclpy.client.Client): NODE.destroy_client(item)
1046 elif isinstance(item, rclpy.service.Service): NODE.destroy_service(item)
1047 elif isinstance(item, rclpy.publisher.Publisher): NODE.destroy_publisher(item)
1048 elif isinstance(item, rclpy.timer.Rate): NODE.destroy_rate(item)
1049 elif isinstance(item, rclpy.subscription.Subscription): NODE.destroy_subscription(item)
1050 elif isinstance(item, rclpy.timer.Timer): NODE.destroy_timer(item)
1054 try: item
and item.destroy()
1055 except Exception:
pass
1059 """Returns ROS2 node namespace."""
1061 return NODE.get_namespace()
1065 """Returns ROS2 node full name with namespace."""
1067 return util.namejoin(NODE.get_namespace(), NODE.get_name())
1071 """Returns all ROS2 nodes, as `[node full name, ]`."""
1073 return [util.namejoin(b, a)
for a, b
in NODE.get_node_names_and_namespaces()]
1077 """Returns all available ROS2 topics, as `[(topic name, [type name, ]), ]`."""
1079 return sorted([n, sorted(map(canonical, tt))]
for n, tt
in NODE.get_topic_names_and_types())
1082def get_topic_qos(topic, cls_or_typename, queue_size=10, publish=False):
1084 Returns dictionary for populating rclpy.qos.QoSProfile compatible
with counterparts on topic.
1086 @param topic topic full name
1087 @param cls_or_typename message type
class or name
1088 @param queue_size populates
"depth"
1089 @param publish whether to
return QoS settings
for creating a publisher,
1090 by default returns settings
for a subscription
1091 @return {
"depth",
"reliability",
"durability"}
1094 typename = cls_or_typename
1095 if not isinstance(typename, str): typename =
get_message_type(cls_or_typename)
1096 args = dict(depth=queue_size, reliability=rclpy.qos.ReliabilityPolicy.SYSTEM_DEFAULT,
1097 durability=rclpy.qos.DurabilityPolicy.SYSTEM_DEFAULT)
1098 qry = NODE.get_subscriptions_info_by_topic
if publish
else NODE.get_publishers_info_by_topic
1099 qoses = [x.qos_profile
for x
in qry(topic)
if canonical(x.topic_type) == typename]
1100 rels, durs = zip(*[(x.reliability, x.durability)
for x
in qoses])
if qoses
else ([], [])
1102 if rels: args.update(reliability=max(rels))
1103 if durs: args.update(durability =max(durs))
1107def get_services(node=None, namespace=None, include_types=True):
1109 Returns all available ROS2 services, as `[(service name, [type name, ]), ]`.
1111 @param node full name of the node to
return services
for,
if any
1112 @param namespace full
or partial namespace to scope services
from
1113 @param include_types
if false, type names will be returned
as an empty list
1116 services = NODE.get_service_names_and_types() if not node
else \
1117 NODE.get_service_names_and_types_by_node(*util.namesplit(node)[::-1])
1118 return [[n, sorted(map(canonical, tt))
if include_types
else []]
1119 for n, tt
in sorted(services)
if not namespace
or n.startswith(namespace)]
1124 Returns `logging.Logger` for logging to ROS2 log handler.
1126 Logging methods on the logger (`debug()`, `info()`, etc) accept additional keyword arguments:
1127 - `__once__`: whether to log only once
from call site
1128 - `__throttle__`: seconds to skip logging
from call site
for
1129 - `__throttle_identical__`: whether to skip logging identical consecutive texts
from call site
1130 (given log message excluding formatting arguments).
1131 Combines
with `__throttle__` to skip duplicates
for a period.
1137 """Returns current ROS2 time, as `rclpy.time.Time`."""
1139 return NODE.get_clock().now()
1142def remap_name(name, namespace=None):
1144 Returns the absolute remapped topic/service name if mapping exists.
1146 @param name name to seek exact remapping
for
1147 @param namespace namespace to resolve relative
and private names to,
1148 by default current node namespace
1149 @return remapped resolved name,
or original
if not mapped
1153 with NODE.handle
as node_capsule:
1154 name2 = rclpy_extension.rclpy_remap_topic_name(node_capsule, name1)
1155 return name2
if (name1 != name2)
else name
1158def resolve_name(name, namespace=None):
1160 Returns absolute remapped name, namespaced under current node if relative
or private.
1162 @param namespace namespace to use
if not current node full name
1166 with NODE.handle
as node_capsule:
1167 return rclpy_extension.rclpy_remap_topic_name(node_capsule, name1)
1172 Returns absolute name, namespaced under current node if relative
or private.
1174 @param namespace namespace to use
if not current node full name
1179 return util.namesplit(namespace)[0] +
"/"
1182 name2 = (
"/" if name.startswith(
"/")
else "") +
"/".join(filter(bool, name.split(
"/")))
1183 if name2.startswith(PRIVATE_PREFIX):
1184 name2 = util.namejoin(namespace, name2.lstrip(PRIVATE_PREFIX))
1185 elif not name2.startswith(
"/"):
1186 name2 = util.namejoin(util.namesplit(namespace)[0], name2)
1192 Sleeps for the specified duration
in ROS time.
1194 Raises error on ROS shutdown
or ROS time jumping backwards
1196 @param duration time to sleep,
as seconds
or ROS duration, <=0 returns immediately
1199 from .
import rospify
1200 rospify.sleep(duration)
1205 Blocks until topic has at least one publisher.
1207 @param topic name of topic to open
1208 @param timeout time to wait at most,
as seconds
or ROS duration;
1209 None or <0 waits forever
1210 @param cls_or_typename message type to expect
if any,
1211 as ROS message
class object like `std_msgs.msg.Bool`
1212 or message type name like
"std_msgs/Bool"
1213 @return whether a publisher
is available
1217 timeout, first =
to_sec(timeout),
False
1218 deadline =
None if timeout
is None or timeout < 0
else time.monotonic() + timeout
1220 typename = api.make_full_typename(
get_message_type(cls_or_typename)
or cls_or_typename
or "")
1221 if "*" == typename: typename =
None
1222 while not result
and (first
or deadline
is None or time.monotonic() < deadline):
1223 pubs = NODE.get_publishers_info_by_topic(topic)
1224 exists, first = bool(pubs),
False
1225 result = exists
and (
not typename
or any(typename == x.topic_type
for x
in pubs))
1232 Blocks until topic has at least one subscriber.
1234 @param topic name of topic to open
1235 @param timeout time to wait at most,
as seconds
or ROS duration;
1236 None or <0 waits forever
1237 @param cls_or_typename message type to expect
if any,
1238 as ROS message
class object like `std_msgs.msg.Bool`
1239 or message type name like
"std_msgs/Bool"
1240 @return whether a subscriber
is available
1244 timeout, first =
to_sec(timeout),
False
1245 deadline =
None if timeout
is None or timeout < 0
else time.monotonic() + timeout
1247 typename = api.make_full_typename(
get_message_type(cls_or_typename)
or cls_or_typename
or "")
1248 if "*" == typename: typename =
None
1249 while not result
and (first
or deadline
is None or time.monotonic() < deadline):
1250 subs = NODE.get_subscriptions_info_by_topic(topic)
1251 exists, first = bool(subs),
False
1252 result = exists
and (
not typename
or any(typename == x.topic_type
for x
in subs))
1257def wait_for_service(service, timeout=None, cls_or_typename=None):
1259 Blocks until service is available.
1261 @param service name of service
1262 @param timeout time to wait at most,
as seconds
or ROS duration;
1263 None or <0 waits forever
1264 @param cls_or_typename service type to expect
if any,
1265 as ROS service
class object like `std_msgs.msg.Bool`
1266 or service type name like
"std_srvs/SetBool"
1267 @return whether the service
is available
1270 from . rospify.service
import _wait_for_service
1272 _wait_for_service(service, max(0, timeout
or 0), cls_or_typename)
1284 Returns "pkg/Type" for "pkg/msg/Type", standardizes various ROS2 formats.
1286 Converts DDS types like
"octet" to
"byte",
and "sequence<uint8, 100>" to
"uint8[100]".
1288 is_array, bound, dimension = False,
"",
""
1291 match = re.match(
"sequence<(.+)>", typename)
1294 typename = match.group(1)
1295 match = re.match(
r"([^,]+)?,\s?(\d+)", typename)
1297 typename = match.group(1)
1298 if match.lastindex > 1: dimension = match.group(2)
1300 match = re.match(
"(w?string)<(.+)>", typename)
1302 typename, bound = match.groups()
1305 dimension = typename[typename.index(
"[") + 1:typename.index(
"]")]
1306 typename, is_array = typename[:typename.index(
"[")],
True
1308 if "<=" in typename:
1309 typename, bound = typename.split(
"<=")
1311 if typename.count(
"/") > 1:
1312 typename =
"%s/%s" % tuple((x[0], x[-1])
for x
in [typename.split(
"/")])[0]
1314 suffix = (
"<=%s" % bound
if bound
else "") + (
"[%s]" % dimension
if is_array
else "")
1315 return DDS_TYPES.get(typename, typename) + suffix
1320 """Returns parameter name using "." separator, and leading root or private sigils stripped."""
1321 return name.replace(
"/", PARAM_SEPARATOR).lstrip(PRIVATE_PREFIX + PARAM_SEPARATOR)
1326 Returns ROS2 message / service class object.
1328 @param msg_or_type full
or canonical
class name,
1329 like
"std_msgs/Bool" or "std_srvs/SetBool" or "std_srvs/SetBoolRequest";
1330 or class instance like `std_msgs.msg.Bool()`
1331 @return ROS2 message / service
class object, like `std_msgs.msg.Bool`
1332 or `std_srvs.srv.SetBool`
or `std_srvs.srv.SetBoolRequest`,
1333 or None if not found
1336 return msg_or_type
if inspect.isclass(msg_or_type)
else type(msg_or_type)
1340 cls = rosidl_runtime_py.utilities.get_message(msg_or_type)
1342 part =
"Request" if re.match(
r".+\wRequest$", msg_or_type)
else \
1343 "Response" if re.match(
r".+\wResponse$", msg_or_type)
else None
1345 msg_or_type = msg_or_type[:-len(part)]
1347 if msg_or_type.endswith(
"_"): msg_or_type = msg_or_type[:-1]
1348 try: cls = rosidl_runtime_py.utilities.get_service(msg_or_type)
1349 except Exception:
pass
1351 cls = getattr(cls, part)
1357 Returns ROS2 message or service request/response type definition text.
1359 Text will include subtype definitions by default
for messages.
1361 @param msg_or_type canonical
or full
class name like
"std_msgs/Bool" or
"std_msgs/msg/Bool",
1362 or class instance like `std_msgs.msg.Bool()`,
1363 or class object like `std_msgs.msg.Bool`
1364 @param full include definitions of nested types, separated by
"\n---\nMSG: pkg/Type\n"
1365 (ignored
for service request/response types)
1366 @return message type definition text
1368 typename = canonical(msg_or_type) if isinstance(msg_or_type, str)
else \
1370 try: result = parsing.get_ros2_message_definition(typename, full)
1371 except Exception: result =
None
1373 part =
"Request" if re.match(
r".+\wRequest$", typename)
else \
1374 "Response" if re.match(
r".+\wResponse$", typename)
else None
1376 typename = re.sub(
r"(.+)_(Request|Response)$",
r"\1", typename)
1377 typename = re.sub(
r"(.+)(Request|Response)$",
r"\1", typename)
1378 result = parsing.get_ros2_service_definition(typename)
1380 result = result.split(
"---\n", 1)[
"Response" == part]
1382 raise LookupError(
"Could not find the interface %r" % msg_or_type)
1388 Returns {field name: field type name} if ROS2 message
or service request/response,
else {}.
1390 @param val ROS2 message
or service request/response instance,
or class object
1393 return {k:
canonical(v)
for k, v
in val.get_fields_and_field_types().items()}
1398 Returns message `std_msgs/Header`-attribute if any,
else `
None`.
1400 @param val ROS message
or service request/response instance
1403 for name, typename
in val.get_fields_and_field_types().items():
1404 if "std_msgs/Header" == typename:
1411 Returns ROS2 message / service canonical type name, like "std_msgs/Header".
1413 Returns
"*" for `AnyMsg`.
1415 @param msg_or_cls
class instance like `std_msgs.msg.Bool()`,
1416 or class object like `std_msgs.msg.Bool`
1417 @return canonical name,
or `
None`
if not ROS message / service
1419 cls = msg_or_cls if inspect.isclass(msg_or_cls)
else type(msg_or_cls)
1422 return canonical(
"%s/%s" % (cls.__module__.split(
".")[0], cls.__name__))
1427 Returns ROS2 message / service type MD5 hash.
1429 @param msg_or_type full
or canonical
class name
1430 like
"std_msgs/Bool" or "std_srvs/SetBool" or "std_srvs/SetBoolRequest",
1431 or class instance like `std_msgs.msg.Bool()`,
1432 or class object like `std_msgs.msg.Bool`
1434 typename = msg_or_type if isinstance(msg_or_type, str)
else get_message_type(msg_or_type)
1440 """Returns ROS2 message type MD5 hash (internal caching method)."""
1442 return parsing.calculate_definition_hash(typename, definition)
1447 Returns object attribute value, with numeric arrays converted to lists.
1449 @param name message attribute name; may also be (nested, path)
or "nested.path"
1450 @param default value to
return if attribute does
not exist; raises exception otherwise
1452 try: v, parent, k = util.get_nested(msg, name)
1454 if default
is not Ellipsis:
return default
1456 if isinstance(v, (bytes, array.array)) \
1457 or "numpy.ndarray" ==
"%s.%s" % (v.__class__.__module__, v.__class__.__name__):
1459 if v
and isinstance(v, (list, tuple))
and is_ros_message(parent):
1460 typename =
canonical(parent.get_fields_and_field_types()[k])
1461 scalartype =
scalar(typename)
1462 if scalartype
in (
"byte",
"uint8"):
1463 if isinstance(v[0], bytes):
1464 v = list(map(ord, v))
1465 elif scalartype == typename:
1473 Returns the share directory of the package.
1475 For example, "/opt/ros/foxy/share/name" or "/home/ros1_ws/install/share/name".
1477 @throws KeyError
if package
not found
1480 return ament_index_python.get_package_share_directory(name)
1481 except ament_index_python.PackageNotFoundError
as e:
1482 raise KeyError(name)
from e
1487 Returns ROS2 service type definition text.
1489 @param srv_or_type canonical
or full
class name
1490 like
"std_srvs/SetBool" or "std_srvs/srv/SetBool",
1491 or class instance like `std_srvs.srv.SetBool()`,
1492 or class object like `std_srvs.srv.SetBool`
1493 @return ROS2 service type definition text
1495 typename = srv_or_type if isinstance(srv_or_type, str)
else get_message_type(srv_or_type)
1496 return parsing.get_ros2_service_definition(
canonical(typename))
1501 Returns ROS2 service request class object.
1503 @param srv_or_type canonical
or full
class name
1504 like
"std_srvs/SetBool" or "std_srvs/srv/SetBool",
1505 or class instance like `std_srvs.srv.SetBool()`,
1506 or class object like `std_srvs.srv.SetBool`
1507 @return ROS2 service request
class object, like `std_srvs.srv.SetBool.Request`
1514 Returns ROS2 service response class object.
1516 @param srv_or_type canonical
or full
class name
1517 like
"std_srvs/SetBool" or "std_srvs/srv/SetBool",
1518 or class instance like `std_srvs.srv.SetBool()`,
1519 or class object like `std_srvs.srv.SetBool`
1520 @return ROS2 service response
class object, like `std_srvs.srv.SetBool.Response`
1527 Returns whether value is a ROS2 message
or service request/response
class or instance.
1529 @param val like `std_msgs.msg.Bool()`
or `std_srvs.srv.SetBoolRequest`
1530 @return True if value
is a ROS2 message
or service request/response
class or instance,
1533 return isinstance(val, AnyMsg)
or inspect.isclass(val)
and issubclass(val, AnyMsg)
or \
1534 rosidl_runtime_py.utilities.is_message(val)
1538 """Returns whether value is a ROS2 service class object."""
1539 return rosidl_runtime_py.utilities.is_service(val)
1543 """Returns whether value is a ROS2 time/duration class or instance."""
1544 if inspect.isclass(val):
return issubclass(val, tuple(ROS_TIME_CLASSES))
1545 return isinstance(val, tuple(ROS_TIME_CLASSES))
1549 """Returns an rclpy.duration.Duration."""
1550 return rclpy.duration.Duration(seconds=secs, nanoseconds=nsecs)
1554 """Returns a ROS2 time, as rclpy.time.Time."""
1555 return rclpy.time.Time(seconds=secs, nanoseconds=nsecs)
1559 """Returns ROS2 message or service request/response as a serialized binary of `bytes()`."""
1560 return rclpy.serialization.serialize_message(msg)
1564 """Returns ROS2 message or service request/response instantiated from serialized binary."""
1565 return rclpy.serialization.deserialize_message(raw,
get_message_class(cls_or_typename))
1571 Returns unbounded scalar type from ROS2 message data type
1573 Like
"uint8" from "uint8[]",
or "string" from "string<=10[<=5]".
1574 Returns type unchanged
if not a collection
or bounded type.
1576 if "[" in typename: typename = typename[:typename.index(
"[")]
1577 if "<=" in typename: typename = typename[:typename.index(
"<=")]
1583 Converts ROS2 time/duration between `rclpy` and `builtin_interfaces` objects.
1585 @param val ROS2 time/duration object
from `rclpy`
or `builtin_interfaces`
1586 @param to_message whether to convert
from `rclpy` to `builtin_interfaces`
or vice versa
1587 @param clock_type ClockType
for converting to `rclpy.Time`, defaults to `ROS_TIME`
1588 @return value converted to appropriate type,
or original value
if not convertible
1590 to_message, clock_type = bool(to_message), (clock_type or rclpy.clock.ClockType.ROS_TIME)
1591 if isinstance(val, tuple(ROS_TIME_CLASSES)):
1592 rcl_cls = next(k
for k, v
in ROS_TIME_MESSAGES.items()
if isinstance(val, (k, v)))
1593 is_rcl = isinstance(val, tuple(ROS_TIME_MESSAGES))
1594 name =
"to_msg" if to_message
and is_rcl
else "from_msg" if to_message == is_rcl
else None
1595 args = [val] + ([clock_type]
if rcl_cls
is rclpy.time.Time
and "from_msg" == name
else [])
1596 return getattr(rcl_cls, name)(*args)
if name
else val
1602 Returns value as ROS2 duration
if convertible (int/float/time/datetime/decimal),
else value.
1604 Convertible types: int/float/time/datetime/decimal/builtin_interfaces.Time.
1607 if isinstance(val, decimal.Decimal):
1609 elif isinstance(val, datetime.datetime):
1610 result =
make_duration(int(val.timestamp()), 1000 * val.microsecond)
1611 elif isinstance(val, (float, int)):
1613 elif isinstance(val, rclpy.time.Time):
1615 elif isinstance(val, tuple(ROS_TIME_MESSAGES.values())):
1621 """Returns value in nanoseconds if value is ROS2 time/duration, else value."""
1622 if not isinstance(val, tuple(ROS_TIME_CLASSES)):
1624 if hasattr(val,
"nanoseconds"):
1625 return val.nanoseconds
1626 return val.sec * 10**9 + val.nanosec
1630 """Returns value in seconds if value is ROS2 time/duration, else value."""
1631 if not isinstance(val, tuple(ROS_TIME_CLASSES)):
1633 if hasattr(val,
"nanoseconds"):
1634 secs, nsecs = divmod(val.nanoseconds, 10**9)
1635 return secs + nsecs / 1E9
1636 return val.sec + val.nanosec / 1E9
1640 """Returns value as (seconds, nanoseconds) if value is ROS2 time/duration, else value."""
1641 if not isinstance(val, tuple(ROS_TIME_CLASSES)):
1643 if hasattr(val,
"seconds_nanoseconds"):
1644 return val.seconds_nanoseconds()
1645 if hasattr(val,
"nanoseconds"):
1646 return divmod(val.nanoseconds, 10**9)
1647 return (val.sec, val.nanosec)
1652 Returns value as ROS2 time
if convertible,
else value.
1654 Convertible types: int/float/duration/datetime/decimal/builtin_interfaces.Time.
1657 if isinstance(val, decimal.Decimal):
1658 result =
make_time(int(val), float(val % 1) * 10**9)
1659 elif isinstance(val, datetime.datetime):
1660 result =
make_time(int(val.timestamp()), 1000 * val.microsecond)
1661 elif isinstance(val, (float, int)):
1663 elif isinstance(val, rclpy.duration.Duration):
1664 result =
make_time(nsecs=val.nanoseconds)
1665 elif isinstance(val, tuple(ROS_TIME_MESSAGES.values())):
1666 result =
make_time(val.sec, val.nanosec)
1671 "AnyMsg",
"Bag",
"ROSLogHandler",
"DDS_TYPES",
"FAMILY",
"PARAM_SEPARATOR",
1672 "PRIVATE_PREFIX",
"ROS_ALIAS_TYPES",
"ROS_TIME_CLASSES",
"ROS_TIME_TYPES",
1673 "canonical",
"create_client",
"create_publisher",
"create_rate",
"create_service",
1674 "create_subscriber",
"create_timer",
"delete_param",
"deserialize_message",
1675 "destroy_entity",
"format_param_name",
"get_logger",
"get_message_class",
1676 "get_message_definition",
"get_message_fields",
"get_message_type",
1677 "get_message_type_hash",
"get_message_value",
"get_namespace",
"get_node_name",
1678 "get_nodes",
"get_package_share_directory",
"get_param",
"get_param_names",
1679 "get_params",
"get_rostime",
"get_service_definition",
"get_service_request_class",
1680 "get_service_response_class",
"get_services",
"get_topic_qos",
"get_topics",
1681 "has_param",
"init_node",
"init_params",
"is_ros_message",
"is_ros_service",
1682 "is_ros_time",
"make_duration",
"make_time",
"ok",
"on_shutdown",
"register_init",
1683 "remap_name",
"resolve_name",
"scalar",
"serialize_message",
"set_param",
"shutdown",
1684 "sleep",
"spin",
"spin_once",
"spin_until_future_complete",
"start_spin",
1685 "time_message",
"to_duration",
"to_nsec",
"to_sec",
"to_sec_nsec",
"to_time",
1686 "wait_for_publisher",
"wait_for_subscriber",
"wait_for_service"
ROS2 bag interface, partially mimicking rosbag.Bag.
get_message_type_hash(self, msg_or_type)
Returns ROS2 message type MD5 hash.
flush(self)
Does nothing (ROS1 API conformity stand-in).
close(self)
Closes the bag file.
get_topic_info(self, counts=False)
Returns topic and message type metainfo as {(topic, typename, typehash): count}.
get_message_class(self, typename, typehash=None)
Returns ROS2 message type class.
__str__(self)
Returns informative text for bag, with a full overview of topics and types.
get_message_count(self, topic_filters=None)
Returns the number of messages in the bag.
BagMessage
Returned from read_messages() as (topic name, ROS message, rclpy.Time).
size
Returns current file size in bytes (including journaling files).
get_type_and_topic_info(self, topic_filters=None)
Returns thorough metainfo on topic and message types.
get_qoses(self, topic, typename)
Returns topic Quality-of-Service profiles as a list of dicts, or None if not available.
str CREATE_SQL
ROS2 bag SQLite schema.
get_end_time(self)
Returns the end time of the bag, as UNIX timestamp.
__exit__(self, exc_type, exc_value, traceback)
Context manager exit, closes bag.
get_message_definition(self, msg_or_type)
Returns ROS2 message type definition full text, including subtype definitions.
TopicTuple
Returned from get_type_and_topic_info() as (typename, message count, connection count,...
_has_table(self, name)
Returns whether specified table exists in database.
__iter__(self)
Iterates over all messages in the bag.
__init__(self, filename, mode="a", *_, **__)
__enter__(self)
Context manager entry.
write(self, topic, msg, t=None, raw=False, qoses=None, **__)
Writes a message to the bag.
read_messages(self, topics=None, start_time=None, end_time=None, raw=False, *_, **__)
Yields messages from the bag in chronological order.
get_start_time(self)
Returns the start time of the bag, as UNIX timestamp.
TypesAndTopicsTuple
Returned from get_type_and_topic_info() as ({typename: typehash}, {topic name: TopicTuple}).
mode
Returns file open mode.
Container for local mutexes.
Logging handler that forwards logging messages to ROS2 logger.
emit(self, record)
Adds message to ROS2 logger.
Logger wrapper with support for throttling logged messages per call site.
get_message_type_hash(msg_or_type)
Returns ROS2 message / service type MD5 hash.
get_message_fields(val)
Returns {field name: field type name} if ROS2 message or service request/response,...
get_topics()
Returns all available ROS2 topics, as `[(topic name, [type name, ]), ]`.
create_service(service, cls_or_typename, callback, **qosargs)
Returns a ROS2 service server instance, for providing a service.
to_duration(val)
Returns value as ROS2 duration if convertible (int/float/time/datetime/decimal), else value.
start_spin()
Sets ROS2 node spinning forever in a background thread.
get_message_type(msg_or_cls)
Returns ROS2 message / service canonical type name, like "std_msgs/Header".
to_sec(val)
Returns value in seconds if value is ROS2 time/duration, else value.
init_params(defaults=None, **defaultkws)
Declares all parameters on node from defaults dictionary.
wait_for_subscriber(topic, timeout=None, cls_or_typename=None)
Blocks until topic has at least one subscriber.
make_duration(secs=0, nsecs=0)
Returns an rclpy.duration.Duration.
create_publisher(topic, cls_or_typename, latch=False, queue_size=0, **qosargs)
Returns a ROS2 publisher instance.
deserialize_message(raw, cls_or_typename)
Returns ROS2 message or service request/response instantiated from serialized binary.
get_node_name()
Returns ROS2 node full name with namespace.
get_params(nested=True)
Returns the current ROS2 node parameters, by default as nested dictionary.
get_message_class(msg_or_type)
Returns ROS2 message / service class object.
get_namespace()
Returns ROS2 node namespace.
spin_once(timeout=None)
Executes one pending ROS2 operation or waits until timeout.
is_ros_service(val)
Returns whether value is a ROS2 service class object.
get_message_value(msg, name, default=...)
Returns object attribute value, with numeric arrays converted to lists.
time_message(val, to_message=True, clock_type=None)
Converts ROS2 time/duration between `rclpy` and `builtin_interfaces` objects.
get_param_names()
Returns the names of all current ROS2 node parameters.
get_service_request_class(srv_or_type)
Returns ROS2 service request class object.
_get_message_type_hash(typename)
Returns ROS2 message type MD5 hash (internal caching method).
to_time(val)
Returns value as ROS2 time if convertible, else value.
get_services(node=None, namespace=None, include_types=True)
Returns all available ROS2 services, as `[(service name, [type name, ]), ]`.
create_timer(period, callback, oneshot=False, immediate=False)
Returns a ROS2 timer instance.
to_sec_nsec(val)
Returns value as (seconds, nanoseconds) if value is ROS2 time/duration, else value.
get_service_response_class(srv_or_type)
Returns ROS2 service response class object.
get_logger()
Returns `logging.Logger` for logging to ROS2 log handler.
spin()
Spins ROS2 node forever.
destroy_entity(item)
Closes the given publisher, subscriber, service client, service server, timer, or rate instance.
get_message_header(val)
Returns message `std_msgs/Header`-attribute if any, else `None`.
wait_for_publisher(topic, timeout=None, cls_or_typename=None)
Blocks until topic has at least one publisher.
create_subscriber(topic, cls_or_typename, callback, callback_args=None, queue_size=0, raw=False, **qosargs)
Returns a ROS2 subscriber instance.
create_client(service, cls_or_typename, **qosargs)
Returns a ROS2 service client instance, for invoking a service.
get_package_share_directory(name)
Returns the share directory of the package.
is_ros_time(val)
Returns whether value is a ROS2 time/duration class or instance.
get_service_definition(srv_or_type)
Returns ROS2 service type definition text.
scalar(typename)
Returns unbounded scalar type from ROS2 message data type.
get_topic_qos(topic, cls_or_typename, queue_size=10, publish=False)
Returns dictionary for populating rclpy.qos.QoSProfile compatible with counterparts on topic.
make_time(secs=0, nsecs=0)
Returns a ROS2 time, as rclpy.time.Time.
get_rostime()
Returns current ROS2 time, as `rclpy.time.Time`.
_assert_node()
Raises if ROS2 node not inited.
format_param_name(name)
Returns parameter name using "." separator, and leading root or private sigils stripped.
_resolve_name(name, namespace=None)
Returns absolute name, namespaced under current node if relative or private.
get_nodes()
Returns all ROS2 nodes, as `[node full name, ]`.
shutdown(reason=None)
Shuts down ROS2 execution, if any.
to_nsec(val)
Returns value in nanoseconds if value is ROS2 time/duration, else value.
ok()
Returns whether ROS2 has been initialized and is not shut down.
register_init(node)
Informs `rosros` of ROS2 having been initialized outside of `init_node()`.
canonical(typename)
Returns "pkg/Type" for "pkg/msg/Type", standardizes various ROS2 formats.
serialize_message(msg)
Returns ROS2 message or service request/response as a serialized binary of `bytes()`.
get_message_definition(msg_or_type, full=True)
Returns ROS2 message or service request/response type definition text.
create_rate(frequency)
Returns a ROS2 rate instance, for sleeping at a fixed rate.
spin_until_future_complete(future, timeout=None)
Spins ROS2 until future complete or timeout reached or ROS shut down.
is_ros_message(val)
Returns whether value is a ROS2 message or service request/response class or instance.