rosros 0.2.5
Simple unified interface to ROS1 / ROS2 Python API
Loading...
Searching...
No Matches
msproxy.py
Go to the documentation of this file.
1"""
2Stand-ins for `rospy.msproxy` in ROS2.
3
4------------------------------------------------------------------------------
5This file is part of rosros - simple unified interface to ROS1 / ROS2.
6Released under the BSD License.
7
8@author Erki Suurjaak
9@created 30.05.2022
10@modified 30.05.2022
11------------------------------------------------------------------------------
12"""
13## @namespace rosros.rospify.msproxy
14from .. import ros2
15from .. import util
16
17
18class MasterProxy:
19 """Partial stand-in for `rospy.MasterProxy`"""
20
21 def __init__(self, node):
22 """
23 @param node rclpy.Node instance
24 """
25 self._node = node
26
27 def getSystemState(self):
28 """
29 Retrieve list representation of system state (i.e. publishers, subscribers, and services).
30
31 @return: (1, "current system state", systemState).
32
33 System state is in list representation::
34 [publishers, subscribers, services].
35
36 publishers is of the form::
37 [ [topic1, [topic1Publisher1...topic1PublisherN]] ... ]
38
39 subscribers is of the form::
40 [ [topic1, [topic1Subscriber1...topic1SubscriberN]] ... ]
41
42 services is of the form::
43 [ [service1, []] ... ]
44 """
45 pubs, subs, srvs = [], [], []
46 for topic, _ in self._node.get_topic_names_and_types():
47 nodes = []
48 for info in self._node.get_publishers_info_by_topic(topic):
49 nodes.append(util.namejoin(info.node_namespace, info.node_name))
50 pubs.append([topic, sorted(nodes)])
51 nodes = []
52 for info in self._node.get_subscriptions_info_by_topic(topic):
53 nodes.append(util.namejoin(info.node_namespace, info.node_name))
54 subs.append([topic, sorted(nodes)])
55
56 srvnodes = {} # {service name: [node, ]}
57 for name, ns in self._node.get_node_names_and_namespaces():
58 fullname = util.namejoin(ns, name)
59 for srv, _ in self._node.get_service_names_and_types_by_node(name, ns):
60 srvnodes.setdefault(srv, []).append(fullname)
61 srvs = [[srv, sorted(nodes)] for srv, nodes in sorted(srvnodes.items())]
62
63 return 1, "current system state", [sorted(pubs), sorted(subs), srvs]
64
65 def getPublishedTopics(self, subgraph=""):
66 """
67 Get list of topics that can be subscribed to.
68
69 This does not return topics that have no publishers.
70 See `getSystemState()` to get more comprehensive list.
71
72 @param subgraph return only topics under subgraph, e.g. "/foo" will match
73 "/foo/bar" but not "/foobar". "" returns all topics.
74 @return: (1, "current topics", [[topicName, typeName], ])
75 """
76 pairs = set() # {(topic, typename)}
77 if subgraph: subgraph = subgraph.rstrip("/") + "/"
78 for topic, _ in self._node.get_topic_names_and_types():
79 if subgraph and not topic.startswith(subgraph): continue # for topic
80
81 for info in self._node.get_publishers_info_by_topic(topic):
82 pairs.add((topic, ros2.canonical(info.topic_type)))
83 return 1, "current topics", [list(x) for x in sorted(pairs)]
84
85 def getTopicTypes(self):
86 """
87 Returns a list of topic names and their types.
88
89 @return (1, "current system state", [[topicName, topocType], ])
90 """
91 result = []
92 for topic, typenames in sorted(self._node.get_topic_names_and_types()):
93 for typename in typenames:
94 result.append([topic, ros2.canonical(typename)])
95 return 1, "current system state", result
96
97
98__all__ = [
99 "MasterProxy"
100]
getSystemState(self)
Retrieve list representation of system state (i.e.
Definition msproxy.py:44
getPublishedTopics(self, subgraph="")
Get list of topics that can be subscribed to.
Definition msproxy.py:74
getTopicTypes(self)
Returns a list of topic names and their types.
Definition msproxy.py:89