rosros 0.2.5
Simple unified interface to ROS1 / ROS2 Python API
Loading...
Searching...
No Matches
task.py
Go to the documentation of this file.
1"""
2Port of ROS2 `rclpy.task` for ROS1.
3
4------------------------------------------------------------------------------
5This file is part of rosros - simple unified interface to ROS1 / ROS2.
6Released under the BSD License.
7
8@author Erki Suurjaak
9@created 24.02.2022
10@modified 24.02.2022
11------------------------------------------------------------------------------
12"""
13## @namespace rosros.rclify.task
14
15# Original file copyright notice:
16#
17# Copyright 2016 Open Source Robotics Foundation, Inc.
18#
19# Licensed under the Apache License, Version 2.0 (the "License");
20# you may not use this file except in compliance with the License.
21# You may obtain a copy of the License at
22#
23# http://www.apache.org/licenses/LICENSE-2.0
24#
25# Unless required by applicable law or agreed to in writing, software
26# distributed under the License is distributed on an "AS IS" BASIS,
27# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
28# See the License for the specific language governing permissions and
29# limitations under the License.
30
31import inspect
32import sys
33import threading
34import warnings
35
36
37class Future:
38 """Represents the outcome of a task in the future."""
39
40 def __init__(self, *, executor=None):
41 """
42 @param executor ignored (ROS2 API compatibility stand-in)
43 """
44 # true if the task is done or cancelled
45 self._done = False
46 # true if the task is cancelled
47 self._cancelled = False
48 # the final return value of the handler
49 self._result = None
50 # An exception raised by the handler when called
51 self._exception = None
52 self._exception_fetched = False
53 # callbacks to be scheduled after this task completes
54 self._callbacks = []
55 # Lock for threadsafety
56 self._lock = threading.Lock()
57
58 def __del__(self):
59 if self._exception is not None and not self._exception_fetched:
60 print('The following exception was never retrieved: ' + str(self._exception),
61 file=sys.stderr)
62
63 def __await__(self):
64 """Yield while the task is not finished"""
65 while not self._done:
66 yield
67 return self.result()
68
69 def cancel(self):
70 """Request cancellation of the running task if it is not done already."""
71 with self._lock:
72 if not self._done:
73 self._cancelled = True
75
76 def cancelled(self):
77 """
78 Indicate if the task has been cancelled.
79
80 @return True if the task was cancelled
81 """
82 return self._cancelled
83
84 def done(self):
85 """
86 Indicate if the task has finished executing.
87
88 @return True if the task is finished or raised while it was executing
89 """
90 return self._done
91
92 def result(self):
93 """
94 Get the result of a done task.
95
96 @throws Exception if one was set during the task.
97
98 @return The result set by the task
99 """
100 if self._exception:
101 raise self.exception()
102 return self._result
103
104 def exception(self):
105 """
106 Get an exception raised by a done task.
107
108 @return the exception raised by the task
109 """
110 self._exception_fetched = True
111 return self._exception
112
113 def set_result(self, result):
114 """
115 Set the result returned by a task.
116
117 @param result the output of a long running task
118 """
119 with self._lock:
120 self._result = result
121 self._done = True
122 self._cancelled = False
124
125 def set_exception(self, exception):
126 """
127 Set the exception raised by the task.
128
129 @param result the output of a long running task
130 """
131 with self._lock:
132 self._exception = exception
133 self._exception_fetched = False
134 self._done = True
135 self._cancelled = False
137
138 def _invoke_done_callbacks(self):
139 """
140 Schedule done callbacks on the executor if possible, else run them directly.
141
142 This function assumes self._lock is not held.
143 """
144 with self._lock:
145 callbacks = self._callbacks
146 self._callbacks = []
147
148 # Call right away
149 for callback in callbacks:
150 try:
151 callback(self)
152 except Exception as e:
153 # Don't let exceptions be raised because there may be more callbacks to call
154 warnings.warn('Unhandled exception in done callback: {}'.format(e))
155
156 def add_done_callback(self, callback):
157 """
158 Add a callback to be executed when the task is done.
159
160 Callbacks should not raise exceptions.
161
162 The callback may be called immediately by this method if the future is already done.
163 If this happens and the callback raises, the exception will be raised by this method.
164
165 @param callback a callback taking the future as an argument to be run when completed
166 """
167 invoke = False
168 with self._lock:
169 if self._done:
170 invoke = True
171 else:
172 self._callbacks.append(callback)
173
174 # Invoke when not holding self._lock
175 if invoke:
176 callback(self)
177
178
179class Task(Future):
180 """
181 Executes a function or coroutine.
182
183 This executes either a normal function or a coroutine to completion. On completion it creates
184 tasks for any 'done' callbacks.
185 """
186
187 def __init__(self, handler, args=None, kwargs=None, executor=None):
188 super().__init__(executor=executor)
189 # _handler is either a normal function or a coroutine
190 self._handler = handler
191 # Arguments passed into the function
192 if args is None:
193 args = []
194 self._args = args
195 if kwargs is None:
196 kwargs = {}
197 self._kwargs = kwargs
198 if inspect.iscoroutinefunction(handler):
199 self._handler = handler(*args, **kwargs)
200 self._args = None
201 self._kwargs = None
202 # True while the task is being executed
203 self._executing = False
204 # Lock acquired to prevent task from executing in parallel with itself
205 self._task_lock = threading.Lock()
206
207 def __call__(self):
208 """
209 Run or resume a task.
210
211 This attempts to execute a handler. If the handler is a coroutine it will attempt to
212 await it. If there are done callbacks it will schedule them with the executor.
213
214 The return value of the handler is stored as the task result.
215 """
216 if self._done or self._executing or not self._task_lock.acquire(blocking=False):
217 return
218 try:
219 if self._done:
220 return
221 self._executing = True
222
223 if inspect.iscoroutine(self._handler):
224 # Execute a coroutine
225 try:
226 self._handler.send(None)
227 except StopIteration as e:
228 # The coroutine finished; store the result
229 self._handler.close()
230 self.set_result(e.value)
231 self._complete_task()
232 except Exception as e:
233 self.set_exception(e)
234 self._complete_task()
235 else:
236 # Execute a normal function
237 try:
238 self.set_result(self._handler(*self._args, **self._kwargs))
239 except Exception as e:
240 self.set_exception(e)
241 self._complete_task()
242
243 self._executing = False
244 finally:
245 self._task_lock.release()
246
247 def _complete_task(self):
248 """Cleanup after task finished."""
249 self._handler = None
250 self._args = None
251 self._kwargs = None
253 def executing(self):
254 """
255 Check if the task is currently being executed.
256
257 @return True if the task is currently executing
258 """
259 return self._executing
260
261
262__all__ = ["Future", "Task"]
Represents the outcome of a task in the future.
Definition task.py:37
set_exception(self, exception)
Set the exception raised by the task.
Definition task.py:130
cancelled(self)
Indicate if the task has been cancelled.
Definition task.py:81
_invoke_done_callbacks(self)
Schedule done callbacks on the executor if possible, else run them directly.
Definition task.py:145
__init__(self, *executor=None)
Definition task.py:43
set_result(self, result)
Set the result returned by a task.
Definition task.py:118
add_done_callback(self, callback)
Add a callback to be executed when the task is done.
Definition task.py:168
done(self)
Indicate if the task has finished executing.
Definition task.py:89
exception(self)
Get an exception raised by a done task.
Definition task.py:109
__await__(self)
Yield while the task is not finished.
Definition task.py:63
cancel(self)
Request cancellation of the running task if it is not done already.
Definition task.py:69
result(self)
Get the result of a done task.
Definition task.py:99
Executes a function or coroutine.
Definition task.py:187
__init__(self, handler, args=None, kwargs=None, executor=None)
Definition task.py:189
__call__(self)
Run or resume a task.
Definition task.py:217
executing(self)
Check if the task is currently being executed.
Definition task.py:262
_complete_task(self)
Cleanup after task finished.
Definition task.py:252