5------------------------------------------------------------------------------
6This file is part of grepros - grep for ROS bag files and live topics.
7Released under the BSD License.
12------------------------------------------------------------------------------
14## @namespace grepros.plugins.auto.sqlite
23from ... common import ConsolePrinter, format_bytes, makedirs, verify_io
24from ... outputs import RolloverSinkMixin
25from . dbbase import BaseDataSink, quote
28class SqliteSink(BaseDataSink, RolloverSinkMixin):
30 Writes messages to an SQLite database.
33 - table "messages",
with all messages
as serialized binary data
34 - table
"types",
with message definitions
35 - table
"topics",
with topic information
38 - table
"pkg/MsgType" for each message type,
with detailed fields,
39 and JSON fields
for arrays of nested subtypes,
40 with foreign keys
if nesting
else subtype values
as JSON dictionaries;
41 plus underscore-prefixed fields
for metadata, like `_topic`
as the topic name.
43 If launched
with nesting-option, tables will also be created
for each
46 - view
"/topic/full/name" for each topic,
47 selecting
from the message type table
55 FILE_EXTENSIONS = (
".sqlite",
".sqlite3")
61 DEFAULT_ARGS = dict(META=
False, WRITE_OPTIONS={}, VERBOSE=
False)
64 def __init__(self, args=None, **kwargs):
66 @param args arguments
as namespace
or dictionary, case-insensitive;
67 or a single path
as the name of SQLitefile to write
68 @param args.write name of SQLite file to write, will be appended to
if exists
69 @param args.write_options ```
70 {
"commit-interval": transaction size (0
is autocommit),
71 "message-yaml": populate messages.yaml (default true),
72 "nesting":
"array" to recursively insert arrays
73 of nested types,
or "all" for any nesting),
74 "overwrite": whether to overwrite existing file
76 "rollover-size": bytes limit
for individual output files,
77 "rollover-count": message limit
for individual output files,
78 "rollover-duration": time span limit
for individual output files,
79 as ROS duration
or convertible seconds,
80 "rollover-template": output filename template, supporting
81 strftime format codes like
"%H-%M-%S"
82 and "%(index)s" as output file index}
84 @param args.meta whether to emit metainfo
85 @param args.verbose whether to emit debug information
86 @param kwargs any
and all arguments
as keyword overrides, case-insensitive
88 super(SqliteSink, self).__init__(args, **kwargs)
89 RolloverSinkMixin.__init__(self, args)
98 Returns whether "commit-interval" and "nesting" in args.write_options have valid value,
if any,
99 and file
is writable; parses
"message-yaml" and "overwrite" from args.write_options.
102 ok = all([super(SqliteSink, self).
validate(), RolloverSinkMixin.validate(self)])
103 if self.
args.WRITE_OPTIONS.get(
"message-yaml")
not in (
None,
True,
False,
"true",
"false"):
104 ConsolePrinter.error(
"Invalid message-yaml option for %s: %r. "
105 "Choose one of {true, false}.",
108 if self.
args.WRITE_OPTIONS.get(
"overwrite")
not in (
None,
True,
False,
"true",
"false"):
109 ConsolePrinter.error(
"Invalid overwrite option for %s: %r. "
110 "Choose one of {true, false}.",
113 if not verify_io(self.
args.WRITE,
"w"):
117 self.
_do_yaml = (self.
args.WRITE_OPTIONS.get(
"message-yaml") !=
"false")
118 self.
_overwrite = (self.
args.WRITE_OPTIONS.get(
"overwrite")
in (
True,
"true"))
122 def emit(self, topic, msg, stamp=None, match=None, index=None):
123 """Writes message to database."""
126 RolloverSinkMixin.ensure_rollover(self, topic, msg, stamp)
127 super(SqliteSink, self).
emit(topic, msg, stamp, match, index)
132 """Returns current file size in bytes, including journals, or None if size lookup failed."""
134 for f
in (
"%s%s" % (self.
filenamefilename, x)
for x
in (
"",
"-journal",
"-wal")):
135 try: os.path.isfile(f)
and sizes.append(os.path.getsize(f))
136 except Exception
as e: ConsolePrinter.warn(
"Error getting size of %s: %s", f, e)
137 return sum(sizes)
if sizes
else None
141 """Opens the database file and populates schema if not already existing."""
142 for t
in (dict, list, tuple): sqlite3.register_adapter(t, json.dumps)
143 for t
in six.integer_types:
144 sqlite3.register_adapter(t,
lambda x: str(x)
if abs(x) > self.
MAX_INT else x)
145 sqlite3.register_converter(
"JSON", json.loads)
147 if self.
args.VERBOSE:
149 action =
"Overwriting" if sz
and self.
_overwrite else \
150 "Appending to" if sz
else "Creating"
151 ConsolePrinter.debug(
"%s SQLite output %s%s.", action, self.
filenamefilename,
152 (
" (%s)" % format_bytes(sz))
if sz
else "")
153 super(SqliteSink, self)._init_db()
156 def _load_schema(self):
157 """Populates instance attributes with schema metainfo."""
158 super(SqliteSink, self)._load_schema()
159 for row
in self.
db.execute(
"SELECT name FROM sqlite_master "
160 "WHERE type = 'table' AND name LIKE '%/%'"):
161 cols = self.
db.execute(
"PRAGMA table_info(%s)" % quote(row[
"name"])).fetchall()
163 if x[
"table_name"] == row[
"name"]),
None)
164 if not typerow:
continue
165 typekey = (typerow[
"type"], typerow[
"md5"])
166 self.
_schema[typekey] = collections.OrderedDict([(c[
"name"], c)
for c
in cols])
169 def _process_message(self, topic, msg, stamp):
170 """Inserts message to messages-table, and to pkg/MsgType tables."""
171 with api.TypeMeta.make(msg, topic)
as m:
172 topic_id, typename = self.
_topics_topics[m.topickey][
"id"], m.typename
173 margs = dict(dt=api.to_datetime(stamp), timestamp=api.to_nsec(stamp),
174 topic=topic, name=topic, topic_id=topic_id, type=typename,
175 yaml=str(msg)
if self.
_do_yaml else "", data=api.serialize_message(msg))
177 super(SqliteSink, self)._process_message(topic, msg, stamp)
181 """Returns new database connection."""
185 detect_types=sqlite3.PARSE_DECLTYPES)
187 db.row_factory =
lambda cursor, row: dict(sqlite3.Row(cursor, row))
191 def _execute_insert(self, sql, args):
192 """Executes INSERT statement, returns inserted ID."""
193 return self.
_cursor.execute(sql, args).lastrowid
196 def _executemany(self, sql, argses):
197 """Executes SQL with all args sequences."""
198 self.
_cursor.executemany(sql, argses)
201 def _executescript(self, sql):
202 """Executes SQL with one or more statements."""
203 self.
_cursor.executescript(sql)
206 def _get_next_id(self, table):
207 """Returns next ID value for table, using simple auto-increment."""
209 sql =
"SELECT COALESCE(MAX(_id), 0) AS id FROM %s" % quote(table)
215 def _make_db_label(self):
216 """Returns formatted label for database, with file path and size."""
218 return "%s (%s)" % (self.
filenamefilename,
"error getting size" if sz
is None else sz)
223 """Adds SQLite output format support."""
224 from ...
import plugins
225 plugins.add_write_format(
"sqlite", SqliteSink,
"SQLite", [
226 (
"commit-interval=NUM",
"transaction size for SQLite output\n"
227 "(default 1000, 0 is autocommit)"),
228 (
"dialect-file=path/to/dialects.yaml",
229 "load additional SQL dialect options\n"
230 "for SQLite output\n"
231 "from a YAML or JSON file"),
232 (
"message-yaml=true|false",
"whether to populate table field messages.yaml\n"
233 "in SQLite output (default true)"),
234 (
"nesting=array|all",
"create tables for nested message types\n"
235 "in SQLite output,\n"
236 'only for arrays if "array" \n'
237 "else for any nested types\n"
238 "(array fields in parent will be populated \n"
239 " with foreign keys instead of messages as JSON)"),
240 (
"overwrite=true|false",
"overwrite existing file in SQLite output\n"
241 "instead of appending to file (default false)")
242 ] + RolloverSinkMixin.get_write_options(
"SQLite"))
245__all__ = [
"SqliteSink",
"init"]
filename
Current output file path.
validate(self)
Returns whether write options are valid, emits error if not, else populates options.
size
Returns current file size in bytes, or None if size lookup failed.
make_filename(self)
Returns new filename for output, accounting for rollover template and overwrite.
_ensure_stamp_index(self, topic, msg, stamp=None, index=None)
valid
Result of validate()
validate(self)
Returns whether sink prerequisites are met (like ROS environment set if LiveSink).
validate(self)
Returns whether args.write_options has valid values, if any.
int COMMIT_INTERVAL
Number of emits between commits; 0 is autocommit.
ENGINE
Database engine name, overridden in subclasses.
validate(self)
Returns whether arguments are valid.
int MAX_INT
Maximum integer size supported in SQLite, higher values inserted as string.
str ENGINE
Database engine name.
__init__(self, args=None, **kwargs)
emit(self, topic, msg, stamp=None, match=None, index=None)
Writes message to database.
validate(self)
Returns whether "commit-interval" and "nesting" in args.write_options have valid value,...
size
Returns current file size in bytes, including journals, or None if size lookup failed.
init(*_, **__)
Adds SQLite output format support.