"""
Stand-ins for `rospy.msproxy` in ROS2.

------------------------------------------------------------------------------
This file is part of rosros - simple unified interface to ROS1 / ROS2.
Released under the BSD License.
------------------------------------------------------------------------------
"""
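## @namespace rosros.rospify.msproxy
from .. import ros2
from .. import util


class MasterProxy:
    """Partial stand-in for `rospy.MasterProxy`"""

    def __init__(self, node):
        """
        @param   node  rclpy.Node instance
        """
        self._node = node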

    def getSystemState(self):
        """
        Retrieve list representation of system state (i.e. publishers, subscribers, and services).

        @return: (1, "current system state", systemState).

                 System state is in list representation::
                   [publishers, subscribers, services].

                 publishers is of the form::
                   [ [topic1, [topic1Publisher1...topic1PublisherN]] ... ]

                 subscribers is of the form::
                   [ [topic1, [topic1Subscriber1...topic1SubscriberN]] ... ]

                 services is of the form::
                   [ [service1, [service1Provider1...service1ProviderN]] ... ]
        """
        pubs, subs, srvs = [], [], []
        for topic, _ in self._node.get_topic_names_and_types():
            # Collect fully qualified names of nodes publishing to the topic
            nodes = []
            for info in self._node.get_publishers_info_by_topic(topic):
                nodes.append(util.namejoin(info.node_namespace, info.node_name))
            pubs.append([topic, sorted(nodes)])
            # Collect fully qualified names of nodes subscribed to the topic
            nodes = []
            for info in self._node.get_subscriptions_info_by_topic(topic):
                nodes.append(util.namejoin(info.node_namespace, info.node_name))
            subs.append([topic, sorted(nodes)])

        # Map each service name to the nodes providing it
        srvnodes = {}
        for name, ns in self._node.get_node_names_and_namespaces():
            fullname = util.namejoin(ns, name)
            for srv, _ in self._node.get_service_names_and_types_by_node(name, ns):
                srvnodes.setdefault(srv, []).append(fullname)
        srvs = [[srv, sorted(nodes)] for srv, nodes in sorted(srvnodes.items())]

        return 1, "current system state", [sorted(pubs), sorted(subs), srvs]

    def getPublishedTopics(self, subgraph=""):
        """
        Get list of topics that can be subscribed to.

        This does not return topics that have no publishers.

        @param   subgraph  return only topics under subgraph, e.g. "/foo" will match
                           "/foo/bar" but not "/foobar". "" returns all topics.
        @return:           (1, "current topics", [[topicName, typeName], ])
        """
        pairs = set()
        if subgraph:
            # Ensure a trailing slash, so that "/foo" does not match "/foobar"
            subgraph = subgraph.rstrip("/") + "/"
        for topic, _ in self._node.get_topic_names_and_types():
            if subgraph and not topic.startswith(subgraph):
                continue
            # Topics without publishers yield no entries
            for info in self._node.get_publishers_info_by_topic(topic):
                pairs.add((topic, ros2.canonical(info.topic_type)))
        return 1, "current topics", [list(x) for x in sorted(pairs)]
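
The subgraph rule described for `getPublishedTopics` can be tried without a running ROS2 system. The following is a minimal sketch only: `matches_subgraph` is a hypothetical helper that is not part of rosros, and it assumes the same prefix logic as the method above (strip any trailing slash, append one, then compare prefixes).

```python
def matches_subgraph(topic, subgraph=""):
    """Hypothetical helper mirroring the prefix rule of getPublishedTopics()."""
    if not subgraph:
        return True  # empty subgraph matches every topic
    # "/foo" and "/foo/" both normalize to the prefix "/foo/"
    prefix = subgraph.rstrip("/") + "/"
    return topic.startswith(prefix)


topics = ["/foo/bar", "/foobar", "/foo", "/other"]
print([t for t in topics if matches_subgraph(t, "/foo")])  # ['/foo/bar']
```

Under this rule the topic "/foo" itself is not reported for subgraph "/foo", since only names strictly below the subgraph carry the "/foo/" prefix.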

    def getTopicTypes(self):
        """
        Returns a list of topic names and their types.

        @return  (1, "current system state", [[topicName, topicType], ])
        """
        result = []
        for topic, typenames in sorted(self._node.get_topic_names_and_types()):
            # A topic can carry several types; emit one row per type
            for typename in typenames:
                result.append([topic, ros2.canonical(typename)])
        return 1, "current system state", result
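
All three methods return a `(code, statusMessage, value)` triple in the style of the ROS1 Master API. As a rough illustration of the `getTopicTypes` result shape, the sketch below flattens sample data without needing rclpy. `flatten_topic_types` and the sample topics are hypothetical, and the `canonical` argument is only a placeholder for `ros2.canonical`; it defaults to leaving type names unchanged.

```python
def flatten_topic_types(names_and_types, canonical=lambda typename: typename):
    """Hypothetical helper: expand (topic, [types]) pairs into [topic, type] rows."""
    result = []
    for topic, typenames in sorted(names_and_types):
        for typename in typenames:
            result.append([topic, canonical(typename)])
    return 1, "current system state", result


# Sample data shaped like the result of rclpy Node.get_topic_names_and_types()
sample = [("/rosout", ["rcl_interfaces/msg/Log"]),
          ("/chatter", ["std_msgs/msg/String"])]

code, message, rows = flatten_topic_types(sample)
for topic, typename in rows:
    print(topic, typename)
```

Callers typically unpack the triple and use only the third element; the first two exist for compatibility with code written against `rospy.MasterProxy`.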