rosros 0.2.5
Simple unified interface to ROS1 / ROS2 Python API
Loading...
Searching...
No Matches
topics.py
Go to the documentation of this file.
1"""
2Stand-ins for `rospy.topics` in ROS2.
3
4------------------------------------------------------------------------------
5This file is part of rosros - simple unified interface to ROS1 / ROS2.
6Released under the BSD License.
7
8@author Erki Suurjaak
9@created 30.05.2022
10@modified 25.06.2022
11------------------------------------------------------------------------------
12"""
13## @namespace rosros.rospify.topics
14import abc
15
16import rclpy.publisher
17import rclpy.subscription
18
19from .. import ros2
20from . import exceptions
21#from . msg import AnyMsg # Imported late to avoid circular import
22
23
24class Message(abc.ABC):
25 """Abstract base class of Message data classes auto-generated from msg files."""
26
27 @classmethod
28 def __subclasshook__(cls, C):
29 """Returns true if C is a ROS2 message class, else `NotImplemented`."""
30 return True if ros2.is_ros_message(C) else NotImplemented
31
32
33
35
36
37class Publisher(abc.ABC):
38 """Stand-in for `rospy.Publisher`, wrapping the creation of ROS2 publisher."""
39
40 def __new__(cls, name, data_class, subscriber_listener=None, tcp_nodelay=None,
41 latch=False, headers=None, queue_size=None):
42 """
43 Constructor.
44
45 @param name resource name of topic, e.g. 'laser'.
46 @param data_class message class for serialization
47 @param subscriber_listener ignored (ROS1 compatibility stand-in)
48 @param tcp_nodelay ignored (ROS1 compatibility stand-in)
49 @param latch if True, the last message published is 'latched',
50 meaning that any compatible subscribers will be sent
51 that message immediately upon connection
52 @param headers ignored (ROS1 compatibility stand-in)
53 @param queue_size the queue size used for asynchronously publishing messages
54 from different threads. A size of zero means an infinite queue,
55 which can be dangerous. When the keyword is not being used
56 or when None is passed, all publishing will happen synchronously
57 and a warning message will be printed.
59 @throws ROSException if parameters are invalid
60 """
61 try:
62 return ros2.create_publisher(name, data_class, latch=latch, queue_size=queue_size)
63 except Exception as e:
64 raise exceptions.ROSException() from e
65
66 @classmethod
67 def __subclasshook__(cls, C):
68 """Returns true if C is a ROS2 publisher class, else `NotImplemented`."""
69 return True if issubclass(C, rclpy.publisher.Publisher) else NotImplemented
70
72class Subscriber(abc.ABC):
73 """Stand-in for `rospy.Subscriber`, wrapping the creation of ROS2 subscription."""
74
75 def __new__(cls, name, data_class, callback=None, callback_args=None,
76 queue_size=None, buff_size=None, tcp_nodelay=None):
77 """
78 Returns rclpy.subscription.Subscription instance, supporting `AnyMsg`.
79
80 @param name graph resource name of topic, e.g. 'laser'
81 @param data_class data type class to use for messages, e.g. std_msgs.msg.String
82 @param callback function to call ( fn(data)) when data is received.
83 If callback_args is set, the function must accept the callback_args
84 as a second argument, i.e. fn(data, callback_args).
85 NOTE: Additional callbacks can be added using add_callback().
86 @param callback_args additional arguments to pass to the callback.
87 This is useful when you wish to reuse the same callback
88 for multiple subscriptions.
89 @param queue_size maximum number of messages to receive at a time.
90 This will generally be 1 or None (infinite, default).
91 @param buff_size ignored (ROS1 compatibility stand-in)
92 @param tcp_nodelay ignored (ROS1 compatibility stand-in)
94 @throws ROSException if parameters are invalid
95 """
96 from . msg import AnyMsg # Imported late to avoid circular import
97 try:
98 raw = False
99 if issubclass(data_class, AnyMsg): # Look up real type from ROS2 network
100 infos = ros2.NODE.get_publishers_info_by_topic(name)
101 infos = infos or ros2.NODE.get_subscriptions_info_by_topic(name)
102 typename = next(ros2.canonical(x.topic_type) for x in infos)
103 data_class, raw = ros2.get_message_class(typename), True
104 sub = ros2.create_subscriber(name, data_class, callback, callback_args=callback_args,
105 queue_size=queue_size or 0, raw=raw)
106 sub._anymsg = raw
107 except Exception as e:
108 raise exceptions.ROSException() from e
109 return sub
110
111 @classmethod
112 def __subclasshook__(cls, C):
113 """Returns true if C is a ROS2 subscription class, else `NotImplemented`."""
114 return True if issubclass(C, rclpy.subscription.Subscription) else NotImplemented
115
116
117__all__ = [
118 "Message", "Publisher", "SubscribeListener", "Subscriber"
119]
Base exception class for ROS clients.
Definition exceptions.py:54
Abstract base class of Message data classes auto-generated from msg files.
Definition topics.py:24
__subclasshook__(cls, C)
Returns true if C is a ROS2 message class, else `NotImplemented`.
Definition topics.py:29
Stand-in for `rospy.Publisher`, wrapping the creation of ROS2 publisher.
Definition topics.py:37
__subclasshook__(cls, C)
Returns true if C is a ROS2 publisher class, else `NotImplemented`.
Definition topics.py:66
__new__(cls, name, data_class, subscriber_listener=None, tcp_nodelay=None, latch=False, headers=None, queue_size=None)
Constructor.
Definition topics.py:59
Empty stand-in for `rospy.SubscribeListener`.
Definition topics.py:34
Stand-in for `rospy.Subscriber`, wrapping the creation of ROS2 subscription.
Definition topics.py:71
__new__(cls, name, data_class, callback=None, callback_args=None, queue_size=None, buff_size=None, tcp_nodelay=None)
Returns rclpy.subscription.Subscription instance, supporting `AnyMsg`.
Definition topics.py:94
__subclasshook__(cls, C)
Returns true if C is a ROS2 subscription class, else `NotImplemented`.
Definition topics.py:111