rosros 0.2.5
Simple unified interface to ROS1 / ROS2 Python API
Loading...
Searching...
No Matches
timer.py
Go to the documentation of this file.
1"""
2Stand-ins for `rospy.timer` in ROS2.
3
4------------------------------------------------------------------------------
5This file is part of rosros - simple unified interface to ROS1 / ROS2.
6Released under the BSD License.
7
8@author Erki Suurjaak
9@created 30.05.2022
10@modified 08.12.2023
11------------------------------------------------------------------------------
12"""
13## @namespace rosros.rospify.timer
14import abc
15import threading
16import time
17
18import rclpy.clock
19import rclpy.timer
20
21from .. import ros2
22from . import exceptions
23
24
25def sleep(duration):
26 """
27 Sleeps for the specified duration in ROS time.
28
29 @param duration seconds (or Duration) to sleep, <=0 returns immediately
30
31 @throws ROSInterruptException if ROS shutdown occurs before sleep completes
32 @throws ROSTimeMovedBackwardsException if ROS time is set backwards
33 """
34 duration = ros2.to_sec(duration)
35 if duration <= 0: return
36
37 jumpback = None
38 timer = ros2.create_timer(duration, callback=None)
39
40 def jumphandler(jump):
41 nonlocal jumpback
42 jumpback = jump
43 ros2.destroy_entity(rate)
44
45 thresh = rclpy.clock.JumpThreshold(min_forward=None, min_backward=ros2.make_duration(nsecs=-1))
46 handle = ros2.NODE.get_clock().create_jump_callback(thresh, post_callback=jumphandler)
47 rate = rclpy.timer.Rate(timer, context=ros2.NODE.context)
48 try:
49 rate.sleep()
50 except rclpy.exceptions.ROSInterruptException as e:
51 raise exceptions.ROSInterruptException(*e.args) from e
52 finally:
53 handle.unregister()
54 ros2.destroy_entity(timer)
55 if jumpback:
56 raise exceptions.ROSTimeMovedBackwardsException(ros2.to_sec(jumpback.delta))
57
58
59
60class Rate(abc.ABC):
61 """Stand-in for `rospy.Rate`, wrapping the creation of ROS2 rate instance."""
62
63 def __new__(cls, hz, reset=None):
64 """
65 Returns rclpy.timer.Rate instance.
66
67 @param hz rate to determine sleeping
68 @param reset ignored (ROS1 compatibility stand-in)
69 """
70 return ros2.create_rate(hz)
71
72 @classmethod
73 def __subclasshook__(cls, C):
74 """Returns true if C is a ROS2 rate class, else `NotImplemented`."""
75 return True if issubclass(C, rclpy.timer.Rate) else NotImplemented
76
77
78class TimerEvent:
79 """
80 Constructor.
81
82 Port of rospy.timer.TimerEvent from ROS1 to ROS2.
83
84 @param last_expected when previous callback should have happened, as rospy.Time
85 @param last_real when previous callback actually happened, as rospy.Time
86 @param current_expected when current callback should have been called, as rospy.Time
87 @param last_duration duration of the last callback (end time minus start time) in seconds,
88 as float. Note that this is always in wall-clock time.
89 """
90 def __init__(self, last_expected, last_real, current_expected, current_real, last_duration):
91 self.last_expected = last_expected
92 self.last_real = last_real
93 self.current_expected = current_expected
94 self.current_real = current_real
95 self.last_duration = last_duration
96
98class Timer(abc.ABC):
99 """Stand-in for `rospy.Timer`, wrapping the creation of ROS2 timer instance."""
100
101 def __new__(cls, period, callback, oneshot=False, reset=None):
102 """
103 Returns rclpy.timer.Timer instance.
104
105 @param period desired period between callbacks, as `Duration`
106 @param callback function to be called, taking rospy.TimerEvent
107 @param oneshot if True, fire only once, otherwise fire continuously
108 until shutdown is called [default: False]
109 @param reset ignored (ROS1 compatibility stand-in)
110 """
111 if callable(callback):
112 def callwrapper(period, callback):
113 """Closure for timer callback, retains state for populating TimerEvent."""
114 if isinstance(period, (int, float)): period = ros2.make_duration(period)
115 last_expected, last_real, last_duration = None, None, None
116 current_expected = ros2.get_rostime() + period
117
118 def inner():
119 """Feeds TimerEvent to callback, expected in rospy API."""
120 nonlocal current_expected, last_expected, last_real, last_duration
121 current_real = ros2.get_rostime()
122 start = time.time()
123 callback(TimerEvent(last_expected, last_real, current_expected, current_real,
124 last_duration))
125 last_duration = time.time() - start
126 last_expected, last_real = current_expected, current_real
127 current_expected += period
128 return inner
129 callback = callwrapper(period, callback)
130 return ros2.create_timer(period, callback, oneshot)
131
132 @classmethod
133 def __subclasshook__(cls, C):
134 """Returns true if C is a ROS2 timer class, else `NotImplemented`."""
135 return True if issubclass(C, (rclpy.timer.Timer, threading.Thread)) else NotImplemented
136
137
138
139__all__ = [
140 "sleep", "Rate", "Timer"
141]
Exception if time moved backwards.
Definition exceptions.py:69
Stand-in for `rospy.Rate`, wrapping the creation of ROS2 rate instance.
Definition timer.py:60
__new__(cls, hz, reset=None)
Returns rclpy.timer.Rate instance.
Definition timer.py:69
__subclasshook__(cls, C)
Returns true if C is a ROS2 rate class, else `NotImplemented`.
Definition timer.py:73
__init__(self, last_expected, last_real, current_expected, current_real, last_duration)
Definition timer.py:89
Stand-in for `rospy.Timer`, wrapping the creation of ROS2 timer instance.
Definition timer.py:97
__new__(cls, period, callback, oneshot=False, reset=None)
Returns rclpy.timer.Timer instance.
Definition timer.py:109
__subclasshook__(cls, C)
Returns true if C is a ROS2 timer class, else `NotImplemented`.
Definition timer.py:132