Simple unified interface to ROS1 / ROS2 Python API
Index
Installation
Example usage
Parameters
Constructor args in publishers and services
Timers
Bags
ROS core functionality
ROS API helpers
Converting an existing package
API documentation
View the Project on GitHub suurjaak/rosros
A simple node that publishes a message on receiving a service request:
import rosros def on_trigger(req): pub.publish(True) return {"success": True, "message": "Triggered!"} rosros.init_node("mynode") params = rosros.init_params(service="/trigger", topic="/triggerings") srv = rosros.create_service(params["service"], "std_srvs/Trigger", on_trigger) pub = rosros.create_publisher(params["topic"], "std_msgs/Bool", latch=True, queue_size=2) rosros.spin()
Equivalent ROS1 code using rospy:
# ROS1:
import rospy, std_msgs.msg, std_srvs.srv
def on_trigger(req):
pub.publish(True)
return {"success": True, "message": "Triggered!"}
rospy.init_node("mynode")
service = rospy.get_param("~service", "/trigger")
topic = rospy.get_param("~topic", "/triggerings")
rospy.set_param("~service", service)
rospy.set_param("~topic", topic)
srv = rospy.Service(service, std_srvs.srv.Trigger, on_trigger)
pub = rospy.Publisher(topic, std_msgs.msg.Bool, latch=True, queue_size=2)
rospy.spin()
Equivalent ROS2 code using rclpy:
# ROS2:
import rclpy, rclpy.qos, std_msgs.msg, std_srvs.srv
def on_trigger(req, resp):
pub.publish(std_msgs.msg.Bool(data=True))
resp.success = True
resp.message = "Triggered!"
return resp
rclpy.init()
node = rclpy.create_node("mynode")
service = node.get_parameter_or("service", rclpy.Parameter("", value="/trigger")).value
topic = node.get_parameter_or("topic", rclpy.Parameter("", value="/triggerings")).value
if not node.has_parameter("service"):
node.declare_parameter("service", service)
if not node.has_parameter("topic"):
node.declare_parameter("topic", topic)
srv = node.create_service(std_srvs.srv.Trigger, service, on_trigger)
qos = rclpy.qos.QoSProfile(depth=2, durability=rclpy.qos.DurabilityPolicy.TRANSIENT_LOCAL)
pub = node.create_publisher(std_msgs.msg.Bool, topic, qos)
rclpy.spin(node)
Any ROS2-only extras used will be ignored under ROS1, like topic QoS reliability.
rosros provides a convenient way to auto-load all node parameters, and override or populate additional parameters from a nested dictionary.
rosros.init_node("mynode") # If the given defaults were not among existing node parameters, # they will be auto-declared on the node. params = rosros.init_params({"my": {"nested": {"param1": 5, "param2": True}}})
rosros provides short-hand convenience wrappers for publishing messages, invoking services and returning service responses, without needing to create the message or service request/response class instance explicitly.
ROS1 already provides this short-hand in rospy
,
but ROS2 API requires the instances to be created explicitly.
In rosros:
pub = rosros.create_publisher("/topic", "std_msgs/Bool") pub.publish(True) pub.publish(data=True) pub.publish() # Service callback need not accept any parameters if request object is irrelevant srv = rosros.create_service("/service", "std_srvs/SetBool", lambda: {"message": "Triggered!"}) cli = rosros.create_client("/service", "std_srvs/SetBool") cli.call(True) cli.call(data=True) cli(True)
Equivalent ROS1 code using rospy:
# ROS1:
pub = rospy.Publisher("/topic", std_msgs.msg.Bool, queue_size=0)
pub.publish(True)
pub.publish(data=True)
pub.publish()
srv = rospy.Service("/service", std_srvs.srv.SetBool, lambda req: {"message": "Triggered!"})
cli = rospy.ServiceProxy("/service", std_srvs.srv.SetBool)
cli.call(True)
cli.call(data=True)
cli(True)
Equivalent ROS2 code using rospy:
# ROS2:
pub = mynode.create_publisher(std_msgs.msg.Bool, "/topic", 0)
pub.publish(std_msgs.msg.Bool(data=True))
pub.publish(std_msgs.msg.Bool(data=True))
pub.publish(std_msgs.msg.Bool())
def callback(req, resp):
resp.message = "Triggered!"
return resp
srv = mynode.create_service(std_srvs.srv.SetBool, "/service", callback)
cli = mynode.create_client(std_srvs.srv.SetBool, "/service")
cli.call(std_srvs.srv.SetBool.Request(data=True))
rosros provides convenience one-shot timers in ROS2 like ROS1 does.
Additionally, rosros provides an option to start invoking the timer callback immediately, instead of waiting one period before first call.
timer = rosros.create_timer(2, lambda: print("just once, after 2 sec"), oneshot=True) timer = rosros.create_timer(5, lambda: print("every 5 sec, starting now"), immediate=True) # Can take seconds, can take an explicit Duration object timer = rosros.create_timer(rosros.api.make_duration(5), lambda: print("every 5 sec"))
rosros provides a unified interface for reading and writing ROS bags.
Bag format is the custom ROS format in ROS1, and the SQLite database format in ROS2.
bag = rosros.Bag("my.bag", mode="a") print(bag) # Prints bag metainfo, equivalent to "rosbag info" bag.write("/my/topic", std_msgs.msg.Bool()) for topic, message, stamp in bag: print("[%s] %s: %s" % (rosros.api.to_sec(stamp), topic, message))