grepros 1.2.2
grep for ROS bag files and live topics
dbbase.py
# -*- coding: utf-8 -*-
"""
Shared functionality for database sinks.

------------------------------------------------------------------------------
This file is part of grepros - grep for ROS bag files and live topics.
Released under the BSD License.

@author    Erki Suurjaak
@created   11.12.2021
@modified  24.03.2024
------------------------------------------------------------------------------
"""
## @namespace grepros.plugins.auto.dbbase
import atexit
import collections

from ... import api
from ... common import PATH_TYPES, ConsolePrinter, ensure_namespace, plural
from ... outputs import Sink
from . sqlbase import SqlMixin, quote

class BaseDataSink(Sink, SqlMixin):
    """
    Base class for writing messages to a database.

    Output will have:
    - table "topics", with topic and message type names
    - table "types", with message type definitions

    plus:
    - table "pkg/MsgType" for each topic message type, with detailed fields,
      BYTEA fields for uint8[], array fields for scalar list attributes,
      and JSONB fields for lists of ROS messages;
      with foreign keys if nesting else subtype values as JSON dictionaries;
      plus underscore-prefixed fields for metadata, like `_topic` as the topic name.
      If not nesting, only topic message type tables are created.
    - view "/full/topic/name" for each topic, selecting from the message type table

    If a message type table already exists but for a type with a different MD5 hash,
    the new table will have the MD5 hash appended to its name, as "pkg/MsgType (hash)".
    """
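
    # Illustrative note (not in the original source; topic and type names are examples):
    # a bag with a single topic "/imu" of type "sensor_msgs/Imu" would yield one row each
    # in "topics" and "types", a message table "sensor_msgs/Imu" with one row per written
    # message, and a view "/imu" selecting from that table.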

    ## Database engine name, overridden in subclasses
    ENGINE = None

    ## Number of emits between commits; 0 is autocommit
    COMMIT_INTERVAL = 1000

    ## Default topic-related columns for pkg/MsgType tables
    MESSAGE_TYPE_TOPICCOLS = [("_topic",       "TEXT"),
                              ("_topic_id",    "INTEGER"), ]

    ## Default columns for pkg/MsgType tables
    MESSAGE_TYPE_BASECOLS  = [("_dt",          "TIMESTAMP"),
                              ("_timestamp",   "INTEGER"),
                              ("_id",          "INTEGER NOT NULL "
                                               "PRIMARY KEY AUTOINCREMENT"), ]

    ## Additional default columns for pkg/MsgType tables with nesting output
    MESSAGE_TYPE_NESTCOLS  = [("_parent_type", "TEXT"),
                              ("_parent_id",   "INTEGER"), ]

    DEFAULT_ARGS = dict(META=False, WRITE_OPTIONS={}, VERBOSE=False)

    def __init__(self, args=None, **kwargs):
        """
        @param   args                 arguments as namespace or dictionary, case-insensitive;
                                      or a single item as the database connection string
        @param   args.write           database connection string
        @param   args.write_options   ```
                                      {"commit-interval": transaction size (0 is autocommit),
                                       "nesting": "array" to recursively insert arrays
                                                  of nested types, or "all" for any nesting}
                                      ```
        @param   args.meta            whether to emit metainfo
        @param   args.verbose         whether to emit debug information
        @param   kwargs               any and all arguments as keyword overrides, case-insensitive
        """
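        # Illustrative example (not in the original source; "SqliteSink" stands in for any
        # concrete subclass): arguments may be given case-insensitively as keywords, e.g.
        #     SqliteSink("my.db", write_options={"commit-interval": 100, "nesting": "array"})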
        args = {"WRITE": str(args)} if isinstance(args, PATH_TYPES) else args
        args = ensure_namespace(args, BaseDataSink.DEFAULT_ARGS, **kwargs)
        super(BaseDataSink, self).__init__(args)
        SqlMixin.__init__(self, args)

        self.db = None

        self._cursor        = None                 # Database cursor or connection
        self._dialect       = self.ENGINE.lower()  # Override SqlMixin._dialect
        self._close_printed = False

        # Whether to create tables and rows for nested message types,
        # "array" if to do this only for arrays of nested types, or
        # "all" for any nested type, including those fully flattened into parent fields.
        # In parent, nested arrays are inserted as foreign keys instead of formatted values.
        self._nesting = None

        self._checkeds      = {}  # {topickey/typekey: whether existence checks are done}
        self._sql_queue     = {}  # {SQL: [(args), ]}
        self._nested_counts = {}  # {(typename, typehash): count}

        atexit.register(self.close)

    def validate(self):
        """
        Returns whether args.write_options has valid values, if any.

        Checks parameters "commit-interval" and "nesting".
        """
        ok, sqlconfig_ok = True, SqlMixin._validate_dialect_file(self)
        if "commit-interval" in self.args.WRITE_OPTIONS:
            try: ok = int(self.args.WRITE_OPTIONS["commit-interval"]) >= 0
            except Exception: ok = False
            if not ok:
                ConsolePrinter.error("Invalid commit-interval option for %s: %r.",
                                     self.ENGINE, self.args.WRITE_OPTIONS["commit-interval"])
        if self.args.WRITE_OPTIONS.get("nesting") not in (None, False, "", "array", "all"):
            ConsolePrinter.error("Invalid nesting option for %s: %r. "
                                 "Choose one of {array,all}.",
                                 self.ENGINE, self.args.WRITE_OPTIONS["nesting"])
            ok = False
        if ok and sqlconfig_ok:
            self._nesting = self.args.WRITE_OPTIONS.get("nesting")
        return ok and sqlconfig_ok

    def emit(self, topic, msg, stamp=None, match=None, index=None):
        """Writes message to database."""
        if not self.validate(): raise Exception("invalid")
        if not self.db:
            self._init_db()
        stamp, index = self._ensure_stamp_index(topic, msg, stamp, index)
        self._process_type(msg)
        self._process_topic(topic, msg)
        self._process_message(topic, msg, stamp)
        super(BaseDataSink, self).emit(topic, msg, stamp, match, index)

    def close(self):
        """Closes database connection, if any, emits metainfo."""
        try: self.close_output()
        finally:
            if not self._close_printed and self._counts:
                self._close_printed = True
                if hasattr(self, "format_output_meta"):
                    ConsolePrinter.debug("Wrote %s database for %s",
                                         self.ENGINE, self.format_output_meta())
                else:
                    target = self._make_db_label()
                    ConsolePrinter.debug("Wrote %s in %s to %s database %s.",
                                         plural("message", sum(self._counts.values())),
                                         plural("topic", self._counts), self.ENGINE, target)
                if self._nested_counts:
                    ConsolePrinter.debug("Wrote %s in %s to %s.",
                                         plural("nested message", sum(self._nested_counts.values())),
                                         plural("nested message type", self._nested_counts),
                                         self.ENGINE)
            self._checkeds.clear()
            self._nested_counts.clear()
            SqlMixin.close(self)
            super(BaseDataSink, self).close()

    def close_output(self):
        """Closes database connection, if any, executing any pending statements."""
        if self.db:
            for sql in list(self._sql_queue):
                self._executemany(sql, self._sql_queue.pop(sql))
            self.db.commit()
            self._cursor.close()
            self._cursor = None
            self.db.close()
            self.db = None

    def _init_db(self):
        """Opens database connection, and populates schema if not already existing."""
        for d in (self._checkeds, self._topics, self._types): d.clear()
        self._close_printed = False

        if "commit-interval" in self.args.WRITE_OPTIONS:
            self.COMMIT_INTERVAL = int(self.args.WRITE_OPTIONS["commit-interval"])
        self.db = self._connect()
        self._cursor = self._make_cursor()
        self._executescript(self._get_dialect_option("base_schema"))
        self.db.commit()
        self._load_schema()
        TYPECOLS = self.MESSAGE_TYPE_TOPICCOLS + self.MESSAGE_TYPE_BASECOLS
        if self._nesting: TYPECOLS += self.MESSAGE_TYPE_NESTCOLS
        self._ensure_columns(TYPECOLS)

    def _load_schema(self):
        """Populates instance attributes with schema metainfo."""
        self._cursor.execute("SELECT * FROM topics")
        for row in self._cursor.fetchall():
            topickey = (row["name"], row["md5"])
            self._topics[topickey] = row

        self._cursor.execute("SELECT * FROM types")
        for row in self._cursor.fetchall():
            typekey = (row["type"], row["md5"])
            self._types[typekey] = row

    def _process_topic(self, topic, msg):
        """
        Inserts topics-row and creates view `/topic/name` if not already existing.

        Also creates types-row and pkg/MsgType table for this message if not existing.
        If nesting enabled, creates types recursively.
        """
        topickey = api.TypeMeta.make(msg, topic).topickey
        if topickey in self._checkeds:
            return

        if topickey not in self._topics:
            exclude_cols = list(self.MESSAGE_TYPE_TOPICCOLS)
            if self._nesting: exclude_cols += self.MESSAGE_TYPE_NESTCOLS
            tdata = self._make_topic_data(topic, msg, exclude_cols)
            self._topics[topickey] = tdata
            self._executescript(tdata["sql"])

            sql, args = self._make_topic_insert_sql(topic, msg)
            if self.args.VERBOSE and topickey not in self._counts:
                ConsolePrinter.debug("Adding topic %s in %s output.", topic, self.ENGINE)
            self._topics[topickey]["id"] = self._execute_insert(sql, args)

            if self.COMMIT_INTERVAL: self.db.commit()
        self._checkeds[topickey] = True

    def _process_type(self, msg, rootmsg=None):
        """
        Creates types-row and pkg/MsgType table if not already existing.

        @return  created types-row, or None if already existed
        """
        rootmsg = rootmsg or msg
        with api.TypeMeta.make(msg, root=rootmsg) as m:
            typename, typekey = (m.typename, m.typekey)
        if typekey in self._checkeds:
            return None

        if typekey not in self._types:
            if self.args.VERBOSE and typekey not in self._schema:
                ConsolePrinter.debug("Adding type %s in %s output.", typename, self.ENGINE)
            extra_cols = self.MESSAGE_TYPE_TOPICCOLS + self.MESSAGE_TYPE_BASECOLS
            if self._nesting: extra_cols += self.MESSAGE_TYPE_NESTCOLS
            tdata = self._make_type_data(msg, extra_cols)
            self._schema[typekey] = collections.OrderedDict(tdata.pop("cols"))
            self._types[typekey] = tdata

            self._executescript(tdata["sql"])
            sql, args = self._make_type_insert_sql(msg)
            tdata["id"] = self._execute_insert(sql, args)

        nested_tables = self._types[typekey].get("nested_tables") or {}
        nesteds = api.iter_message_fields(msg, messages_only=True) if self._nesting else ()
        for path, submsgs, subtype in nesteds:
            scalartype = api.scalar(subtype)
            if subtype == scalartype and "all" != self._nesting:
                continue  # for path

            subtypehash = not submsgs and self.source.get_message_type_hash(scalartype)
            if not isinstance(submsgs, (list, tuple)): submsgs = [submsgs]
            [submsg] = submsgs[:1] or [self.source.get_message_class(scalartype, subtypehash)()]
            subdata = self._process_type(submsg, rootmsg)
            if subdata: nested_tables[".".join(path)] = subdata["table_name"]
        if nested_tables:
            self._types[typekey]["nested_tables"] = nested_tables
            sets, where = {"nested_tables": nested_tables}, {"id": self._types[typekey]["id"]}
            sql, args = self._make_update_sql("types", sets, where)
            self._cursor.execute(sql, args)
        self._checkeds[typekey] = True
        return self._types[typekey]

    def _process_message(self, topic, msg, stamp):
        """
        Inserts pkg/MsgType row for this message.

        Inserts sub-rows for subtypes in message if nesting enabled.
        Commits transaction if interval due.
        """
        self._populate_type(topic, msg, stamp)

        do_commit = sum(len(v) for v in self._sql_queue.values()) >= self.COMMIT_INTERVAL
        for sql in list(self._sql_queue) if do_commit else ():
            self._executemany(sql, self._sql_queue.pop(sql))
        do_commit and self.db.commit()

    def _populate_type(self, topic, msg, stamp,
                       rootmsg=None, parent_type=None, parent_id=None):
        """
        Inserts pkg/MsgType row for message.

        If nesting is enabled, inserts sub-rows for subtypes in message,
        and returns inserted ID.
        """
        rootmsg = rootmsg or msg
        with api.TypeMeta.make(msg, root=rootmsg) as m:
            typename, typekey = m.typename, m.typekey
        with api.TypeMeta.make(rootmsg) as m:
            topic_id = self._topics[m.topickey]["id"]
        table_name = self._types[typekey]["table_name"]

        myid = self._get_next_id(table_name) if self._nesting else None
        coldefs = self.MESSAGE_TYPE_TOPICCOLS + self.MESSAGE_TYPE_BASECOLS[:-1]
        colvals = [topic, topic_id, api.to_datetime(stamp), api.to_nsec(stamp)]
        if self._nesting:
            coldefs += self.MESSAGE_TYPE_BASECOLS[-1:] + self.MESSAGE_TYPE_NESTCOLS
            colvals += [myid, parent_type, parent_id]
        extra_cols = list(zip([c for c, _ in coldefs], colvals))
        sql, args = self._make_message_insert_sql(topic, msg, extra_cols)
        self._ensure_execute(sql, args)
        if parent_type: self._nested_counts[typekey] = self._nested_counts.get(typekey, 0) + 1

        subids = {}  # {message field path: [ids]}
        nesteds = api.iter_message_fields(msg, messages_only=True) if self._nesting else ()
        for subpath, submsgs, subtype in nesteds:
            scalartype = api.scalar(subtype)
            if subtype == scalartype and "all" != self._nesting:
                continue  # for subpath
            if isinstance(submsgs, (list, tuple)):
                subids[subpath] = []
            for submsg in submsgs if isinstance(submsgs, (list, tuple)) else [submsgs]:
                subid = self._populate_type(topic, submsg, stamp, rootmsg, typename, myid)
                if isinstance(submsgs, (list, tuple)):
                    subids[subpath].append(subid)
        if subids:
            sets, where = {".".join(p): subids[p] for p in subids}, {"_id": myid}
            sql, args = self._make_update_sql(table_name, sets, where)
            self._ensure_execute(sql, args)
        return myid
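
    # Illustrative note (not in the original source; topic and type names are examples):
    # with write option "nesting": "all", a message on topic "/pose" of type
    # geometry_msgs/PoseStamped would get one row in "geometry_msgs/PoseStamped" plus child
    # rows in "std_msgs/Header", "geometry_msgs/Pose" and so on recursively, each child
    # carrying _parent_type and _parent_id referring back to its parent row.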

    def _ensure_columns(self, cols):
        """Adds specified columns to any type tables lacking them."""
        sqls = []
        for typekey, typecols in ((k, v) for k, v in self._schema.items() if k in self._types):
            table_name = self._types[typekey]["table_name"]
            for c, t in ((c, t) for c, t in cols if c not in typecols):
                sql = "ALTER TABLE %s ADD COLUMN %s %s;" % (quote(table_name), c, t)
                sqls.append(sql)
                typecols[c] = t
        if sqls:
            self._executescript("\n".join(sqls))
            self.db.commit()

    def _ensure_execute(self, sql, args):
        """Executes SQL if in autocommit mode, else caches arguments for batch execution."""
        args = tuple(args) if isinstance(args, list) else args
        if self.COMMIT_INTERVAL:
            self._sql_queue.setdefault(sql, []).append(args)
        else:
            self._cursor.execute(sql, args)
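
    # Illustrative note (not in the original source): with the default COMMIT_INTERVAL of 1000,
    # statements accumulate in _sql_queue keyed by SQL text and are flushed via _executemany()
    # once the total number pending reaches the interval (see _process_message()); with
    # commit-interval 0, every statement executes immediately on the cursor.
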
    def _connect(self):
        """Returns new database connection."""
        raise NotImplementedError()


    def _execute_insert(self, sql, args):
        """Executes INSERT statement, returns inserted ID."""
        raise NotImplementedError()


    def _executemany(self, sql, argses):
        """Executes SQL with all args sequences."""
        raise NotImplementedError()


    def _executescript(self, sql):
        """Executes SQL with one or more statements."""
        raise NotImplementedError()


    def _get_next_id(self, table):
        """Returns next ID value for table."""
        raise NotImplementedError()


    def _make_cursor(self):
        """Returns new database cursor."""
        return self.db.cursor()


    def _make_db_label(self):
        """Returns formatted label for database."""
        return self.args.WRITE

__all__ = ["BaseDataSink"]
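
Example (illustrative sketch, not part of the file): a minimal concrete sink wiring the
abstract hooks above to Python's built-in sqlite3 module. The class name "DemoSqliteSink"
and the query in _get_next_id() are hypothetical; the actual grepros SQLite plugin is more
elaborate.

import sqlite3

class DemoSqliteSink(BaseDataSink):
    ENGINE = "SQLite"

    def _connect(self):
        db = sqlite3.connect(self.args.WRITE)
        db.row_factory = sqlite3.Row  # rows accessible by column name, as _load_schema() expects
        return db

    def _execute_insert(self, sql, args):
        # Cursor.execute() returns the cursor; lastrowid is the inserted row ID
        return self._cursor.execute(sql, args).lastrowid

    def _executemany(self, sql, argses):
        self._cursor.executemany(sql, argses)

    def _executescript(self, sql):
        self._cursor.executescript(sql)

    def _get_next_id(self, table):
        # Hypothetical demo query: next value for the auto-increment primary key
        row = self._cursor.execute("SELECT COALESCE(MAX(_id), 0) + 1 AS id FROM %s"
                                   % quote(table)).fetchone()
        return row["id"]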