grepros 1.2.2
grep for ROS bag files and live topics
Loading...
Searching...
No Matches
ros2.py File Reference

Go to the source code of this file.

Classes

class  ROS2Bag
 ROS2 bag reader and writer (SQLite format), providing most of rosbag.Bag interface. More...
 

Namespaces

namespace  grepros
 
namespace  grepros.ros2
 ROS2 interface.
 

Functions

 canonical (typename, unbounded=False)
 Returns "pkg/Type" for "pkg/msg/Type", standardizes various ROS2 formats.
 
 create_publisher (topic, cls_or_typename, queue_size)
 Returns an rclpy.Publisher instance, with .get_num_connections() and .unregister().
 
 create_subscriber (topic, cls_or_typename, handler, queue_size)
 Returns an rclpy.Subscription.
 
 deserialize_message (raw, cls_or_typename)
 Returns ROS2 message or service request/response instantiated from serialized binary.
 
 format_message_value (msg, name, value)
 Returns a message attribute value as string.
 
 get_message_class (typename)
 Returns ROS2 message class, or None if unknown type.
 
 get_message_definition (msg_or_type)
 Returns ROS2 message type definition full text, including subtype definitions.
 
 get_message_definition_idl (typename)
 Returns ROS2 message type definition parsed from IDL file.
 
 get_message_fields (val)
 Returns OrderedDict({field name: field type name}) if ROS2 message, else {}.
 
 get_message_type (msg_or_cls)
 Returns ROS2 message type name, like "std_msgs/Header".
 
 get_message_type_hash (msg_or_type)
 Returns ROS2 message type MD5 hash, or "" if unknown type.
 
 get_message_value (msg, name, typename=None, default=Ellipsis)
 Returns object attribute value, with numeric arrays converted to lists.
 
 get_rostime (fallback=False)
 Returns current ROS2 time, as rclpy.time.Time.
 
 get_topic_types ()
 Returns currently available ROS2 topics, as [(topicname, typename)].
 
 init_node (name)
 Initializes a ROS2 node if not already initialized.
 
 is_ros_message (val, ignore_time=False)
 Returns whether value is a ROS2 message or special like ROS2 time/duration class or instance.
 
 is_ros_time (val)
 Returns whether value is a ROS2 time/duration class or instance.
 
 make_duration (secs=0, nsecs=0)
 Returns an rclpy.duration.Duration.
 
 make_full_typename (typename)
 Returns "pkg/msg/Type" for "pkg/Type".
 
 make_subscriber_qos (topic, typename, queue_size=10)
 Returns rclpy.qos.QoSProfile that matches the most permissive publisher.
 
 make_time (secs=0, nsecs=0)
 Returns a ROS2 time, as rclpy.time.Time.
 
 qos_to_dict (qos)
 Returns rclpy.qos.QoSProfile as dictionary.
 
 scalar (typename)
 Returns unbounded scalar type from ROS2 message data type.
 
 serialize_message (msg)
 Returns ROS2 message as a serialized binary.
 
 set_message_value (obj, name, value)
 Sets message or object attribute value.
 
 shutdown_node ()
 Shuts down live ROS2 node.
 
 time_message (val, to_message=True, clock_type=None)
 Converts ROS2 time/duration between rclpy and builtin_interfaces objects.
 
 to_duration (val)
 Returns value as ROS2 duration if convertible (int/float/time/datetime/decimal), else value.
 
 to_nsec (val)
 Returns value in nanoseconds if value is ROS2 time/duration, else value.
 
 to_sec (val)
 Returns value in seconds if value is ROS2 time/duration, else value.
 
 to_sec_nsec (val)
 Returns value as (seconds, nanoseconds) if value is ROS2 time/duration, else value.
 
 to_time (val)
 Returns value as ROS2 time if convertible, else value.
 
 validate (live=False)
 Returns whether ROS2 environment is set, prints error if not.
 

Variables

 Bag = ROS2Bag
 
tuple BAG_EXTENSIONS = (".db3", )
 Bagfile extensions to seek.
 
 context = None
 rclpy.context.Context instance
 
dict DDS_TYPES
 Data Distribution Service types to ROS builtins.
 
 executor = None
 rclpy.executors.Executor instance
 
 node = None
 rclpy.node.Node instance
 
dict ROS_ALIAS_TYPES = {"byte": "uint8", "char": "int8"}
 Mapping between type aliases and real types, like {"byte": "uint8"}.
 
dict ROS_TIME_CLASSES
 ROS2 time/duration types and message types mapped to type names.
 
dict ROS_TIME_MESSAGES
 ROS2 time/duration types mapped to message types.
 
list ROS_TIME_TYPES = ["builtin_interfaces/Time", "builtin_interfaces/Duration"]
 ROS2 time/duration message types.
 
tuple SKIP_EXTENSIONS = ()
 Bagfile extensions to skip.