grepros 1.2.2
grep for ROS bag files and live topics
Loading...
Searching...
No Matches
search.py
Go to the documentation of this file.
1# -*- coding: utf-8 -*-
2"""
3Search core.
4
5------------------------------------------------------------------------------
6This file is part of grepros - grep for ROS bag files and live topics.
7Released under the BSD License.
8
9@author Erki Suurjaak
10@created 28.09.2021
11@modified 24.03.2024
12------------------------------------------------------------------------------
13"""
14## @namespace grepros.search
15from argparse import Namespace
16import copy
17import collections
18import functools
19import logging
20import re
21
22import six
23
24from . import api
25from . import common
26from . import inputs
27
28
class Scanner(object):
    """
    ROS message grepper.

    In highlighted results, message field values that match search criteria are modified
    to wrap the matching parts in {@link grepros.common.MatchMarkers MatchMarkers} tags,
    with numeric field values converted to strings beforehand.
    """

    # Namedtuple of (topic, message, timestamp, match, index), yielded from find().
    # NOTE(review): typename is "BagMessage" although bound as GrepMessage;
    # left unchanged so that repr() of yielded items stays the same.
    GrepMessage = collections.namedtuple("BagMessage", "topic message timestamp match index")

    # Content patterns as (path, value pattern) that count as global any-match
    ANY_MATCHES = [((), re.compile("(.*)", re.DOTALL)), ((), re.compile("(.?)", re.DOTALL))]

    # Constructor argument defaults
    DEFAULT_ARGS = dict(PATTERN=(), CASE=False, FIXED_STRING=False, INVERT=False, EXPRESSION=False,
                        HIGHLIGHT=False, NTH_MATCH=1, BEFORE=0, AFTER=0, CONTEXT=0, MAX_COUNT=0,
                        MAX_PER_TOPIC=0, MAX_TOPICS=0, SELECT_FIELD=(), NOSELECT_FIELD=(),
                        MATCH_WRAPPER="**")


    def __init__(self, args=None, **kwargs):
        """
        @param   args                     arguments as namespace or dictionary, case-insensitive
        @param   args.pattern             pattern(s) to find in message field values
        @param   args.fixed_string        pattern contains ordinary strings, not regular expressions
        @param   args.case                use case-sensitive matching in pattern
        @param   args.invert              select messages not matching pattern
        @param   args.expression          pattern(s) are a logical expression
                                          like 'this AND (this2 OR NOT "skip this")',
                                          with elements as patterns to find in message fields
        @param   args.highlight           highlight matched values
        @param   args.before              number of messages of leading context to emit before match
        @param   args.after               number of messages of trailing context to emit after match
        @param   args.context             number of messages of leading and trailing context to emit
                                          around match, overrides args.before and args.after
        @param   args.max_count           number of matched messages to emit (per file if bag input)
        @param   args.max_per_topic       number of matched messages to emit from each topic
        @param   args.max_topics          number of topics to emit matches from
        @param   args.nth_match           emit every Nth match in topic, starting from first
        @param   args.select_field        message fields to use in matching if not all
        @param   args.noselect_field      message fields to skip in matching
        @param   args.match_wrapper       string to wrap around matched values in find() and match(),
                                          both sides if one value, start and end if more than one,
                                          or no wrapping if zero values (default "**")
        @param   kwargs                   any and all arguments as keyword overrides, case-insensitive
        <!--sep-->

        Additional arguments when using match() or find(grepros.api.Bag):

        @param   args.topic               ROS topics to read if not all
        @param   args.type                ROS message types to read if not all
        @param   args.skip_topic          ROS topics to skip
        @param   args.skip_type           ROS message types to skip
        @param   args.start_time          earliest timestamp of messages to read
        @param   args.end_time            latest timestamp of messages to read
        @param   args.start_index         message index within topic to start from
        @param   args.end_index           message index within topic to stop at
        @param   args.unique              emit messages that are unique in topic
        @param   args.nth_message         read every Nth message in topic, starting from first
        @param   args.nth_interval        minimum time interval between messages in topic,
                                          as seconds or ROS duration
        @param   args.condition           Python expressions that must evaluate as true
                                          for message to be processable, see ConditionMixin
        @param   args.progress            whether to print progress bar
        @param   args.stop_on_error       stop execution on any error like unknown message type
        """
        # {key: [(() if any field else ('nested', 'path') or re.Pattern, re.Pattern), ]}
        self._patterns = {}
        # Parser and evaluator for logical expressions; used by _parse_patterns(),
        # _configure_settings() and get_match(), so it must exist before validate() runs
        self._expressor = ExpressionTree()
        self._expression = None  # Nested [op, val] like ["NOT", ["VAL", "skip this"]]
        # {(topic, typename, typehash): {message ID: message}}
        self._messages = collections.defaultdict(collections.OrderedDict)
        # {(topic, typename, typehash): {message ID: ROS time}}
        self._stamps = collections.defaultdict(collections.OrderedDict)
        # {(topic, typename, typehash): {None: processed, True: matched, False: emitted as context}}
        self._counts = collections.defaultdict(collections.Counter)
        # {(topic, typename, typehash): {message ID: True if matched else False if emitted else None}}
        self._statuses = collections.defaultdict(collections.OrderedDict)
        # Patterns to check in message plaintext and skip full matching if not found
        self._brute_prechecks = []  # [re.Pattern to match against message fulltext for early skip]
        self._idcounter = 0         # Counter for unique message IDs
        self._settings = {          # Various cached settings
            "highlight":     None,   # Highlight matched values in message fields
            "passthrough":   False,  # Emit messages without pattern-matching and highlighting
            "pure_anymatch": False,  # Simple match for any content
            "wraps":         [],     # Match wrapper start-end strings
        }

        self.source = None  # Source instance bound in _prepare()
        self.sink = None    # Sink instance bound in _prepare(), if any
        self.valid = None   # Result of latest validate(), None if not yet validated

        self.args0 = common.ensure_namespace(args, **kwargs)  # Arguments as given, sans defaults
        self.args = common.ArgumentUtil.validate(
            common.ensure_namespace(args, Scanner.DEFAULT_ARGS, **kwargs))
        if self.args.CONTEXT: self.args.BEFORE = self.args.AFTER = self.args.CONTEXT


    def find(self, source, highlight=None):
        """
        Yields matched and context messages from source.

        Yields nothing if pattern validation fails.

        @param   source     inputs.Source or api.Bag instance
        @param   highlight  whether to highlight matched values in message fields,
                            defaults to flag from constructor
        @return             GrepMessage namedtuples of
                            (topic, message, timestamp, match, index in topic),
                            where match is matched optionally highlighted message
                            or `None` if yielding a context message
        """
        if not self.validate(reset=True):
            return
        if isinstance(source, api.Bag):
            source = inputs.BagSource(source, **vars(self.args))
        self._prepare(source, highlight=highlight, progress=True)
        for topic, msg, stamp, matched, index in self._generate():
            yield self.GrepMessage(topic, msg, stamp, matched, index)


    def match(self, topic, msg, stamp, highlight=None):
        """
        Returns matched message if message matches search filters.

        @param   topic      topic name
        @param   msg        ROS message
        @param   stamp      message ROS timestamp
        @param   highlight  whether to highlight matched values in message fields,
                            defaults to flag from constructor
        @return             original or highlighted message on match else `None`
        """
        result = None
        if not self.validate(reset=True):
            return result
        if isinstance(self.source, inputs.AppSource): self._configure_settings(highlight=highlight)
        else: self._prepare(inputs.AppSource(self.args), highlight=highlight)

        self.source.push(topic, msg, stamp)
        item = self.source.read_queue()
        if item is not None:
            msgid = self._idcounter = self._idcounter + 1
            topickey = api.TypeMeta.make(msg, topic).topickey
            self._register_message(topickey, msgid, msg, stamp)
            matched = self._is_processable(topic, msg, stamp) and self.get_match(msg)

            self.source.notify(matched)
            if matched and not self._counts[topickey][True] % (self.args.NTH_MATCH or 1):
                self._statuses[topickey][msgid] = True
                self._counts[topickey][True] += 1
                result = matched
            elif matched:  # Not NTH_MATCH, skip emitting
                self._statuses[topickey][msgid] = True
                self._counts[topickey][True] += 1
            self._prune_data(topickey)
            self.source.mark_queue(topic, msg, stamp)
        return result


    def work(self, source, sink):
        """
        Greps messages yielded from source and emits matched content to sink.

        @param   source  inputs.Source or api.Bag instance
        @param   sink    outputs.Sink instance
        @return          count matched, or `None` if pattern validation fails
        """
        if not self.validate(reset=True):
            return
        if isinstance(source, api.Bag):
            source = inputs.BagSource(source, **vars(self.args))
        self._prepare(source, sink, highlight=self.args.HIGHLIGHT, progress=True)
        total_matched = 0
        for topic, msg, stamp, matched, index in self._generate():
            sink.emit_meta()
            sink.emit(topic, msg, stamp, matched, index)
            total_matched += bool(matched)
        return total_matched


    def validate(self, reset=False):
        """
        Returns whether conditions have valid syntax, prints errors.

        @param   reset  re-validate even if a previous result is cached in `self.valid`
        """
        if self.valid is not None and not reset: return self.valid

        errors = collections.defaultdict(list)  # {category: [error, ]}
        if not self.args.FIXED_STRING and not self.args.EXPRESSION:
            for v in self.args.PATTERN:  # Pre-check patterns before parsing for full error state
                split = v.find("=", 1, -1)  # May be "PATTERN" or "attribute=PATTERN"
                v = v[split + 1:] if split > 0 else v
                try: re.compile(v)
                except Exception as e:
                    errors["Invalid regular expression"].append("'%s': %s" % (v, e))
        if not errors:  # Parsing would only re-report the first invalid pattern
            try: self._parse_patterns()
            except Exception as e: errors[""].append(str(e))

        for err in errors.get("", []):
            common.ConsolePrinter.log(logging.ERROR, err)
        for category in filter(bool, errors):
            common.ConsolePrinter.log(logging.ERROR, category)
            for err in errors[category]:
                common.ConsolePrinter.log(logging.ERROR, "  %s" % err)

        self.valid = not errors
        return self.valid


    def __enter__(self):
        """Context manager entry, does nothing, returns self."""
        return self


    def __exit__(self, exc_type, exc_value, traceback):
        """Context manager exit, does nothing; exceptions from the with-block propagate."""
        # A truthy return value here would tell Python to swallow the exception
        return None


    def _generate(self):
        """
        Yields matched and context messages from source.

        @return  tuples of (topic, msg, stamp, matched optionally highlighted msg, index in topic)
        """
        batch_matched, batch = False, None
        for topic, msg, stamp in self.source.read():
            if batch != self.source.get_batch():  # New batch: start counting afresh
                batch, batch_matched = self.source.get_batch(), False
                if self._counts: self._clear_data()

            msgid = self._idcounter = self._idcounter + 1
            topickey = api.TypeMeta.make(msg, topic).topickey
            self._register_message(topickey, msgid, msg, stamp)
            matched = self._is_processable(topic, msg, stamp) and self.get_match(msg)

            self.source.notify(matched)
            if matched and not self._counts[topickey][True] % (self.args.NTH_MATCH or 1):
                self._statuses[topickey][msgid] = True
                self._counts[topickey][True] += 1
                for x in self._generate_context(topickey, before=True): yield x
                yield (topic, msg, stamp, matched, self._counts[topickey][None])
            elif matched:  # Not NTH_MATCH, skip emitting
                self._statuses[topickey][msgid] = True
                self._counts[topickey][True] += 1
            elif self.args.AFTER \
            and self._has_in_window(topickey, self.args.AFTER + 1, status=True):
                for x in self._generate_context(topickey, before=False): yield x
            batch_matched = batch_matched or bool(matched)

            self._prune_data(topickey)
            if batch_matched and self._is_max_done():
                if self.sink: self.sink.flush()
                self.source.close_batch()


    def _is_processable(self, topic, msg, stamp):
        """
        Returns whether processing current message in topic is acceptable:
        that topic or total maximum count has not been reached,
        and current message in topic is in configured range, if any.
        """
        topickey = api.TypeMeta.make(msg, topic).topickey
        if self.args.MAX_COUNT \
        and sum(x[True] for x in self._counts.values()) >= self.args.MAX_COUNT:
            return False
        if self.args.MAX_PER_TOPIC and self._counts[topickey][True] >= self.args.MAX_PER_TOPIC:
            return False
        if self.args.MAX_TOPICS:
            topics_matched = [k for k, vv in self._counts.items() if vv[True]]
            if topickey not in topics_matched and len(topics_matched) >= self.args.MAX_TOPICS:
                return False
        if self.source \
        and not self.source.is_processable(topic, msg, stamp, self._counts[topickey][None]):
            return False
        return True


    def _generate_context(self, topickey, before=False):
        """Yields before/after context for latest match."""
        count = self.args.BEFORE + 1 if before else self.args.AFTER
        candidates = list(self._statuses[topickey])[-count:]
        current_index = self._counts[topickey][None]
        for i, msgid in enumerate(candidates) if count else ():
            if self._statuses[topickey][msgid] is None:  # Neither matched nor emitted yet
                idx = current_index + i - (len(candidates) - 1 if before else 1)
                msg, stamp = self._messages[topickey][msgid], self._stamps[topickey][msgid]
                self._counts[topickey][False] += 1
                yield topickey[0], msg, stamp, None, idx
                self._statuses[topickey][msgid] = False


    def _clear_data(self):
        """Clears local structures."""
        for d in (self._counts, self._messages, self._stamps, self._statuses):
            d.clear()
        api.TypeMeta.clear()


    def _prepare(self, source, sink=None, highlight=None, progress=False):
        """Clears local structures, binds and registers source and sink, if any."""
        self._clear_data()
        self.source, self.sink = source, sink
        source.bind(sink), sink and sink.bind(source)
        source.preprocess = False
        self._configure_settings(highlight=highlight, progress=progress)


    def _prune_data(self, topickey):
        """Drops history older than context window."""
        WINDOW = max(self.args.BEFORE, self.args.AFTER) + 1
        for dct in (self._messages, self._stamps, self._statuses):
            while len(dct[topickey]) > WINDOW:
                msgid = next(iter(dct[topickey]))  # Oldest entry first
                value = dct[topickey].pop(msgid)
                dct is self._messages and api.TypeMeta.discard(value)


    def _parse_patterns(self):
        """Parses pattern arguments into re.Patterns. Raises on invalid pattern."""
        NOBRUTE_SIGILS = r"\A", r"\Z", "?("  # Regex specials ruling out brute precheck
        EMPTY_MARKERS = ("''", '""')         # Pattern values standing for empty string
        BRUTE, FLAGS = not self.args.INVERT, re.DOTALL | (0 if self.args.CASE else re.I)
        self._patterns.clear()
        self._expression = None
        del self._brute_prechecks[:]
        contents = []

        def split_pattern(v):
            """Returns (value, path or ()) from "PATTERN" or "attribute=PATTERN"."""
            split = v.find("=", 1, -1)
            return (v[split + 1:], v[:split]) if split > 0 else (v, ())

        def make_pattern(v):
            """Returns (path Pattern or (), value Pattern)."""
            v, path = split_pattern(v)
            # Special case if '' or "": add pattern for matching empty string
            v = "|^$" if v in EMPTY_MARKERS else (re.escape(v) if self.args.FIXED_STRING else v)
            path = re.compile(r"(^|\.)%s($|\.)" % ".*".join(map(re.escape, path.split("*")))) \
                   if path else ()
            try: return (path, re.compile("(%s)" % v, FLAGS))
            except Exception as e:
                raise ValueError("Invalid regular expression\n  '%s': %s" % (v, e))

        if self.args.EXPRESSION and self.args.PATTERN:
            self._expression = self._expressor.parse(" ".join(self.args.PATTERN), make_pattern)
        for v in self.args.PATTERN if not self._expression else ():
            contents.append(make_pattern(v))
            # Precheck text holds field values only: precheck on the value part,
            # without any "attribute=" prefix, and not at all for empty-string markers
            value = split_pattern(v)[0]
            if not BRUTE or value in EMPTY_MARKERS: continue  # for v
            if self.args.FIXED_STRING or not any(x in value for x in NOBRUTE_SIGILS):
                if self.args.FIXED_STRING: value = re.escape(value)
                self._brute_prechecks.append(re.compile(value, re.I | re.M))
        if not self.args.PATTERN:  # Add match-all pattern
            contents.append(self.ANY_MATCHES[0])
        self._patterns["content"] = contents

        selects, noselects = self.args.SELECT_FIELD, self.args.NOSELECT_FIELD
        for key, vals in [("select", selects), ("noselect", noselects)]:
            self._patterns[key] = [(tuple(v.split(".")), common.path_to_regex(v)) for v in vals]


    def _register_message(self, topickey, msgid, msg, stamp):
        """Registers message with local structures."""
        self._counts[topickey][None] += 1
        self._messages[topickey][msgid] = msg
        self._stamps  [topickey][msgid] = stamp
        self._statuses[topickey][msgid] = None


    def _configure_settings(self, highlight=None, progress=False):
        """Caches settings for message matching."""
        highlight = bool(highlight if highlight is not None else self.args.HIGHLIGHT
                         if not self.sink or self.sink.is_highlighting() else False)
        pure_anymatch = not self.args.INVERT and not self._patterns["select"] \
                        and set(self._patterns["content"]) <= set(self.ANY_MATCHES)
        no_matching = pure_anymatch and not self._expression and not self._patterns["noselect"]
        passthrough = no_matching and not highlight  # No message processing at all
        wraps = [] if not highlight else self.args.MATCH_WRAPPER if not self.sink else \
                (common.MatchMarkers.START, common.MatchMarkers.END)
        wraps = wraps if isinstance(wraps, (list, tuple)) else [] if wraps is None else [wraps]
        wraps = ((wraps or [""]) * 2)[:2]
        # NOTE(review): wraps always has two items here, so this branch is always taken
        if wraps:  # Track pattern contribution to wrapping
            ops = {ExpressionTree.AND: BooleanResult.and_, ExpressionTree.NOT: BooleanResult.not_,
                   ExpressionTree.OR:  functools.partial(BooleanResult.or_, eager=True)}
            self._expressor.configure(operators=ops, void=BooleanResult(None))
        else: self._expressor.configure(operators=ExpressionTree.OPERATORS,
                                        void=ExpressionTree.VOID)  # Ensure defaults
        self._settings.update(highlight=highlight, passthrough=passthrough,
                              pure_anymatch=pure_anymatch, wraps=wraps)
        self.source.configure(self.args0)
        self.sink and self.sink.configure(self.args0)
        if progress and (not no_matching or self.args.MAX_COUNT):
            bar_opts = dict()
            if self.args.MAX_COUNT: bar_opts.update(match_max=self.args.MAX_COUNT)
            if not no_matching: bar_opts.update(source_value=0)  # Count source and match separately
            self.source.configure_progress(**bar_opts)


    def _is_max_done(self):
        """Returns whether max match count has been reached (and message after-context emitted)."""
        result, is_maxed = False, False
        if self.args.MAX_COUNT:
            is_maxed = sum(vv[True] for vv in self._counts.values()) >= self.args.MAX_COUNT
        if not is_maxed and self.args.MAX_PER_TOPIC:
            count_required = self.args.MAX_TOPICS or len(self.source.topics)
            count_maxed = sum(vv[True] >= self.args.MAX_PER_TOPIC
                              or vv[None] >= (self.source.topics.get(k) or 0)
                              for k, vv in self._counts.items())
            is_maxed = (count_maxed >= count_required)
        if is_maxed:
            result = not self.args.AFTER or \
                     not any(self._has_in_window(k, self.args.AFTER, status=True, full=True)
                             for k in self._counts)
        return result


    def _has_in_window(self, topickey, length, status, full=False):
        """Returns whether given status exists in recent message window."""
        if not length or full and len(self._statuses[topickey]) < length:
            return False
        return status in list(self._statuses[topickey].values())[-length:]


    def get_match(self, msg):
        """
        Returns transformed message if all patterns find a match in message, else None.

        Matching field values are converted to strings and surrounded by markers.
        Returns original message if any-match and sink does not require highlighting.
        """

        def process_value(v, parent, top, patterns):
            """
            Populates `field_matches` and `pattern_spans` for patterns matching given string value.
            Populates `field_values`. Returns set of pattern indexes that found a match.
            """
            indexes, spans, topstr = set(), [], ".".join(map(str, top))
            v2 = str(list(v) if isinstance(v, LISTIFIABLES) else v)
            if v and isinstance(v, (list, tuple)): v2 = v2[1:-1]  # Omit collection braces leave []
            for i, (path, p) in enumerate(patterns):
                if path and not path.search(topstr): continue  # for
                matches = [next(p.finditer(v2), None)] if PLAIN_INVERT else list(p.finditer(v2))
                # Join consecutive zero-length matches, extend remaining zero-lengths to end of value
                matchspans = common.merge_spans([x.span() for x in matches if x], join_blanks=True)
                matchspans = [(a, b if a != b else len(v2)) for a, b in matchspans]
                if matchspans:
                    indexes.add(i), spans.extend(matchspans)
                    pattern_spans.setdefault(id(patterns[i]), {})[top] = matchspans
            # Registered for every value: inverted and wrap-all highlighting look up
            # fields that no pattern matched
            field_values.setdefault(top, (parent, v, v2))
            if PLAIN_INVERT: spans = [(0, len(v2))] if v2 and not spans else []
            if spans: field_matches.setdefault(top, []).extend(spans)
            return indexes

        def populate_matches(obj, patterns, top=(), parent=None):
            """
            Recursively populates `field_matches` and `pattern_spans` for fields matching patterns.
            Populates `field_values`. Returns set of pattern indexes that found a match.
            """
            indexes = set()
            selects, noselects = self._patterns["select"], self._patterns["noselect"]
            fieldmap = api.get_message_fields(obj)  # Returns obj if not ROS message
            if fieldmap != obj:
                fieldmap = api.filter_fields(fieldmap, top, include=selects, exclude=noselects)
            for k, t in fieldmap.items() if fieldmap != obj else ():
                v, path = api.get_message_value(obj, k, t), top + (k, )
                if api.is_ros_message(v):  # Nested message
                    indexes |= populate_matches(v, patterns, path, obj)
                elif v and isinstance(v, (list, tuple)) \
                and api.scalar(t) not in api.ROS_NUMERIC_TYPES:
                    for i, x in enumerate(v):  # List of strings or nested messages
                        indexes |= populate_matches(x, patterns, path + (i, ), v)
                else:  # Scalar value, empty list, or list of numbers
                    indexes |= process_value(v, obj, path, patterns)
            if not api.is_ros_message(obj):
                indexes |= process_value(obj, parent, top, patterns)
            return indexes

        def wrap_matches(values, matches):
            """Replaces result-message field values with matched parts wrapped in marker tags."""
            for path, spans in matches.items() if any(WRAPS) else ():
                parent, v1, v2 = values[path]
                for a, b in reversed(common.merge_spans(spans)):  # Backwards for stable indexes
                    v2 = v2[:a] + WRAPS[0] + v2[a:b] + WRAPS[1] + v2[b:]
                if v1 and isinstance(v1, (list, tuple)): v2 = "[%s]" % v2  # Readd collection braces
                if isinstance(parent, list) and isinstance(path[-1], int): parent[path[-1]] = v2
                else: api.set_message_value(parent, path[-1], v2)

        def process_message(obj, patterns):
            """Returns whether message matches patterns, wraps matches in marker tags if so."""
            indexes = populate_matches(obj, patterns)
            is_match = not indexes if self.args.INVERT else len(indexes) == len(patterns)
            if not indexes and self._settings["pure_anymatch"] and not api.get_message_fields(obj):
                is_match = True  # Ensure any-match for messages with no fields
            if is_match and WRAPS: wrap_matches(field_values, field_matches)
            return is_match


        if self._settings["passthrough"]: return msg

        if self._brute_prechecks:
            # NOTE(review): values are rendered with %r, so string values presumably
            # appear quoted in this text - confirm that line-anchored patterns still precheck
            text = "\n".join("%r" % (v, ) for _, v, _ in api.iter_message_fields(msg, flat=True))
            if not all(any(p.finditer(text)) for p in self._brute_prechecks):
                return None  # Skip detailed matching if patterns not present at all

        WRAPS         = self._settings["wraps"]
        LISTIFIABLES  = (bytes, tuple) if six.PY3 else (tuple, )
        PLAIN_INVERT  = self.args.INVERT and not self._expression
        field_values  = {}  # {field path: (parent, original value, stringified value)}
        field_matches = {}  # {field path: [(span), ]}
        pattern_spans = {}  # {id(pattern tuple): {field path: (span)}}

        result, is_match = copy.deepcopy(msg) if WRAPS else msg, False
        if self._expression:
            evaler = lambda x: bool(populate_matches(result, [x]))
            terminal = evaler if not WRAPS else lambda x: BooleanResult(x, evaler)
            eager = [ExpressionTree.OR] if WRAPS else ()
            evalresult = self._expressor.evaluate(self._expression, terminal, eager)
            is_match = not evalresult if self.args.INVERT else evalresult
            if is_match and WRAPS:
                actives = [pattern_spans[id(v)] for v in evalresult]
                matches = {k: sum((v.get(k, []) for v in actives), [])
                           for k in set(sum((list(v) for v in actives), []))} or \
                          {k: [(0, len(v))] for k, (_, _, v) in field_values.items()}  # Wrap all
                wrap_matches(field_values, matches)
        else:
            is_match = process_message(result, self._patterns["content"])
        return result if is_match else None
549
550
551
552class ExpressionTree(object):
553 """
554 Parses and evaluates operator expressions like "a AND (b OR NOT c)".
555
556 Operands can be quoted strings, '\' can be used to escape quotes within the string.
557 Operators are case-insensitive.
558 """
559
560 QUOTES, ESCAPE, LBRACE, RBRACE, WHITESPACE = "'\"", "\\", "(", ")", " \n\r\t"
561 SEPARATORS = WHITESPACE + LBRACE + RBRACE
562 AND, OR, NOT, VAL = "AND", "OR", "NOT", "VAL"
563
564 CASED = False # Whether operators are case-sensitive
565 IMPLICIT = AND # Implicit operator inserted between operands lacking one
566 UNARIES = (NOT, ) # Unary operators, expecting operand after operator
567 BINARIES = (AND, OR) # Binary operators, expecting operands before and after operator
568 OPERATORS = {AND: (lambda a, b: a and b), OR: (lambda a, b: a or b), NOT: lambda a: not a}
569 RANKS = {VAL: 1, NOT: 2, AND: 3, OR: 4}
570
571 SHORTCIRCUITS = {AND: False, OR: True} # Values for binary operators to short-circuit on
572 FORMAT_TEMPLATES = {AND: "%s and %s", OR: "%s or %s", NOT: "not %s"}
573 VOID = None # Placeholder for operands skipped as short-circuited
574
575
576 def __init__(self, **props):
577 """
578 @param props class property overrides, case-insensitive, e.g. `cased=False`
579 """
580 self._state = None # Temporary state namespace dictionary during parse
581 self.configure(**props)
584 def configure(self, **props):
585 """
586 Overrides instance configuration.
588 @param props class property overrides, case-insensitive, e.g. `cased=False`
589 """
590 for k, v in props.items():
591 K, V = k.upper(), getattr(self, k.upper())
592 accept = (type(V), type(None)) if "IMPLICIT" == K else () if "VOID" == K else type(V)
593 if accept and not isinstance(v, accept) and set(map(type, (v, V))) - set([list, tuple]):
594 raise ValueError("Invalid value for %s=%s: expected %s" % (k, v, type(V).__name__))
595 setattr(self, K, v)
596
597
598 def evaluate(self, tree, terminal=None, eager=()):
599 """
600 Returns result of evaluating expression tree.
601
602 @param tree expression tree structure as given by parse()
603 @param terminal callback(value) to evaluate value nodes with, if not using value directly
604 @param eager operators where to evaluate both operands in full, despite short-circuit
605 """
606 stack = [(tree, [], [], None)] if tree else []
607 while stack: # [(node, evaled vals, parent evaled vals, parent op)]
608 ((op, val), nvals, pvals, parentop), done = stack.pop(), set()
609 if nvals: done.add(pvals.append(self.OPERATORS[op](*nvals))) # Backtracking: fill parent
610 elif pvals and parentop in self.SHORTCIRCUITS and not (eager and parentop in eager):
611 ctor = type(self.SHORTCIRCUITS[parentop]) # Skip if first sibling short-circuits op
612 if ctor(pvals[0]) == self.SHORTCIRCUITS[parentop]: done.add(pvals.append(self.VOID))
613 if done: continue # while
614 if op not in self.OPERATORS: pvals.append(val if terminal is None else terminal(val))
615 else: stack.extend([((op, val), nvals, pvals, parentop)] +
616 [(v, [], nvals, op) for v in val[::-1]]) # Queue in processing order
617 return pvals.pop() if tree else None
618
619
620 def format(self, tree, terminal=None):
621 """
622 Returns expression tree formatted as string.
623
624 @param tree expression tree structure as given by parse()
625 @param terminal callback(value) to format value nodes with, if not using value directly
626 """
627 BRACED = "%s".join(self.FORMAT_TEMPLATES.get(x, x) for x in [self.LBRACE, self.RBRACE])
628 TPL = lambda op: ("%s {0} %s" if op in self.BINARIES else "{0} %s").format(op)
629 WRP = lambda op, parentop: BRACED if self.RANKS[op] > self.RANKS[parentop] else "%s"
630 FMT = lambda vv, op, nodes: tuple(WRP(nop, op) % v for (nop, _), v in zip(nodes, vv))
631 stack = [(tree, [], [])] if tree else [] # [(node, formatted vals, parent formatted vals)]
632 while stack: # Add to parent if all processed or terminal node, else queue for processing
633 (op, val), nvals, pvals = stack.pop()
634 if nvals: pvals.append((self.FORMAT_TEMPLATES.get(op) or TPL(op)) % FMT(nvals, op, val))
635 elif op not in self.OPERATORS: pvals.append(val if terminal is None else terminal(val))
636 else: stack.extend([((op, val), nvals, pvals)] + [(v, [], nvals) for v in val[::-1]])
637 return pvals.pop() if tree else ""
638
639
def parse(self, text, terminal=None):
    """
    Returns an operator expression like "a AND (b OR NOT c)" parsed into a binary tree.

    Binary tree like ["AND", [["VAL", "a"], ["OR", [["VAL", "b"], ["NOT", [["VAL", "c"]]]]]]].
    Raises on invalid expression.

    Text is processed one character at a time. Tree construction and validation
    are delegated to the helper namespace from _make_helpers(), which shares
    this function's local variables via the `self._state` dictionary.

    @param   text      expression text to parse
    @param   terminal  callback(text) returning node value for operands, if not using plain text
    @return            root node of the parsed tree, an empty list if no nodes were added
    """
    root, node, buf, quote, escape, i = [], [], "", "", "", -1
    # Local names are part of the state shared with helpers: do not rename
    self._state = locals()
    h = self._make_helpers(self._state, text, terminal)

    for i, char in enumerate(text + " "):  # Append space to simplify termination
        # First pass: handle quotes, or explicit/implicit word ends and operators
        if quote:
            if escape:
                if char == self.ESCAPE: char = ""  # Double escape: retain single
                elif char in self.QUOTES: buf = buf[:-1]  # Drop escape char from before quote
            elif char == quote:  # End quote
                (node, root), buf, quote, char = h.add_node(self.VAL, buf, i), "", "", ""
            # Remember whether this character escapes the next one
            escape = char if self.ESCAPE == char else ""
        elif char in self.QUOTES:
            h.validate(i, "quotes", buf=buf)
            if node and h.finished(node): node, root = h.add_implicit(node, i)
            quote, char = char, ""  # Start quoted string, consume quotemark
        elif char in self.SEPARATORS:
            op = h.parse_op(buf)
            if op:  # Explicit operator
                h.validate(i, "op", op=op)
                if op in self.UNARIES and node and h.finished(node):
                    node, root = h.add_implicit(node, i)
                val = h.make_val(op, None if op in self.UNARIES else node)
                node, root = h.add_node(op, val, i)
            else:  # Consume accumulated buffer if any, handle implicit operators
                if (buf or char == self.LBRACE) and node and h.finished(node):
                    node, root = h.add_implicit(node, i)
                if buf: node, root = h.add_node(self.VAL, buf, i)  # Completed operand
            buf = ""
        # Second pass: accumulate text buffer, or enter/exit bracket groups
        if quote or char not in self.SEPARATORS: buf += char
        elif char == self.LBRACE: _, (node, root) = h.stack_push((node, root, i)), ([], [])
        elif char == self.RBRACE: _, (node, root) = h.validate(i, "rbrace"), h.stack_pop()
        # Publish current local values to helpers for the next character
        self._state.update(locals())
    h.validate(i)  # Final check once all text is consumed
    return root
686
687
688 def _make_helpers(self, state, text, terminal=None):
689 """Returns namespace object with parsing helper functions."""
690 ERRLABEL, OP_MAXLEN = "Invalid expression: ", max(map(len, self.OPERATORS))
691 OPERATORS = {x if self.CASED else x.upper(): x for x in self.OPERATORS}
692 stack, parents = [], {}
693
694 finished = lambda n: not (isinstance(n[1], list) and n[1] and n[1][-1] is None)
695 outranks = lambda a, b: self.RANKS[a] > self.RANKS[b] # whether operator a ranks over b
696 postbrace = lambda: state.get("stacki") is not None # whether brackets just ended
697 mark = lambda i: "\n%s\n%s^" % (text, " " * i) # expression text marked at pos
698 oper = lambda n: n[0] # node type
699 parse_op = lambda b: OPERATORS.get(b if self.CASED else b.upper()) \
700 if len(b) <= OP_MAXLEN else None
701 make_node = lambda o, v: [o, terminal(v) if terminal and self.VAL == o else v]
702 make_val = lambda o, *a: list(a) + [None] * (1 + (o in self.BINARIES) - len(a))
703 add_child = lambda a, b: (a[1].__setitem__(-1, b), parents.update({id(b): a}))
704 get_child = lambda n, i: n[1][i] if n[0] in self.OPERATORS else None
705
706 def missing(op, first=False): # Return error text for missing operand in operator
707 label = ("1st " if first else "2nd ") if op in self.BINARIES else ""
708 return ERRLABEL + "missing %selement for %s-operator" % (label, op)
709
710 def add_node(op, val, i): # Add new node to tree, return (node to use as last, root)
711 node0, root0 = _, newroot = state["node"], state["root"]
712 if op in self.BINARIES: # Attach last child or root to new if needed
713 if not postbrace() and finished(node0) and not outranks(op, oper(node0)):
714 val = make_val(op, get_child(node0, -1), None) # Last child into new
715 elif not outranks(oper(root0), op):
716 val = make_val(op, root0, None) # Root into new
717 newnode = make_node(op, val)
718
719 if node0 and not postbrace() and (not finished(node0) # Last is unfinished
720 or oper(node0) in self.BINARIES and not outranks(op, oper(node0))): # op <= last binop
721 add_child(node0, newnode) # Attach new node to last
722 elif not root0 or (root0 is node0 if postbrace() else not outranks(oper(root0), op)):
723 newroot = newnode # Replace root if new outranks, or expression so far was braced
724 latest = node0 if node0 and op == self.VAL else newnode
725 while oper(latest) in self.UNARIES and finished(latest) and id(latest) in parents:
726 latest = parents[id(latest)] # Walk up filled unary nodes until binop/root
727 state.update(node=latest, root=newroot, nodei=i, stacki=None)
728 return latest, newroot
729
730 def add_implicit(node, i): # Add implicit operator, return (node to use as last, root)
731 if not self.IMPLICIT: raise ValueError(ERRLABEL + "missing operator" + mark(i))
732 return add_node(self.IMPLICIT, make_val(self.IMPLICIT, node), i)
733
734 def stack_pop(): # Unstack previous and add current, return (node to use as last, root)
735 (node, root, stacki), nodex, rootx = stack.pop(), state["node"], state["root"]
736 if node: node, stacki, _ = root, None, add_child(node, rootx) # Nest into last
737 elif not root: node, root = nodex, rootx # Replace empty root with nested
738 state.update(node=node, root=root, stacki=stacki)
739 return node, root
740
741 def validate(i, ctx=None, **kws): # Raise ValueError if parse state invalid
742 if "quotes" == ctx:
743 if kws["buf"]: raise ValueError(ERRLABEL + "invalid syntax" + mark(i))
744 elif "op" == ctx:
745 op, node = kws["op"], state["node"]
746 if op in self.BINARIES and (not node or not finished(node)):
747 raise ValueError(missing(oper(node) if node else op, first=not node) + mark(i))
748 elif "rbrace" == ctx:
749 node, nodei = (state.get(k) for k in ("node", "nodei"))
750 if not stack: raise ValueError(ERRLABEL + "bracket end has no start" + mark(i))
751 if not node: raise ValueError(ERRLABEL + "empty bracket" + mark(i))
752 if not finished(node): raise ValueError(missing(oper(node)) + mark(nodei))
753 else: # All parsing done, tree in final state
754 quote, node, nodei = (state.get(k) for k in ("quote", "node", "nodei"))
755 if quote: raise ValueError(ERRLABEL + "unfinished quote" + mark(i - 1))
756 if stack: raise ValueError(ERRLABEL + "unterminated bracket" + mark(stack[-1][-1]))
757 if node and not finished(node): raise ValueError(missing(oper(node)) + mark(nodei))
758
759 return Namespace(add_child=add_child, add_implicit=add_implicit, add_node=add_node,
760 make_val=make_val, finished=finished, parse_op=parse_op,
761 stack_push=stack.append, stack_pop=stack_pop, validate=validate)
762
763
class BooleanResult(object):
    """
    Accumulative result of boolean expression evaluation, tracking value contribution.

    Holds the overall truth value together with all operand values seen so far,
    and for each operand a marker of whether it contributed to the result.
    Iterating the instance yields the operands that are positively active.
    """

    def __init__(self, value=Ellipsis, terminal=None, **__props):
        """
        @param   value     initial operand value, if any
        @param   terminal  callback(value) for evaluating the operand, if any
        @param   __props   attributes to set on instance, names prefixed with underscore
        """
        self._result = Ellipsis  # Final accumulated result of expression
        self._values = []        # All accumulated operands
        self._actives = []       # Per operand: True/False/None as active/invertedly active/inactive
        for name, prop in __props.items():
            setattr(self, "_" + name, prop)
        if value is not Ellipsis:
            self.set(value, terminal)

    def set(self, value, terminal=None):
        """Sets value to instance, using terminal callback for evaluation if given."""
        outcome = terminal(value) if terminal else value
        self._result = bool(outcome)
        self._values = [value]
        self._actives = [True] if self._result else [None]

    def __iter__(self):
        """Yields active values: contributing to true result positively."""
        for operand, active in zip(self._values, self._actives):
            if active:
                yield operand

    def __bool__(self):
        """Returns accumulated result (Py3 truth protocol)."""
        return self._result

    def __nonzero__(self):
        """Returns accumulated result (Py2 truth protocol)."""
        return self._result

    def __eq__(self, other):
        """Returns whether result equals given boolean, or whether other is this instance."""
        if isinstance(other, bool):
            return bool(self) is other
        return self is other

    @classmethod
    def and_(cls, a, b):
        """Returns new BooleanResult as a and b."""
        # Operands of one side stay marked only while the other side holds true
        actives = [flag if b else None for flag in a._actives]
        actives += [flag if a else None for flag in b._actives]
        return cls(result=bool(a and b), values=a._values + b._values, actives=actives)

    @classmethod
    def or_(cls, a, b, eager=False):
        """Returns new BooleanResult as a or b."""
        first = a._actives
        # Second side stays marked only if first side is false, or evaluation is eager
        second = [flag if eager or not a else None for flag in b._actives]
        return cls(result=bool(a or b), values=a._values + b._values, actives=first + second)

    @classmethod
    def not_(cls, a):
        """Returns new BooleanResult as not a."""
        # Flip marked operands, leave inactive ones inactive
        actives = [flag if flag is None else not flag for flag in a._actives]
        return cls(result=not a, values=a._values, actives=actives)
804
805
# Names exported by `from grepros.search import *`.
__all__ = ["BooleanResult", "ExpressionTree", "Scanner"]
Bag factory metaclass.
Definition api.py:381
Highlight markers for matches in message values.
Definition common.py:50
Produces messages from iterable or pushed data.
Definition inputs.py:1276
Produces messages from ROS bagfiles.
Definition inputs.py:522
Message producer base class.
Definition inputs.py:35
Output base class.
Definition outputs.py:32
Accumulative result of boolean expression evaluation, tracking value contribution.
Definition search.py:787
or_(cls, a, b, eager=False)
Returns new BooleanResult as a or b.
Definition search.py:817
not_(cls, a)
Returns new BooleanResult as not a.
Definition search.py:823
and_(cls, a, b)
Returns new BooleanResult as a and b.
Definition search.py:811
Parses and evaluates operator expressions like "a AND (b OR NOT c)".
Definition search.py:579
configure(self, **props)
Overrides instance configuration.
Definition search.py:610
__init__(self, **props)
Definition search.py:600
evaluate(self, tree, terminal=None, eager=())
Returns result of evaluating expression tree.
Definition search.py:626
parse(self, text, terminal=None)
Returns an operator expression like "a AND (b OR NOT c)" parsed into a binary tree.
Definition search.py:669
format(self, tree, terminal=None)
Returns expression tree formatted as string.
Definition search.py:647
validate(self, reset=False)
Returns whether conditions have valid syntax, prints errors.
Definition search.py:210
source
Source instance.
Definition search.py:120
__init__(self, args=None, **kwargs)
Definition search.py:96
work(self, source, sink)
Greps messages yielded from source and emits matched content to sink.
Definition search.py:196
find(self, source, highlight=None)
Yields matched and context messages from source.
Definition search.py:143
valid
Result of validate()
Definition search.py:124
__exit__(self, exc_type, exc_value, traceback)
Context manager exit, does nothing.
Definition search.py:241
__enter__(self)
Context manager entry, does nothing, returns self.
Definition search.py:236
sink
Sink instance.
Definition search.py:122
GrepMessage
Namedtuple of (topic name, ROS message, ROS time object, message if matched, index in topic).
Definition search.py:39
match(self, topic, msg, stamp, highlight=None)
Returns matched message if message matches search filters.
Definition search.py:162
get_match(self, msg)
Returns transformed message if all patterns find a match in message, else None.
Definition search.py:472
list ANY_MATCHES
Match patterns for global any-match.
Definition search.py:42