rosros 0.2.5
Simple unified interface to ROS1 / ROS2 Python API
Loading...
Searching...
No Matches
qos.py
Go to the documentation of this file.
"""
Partial stand-in for `rclpy.qos` in ROS1.

Partially modified copy from ROS2 `rclpy.qos`,
at https://github.com/ros2/rclpy (`rclpy/rclpy/qos.py`),
released under the Apache 2.0 License.

Includes port of preset QoS profiles from ROS2 Middleware Interface and ROS2 Actions for ROS1.

QoS profile names and values taken from:
- https://github.com/ros2/rmw (`include/rmw/qos_profiles.h`, `include/rmw/types.h`)
- https://github.com/ros2/rcl (`include/rcl_action/default_qos.h`)

------------------------------------------------------------------------------
This file is part of rosros - simple unified interface to ROS1 / ROS2.
Released under the BSD License.

@author      Erki Suurjaak
@created     16.02.2022
@modified    16.06.2022
------------------------------------------------------------------------------
"""
## @namespace rosros.rclify.qos

# Original file copyright notice:
#
# Copyright 2016 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from enum import Enum
from enum import IntEnum

from rospy import Duration
44
45
class QoSPolicyKind(IntEnum):
    """
    Enum for types of QoS policies that a Publisher or Subscription can set.

    Every kind is a distinct power of two.
    """

    INVALID = 1 << 0      # 2 ** 0
    DURABILITY = 1 << 1   # 2 ** 1
    DEADLINE = 1 << 2     # 2 ** 2
    LIVELINESS = 1 << 3   # 2 ** 3
    RELIABILITY = 1 << 4  # 2 ** 4
    HISTORY = 1 << 5      # 2 ** 5
    LIFESPAN = 1 << 6     # 2 ** 6
56
57
class QoSPolicyEnum(IntEnum):
    """
    Base for QoS Policy enumerations.

    Provides helper functions that work with the short member names,
    i.e. every member name that does not start with `RMW`.
    """

    @classmethod
    def short_keys(cls):
        """
        Return a list of shortened typing-friendly enum names.

        :returns: lowercased names of all members not prefixed with `RMW`,
                  in definition order
        :rtype: list
        """
        return [name.lower() for name in cls.__members__ if not name.startswith('RMW')]

    @classmethod
    def get_from_short_key(cls, name):
        """
        Retrieve a policy value from a member name, case-insensitive.

        :param name: member name in any letter case
        :returns: value of the member named `name.upper()`
        :raises KeyError: if the enumeration has no such member
        """
        return cls[name.upper()].value

    @property
    def short_key(self):
        """
        Get the lowercased short name of this member.

        :returns: first non-`RMW` member name having the same value as this member
        :rtype: str
        :raises AttributeError: if no non-`RMW` name carries this value
        """
        for name, member in self.__class__.__members__.items():
            if name.startswith('RMW'):
                continue
            # Compare by value: `self.value` is a plain int while `member` is
            # an enum instance, so an identity check between them never matches.
            if self.value == member.value:
                return name.lower()
        raise AttributeError(
            'failed to find value %s in %s' %
            (self.value, self.__class__.__name__))
85
# NOTE(review): the class statement was absent from this listing; the class name
# comes from the alias below, the base class is assumed - confirm against upstream.
class HistoryPolicy(QoSPolicyEnum):
    """
    Enum for QoS History settings.

    Each value is available under a long `RMW_QOS_POLICY_HISTORY_*` name
    and under a short alias.
    """

    RMW_QOS_POLICY_HISTORY_SYSTEM_DEFAULT = 0
    RMW_QOS_POLICY_HISTORY_KEEP_LAST = 1
    RMW_QOS_POLICY_HISTORY_KEEP_ALL = 2
    RMW_QOS_POLICY_HISTORY_UNKNOWN = 3
    SYSTEM_DEFAULT = RMW_QOS_POLICY_HISTORY_SYSTEM_DEFAULT
    KEEP_LAST = RMW_QOS_POLICY_HISTORY_KEEP_LAST
    KEEP_ALL = RMW_QOS_POLICY_HISTORY_KEEP_ALL
    UNKNOWN = RMW_QOS_POLICY_HISTORY_UNKNOWN


# Alias with the old name, for retrocompatibility
QoSHistoryPolicy = HistoryPolicy
102
# NOTE(review): the class statement was absent from this listing; the class name
# comes from the alias below, the base class is assumed - confirm against upstream.
class ReliabilityPolicy(QoSPolicyEnum):
    """
    Enum for QoS Reliability settings.

    This enum matches the one defined in rmw/types.h
    """

    RMW_QOS_POLICY_RELIABILITY_SYSTEM_DEFAULT = 0
    RMW_QOS_POLICY_RELIABILITY_RELIABLE = 1
    RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT = 2
    RMW_QOS_POLICY_RELIABILITY_UNKNOWN = 3
    SYSTEM_DEFAULT = RMW_QOS_POLICY_RELIABILITY_SYSTEM_DEFAULT
    RELIABLE = RMW_QOS_POLICY_RELIABILITY_RELIABLE
    BEST_EFFORT = RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT
    UNKNOWN = RMW_QOS_POLICY_RELIABILITY_UNKNOWN


# Alias with the old name, for retrocompatibility
QoSReliabilityPolicy = ReliabilityPolicy
123
# NOTE(review): the class statement was absent from this listing; the class name
# comes from the alias below, the base class is assumed - confirm against upstream.
class DurabilityPolicy(QoSPolicyEnum):
    """
    Enum for QoS Durability settings.

    This enum matches the one defined in rmw/types.h
    """

    RMW_QOS_POLICY_DURABILITY_SYSTEM_DEFAULT = 0
    RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL = 1
    RMW_QOS_POLICY_DURABILITY_VOLATILE = 2
    RMW_QOS_POLICY_DURABILITY_UNKNOWN = 3
    SYSTEM_DEFAULT = RMW_QOS_POLICY_DURABILITY_SYSTEM_DEFAULT
    TRANSIENT_LOCAL = RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL
    VOLATILE = RMW_QOS_POLICY_DURABILITY_VOLATILE
    UNKNOWN = RMW_QOS_POLICY_DURABILITY_UNKNOWN


# Alias with the old name, for retrocompatibility
QoSDurabilityPolicy = DurabilityPolicy
144
# NOTE(review): the class statement was absent from this listing; the class name
# comes from the alias below, the base class is assumed - confirm against upstream.
class LivelinessPolicy(QoSPolicyEnum):
    """
    Enum for QoS Liveliness settings.

    This enum matches the one defined in rmw/types.h
    """

    RMW_QOS_POLICY_LIVELINESS_SYSTEM_DEFAULT = 0
    RMW_QOS_POLICY_LIVELINESS_AUTOMATIC = 1
    # Value 2 is not defined in this enumeration
    RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC = 3
    RMW_QOS_POLICY_LIVELINESS_UNKNOWN = 4
    SYSTEM_DEFAULT = RMW_QOS_POLICY_LIVELINESS_SYSTEM_DEFAULT
    AUTOMATIC = RMW_QOS_POLICY_LIVELINESS_AUTOMATIC
    MANUAL_BY_TOPIC = RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC
    UNKNOWN = RMW_QOS_POLICY_LIVELINESS_UNKNOWN


# Alias with the old name, for retrocompatibility
QoSLivelinessPolicy = LivelinessPolicy
165
# Zero duration, used as the value of every duration default below
DURATION_UNSPECIFIED = Duration(0)
# Original misspelled name, kept so that existing references keep working
DURATION_UNSPEFICIED = DURATION_UNSPECIFIED
LIVELINESS_LEASE_DURATION_DEFAULT = DURATION_UNSPECIFIED
DEADLINE_DEFAULT = DURATION_UNSPECIFIED
LIFESPAN_DEFAULT = DURATION_UNSPECIFIED
DEPTH_SYSTEM_DEFAULT = 0
LIVELINESS_UNKNOWN = 4  # From ros2/rmw: include/rmw/types.h
# Preset QoS settings, as keyword arguments for QoSProfile.
# DEFAULT spells out every setting; each other preset is a separate copy of
# DEFAULT (or of another preset) listing only the settings that differ.

DEFAULT = dict(
    history=HistoryPolicy.KEEP_LAST,
    depth=10,
    reliability=ReliabilityPolicy.RELIABLE,
    durability=DurabilityPolicy.VOLATILE,
    deadline=DEADLINE_DEFAULT,
    lifespan=LIFESPAN_DEFAULT,
    liveliness=LivelinessPolicy.SYSTEM_DEFAULT,
    liveliness_lease_duration=LIVELINESS_LEASE_DURATION_DEFAULT,
    avoid_ros_namespace_conventions=False,
)


UNKNOWN = dict(
    DEFAULT,
    history=HistoryPolicy.UNKNOWN,
    depth=DEPTH_SYSTEM_DEFAULT,
    reliability=ReliabilityPolicy.UNKNOWN,
    durability=DurabilityPolicy.UNKNOWN,
    # Every policy of the unknown profile is "unknown", liveliness included
    liveliness=LivelinessPolicy.UNKNOWN,
)


SYSTEM_DEFAULT = dict(
    DEFAULT,
    history=HistoryPolicy.SYSTEM_DEFAULT,
    depth=DEPTH_SYSTEM_DEFAULT,
    reliability=ReliabilityPolicy.SYSTEM_DEFAULT,
    durability=DurabilityPolicy.SYSTEM_DEFAULT,
)


SENSOR_DATA = dict(
    DEFAULT,
    depth=5,
    reliability=ReliabilityPolicy.BEST_EFFORT,
)


# Same settings as DEFAULT, as a separate dictionary
SERVICES = dict(DEFAULT)


PARAMETERS = dict(
    DEFAULT,
    depth=1000,
)


# Same settings as PARAMETERS, as a separate dictionary
PARAMETER_EVENTS = dict(PARAMETERS)


ACTIONS = dict(
    DEFAULT,
    depth=1,
    durability=DurabilityPolicy.TRANSIENT_LOCAL,
)


# Preset settings by profile name
PresetNames = {
    "qos_profile_default":                    DEFAULT,
    "qos_profile_unknown":                    UNKNOWN,
    "qos_profile_system_default":             SYSTEM_DEFAULT,
    "qos_profile_sensor_data":                SENSOR_DATA,
    "qos_profile_services_default":           SERVICES,
    "qos_profile_parameters":                 PARAMETERS,
    "qos_profile_parameter_events":           PARAMETER_EVENTS,
    "rcl_action_qos_profile_status_default":  ACTIONS,
}
289
290
def qos_policy_name_from_kind(policy_kind):
    """
    Get QoS policy name from QoSPolicyKind enum.

    :param policy_kind: QoSPolicyKind member, or the plain integer value of one
    :returns: "<KIND>_QOS_POLICY", like "DEADLINE_QOS_POLICY";
              "INVALID_QOS_POLICY" for anything that is not a policy kind
    :rtype: str
    """
    if not isinstance(policy_kind, QoSPolicyKind):
        # Accept raw integers too, as the QoSProfile setters do for enum values;
        # booleans are integers in Python but are never a policy kind.
        if isinstance(policy_kind, bool) or not isinstance(policy_kind, int):
            return "INVALID_QOS_POLICY"
        try:
            policy_kind = QoSPolicyKind(policy_kind)
        except ValueError:  # Integer that matches no policy kind
            return "INVALID_QOS_POLICY"
    return "%s_QOS_POLICY" % policy_kind.name  # "DEADLINE_QOS_POLICY" etc
296
297
class InvalidQoSProfileException(Exception):
    """Raised when constructing a QoSProfile with invalid arguments."""

    def __init__(self, *args):
        """
        :param args: further exception arguments, placed after
                     the fixed leading text 'Invalid QoSProfile'
        """
        super().__init__('Invalid QoSProfile', *args)
304
class QoSProfile:
    """
    Define Quality of Service policies.

    Settings are given to the constructor as keyword arguments named after
    the properties of this class. At least one of `history` and `depth` is
    required; every other setting left out takes its value from the
    `qos_profile_default` preset.
    """

    # default QoS profile not exposed to the user to encourage them to think about QoS settings
    __qos_profile_default_dict = PresetNames['qos_profile_default']

    __slots__ = [
        '_history',
        '_depth',
        '_reliability',
        '_durability',
        '_lifespan',
        '_deadline',
        '_liveliness',
        '_liveliness_lease_duration',
        '_avoid_ros_namespace_conventions',
    ]

    def __init__(self, **kwargs):
        """
        :param kwargs: QoS settings, keyed by property name
        :raises InvalidQoSProfileException: if neither history nor depth is given,
                                            or if history is KEEP_LAST and depth is not given
        """
        # Validation is done with assertions throughout the class,
        # so a wrong name or type surfaces as AssertionError.
        assert all('_' + key in self.__slots__ for key in kwargs.keys()), \
            'Invalid arguments passed to constructor: %r' % kwargs.keys()

        if 'history' not in kwargs:
            if 'depth' not in kwargs:
                raise InvalidQoSProfileException('History and/or depth settings are required.')
            # Depth without history implies keeping the last `depth` entries
            kwargs['history'] = QoSHistoryPolicy.RMW_QOS_POLICY_HISTORY_KEEP_LAST

        self.history = kwargs.get('history')

        if (
            QoSHistoryPolicy.RMW_QOS_POLICY_HISTORY_KEEP_LAST == self.history and
            'depth' not in kwargs
        ):
            raise InvalidQoSProfileException('History set to KEEP_LAST without a depth setting.')

        defaults = QoSProfile.__qos_profile_default_dict
        self.depth = kwargs.get('depth', defaults['depth'])
        self.reliability = kwargs.get('reliability', defaults['reliability'])
        self.durability = kwargs.get('durability', defaults['durability'])
        self.lifespan = kwargs.get('lifespan', defaults['lifespan'])
        self.deadline = kwargs.get('deadline', defaults['deadline'])
        self.liveliness = kwargs.get('liveliness', defaults['liveliness'])
        self.liveliness_lease_duration = kwargs.get(
            'liveliness_lease_duration', defaults['liveliness_lease_duration'])
        self.avoid_ros_namespace_conventions = kwargs.get(
            'avoid_ros_namespace_conventions', defaults['avoid_ros_namespace_conventions'])

    @property
    def history(self):
        """
        Get field 'history'.

        :returns: history attribute
        :rtype: QoSHistoryPolicy
        """
        return self._history

    @history.setter
    def history(self, value):
        """Set field 'history', from a QoSHistoryPolicy or its integer value."""
        assert isinstance(value, (QoSHistoryPolicy, int))
        self._history = QoSHistoryPolicy(value)

    @property
    def reliability(self):
        """
        Get field 'reliability'.

        :returns: reliability attribute
        :rtype: QoSReliabilityPolicy
        """
        return self._reliability

    @reliability.setter
    def reliability(self, value):
        """Set field 'reliability', from a QoSReliabilityPolicy or its integer value."""
        assert isinstance(value, (QoSReliabilityPolicy, int))
        self._reliability = QoSReliabilityPolicy(value)

    @property
    def durability(self):
        """
        Get field 'durability'.

        :returns: durability attribute
        :rtype: QoSDurabilityPolicy
        """
        return self._durability

    @durability.setter
    def durability(self, value):
        """Set field 'durability', from a QoSDurabilityPolicy or its integer value."""
        assert isinstance(value, (QoSDurabilityPolicy, int))
        self._durability = QoSDurabilityPolicy(value)

    @property
    def depth(self):
        """
        Get field 'depth'.

        :returns: depth attribute
        :rtype: int
        """
        return self._depth

    @depth.setter
    def depth(self, value):
        """Set field 'depth', from an integer."""
        assert isinstance(value, int)
        self._depth = value

    @property
    def lifespan(self):
        """
        Get field 'lifespan'.

        :returns: lifespan attribute
        :rtype: Duration
        """
        return self._lifespan

    @lifespan.setter
    def lifespan(self, value):
        """Set field 'lifespan', from a Duration."""
        assert isinstance(value, Duration)
        self._lifespan = value

    @property
    def deadline(self):
        """
        Get field 'deadline'.

        :returns: deadline attribute.
        :rtype: Duration
        """
        return self._deadline

    @deadline.setter
    def deadline(self, value):
        """Set field 'deadline', from a Duration."""
        assert isinstance(value, Duration)
        self._deadline = value

    @property
    def liveliness(self):
        """
        Get field 'liveliness'.

        :returns: liveliness attribute
        :rtype: QoSLivelinessPolicy
        """
        return self._liveliness

    @liveliness.setter
    def liveliness(self, value):
        """Set field 'liveliness', from a QoSLivelinessPolicy or its integer value."""
        assert isinstance(value, (QoSLivelinessPolicy, int))
        self._liveliness = QoSLivelinessPolicy(value)

    @property
    def liveliness_lease_duration(self):
        """
        Get field 'liveliness_lease_duration'.

        :returns: liveliness_lease_duration attribute.
        :rtype: Duration
        """
        return self._liveliness_lease_duration

    @liveliness_lease_duration.setter
    def liveliness_lease_duration(self, value):
        """Set field 'liveliness_lease_duration', from a Duration."""
        assert isinstance(value, Duration)
        self._liveliness_lease_duration = value

    @property
    def avoid_ros_namespace_conventions(self):
        """
        Get field 'avoid_ros_namespace_conventions'.

        :returns: avoid_ros_namespace_conventions attribute
        :rtype: bool
        """
        return self._avoid_ros_namespace_conventions

    @avoid_ros_namespace_conventions.setter
    def avoid_ros_namespace_conventions(self, value):
        """Set field 'avoid_ros_namespace_conventions', from a boolean."""
        assert isinstance(value, bool)
        self._avoid_ros_namespace_conventions = value

    def get_c_qos_profile(self):
        """Returns None (ROS2 API compatibility stand-in)."""
        return None

    def __eq__(self, other):
        """Return whether other is a QoSProfile with all settings equal to this one."""
        if not isinstance(other, QoSProfile):
            return False
        return all(getattr(self, slot) == getattr(other, slot) for slot in self.__slots__)
501
502
# The details of the following profiles can be found at
# 1. ROS QoS principles:
#    https://design.ros2.org/articles/qos.html
# 2. ros2/rmw : rmw/include/rmw/qos_profiles.h

#: Used for initialization. Should not be used as the actual QoS profile.
qos_profile_unknown = QoSProfile(**PresetNames['qos_profile_unknown'])
#: Uses the default QoS settings defined in the DDS vendor tool
qos_profile_system_default = QoSProfile(**PresetNames['qos_profile_system_default'])
#: For sensor data, using best effort reliability and small queue depth
qos_profile_sensor_data = QoSProfile(**PresetNames['qos_profile_sensor_data'])
#: For services, using reliable reliability and volatile durability
qos_profile_services_default = QoSProfile(**PresetNames['qos_profile_services_default'])
#: For parameter communication. Similar to service QoS profile but with larger
#: queue depth so that requests do not get lost.
qos_profile_parameters = QoSProfile(**PresetNames['qos_profile_parameters'])
#: For parameter change events. Currently same as the QoS profile for
#: parameters.
qos_profile_parameter_events = QoSProfile(**PresetNames['qos_profile_parameter_events'])

# Separate rcl_action profile defined at
# ros2/rcl : rcl/rcl_action/include/rcl_action/default_qos.h
#
#: For actions, using reliable reliability, transient-local durability.
qos_profile_action_status_default = QoSProfile(**PresetNames['rcl_action_qos_profile_status_default'])
528
529
class QoSPresetProfiles(Enum):
    """
    Enum of preset QoS profiles; the value of each member is a QoSProfile.

    NOTE(review): Enum turns a member whose value compares equal to that of
    an earlier member into an alias of the earlier one - confirm whether
    PARAMETERS and PARAMETER_EVENTS are meant to be distinct members.
    """

    UNKNOWN = qos_profile_unknown
    SYSTEM_DEFAULT = qos_profile_system_default
    SENSOR_DATA = qos_profile_sensor_data
    SERVICES_DEFAULT = qos_profile_services_default
    PARAMETERS = qos_profile_parameters
    PARAMETER_EVENTS = qos_profile_parameter_events
    ACTION_STATUS_DEFAULT = qos_profile_action_status_default

    # Note that the following are duplicated from QoSPolicyEnum.
    #
    # Our supported version of Python3 (3.5) doesn't have a fix that allows mixins on Enum.

    @classmethod
    def short_keys(cls):
        """
        Return a list of shortened typing-friendly enum names.

        :returns: lowercased names of all members not prefixed with `RMW`,
                  in definition order
        :rtype: list
        """
        return [name.lower() for name in cls.__members__ if not name.startswith('RMW')]

    @classmethod
    def get_from_short_key(cls, name):
        """
        Retrieve a preset profile from a member name, case-insensitive.

        :param name: member name in any letter case
        :returns: value of the member named `name.upper()`
        :raises KeyError: if the enumeration has no such member
        """
        return cls[name.upper()].value
Enum for QoS Durability settings.
Definition qos.py:132
Enum for QoS History settings.
Definition qos.py:89
Raised when constructing a QoSProfile with invalid arguments.
Definition qos.py:300
Enum for QoS Liveliness settings.
Definition qos.py:153
Base for QoS Policy enumerations.
Definition qos.py:63
short_keys(cls)
Return a list of shortened typing-friendly enum values.
Definition qos.py:66
get_from_short_key(cls, name)
Retrieve a policy type from a short name, case-insensitive.
Definition qos.py:71
Enum for types of QoS policies that a Publisher or Subscription can set.
Definition qos.py:46
Define Quality of Service policies.
Definition qos.py:307
liveliness
Get field 'liveliness'.
Definition qos.py:476
__init__(self, **kwargs)
Definition qos.py:328
durability(self, value)
Definition qos.py:411
deadline(self, value)
Definition qos.py:465
__eq__(self, other)
Definition qos.py:527
history
Get field 'history'.
Definition qos.py:368
history(self, value)
Definition qos.py:375
depth
Get field 'depth'.
Definition qos.py:422
reliability
Get field 'reliability'.
Definition qos.py:386
avoid_ros_namespace_conventions(self, value)
Definition qos.py:519
liveliness_lease_duration
Get field 'liveliness_lease_duration'.
Definition qos.py:494
durability
Get field 'durability'.
Definition qos.py:404
avoid_ros_namespace_conventions(self)
Definition qos.py:514
liveliness_lease_duration(self, value)
Definition qos.py:501
liveliness(self, value)
Definition qos.py:483
deadline
Get field 'deadline'.
Definition qos.py:458
lifespan
Get field 'lifespan'.
Definition qos.py:440
lifespan(self, value)
Definition qos.py:447
avoid_ros_namespace_conventions
Get field 'avoid_ros_namespace_conventions'.
Definition qos.py:512
get_c_qos_profile(self)
Returns None (ROS2 API compatibility stand-in).
Definition qos.py:523
reliability(self, value)
Definition qos.py:393
Enum for QoS Reliability settings.
Definition qos.py:111
qos_policy_name_from_kind(policy_kind)
Get QoS policy name from QoSPolicyKind enum.
Definition qos.py:293