"""
Shared functionality for database sinks.

------------------------------------------------------------------------------
This file is part of grepros - grep for ROS bag files and live topics.
Released under the BSD License.
------------------------------------------------------------------------------
"""
## @namespace grepros.plugins.auto.dbbase
19from ... common import PATH_TYPES, ConsolePrinter, ensure_namespace, plural
20from ... outputs import Sink
21from . sqlbase import SqlMixin, quote
24class BaseDataSink(Sink, SqlMixin):
    """
    Base class for writing messages to a database.

    - table "topics", with topic and message type names
    - table "types", with message type definitions
    - table "pkg/MsgType" for each topic message type, with detailed fields,
      BYTEA fields for uint8[], array fields for scalar list attributes,
      and JSONB fields for lists of ROS messages;
      with foreign keys if nesting else subtype values as JSON dictionaries;
      plus underscore-prefixed fields for metadata, like `_topic` as the topic name.
      If not nesting, only topic message type tables are created.
    - view "/full/topic/name" for each topic, selecting from the message type table

    If a message type table already exists but for a type with a different MD5 hash,
    the new table will have its MD5 hash appended to end, as "pkg/MsgType (hash)".
    """
## Number of emits between commits; 0 is autocommit
COMMIT_INTERVAL = 1000

## Default topic-related columns for pkg/MsgType tables
MESSAGE_TYPE_TOPICCOLS = [
    ("_topic", "TEXT"),
    ("_topic_id", "INTEGER"),
]

## Default columns for pkg/MsgType tables
MESSAGE_TYPE_BASECOLS = [
    ("_dt", "TIMESTAMP"),
    ("_timestamp", "INTEGER"),
    ("_id", "INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT"),
]

## Additional default columns for pkg/MsgType tables with nesting output
MESSAGE_TYPE_NESTCOLS = [
    ("_parent_type", "TEXT"),
    ("_parent_id", "INTEGER"),
]

## Constructor argument defaults
DEFAULT_ARGS = {"META": False, "WRITE_OPTIONS": {}, "VERBOSE": False}
67 def __init__(self, args=None, **kwargs):
        """
        @param   args                 arguments as namespace or dictionary, case-insensitive;
                                      or a single item as the database connection string
        @param   args.write           database connection string
        @param   args.write_options   ```
                                      {"commit-interval": transaction size (0 is autocommit),
                                       "nesting": "array" to recursively insert arrays
                                                  of nested types, or "all" for any nesting)}
                                      ```
        @param   args.meta            whether to emit metainfo
        @param   args.verbose         whether to emit debug information
        @param   kwargs               any and all arguments as keyword overrides, case-insensitive
        """
81 args = {"WRITE": str(args)}
if isinstance(args, PATH_TYPES)
else args
82 args = ensure_namespace(args, BaseDataSink.DEFAULT_ARGS, **kwargs)
83 super(BaseDataSink, self).
__init__(args)
84 SqlMixin.__init__(self, args)
108 Returns whether args.write_options has valid values, if any.
110 Checks parameters
"commit-interval" and "nesting".
112 ok, sqlconfig_ok = True, SqlMixin._validate_dialect_file(self)
113 if "commit-interval" in self.
args.WRITE_OPTIONS:
114 try: ok = int(self.
args.WRITE_OPTIONS[
"commit-interval"]) >= 0
115 except Exception: ok =
False
117 ConsolePrinter.error(
"Invalid commit-interval option for %s: %r.",
118 self.
ENGINE, self.
args.WRITE_OPTIONS[
"commit-interval"])
119 if self.
args.WRITE_OPTIONS.get(
"nesting")
not in (
None,
False,
"",
"array",
"all"):
120 ConsolePrinter.error(
"Invalid nesting option for %s: %r. "
121 "Choose one of {array,all}.",
122 self.
ENGINE, self.
args.WRITE_OPTIONS[
"nesting"])
124 if ok
and sqlconfig_ok:
126 return ok
and sqlconfig_ok
129 def emit(self, topic, msg, stamp=None, match=None, index=None):
130 """Writes message to database."""
138 super(BaseDataSink, self).
emit(topic, msg, stamp, match, index)
142 """Closes database connection, if any, emits metainfo."""
147 if hasattr(self,
"format_output_meta"):
148 ConsolePrinter.debug(
"Wrote %s database for %s",
149 self.
ENGINE, self.format_output_meta())
152 ConsolePrinter.debug(
"Wrote %s in %s to %s database %s.",
156 ConsolePrinter.debug(
"Wrote %s in %s to %s.",
163 super(BaseDataSink, self).
close()
167 """Closes database connection, if any, executing any pending statements."""
179 """Opens database connection, and populates schema if not already existing."""
183 if "commit-interval" in self.
args.WRITE_OPTIONS:
195 def _load_schema(self):
196 """Populates instance attributes with schema metainfo."""
197 self.
_cursor.execute(
"SELECT * FROM topics")
198 for row
in self.
_cursor.fetchall():
199 topickey = (row[
"name"], row[
"md5"])
202 self.
_cursor.execute(
"SELECT * FROM types")
203 for row
in self.
_cursor.fetchall():
204 typekey = (row[
"type"], row[
"md5"])
208 def _process_topic(self, topic, msg):
210 Inserts topics-row and creates view `/topic/name`
if not already existing.
212 Also creates types-row
and pkg/MsgType table
for this message
if not existing.
213 If nesting enabled, creates types recursively.
215 topickey = api.TypeMeta.make(msg, topic).topickey
228 ConsolePrinter.debug(
"Adding topic %s in %s output.", topic, self.
ENGINE)
235 def _process_type(self, msg, rootmsg=None):
237 Creates types-row and pkg/MsgType table
if not already existing.
239 @return created types-row,
or None if already existed
241 rootmsg = rootmsg or msg
242 with api.TypeMeta.make(msg, root=rootmsg)
as m:
243 typename, typekey = (m.typename, m.typekey)
248 if self.
args.VERBOSE
and typekey
not in self.
_schema:
249 ConsolePrinter.debug(
"Adding type %s in %s output.", typename, self.
ENGINE)
253 self.
_schema[typekey] = collections.OrderedDict(tdata.pop(
"cols"))
261 nested_tables = self.
_types_types[typekey].get(
"nested_tables")
or {}
262 nesteds = api.iter_message_fields(msg, messages_only=
True)
if self.
_nesting_nesting else ()
263 for path, submsgs, subtype
in nesteds:
264 scalartype = api.scalar(subtype)
268 subtypehash =
not submsgs
and self.
source.get_message_type_hash(scalartype)
269 if not isinstance(submsgs, (list, tuple)): submsgs = [submsgs]
270 [submsg] = submsgs[:1]
or [self.
source.get_message_class(scalartype, subtypehash)()]
272 if subdata: nested_tables[
".".join(path)] = subdata[
"table_name"]
274 self.
_types_types[typekey][
"nested_tables"] = nested_tables
275 sets, where = {
"nested_tables": nested_tables}, {
"id": self.
_types_types[typekey][
"id"]}
277 self.
_cursor.execute(sql, args)
282 def _process_message(self, topic, msg, stamp):
284 Inserts pkg/MsgType row for this message.
286 Inserts sub-rows
for subtypes
in message
if nesting enabled.
287 Commits transaction
if interval due.
292 for sql
in list(self.
_sql_queue)
if do_commit
else ():
294 do_commit
and self.
db.commit()
297 def _populate_type(self, topic, msg, stamp,
298 rootmsg=None, parent_type=None, parent_id=None):
300 Inserts pkg/MsgType row for message.
302 If nesting
is enabled, inserts sub-rows
for subtypes
in message,
303 and returns inserted ID.
305 rootmsg = rootmsg or msg
306 with api.TypeMeta.make(msg, root=rootmsg)
as m:
307 typename, typekey = m.typename, m.typekey
308 with api.TypeMeta.make(rootmsg)
as m:
314 colvals = [topic, topic_id, api.to_datetime(stamp), api.to_nsec(stamp)]
317 colvals += [myid, parent_type, parent_id]
318 extra_cols = list(zip([c
for c, _
in coldefs], colvals))
324 nesteds = api.iter_message_fields(msg, messages_only=
True)
if self.
_nesting_nesting else ()
325 for subpath, submsgs, subtype
in nesteds:
326 scalartype = api.scalar(subtype)
329 if isinstance(submsgs, (list, tuple)):
331 for submsg
in submsgs
if isinstance(submsgs, (list, tuple))
else [submsgs]:
332 subid = self.
_populate_type(topic, submsg, stamp, rootmsg, typename, myid)
333 if isinstance(submsgs, (list, tuple)):
334 subids[subpath].append(subid)
336 sets, where = {
".".join(p): subids[p]
for p
in subids}, {
"_id": myid}
342 def _ensure_columns(self, cols):
343 """Adds specified columns to any type tables lacking them."""
345 for typekey, typecols
in ((k, v)
for k, v
in self.
_schema.items()
if k
in self.
_types_types):
347 for c, t
in ((c, t)
for c, t
in cols
if c
not in typecols):
348 sql =
"ALTER TABLE %s ADD COLUMN %s %s;" % (quote(table_name), c, t)
356 def _ensure_execute(self, sql, args):
357 """Executes SQL if in autocommit mode, else caches arguments for batch execution."""
358 args = tuple(args)
if isinstance(args, list)
else args
360 self.
_sql_queue.setdefault(sql, []).append(args)
362 self.
_cursor.execute(sql, args)
366 """Returns new database connection."""
367 raise NotImplementedError()
def _execute_insert(self, sql, args):
    """
    Runs an INSERT statement and gives back the ID of the inserted row.

    This base version has no implementation and always raises
    NotImplementedError (presumably supplied by engine-specific subclasses).
    """
    raise NotImplementedError
def _executemany(self, sql, argses):
    """
    Runs the given SQL with every argument sequence in `argses`.

    This base version has no implementation and always raises
    NotImplementedError (presumably supplied by engine-specific subclasses).
    """
    raise NotImplementedError
def _executescript(self, sql):
    """
    Runs SQL text that may hold one or several statements.

    This base version has no implementation and always raises
    NotImplementedError (presumably supplied by engine-specific subclasses).
    """
    raise NotImplementedError
def _get_next_id(self, table):
    """
    Gives the next ID value to use for the given table.

    This base version has no implementation and always raises
    NotImplementedError (presumably supplied by engine-specific subclasses).
    """
    raise NotImplementedError
def _make_cursor(self):
    """Creates and returns a fresh cursor from the current database connection `self.db`."""
    connection = self.db
    return connection.cursor()
def _make_db_label(self):
    """Gives the label used for the database in output: the `args.WRITE` value as-is."""
    options = self.args
    return options.WRITE
# Public names of this module: only the database sink base class is exported.
__all__ = ["BaseDataSink"]
# Member reference residue from the generated documentation listing
# (signature or name, followed by its description where one was given):
#   _ensure_stamp_index(self, topic, msg, stamp=None, index=None)
#   source: inputs.Source instance bound to this sink
#   validate(self): Returns whether sink prerequisites are met (like ROS environment set if LiveSink).
#   close(self): Shuts down output, closing any files or connections.
#   __init__(self, args=None, **kwargs)
#   list MESSAGE_TYPE_TOPICCOLS: Default topic-related columns for pkg/MsgType tables.
#   close_output(self): Closes database connection, if any, executing any pending statements.
#   emit(self, topic, msg, stamp=None, match=None, index=None): Writes message to database.
#   validate(self): Returns whether args.write_options has valid values, if any.
#   close(self): Closes database connection, if any, emits metainfo.
#   list MESSAGE_TYPE_BASECOLS: Default columns for pkg/MsgType tables.
#   list MESSAGE_TYPE_NESTCOLS: Additional default columns for pkg/MsgType tables with nesting output.
#   int COMMIT_INTERVAL: Number of emits between commits; 0 is autocommit.
#   ENGINE: Database engine name, overridden in subclasses.
#   validate(self): Returns whether arguments are valid.
#   close(self): Clears data structures.