grepros 1.3.0
grep for ROS bag files and live topics
Loading...
Searching...
No Matches
ros1.py
Go to the documentation of this file.
1# -*- coding: utf-8 -*-
2"""
3ROS1 interface.
4
5------------------------------------------------------------------------------
6This file is part of grepros - grep for ROS bag files and live topics.
7Released under the BSD License.
8
9@author Erki Suurjaak
10@created 01.11.2021
11@modified 13.02.2026
12------------------------------------------------------------------------------
13"""
14
15import collections
16import datetime
17import decimal
18import inspect
19import logging
20import io
21import os
22import shutil
23import threading
24import time
25
26import genpy
27import genpy.dynamic
28import rosbag
29import roslib
30import rospy
31import six
32
33from . import api
34from . import common
35from . api import TypeMeta, calculate_definition_hash, parse_definition_subtypes
36from . common import ConsolePrinter, memoize
37
38
39
40BAG_EXTENSIONS = (".bag", ".bag.active")
41
42
43SKIP_EXTENSIONS = (".bag.orig.active", )
44
45
46ROS_TIME_TYPES = ["time", "duration"]
47
48
49ROS_TIME_CLASSES = {rospy.Time: "time", rospy.Duration: "duration"}
50
51
52ROS_ALIAS_TYPES = {"byte": "int8", "char": "uint8"}
53
54
55TYPECLASSES = {}
56
57
58SLEEP_INTERVAL = 0.5
59
60
61master = None
62
63
64genpy_mtx = threading.RLock()
65
66
67class ROS1Bag(rosbag.Bag, api.BaseBag):
68 """
69 ROS1 bag reader and writer.
70
71 Extends `rosbag.Bag` with more conveniences, smooths over the rosbag bug of ignoring
72 topic and time filters in format v1.2, and smooths over the rosbag bug
73 of yielding messages of wrong type, if message types in different topics
74 have different packages but identical fields and hashes.
75
76 Does **not** smooth over the rosbag bug of writing different types to one topic.
77
78 rosbag does allow writing messages of different types to one topic,
79 just like live ROS topics can have multiple message types published
80 to one topic. And their serialized bytes will actually be in the bag,
81 but rosbag will only register the first type for this topic (unless it is
82 explicitly given another connection header with metadata on the other type).
83
84 All messages yielded will be deserialized by rosbag as that first type,
85 and whether reading will raise an exception or not depends on whether
86 the other type has enough bytes to be deserialized as that first type.
87 """
88
89 # {(typename, typehash): message type class}
90 __TYPES = {}
91
92
93 __TYPEDEFS = {}
94
95 # {(typename, typehash): whether subtype definitions parsed}
96 __PARSEDS = {}
97
98
99 def __init__(self, *args, **kwargs):
100 """
101 @param f bag file path, or a stream object
102 @param mode mode to open bag in, defaults to "r" (read mode)
103 @param reindex if true and bag is unindexed, make a copy
104 of the file (unless unindexed format) and reindex original
105 @param progress show progress bar with reindexing status
106 @param kwargs additional keyword arguments for `rosbag.Bag`, like `compression`
107 """
108 self.__topics = {} # {(topic, typename, typehash): message count}
109 self.__iterer = None # Generator from read_messages() for next()
110 f, args = (args[0] if args else kwargs.pop("f")), args[1:]
111 mode, args = (args[0] if args else kwargs.pop("mode", "r")), args[1:]
112 if mode not in self.MODES: raise ValueError("invalid mode %r" % mode)
113
114 kwargs.setdefault("skip_index", True)
115 reindex, progress = (kwargs.pop(k, False) for k in ("reindex", "progress"))
116 getargspec = getattr(inspect, "getfullargspec", getattr(inspect, "getargspec", None))
117 for n in set(kwargs) - set(getargspec(rosbag.Bag.__init__).args): kwargs.pop(n)
118
119 if common.is_stream(f):
120 if not common.verify_io(f, mode):
121 raise io.UnsupportedOperation({"r": "read", "w": "write", "a": "append"}[mode])
122 super(ROS1Bag, self).__init__(f, mode, *args, **kwargs)
123 self._populate_meta()
124 return
125
126 f = str(f)
127 if "a" == mode and (not os.path.exists(f) or not os.path.getsize(f)):
128 mode = "w" # rosbag raises error on append if no file or empty file
129 os.path.exists(f) and os.remove(f)
130
131 try:
132 super(ROS1Bag, self).__init__(f, mode, *args, **kwargs)
133 except rosbag.ROSBagUnindexedException:
134 if not reindex: raise
135 Bag.reindex_file(f, progress, *args, **kwargs)
136 super(ROS1Bag, self).__init__(f, mode, *args, **kwargs)
137 self._populate_meta()
138
139
140 def get_message_definition(self, msg_or_type):
141 """Returns ROS1 message type definition full text from bag, including subtype definitions."""
142 if is_ros_message(msg_or_type):
143 return self.__TYPEDEFS.get((msg_or_type._type, msg_or_type._md5sum))
144 typename = msg_or_type
145 return next((d for (n, h), d in self.__TYPEDEFS.items() if n == typename), None)
146
147
148 def get_message_class(self, typename, typehash=None):
149 """
150 Returns rospy message class for typename, or None if unknown type.
151
152 Generates class dynamically if not already generated.
153
154 @param typehash message type definition hash, if any
155 """
156 if (typename, typehash) not in self.__TYPES: self._ensure_typedef(typename, typehash)
157 typehash = typehash or next((h for n, h in self.__TYPEDEFS if n == typename), None)
158 typekey = (typename, typehash)
159 if typekey not in self.__TYPES and typekey in self.__TYPEDEFS:
160 for n, c in generate_message_classes(typename, self.__TYPEDEFS[typekey]).items():
161 self.__TYPES[(n, c._md5sum)] = c
162 return self.__TYPES.get(typekey)
163
164
165 def get_message_type_hash(self, msg_or_type):
166 """Returns ROS1 message type MD5 hash, or None if unknown type."""
167 if is_ros_message(msg_or_type): return msg_or_type._md5sum
168 typename = msg_or_type
169 typehash = next((h for n, h in self.__TYPEDEFS if n == typename), None)
170 if not typehash:
171 self._ensure_typedef(typename)
172 typehash = next((h for n, h in self.__TYPEDEFS if n == typename), None)
173 return typehash
174
175
176 def get_start_time(self):
177 """Returns the start time of the bag, as UNIX timestamp, or None if bag empty."""
178 try: return super(ROS1Bag, self).get_start_time()
179 except Exception: return None
180
181
182 def get_end_time(self):
183 """Returns the end time of the bag, as UNIX timestamp, or None if bag empty."""
184 try: return super(ROS1Bag, self).get_end_time()
185 except Exception: return None
186
187
188 def get_topic_info(self, *_, **__):
189 """Returns topic and message type metainfo as {(topic, typename, typehash): count}."""
190 return dict(self.__topics)
191
192
193 def read_messages(self, topics=None, start_time=None, end_time=None,
194 raw=False, connection_filter=None, **__):
195 """
196 Yields messages from the bag, optionally filtered by topic, timestamp and connection details.
197
198 @param topics list of topics or a single topic.
199 If an empty list is given, all topics will be read.
200 @param start_time earliest timestamp of messages to return,
201 as ROS time or convertible (int/float/duration/datetime/decimal)
202 @param end_time latest timestamp of messages to return,
203 as ROS time or convertible (int/float/duration/datetime/decimal)
204 @param connection_filter function to filter connections to include
205 @param raw if true, then returned messages are tuples of
206 (typename, bytes, typehash, typeclass)
207 or (typename, bytes, typehash, position, typeclass),
208 depending on file format
209 @return BagMessage namedtuples of
210 (topic, message, timestamp as rospy.Time)
211 """
212 if self.closedclosed: raise ValueError("I/O operation on closed file.")
213 if "w" == self._mode: raise io.UnsupportedOperation("read")
214
215 hashtypes = {}
216 for n, h in self.__TYPEDEFS: hashtypes.setdefault(h, []).append(n)
217 read_topics = topics if isinstance(topics, (dict, list, set, tuple)) else \
218 [topics] if topics else None
219 dupes = {t: (n, h) for t, n, h in self.__topics
220 if (read_topics is None or t in read_topics) and len(hashtypes.get(h, [])) > 1}
221
222 # Workaround for rosbag.Bag ignoring topic and time filters in format v1.2
223 if self.version != 102 or (not topics and start_time is None and end_time is None):
224 in_range = lambda *_: True
225 else: in_range = lambda t, s: ((not read_topics or t in read_topics) and
226 (start_time is None or s >= start_time) and
227 (end_time is None or s <= end_time))
228
229 start_time, end_time = map(to_time, (start_time, end_time))
230 kwargs = dict(topics=topics, start_time=start_time, end_time=end_time,
231 connection_filter=connection_filter, raw=raw)
232 if not dupes:
233 for topic, msg, stamp in super(ROS1Bag, self).read_messages(**kwargs):
234 if in_range(topic, stamp):
235 TypeMeta.make(msg, topic, self)
236 yield self.BagMessage(topic, msg, stamp)
237 if self.closedclosed: break # for topic
238 return
239
240 for topic, msg, stamp in super(ROS1Bag, self).read_messages(**kwargs):
241 if not in_range(topic, stamp): continue # for topic
242
243 # Workaround for rosbag bug of using wrong type for identical type hashes
244 if topic in dupes:
245 typename, typehash = (msg[0], msg[2]) if raw else (msg._type, msg._md5sum)
246 if dupes[topic] != (typename, typehash):
247 if raw:
248 msg = msg[:-1] + (self.get_message_classget_message_class(typename, typehash), )
249 else:
250 msg = self._convert_message(msg, *dupes[topic])
251 TypeMeta.make(msg, topic, self)
252 yield self.BagMessage(topic, msg, stamp)
253 if self.closedclosed: break # for topic
254
255
256 def write(self, topic, msg, t=None, raw=False, connection_header=None, **__):
257 """
258 Writes a message to the bag.
259
260 Populates connection header if topic already in bag but with a different message type.
261
262 @param topic name of topic
263 @param msg ROS1 message
264 @param t message timestamp if not using current wall time,
265 as ROS time or convertible (int/float/duration/datetime/decimal)
266 @param raw if true, `msg` is in raw format,
267 (typename, bytes, typehash, typeclass)
268 @param connection_header custom connection record for topic,
269 as {"topic", "type", "md5sum", "message_definition"}
270 """
271 if self.closedclosed: raise ValueError("I/O operation on closed file.")
272 if "r" == self._mode: raise io.UnsupportedOperation("write")
273 self._register_write(topic, msg, raw, connection_header)
274 return super(ROS1Bag, self).write(topic, msg, to_time(t), raw, connection_header)
275
276
277 def open(self):
278 """Opens the bag file if not already open."""
279 if not self._file:
280 self._open(self.filenamefilename, self.modemode, allow_unindexed=True)
281
282
283 def close(self):
284 """Closes the bag file."""
285 if self._file:
286 super(ROS1Bag, self).close()
287 self._iterer = None
288 self._clear_index()
289
290
291 def __contains__(self, key):
292 """Returns whether bag contains given topic."""
293 return any(key == t for t, _, _ in self.__topics)
294
295
296 def __next__(self):
297 """Retrieves next message from bag as (topic, message, timestamp)."""
298 if self.closedclosed: raise ValueError("I/O operation on closed file.")
299 if self.__iterer is None: self.__iterer = self.read_messagesread_messages()
300 return next(self.__iterer)
301
302
303 @property
304 def topics(self):
305 """Returns the list of topics in bag, in alphabetic order."""
306 return sorted((t for t, _, _ in self.__topics), key=str.lower)
307
308
309 @property
310 def closed(self):
311 """Returns whether file is closed."""
312 return not self._file
313
314
315 def _convert_message(self, msg, typename2, typehash2=None):
316 """Returns message converted to given type; fields must match."""
317 msg2 = self.get_message_classget_message_class(typename2, typehash2)()
318 fields2 = get_message_fields(msg2)
319 for fname, ftypename in get_message_fields(msg).items():
320 v1 = v2 = getattr(msg, fname)
321 if ftypename != fields2.get(fname, ftypename):
322 v2 = self._convert_message(v1, fields2[fname])
323 setattr(msg2, fname, v2)
324 return msg2
325
326
327 def _populate_meta(self):
328 """Populates topics and message type definitions and hashes."""
329 result = collections.Counter() # {(topic, typename, typehash): count}
330 counts = collections.Counter() # {connection ID: count}
331 for c in self._chunks:
332 for c_id, count in c.connection_counts.items():
333 counts[c_id] += count
334 for c in self._connections.values():
335 result[(c.topic, c.datatype, c.md5sum)] += counts[c.id]
336 self.__TYPEDEFS[(c.datatype, c.md5sum)] = c.msg_def
337 self.__topics = dict(result)
338
339
340 def _register_write(self, topic, msg, raw=False, connection_header=None):
341 """Registers message in local metadata, writes connection header if new type for topic."""
342 if raw: typename, typehash, typeclass = msg[0], msg[2], msg[3]
343 else: typename, typehash, typeclass = msg._type, msg._md5sum, type(msg)
344 topickey, typekey = (topic, typename, typehash), (typename, typehash)
345
346 if topickey not in self.__topics \
347 and any(topic == t and typekey != (n, h) for t, n, h in self.__topics):
348 # Write connection header if topic already present but with different type
349 if not connection_header:
350 connection_header = {"topic": topic, "type": typename, "md5sum": typehash,
351 "message_definition": typeclass._full_text}
352 conn_id = len(self._connections)
353 connection_info = rosbag.bag._ConnectionInfo(conn_id, topic, connection_header)
354 self._write_connection_record(connection_info, encrypt=False)
355 self._connections[conn_id] = self._topic_connections[topic] = connection_info
356
357 self.__TYPES.setdefault(typekey, typeclass)
358 self.__TYPEDEFS.setdefault(typekey, typeclass._full_text)
359 self.__topics[topickey] = self.__topics.get(topickey, 0) + 1
360
361
362 def _ensure_typedef(self, typename, typehash=None):
363 """Parses subtype definition from any full definition where available, if not loaded."""
364 typehash = typehash or next((h for n, h in self.__TYPEDEFS if n == typename), None)
365 typekey = (typename, typehash)
366 if typekey not in self.__TYPEDEFS:
367 for (roottype, roothash), rootdef in list(self.__TYPEDEFS.items()):
368 rootkey = (roottype, roothash)
369 if self.__PARSEDS.get(rootkey): continue # for (roottype, roothash)
370
371 subdefs = tuple(parse_definition_subtypes(rootdef).items()) # ((typename, typedef), )
372 subhashes = {n: calculate_definition_hash(n, d, subdefs) for n, d in subdefs}
373 self.__TYPEDEFS.update(((n, subhashes[n]), d) for n, d in subdefs)
374 self.__PARSEDS.update(((n, h), True) for n, h in subhashes.items())
375 self.__PARSEDS[rootkey] = True
376 if typekey in self.__TYPEDEFS:
377 break # for (roottype, roothash)
378 self.__TYPEDEFS.setdefault(typekey, "")
379
380
381 @staticmethod
382 def reindex_file(f, progress, *args, **kwargs):
383 """
384 Reindexes bag, making a backup copy in file directory.
385
386 @param progress show progress bar for reindexing status
387 """
388 KWS = ["mode", "compression", "chunk_threshold",
389 "allow_unindexed", "options", "skip_index"]
390 kwargs.update(zip(KWS, args), allow_unindexed=True)
391 copied, bar, f2 = False, None, None
392 if progress:
393 fmt = lambda s: common.format_bytes(s, strip=False)
394 name, size = os.path.basename(f), os.path.getsize(f)
395 aftertemplate = " Reindexing %s (%s): {afterword}" % (name, fmt(size))
396 bar = common.ProgressBar(size, interval=0.1, pulse=True, aftertemplate=aftertemplate)
397
398 ConsolePrinter.warn("Unindexed bag %s, reindexing.", f)
399 bar and bar.update(0).start() # Start progress pulse
400 try:
401 with rosbag.Bag(f, **kwargs) as inbag:
402 inplace = (inbag.version > 102)
403
404 f2 = "%s.orig%s" % os.path.splitext(f)
405 shutil.copy(f, f2)
406 copied = True
407
408 inbag, outbag = None, None
409 outkwargs = dict(kwargs, mode="a" if inplace else "w", allow_unindexed=True)
410 try:
411 inbag = rosbag.Bag(f2, **dict(kwargs, mode="r"))
412 outbag = rosbag.Bag(f, **outkwargs)
413 Bag._run_reindex(inbag, outbag, bar)
414 bar and bar.update(bar.max)
415 except BaseException: # Ensure steady state on KeyboardInterrupt/SystemExit
416 inbag and inbag.close()
417 outbag and outbag.close()
418 copied and shutil.move(f2, f)
419 raise
420 finally: bar and bar.update(bar.value, flush=True).stop()
421 inbag.close()
422 outbag.close()
423
424
425 @staticmethod
426 def _run_reindex(inbag, outbag, bar=None):
427 """Runs reindexing, showing optional progress bar."""
428 update_bar = noop = lambda s: None
429 indexbag, writebag = (inbag, outbag) if inbag.version == 102 else (outbag, None)
430 if bar:
431 fmt = lambda s: common.format_bytes(s, strip=False)
432 update_bar = lambda s: (setattr(bar, "afterword", fmt(s)),
433 setattr(bar, "pulse", False), bar.update(s).stop())
434 # v102: build index from inbag, write all messages to outbag.
435 # Other versions: re-build index in outbag file in-place.
436 progress = update_bar if not writebag else noop # Incremental progress during re-build
437 for offset in indexbag.reindex():
438 progress(offset)
439 if not writebag:
440 return
441
442 progress = update_bar if bar else noop # Incremental progress during re-write
443 for (topic, msg, t, header) in indexbag.read_messages(return_connection_header=True):
444 writebag.write(topic, msg, t, connection_header=header)
445 progress(indexbag._file.tell())
446Bag = ROS1Bag
447
448
449
450def init_node(name):
451 """
452 Initializes a ROS1 node if not already initialized.
453
454 Blocks until ROS master available.
455 """
456 if not validate(live=True):
457 return
458
459 global master
460 if not master:
461 master = rospy.client.get_master()
462 available = None
463 while not available:
464 try: uri = master.getUri()[-1]
465 except Exception:
466 if available is None:
467 ConsolePrinter.log(logging.ERROR,
468 "Unable to register with master. Will keep trying.")
469 available = False
470 time.sleep(SLEEP_INTERVAL)
471 else:
472 ConsolePrinter.debug("Connected to ROS master at %s.", uri)
473 available = True
474 try: rospy.get_rostime()
475 except Exception: # Init node only if not already inited
476 rospy.init_node(name, anonymous=True, disable_signals=True)
477
478
480 """Shuts down live ROS1 node."""
481 global master
482 if master:
483 master = None
484 rospy.signal_shutdown("Close grepros")
485
486
487def validate(live=False):
488 """
489 Returns whether ROS1 environment is set, prints error if not.
490
491 @param live whether environment must support launching a ROS node
492 """
493 VARS = ["ROS_MASTER_URI", "ROS_ROOT", "ROS_VERSION"] if live else ["ROS_VERSION"]
494 missing = [k for k in VARS if not os.getenv(k)]
495 if missing:
496 ConsolePrinter.error("ROS environment not sourced: missing %s.",
497 ", ".join(sorted(missing)))
498 if "1" != os.getenv("ROS_VERSION", "1"):
499 ConsolePrinter.error("ROS environment not supported: need ROS_VERSION=1.")
500 missing = True
501 return not missing
502
503
504@memoize
505def canonical(typename, unbounded=False):
506 """
507 Returns "pkg/Type" for "pkg/subdir/Type".
508
509 @param unbounded drop array bounds, e.g. returning "uint8[]" for "uint8[10]"
510 """
511 if typename and typename.count("/") > 1:
512 typename = "%s/%s" % tuple((x[0], x[-1]) for x in [typename.split("/")])[0]
513 if unbounded and typename and "[" in typename:
514 typename = typename[:typename.index("[")] + "[]"
515 return typename
516
517
518def create_publisher(topic, cls_or_typename, queue_size):
519 """Returns a rospy.Publisher."""
520 def pub_unregister():
521 # ROS1 prints errors when closing a publisher with subscribers
522 if not pub.get_num_connections(): super(rospy.Publisher, pub).unregister()
523
524 cls = cls_or_typename
525 if isinstance(cls, common.TEXT_TYPES): cls = get_message_class(cls)
526 pub = rospy.Publisher(topic, cls, queue_size=queue_size)
527 pub.unregister = pub_unregister
528 return pub
529
530
531def create_subscriber(topic, typename, handler, queue_size):
532 """
533 Returns a rospy.Subscriber.
534
535 Local message packages are not required. Subscribes as AnyMsg,
536 creates message class dynamically from connection info,
537 and deserializes message before providing to handler.
538
539 Supplemented with .get_message_class(), .get_message_definition(),
540 .get_message_type_hash(), and .get_qoses().
541
542 The supplementary .get_message_xyz() methods should only be invoked after at least one message
543 has been received from the topic, as they get populated from live connection metadata.
544 """
545 def myhandler(msg):
546 if msg._connection_header["type"] != typename:
547 return
548 typekey = (typename, msg._connection_header["md5sum"])
549 if typekey not in TYPECLASSES:
550 typedef = msg._connection_header["message_definition"]
551 for name, cls in generate_message_classes(typename, typedef).items():
552 TYPECLASSES.setdefault((name, cls._md5sum), cls)
553 if isinstance(msg, rospy.AnyMsg): # /clock can yield already deserialized messages
554 try: msg = TYPECLASSES[typekey]().deserialize(msg._buff)
555 except Exception as e:
556 ConsolePrinter.error("Error deserializing message in topic %r (%s): %s",
557 topic, typename, e)
558 return
559 handler(msg)
560
561 sub = rospy.Subscriber(topic, rospy.AnyMsg, myhandler, queue_size=queue_size)
562 sub.get_message_class = lambda: next(c for (n, h), c in TYPECLASSES.items()
563 if n == typename)
564 sub.get_message_definition = lambda: next(get_message_definition(c)
565 for (n, h), c in TYPECLASSES.items() if n == typename)
566 sub.get_message_type_hash = lambda: next(h for n, h in TYPECLASSES if n == typename)
567 sub.get_qoses = lambda: None
568 return sub
569
570
571def format_message_value(msg, name, value):
572 """
573 Returns a message attribute value as string.
574
575 Result is at least 10 chars wide if message is a ROS time/duration
576 (aligning seconds and nanoseconds).
577 """
578 LENS = {"secs": 10, "nsecs": 9}
579 v = "%s" % (value, )
580 if not isinstance(msg, genpy.TVal) or name not in LENS:
581 return v
582
583 EXTRA = sum(v.count(x) * len(x) for x in (common.MatchMarkers.START, common.MatchMarkers.END))
584 return ("%%%ds" % (LENS[name] + EXTRA)) % v # Default %10s/%9s for secs/nsecs
585
586
587@memoize
588def generate_message_classes(typename, typedef):
589 """
590 Generates ROS message classes dynamically from given name and definition.
591
592 Modifies `sys.path` to include the generated Python module.
593
594 @param typename message type name like "std_msgs/String"
595 @param typedef message type definition full text, including all subtype definitions
596 @return dictionary of {typename: typeclass} for message type and all subtypes
597 """
598 with genpy_mtx: return genpy.dynamic.generate_dynamic(typename, typedef)
599
600
601@memoize
602def get_message_class(typename):
603 """Returns ROS1 message class."""
604 if typename in ROS_TIME_TYPES:
605 return next(k for k, v in ROS_TIME_CLASSES.items() if v == typename)
606 if typename in ("genpy/Time", "rospy/Time"):
607 return rospy.Time
608 if typename in ("genpy/Duration", "rospy/Duration"):
609 return rospy.Duration
610 try: return roslib.message.get_message_class(typename)
611 except Exception: return None
612
613
614def get_message_definition(msg_or_type):
615 """
616 Returns ROS1 message type definition full text, including subtype definitions.
617
618 Returns None if unknown type.
619 """
620 msg_or_cls = msg_or_type if is_ros_message(msg_or_type) else get_message_class(msg_or_type)
621 return getattr(msg_or_cls, "_full_text", None)
622
623
624def get_message_type_hash(msg_or_type):
625 """Returns ROS message type MD5 hash, or "" if unknown type."""
626 msg_or_cls = msg_or_type if is_ros_message(msg_or_type) else get_message_class(msg_or_type)
627 return getattr(msg_or_cls, "_md5sum", "")
628
629
631 """
632 Returns OrderedDict({field name: field type name}) if ROS1 message, else {}.
633
634 @param val ROS1 message class or instance
635 """
636 names, types = (getattr(val, k, []) for k in ("__slots__", "_slot_types"))
637 if isinstance(val, genpy.TVal): names = genpy.TVal.__slots__ # Empty slots on rospy time classes
638 # Bug in genpy: class slot types defined as "int32", but everywhere else types use "uint32"
639 if isinstance(val, genpy.Time): types = ["uint32", "uint32"]
640 return collections.OrderedDict(zip(names, types))
641
642
643def get_message_type(msg_or_cls):
644 """Returns ROS1 message type name, like "std_msgs/Header"."""
645 if is_ros_time(msg_or_cls):
646 cls = msg_or_cls if inspect.isclass(msg_or_cls) else type(msg_or_cls)
647 return "duration" if "duration" in cls.__name__.lower() else "time"
648 return msg_or_cls._type
649
650
651def get_message_value(msg, name, typename=None, default=Ellipsis):
652 """
653 Returns object attribute value, with numeric arrays converted to lists.
654
655 @param name message attribute name
656 @param typename value ROS type name, for identifying byte arrays
657 @param default value to return if attribute does not exist; raises exception otherwise
658 """
659 if default is not Ellipsis and not hasattr(msg, name): return default
660 v = getattr(msg, name)
661 listifiable = typename and typename.startswith(("uint8[", "char[")) and isinstance(v, bytes)
662 if six.PY2 and listifiable: # Ignore already highlighted values from Scanner
663 listifiable = v[:1] != "[" or v[-1:] != "]" or common.MatchMarkers.START not in v
664 return list(bytearray(v)) if listifiable else v
665 return list(v) if listifiable or isinstance(v, tuple) else v
666
667
668def get_rostime(fallback=False):
669 """
670 Returns current ROS1 time, as rospy.Time.
671
672 @param fallback use wall time if node not initialized
673 """
674 try: return rospy.get_rostime()
675 except Exception:
676 if fallback: return make_time(time.time())
677 raise
678
679
681 """
682 Returns currently available ROS1 topics, as [(topicname, typename)].
683
684 Omits topics that the current ROS1 node itself has published.
685 """
686 result, myname = [], rospy.get_name()
687 pubs, _, _ = master.getSystemState()[-1]
688 mypubs = [t for t, nn in pubs if myname in nn and t not in ("/rosout", "/rosout_agg")]
689 for topic, typename in master.getTopicTypes()[-1]:
690 if topic not in mypubs:
691 result.append((topic, typename))
692 return result
693
694
695def is_ros_message(val, ignore_time=False):
696 """
697 Returns whether value is a ROS1 message or special like ROS1 time/duration class or instance.
698
699 @param ignore_time whether to ignore ROS1 time/duration types
700 """
701 isfunc = issubclass if inspect.isclass(val) else isinstance
702 return isfunc(val, genpy.Message if ignore_time else (genpy.Message, genpy.TVal))
703
704
705def is_ros_time(val):
706 """Returns whether value is a ROS1 time/duration class or instance."""
707 return issubclass(val, genpy.TVal) if inspect.isclass(val) else isinstance(val, genpy.TVal)
708
709
710def make_duration(secs=0, nsecs=0):
711 """Returns a ROS1 duration, as rospy.Duration."""
712 return rospy.Duration(secs=secs, nsecs=nsecs)
713
714
715def make_time(secs=0, nsecs=0):
716 """Returns a ROS1 time, as rospy.Time."""
717 return rospy.Time(secs=secs, nsecs=nsecs)
718
719
721 """Returns ROS1 message as a serialized binary."""
722 with TypeMeta.make(msg) as m:
723 if m.data is not None: return m.data
724 buf = io.BytesIO()
725 msg.serialize(buf)
726 return buf.getvalue()
727
728
729def deserialize_message(raw, cls_or_typename):
730 """Returns ROS1 message or service request/response instantiated from serialized binary."""
731 cls = cls_or_typename
732 if isinstance(cls, common.TEXT_TYPES): cls = get_message_class(cls)
733 return cls().deserialize(raw)
734
735
736@memoize
737def scalar(typename):
738 """
739 Returns scalar type from ROS message data type, like "uint8" from "uint8[100]".
740
741 Returns type unchanged if already a scalar.
742 """
743 return typename[:typename.index("[")] if typename and "[" in typename else typename
744
745
746def set_message_value(obj, name, value):
747 """Sets message or object attribute value."""
748 setattr(obj, name, value)
749
750
751def to_duration(val):
752 """Returns value as ROS1 duration if convertible (int/float/time/datetime/decimal), else value."""
753 result = val
754 if isinstance(val, decimal.Decimal):
755 result = rospy.Duration(int(val), float(val % 1) * 10**9)
756 elif isinstance(val, datetime.datetime):
757 result = rospy.Duration(int(val.timestamp()), 1000 * val.microsecond)
758 elif isinstance(val, (float, int)):
759 result = rospy.Duration(val)
760 elif isinstance(val, rospy.Time):
761 result = rospy.Duration(val.secs, val.nsecs)
762 return result
763
764
765def to_nsec(val):
766 """Returns value in nanoseconds if value is ROS time/duration, else value."""
767 return val.to_nsec() if isinstance(val, genpy.TVal) else val
768
769
770def to_sec(val):
771 """Returns value in seconds if value is ROS1 time/duration, else value."""
772 return val.to_sec() if isinstance(val, genpy.TVal) else val
773
774
775def to_sec_nsec(val):
776 """Returns value as (seconds, nanoseconds) if value is ROS1 time/duration, else value."""
777 return (val.secs, val.nsecs) if isinstance(val, genpy.TVal) else val
778
779
780def to_time(val):
781 """Returns value as ROS1 time if convertible (int/float/duration/datetime/decimal), else value."""
782 result = val
783 if isinstance(val, decimal.Decimal):
784 result = rospy.Time(int(val), float(val % 1) * 10**9)
785 elif isinstance(val, datetime.datetime):
786 result = rospy.Time(int(val.timestamp()), 1000 * val.microsecond)
787 elif isinstance(val, six.integer_types + (float, )):
788 result = rospy.Time(val)
789 elif isinstance(val, genpy.Duration):
790 result = rospy.Time(val.secs, val.nsecs)
791 return result
792
793
794__all__ = [
795 "BAG_EXTENSIONS", "ROS_ALIAS_TYPES", "ROS_TIME_CLASSES", "ROS_TIME_TYPES", "SKIP_EXTENSIONS",
796 "SLEEP_INTERVAL", "TYPECLASSES", "Bag", "ROS1Bag", "master",
797 "canonical", "create_publisher", "create_subscriber", "deserialize_message",
798 "format_message_value", "generate_message_classes", "get_message_class",
799 "get_message_definition", "get_message_fields", "get_message_type", "get_message_type_hash",
800 "get_message_value", "get_rostime", "get_topic_types", "init_node", "is_ros_message",
801 "is_ros_time", "make_duration", "make_time", "scalar", "serialize_message", "set_message_value",
802 "shutdown_node", "to_duration", "to_nsec", "to_sec", "to_sec_nsec", "to_time", "validate",
803]
tuple MODES
Supported opening modes, overridden in subclasses.
Definition api.py:110
BagMessage
Returned from read_messages() as (topic name, ROS message, ROS timestamp object).
Definition api.py:99
get_message_class(self, typename, typehash=None)
Definition api.py:255
read_messages(self, topics=None, start_time=None, end_time=None, raw=False, **__)
Definition api.py:271
__init__(self, *args, **kwargs)
Definition ros1.py:99
get_start_time(self)
Definition ros1.py:176
get_message_class(self, typename, typehash=None)
Definition ros1.py:148
get_message_type_hash(self, msg_or_type)
Definition ros1.py:165
get_topic_info(self, *_, **__)
Definition ros1.py:188
reindex_file(f, progress, *args, **kwargs)
Definition ros1.py:382
__contains__(self, key)
Definition ros1.py:291
get_message_definition(self, msg_or_type)
Definition ros1.py:140
read_messages(self, topics=None, start_time=None, end_time=None, raw=False, connection_filter=None, **__)
Definition ros1.py:194
write(self, topic, msg, t=None, raw=False, connection_header=None, **__)
Definition ros1.py:256
canonical(typename, unbounded=False)
Definition ros1.py:505
create_publisher(topic, cls_or_typename, queue_size)
Definition ros1.py:518
serialize_message(msg)
Definition ros1.py:720
to_sec(val)
Definition ros1.py:770
get_topic_types()
Definition ros1.py:680
format_message_value(msg, name, value)
Definition ros1.py:571
get_message_value(msg, name, typename=None, default=Ellipsis)
Definition ros1.py:651
get_message_class(typename)
Definition ros1.py:602
to_duration(val)
Definition ros1.py:751
make_time(secs=0, nsecs=0)
Definition ros1.py:715
deserialize_message(raw, cls_or_typename)
Definition ros1.py:729
get_message_type(msg_or_cls)
Definition ros1.py:643
get_message_definition(msg_or_type)
Definition ros1.py:614
validate(live=False)
Definition ros1.py:487
scalar(typename)
Definition ros1.py:737
make_duration(secs=0, nsecs=0)
Definition ros1.py:710
create_subscriber(topic, typename, handler, queue_size)
Definition ros1.py:531
get_rostime(fallback=False)
Definition ros1.py:668
to_nsec(val)
Definition ros1.py:765
to_time(val)
Definition ros1.py:780
shutdown_node()
Definition ros1.py:479
set_message_value(obj, name, value)
Definition ros1.py:746
is_ros_time(val)
Definition ros1.py:705
get_message_type_hash(msg_or_type)
Definition ros1.py:624
to_sec_nsec(val)
Definition ros1.py:775
init_node(name)
Definition ros1.py:450
is_ros_message(val, ignore_time=False)
Definition ros1.py:695
get_message_fields(val)
Definition ros1.py:630
generate_message_classes(typename, typedef)
Definition ros1.py:588