4Generates random ROS messages and publishes them to live topics.
8 generate_msgs.py [TYPE [TYPE ...]]
9 [--no-type TYPE [TYPE ...]]
13 [--topics-per-type NUM]
15 [--publish-prefix PREFIX]
16 [--publish-suffix SUFFIX]
19 [--option KEY=VALUE [KEY=VALUE ...]]
21Topic names default to "/generate_msgs/type", like
"/generate_msgs/std_msgs/Bool".
23Supports both ROS1
and ROS2, version detected
from environment.
25Stand-alone script, requires ROS1 / ROS2 Python libraries.
26ROS1 requires ROS master to be running.
28------------------------------------------------------------------------------
29This file
is part of grepros - grep
for ROS bag files
and live topics.
30Released under the BSD License.
35------------------------------------------------------------------------------
52if os.getenv("ROS_VERSION") != "2":
57 import builtin_interfaces.msg
62 import rosidl_runtime_py.utilities
66 "description":
"Generates random ROS messages, publishes to live topics.",
68 dict(args=[
"TYPES"], nargs=
"*", metavar=
"TYPE",
69 help=
"ROS message types to use if not all available,\n"
70 '(supports * wildcards, like "geometry_msgs/*")'),
72 dict(args=[
"-v",
"--verbose"],
73 dest=
"VERBOSE", default=
False, action=
"store_true",
74 help=
"print each emitted message"),
76 dict(args=[
"--no-type"],
77 dest=
"SKIP_TYPES", metavar=
"TYPE", nargs=
"+", default=[], action=
"append",
78 help=
"ROS message types to skip (supports * wildcards)"),
80 dict(args=[
"-m",
"--max-count"],
81 dest=
"COUNT", metavar=
"NUM", default=0, type=int,
82 help=
"maximum number of messages to emit"),
84 dict(args=[
"--max-topics"],
85 dest=
"MAX_TOPICS", metavar=
"NUM", default=0, type=int,
86 help=
"maximum number of topics to emit"),
88 dict(args=[
"--max-per-topic"],
89 dest=
"MAX_PER_TOPIC", metavar=
"NUM", default=0, type=int,
90 help=
"number of messages to emit in each topic"),
92 dict(args=[
"--topics-per-type"],
93 dest=
"TOPICS_PER_TYPE", metavar=
"NUM", default=1, type=int,
94 help=
"number of topics to emit per message type (default 1)"),
96 dict(args=[
"--interval"],
97 dest=
"INTERVAL", metavar=
"SECONDS", default=0.5, type=float,
98 help=
"live publish interval (default 0.5)"),
100 dict(args=[
"--publish-prefix"],
101 dest=
"PUBLISH_PREFIX", metavar=
"PREFIX", default=
"generate_msgs",
102 help=
'prefix to prepend to topic name (default "generate_msgs")'),
104 dict(args=[
"--publish-suffix"],
105 dest=
"PUBLISH_SUFFIX", metavar=
"SUFFIX", default=
"",
106 help=
'suffix to append to topic name'),
108 dict(args=[
"--no-latch"],
109 dest=
"LATCH", default=
True, action=
"store_false",
110 help=
"do not latch published topics"),
112 dict(args=[
"--option"],
113 dest=
"OPTIONS", metavar=
"KEY=VALUE", nargs=
"+", default=[], action=
"append",
114 help=
"options for generated message attributes, as\n"
115 " arraylen=MIN,MAX range / length of primitive arrays\n"
116 " or NUM (default 50,100)\n"
117 " nestedlen=MIN,MAX range / length of nested message lists\n"
118 " or NUM (default 1,2)\n"
119 " strlen=MIN,MAX range / length of strings (default 10,50)\n"
121 " strchars=CHARS characters to use in strings\n"
122 " (default all printables)\n"
123 " NUMTYPE=MIN,MAX value range / constant of numeric types\n"
124 " or CONSTANT like int8\n"),
129NAME =
"generate_msgs"
133 """Generic interface for accessing ROS1 / ROS2 API."""
139 ROS_NUMERIC_TYPES = [
"byte",
"char",
"int8",
"int16",
"int32",
"int64",
"uint8",
140 "uint16",
"uint32",
"uint64",
"float32",
"float64",
"bool"]
143 ROS_STRING_TYPES = [
"string",
"wstring"]
146 ROS_BUILTIN_TYPES = ROS_NUMERIC_TYPES + ROS_STRING_TYPES
149 ROS_TIME_CLASSES = {rospy.Time:
"time", rospy.Duration:
"duration",
150 genpy.Time:
"time", genpy.Duration:
"duration"}
if rospy
else \
151 {rclpy.time.Time:
"builtin_interfaces/Time",
152 builtin_interfaces.msg.Time:
"builtin_interfaces/Time",
153 rclpy.duration.Duration:
"builtin_interfaces/Duration",
154 builtin_interfaces.msg.Duration:
"builtin_interfaces/Duration"}
157 ROS_INTEGER_RANGES = dict({
158 "byte": (-2** 7, 2** 7 - 1),
159 "int8": (-2** 7, 2** 7 - 1),
160 "int16": (-2**15, 2**15 - 1),
161 "int32": (-2**31, 2**31 - 1),
162 "int64": (-2**63, 2**63 - 1),
163 "char": (0, 2** 8 - 1),
164 "uint8": (0, 2** 8 - 1),
165 "uint16": (0, 2**16 - 1),
166 "uint32": (0, 2**31 - 1),
167 "uint64": (0, 2**64 - 1),
169 "byte": (0, 2** 8 - 1),
170 "char": (-2** 7, 2** 7 - 1),
174 DDS_TYPES = {
"boolean":
"bool",
179 "unsigned short":
"uint16",
181 "unsigned long":
"uint32",
182 "long long":
"int64",
183 "unsigned long long":
"uint64"}
187 """Returns a list of available message types, as ["pksg/Msg", ]."""
188 cmd =
"rosmsg list" if rospy
else "ros2 interface list --only-msgs"
189 output = subprocess.check_output(cmd, shell=
True).decode()
190 return sorted(cls.
canonical(l.strip())
for l
in output.splitlines()
191 if re.match(
r"\w+/\w+", l.strip()))
194 def init(cls, launch=False):
195 """Initializes ROS, creating and spinning node if specified."""
197 rospy.init_node(NAME)
199 spinner = threading.Thread(target=rospy.spin)
203 cls.
NODE = rclpy.create_node(NAME, enable_rosout=
False, start_parameter_services=
False)
204 spinner = threading.Thread(target=rclpy.spin, args=(cls.
NODE, ))
206 spinner.daemon =
True
212 Returns "pkg/Type" for "pkg/msg/Type", standardizes various ROS2 formats.
214 Converts DDS types like
"octet" to
"byte",
and "sequence<uint8, 100>" to
"uint8[100]".
216 is_array, bound, dimension = False,
"",
""
218 match = re.match(
"sequence<(.+)>", typename)
221 typename = match.group(1)
222 match = re.match(
r"([^,]+)?,\s?(\d+)", typename)
224 typename = match.group(1)
225 if match.lastindex > 1: dimension = match.group(2)
227 match = re.match(
"(w?string)<(.+)>", typename)
229 typename, bound = match.groups()
232 dimension = typename[typename.index(
"[") + 1:typename.index(
"]")]
233 typename, is_array = typename[:typename.index(
"[")],
True
236 typename, bound = typename.split(
"<=")
238 if typename.count(
"/") > 1:
239 typename =
"%s/%s" % tuple((x[0], x[-1])
for x
in [typename.split(
"/")])[0]
241 suffix = (
"<=%s" % bound
if bound
else "") + (
"[%s]" % dimension
if is_array
else "")
242 return cls.
DDS_TYPES.get(typename, typename) + suffix
246 """Returns ROS publisher instance."""
248 return rospy.Publisher(topic, typecls, latch=latch, queue_size=queue_size)
250 qos = rclpy.qos.QoSProfile(depth=queue_size)
251 if latch: qos.durability = rclpy.qos.DurabilityPolicy.TRANSIENT_LOCAL
256 """Returns ROS message class."""
258 return roslib.message.get_message_class(typename)
263 """Returns OrderedDict({field name: field type name}) if ROS1 message, else {}."""
265 names = getattr(val,
"__slots__", [])
266 if isinstance(val, (rospy.Time, rospy.Duration)):
267 names = genpy.TVal.__slots__
268 return collections.OrderedDict(zip(names, getattr(val,
"_slot_types", [])))
270 fields = {k: cls.
canonical(v)
for k, v
in val.get_fields_and_field_types().items()}
271 return collections.OrderedDict(fields)
275 """Returns whether value is a ROS message or special like time/duration."""
277 return isinstance(val, (genpy.Message, genpy.TVal))
278 return rosidl_runtime_py.utilities.is_message(val)
282 """Returns whether value is a ROS time/duration."""
287 """Returns "pkg/msg/Type" for "pkg/Type"."""
288 if "/msg/" in typename
or "/" not in typename:
290 return "%s/msg/%s" % tuple((x[0], x[-1])
for x
in [typename.split(
"/")])[0]
294 """Returns ROS message as YAML string."""
297 return rosidl_runtime_py.message_to_yaml(msg)
300 def scalar(cls, typename, bound=False):
302 Returns scalar type from ROS message data type, like
"uint8" from "uint8[100]".
304 Returns type unchanged
if already a scalar.
306 @param bound
if True, does
not strip string boundaries like
"string<=10"
308 if "[" in typename: typename = typename[:typename.index(
"[")]
309 if "<=" in typename
and not bound:
310 typename = typename[:typename.index(
"<=")]
315 rclpy
and rclpy.shutdown()
320 """Generates random ROS values and message attributes."""
324 "arraylen": (50, 100),
325 "nestedlen": ( 1, 2),
327 "strchars": string.printable.strip() +
" ",
333 Returns random value for ROS builtin type.
335 @param options {numtype like
"int8": fixvalue
or (minval, maxval),
336 "strlen": fixlen
or (minlen, maxlen),
337 "strchars": str}
if not using generator defaults
340 ranges = dict(rosapi.ROS_INTEGER_RANGES, **options
or {})
341 if rosapi.scalar(typename)
in rosapi.ROS_STRING_TYPES:
342 LEN = int(re.sub(
r"\D",
"", typename))
if re.search(
r"\d", typename)
else \
344 if isinstance(LEN, (list, tuple)): LEN = random.randint(*LEN[:2])
345 value =
"".join(
take_sample(options[
"strchars"], LEN))
if options[
"strchars"]
else ""
346 elif typename
in (
"bool", ):
347 value = random.choice(ranges.get(
"bool", [
True,
False]))
348 elif typename
in (
"float32",
"float64"):
349 value = random.random()
350 if typename
in ranges:
351 a, b = (ranges[typename] * 2)[:2]
352 value = a
if a == b
else value * (b - a)
354 a, b = (ranges[typename] * 2)[:2]
355 value = a
if a == b
else random.randint(a, b)
356 if rclpy
and typename
in (
"byte", ):
357 value = bytes([value])
361 def populate(cls, msg, options=None):
363 Returns ROS message with fields populated
with random content.
365 @param options {
"arraylen" or "nestedlen" or "strlen": fixlen
or (minlen, maxlen),
366 numtype like
"int8": fixvalue
or (minval, maxval),
367 "strchars": str}
if not using generator defaults
369 options = dict(cls.OPTIONS, **options or {})
370 for name, typename
in rosapi.get_message_fields(msg).items():
371 scalartype = rosapi.scalar(typename)
372 if typename
in rosapi.ROS_BUILTIN_TYPES \
373 or "[" not in typename
and scalartype
in rosapi.ROS_BUILTIN_TYPES:
375 elif scalartype
in rosapi.ROS_BUILTIN_TYPES:
376 LEN = options[
"arraylen"]
if typename.endswith(
"[]")
else \
377 int(re.sub(
r"\D",
"", typename[typename.index(
"["):]))
378 if isinstance(LEN, (list, tuple)): LEN = random.randint(*LEN[:2])
381 elif typename == scalartype:
382 value = cls.
populate(getattr(msg, name), options)
384 LEN = options[
"nestedlen"]
if typename.endswith(
"[]")
else \
385 int(re.sub(
r"\D",
"", typename[typename.index(
"["):]))
386 if isinstance(LEN, (list, tuple)): LEN = random.randint(*LEN[:2])
387 msgcls = rosapi.get_message_class(scalartype)
388 value = [cls.
populate(msgcls(), options)
for _
in range(LEN)]
389 if rosapi.is_ros_time(msg):
391 setattr(msg, name, value)
397 """Returns a populated ArgumentParser instance."""
398 kws = dict(description=ARGUMENTS[
"description"], formatter_class=argparse.RawTextHelpFormatter)
399 argparser = argparse.ArgumentParser(**kws)
400 for arg
in map(dict, ARGUMENTS[
"arguments"]):
401 argparser.add_argument(*arg.pop(
"args"), **arg)
405def plural(word, items):
406 """Returns "N words" or "1 word"."""
407 count = len(items)
if isinstance(items, (dict, list, set, tuple))
else items
408 return "%s %s%s" % (count, word,
"s" if count != 1
else "")
412 """Returns a list of k randomly chosen elements from population."""
413 result, n, k = [], k, min(k, len(population))
414 result = random.sample(population, k)
415 while len(result) < n:
416 result += random.sample(population, min(k, n - len(result)))
422 Returns plain wildcard like "foo*bar" as re.Pattern(
"foo.*bar", re.I).
424 @param end whether pattern should match until end (adds $)
426 suff = "$" if end
else ""
427 return re.compile(
".*".join(map(re.escape, text.split(
"*"))) + suff, re.I)
432 Converts or combines arguments where necessary, returns args.
434 @param args arguments object like argparse.Namespace
436 for k, v
in vars(args).items():
437 if not isinstance(v, list):
continue
439 setattr(args, k, [x
for xx
in v
for x
in (xx
if isinstance(xx, list)
else [xx])
440 if not (x
in here
or here.add(x))])
443 opts = dict(generator.OPTIONS, **dict(x.split(
"=", 1)
for x
in args.OPTIONS))
444 for k, v
in list(opts.items()):
445 if not k.endswith(
"len")
and k
not in rosapi.ROS_NUMERIC_TYPES \
446 or not isinstance(v, str):
449 vv = sorted(json.loads(
"[%s]" % v))
450 ctor = float
if k.startswith(
"float")
else bool
if "bool" == k
else int
451 if k.endswith(
"int64")
and sys.version_info < (3, ): ctor = long
452 vv = [ctor(x)
for x
in vv]
453 if k
in rosapi.ROS_INTEGER_RANGES:
454 a, b = rosapi.ROS_INTEGER_RANGES[k]
455 vv = [max(a, min(b, x))
for x
in vv]
456 opts[k] = vv[0]
if len(vv) < 2
and k.endswith(
"len")
else tuple(vv[:2])
458 sys.exit(
"Error parsing option %s=%s." % (k, v))
465 """Generates messages until Ctrl-C or end condition reached."""
466 msgtypes = rosapi.get_message_types()
469 msgtypes = [x
for x
in msgtypes
if not patterns
or any(p.match(x)
for p
in patterns)]
470 availables = [x
for x
in msgtypes
if not nopatterns
or not any(p.match(x)
for p
in nopatterns)]
472 print(
"No message types %s." %
473 (
"match" if args.TYPES
or args.SKIP_TYPES
else "available"))
476 def choose_topic(typename):
477 """Returns new or existing ROS topic name for message type."""
478 existing = [n
for n, t
in topiccounts
if t == typename]
479 if len(existing) < args.TOPICS_PER_TYPE:
480 prefix =
"/" + args.PUBLISH_PREFIX.strip(
"/")
481 prefix +=
"/" if len(prefix) > 1
else ""
482 suffix =
"/topic%s" % (len(existing) + 1)
if args.TOPICS_PER_TYPE > 1
else ""
483 suffix += args.PUBLISH_SUFFIX
484 return "%s%s%s" % (prefix, typename, suffix)
485 return random.choice(existing)
488 """Returns a random ROS message type name."""
489 if availables
and (
not args.MAX_TOPICS
490 or len(topiccounts) / (args.TOPICS_PER_TYPE
or 1) < args.MAX_TOPICS):
491 return availables.pop(random.randint(0, len(availables) - 1))
492 candidates = [t
for (_, t), c
in topiccounts.items()
if len(c) < args.MAX_PER_TOPIC] \
493 if args.MAX_PER_TOPIC
else list(typecounts)
494 return random.choice(candidates)
if candidates
else None
497 """Returns whether generating is complete."""
498 done = count >= args.COUNT
if args.COUNT
else False
499 if not done
and args.MAX_PER_TOPIC:
500 done = all(len(x) >= args.MAX_PER_TOPIC
for x
in topiccounts.values())
505 typecounts = collections.Counter()
506 topiccounts = collections.Counter()
507 print(
"Message types available: %s." % len(msgtypes))
508 print(
"Generating a random message each %s seconds." % args.INTERVAL)
509 rosapi.init(launch=
True)
510 signal.signal(signal.SIGINT,
lambda *_, **__: sys.exit())
512 while not is_finished():
513 typename = choose_type()
516 topic = choose_topic(typename)
517 topickey = (topic, typename)
518 if topickey
not in topiccounts:
519 print(
"Adding topic %s." % topic)
522 cls = rosapi.get_message_class(typename)
523 msg = generator.populate(cls(), args.OPTIONS)
524 if topickey
not in pubs:
525 pubs[topickey] = rosapi.create_publisher(topic, cls, latch=args.LATCH)
526 except Exception
as e:
527 print(
"Error processing message type %r: %s" % (typename, e))
531 print(
"-- [%s] Message %s in %s" % (count + 1, topiccounts[topickey] + 1, topic))
532 print(rosapi.message_to_yaml(msg))
534 pubs[topickey].publish(msg)
536 topiccounts[topickey] += 1
537 typecounts[typename] += 1
539 if count
and not count % 100:
540 print(
"Total count: %s in %s." %
541 (plural(
"message", count), plural(
"topic", topiccounts)))
543 time.sleep(args.INTERVAL)
544 except (KeyboardInterrupt, SystemExit):
547 traceback.print_exc()
550 print(
"Emitted %s in %s%s." % (plural(
"message", count), plural(
"topic", topiccounts),
551 (
" and %s" % plural(
"type", typecounts))
if args.TOPICS_PER_TYPE > 1
else ""))
554 print(
"Press Ctrl-C to close publishers and exit.")
556 while True: time.sleep(10)
557 except KeyboardInterrupt:
564if "__main__" == __name__:
Generates random ROS values and message attributes.
dict OPTIONS
Attribute generating options.
populate(cls, msg, options=None)
Returns ROS message with fields populated with random content.
make_random_value(cls, typename, options=None)
Returns random value for ROS builtin type.
Generic interface for accessing ROS1 / ROS2 API.
make_full_typename(cls, typename)
Returns "pkg/msg/Type" for "pkg/Type".
is_ros_time(cls, val)
Returns whether value is a ROS time/duration.
get_message_types(cls)
Returns a list of available message types, as ["pksg/Msg", ].
dict DDS_TYPES
ROS2 Data Distribution Service types to ROS built-ins.
get_message_class(cls, typename)
Returns ROS message class.
canonical(cls, typename)
Returns "pkg/Type" for "pkg/msg/Type", standardizes various ROS2 formats.
get_message_fields(cls, val)
Returns OrderedDict({field name: field type name}) if ROS1 message, else {}.
is_ros_message(cls, val)
Returns whether value is a ROS message or special like time/duration.
dict ROS_TIME_CLASSES
ROS time/duration types mapped to type names.
create_publisher(cls, topic, typecls, latch=True, queue_size=10)
Returns ROS publisher instance.
scalar(cls, typename, bound=False)
Returns scalar type from ROS message data type, like "uint8" from "uint8[100]".
message_to_yaml(cls, msg)
Returns ROS message as YAML string.
init(cls, launch=False)
Initializes ROS, creating and spinning node if specified.
make_argparser()
Returns a populated ArgumentParser instance.
take_sample(population, k)
Returns a list of k randomly chosen elements from population.
wildcard_to_regex(text, end=True)
Returns plain wildcard like "foo*bar" as re.Pattern("foo.*bar", re.I).
run(args)
Generates messages until Ctrl-C or end condition reached.
process_args(args)
Converts or combines arguments where necessary, returns args.