rosros 0.2.5
Simple unified interface to ROS1 / ROS2 Python API
Loading...
Searching...
No Matches
parsing.py
Go to the documentation of this file.
1# -*- coding: utf-8 -*-
2"""
3Utilities for ROS message definition texts.
4
5------------------------------------------------------------------------------
6This file is part of rosros - simple unified interface to ROS1 / ROS2.
7Released under the BSD License.
8
9@author Erki Suurjaak
10@created 12.02.2022
11@modified 14.04.2022
12------------------------------------------------------------------------------
13"""
14## @namespace rosros.parsing
15import hashlib
16import re
17
18try: # ROS2 only
19 import rosidl_parser.parser
20 import rosidl_parser.definition
21 import rosidl_runtime_py
22except ImportError: pass
23
24
25# from . import api # Imported late to avoid circular import
26from . util import memoize
27
28
29
@memoize
def calculate_definition_hash(typename, definition, subdefs=()):
    """
    Returns MD5 hex digest for a message / service type definition.

    The definition is split at the first "---" separator line, hash text is made
    for each part, and the digest is taken over the parts' concatenated texts.

    @param   typename    type name like "pkg/Type"; text before the last "/" is used as package
    @param   definition  type definition text, may include nested subtype definitions
    @param   subdefs     definitions for subtypes, if any, as ((typename, definition), )
    @return              MD5 hash as hexadecimal string
    """
    package = typename.rsplit("/", 1)[0]
    # Subtypes given by caller come first, followed by those embedded in the definition itself
    nested = tuple(parse_definition_subtypes(definition).items())
    known = (subdefs or ()) + nested
    hasher = hashlib.md5()
    for section in definition.split("\n---\n", 1):
        hasher.update(make_definition_hash_text(package, section, known).encode())
    return hasher.hexdigest()
41
42
@memoize
def make_definition_hash_text(pkg, definition, subdefs=()):
    """
    Returns text for calculating message or service request/response type definition hash.

    Constants come first as "type name=value", followed by fields as "type name",
    where the type of a non-common field is replaced with the hash of its definition.
    Parsing stops at the first line consisting solely of "=" characters.

    @param   pkg         package name that unqualified subtype names get resolved under
    @param   definition  type definition text
    @param   subdefs     definitions for subtypes, as ((typename, definition), )
    """
    from . import api  # Imported late to avoid circular import
    # "type name (= constvalue)?" or "type name (defaultvalue)?" (ROS2 format)
    field_rgx = re.compile(r"^([a-z][^\s:]+)\s+([^\s=]+)(\s*=\s*([^\n]+))?(\s+([^\n]+))?", re.I)
    strconst_rgx = re.compile(r"^w?string\s+([^\s=#]+)\s*=")
    known_defs = dict(subdefs or ())

    # Parse every line once, up to the first subtype separator
    parsed = []
    for text in definition.splitlines():
        if set(text) == set("="):  # Subtype separator
            break  # for text
        # String constants cannot have line comments
        if "#" in text and not strconst_rgx.match(text):
            text = text.split("#", 1)[0]
        found = field_rgx.match(text)
        if found:
            parsed.append(found)

    # Constants are listed before any fields
    constants = ["%s %s=%s" % (found.group(1), found.group(2), found.group(4).strip())
                 for found in parsed if found.group(3)]

    # Fields, with subtype names replaced by subtype hashes
    fields = []
    for found in parsed:
        if found.group(3):  # Constant, already handled
            continue  # for found
        basetype, name = api.scalar(found.group(1)), found.group(2)
        if basetype in api.ROS_COMMON_TYPES:
            typetext = found.group(1)
            if found.group(5):  # Trailing text after field name, like ROS2 default value
                name = (name + " " + found.group(6)).strip()
        else:
            if "/" in basetype:
                qualified = basetype
            elif "Header" == basetype:
                qualified = "std_msgs/Header"
            else:
                qualified = "%s/%s" % (pkg, basetype)
            typetext = calculate_definition_hash(qualified, known_defs[qualified], subdefs)
        fields.append("%s %s" % (typetext, name))

    return "\n".join(constants + fields).strip()
82
83
@memoize
def parse_definition_subtypes(typedef):
    """
    Returns subtype names and type definitions from a full message definition.

    Nested definitions are expected to start with a line "MSG: pkg/MsgType"
    and to end at a line consisting solely of "=" characters, or at end of text.
    Each returned definition gets the definitions of its own known subtypes appended.

    @param   typedef  full message definition text, including nested subtype definitions
    @return           {"pkg/MsgType": "full definition for MsgType including subtypes"}
    """
    from . import api  # Imported late to avoid circular import
    marker_rgx = re.compile(r"^((=+)|(MSG: (.+)))$")  # Group 2: separator, 4: new type
    subtypes = {}  # {subtypename: subtypedef}
    name, body = "", []
    for text in typedef.splitlines():
        found = marker_rgx.match(text)
        if found is None:
            if name:  # Nested definition content
                body.append(text)
        elif found.group(4):  # Start of nested definition "MSG: pkg/MsgType"
            name, body = found.group(4), []
        elif name:  # Separator line between nested definitions
            subtypes[name] = "\n".join(body)
            name, body = "", []
    if name:  # Last nested definition runs until end of text
        subtypes[name] = "\n".join(body)

    # "type name (= constvalue)?" or "type name (defaultvalue)?" (ROS2 format)
    field_rgx = re.compile(r"^([a-z][^\s]+)\s+([^\s=]+)(\s*=\s*([^\n]+))?(\s+([^\n]+))?", re.I)
    # Iterate over a snapshot, as definitions get extended during the loop
    for name, body in list(subtypes.items()):
        package = name.rsplit("/", 1)[0]
        for text in body.splitlines():
            found = field_rgx.match(text)
            if not found or not found.group(1):
                continue  # for text
            basetype = api.scalar(found.group(1))
            if basetype in api.ROS_COMMON_TYPES:
                continue  # for text
            if "/" in basetype:
                qualified = basetype
            elif "Header" == basetype:
                qualified = "std_msgs/Header"
            else:
                qualified = "%s/%s" % (package, basetype)
            if qualified in subtypes:
                appendix = "%s\nMSG: %s\n%s" % ("=" * 80, qualified, subtypes[qualified])
                subtypes[name] = subtypes[name].rstrip() + ("\n\n%s\n" % appendix)
    return subtypes
122
123
124
@memoize
def get_ros2_message_definition(typename, full=True):
    """
    Returns ROS2 message/service type definition full text.

    Reads text from .msg or .srv file on disk, falling back to parsing
    the message IDL file if reading fails for any reason.

    @param   typename  message or service type name; taken as service if containing "/srv/"
    @param   full      include subtype definitions, separated with lines of "=";
                       always disabled for service types
    """
    from . import api  # Imported late to avoid circular import
    package = typename.rsplit("/", 1)[0]
    category = "srv" if "/srv/" in typename else "msg"
    if "srv" == category:
        full = False

    try:
        basepath = api.make_full_typename(typename) + (".%s" % category)
        with open(rosidl_runtime_py.get_interface_path(basepath)) as f:
            maindef = f.read()
    except Exception:  # .msg/.srv file unavailable: parse IDL
        maindef = get_ros2_message_definition_idl(typename)

    nested = {}  # {subtypename: subtypedef}, in order of first appearance
    if full:
        for line in maindef.splitlines():
            if not line or not line[0].isalpha():
                continue  # for line
            # Take the first whitespace-delimited token as field type
            firstword = re.sub(r"^([a-zA-Z][^\s]+)(.+)", r"\1", line)
            linetype = api.scalar(api.canonical(firstword))
            if linetype in api.ROS_BUILTIN_TYPES:
                continue  # for line
            if "/" not in linetype:
                linetype = "std_msgs/Header" if "Header" == linetype \
                           else "%s/%s" % (package, linetype)
            if linetype == typename or linetype in nested:
                continue  # for line
            linedef = get_ros2_message_definition(linetype)
            if linedef:
                nested[linetype] = linedef

    sections = ["%s\nMSG: %s\n%s" % ("=" * 80, name, text) for name, text in nested.items()]
    return "\n".join([maindef] + sections)
158
159
@memoize
def get_ros2_service_definition(typename):
    """
    Returns ROS2 service type definition full text.

    Reads text from .srv file on disk, falling back to parsing
    the service IDL file if reading fails for any reason.

    @param   typename  service type name
    """
    from . import api  # Imported late to avoid circular import
    try:
        srvpath = api.make_full_typename(typename, "srv") + ".srv"
        with open(rosidl_runtime_py.get_interface_path(srvpath)) as f:
            return f.read()
    except Exception:  # .srv file unavailable: parse IDL
        return get_ros2_service_definition_idl(typename)
176
177
@memoize
def get_ros2_message_definition_idl(typename):
    """
    Returns ROS2 message type definition parsed from IDL file.

    @param   typename  message type name, used for locating the .idl file
    @return            definition text, formatted from the first message element in the IDL
    """
    from . import api  # Imported late to avoid circular import

    basepath = api.make_full_typename(typename) + ".idl"
    typepath = rosidl_runtime_py.get_interface_path(basepath)
    with open(typepath) as f:
        idlcontent = rosidl_parser.parser.parse_idl_string(f.read())
    # Only the first message element of the IDL file is used
    msgidl = idlcontent.get_elements_of_type(rosidl_parser.definition.Message)[0]
    return rosidl_format_message_content(msgidl)
189
190
@memoize
def get_ros2_service_definition_idl(typename):
    """
    Returns ROS2 service type definition parsed from IDL file.

    @param   typename  service type name, used for locating the .idl file
    @return            request and response definition texts joined with a "---" line,
                       with leading whitespace stripped
    """
    from . import api  # Imported late to avoid circular import

    basepath = api.make_full_typename(typename, "srv") + ".idl"
    typepath = rosidl_runtime_py.get_interface_path(basepath)
    with open(typepath) as f:
        idlcontent = rosidl_parser.parser.parse_idl_string(f.read())

    # Only the first service element of the IDL file is used
    srvidl = idlcontent.get_elements_of_type(rosidl_parser.definition.Service)[0]
    msgidls = (srvidl.request_message, srvidl.response_message)
    return "\n---\n".join(map(rosidl_format_message_content, msgidls)).lstrip()
204
205
def rosidl_format_idl_type(typeobj, msgpackage, constant=False):
    """
    Returns canonical type name for ROS2 IDL parser entity, like "uint8" or "nav_msgs/Path".

    @param   typeobj     ROS2 IDL parser entity, like `rosidl_parser.definition.Array`
    @param   msgpackage  name of parsed package
    @param   constant    whether parsed item is a constant
    """
    from . import api, ros2  # Imported late to avoid circular import
    result = None
    if isinstance(typeobj, rosidl_parser.definition.AbstractNestedType):
        # Array, BoundedSequence, UnboundedSequence
        valuetype = rosidl_format_idl_type(typeobj.value_type, msgpackage, constant)
        size, bounding = "", ""
        if isinstance(typeobj, rosidl_parser.definition.Array):
            size = typeobj.size
        elif typeobj.has_maximum_size():
            size = typeobj.maximum_size
        if isinstance(typeobj, rosidl_parser.definition.BoundedSequence):
            bounding = "<="
        result = "%s[%s%s]" % (valuetype, bounding, size)  # type[], type[N], type[<=N]
    elif isinstance(typeobj, rosidl_parser.definition.AbstractWString):
        result = "wstring"
    elif isinstance(typeobj, rosidl_parser.definition.AbstractString):
        result = "string"
    elif isinstance(typeobj, rosidl_parser.definition.NamespacedType):
        nameparts = typeobj.namespaced_name()
        result = api.canonical("/".join(nameparts))
        # Name parts may be parser tokens with a `.value` attribute, or plain strings
        # (NOTE(review): presumably tokens when coming from parsed IDL - confirm)
        firstpart = getattr(nameparts[0], "value", nameparts[0])
        if firstpart == msgpackage or "std_msgs/Header" == result:
            result = api.canonical("/".join(nameparts[-1:]))  # Omit package if local or Header
    else:  # Primitive like int8
        result = ros2.DDS_TYPES.get(typeobj.typename, typeobj.typename)

    if isinstance(typeobj, rosidl_parser.definition.AbstractGenericString) \
    and typeobj.has_maximum_size() and not constant:  # Constants get parsed into "string<=N"
        result += "<=%s" % typeobj.maximum_size

    return result
244
245
def rosidl_format_comment(text):
    """
    Returns annotation text formatted with comment prefixes and escapes.

    Text is split into lines at each literal backslash-n sequence, every line
    is prefixed with "#", and control characters within a line (newline, tab,
    bell, backspace, vertical tab, form feed) are written out as escape sequences.

    @param   text  annotation text
    @return        comment text, one "#"-prefixed line per input line
    """
    escapes = str.maketrans({"\n": "\\n", "\t": "\\t", "\x07": "\\a",
                             "\x08": "\\b", "\x0b": "\\v", "\x0c": "\\f"})
    return "#" + "\n#".join(part.translate(escapes) for part in text.split("\\n"))
253
254
def rosidl_get_comments(obj):
    """
    Returns all comments for annotatable object, as [text, ].

    Comments are taken from the object's "verbatim" annotation values
    that have language "comment"; a missing text yields an empty string.

    @param   obj  object providing `get_annotation_values(name)`
    """
    return [v.get("text", "") for v in obj.get_annotation_values("verbatim")
            if "comment" == v.get("language")]
259
260
def rosidl_format_message_content(msgidl):
    """
    Returns message IDL as .msg text.

    Output consists of general comments, then constants as "type name=value",
    then fields as "type name", each preceded by its own comments,
    with a blank line between non-empty sections.

    @param   msgidl  rosidl_parser.definition.Message instance
    """
    lines = []
    package = msgidl.structure.namespaced_type.namespaces[0]
    DUMMY = rosidl_parser.definition.EMPTY_STRUCTURE_REQUIRED_MEMBER_NAME

    # Add general comments
    lines.extend(map(rosidl_format_comment, rosidl_get_comments(msgidl.structure)))
    # Add blank line between general comments and constants
    if lines and msgidl.constants: lines.append("")
    # Add constants
    for c in msgidl.constants:
        ctype = rosidl_format_idl_type(c.type, package, constant=True)
        lines.extend(map(rosidl_format_comment, rosidl_get_comments(c)))
        lines.append("%s %s=%s" % (ctype, c.name, c.value))
    # (Parser adds dummy placeholder if constants-only message)
    if not (len(msgidl.structure.members) == 1 and DUMMY == msgidl.structure.members[0].name):
        # Add blank line between constants and fields
        if msgidl.constants and msgidl.structure.members: lines.append("")
        # Add fields
        for m in msgidl.structure.members:
            lines.extend(map(rosidl_format_comment, rosidl_get_comments(m)))
            lines.append("%s %s" % (rosidl_format_idl_type(m.type, package), m.name))
    return "\n".join(lines)
289
290
# Names exported on star-import
__all__ = [
    "calculate_definition_hash",
    "get_ros2_message_definition",
    "get_ros2_service_definition",
]
get_ros2_service_definition_idl(typename)
Returns ROS2 service type definition parsed from IDL file.
Definition parsing.py:192
get_ros2_message_definition(typename, full=True)
Returns ROS2 message/service type definition full text.
Definition parsing.py:133
get_ros2_message_definition_idl(typename)
Returns ROS2 message type definition parsed from IDL file.
Definition parsing.py:179
rosidl_format_message_content(msgidl)
Returns message IDL as .msg text.
Definition parsing.py:266
calculate_definition_hash(typename, definition, subdefs=())
Returns MD5 hash for message / service type definition.
Definition parsing.py:36
make_definition_hash_text(pkg, definition, subdefs=())
Returns text for calculating message or service request/response type definition.
Definition parsing.py:49
rosidl_get_comments(obj)
Returns all comments for annotatable object, as [text, ].
Definition parsing.py:255
get_ros2_service_definition(typename)
Returns ROS2 service type definition full text.
Definition parsing.py:166
rosidl_format_comment(text)
Returns annotation text formatted with comment prefixes and escapes.
Definition parsing.py:246
rosidl_format_idl_type(typeobj, msgpackage, constant=False)
Returns canonical type name for ROS2 IDL parser entity, like "uint8" or "nav_msgs/Path".
Definition parsing.py:213
parse_definition_subtypes(typedef)
Returns subtype names and type definitions from a full message definition.
Definition parsing.py:90