grepros 1.2.1
grep for ROS bag files and live topics
Loading...
Searching...
No Matches
grepros

Overview

grep for ROS bag files and live topics: read, filter, export.

Written as a command-line tool, also provides a comprehensive API to use as a library.

Supports both ROS1 and ROS2. ROS environment variables need to be set, at least ROS_VERSION.

Supports loading custom plugins, mainly for additional output formats.

Main class index

grepros.Bag generic ROS bag interface
grepros.Scanner ROS message grepper
Sources
grepros.AppSource produces messages from iterable or pushed data
grepros.BagSource produces messages from ROS bagfiles
grepros.LiveSource produces messages from live ROS topics
Sinks
grepros.AppSink provides messages to callback function
grepros.BagSink writes messages to bagfile
grepros.ConsoleSink prints messages to console
grepros.CsvSink writes messages to CSV files, each topic separately
grepros.HtmlSink writes messages to an HTML file
grepros.LiveSink publishes messages to live ROS topics
grepros.McapSink writes messages to an MCAP bag file
grepros.MultiSink combines any number of sinks
grepros.ParquetSink writes messages to Apache Parquet files
grepros.PostgresSink writes messages to a Postgres database
grepros.SqliteSink writes messages to an SQLite database

Convenience entrypoints

grepros.grep(..) yields matching messages from specified source
grepros.source(..) returns a Source instance
grepros.sink(..) returns a Sink instance

Command-line scripts

generate_msgs Test script, generating and publishing random ROS messages
grepros Main command-line tool

grepros.api: a simple unified interface for working with ROS1/ROS2 types and messages.

Example usage

Convenience entrypoints

import grepros
grepros.init()
# Print first message from each bag under path:
for topic, msg, stamp, *_ in grepros.grep(path="my/path", max_count=1):
print(topic, stamp, msg)
# Write one message from each live topic to an HTML file:
with grepros.source(live=True, max_per_topic=1) as source, \
grepros.sink("my.html") as sink:
for topic, msg, stamp in source: sink.emit(topic, msg, stamp)

Working with bags

import grepros
grepros.init()
# Read and write bags:
with grepros.Bag("my.bag") as inbag, grepros.Bag("my.mcap", mode="w") as outbag:
for topic, msg, stamp in inbag:
outbag.write(topic, msg, stamp) # Convert ROS1 bag to MCAP
# Find messages in bag:
bag = grepros.Bag("my.bag")
scan = grepros.Scanner(topic="/diagnostics", pattern="temperature")
for topic, msg, stamp, match in scan.find(bag, highlight=True):
print("MATCH: ", topic, stamp, match)
# Write live topics to bag, no more than once a minute per topic:
with grepros.Bag("my.bag", "w") as bag:
for topic, msg, stamp, match, index in grepros.grep(live=True, nth_interval=60):
bag.write(topic, msg, stamp)
# Find messages +- 2 minutes around first pointcloud message in bag:
with grepros.Bag("my.bag") as bag:
_, _, stamp, *_ = next(grepros.grep(bag, type="*/pointcloud*"))
delta = grepros.api.make_duration(secs=120)
args = dict(start_time=stamp - delta, end_time=stamp + delta)
for topic, msg, stamp, *_ in grepros.grep(bag, **args):
print("%s [%s] %s" % (topic, stamp, msg))
# Bag API conveniences:
with grepros.Bag("my.bag") as bag:
print("Messages in bag: ", len(bag))
if "/my/topic" in bag:
print("/my/topic messages in bag: ", len(bag["/my/topic"]))
for topic in bag.topics:
print("Topic: ", topic)
for topic, msg, stamp in bag[topic]:
print(msg)
make_duration(secs=0, nsecs=0)
Returns a ROS duration.
Definition api.py:977

Sources and sinks

import grepros
grepros.init()
# Write all bags in directory to Postgres database:
with grepros.PostgresSink("username=postgres dbname=postgres") as sink:
for data in grepros.BagSource(path="/tmp/bags"):
sink.emit(*data)
# Grep live topics:
for topic, msg, stamp, match, index in grepros.LiveSource(topic="/diagnostics", pattern="cpu"):
print("MESSAGE #%s MATCH: " % index, match)
# Subscribe to live topics and write to bag:
with grepros.LiveSource(topic="/rosout") as source, \
grepros.Bag("my.bag", "w") as bag:
for topic, msg, stamp, *_ in grepros.Scanner(pattern="error").find(source)
bag.write(topic, msg, stamp)
# Write all pointclouds from bags in directory to SQLite database:
with grepros.BagSource(path="/tmp/bags") as source, \
grepros.SqliteSink("my.sqlite") as sink:
total = grepros.Scanner(type="*/pointcloud*").work(source, sink)
print("Messages written: %s" % total)