3Utilities for ROS message definition texts.
5------------------------------------------------------------------------------
6This file is part of rosros - simple unified interface to ROS1 / ROS2.
7Released under the BSD License.
12------------------------------------------------------------------------------
14## @namespace rosros.parsing
19 import rosidl_parser.parser
20 import rosidl_parser.definition
21 import rosidl_runtime_py
22except ImportError:
pass
26from . util
import memoize
33 Returns MD5 hash for message / service type definition.
35 @param subdefs definitions
for subtypes,
if any,
as ((typename, definition), )
37 pkg, parts = typename.rsplit("/", 1)[0], definition.split(
"\n---\n", 1)
40 return hashlib.md5(
"".join(texts).encode()).hexdigest()
46 Returns text for calculating message
or service request/response type definition.
48 @param subdefs definitions
for subtypes,
as ((typename, definition), )
52 FIELD_RGX = re.compile(
r"^([a-z][^\s:]+)\s+([^\s=]+)(\s*=\s*([^\n]+))?(\s+([^\n]+))?", re.I)
53 STR_CONST_RGX = re.compile(
r"^w?string\s+([^\s=#]+)\s*=")
54 lines, subdefmap = [], dict(subdefs
or ())
57 for line
in definition.splitlines():
58 if set(line) == set(
"="):
61 if "#" in line
and not STR_CONST_RGX.match(line): line = line[:line.index(
"#")]
62 match = FIELD_RGX.match(line)
63 if match
and match.group(3):
64 lines.append(
"%s %s=%s" % (match.group(1), match.group(2), match.group(4).strip()))
66 for line
in definition.splitlines():
67 if set(line) == set(
"="):
69 if "#" in line
and not STR_CONST_RGX.match(line): line = line[:line.index(
"#")]
70 match = FIELD_RGX.match(line)
71 if match
and not match.group(3):
72 scalartype, namestr = api.scalar(match.group(1)), match.group(2)
73 if scalartype
in api.ROS_COMMON_TYPES:
74 typestr = match.group(1)
75 if match.group(5): namestr = (namestr +
" " + match.group(6)).strip()
77 subtype = scalartype
if "/" in scalartype
else "std_msgs/Header" \
78 if "Header" == scalartype
else "%s/%s" % (pkg, scalartype)
80 lines.append(
"%s %s" % (typestr, namestr))
81 return "\n".join(lines).strip()
87 Returns subtype names and type definitions
from a full message definition.
89 @return {
"pkg/MsgType":
"full definition for MsgType including subtypes"}
93 curtype, curlines =
"", []
94 rgx = re.compile(
r"^((=+)|(MSG: (.+)))$")
95 for line
in typedef.splitlines():
97 if m
and m.group(2)
and curtype:
98 result[curtype] =
"\n".join(curlines)
99 curtype, curlines =
"", []
100 elif m
and m.group(4):
101 curtype, curlines = m.group(4), []
102 elif not m
and curtype:
103 curlines.append(line)
105 result[curtype] =
"\n".join(curlines)
108 FIELD_RGX = re.compile(
r"^([a-z][^\s]+)\s+([^\s=]+)(\s*=\s*([^\n]+))?(\s+([^\n]+))?", re.I)
109 for subtype, subdef
in list(result.items()):
110 pkg = subtype.rsplit(
"/", 1)[0]
111 for line
in subdef.splitlines():
112 m = FIELD_RGX.match(line)
114 scalartype, fulltype = api.scalar(m.group(1)),
None
115 if scalartype
not in api.ROS_COMMON_TYPES:
116 fulltype = scalartype
if "/" in scalartype
else "std_msgs/Header" \
117 if "Header" == scalartype
else "%s/%s" % (pkg, scalartype)
118 if fulltype
in result:
119 addendum =
"%s\nMSG: %s\n%s" % (
"=" * 80, fulltype, result[fulltype])
120 result[subtype] = result[subtype].rstrip() + (
"\n\n%s\n" % addendum)
128 Returns ROS2 message/service type definition full text.
130 Parses and assembles text
from .msg
or .srv
or .idl files on disk.
132 @param full include subtype definitions, separated
with lines of
"="
135 texts, pkg = {}, typename.rsplit(
"/", 1)[0]
136 category =
"srv" if "/srv/" in typename
else "msg"
137 if "srv" == category: full =
False
139 basepath = api.make_full_typename(typename) + (
".%s" % category)
140 with open(rosidl_runtime_py.get_interface_path(basepath))
as f:
141 texts[typename] = f.read()
144 for line
in texts[typename].splitlines()
if full
else ():
145 if not line
or not line[0].isalpha():
147 linetype = api.scalar(api.canonical(re.sub(
r"^([a-zA-Z][^\s]+)(.+)",
r"\1", line)))
148 if linetype
in api.ROS_BUILTIN_TYPES:
150 linetype = linetype
if "/" in linetype
else "std_msgs/Header" \
151 if "Header" == linetype
else "%s/%s" % (pkg, linetype)
153 if linedef: texts[linetype] = linedef
155 basedef = texts.pop(next(iter(texts)))
156 subdefs = [
"%s\nMSG: %s\n%s" % (
"=" * 80, k, v)
for k, v
in texts.items()]
157 return basedef + (
"\n" if subdefs
else "") +
"\n".join(subdefs)
163 Returns ROS2 service type definition full text.
165 Parses and assembles text
from .srv
or .idl files on disk.
170 basepath = api.make_full_typename(typename,
"srv") +
".srv"
171 with open(rosidl_runtime_py.get_interface_path(basepath))
as f:
180 """Returns ROS2 message type definition parsed from IDL file."""
183 basepath = api.make_full_typename(typename) +
".idl"
184 typepath = rosidl_runtime_py.get_interface_path(basepath)
185 with open(typepath)
as f:
186 idlcontent = rosidl_parser.parser.parse_idl_string(f.read())
187 msgidl = idlcontent.get_elements_of_type(rosidl_parser.definition.Message)[0]
193 """Returns ROS2 service type definition parsed from IDL file."""
196 basepath = api.make_full_typename(typename,
"srv") +
".idl"
197 typepath = rosidl_runtime_py.get_interface_path(basepath)
198 with open(typepath)
as f:
199 idlcontent = rosidl_parser.parser.parse_idl_string(f.read())
201 srvidl = idlcontent.get_elements_of_type(rosidl_parser.definition.Service)[0]
202 msgidls = (srvidl.request_message, srvidl.response_message)
203 return "\n---\n".join(map(rosidl_format_message_content, msgidls)).lstrip()
208 Returns canonical type name for ROS2 IDL parser entity, like
"uint8" or "nav_msgs/Path".
210 @param typeobj ROS2 IDL parser entity, like `rosidl_parser.definition.Array`
211 @param msgpackage name of parsed package
212 @param constant whether parsed item
is a constant
214 from .
import api, ros2
216 if isinstance(typeobj, rosidl_parser.definition.AbstractNestedType):
219 size, bounding =
"",
""
220 if isinstance(typeobj, rosidl_parser.definition.Array):
222 elif typeobj.has_maximum_size():
223 size = typeobj.maximum_size
224 if isinstance(typeobj, rosidl_parser.definition.BoundedSequence):
226 result =
"%s[%s%s]" % (valuetype, bounding, size)
227 elif isinstance(typeobj, rosidl_parser.definition.AbstractWString):
229 elif isinstance(typeobj, rosidl_parser.definition.AbstractString):
231 elif isinstance(typeobj, rosidl_parser.definition.NamespacedType):
232 nameparts = typeobj.namespaced_name()
233 result = api.canonical(
"/".join(nameparts))
234 if nameparts[0].value == msgpackage
or "std_msgs/Header" == result:
235 result = api.canonical(
"/".join(nameparts[-1:]))
237 result = ros2.DDS_TYPES.get(typeobj.typename, typeobj.typename)
239 if isinstance(typeobj, rosidl_parser.definition.AbstractGenericString) \
240 and typeobj.has_maximum_size()
and not constant:
241 result +=
"<=%s" % typeobj.maximum_size
247 """Returns annotation text formatted with comment prefixes and escapes."""
248 ESCAPES = {
"\n":
"\\n",
"\t":
"\\t",
"\x07":
"\\a",
249 "\x08":
"\\b",
"\x0b":
"\\v",
"\x0c":
"\\f"}
250 repl =
lambda m: ESCAPES[m.group(0)]
251 return "#" +
"\n#".join(re.sub(
"|".join(map(re.escape, ESCAPES)), repl, l)
252 for l
in text.split(
"\\n"))
256 """Returns all comments for annotatable object, as [text, ]."""
257 return [v.get(
"text",
"")
for v
in obj.get_annotation_values(
"verbatim")
258 if "comment" == v.get(
"language")]
263 Returns message IDL as .msg text.
265 @param msgidl rosidl_parser.definition.Message instance
268 package = msgidl.structure.namespaced_type.namespaces[0]
269 DUMMY = rosidl_parser.definition.EMPTY_STRUCTURE_REQUIRED_MEMBER_NAME
274 if lines
and msgidl.constants: lines.append(
"")
276 for c
in msgidl.constants:
279 lines.append(
"%s %s=%s" % (ctype, c.name, c.value))
281 if not (len(msgidl.structure.members) == 1
and DUMMY == msgidl.structure.members[0].name):
283 if msgidl.constants
and msgidl.structure.members: lines.append(
"")
285 for m
in msgidl.structure.members:
288 return "\n".join(lines)
292 "calculate_definition_hash",
"get_ros2_message_definition",
"get_ros2_service_definition"
get_ros2_service_definition_idl(typename)
Returns ROS2 service type definition parsed from IDL file.
get_ros2_message_definition(typename, full=True)
Returns ROS2 message/service type definition full text.
get_ros2_message_definition_idl(typename)
Returns ROS2 message type definition parsed from IDL file.
rosidl_format_message_content(msgidl)
Returns message IDL as .msg text.
calculate_definition_hash(typename, definition, subdefs=())
Returns MD5 hash for message / service type definition.
make_definition_hash_text(pkg, definition, subdefs=())
Returns text for calculating message or service request/response type definition.
rosidl_get_comments(obj)
Returns all comments for annotatable object, as [text, ].
get_ros2_service_definition(typename)
Returns ROS2 service type definition full text.
rosidl_format_comment(text)
Returns annotation text formatted with comment prefixes and escapes.
rosidl_format_idl_type(typeobj, msgpackage, constant=False)
Returns canonical type name for ROS2 IDL parser entity, like "uint8" or "nav_msgs/Path".
parse_definition_subtypes(typedef)
Returns subtype names and type definitions from a full message definition.