grepros 1.2.2
grep for ROS bag files and live topics
Loading...
Searching...
No Matches
ros1.py
Go to the documentation of this file.
1# -*- coding: utf-8 -*-
2"""
3ROS1 interface.
4
5------------------------------------------------------------------------------
6This file is part of grepros - grep for ROS bag files and live topics.
7Released under the BSD License.
8
9@author Erki Suurjaak
10@created 01.11.2021
11@modified 30.04.2024
12------------------------------------------------------------------------------
13"""
14## @namespace grepros.ros1
15import collections
16import datetime
17import decimal
18import inspect
19import logging
20import io
21import os
22import shutil
23import threading
24import time
25
26import genpy
27import genpy.dynamic
28import rosbag
29import roslib
30import rospy
31import six
32
33from . import api
34from . import common
35from . api import TypeMeta, calculate_definition_hash, parse_definition_subtypes
36from . common import ConsolePrinter, memoize
37
38
39## Bagfile extensions to seek
40BAG_EXTENSIONS = (".bag", ".bag.active")
41
42
43SKIP_EXTENSIONS = (".bag.orig.active", )
44
45
46ROS_TIME_TYPES = ["time", "duration"]
47
48
49ROS_TIME_CLASSES = {rospy.Time: "time", rospy.Duration: "duration"}
50
51
52ROS_ALIAS_TYPES = {"byte": "int8", "char": "uint8"}
53
54
55TYPECLASSES = {}
56
57
58SLEEP_INTERVAL = 0.5
59
60
61master = None
62
63
64genpy_mtx = threading.RLock()
65
66
67class ROS1Bag(rosbag.Bag, api.BaseBag):
68 """
69 ROS1 bag reader and writer.
70
71 Extends `rosbag.Bag` with more conveniences, smooths over the rosbag bug of ignoring
72 topic and time filters in format v1.2, and smooths over the rosbag bug
73 of yielding messages of wrong type, if message types in different topics
74 have different packages but identical fields and hashes.
75
76 Does **not** smooth over the rosbag bug of writing different types to one topic.
77
78 rosbag does allow writing messages of different types to one topic,
79 just like live ROS topics can have multiple message types published
80 to one topic. And their serialized bytes will actually be in the bag,
81 but rosbag will only register the first type for this topic (unless it is
82 explicitly given another connection header with metadata on the other type).
83
84 All messages yielded will be deserialized by rosbag as that first type,
85 and whether reading will raise an exception or not depends on whether
86 the other type has enough bytes to be deserialized as that first type.
87 """
88
89 # {(typename, typehash): message type class}
90 __TYPES = {}
91
92
93 __TYPEDEFS = {}
94
95 # {(typename, typehash): whether subtype definitions parsed}
96 __PARSEDS = {}
97
98
99 def __init__(self, *args, **kwargs):
100 """
101 @param f bag file path, or a stream object
102 @param mode mode to open bag in, defaults to "r" (read mode)
103 @param reindex if true and bag is unindexed, make a copy
104 of the file (unless unindexed format) and reindex original
105 @param progress show progress bar with reindexing status
106 @param kwargs additional keyword arguments for `rosbag.Bag`, like `compression`
107 """
108 self.__topics = {} # {(topic, typename, typehash): message count}
109 self.__iterer = None # Generator from read_messages() for next()
110 f, args = (args[0] if args else kwargs.pop("f")), args[1:]
111 mode, args = (args[0] if args else kwargs.pop("mode", "r")), args[1:]
112 if mode not in self.MODES: raise ValueError("invalid mode %r" % mode)
113
114 kwargs.setdefault("skip_index", True)
115 reindex, progress = (kwargs.pop(k, False) for k in ("reindex", "progress"))
116 getargspec = getattr(inspect, "getfullargspec", getattr(inspect, "getargspec", None))
117 for n in set(kwargs) - set(getargspec(rosbag.Bag.__init__).args): kwargs.pop(n)
118
119 if common.is_stream(f):
120 if not common.verify_io(f, mode):
121 raise io.UnsupportedOperation({"r": "read", "w": "write", "a": "append"}[mode])
122 super(ROS1Bag, self).__init__(f, mode, *args, **kwargs)
123 self._populate_meta()
124 return
125
126 f = str(f)
127 if "a" == mode and (not os.path.exists(f) or not os.path.getsize(f)):
128 mode = "w" # rosbag raises error on append if no file or empty file
129 os.path.exists(f) and os.remove(f)
130
131 try:
132 super(ROS1Bag, self).__init__(f, mode, *args, **kwargs)
133 except rosbag.ROSBagUnindexedException:
134 if not reindex: raise
135 Bag.reindex_file(f, progress, *args, **kwargs)
136 super(ROS1Bag, self).__init__(f, mode, *args, **kwargs)
137 self._populate_meta()
138
139
140 def get_message_definition(self, msg_or_type):
141 """Returns ROS1 message type definition full text from bag, including subtype definitions."""
142 if is_ros_message(msg_or_type):
143 return self.__TYPEDEFS.get((msg_or_type._type, msg_or_type._md5sum))
144 typename = msg_or_type
145 return next((d for (n, h), d in self.__TYPEDEFS.items() if n == typename), None)
146
147
148 def get_message_class(self, typename, typehash=None):
149 """
150 Returns rospy message class for typename, or None if unknown type.
151
152 Generates class dynamically if not already generated.
153
154 @param typehash message type definition hash, if any
155 """
156 if (typename, typehash) not in self.__TYPES: self._ensure_typedef(typename, typehash)
157 typehash = typehash or next((h for n, h in self.__TYPEDEFS if n == typename), None)
158 typekey = (typename, typehash)
159 if typekey not in self.__TYPES and typekey in self.__TYPEDEFS:
160 for n, c in generate_message_classes(typename, self.__TYPEDEFS[typekey]).items():
161 self.__TYPES[(n, c._md5sum)] = c
162 return self.__TYPES.get(typekey)
163
164
165 def get_message_type_hash(self, msg_or_type):
166 """Returns ROS1 message type MD5 hash, or None if unknown type."""
167 if is_ros_message(msg_or_type): return msg_or_type._md5sum
168 typename = msg_or_type
169 typehash = next((h for n, h in self.__TYPEDEFS if n == typename), None)
170 if not typehash:
171 self._ensure_typedef(typename)
172 typehash = next((h for n, h in self.__TYPEDEFS if n == typename), None)
173 return typehash
174
176 def get_start_time(self):
177 """Returns the start time of the bag, as UNIX timestamp, or None if bag empty."""
178 try: return super(ROS1Bag, self).get_start_time()
179 except Exception: return None
180
181
182 def get_end_time(self):
183 """Returns the end time of the bag, as UNIX timestamp, or None if bag empty."""
184 try: return super(ROS1Bag, self).get_end_time()
185 except Exception: return None
187
188 def get_topic_info(self, *_, **__):
189 """Returns topic and message type metainfo as {(topic, typename, typehash): count}."""
190 return dict(self.__topics)
191
193 def read_messages(self, topics=None, start_time=None, end_time=None,
194 raw=False, connection_filter=None, **__):
195 """
196 Yields messages from the bag, optionally filtered by topic, timestamp and connection details.
197
198 @param topics list of topics or a single topic.
199 If an empty list is given, all topics will be read.
200 @param start_time earliest timestamp of messages to return,
201 as ROS time or convertible (int/float/duration/datetime/decimal)
202 @param end_time latest timestamp of messages to return,
203 as ROS time or convertible (int/float/duration/datetime/decimal)
204 @param connection_filter function to filter connections to include
205 @param raw if true, then returned messages are tuples of
206 (typename, bytes, typehash, typeclass)
207 or (typename, bytes, typehash, position, typeclass),
208 depending on file format
209 @return BagMessage namedtuples of
210 (topic, message, timestamp as rospy.Time)
211 """
212 if self.closedclosedclosedclosed: raise ValueError("I/O operation on closed file.")
213 if "w" == self._mode: raise io.UnsupportedOperation("read")
214
215 hashtypes = {}
216 for n, h in self.__TYPEDEFS: hashtypes.setdefault(h, []).append(n)
217 read_topics = topics if isinstance(topics, (dict, list, set, tuple)) else \
218 [topics] if topics else None
219 dupes = {t: (n, h) for t, n, h in self.__topics
220 if (read_topics is None or t in read_topics) and len(hashtypes.get(h, [])) > 1}
222 # Workaround for rosbag.Bag ignoring topic and time filters in format v1.2
223 if self.version != 102 or (not topics and start_time is None and end_time is None):
224 in_range = lambda *_: True
225 else: in_range = lambda t, s: ((not read_topics or t in read_topics) and
226 (start_time is None or s >= start_time) and
227 (end_time is None or s <= end_time))
228
229 start_time, end_time = map(to_time, (start_time, end_time))
230 kwargs = dict(topics=topics, start_time=start_time, end_time=end_time,
231 connection_filter=connection_filter, raw=raw)
232 if not dupes:
233 for topic, msg, stamp in super(ROS1Bag, self).read_messages(**kwargs):
234 if in_range(topic, stamp):
235 TypeMeta.make(msg, topic, self)
236 yield self.BagMessage(topic, msg, stamp)
237 if self.closedclosedclosedclosed: break # for topic
238 return
239
240 for topic, msg, stamp in super(ROS1Bag, self).read_messages(**kwargs):
241 if not in_range(topic, stamp): continue # for topic
242
243 # Workaround for rosbag bug of using wrong type for identical type hashes
244 if topic in dupes:
245 typename, typehash = (msg[0], msg[2]) if raw else (msg._type, msg._md5sum)
246 if dupes[topic] != (typename, typehash):
247 if raw:
248 msg = msg[:-1] + (self.get_message_classget_message_class(typename, typehash), )
249 else:
250 msg = self._convert_message(msg, *dupes[topic])
251 TypeMeta.make(msg, topic, self)
252 yield self.BagMessage(topic, msg, stamp)
253 if self.closedclosedclosedclosed: break # for topic
254
255
256 def write(self, topic, msg, t=None, raw=False, connection_header=None, **__):
257 """
258 Writes a message to the bag.
259
260 Populates connection header if topic already in bag but with a different message type.
261
262 @param topic name of topic
263 @param msg ROS1 message
264 @param t message timestamp if not using current wall time,
265 as ROS time or convertible (int/float/duration/datetime/decimal)
266 @param raw if true, `msg` is in raw format,
267 (typename, bytes, typehash, typeclass)
268 @param connection_header custom connection record for topic,
269 as {"topic", "type", "md5sum", "message_definition"}
270 """
271 if self.closedclosedclosedclosed: raise ValueError("I/O operation on closed file.")
272 if "r" == self._mode: raise io.UnsupportedOperation("write")
273 self._register_write(topic, msg, raw, connection_header)
274 return super(ROS1Bag, self).write(topic, msg, to_time(t), raw, connection_header)
275
276
277 def open(self):
278 """Opens the bag file if not already open."""
279 if not self._file:
280 self._open(self.filenamefilenamefilename, self.modemodemode, allow_unindexed=True)
281
283 def close(self):
284 """Closes the bag file."""
285 if self._file:
286 super(ROS1Bag, self).close()
287 self._iterer = None
288 self._clear_index()
290
291 def __contains__(self, key):
292 """Returns whether bag contains given topic."""
293 return any(key == t for t, _, _ in self.__topics)
294
296 def __next__(self):
297 """Retrieves next message from bag as (topic, message, timestamp)."""
298 if self.closedclosedclosedclosed: raise ValueError("I/O operation on closed file.")
299 if self.__iterer is None: self.__iterer = self.read_messagesread_messages()
300 return next(self.__iterer)
301
302
303 @property
304 def topics(self):
305 """Returns the list of topics in bag, in alphabetic order."""
306 return sorted((t for t, _, _ in self.__topics), key=str.lower)
307
309 @property
310 def closed(self):
311 """Returns whether file is closed."""
312 return not self._file
313
314
315 def _convert_message(self, msg, typename2, typehash2=None):
316 """Returns message converted to given type; fields must match."""
317 msg2 = self.get_message_classget_message_class(typename2, typehash2)()
318 fields2 = get_message_fields(msg2)
319 for fname, ftypename in get_message_fields(msg).items():
320 v1 = v2 = getattr(msg, fname)
321 if ftypename != fields2.get(fname, ftypename):
322 v2 = self._convert_message(v1, fields2[fname])
323 setattr(msg2, fname, v2)
324 return msg2
326
327 def _populate_meta(self):
328 """Populates topics and message type definitions and hashes."""
329 result = collections.Counter() # {(topic, typename, typehash): count}
330 counts = collections.Counter() # {connection ID: count}
331 for c in self._chunks:
332 for c_id, count in c.connection_counts.items():
333 counts[c_id] += count
334 for c in self._connections.values():
335 result[(c.topic, c.datatype, c.md5sum)] += counts[c.id]
336 self.__TYPEDEFS[(c.datatype, c.md5sum)] = c.msg_def
337 self.__topics = dict(result)
338
339
340 def _register_write(self, topic, msg, raw=False, connection_header=None):
341 """Registers message in local metadata, writes connection header if new type for topic."""
342 if raw: typename, typehash, typeclass = msg[0], msg[2], msg[3]
343 else: typename, typehash, typeclass = msg._type, msg._md5sum, type(msg)
344 topickey, typekey = (topic, typename, typehash), (typename, typehash)
345
346 if topickey not in self.__topics \
347 and any(topic == t and typekey != (n, h) for t, n, h in self.__topics):
348 # Write connection header if topic already present but with different type
349 if not connection_header:
350 connection_header = {"topic": topic, "type": typename, "md5sum": typehash,
351 "message_definition": typeclass._full_text}
352 conn_id = len(self._connections)
353 connection_info = rosbag.bag._ConnectionInfo(conn_id, topic, connection_header)
354 self._write_connection_record(connection_info, encrypt=False)
355 self._connections[conn_id] = self._topic_connections[topic] = connection_info
356
357 self.__TYPES.setdefault(typekey, typeclass)
358 self.__TYPEDEFS.setdefault(typekey, typeclass._full_text)
359 self.__topics[topickey] = self.__topics.get(topickey, 0) + 1
360
361
362 def _ensure_typedef(self, typename, typehash=None):
363 """Parses subtype definition from any full definition where available, if not loaded."""
364 typehash = typehash or next((h for n, h in self.__TYPEDEFS if n == typename), None)
365 typekey = (typename, typehash)
366 if typekey not in self.__TYPEDEFS:
367 for (roottype, roothash), rootdef in list(self.__TYPEDEFS.items()):
368 rootkey = (roottype, roothash)
369 if self.__PARSEDS.get(rootkey): continue # for (roottype, roothash)
370
371 subdefs = tuple(parse_definition_subtypes(rootdef).items()) # ((typename, typedef), )
372 subhashes = {n: calculate_definition_hash(n, d, subdefs) for n, d in subdefs}
373 self.__TYPEDEFS.update(((n, subhashes[n]), d) for n, d in subdefs)
374 self.__PARSEDS.update(((n, h), True) for n, h in subhashes.items())
375 self.__PARSEDS[rootkey] = True
376 if typekey in self.__TYPEDEFS:
377 break # for (roottype, roothash)
378 self.__TYPEDEFS.setdefault(typekey, "")
379
380
381 @staticmethod
382 def reindex_file(f, progress, *args, **kwargs):
383 """
384 Reindexes bag, making a backup copy in file directory.
385
386 @param progress show progress bar for reindexing status
387 """
388 KWS = ["mode", "compression", "chunk_threshold",
389 "allow_unindexed", "options", "skip_index"]
390 kwargs.update(zip(KWS, args), allow_unindexed=True)
391 copied, bar, f2 = False, None, None
392 if progress:
393 fmt = lambda s: common.format_bytes(s, strip=False)
394 name, size = os.path.basename(f), os.path.getsize(f)
395 aftertemplate = " Reindexing %s (%s): {afterword}" % (name, fmt(size))
396 bar = common.ProgressBar(size, interval=0.1, pulse=True, aftertemplate=aftertemplate)
397
398 ConsolePrinter.warn("Unindexed bag %s, reindexing.", f)
399 bar and bar.update(0).start() # Start progress pulse
400 try:
401 with rosbag.Bag(f, **kwargs) as inbag:
402 inplace = (inbag.version > 102)
403
404 f2 = "%s.orig%s" % os.path.splitext(f)
405 shutil.copy(f, f2)
406 copied = True
407
408 inbag, outbag = None, None
409 outkwargs = dict(kwargs, mode="a" if inplace else "w", allow_unindexed=True)
410 try:
411 inbag = rosbag.Bag(f2, **dict(kwargs, mode="r"))
412 outbag = rosbag.Bag(f, **outkwargs)
413 Bag._run_reindex(inbag, outbag, bar)
414 bar and bar.update(bar.max)
415 except BaseException: # Ensure steady state on KeyboardInterrupt/SystemExit
416 inbag and inbag.close()
417 outbag and outbag.close()
418 copied and shutil.move(f2, f)
419 raise
420 finally: bar and bar.update(bar.value, flush=True).stop()
421 inbag.close()
422 outbag.close()
423
424
425 @staticmethod
426 def _run_reindex(inbag, outbag, bar=None):
427 """Runs reindexing, showing optional progress bar."""
428 update_bar = noop = lambda s: None
429 indexbag, writebag = (inbag, outbag) if inbag.version == 102 else (outbag, None)
430 if bar:
431 fmt = lambda s: common.format_bytes(s, strip=False)
432 update_bar = lambda s: (setattr(bar, "afterword", fmt(s)),
433 setattr(bar, "pulse", False), bar.update(s).stop())
434 # v102: build index from inbag, write all messages to outbag.
435 # Other versions: re-build index in outbag file in-place.
436 progress = update_bar if not writebag else noop # Incremental progress during re-build
437 for offset in indexbag.reindex():
438 progress(offset)
439 if not writebag:
440 return
441
442 progress = update_bar if bar else noop # Incremental progress during re-write
443 for (topic, msg, t, header) in indexbag.read_messages(return_connection_header=True):
444 writebag.write(topic, msg, t, connection_header=header)
445 progress(indexbag._file.tell())
446Bag = ROS1Bag
447
448
449
450def init_node(name):
451 """
452 Initializes a ROS1 node if not already initialized.
453
454 Blocks until ROS master available.
455 """
456 if not validate(live=True):
457 return
458
459 global master
460 if not master:
461 master = rospy.client.get_master()
462 available = None
463 while not available:
464 try: uri = master.getUri()[-1]
465 except Exception:
466 if available is None:
467 ConsolePrinter.log(logging.ERROR,
468 "Unable to register with master. Will keep trying.")
469 available = False
470 time.sleep(SLEEP_INTERVAL)
471 else:
472 ConsolePrinter.debug("Connected to ROS master at %s.", uri)
473 available = True
474 try: rospy.get_rostime()
475 except Exception: # Init node only if not already inited
476 rospy.init_node(name, anonymous=True, disable_signals=True)
477
478
479def shutdown_node():
480 """Shuts down live ROS1 node."""
481 global master
482 if master:
483 master = None
484 rospy.signal_shutdown("Close grepros")
485
486
487def validate(live=False):
488 """
489 Returns whether ROS1 environment is set, prints error if not.
490
491 @param live whether environment must support launching a ROS node
492 """
493 VARS = ["ROS_MASTER_URI", "ROS_ROOT", "ROS_VERSION"] if live else ["ROS_VERSION"]
494 missing = [k for k in VARS if not os.getenv(k)]
495 if missing:
496 ConsolePrinter.error("ROS environment not sourced: missing %s.",
497 ", ".join(sorted(missing)))
498 if "1" != os.getenv("ROS_VERSION", "1"):
499 ConsolePrinter.error("ROS environment not supported: need ROS_VERSION=1.")
500 missing = True
501 return not missing
502
503
504@memoize
505def canonical(typename, unbounded=False):
506 """
507 Returns "pkg/Type" for "pkg/subdir/Type".
508
509 @param unbounded drop array bounds, e.g. returning "uint8[]" for "uint8[10]"
510 """
511 if typename and typename.count("/") > 1:
512 typename = "%s/%s" % tuple((x[0], x[-1]) for x in [typename.split("/")])[0]
513 if unbounded and typename and "[" in typename:
514 typename = typename[:typename.index("[")] + "[]"
515 return typename
516
517
518def create_publisher(topic, cls_or_typename, queue_size):
519 """Returns a rospy.Publisher."""
520 def pub_unregister():
521 # ROS1 prints errors when closing a publisher with subscribers
522 if not pub.get_num_connections(): super(rospy.Publisher, pub).unregister()
523
524 cls = cls_or_typename
525 if isinstance(cls, common.TEXT_TYPES): cls = get_message_class(cls)
526 pub = rospy.Publisher(topic, cls, queue_size=queue_size)
527 pub.unregister = pub_unregister
528 return pub
529
530
531def create_subscriber(topic, typename, handler, queue_size):
532 """
533 Returns a rospy.Subscriber.
534
535 Local message packages are not required. Subscribes as AnyMsg,
536 creates message class dynamically from connection info,
537 and deserializes message before providing to handler.
538
539 Supplemented with .get_message_class(), .get_message_definition(),
540 .get_message_type_hash(), and .get_qoses().
541
542 The supplementary .get_message_xyz() methods should only be invoked after at least one message
543 has been received from the topic, as they get populated from live connection metadata.
544 """
545 def myhandler(msg):
546 if msg._connection_header["type"] != typename:
547 return
548 typekey = (typename, msg._connection_header["md5sum"])
549 if typekey not in TYPECLASSES:
550 typedef = msg._connection_header["message_definition"]
551 for name, cls in generate_message_classes(typename, typedef).items():
552 TYPECLASSES.setdefault((name, cls._md5sum), cls)
553 if isinstance(msg, rospy.AnyMsg): # /clock can yield already deserialized messages
554 try: msg = TYPECLASSES[typekey]().deserialize(msg._buff)
555 except Exception as e:
556 ConsolePrinter.error("Error deserializing message in topic %r (%s): %s",
557 topic, typename, e)
558 return
559 handler(msg)
560
561 sub = rospy.Subscriber(topic, rospy.AnyMsg, myhandler, queue_size=queue_size)
562 sub.get_message_class = lambda: next(c for (n, h), c in TYPECLASSES.items()
563 if n == typename)
564 sub.get_message_definition = lambda: next(get_message_definition(c)
565 for (n, h), c in TYPECLASSES.items() if n == typename)
566 sub.get_message_type_hash = lambda: next(h for n, h in TYPECLASSES if n == typename)
567 sub.get_qoses = lambda: None
568 return sub
569
571def format_message_value(msg, name, value):
572 """
573 Returns a message attribute value as string.
574
575 Result is at least 10 chars wide if message is a ROS time/duration
576 (aligning seconds and nanoseconds).
577 """
578 LENS = {"secs": 10, "nsecs": 9}
579 v = "%s" % (value, )
580 if not isinstance(msg, genpy.TVal) or name not in LENS:
581 return v
582
583 EXTRA = sum(v.count(x) * len(x) for x in (common.MatchMarkers.START, common.MatchMarkers.END))
584 return ("%%%ds" % (LENS[name] + EXTRA)) % v # Default %10s/%9s for secs/nsecs
585
586
587@memoize
588def generate_message_classes(typename, typedef):
589 """
590 Generates ROS message classes dynamically from given name and definition.
591
592 Modifies `sys.path` to include the generated Python module.
593
594 @param typename message type name like "std_msgs/String"
595 @param typedef message type definition full text, including all subtype definitions
596 @return dictionary of {typename: typeclass} for message type and all subtypes
597 """
598 with genpy_mtx: return genpy.dynamic.generate_dynamic(typename, typedef)
599
600
601@memoize
602def get_message_class(typename):
603 """Returns ROS1 message class."""
604 if typename in ROS_TIME_TYPES:
605 return next(k for k, v in ROS_TIME_CLASSES.items() if v == typename)
606 if typename in ("genpy/Time", "rospy/Time"):
607 return rospy.Time
608 if typename in ("genpy/Duration", "rospy/Duration"):
609 return rospy.Duration
610 try: return roslib.message.get_message_class(typename)
611 except Exception: return None
612
613
614def get_message_definition(msg_or_type):
615 """
616 Returns ROS1 message type definition full text, including subtype definitions.
617
618 Returns None if unknown type.
619 """
620 msg_or_cls = msg_or_type if is_ros_message(msg_or_type) else get_message_class(msg_or_type)
621 return getattr(msg_or_cls, "_full_text", None)
623
624def get_message_type_hash(msg_or_type):
625 """Returns ROS message type MD5 hash, or "" if unknown type."""
626 msg_or_cls = msg_or_type if is_ros_message(msg_or_type) else get_message_class(msg_or_type)
627 return getattr(msg_or_cls, "_md5sum", "")
628
629
630def get_message_fields(val):
631 """
632 Returns OrderedDict({field name: field type name}) if ROS1 message, else {}.
633
634 @param val ROS1 message class or instance
635 """
636 names, types = (getattr(val, k, []) for k in ("__slots__", "_slot_types"))
637 # Bug in genpy: class slot types defined as "int32", but everywhere else types use "uint32"
638 if isinstance(val, genpy.TVal): names, types = genpy.TVal.__slots__, ["uint32", "uint32"]
639 return collections.OrderedDict(zip(names, types))
640
641
642def get_message_type(msg_or_cls):
643 """Returns ROS1 message type name, like "std_msgs/Header"."""
644 if is_ros_time(msg_or_cls):
645 cls = msg_or_cls if inspect.isclass(msg_or_cls) else type(msg_or_cls)
646 return "duration" if "duration" in cls.__name__.lower() else "time"
647 return msg_or_cls._type
648
650def get_message_value(msg, name, typename=None, default=Ellipsis):
651 """
652 Returns object attribute value, with numeric arrays converted to lists.
653
654 @param name message attribute name
655 @param typename value ROS type name, for identifying byte arrays
656 @param default value to return if attribute does not exist; raises exception otherwise
657 """
658 if default is not Ellipsis and not hasattr(msg, name): return default
659 v = getattr(msg, name)
660 listifiable = typename and typename.startswith(("uint8[", "char[")) and isinstance(v, bytes)
661 if six.PY2 and listifiable: # Ignore already highlighted values from Scanner
662 listifiable = v[:1] != "[" or v[-1:] != "]" or common.MatchMarkers.START not in v
663 return list(bytearray(v)) if listifiable else v
664 return list(v) if listifiable or isinstance(v, tuple) else v
665
666
667def get_rostime(fallback=False):
668 """
669 Returns current ROS1 time, as rospy.Time.
670
671 @param fallback use wall time if node not initialized
672 """
673 try: return rospy.get_rostime()
674 except Exception:
675 if fallback: return make_time(time.time())
676 raise
677
678
679def get_topic_types():
680 """
681 Returns currently available ROS1 topics, as [(topicname, typename)].
683 Omits topics that the current ROS1 node itself has published.
684 """
685 result, myname = [], rospy.get_name()
686 pubs, _, _ = master.getSystemState()[-1]
687 mypubs = [t for t, nn in pubs if myname in nn and t not in ("/rosout", "/rosout_agg")]
688 for topic, typename in master.getTopicTypes()[-1]:
689 if topic not in mypubs:
690 result.append((topic, typename))
691 return result
692
693
694def is_ros_message(val, ignore_time=False):
695 """
696 Returns whether value is a ROS1 message or special like ROS1 time/duration class or instance.
698 @param ignore_time whether to ignore ROS1 time/duration types
699 """
700 isfunc = issubclass if inspect.isclass(val) else isinstance
701 return isfunc(val, genpy.Message if ignore_time else (genpy.Message, genpy.TVal))
702
703
704def is_ros_time(val):
705 """Returns whether value is a ROS1 time/duration class or instance."""
706 return issubclass(val, genpy.TVal) if inspect.isclass(val) else isinstance(val, genpy.TVal)
707
708
709def make_duration(secs=0, nsecs=0):
710 """Returns a ROS1 duration, as rospy.Duration."""
711 return rospy.Duration(secs=secs, nsecs=nsecs)
712
713
714def make_time(secs=0, nsecs=0):
715 """Returns a ROS1 time, as rospy.Time."""
716 return rospy.Time(secs=secs, nsecs=nsecs)
717
718
719def serialize_message(msg):
720 """Returns ROS1 message as a serialized binary."""
721 with TypeMeta.make(msg) as m:
722 if m.data is not None: return m.data
723 buf = io.BytesIO()
724 msg.serialize(buf)
725 return buf.getvalue()
726
727
728def deserialize_message(raw, cls_or_typename):
729 """Returns ROS1 message or service request/response instantiated from serialized binary."""
730 cls = cls_or_typename
731 if isinstance(cls, common.TEXT_TYPES): cls = get_message_class(cls)
732 return cls().deserialize(raw)
733
735@memoize
736def scalar(typename):
737 """
738 Returns scalar type from ROS message data type, like "uint8" from "uint8[100]".
740 Returns type unchanged if already a scalar.
741 """
742 return typename[:typename.index("[")] if typename and "[" in typename else typename
743
745def set_message_value(obj, name, value):
746 """Sets message or object attribute value."""
747 setattr(obj, name, value)
748
749
750def to_duration(val):
751 """Returns value as ROS1 duration if convertible (int/float/time/datetime/decimal), else value."""
752 result = val
753 if isinstance(val, decimal.Decimal):
754 result = rospy.Duration(int(val), float(val % 1) * 10**9)
755 elif isinstance(val, datetime.datetime):
756 result = rospy.Duration(int(val.timestamp()), 1000 * val.microsecond)
757 elif isinstance(val, (float, int)):
758 result = rospy.Duration(val)
759 elif isinstance(val, rospy.Time):
760 result = rospy.Duration(val.secs, val.nsecs)
761 return result
762
763
764def to_nsec(val):
765 """Returns value in nanoseconds if value is ROS time/duration, else value."""
766 return val.to_nsec() if isinstance(val, genpy.TVal) else val
767
768
769def to_sec(val):
770 """Returns value in seconds if value is ROS1 time/duration, else value."""
771 return val.to_sec() if isinstance(val, genpy.TVal) else val
772
773
774def to_sec_nsec(val):
775 """Returns value as (seconds, nanoseconds) if value is ROS1 time/duration, else value."""
776 return (val.secs, val.nsecs) if isinstance(val, genpy.TVal) else val
777
778
779def to_time(val):
780 """Returns value as ROS1 time if convertible (int/float/duration/datetime/decimal), else value."""
781 result = val
782 if isinstance(val, decimal.Decimal):
783 result = rospy.Time(int(val), float(val % 1) * 10**9)
784 elif isinstance(val, datetime.datetime):
785 result = rospy.Time(int(val.timestamp()), 1000 * val.microsecond)
786 elif isinstance(val, six.integer_types + (float, )):
787 result = rospy.Time(val)
788 elif isinstance(val, genpy.Duration):
789 result = rospy.Time(val.secs, val.nsecs)
790 return result
791
792
793__all__ = [
794 "BAG_EXTENSIONS", "ROS_ALIAS_TYPES", "ROS_TIME_CLASSES", "ROS_TIME_TYPES", "SKIP_EXTENSIONS",
795 "SLEEP_INTERVAL", "TYPECLASSES", "Bag", "ROS1Bag", "master",
796 "canonical", "create_publisher", "create_subscriber", "deserialize_message",
797 "format_message_value", "generate_message_classes", "get_message_class",
798 "get_message_definition", "get_message_fields", "get_message_type", "get_message_type_hash",
799 "get_message_value", "get_rostime", "get_topic_types", "init_node", "is_ros_message",
800 "is_ros_time", "make_duration", "make_time", "scalar", "serialize_message", "set_message_value",
801 "shutdown_node", "to_duration", "to_nsec", "to_sec", "to_sec_nsec", "to_time", "validate",
802]
ROS bag interface.
Definition api.py:98
tuple MODES
Supported opening modes, overridden in subclasses.
Definition api.py:112
BagMessage
Returned from read_messages() as (topic name, ROS message, ROS timestamp object).
Definition api.py:101
mode
Returns file open mode.
Definition api.py:341
filename
Returns bag file path.
Definition api.py:327
get_message_class(self, typename, typehash=None)
Returns ROS message type class, or None if unknown message type for bag.
Definition api.py:256
read_messages(self, topics=None, start_time=None, end_time=None, raw=False, **__)
Yields messages from the bag, optionally filtered by topic and timestamp.
Definition api.py:291
closed
Returns whether file is closed.
Definition api.py:348
A simple ASCII progress bar with a ticker thread.
Definition common.py:546
ROS1 bag reader and writer.
Definition ros1.py:87
open(self)
Opens the bag file if not already open.
Definition ros1.py:289
__init__(self, *args, **kwargs)
Definition ros1.py:117
get_start_time(self)
Returns the start time of the bag, as UNIX timestamp, or None if bag empty.
Definition ros1.py:186
get_end_time(self)
Returns the end time of the bag, as UNIX timestamp, or None if bag empty.
Definition ros1.py:192
get_message_class(self, typename, typehash=None)
Returns rospy message class for typename, or None if unknown type.
Definition ros1.py:165
get_message_type_hash(self, msg_or_type)
Returns ROS1 message type MD5 hash, or None if unknown type.
Definition ros1.py:175
get_topic_info(self, *_, **__)
Returns topic and message type metainfo as {(topic, typename, typehash): count}.
Definition ros1.py:198
close(self)
Closes the bag file.
Definition ros1.py:295
reindex_file(f, progress, *args, **kwargs)
Reindexes bag, making a backup copy in file directory.
Definition ros1.py:411
__contains__(self, key)
Returns whether bag contains given topic.
Definition ros1.py:303
closed
Returns whether file is closed.
Definition ros1.py:325
topics
Returns the list of topics in bag, in alphabetic order.
Definition ros1.py:317
__next__(self)
Retrieves next message from bag as (topic, message, timestamp).
Definition ros1.py:308
get_message_definition(self, msg_or_type)
Returns ROS1 message type definition full text from bag, including subtype definitions.
Definition ros1.py:150
read_messages(self, topics=None, start_time=None, end_time=None, raw=False, connection_filter=None, **__)
Yields messages from the bag, optionally filtered by topic, timestamp and connection details.
Definition ros1.py:222
write(self, topic, msg, t=None, raw=False, connection_header=None, **__)
Writes a message to the bag.
Definition ros1.py:282
canonical(typename, unbounded=False)
Returns "pkg/Type" for "pkg/subdir/Type".
Definition ros1.py:536
create_publisher(topic, cls_or_typename, queue_size)
Returns a rospy.Publisher.
Definition ros1.py:544
serialize_message(msg)
Returns ROS1 message as a serialized binary.
Definition ros1.py:744
to_sec(val)
Returns value in seconds if value is ROS1 time/duration, else value.
Definition ros1.py:794
get_topic_types()
Returns currently available ROS1 topics, as [(topicname, typename)].
Definition ros1.py:709
format_message_value(msg, name, value)
Returns a message attribute value as string.
Definition ros1.py:603
get_message_value(msg, name, typename=None, default=Ellipsis)
Returns object attribute value, with numeric arrays converted to lists.
Definition ros1.py:682
get_message_class(typename)
Returns ROS1 message class.
Definition ros1.py:627
to_duration(val)
Returns value as ROS1 duration if convertible (int/float/time/datetime/decimal), else value.
Definition ros1.py:775
make_time(secs=0, nsecs=0)
Returns a ROS1 time, as rospy.Time.
Definition ros1.py:739
deserialize_message(raw, cls_or_typename)
Returns ROS1 message or service request/response instantiated from serialized binary.
Definition ros1.py:753
get_message_type(msg_or_cls)
Returns ROS1 message type name, like "std_msgs/Header".
Definition ros1.py:667
get_message_definition(msg_or_type)
Returns ROS1 message type definition full text, including subtype definitions.
Definition ros1.py:644
validate(live=False)
Returns whether ROS1 environment is set, prints error if not.
Definition ros1.py:518
scalar(typename)
Returns scalar type from ROS message data type, like "uint8" from "uint8[100]".
Definition ros1.py:766
make_duration(secs=0, nsecs=0)
Returns a ROS1 duration, as rospy.Duration.
Definition ros1.py:734
create_subscriber(topic, typename, handler, queue_size)
Returns a rospy.Subscriber.
Definition ros1.py:570
get_rostime(fallback=False)
Returns current ROS1 time, as rospy.Time.
Definition ros1.py:697
to_nsec(val)
Returns value in nanoseconds if value is ROS time/duration, else value.
Definition ros1.py:789
to_time(val)
Returns value as ROS1 time if convertible (int/float/duration/datetime/decimal), else value.
Definition ros1.py:804
shutdown_node()
Shuts down live ROS1 node.
Definition ros1.py:505
set_message_value(obj, name, value)
Sets message or object attribute value.
Definition ros1.py:770
is_ros_time(val)
Returns whether value is a ROS1 time/duration class or instance.
Definition ros1.py:729
get_message_type_hash(msg_or_type)
Returns ROS message type MD5 hash, or "" if unknown type.
Definition ros1.py:649
to_sec_nsec(val)
Returns value as (seconds, nanoseconds) if value is ROS1 time/duration, else value.
Definition ros1.py:799
init_node(name)
Initializes a ROS1 node if not already initialized.
Definition ros1.py:481
is_ros_message(val, ignore_time=False)
Returns whether value is a ROS1 message or special like ROS1 time/duration class or instance.
Definition ros1.py:724
get_message_fields(val)
Returns OrderedDict({field name: field type name}) if ROS1 message, else {}.
Definition ros1.py:660
generate_message_classes(typename, typedef)
Generates ROS message classes dynamically from given name and definition.
Definition ros1.py:622