2Stand-ins for `rospy.timer` in ROS2.
4------------------------------------------------------------------------------
5This file is part of rosros - simple unified interface to ROS1 / ROS2.
6Released under the BSD License.
11------------------------------------------------------------------------------
13## @namespace rosros.rospify.timer
22from . import exceptions
27 Sleeps for the specified duration
in ROS time.
29 @param duration seconds (
or Duration) to sleep, <=0 returns immediately
31 @throws ROSInterruptException
if ROS shutdown occurs before sleep completes
32 @throws ROSTimeMovedBackwardsException
if ROS time
is set backwards
34 duration = ros2.to_sec(duration)
35 if duration <= 0:
return
38 timer = ros2.create_timer(duration, callback=
None)
40 def jumphandler(jump):
43 ros2.destroy_entity(rate)
45 thresh = rclpy.clock.JumpThreshold(min_forward=
None, min_backward=ros2.make_duration(nsecs=-1))
46 handle = ros2.NODE.get_clock().create_jump_callback(thresh, post_callback=jumphandler)
47 rate = rclpy.timer.Rate(timer, context=ros2.NODE.context)
50 except rclpy.exceptions.ROSInterruptException
as e:
54 ros2.destroy_entity(timer)
61 """Stand-in for `rospy.Rate`, wrapping the creation of ROS2 rate instance."""
63 def __new__(cls, hz, reset=None):
65 Returns rclpy.timer.Rate instance.
67 @param hz rate to determine sleeping
68 @param reset ignored (ROS1 compatibility stand-
in)
70 return ros2.create_rate(hz)
74 """Returns true if C is a ROS2 rate class, else `NotImplemented`."""
75 return True if issubclass(C, rclpy.timer.Rate)
else NotImplemented
82 Port of rospy.timer.TimerEvent from ROS1 to ROS2.
84 @param last_expected when previous callback should have happened,
as rospy.Time
85 @param last_real when previous callback actually happened,
as rospy.Time
86 @param current_expected when current callback should have been called,
as rospy.Time
87 @param last_duration duration of the last callback (end time minus start time)
in seconds,
88 as float. Note that this
is always
in wall-clock time.
90 def __init__(self, last_expected, last_real, current_expected, current_real, last_duration):
99 """Stand-in for `rospy.Timer`, wrapping the creation of ROS2 timer instance."""
101 def __new__(cls, period, callback, oneshot=False, reset=None):
103 Returns rclpy.timer.Timer instance.
105 @param period desired period between callbacks,
as `Duration`
106 @param callback function to be called, taking rospy.TimerEvent
107 @param oneshot
if True, fire only once, otherwise fire continuously
108 until shutdown
is called [default:
False]
109 @param reset ignored (ROS1 compatibility stand-
in)
111 if callable(callback):
112 def callwrapper(period, callback):
113 """Closure for timer callback, retains state for populating TimerEvent."""
114 if isinstance(period, (int, float)): period = ros2.make_duration(period)
115 last_expected, last_real, last_duration =
None,
None,
None
116 current_expected = ros2.get_rostime() + period
119 """Feeds TimerEvent to callback, expected in rospy API."""
120 nonlocal current_expected, last_expected, last_real, last_duration
121 current_real = ros2.get_rostime()
123 callback(
TimerEvent(last_expected, last_real, current_expected, current_real,
125 last_duration = time.time() - start
126 last_expected, last_real = current_expected, current_real
127 current_expected += period
129 callback = callwrapper(period, callback)
130 return ros2.create_timer(period, callback, oneshot)
134 """Returns true if C is a ROS2 timer class, else `NotImplemented`."""
135 return True if issubclass(C, (rclpy.timer.Timer, threading.Thread))
else NotImplemented
140 "sleep",
"Rate",
"Timer"
Exception if time moved backwards.
Stand-in for `rospy.Rate`, wrapping the creation of ROS2 rate instance.
__new__(cls, hz, reset=None)
Returns rclpy.timer.Rate instance.
__subclasshook__(cls, C)
Returns true if C is a ROS2 rate class, else `NotImplemented`.
__init__(self, last_expected, last_real, current_expected, current_real, last_duration)
Stand-in for `rospy.Timer`, wrapping the creation of ROS2 timer instance.
__new__(cls, period, callback, oneshot=False, reset=None)
Returns rclpy.timer.Timer instance.
__subclasshook__(cls, C)
Returns true if C is a ROS2 timer class, else `NotImplemented`.