rosros 0.2.5
Simple unified interface to ROS1 / ROS2 Python API
Loading...
Searching...
No Matches
topic_endpoint_info.py
Go to the documentation of this file.
"""
Port of ROS2 `rclpy.topic_endpoint_info` for ROS1.

Partially modified copy from ROS2 `rclpy.topic_endpoint_info`,
at https://github.com/ros2/rclpy (`rclpy/rclpy/topic_endpoint_info.py`),
released under the Apache 2.0 License.

------------------------------------------------------------------------------
This file is part of rosros - simple unified interface to ROS1 / ROS2.
Released under the BSD License.

@author Erki Suurjaak
@created 16.02.2022
@modified 23.02.2022
------------------------------------------------------------------------------
"""
## @namespace rosros.rclify.topic_endpoint_info

# Original file copyright notice:
#
# Copyright 2016 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
34from enum import IntEnum
35
36from . qos import QoSPresetProfiles, QoSProfile
37
38
class TopicEndpointTypeEnum(IntEnum):
    """Enum for possible types of topic endpoints."""
    INVALID = 0
    PUBLISHER = 1
    SUBSCRIPTION = 2
44
45
class TopicEndpointInfo:
    """
    Information on a topic endpoint.

    Holds six validated fields, each stored in an underscore-prefixed slot
    and exposed through a property of the same name without the underscore:
    `node_name`, `node_namespace`, `topic_type`, `endpoint_type`,
    `endpoint_gid` and `qos_profile`.
    """

    __slots__ = [
        '_node_name',
        '_node_namespace',
        '_topic_type',
        '_endpoint_type',
        '_endpoint_gid',
        '_qos_profile',
    ]

    def __init__(self, **kwargs):
        """
        Populate all fields from keyword arguments, using defaults if omitted.

        :param kwargs: any of `node_name`, `node_namespace`, `topic_type`,
                       `endpoint_type`, `endpoint_gid`, `qos_profile`
        :raises AssertionError: if a keyword does not match a field name,
                                or a value fails its property validation
        """
        # A keyword is valid only if "_<keyword>" is one of the declared slots.
        assert all('_' + key in self.__slots__ for key in kwargs), \
            'Invalid arguments passed to constructor: %r' % kwargs.keys()

        # Assign via the properties, so that every value gets validated.
        self.node_name = kwargs.get('node_name', '')
        self.node_namespace = kwargs.get('node_namespace', '')
        self.topic_type = kwargs.get('topic_type', '')
        self.endpoint_type = kwargs.get('endpoint_type', TopicEndpointTypeEnum.INVALID)
        self.endpoint_gid = kwargs.get('endpoint_gid', [])
        self.qos_profile = kwargs.get('qos_profile', QoSPresetProfiles.UNKNOWN.value)

    @property
    def node_name(self):
        """
        Get field 'node_name'.

        :returns: node_name attribute
        :rtype: str
        """
        return self._node_name

    @node_name.setter
    def node_name(self, value):
        assert isinstance(value, str)
        self._node_name = value

    @property
    def node_namespace(self):
        """
        Get field 'node_namespace'.

        :returns: node_namespace attribute
        :rtype: str
        """
        return self._node_namespace

    @node_namespace.setter
    def node_namespace(self, value):
        assert isinstance(value, str)
        self._node_namespace = value

    @property
    def topic_type(self):
        """
        Get field 'topic_type'.

        :returns: topic_type attribute
        :rtype: str
        """
        return self._topic_type

    @topic_type.setter
    def topic_type(self, value):
        assert isinstance(value, str)
        self._topic_type = value

    @property
    def endpoint_type(self):
        """
        Get field 'endpoint_type'.

        :returns: endpoint_type attribute
        :rtype: TopicEndpointTypeEnum
        """
        return self._endpoint_type

    @endpoint_type.setter
    def endpoint_type(self, value):
        if isinstance(value, TopicEndpointTypeEnum):
            self._endpoint_type = value
        elif isinstance(value, int):
            # Plain integers are converted to the enum; an integer matching
            # no member makes the enum lookup raise ValueError.
            self._endpoint_type = TopicEndpointTypeEnum(value)
        else:
            assert False

    @property
    def endpoint_gid(self):
        """
        Get field 'endpoint_gid'.

        :returns: endpoint_gid attribute
        :rtype: list
        """
        return self._endpoint_gid

    @endpoint_gid.setter
    def endpoint_gid(self, value):
        assert all(isinstance(x, int) for x in value)
        self._endpoint_gid = value

    @property
    def qos_profile(self):
        """
        Get field 'qos_profile'.

        :returns: qos_profile attribute
        :rtype: QoSProfile
        """
        return self._qos_profile

    @qos_profile.setter
    def qos_profile(self, value):
        if isinstance(value, QoSProfile):
            self._qos_profile = value
        elif isinstance(value, dict):
            # A dictionary is expanded into QoSProfile keyword arguments.
            self._qos_profile = QoSProfile(**value)
        else:
            assert False

    def __eq__(self, other):
        """Return whether other is a TopicEndpointInfo with all fields equal."""
        if not isinstance(other, TopicEndpointInfo):
            return False
        return all(getattr(self, slot) == getattr(other, slot)
                   for slot in self.__slots__)

    def __str__(self):
        """Return a multi-line description of the endpoint and its QoS profile."""
        qos = self.qos_profile
        lines = [
            'Node name: %s' % self.node_name,
            'Node namespace: %s' % self.node_namespace,
            'Topic type: %s' % self.topic_type,
            'Endpoint type: %s' % self.endpoint_type.name,
            # GID is rendered as dot-separated two-digit hexadecimal values.
            'GID: %s' % '.'.join(format(x, '02x') for x in self.endpoint_gid),
            'QoS profile:',
            '  Reliability: %s' % qos.reliability.name,
            '  Durability: %s' % qos.durability.name,
            '  Lifespan: %d nanoseconds' % qos.lifespan.nanoseconds,
            '  Deadline: %d nanoseconds' % qos.deadline.nanoseconds,
            '  Liveliness: %s' % qos.liveliness.name,
            '  Liveliness lease duration: %d nanoseconds' %
            qos.liveliness_lease_duration.nanoseconds,
        ]
        return '\n'.join(lines)
Define Quality of Service policies.
Definition qos.py:307
Enum for possible types of topic endpoints.