grepros 1.2.2
grep for ROS bag files and live topics
Loading...
Searching...
No Matches
library.py
Go to the documentation of this file.
1# -*- coding: utf-8 -*-
2"""
3grepros library interface.
4
5Source classes:
6
7- {@link grepros.inputs.AppSource AppSource}: produces messages from iterable or pushed data
8- {@link grepros.inputs.BagSource BagSource}: produces messages from ROS bagfiles
9- {@link grepros.inputs.LiveSource LiveSource}: produces messages from live ROS topics
10
11Sink classes:
12
13- {@link grepros.outputs.AppSink AppSink}: provides messages to callback function
14- {@link grepros.outputs.BagSink BagSink}: writes messages to bagfile
15- {@link grepros.outputs.ConsoleSink ConsoleSink}: prints messages to console
16- {@link grepros.plugins.auto.csv.CsvSink CsvSink}: writes messages to CSV files, each topic to a separate file
17- {@link grepros.plugins.auto.html.HtmlSink HtmlSink}: writes messages to an HTML file
18- {@link grepros.outputs.LiveSink LiveSink}: publishes messages to ROS topics
19- {@link grepros.plugins.mcap.McapSink McapSink}: writes messages to MCAP file
20- {@link grepros.outputs.MultiSink MultiSink}: combines any number of sinks
21- {@link grepros.plugins.parquet.ParquetSink ParquetSink}: writes messages to Apache Parquet files
22- {@link grepros.plugins.auto.postgres.PostgresSink PostgresSink}: writes messages to a Postgres database
23- {@link grepros.plugins.auto.sqlite.SqliteSink SqliteSink}: writes messages to an SQLite database
24- {@link grepros.plugins.sql.SqlSink SqlSink}: writes SQL schema file for message type tables and topic views
25
26{@link grepros.api.BaseBag Bag}: generic bag interface.
27{@link grepros.search.Scanner Scanner}: ROS message grepper.
28
29Format-specific bag classes:
30
31- {@link grepros.ros1.ROS1Bag ROS1Bag}: ROS1 bag reader and writer in .bag format
32- {@link grepros.ros2.ROS2Bag ROS2Bag}: ROS2 bag reader and writer in .db3 SQLite format
33- {@link grepros.plugins.embag.EmbagReader EmbagReader}: ROS1 bag reader
34 using the <a href="https://github.com/embarktrucks/embag">embag</a> library
35- {@link grepros.plugins.mcap.McapBag McapBag}: ROS1/ROS2 bag reader and writer in MCAP format
36
37Output sink `write_options` arguments can be given with underscores
38instead of dashes, e.g. `"rollover_size"` instead of `"rollover-size"`.
39
40------------------------------------------------------------------------------
41This file is part of grepros - grep for ROS bag files and live topics.
42Released under the BSD License.
43
44@author Erki Suurjaak
45@created 09.12.2022
46@modified 22.03.2024
47------------------------------------------------------------------------------
48"""
49## @namespace grepros.library
50from . plugins.auto.csv import CsvSink
51from . plugins.auto.html import HtmlSink
52from . plugins.auto.postgres import PostgresSink
53from . plugins.auto.sqlite import SqliteSink
54from . plugins.mcap import McapBag, McapSink
55from . plugins.parquet import ParquetSink
56from . plugins.sql import SqlSink
57
58from . api import Bag
59from . inputs import AppSource, BagSource, LiveSource, Source
60from . outputs import AppSink, BagSink, ConsoleSink, LiveSink, MultiSink, Sink
61from . search import BooleanResult, ExpressionTree, Scanner
62from . import api
63from . import common
64from . import plugins
65
66
# Module-level flag: whether init() has completed; set to True at the end of init(),
# checked by grep(), source() and sink() to initialize the library on first use.
_inited = False
68
69
def grep(args=None, **kwargs):
    """
    Generator producing the messages from given source that match the search arguments.

    Initializes grepros first, if not initialized yet.

    Reading from bagfiles: `grep(file="2022-10-*.bag", pattern="cpu")`.

    Reading from live topics: `grep(live=True, pattern="cpu")`.


    @param   args                     arguments as namespace or dictionary, case-insensitive;
                                      or a single path as the ROS bagfile to read,
                                      or one or more {@link grepros.api.Bag Bag} instances,
                                      or a {@link grepros.inputs.Source Source} instance
    @param   kwargs                   any and all arguments as keyword overrides, case-insensitive
    <!--sep-->

    Bag source:
    @param   args.file                names of ROS bagfiles to read if not all in directory
    @param   args.path                paths to scan if not current directory
    @param   args.recurse             recurse into subdirectories when looking for bagfiles
    @param   args.decompress          decompress archived bags to file directory
    @param   args.reindex             make a copy of unindexed bags and reindex them (ROS1 only)
    @param   args.orderby             "topic" or "type" if any to group results by
    @param   args.timescale           emit messages on original timeline from first message
                                      at given rate, 0 disables
    @param   args.timescale_emission  start timeline from first matched message not first in bag
    @param   args.bag                 one or more {@link grepros.api.Bag Bag} instances
    <!--sep-->

    Live source:
    @param   args.live                whether reading messages from live ROS topics
    @param   args.queue_size_in       subscriber queue size (default 10)
    @param   args.ros_time_in         stamp messages with ROS time instead of wall time
    <!--sep-->

    App source:
    @param   args.app                 whether reading messages from iterable or pushed data;
                                      may contain the iterable itself
    @param   args.iterable            iterable yielding (topic, msg, stamp) or (topic, msg);
                                      yielding `None` signals end of content
    <!--sep-->

    Any source:
    @param   args.topic               ROS topics to read if not all
    @param   args.type                ROS message types to read if not all
    @param   args.skip_topic          ROS topics to skip
    @param   args.skip_type           ROS message types to skip
    @param   args.start_time          earliest timestamp of messages to read
    @param   args.end_time            latest timestamp of messages to read
    @param   args.start_index         message index within topic to start from
    @param   args.end_index           message index within topic to stop at

    @param   args.nth_message         read every Nth message in topic, starting from first
    @param   args.nth_interval        minimum time interval between messages in topic,
                                      as seconds or ROS duration

    @param   args.select_field        message fields to use in matching if not all
    @param   args.noselect_field      message fields to skip in matching

    @param   args.unique              emit messages that are unique in topic
                                      (select_field and noselect_field apply if specified)
    @param   args.condition           Python expressions that must evaluate as true
                                      for message to be processable, see ConditionMixin
    <!--sep-->

    Search&zwj;:
    @param   args.pattern             pattern(s) to find in message field values
    @param   args.fixed_string        pattern contains ordinary strings, not regular expressions
    @param   args.case                use case-sensitive matching in pattern
    @param   args.invert              select messages not matching pattern
    @param   args.expression          pattern(s) are a logical expression
                                      like 'this AND (this2 OR NOT "skip this")',
                                      with elements as patterns to find in message fields

    @param   args.nth_match           emit every Nth match in topic, starting from first
    @param   args.max_count           number of matched messages to emit (per file if bag input)
    @param   args.max_per_topic       number of matched messages to emit from each topic
    @param   args.max_topics          number of topics to print matches from

    @param   args.before              number of messages of leading context to emit before match
    @param   args.after               number of messages of trailing context to emit after match
    @param   args.context             number of messages of leading and trailing context
                                      to emit around match

    @param   args.highlight           highlight matched values
    @param   args.match_wrapper       string to wrap around matched values,
                                      both sides if one value, start and end if more than one,
                                      or no wrapping if zero values

    @return                           {@link grepros.Scanner.GrepMessage GrepMessage} namedtuples
                                      of (topic, message, timestamp, match, index)
    """
    DEFAULT_ARGS = dict(FILE=[], LIVE=False, APP=False, ITERABLE=None,
                        COLOR="never", HIGHLIGHT=False)

    given = args  # Keep the original argument, `args` gets replaced with a namespace below
    is_bag = isinstance(given, Bag) or \
             (common.is_iterable(given) and all(isinstance(x, Bag) for x in given))
    given_is_source = isinstance(given, Source)

    if isinstance(given, common.PATH_TYPES):
        args = {"FILE": str(given)}  # Single path given: use as the bagfile to read
    elif is_bag or given_is_source:
        args = {}  # Bag/Source instance is not an options container, options come from kwargs
    args = common.ArgumentUtil.validate(common.ensure_namespace(args, DEFAULT_ARGS, **kwargs))
    if not _inited:
        init(args)

    if common.is_iterable(args.APP) and not common.is_iterable(args.ITERABLE):
        # APP was given as the iterable itself: move it to ITERABLE, leave APP as flag
        iterable = args.APP
        args.APP, args.ITERABLE = True, iterable

    if given_is_source:
        src = given
    elif args.LIVE:
        src = LiveSource(args)
    elif args.APP:
        src = AppSource(args)
    elif is_bag:
        src = BagSource(given, **vars(args))
    else:
        src = BagSource(args)
    if args and given_is_source:
        src.configure(**kwargs)
    src.validate()

    try:
        for match in Scanner(args).find(src):
            yield match
    finally:
        # Source is closed unless caller gave a single Bag or a Source instance
        if not isinstance(given, (Bag, Source)):
            src.close()
186
187
def source(args=None, **kwargs):
    """
    Returns a validated {@link grepros.inputs.Source Source} instance built from arguments:
    LiveSource if `live` is set, else AppSource if `app` is set, else BagSource.

    Initializes grepros first, if not initialized yet.

    @param   args                     arguments as namespace or dictionary, case-insensitive;
                                      or a single path as the ROS bagfile to read
    @param   kwargs                   any and all arguments as keyword overrides, case-insensitive
    @param   args.file                one or more names of ROS bagfiles to read from
    @param   args.live                read messages from live ROS topics instead
    @param   args.app                 read messages from iterable or pushed data instead;
                                      may contain the iterable itself
    <!--sep-->

    Bag source:
    @param   args.file                names of ROS bagfiles to read if not all in directory
    @param   args.path                paths to scan if not current directory
    @param   args.recurse             recurse into subdirectories when looking for bagfiles
    @param   args.orderby             "topic" or "type" if any to group results by
    @param   args.decompress          decompress archived bags to file directory
    @param   args.reindex             make a copy of unindexed bags and reindex them (ROS1 only)
    @param   args.timescale           emit messages on original timeline from first message
                                      at given rate, 0 disables
    @param   args.timescale_emission  start timeline from first matched message not first in bag
    @param   args.progress            whether to print progress bar
    <!--sep-->

    Live source:
    @param   args.queue_size_in       subscriber queue size (default 10)
    @param   args.ros_time_in         stamp messages with ROS time instead of wall time
    @param   args.progress            whether to print progress bar
    <!--sep-->

    App source:
    @param   args.iterable            iterable yielding (topic, msg, stamp) or (topic, msg);
                                      yielding `None` signals end of content
    <!--sep-->

    Any source:
    @param   args.topic               ROS topics to read if not all
    @param   args.type                ROS message types to read if not all
    @param   args.skip_topic          ROS topics to skip
    @param   args.skip_type           ROS message types to skip
    @param   args.start_time          earliest timestamp of messages to read
    @param   args.end_time            latest timestamp of messages to read
    @param   args.start_index         message index within topic to start from
    @param   args.end_index           message index within topic to stop at
    @param   args.unique              emit messages that are unique in topic
    @param   args.select_field        message fields to use for uniqueness if not all
    @param   args.noselect_field      message fields to skip for uniqueness
    @param   args.nth_message         read every Nth message in topic, starting from first
    @param   args.nth_interval        minimum time interval between messages in topic,
                                      as seconds or ROS duration
    @param   args.condition           Python expressions that must evaluate as true
                                      for message to be processable, see ConditionMixin
    """
    DEFAULT_ARGS = dict(FILE=[], LIVE=False, APP=False, ITERABLE=None)
    if isinstance(args, common.PATH_TYPES):
        args = {"FILE": str(args)}  # Single path given: use as the bagfile to read
    args = common.ensure_namespace(args, DEFAULT_ARGS, **kwargs)
    if not _inited:
        init(args)

    if common.is_iterable(args.APP) and not common.is_iterable(args.ITERABLE):
        # APP was given as the iterable itself: move it to ITERABLE, leave APP as flag
        iterable = args.APP
        args.APP, args.ITERABLE = True, iterable

    if args.LIVE:
        cls = LiveSource
    elif args.APP:
        cls = AppSource
    else:
        cls = BagSource
    result = cls(args)
    result.validate()
    return result
255
256
def sink(args=None, **kwargs):
    """
    Returns a validated {@link grepros.outputs.Sink Sink} instance built from arguments;
    the result is a {@link grepros.outputs.MultiSink MultiSink} unless exactly one sink
    got created, in which case that single sink is returned.

    Initializes grepros first, if not initialized yet.

    @param   args                       arguments as namespace or dictionary, case-insensitive;
                                        or a single item as sink target like bag filename
    @param   kwargs                     any and all arguments as keyword overrides, case-insensitive
    @param   args.app                   provide messages to given callback function
    @param   args.console               print matches to console
    @param   args.publish               publish matches to live topics
    @param   args.write                 file or other target like Postgres database to write,
                                        as "target", or ["target", dict(format="format", ..)]
                                        or [[..target1..], [..target2..], ..]
    @param   args.write_options         format-specific options like
                                        {"overwrite": whether to overwrite existing file
                                                      (default false)}
    <!--sep-->

    Console sink:
    @param   args.line_prefix           print source prefix like bag filename on each message line
    @param   args.max_field_lines       maximum number of lines to print per field
    @param   args.start_line            message line number to start output from
    @param   args.end_line              message line number to stop output at
    @param   args.max_message_lines     maximum number of lines to output per message
    @param   args.lines_around_match    number of message lines around matched fields to output
    @param   args.matched_fields_only   output only the fields where match was found
    @param   args.wrap_width            character width to wrap message YAML output at
    @param   args.match_wrapper         string to wrap around matched values,
                                        both sides if one value, start and end if more than one,
                                        or no wrapping if zero values
    <!--sep-->

    Console / HTML sink:
    @param   args.color                 False or "never" for not using colors in replacements
    @param   args.highlight             highlight matched values (default true)
    @param   args.emit_field            message fields to emit if not all
    @param   args.noemit_field          message fields to skip in output
    @param   args.max_field_lines       maximum number of lines to output per field
    @param   args.start_line            message line number to start output from
    @param   args.end_line              message line number to stop output at
    @param   args.max_message_lines     maximum number of lines to output per message
    @param   args.lines_around_match    number of message lines around matched fields to output
    @param   args.matched_fields_only   output only the fields where match was found
    @param   args.wrap_width            character width to wrap message YAML output at
    @param   args.match_wrapper         string to wrap around matched values,
                                        both sides if one value, start and end if more than one,
                                        or no wrapping if zero values
    <!--sep-->

    Topic sink:
    @param   args.queue_size_out        publisher queue size (default 10)
    @param   args.publish_prefix        output topic prefix, prepended to input topic
    @param   args.publish_suffix        output topic suffix, appended to output topic
    @param   args.publish_fixname       single output topic name to publish to,
                                        overrides prefix and suffix if given
    <!--sep-->

    App sink:
    @param   args.emit                  callback(topic, msg, stamp, highlighted msg, index in topic)
                                        if any
    @param   args.metaemit              callback(metadata dict) if any,
                                        invoked before first emit from source batch
    <!--sep-->

    Any sink:
    @param   args.meta                  whether to print metainfo
    @param   args.verbose               whether to print debug information
    """
    DEFAULT_ARGS = dict(CONSOLE=False, PUBLISH=False, WRITE=[], APP=False, EMIT=None, METAEMIT=None)

    if isinstance(args, common.PATH_TYPES):
        args = {"WRITE": str(args)}  # Single item given: use as the write target
    args = common.ensure_namespace(args, DEFAULT_ARGS, **kwargs)
    if not _inited:
        init(args)

    target = args.WRITE
    if target:
        # Bring WRITE to the nested form [[target, ..opts], ..]
        if isinstance(target, common.PATH_TYPES):
            args.WRITE = [[target]]  # Nest deeper, single file given
        elif isinstance(target, (list, tuple)) and isinstance(target[0], common.PATH_TYPES):
            args.WRITE = [target]  # Nest deeper, must have been single [target, ..opts]
    if callable(args.APP) and not callable(args.EMIT):
        # APP was given as the callback itself: move it to EMIT, leave APP as flag
        callback = args.APP
        args.APP, args.EMIT = True, callback

    multisink = MultiSink(args)
    multisink.validate()
    if len(multisink.sinks) == 1:
        return multisink.sinks[0]
    return multisink
346
347
def init(args=None, **kwargs):
    """
    Sets up ROS version bindings and loads all built-in plugins whose dependencies are available.

    If already initialized, only configures plugins with given arguments.

    @param   args
    @param   args.plugin   one or more extra plugins to load,
                           as full names or instances of Python module/class
    @param   kwargs        any and all arguments as keyword overrides, case-insensitive
    """
    global _inited
    args = common.ensure_namespace(args, {"PLUGIN": []}, **kwargs)
    if _inited:
        if args:
            plugins.configure(args)
        return

    common.ConsolePrinter.configure(color=None, apimode=True)
    api.validate()
    plugins.init(args)
    for optional in (plugins.mcap, plugins.parquet, plugins.sql):
        # Best-effort: a plugin that fails to configure is skipped silently
        try:
            plugins.configure(PLUGIN=optional)
        except Exception:
            pass
    Bag.READER_CLASSES.add(McapBag)  # Ensure MCAP files at least get recognized,
    Bag.WRITER_CLASSES.add(McapBag)  # even if loading them will fail when dependencies missing
    if args.PLUGIN:
        plugins.configure(args)
    # Switch message metadata cache to constrain on total number instead of time
    api.TypeMeta.LIFETIME = 0
    api.TypeMeta.POPULATION = 100
    _inited = True
375
376
377
# Public API of the library module: the sink, source, bag and search classes
# imported at the top of this module, plus the convenience functions defined here.
__all__ = [
    "AppSink", "AppSource", "Bag", "BagSink", "BagSource", "BooleanResult", "ConsoleSink",
    "CsvSink", "ExpressionTree", "HtmlSink", "LiveSink", "LiveSource", "McapBag", "McapSink",
    "MultiSink", "ParquetSink", "PostgresSink", "Scanner", "Sink", "Source", "SqliteSink",
    "SqlSink", "grep", "init", "sink", "source",
]
Bag factory metaclass.
Definition api.py:381
Produces messages from iterable or pushed data.
Definition inputs.py:1276
Produces messages from ROS bagfiles.
Definition inputs.py:522
Produces messages from live ROS topics.
Definition inputs.py:1005
Message producer base class.
Definition inputs.py:35
Combines any number of sinks.
Definition outputs.py:918
Output base class.
Definition outputs.py:32
MCAP bag interface, providing most of rosbag.Bag interface.
Definition mcap.py:47
ROS message grepper.
Definition search.py:36
source(args=None, **kwargs)
Convenience for creating a Source instance from arguments.
Definition library.py:241
sink(args=None, **kwargs)
Convenience for creating a Sink instance from arguments, MultiSink if several outputs.
Definition library.py:324
init(args=None, **kwargs)
Initializes ROS version bindings, loads all built-in plugins if dependencies available.
Definition library.py:353
grep(args=None, **kwargs)
Yields matching messages from specified source.
Definition library.py:159