Partial stand-in for `rclpy.qos` in ROS1.
Partially modified copy from ROS2 `rclpy.qos`,
at https://github.com/ros2/rclpy (`rclpy/rclpy/qos.py`),
released under the Apache 2.0 License.
Includes port of preset QoS profiles from ROS2 Middleware Interface and ROS2 Actions for ROS1.
QoS profile names and values taken from:
- https://github.com/ros2/rmw (`include/rmw/qos_profiles.h`, `include/rmw/types.h`)
- https://github.com/ros2/rcl (`include/rcl_action/default_qos.h`)
------------------------------------------------------------------------------
This file is part of rosros - simple unified interface to ROS1 / ROS2.
Released under the BSD License.
------------------------------------------------------------------------------
## @namespace rosros.rclify.qos
# Original file copyright notice:
# Copyright 2016 Open Source Robotics Foundation, Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
41from enum
import IntEnum
43from rospy
import Duration
47 """Enum for types of QoS policies that a Publisher or Subscription can set."""
60 Base for QoS Policy enumerations.
62 Provides helper function to filter keys
for utilities.
67 """Return a list of shortened typing-friendly enum values."""
68 return [k.lower()
for k
in cls.__members__.keys()
if not k.startswith(
'RMW')]
72 """Retrieve a policy type from a short name, case-insensitive."""
73 return cls[name.upper()].value
77 for k, v
in self.__class__.__members__.items():
78 if k.startswith(
'RMW'):
83 'failed to find value %s in %s' %
84 (self.
value, self.__class__.__name__))
88 """Enum for QoS History settings."""
90 RMW_QOS_POLICY_HISTORY_SYSTEM_DEFAULT = 0
91 RMW_QOS_POLICY_HISTORY_KEEP_LAST = 1
92 RMW_QOS_POLICY_HISTORY_KEEP_ALL = 2
93 RMW_QOS_POLICY_HISTORY_UNKNOWN = 3
94 SYSTEM_DEFAULT = RMW_QOS_POLICY_HISTORY_SYSTEM_DEFAULT
95 KEEP_LAST = RMW_QOS_POLICY_HISTORY_KEEP_LAST
96 KEEP_ALL = RMW_QOS_POLICY_HISTORY_KEEP_ALL
97 UNKNOWN = RMW_QOS_POLICY_HISTORY_UNKNOWN
101QoSHistoryPolicy = HistoryPolicy
106 Enum for QoS Reliability settings.
108 This enum matches the one defined
in rmw/types.h
111 RMW_QOS_POLICY_RELIABILITY_SYSTEM_DEFAULT = 0
112 RMW_QOS_POLICY_RELIABILITY_RELIABLE = 1
113 RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT = 2
114 RMW_QOS_POLICY_RELIABILITY_UNKNOWN = 3
115 SYSTEM_DEFAULT = RMW_QOS_POLICY_RELIABILITY_SYSTEM_DEFAULT
116 RELIABLE = RMW_QOS_POLICY_RELIABILITY_RELIABLE
117 BEST_EFFORT = RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT
118 UNKNOWN = RMW_QOS_POLICY_RELIABILITY_UNKNOWN
122QoSReliabilityPolicy = ReliabilityPolicy
127 Enum for QoS Durability settings.
129 This enum matches the one defined
in rmw/types.h
132 RMW_QOS_POLICY_DURABILITY_SYSTEM_DEFAULT = 0
133 RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL = 1
134 RMW_QOS_POLICY_DURABILITY_VOLATILE = 2
135 RMW_QOS_POLICY_DURABILITY_UNKNOWN = 3
136 SYSTEM_DEFAULT = RMW_QOS_POLICY_DURABILITY_SYSTEM_DEFAULT
137 TRANSIENT_LOCAL = RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL
138 VOLATILE = RMW_QOS_POLICY_DURABILITY_VOLATILE
139 UNKNOWN = RMW_QOS_POLICY_DURABILITY_UNKNOWN
143QoSDurabilityPolicy = DurabilityPolicy
148 Enum for QoS Liveliness settings.
150 This enum matches the one defined
in rmw/types.h
153 RMW_QOS_POLICY_LIVELINESS_SYSTEM_DEFAULT = 0
154 RMW_QOS_POLICY_LIVELINESS_AUTOMATIC = 1
155 RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC = 3
156 RMW_QOS_POLICY_LIVELINESS_UNKNOWN = 4
157 SYSTEM_DEFAULT = RMW_QOS_POLICY_LIVELINESS_SYSTEM_DEFAULT
158 AUTOMATIC = RMW_QOS_POLICY_LIVELINESS_AUTOMATIC
159 MANUAL_BY_TOPIC = RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC
160 UNKNOWN = RMW_QOS_POLICY_LIVELINESS_UNKNOWN
164QoSLivelinessPolicy = LivelinessPolicy
# A zero-length Duration is the shared value for every duration default below.
DURATION_UNSPECIFIED = Duration(0)
# Original (misspelled) public name, kept as an alias so existing users of
# this module keep working.
DURATION_UNSPEFICIED = DURATION_UNSPECIFIED
LIVELINESS_LEASE_DURATION_DEFAULT = DURATION_UNSPECIFIED
DEADLINE_DEFAULT = DURATION_UNSPECIFIED
LIFESPAN_DEFAULT = DURATION_UNSPECIFIED
# Depth value used by the UNKNOWN and SYSTEM_DEFAULT preset dicts.
DEPTH_SYSTEM_DEFAULT = 0
# Same numeric value as RMW_QOS_POLICY_LIVELINESS_UNKNOWN in LivelinessPolicy.
LIVELINESS_UNKNOWN = 4
176 history = HistoryPolicy.KEEP_LAST,
178 reliability = ReliabilityPolicy.RELIABLE,
179 durability = DurabilityPolicy.VOLATILE,
180 deadline = DEADLINE_DEFAULT,
181 lifespan = LIFESPAN_DEFAULT,
182 liveliness = LivelinessPolicy.SYSTEM_DEFAULT,
183 liveliness_lease_duration = LIVELINESS_LEASE_DURATION_DEFAULT,
184 avoid_ros_namespace_conventions =
False,
189 history = HistoryPolicy.UNKNOWN,
190 depth = DEPTH_SYSTEM_DEFAULT,
191 reliability = ReliabilityPolicy.UNKNOWN,
192 durability = DurabilityPolicy.UNKNOWN,
193 deadline = DEADLINE_DEFAULT,
194 lifespan = LIFESPAN_DEFAULT,
195 liveliness = LivelinessPolicy.SYSTEM_DEFAULT,
196 liveliness_lease_duration = LIVELINESS_LEASE_DURATION_DEFAULT,
197 avoid_ros_namespace_conventions =
False,
201SYSTEM_DEFAULT = dict(
202 history = HistoryPolicy.SYSTEM_DEFAULT,
203 depth = DEPTH_SYSTEM_DEFAULT,
204 reliability = ReliabilityPolicy.SYSTEM_DEFAULT,
205 durability = DurabilityPolicy.SYSTEM_DEFAULT,
206 deadline = DEADLINE_DEFAULT,
207 lifespan = LIFESPAN_DEFAULT,
208 liveliness = LivelinessPolicy.SYSTEM_DEFAULT,
209 liveliness_lease_duration = LIVELINESS_LEASE_DURATION_DEFAULT,
210 avoid_ros_namespace_conventions =
False,
215 history = HistoryPolicy.KEEP_LAST,
217 reliability = ReliabilityPolicy.BEST_EFFORT,
218 durability = DurabilityPolicy.VOLATILE,
219 deadline = DEADLINE_DEFAULT,
220 lifespan = LIFESPAN_DEFAULT,
221 liveliness = LivelinessPolicy.SYSTEM_DEFAULT,
222 liveliness_lease_duration = LIVELINESS_LEASE_DURATION_DEFAULT,
223 avoid_ros_namespace_conventions =
False,
228 history = HistoryPolicy.KEEP_LAST,
230 reliability = ReliabilityPolicy.RELIABLE,
231 durability = DurabilityPolicy.VOLATILE,
232 deadline = DEADLINE_DEFAULT,
233 lifespan = LIFESPAN_DEFAULT,
234 liveliness = LivelinessPolicy.SYSTEM_DEFAULT,
235 liveliness_lease_duration = LIVELINESS_LEASE_DURATION_DEFAULT,
236 avoid_ros_namespace_conventions =
False,
241 history = HistoryPolicy.KEEP_LAST,
243 reliability = ReliabilityPolicy.RELIABLE,
244 durability = DurabilityPolicy.VOLATILE,
245 deadline = DEADLINE_DEFAULT,
246 lifespan = LIFESPAN_DEFAULT,
247 liveliness = LivelinessPolicy.SYSTEM_DEFAULT,
248 liveliness_lease_duration = LIVELINESS_LEASE_DURATION_DEFAULT,
249 avoid_ros_namespace_conventions =
False,
253PARAMETER_EVENTS = dict(
254 history = HistoryPolicy.KEEP_LAST,
256 reliability = ReliabilityPolicy.RELIABLE,
257 durability = DurabilityPolicy.VOLATILE,
258 deadline = DEADLINE_DEFAULT,
259 lifespan = LIFESPAN_DEFAULT,
260 liveliness = LivelinessPolicy.SYSTEM_DEFAULT,
261 liveliness_lease_duration = LIVELINESS_LEASE_DURATION_DEFAULT,
262 avoid_ros_namespace_conventions =
False,
267 history = HistoryPolicy.KEEP_LAST,
269 reliability = ReliabilityPolicy.RELIABLE,
270 durability = DurabilityPolicy.TRANSIENT_LOCAL,
271 deadline = DEADLINE_DEFAULT,
272 lifespan = LIFESPAN_DEFAULT,
273 liveliness = LivelinessPolicy.SYSTEM_DEFAULT,
274 liveliness_lease_duration = LIVELINESS_LEASE_DURATION_DEFAULT,
275 avoid_ros_namespace_conventions =
False,
280 "qos_profile_default": DEFAULT,
281 "qos_profile_unknown": UNKNOWN,
282 "qos_profile_system_default": SYSTEM_DEFAULT,
283 "qos_profile_sensor_data": SENSOR_DATA,
284 "qos_profile_services_default": SERVICES,
285 "qos_profile_parameters": PARAMETERS,
286 "qos_profile_parameter_events": PARAMETER_EVENTS,
287 "rcl_action_qos_profile_status_default": ACTIONS,
292 """Get QoS policy name from QoSPolicyKind enum."""
293 if isinstance(policy_kind, QoSPolicyKind):
294 return "%s_QOS_POLICY" % policy_kind.name
295 return "INVALID_QOS_POLICY"
299 """Raised when constructing a QoSProfile with invalid arguments."""
302 Exception.__init__(self,
'Invalid QoSProfile', *args)
306 """Define Quality of Service policies."""
309 __qos_profile_default_dict = PresetNames[
'qos_profile_default']
319 '_liveliness_lease_duration',
320 '_avoid_ros_namespace_conventions',
325 'Invalid arguments passed to constructor: %r' % kwargs.keys()
327 if 'history' not in kwargs:
328 if 'depth' not in kwargs:
330 kwargs[
'history'] = QoSHistoryPolicy.RMW_QOS_POLICY_HISTORY_KEEP_LAST
336 'depth' not in kwargs
340 self.
depthdepthdepth = kwargs.get(
'depth', QoSProfile.__qos_profile_default_dict[
'depth'])
342 'reliability', QoSProfile.__qos_profile_default_dict[
'reliability'])
344 'durability', QoSProfile.__qos_profile_default_dict[
'durability'])
348 'liveliness', QoSProfile.__qos_profile_default_dict[
'liveliness'])
350 'liveliness_lease_duration',
351 QoSProfile.__qos_profile_default_dict[
'liveliness_lease_duration'])
353 'avoid_ros_namespace_conventions',
354 QoSProfile.__qos_profile_default_dict[
'avoid_ros_namespace_conventions'])
361 :returns: history attribute
362 :rtype: QoSHistoryPolicy
368 assert isinstance(value, (QoSHistoryPolicy, int))
374 Get field 'reliability'.
376 :returns: reliability attribute
377 :rtype: QoSReliabilityPolicy
383 assert isinstance(value, (QoSReliabilityPolicy, int))
389 Get field 'durability'.
391 :returns: durability attribute
392 :rtype: QoSDurabilityPolicy
398 assert isinstance(value, (QoSDurabilityPolicy, int))
406 :returns: depth attribute
412 def depth(self, value):
413 assert isinstance(value, int)
419 Get field 'lifespan'.
421 :returns: lifespan attribute
428 assert isinstance(value, Duration)
434 Get field 'deadline'.
436 :returns: deadline attribute.
443 assert isinstance(value, Duration)
449 Get field 'liveliness'.
451 :returns: liveliness attribute
452 :rtype: QoSLivelinessPolicy
458 assert isinstance(value, (QoSLivelinessPolicy, int))
464 Get field 'liveliness_lease_duration'.
466 :returns: liveliness_lease_duration attribute.
471 @liveliness_lease_duration.setter
473 assert isinstance(value, Duration)
479 Get field 'avoid_ros_namespace_conventions'.
481 :returns: avoid_ros_namespace_conventions attribute
486 @avoid_ros_namespace_conventions.setter
488 assert isinstance(value, bool)
492 """Returns None (ROS2 API compatibility stand-in)."""
496 if not isinstance(other, QoSProfile):
499 self.__getattribute__(slot) == other.__getattribute__(slot)
# Ready-made QoSProfile instances, one per preset settings dict in PresetNames.
qos_profile_unknown = QoSProfile(**PresetNames['qos_profile_unknown'])
qos_profile_system_default = QoSProfile(**PresetNames['qos_profile_system_default'])
qos_profile_sensor_data = QoSProfile(**PresetNames['qos_profile_sensor_data'])
qos_profile_services_default = QoSProfile(**PresetNames['qos_profile_services_default'])
qos_profile_parameters = QoSProfile(**PresetNames['qos_profile_parameters'])
qos_profile_parameter_events = QoSProfile(**PresetNames['qos_profile_parameter_events'])
# The action-status preset is registered in PresetNames under its rcl_action key.
qos_profile_action_status_default = QoSProfile(
    **PresetNames['rcl_action_qos_profile_status_default'])
531 UNKNOWN = qos_profile_unknown
532 SYSTEM_DEFAULT = qos_profile_system_default
533 SENSOR_DATA = qos_profile_sensor_data
534 SERVICES_DEFAULT = qos_profile_services_default
535 PARAMETERS = qos_profile_parameters
536 PARAMETER_EVENTS = qos_profile_parameter_events
537 ACTION_STATUS_DEFAULT = qos_profile_action_status_default
539 """Noted that the following are duplicated from QoSPolicyEnum.
541 Our supported version of Python3 (3.5) doesn't have a fix that allows mixins on Enum.
545 """Return a list of shortened typing-friendly enum values."""
546 return [k.lower()
for k
in cls.__members__.keys()
if not k.startswith(
'RMW')]
549 def get_from_short_key(cls, name):
550 """Retrieve a policy type from a short name, case-insensitive."""
551 return cls[name.upper()].value
Enum for QoS Durability settings.
Enum for QoS History settings.
Raised when constructing a QoSProfile with invalid arguments.
Enum for QoS Liveliness settings.
Base for QoS Policy enumerations.
short_keys(cls)
Return a list of shortened typing-friendly enum values.
get_from_short_key(cls, name)
Retrieve a policy type from a short name, case-insensitive.
Enum for types of QoS policies that a Publisher or Subscription can set.
Define Quality of Service policies.
liveliness
Get field 'liveliness'.
history
Get field 'history'.
_liveliness_lease_duration
_avoid_ros_namespace_conventions
reliability
Get field 'reliability'.
avoid_ros_namespace_conventions(self, value)
liveliness_lease_duration
Get field 'liveliness_lease_duration'.
durability
Get field 'durability'.
avoid_ros_namespace_conventions(self)
liveliness_lease_duration(self, value)
deadline
Get field 'deadline'.
lifespan
Get field 'lifespan'.
avoid_ros_namespace_conventions
Get field 'avoid_ros_namespace_conventions'.
liveliness_lease_duration(self)
get_c_qos_profile(self)
Returns None (ROS2 API compatibility stand-in).
Enum for QoS Reliability settings.
qos_policy_name_from_kind(policy_kind)
Get QoS policy name from QoSPolicyKind enum.