grepros 1.2.2
grep for ROS bag files and live topics
Loading...
Searching...
No Matches
generate_msgs.py
Go to the documentation of this file.
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3"""
4Generates random ROS messages and publishes them to live topics.
5
6Usage:
7
8 generate_msgs.py [TYPE [TYPE ...]]
9 [--no-type TYPE [TYPE ...]]
10 [--max-count NUM]
11 [--max-per-topic NUM]
12 [--max-topics NUM]
13 [--topics-per-type NUM]
14 [--interval SECONDS]
15 [--publish-prefix PREFIX]
16 [--publish-suffix SUFFIX]
17 [--no-latch]
18 [--verbose]
19 [--option KEY=VALUE [KEY=VALUE ...]]
20
21Topic names default to "/generate_msgs/type", like "/generate_msgs/std_msgs/Bool".
22
23Supports both ROS1 and ROS2, version detected from environment.
24
25Stand-alone script, requires ROS1 / ROS2 Python libraries.
26ROS1 requires ROS master to be running.
27
28------------------------------------------------------------------------------
29This file is part of grepros - grep for ROS bag files and live topics.
30Released under the BSD License.
31
32@author Erki Suurjaak
33@created 05.02.2022
34@modified 09.02.2022
35------------------------------------------------------------------------------
36"""
37import argparse
38import collections
39import json
40import os
41import random
42import re
43import signal
44import string
45import subprocess
46import sys
47import threading
48import time
49import traceback
50
51rospy = rclpy = None
52if os.getenv("ROS_VERSION") != "2":
53 import genpy
54 import roslib.message
55 import rospy
56else:
57 import builtin_interfaces.msg
58 import rclpy
59 import rclpy.duration
60 import rclpy.qos
61 import rclpy.time
62 import rosidl_runtime_py.utilities
63
64
65ARGUMENTS = {
66 "description": "Generates random ROS messages, publishes to live topics.",
67 "arguments": [
68 dict(args=["TYPES"], nargs="*", metavar="TYPE",
69 help="ROS message types to use if not all available,\n"
70 '(supports * wildcards, like "geometry_msgs/*")'),
71
72 dict(args=["-v", "--verbose"],
73 dest="VERBOSE", default=False, action="store_true",
74 help="print each emitted message"),
75
76 dict(args=["--no-type"],
77 dest="SKIP_TYPES", metavar="TYPE", nargs="+", default=[], action="append",
78 help="ROS message types to skip (supports * wildcards)"),
79
80 dict(args=["-m", "--max-count"],
81 dest="COUNT", metavar="NUM", default=0, type=int,
82 help="maximum number of messages to emit"),
83
84 dict(args=["--max-topics"],
85 dest="MAX_TOPICS", metavar="NUM", default=0, type=int,
86 help="maximum number of topics to emit"),
87
88 dict(args=["--max-per-topic"],
89 dest="MAX_PER_TOPIC", metavar="NUM", default=0, type=int,
90 help="number of messages to emit in each topic"),
91
92 dict(args=["--topics-per-type"],
93 dest="TOPICS_PER_TYPE", metavar="NUM", default=1, type=int,
94 help="number of topics to emit per message type (default 1)"),
95
96 dict(args=["--interval"],
97 dest="INTERVAL", metavar="SECONDS", default=0.5, type=float,
98 help="live publish interval (default 0.5)"),
99
100 dict(args=["--publish-prefix"],
101 dest="PUBLISH_PREFIX", metavar="PREFIX", default="generate_msgs",
102 help='prefix to prepend to topic name (default "generate_msgs")'),
103
104 dict(args=["--publish-suffix"],
105 dest="PUBLISH_SUFFIX", metavar="SUFFIX", default="",
106 help='suffix to append to topic name'),
107
108 dict(args=["--no-latch"],
109 dest="LATCH", default=True, action="store_false",
110 help="do not latch published topics"),
111
112 dict(args=["--option"], # Replaced with dictionary after parsing
113 dest="OPTIONS", metavar="KEY=VALUE", nargs="+", default=[], action="append",
114 help="options for generated message attributes, as\n"
115 " arraylen=MIN,MAX range / length of primitive arrays\n"
116 " or NUM (default 50,100)\n"
117 " nestedlen=MIN,MAX range / length of nested message lists\n"
118 " or NUM (default 1,2)\n"
119 " strlen=MIN,MAX range / length of strings (default 10,50)\n"
120 " or NUM\n"
121 " strchars=CHARS characters to use in strings\n"
122 " (default all printables)\n"
123 " NUMTYPE=MIN,MAX value range / constant of numeric types\n"
124 " or CONSTANT like int8\n"),
125 ],
126}
127
128
129NAME = "generate_msgs"
130
132class rosapi(object):
133 """Generic interface for accessing ROS1 / ROS2 API."""
135
136 NODE = None
137
139 ROS_NUMERIC_TYPES = ["byte", "char", "int8", "int16", "int32", "int64", "uint8",
140 "uint16", "uint32", "uint64", "float32", "float64", "bool"]
142
143 ROS_STRING_TYPES = ["string", "wstring"]
144
146 ROS_BUILTIN_TYPES = ROS_NUMERIC_TYPES + ROS_STRING_TYPES
147
149 ROS_TIME_CLASSES = {rospy.Time: "time", rospy.Duration: "duration",
150 genpy.Time: "time", genpy.Duration: "duration"} if rospy else \
151 {rclpy.time.Time: "builtin_interfaces/Time",
152 builtin_interfaces.msg.Time: "builtin_interfaces/Time",
153 rclpy.duration.Duration: "builtin_interfaces/Duration",
154 builtin_interfaces.msg.Duration: "builtin_interfaces/Duration"}
155
156
157 ROS_INTEGER_RANGES = dict({
158 "byte": (-2** 7, 2** 7 - 1),
159 "int8": (-2** 7, 2** 7 - 1),
160 "int16": (-2**15, 2**15 - 1),
161 "int32": (-2**31, 2**31 - 1),
162 "int64": (-2**63, 2**63 - 1),
163 "char": (0, 2** 8 - 1),
164 "uint8": (0, 2** 8 - 1),
165 "uint16": (0, 2**16 - 1),
166 "uint32": (0, 2**31 - 1),
167 "uint64": (0, 2**64 - 1),
168 }, **{
169 "byte": (0, 2** 8 - 1),
170 "char": (-2** 7, 2** 7 - 1),
171 } if rclpy else {}) # ROS2 *reverses* byte and char
172
173
174 DDS_TYPES = {"boolean": "bool",
175 "float": "float32",
176 "double": "float64",
177 "octet": "byte",
178 "short": "int16",
179 "unsigned short": "uint16",
180 "long": "int32",
181 "unsigned long": "uint32",
182 "long long": "int64",
183 "unsigned long long": "uint64"}
184
185 @classmethod
186 def get_message_types(cls):
187 """Returns a list of available message types, as ["pksg/Msg", ]."""
188 cmd = "rosmsg list" if rospy else "ros2 interface list --only-msgs"
189 output = subprocess.check_output(cmd, shell=True).decode()
190 return sorted(cls.canonical(l.strip()) for l in output.splitlines()
191 if re.match(r"\w+/\w+", l.strip()))
192
193 @classmethod
194 def init(cls, launch=False):
195 """Initializes ROS, creating and spinning node if specified."""
196 if rospy and launch:
197 rospy.init_node(NAME)
198 if rospy and launch:
199 spinner = threading.Thread(target=rospy.spin)
200 if rclpy:
201 rclpy.init()
202 if rclpy and launch:
203 cls.NODE = rclpy.create_node(NAME, enable_rosout=False, start_parameter_services=False)
204 spinner = threading.Thread(target=rclpy.spin, args=(cls.NODE, ))
205 if launch:
206 spinner.daemon = True
207 spinner.start()
208
209 @classmethod
210 def canonical(cls, typename):
211 """
212 Returns "pkg/Type" for "pkg/msg/Type", standardizes various ROS2 formats.
213
214 Converts DDS types like "octet" to "byte", and "sequence<uint8, 100>" to "uint8[100]".
215 """
216 is_array, bound, dimension = False, "", ""
218 match = re.match("sequence<(.+)>", typename)
219 if match: # "sequence<uint8, 100>" or "sequence<uint8>"
220 is_array = True
221 typename = match.group(1)
222 match = re.match(r"([^,]+)?,\s?(\d+)", typename)
223 if match: # sequence<uint8, 10>
224 typename = match.group(1)
225 if match.lastindex > 1: dimension = match.group(2)
226
227 match = re.match("(w?string)<(.+)>", typename)
228 if match: # string<5>
229 typename, bound = match.groups()
230
231 if "[" in typename: # "string<=5[<=10]" or "string<=5[10]"
232 dimension = typename[typename.index("[") + 1:typename.index("]")]
233 typename, is_array = typename[:typename.index("[")], True
234
235 if "<=" in typename: # "string<=5"
236 typename, bound = typename.split("<=")
237
238 if typename.count("/") > 1:
239 typename = "%s/%s" % tuple((x[0], x[-1]) for x in [typename.split("/")])[0]
240
241 suffix = ("<=%s" % bound if bound else "") + ("[%s]" % dimension if is_array else "")
242 return cls.DDS_TYPES.get(typename, typename) + suffix
243
244 @classmethod
245 def create_publisher(cls, topic, typecls, latch=True, queue_size=10):
246 """Returns ROS publisher instance."""
247 if rospy:
248 return rospy.Publisher(topic, typecls, latch=latch, queue_size=queue_size)
249
250 qos = rclpy.qos.QoSProfile(depth=queue_size)
251 if latch: qos.durability = rclpy.qos.DurabilityPolicy.TRANSIENT_LOCAL
252 return cls.NODE.create_publisher(typecls, topic, qos)
253
254 @classmethod
255 def get_message_class(cls, typename):
256 """Returns ROS message class."""
257 if rospy:
258 return roslib.message.get_message_class(typename)
259 return rosidl_runtime_py.utilities.get_message(cls.make_full_typename(typename))
260
261 @classmethod
262 def get_message_fields(cls, val):
263 """Returns OrderedDict({field name: field type name}) if ROS1 message, else {}."""
264 if rospy:
265 names = getattr(val, "__slots__", [])
266 if isinstance(val, (rospy.Time, rospy.Duration)): # Empty __slots__
267 names = genpy.TVal.__slots__
268 return collections.OrderedDict(zip(names, getattr(val, "_slot_types", [])))
269
270 fields = {k: cls.canonical(v) for k, v in val.get_fields_and_field_types().items()}
271 return collections.OrderedDict(fields)
272
273 @classmethod
274 def is_ros_message(cls, val):
275 """Returns whether value is a ROS message or special like time/duration."""
276 if rospy:
277 return isinstance(val, (genpy.Message, genpy.TVal))
278 return rosidl_runtime_py.utilities.is_message(val)
279
280 @classmethod
281 def is_ros_time(cls, val):
282 """Returns whether value is a ROS time/duration."""
283 return isinstance(val, tuple(cls.ROS_TIME_CLASSESROS_TIME_CLASSES))
284
285 @classmethod
286 def make_full_typename(cls, typename):
287 """Returns "pkg/msg/Type" for "pkg/Type"."""
288 if "/msg/" in typename or "/" not in typename:
289 return typename
290 return "%s/msg/%s" % tuple((x[0], x[-1]) for x in [typename.split("/")])[0]
291
292 @classmethod
293 def message_to_yaml(cls, msg):
294 """Returns ROS message as YAML string."""
295 if rospy:
296 return str(msg)
297 return rosidl_runtime_py.message_to_yaml(msg)
298
299 @classmethod
300 def scalar(cls, typename, bound=False):
301 """
302 Returns scalar type from ROS message data type, like "uint8" from "uint8[100]".
303
304 Returns type unchanged if already a scalar.
305
306 @param bound if True, does not strip string boundaries like "string<=10"
307 """
308 if "[" in typename: typename = typename[:typename.index("[")] # int8[?]
309 if "<=" in typename and not bound:
310 typename = typename[:typename.index("<=")] # string<=10
311 return typename
312
313 @classmethod
314 def shutdown(cls):
315 rclpy and rclpy.shutdown()
317
318
319class generator(object):
320 """Generates random ROS values and message attributes."""
322
323 OPTIONS = {
324 "arraylen": (50, 100), # Length range for primitive arrays like uint8[]
325 "nestedlen": ( 1, 2), # Length range for nested message lists like Point[]
326 "strlen": (10, 50), # Length range for strings
327 "strchars": string.printable.strip() + " ", # Characters used in string
328 }
329
330 @classmethod
331 def make_random_value(cls, typename, options=None):
332 """
333 Returns random value for ROS builtin type.
334
335 @param options {numtype like "int8": fixvalue or (minval, maxval),
336 "strlen": fixlen or (minlen, maxlen),
337 "strchars": str} if not using generator defaults
338 """
339 options = dict(cls.OPTIONS, **options or {})
340 ranges = dict(rosapi.ROS_INTEGER_RANGES, **options or {})
341 if rosapi.scalar(typename) in rosapi.ROS_STRING_TYPES: # "string<=10" to "string"
342 LEN = int(re.sub(r"\D", "", typename)) if re.search(r"\d", typename) else \
343 options["strlen"]
344 if isinstance(LEN, (list, tuple)): LEN = random.randint(*LEN[:2])
345 value = "".join(take_sample(options["strchars"], LEN)) if options["strchars"] else ""
346 elif typename in ("bool", ):
347 value = random.choice(ranges.get("bool", [True, False]))
348 elif typename in ("float32", "float64"):
349 value = random.random()
350 if typename in ranges:
351 a, b = (ranges[typename] * 2)[:2]
352 value = a if a == b else value * (b - a) # Constant or from range
353 else:
354 a, b = (ranges[typename] * 2)[:2]
355 value = a if a == b else random.randint(a, b) # Constant or from range
356 if rclpy and typename in ("byte", ): # ROS2 *requires* byte value to be bytes()
357 value = bytes([value])
358 return value
359
360 @classmethod
361 def populate(cls, msg, options=None):
362 """
363 Returns ROS message with fields populated with random content.
364
365 @param options {"arraylen" or "nestedlen" or "strlen": fixlen or (minlen, maxlen),
366 numtype like "int8": fixvalue or (minval, maxval),
367 "strchars": str} if not using generator defaults
368 """
369 options = dict(cls.OPTIONS, **options or {})
370 for name, typename in rosapi.get_message_fields(msg).items():
371 scalartype = rosapi.scalar(typename)
372 if typename in rosapi.ROS_BUILTIN_TYPES \
373 or "[" not in typename and scalartype in rosapi.ROS_BUILTIN_TYPES:
374 value = cls.make_random_value(typename, options)
375 elif scalartype in rosapi.ROS_BUILTIN_TYPES: # List of primitives
376 LEN = options["arraylen"] if typename.endswith("[]") else \
377 int(re.sub(r"\D", "", typename[typename.index("["):]))
378 if isinstance(LEN, (list, tuple)): LEN = random.randint(*LEN[:2])
379 value = [cls.make_random_value(rosapi.scalar(typename, bound=True), options)
380 for _ in range(LEN)]
381 elif typename == scalartype: # Single nested message
382 value = cls.populate(getattr(msg, name), options)
383 else: # List of nested messages
384 LEN = options["nestedlen"] if typename.endswith("[]") else \
385 int(re.sub(r"\D", "", typename[typename.index("["):]))
386 if isinstance(LEN, (list, tuple)): LEN = random.randint(*LEN[:2])
387 msgcls = rosapi.get_message_class(scalartype)
388 value = [cls.populate(msgcls(), options) for _ in range(LEN)]
389 if rosapi.is_ros_time(msg):
390 value = abs(value)
391 setattr(msg, name, value)
392 return msg
393
394
395
397 """Returns a populated ArgumentParser instance."""
398 kws = dict(description=ARGUMENTS["description"], formatter_class=argparse.RawTextHelpFormatter)
399 argparser = argparse.ArgumentParser(**kws)
400 for arg in map(dict, ARGUMENTS["arguments"]):
401 argparser.add_argument(*arg.pop("args"), **arg)
402 return argparser
403
404
405def plural(word, items):
406 """Returns "N words" or "1 word"."""
407 count = len(items) if isinstance(items, (dict, list, set, tuple)) else items
408 return "%s %s%s" % (count, word, "s" if count != 1 else "")
409
410
411def take_sample(population, k):
412 """Returns a list of k randomly chosen elements from population."""
413 result, n, k = [], k, min(k, len(population))
414 result = random.sample(population, k)
415 while len(result) < n:
416 result += random.sample(population, min(k, n - len(result)))
417 return result
418
419
420def wildcard_to_regex(text, end=True):
421 """
422 Returns plain wildcard like "foo*bar" as re.Pattern("foo.*bar", re.I).
423
424 @param end whether pattern should match until end (adds $)
425 """
426 suff = "$" if end else ""
427 return re.compile(".*".join(map(re.escape, text.split("*"))) + suff, re.I)
428
429
430def process_args(args):
431 """
432 Converts or combines arguments where necessary, returns args.
433
434 @param args arguments object like argparse.Namespace
435 """
436 for k, v in vars(args).items(): # Flatten lists of lists and drop duplicates
437 if not isinstance(v, list): continue # for k, v
438 here = set()
439 setattr(args, k, [x for xx in v for x in (xx if isinstance(xx, list) else [xx])
440 if not (x in here or here.add(x))])
441
442 # Split and parse keyword options
443 opts = dict(generator.OPTIONS, **dict(x.split("=", 1) for x in args.OPTIONS))
444 for k, v in list(opts.items()):
445 if not k.endswith("len") and k not in rosapi.ROS_NUMERIC_TYPES \
446 or not isinstance(v, str):
447 continue # for k, v
448 try:
449 vv = sorted(json.loads("[%s]" % v))
450 ctor = float if k.startswith("float") else bool if "bool" == k else int
451 if k.endswith("int64") and sys.version_info < (3, ): ctor = long # Py2
452 vv = [ctor(x) for x in vv]
453 if k in rosapi.ROS_INTEGER_RANGES: # Force into allowed range
454 a, b = rosapi.ROS_INTEGER_RANGES[k]
455 vv = [max(a, min(b, x)) for x in vv]
456 opts[k] = vv[0] if len(vv) < 2 and k.endswith("len") else tuple(vv[:2])
457 except Exception:
458 sys.exit("Error parsing option %s=%s." % (k, v))
459 args.OPTIONS = opts
460
461 return args
462
463
464def run(args):
465 """Generates messages until Ctrl-C or end condition reached."""
466 msgtypes = rosapi.get_message_types()
467 patterns = [wildcard_to_regex(x) for x in args.TYPES]
468 nopatterns = [wildcard_to_regex(x) for x in args.SKIP_TYPES]
469 msgtypes = [x for x in msgtypes if not patterns or any(p.match(x) for p in patterns)]
470 availables = [x for x in msgtypes if not nopatterns or not any(p.match(x) for p in nopatterns)]
471 if not availables:
472 print("No message types %s." %
473 ("match" if args.TYPES or args.SKIP_TYPES else "available"))
474 sys.exit(1)
475
476 def choose_topic(typename):
477 """Returns new or existing ROS topic name for message type."""
478 existing = [n for n, t in topiccounts if t == typename]
479 if len(existing) < args.TOPICS_PER_TYPE:
480 prefix = "/" + args.PUBLISH_PREFIX.strip("/")
481 prefix += "/" if len(prefix) > 1 else ""
482 suffix = "/topic%s" % (len(existing) + 1) if args.TOPICS_PER_TYPE > 1 else ""
483 suffix += args.PUBLISH_SUFFIX
484 return "%s%s%s" % (prefix, typename, suffix)
485 return random.choice(existing)
486
487 def choose_type():
488 """Returns a random ROS message type name."""
489 if availables and (not args.MAX_TOPICS
490 or len(topiccounts) / (args.TOPICS_PER_TYPE or 1) < args.MAX_TOPICS):
491 return availables.pop(random.randint(0, len(availables) - 1))
492 candidates = [t for (_, t), c in topiccounts.items() if len(c) < args.MAX_PER_TOPIC] \
493 if args.MAX_PER_TOPIC else list(typecounts)
494 return random.choice(candidates) if candidates else None
495
496 def is_finished():
497 """Returns whether generating is complete."""
498 done = count >= args.COUNT if args.COUNT else False
499 if not done and args.MAX_PER_TOPIC:
500 done = all(len(x) >= args.MAX_PER_TOPIC for x in topiccounts.values())
501 return done
502
503 count = 0 # Total number of messages emitted
504 pubs = {} # {(topic, typename): ROS publisher instance}
505 typecounts = collections.Counter() # {typename: messages emitted}
506 topiccounts = collections.Counter() # {(topic, typename): messages emitted}
507 print("Message types available: %s." % len(msgtypes))
508 print("Generating a random message each %s seconds." % args.INTERVAL)
509 rosapi.init(launch=True)
510 signal.signal(signal.SIGINT, lambda *_, **__: sys.exit()) # Break ROS1 spin on Ctrl-C
511 try:
512 while not is_finished():
513 typename = choose_type()
514 if not typename:
515 break # while
516 topic = choose_topic(typename)
517 topickey = (topic, typename)
518 if topickey not in topiccounts:
519 print("Adding topic %s." % topic)
520
521 try:
522 cls = rosapi.get_message_class(typename)
523 msg = generator.populate(cls(), args.OPTIONS)
524 if topickey not in pubs:
525 pubs[topickey] = rosapi.create_publisher(topic, cls, latch=args.LATCH)
526 except Exception as e:
527 print("Error processing message type %r: %s" % (typename, e))
528 continue # while
529
530 if args.VERBOSE:
531 print("-- [%s] Message %s in %s" % (count + 1, topiccounts[topickey] + 1, topic))
532 print(rosapi.message_to_yaml(msg))
533
534 pubs[topickey].publish(msg)
535 count += 1
536 topiccounts[topickey] += 1
537 typecounts[typename] += 1
538
539 if count and not count % 100:
540 print("Total count: %s in %s." %
541 (plural("message", count), plural("topic", topiccounts)))
542 if args.INTERVAL:
543 time.sleep(args.INTERVAL)
544 except (KeyboardInterrupt, SystemExit):
545 pass
546 except Exception:
547 traceback.print_exc()
548
549 print("")
550 print("Emitted %s in %s%s." % (plural("message", count), plural("topic", topiccounts),
551 (" and %s" % plural("type", typecounts)) if args.TOPICS_PER_TYPE > 1 else ""))
552
553 print("")
554 print("Press Ctrl-C to close publishers and exit.")
555 try:
556 while True: time.sleep(10)
557 except KeyboardInterrupt:
558 pass
559 rosapi.shutdown()
560 sys.exit()
561
562
563
564if "__main__" == __name__:
565 runargs = process_args(make_argparser().parse_args())
566 run(runargs)
Generates random ROS values and message attributes.
dict OPTIONS
Attribute generating options.
populate(cls, msg, options=None)
Returns ROS message with fields populated with random content.
make_random_value(cls, typename, options=None)
Returns random value for ROS builtin type.
Generic interface for accessing ROS1 / ROS2 API.
make_full_typename(cls, typename)
Returns "pkg/msg/Type" for "pkg/Type".
is_ros_time(cls, val)
Returns whether value is a ROS time/duration.
NODE
rclpy.Node instance
get_message_types(cls)
Returns a list of available message types, as ["pksg/Msg", ].
dict DDS_TYPES
ROS2 Data Distribution Service types to ROS built-ins.
get_message_class(cls, typename)
Returns ROS message class.
canonical(cls, typename)
Returns "pkg/Type" for "pkg/msg/Type", standardizes various ROS2 formats.
get_message_fields(cls, val)
Returns OrderedDict({field name: field type name}) if ROS1 message, else {}.
is_ros_message(cls, val)
Returns whether value is a ROS message or special like time/duration.
dict ROS_TIME_CLASSES
ROS time/duration types mapped to type names.
create_publisher(cls, topic, typecls, latch=True, queue_size=10)
Returns ROS publisher instance.
scalar(cls, typename, bound=False)
Returns scalar type from ROS message data type, like "uint8" from "uint8[100]".
message_to_yaml(cls, msg)
Returns ROS message as YAML string.
init(cls, launch=False)
Initializes ROS, creating and spinning node if specified.
make_argparser()
Returns a populated ArgumentParser instance.
take_sample(population, k)
Returns a list of k randomly chosen elements from population.
wildcard_to_regex(text, end=True)
Returns plain wildcard like "foo*bar" as re.Pattern("foo.*bar", re.I).
run(args)
Generates messages until Ctrl-C or end condition reached.
process_args(args)
Converts or combines arguments where necessary, returns args.