rosros 0.2.5
Simple unified interface to ROS1 / ROS2 Python API
Loading...
Searching...
No Matches
api.py
Go to the documentation of this file.
1# -*- coding: utf-8 -*-
2"""
3Utilities for ROS built-in types, and message and service types.
4
5------------------------------------------------------------------------------
6This file is part of rosros - simple unified interface to ROS1 / ROS2.
7Released under the BSD License.
8
9@author Erki Suurjaak
10@created 11.02.2022
11@modified 30.01.2024
12------------------------------------------------------------------------------
13"""
14## @namespace rosros.api
15import datetime
16import decimal
17import inspect
18import os
19
20ros1 = ros2 = None
21if os.getenv("ROS_VERSION") == "2":
22 from . import ros2
23else:
24 from . import ros1
25ros = ros1 or ros2
26
27
28
29ROS_NUMERIC_TYPES = ["byte", "char", "int8", "int16", "int32", "int64", "uint8",
30 "uint16", "uint32", "uint64", "float32", "float64", "bool"]
31
32
33ROS_STRING_TYPES = ["string", "wstring"]
34
35
36ROS_BUILTIN_TYPES = ROS_NUMERIC_TYPES + ROS_STRING_TYPES
37
38
39ROS_BUILTIN_CTORS = {"byte": int, "char": int, "int8": int, "int16": int,
40 "int32": int, "int64": int, "uint8": int, "uint16": int,
41 "uint32": int, "uint64": int, "float32": float, "float64": float,
42 "bool": bool, "string": str, "wstring": str}
43
44
45ROS_TIME_TYPES = ros.ROS_TIME_TYPES
46
47
48ROS_TIME_CLASSES = ros.ROS_TIME_CLASSES
49
50
51ROS_COMMON_TYPES = ROS_BUILTIN_TYPES + ROS_TIME_TYPES
52
53
54ROS_ALIAS_TYPES = ros.ROS_ALIAS_TYPES
55
56
57PARAM_SEPARATOR = ros.PARAM_SEPARATOR
58
59
60PRIVATE_PREFIX = ros.PRIVATE_PREFIX
61
62
63FAMILY = ros.FAMILY
64
65
66ROSLogHandler = ros.ROSLogHandler
67
68
69def canonical(typename):
70 """Returns "pkg/Type" for "pkg/subdir/Type"."""
71 return ros.canonical(typename)
72
73
75 """Returns parameter name with correct separator for ROS version, and leading sigils stripped."""
76 return ros.format_param_name(name)
77
78
79def get_message_class(msg_or_type):
80 """
81 Returns ROS message / service class object.
82
83 @param msg_or_type full or canonical class name,
84 like "std_msgs/Bool" or "std_srvs/SetBool" or "std_srvs/SetBoolRequest";
85 or class instance like `std_msgs.msg.Bool()`
86 @return ROS message / service class object, like `std_msgs.msg.Bool`
87 or `std_srvs.srv.SetBool` or `std_srvs.srv.SetBoolRequest`,
88 or None if not found
89 """
90 return ros.get_message_class(msg_or_type)
91
92
93def get_message_definition(msg_or_type, full=True):
94 """
95 Returns ROS message or service request/response type definition text.
96
97 Text will include subtype definitions by default.
98
99 @param msg_or_type canonical or full class name like "std_msgs/Bool" or "std_msgs/msg/Bool",
100 or class instance like `std_msgs.msg.Bool()`,
101 or class object like `std_msgs.msg.Bool`
102 @param full include definitions of nested types, separated by "\n---\nMSG: pkg/Type\n"
103 (ignored for service request/response types)
104 @return message type definition text
105 """
106 return ros.get_message_definition(msg_or_type, full)
107
108
109def get_message_fields(val):
110 """
111 Returns {field name: field type name} if ROS message or service request/response, else {}.
113 @param val ROS message or service request/response instance, or class object
114 """
115 return ros.get_message_fields(val)
116
117
118def get_message_header(val):
119 """
120 Returns message `std_msgs/Header`-attribute if any, else `None`.
122 @param val ROS message or service request/response instance
123 """
124 return ros.get_message_header(val)
125
126
127def get_message_type(msg_or_cls):
128 """
129 Returns ROS message / service canonical type name, like "std_msgs/Header".
130
131 Returns "*" for `AnyMsg`.
132
133 @param msg_or_cls class instance like `std_msgs.msg.Bool()`,
134 or class object like `std_msgs.msg.Bool`
135 @return canonical name, or `None` if not ROS message / service
136 """
137 return ros.get_message_type(msg_or_cls)
138
139
140def get_message_type_hash(msg_or_type):
141 """
142 Returns ROS message / service type MD5 hash.
143
144 @param msg_or_type full or canonical class name
145 like "std_msgs/Bool" or "std_srvs/SetBool" or "std_srvs/SetBoolRequest",
146 or class instance like `std_msgs.msg.Bool()`,
147 or class object like `std_msgs.msg.Bool`
148 """
149 return ros.get_message_type_hash(msg_or_type)
150
151
152def get_message_value(msg, name, default=...):
153 """
154 Returns object attribute value, with numeric arrays converted to lists.
155
156 @param name message attribute name; may also be (nested, path) or "nested.path"
157 @param default value to return if attribute does not exist; raises exception otherwise
158 """
159 return ros.get_message_value(msg, name, default)
160
161
162def get_service_definition(srv_or_type):
163 """
164 Returns ROS service type definition text.
165
166 @param srv_or_type canonical or full class name
167 like "std_srvs/SetBool" or "std_srvs/srv/SetBool",
168 or class instance like `std_srvs.srv.SetBool()`,
169 or class object like `std_srvs.srv.SetBool`
170 @return ROS service type definition text
171 """
172 return ros.get_service_definition(srv_or_type)
173
174
175def get_service_request_class(srv_or_type):
176 """
177 Returns ROS service request class object.
179 @param srv_or_type canonical or full class name
180 like "std_srvs/SetBool" or "std_srvs/srv/SetBool",
181 or class instance like `std_srvs.srv.SetBool()`,
182 or class object like `std_srvs.srv.SetBool`
183 @return ROS service request class object, like `std_srvs.srv.SetBoolRequest`
184 """
185 return ros.get_service_request_class(srv_or_type)
186
187
188def get_service_response_class(srv_or_type):
189 """
190 Returns ROS service response class object.
191
192 @param srv_or_type canonical or full class name
193 like "std_srvs/SetBool" or "std_srvs/srv/SetBool",
194 or class instance like `std_srvs.srv.SetBool()`,
195 or class object like `std_srvs.srv.SetBool`
196 @return ROS service response class object, like `std_srvs.srv.SetBoolResponse`
197 """
198 return ros.get_service_response_class(srv_or_type)
200
201def get_type_alias(typename):
202 """
203 Returns alias like "char" for ROS built-in type, if any; reverse of get_alias_type().
204
205 In ROS1, byte and char are aliases for int8 and uint8; in ROS2 the reverse.
206 """
207 return next((k for k, v in ROS_ALIAS_TYPES.items() if v == typename), None)
209
210def get_alias_type(typename):
211 """
212 Returns ROS built-in type for alias like "char", if any; reverse of get_type_alias().
213
214 In ROS1, byte and char are aliases for int8 and uint8; in ROS2 the reverse.
215 """
216 return ROS_ALIAS_TYPES.get(typename)
217
219def is_ros_message(val):
220 """
221 Returns whether value is a ROS message or service request/response class or instance.
223 @param val like `std_msgs.msg.Bool()` or `std_srvs.srv.SetBoolRequest`
224 @return True if value is a ROS message or service request/response class or instance,
225 False otherwise
226 """
227 return ros.is_ros_message(val)
228
229
230def is_ros_service(val):
231 """Returns whether value is a ROS service class object."""
232 return ros.is_ros_service(val)
233
234
235def is_ros_time(val):
236 """Returns whether value is a ROS time/duration class or instance."""
237 return ros.is_ros_time(val)
238
239
240def make_duration(secs=0, nsecs=0):
241 """Returns a ROS duration."""
242 return ros.make_duration(secs=secs, nsecs=nsecs)
243
244
245def make_time(secs=0, nsecs=0):
246 """Returns a ROS time."""
247 return ros.make_time(secs=secs, nsecs=nsecs)
248
249
250def make_full_typename(typename, category="msg"):
251 """
252 Returns "pkg/msg/Type" for "pkg/Type".
253
254 @param category type category like "msg" or "srv"
255 """
256 INTER = "/%s/" % category
257 if INTER in typename or "/" not in typename or typename.startswith("%s/" % FAMILY):
258 return typename
259 return INTER.join(next((x[0], x[-1]) for x in [typename.split("/")]))
260
261
262def dict_to_message(dct, msg_or_type):
263 """
264 Returns ROS message populated from Python dictionary.
265
266 Raises TypeError on attribute value type mismatch.
267
268 @param msg_or_type canonical or full class name like "std_msgs/Bool" or "std_msgs/msg/Bool",
269 or class instance like `std_msgs.msg.Bool()`,
270 or class object like `std_msgs.msg.Bool`
271 """
272 msg = get_message_class(msg_or_type) if isinstance(msg_or_type, str) else msg_or_type
273 msg = msg() if inspect.isclass(msg) else msg
274 for name, typename in ros.get_message_fields(msg).items():
275 if name not in dct:
276 continue # for name,
277 v, msgv = dct[name], ros.get_message_value(msg, name)
278
279 if ros.is_ros_message(msgv):
280 v = v if ros.is_ros_message(v) else dict_to_message(v, msgv)
281 elif isinstance(msgv, (list, tuple)):
282 scalarname = ros.scalar(typename)
283 if scalarname in ROS_BUILTIN_TYPES:
284 cls = ROS_BUILTIN_CTORS[scalarname]
285 v = [x if isinstance(x, cls) else cls(x) for x in v]
286 else:
287 cls = ros.get_message_class(scalarname)
288 v = [x if ros.is_ros_message(x) else dict_to_message(x, cls()) for x in v]
289 else:
290 v = type(msgv)(v)
291
292 setattr(msg, name, v)
293 return msg
294
295
296def message_to_dict(msg, replace=None):
297 """
298 Returns ROS message as nested Python dictionary.
299
300 @param replace mapping of {value: replaced value},
301 e.g. {math.nan: None, math.inf: None}
302 """
303 result = {} if ros.is_ros_message(msg) else msg
304 for name, typename in ros.get_message_fields(msg).items():
305 v = ros.get_message_value(msg, name)
306 if ros.is_ros_time(v):
307 v = dict(zip(get_message_fields(v), ros.to_sec_nsec(v)))
308 elif ros.is_ros_message(v):
309 v = message_to_dict(v)
310 elif isinstance(v, (list, tuple)):
311 if ros.scalar(typename) not in ROS_BUILTIN_TYPES:
312 v = [message_to_dict(x) for x in v]
313 elif replace:
314 v = [replace.get(x, x) for x in v]
315 elif replace:
316 v = replace.get(v, v)
317 result[name] = v
318 return result
319
320
321def message_to_str(msg, indent=None):
322 """
323 Returns ROS message as an evaluatable string, e.g. "std_msgs.msg.UInt8(data=0)".
324
325 @param indent multi-line indentation level to use if not returning one-liner;
326 as the number of spaces or the string to indent with
327 """
328 if not is_ros_message(msg):
329 return repr(msg)
330 indent = " " * indent if isinstance(indent, int) else indent or ""
331 sep, start, end = (",\n", "\n%s" % indent, "\n") if indent else (", ", "", "")
332 parts = []
333 for k in get_message_fields(msg):
334 v = get_message_value(msg, k)
335 if is_ros_message(v):
336 v = message_to_str(v, indent)
337 if indent:
338 v = v.replace("\n", start)
339 elif isinstance(v, bytes):
340 v = "bytes(%s)" % list(v)
341 elif isinstance(v, list) and v:
342 nested = sep.join(message_to_str(x, indent) for x in v)
343 if indent:
344 nested = indent + nested.replace("\n", "\n%s" % (indent * 2))
345 v = "[%s%s%s%s]" % (start, nested, end, indent)
346 else:
347 v = repr(v)
348 parts.append("%s=%s" % (k, v))
349 name = make_full_typename(get_message_type(msg)).replace("/", ".")
350 return "%s(%s%s%s)" % (name, start, (sep + indent).join(parts), end)
351
352
353def serialize_message(msg):
354 """Returns ROS message or service request/response as a serialized binary of `bytes()`."""
355 return ros.serialize_message(msg)
356
357
358def deserialize_message(raw, cls_or_typename):
359 """Returns ROS message or service request/response instantiated from serialized binary."""
360 return ros.deserialize_message(raw, cls_or_typename)
361
362
363def scalar(typename):
364 """
365 Returns scalar type from ROS message data type, like "uint8" from uint8-array.
366
367 Returns type unchanged if an ordinary type. In ROS2, returns unbounded type,
368 e.g. "string" from "string<=10[<=5]".
369 """
370 return ros.scalar(typename)
371
372
373def time_category(msg_or_type):
374 """
375 Returns "time" or "duration" for time/duration type, else original value.
376
377 @param msg_or_type full or canonical class name
378 like "duration" or "builtin_interfaces/Time",
379 or class instance like `rospy.Time()`,
380 or class object like `rclpy.time.Time`
381 """
382 typename = msg_or_type
383 if not isinstance(typename, str):
384 cls = msg_or_type if inspect.isclass(msg_or_type) else type(msg_or_type)
385 typename = cls.__name__ if issubclass(cls, tuple(ROS_TIME_CLASSES)) else None
386 return msg_or_type if not typename else "duration" if "duration" in typename.lower() else "time"
387
388
389def time_message(val, to_message=True, clock_type=None):
390 """
391 Converts ROS2 time/duration between `rclpy` and `builtin_interfaces` objects.
392
393 @param val ROS2 time/duration object from `rclpy` or `builtin_interfaces`
394 @param to_message whether to convert from `rclpy` to `builtin_interfaces` or vice versa
395 @param clock_type ClockType for converting to `rclpy.Time`, defaults to `ROS_TIME`
396 @return value converted to appropriate type, or original value if not convertible
397 """
398 if ros1: return val
399 return ros2.time_message(val, to_message, clock_type=clock_type)
400
401
402def to_datetime(val):
403 """Returns value as datetime.datetime if value is ROS time/duration, else value."""
404 sec = ros.to_sec(val)
405 return datetime.datetime.fromtimestamp(sec) if sec is not val else val
406
408def to_decimal(val):
409 """Returns value as decimal.Decimal if value is ROS time/duration, else value."""
410 if ros.is_ros_time(val) and not inspect.isclass(val):
411 return decimal.Decimal("%d.%09d" % ros.to_sec_nsec(val))
412 return val
413
414
415def to_duration(val):
416 """Returns value as ROS duration if convertible (int/float/time/datetime/decimal), else value."""
417 return ros.to_duration(val)
418
419
420def to_nsec(val):
421 """Returns value in nanoseconds if value is ROS time/duration, else value."""
422 return ros.to_nsec(val)
423
424
425def to_sec(val):
426 """Returns value in seconds if value is ROS time/duration, else value."""
427 return ros.to_sec(val)
428
429
430def to_sec_nsec(val):
431 """Returns value as (seconds, nanoseconds) if value is ROS time/duration, else value."""
432 return ros.to_sec_nsec(val)
433
434
435def to_time(val):
436 """Returns value as ROS time if convertible (int/float/duration/datetime/decimal), else value."""
437 return ros.to_time(val)
438
439
440__all__ = [
441 "ROSLogHandler", "FAMILY", "PARAM_SEPARATOR", "PRIVATE_PREFIX", "ROS_ALIAS_TYPES",
442 "ROS_BUILTIN_CTORS", "ROS_BUILTIN_TYPES", "ROS_COMMON_TYPES", "ROS_NUMERIC_TYPES",
443 "ROS_STRING_TYPES", "ROS_TIME_CLASSES", "ROS_TIME_TYPES", "canonical",
444 "deserialize_message", "dict_to_message", "format_param_name", "get_alias_type",
445 "get_message_class", "get_message_definition", "get_message_fields",
446 "get_message_type", "get_message_type_hash", "get_message_value", "get_service_definition",
447 "get_service_request_class", "get_service_response_class", "get_type_alias", "is_ros_message",
448 "is_ros_service", "is_ros_time", "make_duration", "make_full_typename", "make_time",
449 "message_to_dict", "scalar", "serialize_message", "time_message", "to_datetime", "to_decimal",
450 "to_duration", "to_nsec", "to_sec", "to_sec_nsec", "to_time"
451]
dict_to_message(dct, msg_or_type)
Returns ROS message populated from Python dictionary.
Definition api.py:262
format_param_name(name)
Returns parameter name with correct separator for ROS version, and leading sigils stripped.
Definition api.py:74
get_message_fields(val)
Returns {field name: field type name} if ROS message or service request/response, else {}.
Definition api.py:112
deserialize_message(raw, cls_or_typename)
Returns ROS message or service request/response instantiated from serialized binary.
Definition api.py:347
get_message_header(val)
Returns message `std_msgs/Header`-attribute if any, else `None`.
Definition api.py:121
get_message_value(msg, name, default=...)
Returns object attribute value, with numeric arrays converted to lists.
Definition api.py:154
to_decimal(val)
Returns value as decimal.Decimal if value is ROS time/duration, else value.
Definition api.py:395
is_ros_message(val)
Returns whether value is a ROS message or service request/response class or instance.
Definition api.py:218
canonical(typename)
Returns "pkg/Type" for "pkg/subdir/Type".
Definition api.py:69
time_category(msg_or_type)
Returns "time" or "duration" for time/duration type, else original value.
Definition api.py:369
get_type_alias(typename)
Returns alias like "char" for ROS built-in type, if any; reverse of get_alias_type().
Definition api.py:199
is_ros_service(val)
Returns whether value is a ROS service class object.
Definition api.py:222
get_message_type_hash(msg_or_type)
Returns ROS message / service type MD5 hash.
Definition api.py:144
to_nsec(val)
Returns value in nanoseconds if value is ROS time/duration, else value.
Definition api.py:407
to_sec_nsec(val)
Returns value as (seconds, nanoseconds) if value is ROS time/duration, else value.
Definition api.py:417
scalar(typename)
Returns scalar type from ROS message data type, like "uint8" from uint8-array.
Definition api.py:358
to_time(val)
Returns value as ROS time if convertible (int/float/duration/datetime/decimal), else value.
Definition api.py:422
get_service_request_class(srv_or_type)
Returns ROS service request class object.
Definition api.py:178
get_message_type(msg_or_cls)
Returns ROS message / service canonical type name, like "std_msgs/Header".
Definition api.py:133
to_duration(val)
Returns value as ROS duration if convertible (int/float/time/datetime/decimal), else value.
Definition api.py:402
serialize_message(msg)
Returns ROS message or service request/response as a serialized binary of `bytes()`.
Definition api.py:342
get_message_class(msg_or_type)
Returns ROS message / service class object.
Definition api.py:88
make_full_typename(typename, category="msg")
Returns "pkg/msg/Type" for "pkg/Type".
Definition api.py:247
make_time(secs=0, nsecs=0)
Returns a ROS time.
Definition api.py:237
to_datetime(val)
Returns value as datetime.datetime if value is ROS time/duration, else value.
Definition api.py:389
get_alias_type(typename)
Returns ROS built-in type for alias like "char", if any; reverse of get_type_alias().
Definition api.py:208
message_to_dict(msg, replace=None)
Returns ROS message as nested Python dictionary.
Definition api.py:292
get_message_definition(msg_or_type, full=True)
Returns ROS message or service request/response type definition text.
Definition api.py:103
message_to_str(msg, indent=None)
Returns ROS message as an evaluatable string, e.g.
Definition api.py:316
to_sec(val)
Returns value in seconds if value is ROS time/duration, else value.
Definition api.py:412
get_service_response_class(srv_or_type)
Returns ROS service response class object.
Definition api.py:190
get_service_definition(srv_or_type)
Returns ROS service type definition text.
Definition api.py:166
is_ros_time(val)
Returns whether value is a ROS time/duration class or instance.
Definition api.py:227
time_message(val, to_message=True, clock_type=None)
Converts ROS2 time/duration between `rclpy` and `builtin_interfaces` objects.
Definition api.py:384
make_duration(secs=0, nsecs=0)
Returns a ROS duration.
Definition api.py:232