69 ROS1 bag reader and writer.
71 Extends `rosbag.Bag` with more conveniences, smooths over the rosbag bug of ignoring
72 topic and time filters in format v1.2, and smooths over the rosbag bug
73 of yielding messages of wrong type, if message types in different topics
74 have different packages but identical fields and hashes.
76 Does **not** smooth over the rosbag bug of writing different types to one topic.
78 rosbag does allow writing messages of different types to one topic,
79 just like live ROS topics can have multiple message types published
80 to one topic. And their serialized bytes will actually be in the bag,
81 but rosbag will only register the first type for this topic (unless it is
82 explicitly given another connection header with metadata on the other type).
84 All messages yielded will be deserialized by rosbag as that first type,
85 and whether reading will raise an exception or not depends on whether
86 the other type has enough bytes to be deserialized as that first type.
101 @param f bag file path, or a stream object
102 @param mode mode to open bag in, defaults to "r" (read mode)
103 @param reindex if true and bag is unindexed, make a copy
104 of the file (unless unindexed format) and reindex original
105 @param progress show progress bar with reindexing status
106 @param kwargs additional keyword arguments for `rosbag.Bag`, like `compression`
110 f, args = (args[0]
if args
else kwargs.pop(
"f")), args[1:]
111 mode, args = (args[0]
if args
else kwargs.pop(
"mode",
"r")), args[1:]
112 if mode
not in self.
MODES:
raise ValueError(
"invalid mode %r" % mode)
114 kwargs.setdefault(
"skip_index",
True)
115 reindex, progress = (kwargs.pop(k,
False)
for k
in (
"reindex",
"progress"))
116 getargspec = getattr(inspect,
"getfullargspec", getattr(inspect,
"getargspec",
None))
117 for n
in set(kwargs) - set(getargspec(rosbag.Bag.__init__).args): kwargs.pop(n)
119 if common.is_stream(f):
120 if not common.verify_io(f, mode):
121 raise io.UnsupportedOperation({
"r":
"read",
"w":
"write",
"a":
"append"}[mode])
122 super(ROS1Bag, self).
__init__(f, mode, *args, **kwargs)
127 if "a" == mode
and (
not os.path.exists(f)
or not os.path.getsize(f)):
129 os.path.exists(f)
and os.remove(f)
132 super(ROS1Bag, self).
__init__(f, mode, *args, **kwargs)
133 except rosbag.ROSBagUnindexedException:
134 if not reindex:
raise
135 Bag.reindex_file(f, progress, *args, **kwargs)
136 super(ROS1Bag, self).
__init__(f, mode, *args, **kwargs)
141 """Returns ROS1 message type definition full text from bag, including subtype definitions."""
143 return self.
__TYPEDEFS.get((msg_or_type._type, msg_or_type._md5sum))
144 typename = msg_or_type
145 return next((d
for (n, h), d
in self.
__TYPEDEFS.items()
if n == typename),
None)
150 Returns rospy message class for typename, or None if unknown type.
152 Generates class dynamically if not already generated.
154 @param typehash message type definition hash, if any
157 typehash = typehash
or next((h
for n, h
in self.
__TYPEDEFS if n == typename),
None)
158 typekey = (typename, typehash)
161 self.
__TYPES[(n, c._md5sum)] = c
162 return self.
__TYPES.get(typekey)
166 """Returns ROS1 message type MD5 hash, or None if unknown type."""
168 typename = msg_or_type
169 typehash =
next((h
for n, h
in self.
__TYPEDEFS if n == typename),
None)
172 typehash =
next((h
for n, h
in self.
__TYPEDEFS if n == typename),
None)
177 """Returns the start time of the bag, as UNIX timestamp, or None if bag empty."""
179 except Exception:
return None
183 """Returns the end time of the bag, as UNIX timestamp, or None if bag empty."""
185 except Exception:
return None
189 """Returns topic and message type metainfo as {(topic, typename, typehash): count}."""
194 raw=False, connection_filter=None, **__):
196 Yields messages from the bag, optionally filtered by topic, timestamp and connection details.
198 @param topics list of topics or a single topic.
199 If an empty list is given, all topics will be read.
200 @param start_time earliest timestamp of messages to return,
201 as ROS time or convertible (int/float/duration/datetime/decimal)
202 @param end_time latest timestamp of messages to return,
203 as ROS time or convertible (int/float/duration/datetime/decimal)
204 @param connection_filter function to filter connections to include
205 @param raw if true, then returned messages are tuples of
206 (typename, bytes, typehash, typeclass)
207 or (typename, bytes, typehash, position, typeclass),
208 depending on file format
209 @return BagMessage namedtuples of
210 (topic, message, timestamp as rospy.Time)
212 if self.
closedclosed:
raise ValueError(
"I/O operation on closed file.")
213 if "w" == self._mode:
raise io.UnsupportedOperation(
"read")
216 for n, h
in self.
__TYPEDEFS: hashtypes.setdefault(h, []).append(n)
217 read_topics = topics
if isinstance(topics, (dict, list, set, tuple))
else \
218 [topics]
if topics
else None
219 dupes = {t: (n, h)
for t, n, h
in self.
__topics
220 if (read_topics
is None or t
in read_topics)
and len(hashtypes.get(h, [])) > 1}
223 if self.version != 102
or (
not topics
and start_time
is None and end_time
is None):
224 in_range =
lambda *_:
True
225 else: in_range =
lambda t, s: ((
not read_topics
or t
in read_topics)
and
226 (start_time
is None or s >= start_time)
and
227 (end_time
is None or s <= end_time))
229 start_time, end_time = map(to_time, (start_time, end_time))
230 kwargs = dict(topics=topics, start_time=start_time, end_time=end_time,
231 connection_filter=connection_filter, raw=raw)
233 for topic, msg, stamp
in super(ROS1Bag, self).
read_messages(**kwargs):
234 if in_range(topic, stamp):
235 TypeMeta.make(msg, topic, self)
240 for topic, msg, stamp
in super(ROS1Bag, self).
read_messages(**kwargs):
241 if not in_range(topic, stamp):
continue
245 typename, typehash = (msg[0], msg[2])
if raw
else (msg._type, msg._md5sum)
246 if dupes[topic] != (typename, typehash):
251 TypeMeta.make(msg, topic, self)
256 def write(self, topic, msg, t=None, raw=False, connection_header=None, **__):
258 Writes a message to the bag.
260 Populates connection header if topic already in bag but with a different message type.
262 @param topic name of topic
263 @param msg ROS1 message
264 @param t message timestamp if not using current wall time,
265 as ROS time or convertible (int/float/duration/datetime/decimal)
266 @param raw if true, `msg` is in raw format,
267 (typename, bytes, typehash, typeclass)
268 @param connection_header custom connection record for topic,
269 as {"topic", "type", "md5sum", "message_definition"}
271 if self.
closedclosed:
raise ValueError(
"I/O operation on closed file.")
272 if "r" == self._mode:
raise io.UnsupportedOperation(
"write")
274 return super(ROS1Bag, self).
write(topic, msg,
to_time(t), raw, connection_header)
278 """Opens the bag file if not already open."""
284 """Closes the bag file."""
286 super(ROS1Bag, self).
close()
292 """Returns whether bag contains given topic."""
293 return any(key == t
for t, _, _
in self.
__topics)
297 """Retrieves next message from bag as (topic, message, timestamp)."""
298 if self.
closedclosed:
raise ValueError(
"I/O operation on closed file.")
305 """Returns the list of topics in bag, in alphabetic order."""
306 return sorted((t
for t, _, _
in self.
__topics), key=str.lower)
311 """Returns whether file is closed."""
312 return not self._file
315 def _convert_message(self, msg, typename2, typehash2=None):
316 """Returns message converted to given type; fields must match."""
320 v1 = v2 = getattr(msg, fname)
321 if ftypename != fields2.get(fname, ftypename):
323 setattr(msg2, fname, v2)
327 def _populate_meta(self):
328 """Populates topics and message type definitions and hashes."""
329 result = collections.Counter()
330 counts = collections.Counter()
331 for c
in self._chunks:
332 for c_id, count
in c.connection_counts.items():
333 counts[c_id] += count
334 for c
in self._connections.values():
335 result[(c.topic, c.datatype, c.md5sum)] += counts[c.id]
336 self.
__TYPEDEFS[(c.datatype, c.md5sum)] = c.msg_def
340 def _register_write(self, topic, msg, raw=False, connection_header=None):
341 """Registers message in local metadata, writes connection header if new type for topic."""
342 if raw: typename, typehash, typeclass = msg[0], msg[2], msg[3]
343 else: typename, typehash, typeclass = msg._type, msg._md5sum, type(msg)
344 topickey, typekey = (topic, typename, typehash), (typename, typehash)
347 and any(topic == t
and typekey != (n, h)
for t, n, h
in self.
__topics):
349 if not connection_header:
350 connection_header = {
"topic": topic,
"type": typename,
"md5sum": typehash,
351 "message_definition": typeclass._full_text}
352 conn_id = len(self._connections)
353 connection_info = rosbag.bag._ConnectionInfo(conn_id, topic, connection_header)
354 self._write_connection_record(connection_info, encrypt=
False)
355 self._connections[conn_id] = self._topic_connections[topic] = connection_info
357 self.
__TYPES.setdefault(typekey, typeclass)
358 self.
__TYPEDEFS.setdefault(typekey, typeclass._full_text)
362 def _ensure_typedef(self, typename, typehash=None):
363 """Parses subtype definition from any full definition where available, if not loaded."""
364 typehash = typehash
or next((h
for n, h
in self.
__TYPEDEFS if n == typename),
None)
365 typekey = (typename, typehash)
367 for (roottype, roothash), rootdef
in list(self.
__TYPEDEFS.items()):
368 rootkey = (roottype, roothash)
371 subdefs = tuple(parse_definition_subtypes(rootdef).items())
372 subhashes = {n: calculate_definition_hash(n, d, subdefs)
for n, d
in subdefs}
373 self.
__TYPEDEFS.update(((n, subhashes[n]), d)
for n, d
in subdefs)
374 self.
__PARSEDS.update(((n, h),
True)
for n, h
in subhashes.items())
384 Reindexes bag, making a backup copy in file directory.
386 @param progress show progress bar for reindexing status
388 KWS = [
"mode",
"compression",
"chunk_threshold",
389 "allow_unindexed",
"options",
"skip_index"]
390 kwargs.update(zip(KWS, args), allow_unindexed=
True)
391 copied, bar, f2 =
False,
None,
None
393 fmt =
lambda s: common.format_bytes(s, strip=
False)
394 name, size = os.path.basename(f), os.path.getsize(f)
395 aftertemplate =
" Reindexing %s (%s): {afterword}" % (name, fmt(size))
398 ConsolePrinter.warn(
"Unindexed bag %s, reindexing.", f)
399 bar
and bar.update(0).start()
401 with rosbag.Bag(f, **kwargs)
as inbag:
402 inplace = (inbag.version > 102)
404 f2 =
"%s.orig%s" % os.path.splitext(f)
408 inbag, outbag =
None,
None
409 outkwargs = dict(kwargs, mode=
"a" if inplace
else "w", allow_unindexed=
True)
411 inbag = rosbag.Bag(f2, **dict(kwargs, mode=
"r"))
412 outbag = rosbag.Bag(f, **outkwargs)
413 Bag._run_reindex(inbag, outbag, bar)
414 bar
and bar.update(bar.max)
415 except BaseException:
416 inbag
and inbag.close()
417 outbag
and outbag.close()
418 copied
and shutil.move(f2, f)
420 finally: bar
and bar.update(bar.value, flush=
True).stop()
426 def _run_reindex(inbag, outbag, bar=None):
427 """Runs reindexing, showing optional progress bar."""
428 update_bar = noop =
lambda s:
None
429 indexbag, writebag = (inbag, outbag)
if inbag.version == 102
else (outbag,
None)
431 fmt =
lambda s: common.format_bytes(s, strip=
False)
432 update_bar =
lambda s: (setattr(bar,
"afterword", fmt(s)),
433 setattr(bar,
"pulse",
False), bar.update(s).stop())
436 progress = update_bar
if not writebag
else noop
437 for offset
in indexbag.reindex():
442 progress = update_bar
if bar
else noop
443 for (topic, msg, t, header)
in indexbag.read_messages(return_connection_header=
True):
444 writebag.write(topic, msg, t, connection_header=header)
445 progress(indexbag._file.tell())
519 """Returns a rospy.Publisher."""
520 def pub_unregister():
522 if not pub.get_num_connections(): super(rospy.Publisher, pub).unregister()
524 cls = cls_or_typename
526 pub = rospy.Publisher(topic, cls, queue_size=queue_size)
527 pub.unregister = pub_unregister
533 Returns a rospy.Subscriber.
535 Local message packages are not required. Subscribes as AnyMsg,
536 creates message class dynamically from connection info,
537 and deserializes message before providing to handler.
539 Supplemented with .get_message_class(), .get_message_definition(),
540 .get_message_type_hash(), and .get_qoses().
542 The supplementary .get_message_xyz() methods should only be invoked after at least one message
543 has been received from the topic, as they get populated from live connection metadata.
546 if msg._connection_header[
"type"] != typename:
548 typekey = (typename, msg._connection_header[
"md5sum"])
549 if typekey
not in TYPECLASSES:
550 typedef = msg._connection_header[
"message_definition"]
552 TYPECLASSES.setdefault((name, cls._md5sum), cls)
553 if isinstance(msg, rospy.AnyMsg):
554 try: msg = TYPECLASSES[typekey]().deserialize(msg._buff)
555 except Exception
as e:
556 ConsolePrinter.error(
"Error deserializing message in topic %r (%s): %s",
561 sub = rospy.Subscriber(topic, rospy.AnyMsg, myhandler, queue_size=queue_size)
562 sub.get_message_class =
lambda: next(c
for (n, h), c
in TYPECLASSES.items()
565 for (n, h), c
in TYPECLASSES.items()
if n == typename)
566 sub.get_message_type_hash =
lambda: next(h
for n, h
in TYPECLASSES
if n == typename)
567 sub.get_qoses =
lambda:
None
752 """Returns value as ROS1 duration if convertible (int/float/time/datetime/decimal), else value."""
754 if isinstance(val, decimal.Decimal):
755 result = rospy.Duration(int(val), float(val % 1) * 10**9)
756 elif isinstance(val, datetime.datetime):
757 result = rospy.Duration(int(val.timestamp()), 1000 * val.microsecond)
758 elif isinstance(val, (float, int)):
759 result = rospy.Duration(val)
760 elif isinstance(val, rospy.Time):
761 result = rospy.Duration(val.secs, val.nsecs)
781 """Returns value as ROS1 time if convertible (int/float/duration/datetime/decimal), else value."""
783 if isinstance(val, decimal.Decimal):
784 result = rospy.Time(int(val), float(val % 1) * 10**9)
785 elif isinstance(val, datetime.datetime):
786 result = rospy.Time(int(val.timestamp()), 1000 * val.microsecond)
787 elif isinstance(val, six.integer_types + (float, )):
788 result = rospy.Time(val)
789 elif isinstance(val, genpy.Duration):
790 result = rospy.Time(val.secs, val.nsecs)