grepros 1.2.2
grep for ROS bag files and live topics
Loading...
Searching...
No Matches
sqlite.py
Go to the documentation of this file.
1# -*- coding: utf-8 -*-
2"""
3SQLite output plugin.
4
5------------------------------------------------------------------------------
6This file is part of grepros - grep for ROS bag files and live topics.
7Released under the BSD License.
8
9@author Erki Suurjaak
10@created 03.12.2021
11@modified 21.04.2024
12------------------------------------------------------------------------------
13"""
14## @namespace grepros.plugins.auto.sqlite
15import collections
16import json
17import os
18import sqlite3
19
20import six
21
22from ... import api
23from ... common import ConsolePrinter, format_bytes, makedirs, verify_io
24from ... outputs import RolloverSinkMixin
25from . dbbase import BaseDataSink, quote
26
27
class SqliteSink(BaseDataSink, RolloverSinkMixin):
    """
    Writes messages to an SQLite database.

    Output will have:
    - table "messages", with all messages as serialized binary data
    - table "types", with message definitions
    - table "topics", with topic information

    plus:
    - table "pkg/MsgType" for each message type, with detailed fields,
      and JSON fields for arrays of nested subtypes,
      with foreign keys if nesting else subtype values as JSON dictionaries;
      plus underscore-prefixed fields for metadata, like `_topic` as the topic name.

      If launched with nesting-option, tables will also be created for each
      nested message type.

    - view "/topic/full/name" for each topic,
      selecting from the message type table

    """

    ## Database engine name
    ENGINE = "SQLite"

    ## File extensions associated with SQLite output
    FILE_EXTENSIONS = (".sqlite", ".sqlite3")

    ## Maximum integer size supported in SQLite (signed 64-bit);
    ## integers outside [-MAX_INT - 1, MAX_INT] are inserted as strings
    MAX_INT = 2**63 - 1

    ## Constructor argument defaults
    DEFAULT_ARGS = dict(META=False, WRITE_OPTIONS={}, VERBOSE=False)


    def __init__(self, args=None, **kwargs):
        """
        @param   args                 arguments as namespace or dictionary, case-insensitive;
                                      or a single path as the name of SQLite file to write
        @param   args.write           name of SQLite file to write, will be appended to if exists
        @param   args.write_options   ```
                                      {"commit-interval": transaction size (0 is autocommit),
                                       "message-yaml":    populate messages.yaml (default true),
                                                          as boolean or "true"/"false",
                                       "nesting":         "array" to recursively insert arrays
                                                          of nested types, or "all" for any nesting,
                                       "overwrite":       whether to overwrite existing file
                                                          (default false),
                                       "rollover-size":   bytes limit for individual output files,
                                       "rollover-count":  message limit for individual output files,
                                       "rollover-duration": time span limit for individual output files,
                                                          as ROS duration or convertible seconds,
                                       "rollover-template": output filename template, supporting
                                                          strftime format codes like "%H-%M-%S"
                                                          and "%(index)s" as output file index}
                                      ```
        @param   args.meta            whether to emit metainfo
        @param   args.verbose         whether to emit debug information
        @param   kwargs               any and all arguments as keyword overrides, case-insensitive
        """
        super(SqliteSink, self).__init__(args, **kwargs)
        RolloverSinkMixin.__init__(self, args)

        self._do_yaml     = None  # Whether to populate messages.yaml, parsed in validate()
        self._overwrite   = None  # Whether to truncate existing file, parsed in validate()
        self._id_counters = {}    # {table name: last assigned _id}


    def validate(self):
        """
        Returns whether "commit-interval" and "nesting" in args.write_options have valid value, if any,
        and file is writable; parses "message-yaml" and "overwrite" from args.write_options.

        The result is cached in self.valid, so later calls return it directly.
        """
        if self.valid is not None: return self.valid
        # List, not generator: both validations run so that all errors get reported
        ok = all([super(SqliteSink, self).validate(), RolloverSinkMixin.validate(self)])
        if self.args.WRITE_OPTIONS.get("message-yaml") not in (None, True, False, "true", "false"):
            ConsolePrinter.error("Invalid message-yaml option for %s: %r. "
                                 "Choose one of {true, false}.",
                                 self.ENGINE, self.args.WRITE_OPTIONS["message-yaml"])
            ok = False
        if self.args.WRITE_OPTIONS.get("overwrite") not in (None, True, False, "true", "false"):
            ConsolePrinter.error("Invalid overwrite option for %s: %r. "
                                 "Choose one of {true, false}.",
                                 self.ENGINE, self.args.WRITE_OPTIONS["overwrite"])
            ok = False
        if not verify_io(self.args.WRITE, "w"):
            ok = False
        self.valid = ok
        if self.valid:
            # Both boolean False and string "false" disable YAML (validation accepts either);
            # any other accepted value, including unset, leaves it enabled.
            self._do_yaml = (self.args.WRITE_OPTIONS.get("message-yaml") not in (False, "false"))
            self._overwrite = (self.args.WRITE_OPTIONS.get("overwrite") in (True, "true"))
        return self.valid


    def emit(self, topic, msg, stamp=None, match=None, index=None):
        """Writes message to database, rolling over to a new file first if limits are reached."""
        if not self.validate(): raise Exception("invalid")
        stamp, index = self._ensure_stamp_index(topic, msg, stamp, index)
        RolloverSinkMixin.ensure_rollover(self, topic, msg, stamp)
        super(SqliteSink, self).emit(topic, msg, stamp, match, index)


    @property
    def size(self):
        """
        Returns current file size in bytes, including "-journal" and "-wal" files,
        or None if none of these files exist or all size lookups failed.
        """
        sizes = []
        for f in ("%s%s" % (self.filename, x) for x in ("", "-journal", "-wal")):
            try:
                if os.path.isfile(f): sizes.append(os.path.getsize(f))
            except Exception as e: ConsolePrinter.warn("Error getting size of %s: %s", f, e)
        return sum(sizes) if sizes else None


    def _init_db(self):
        """Opens the database file and populates schema if not already existing."""
        # sqlite3 adapters/converters are registered module-globally, for all connections
        for t in (dict, list, tuple): sqlite3.register_adapter(t, json.dumps)
        # SQLite INTEGER is signed 64-bit: bind out-of-range Python ints as text instead.
        # Bounds are captured as locals so the global adapter does not keep this sink alive.
        lo, hi = -self.MAX_INT - 1, self.MAX_INT
        for t in six.integer_types:
            sqlite3.register_adapter(t, lambda x: x if lo <= x <= hi else str(x))
        sqlite3.register_converter("JSON", json.loads)

        if self.args.VERBOSE:
            sz = self.size
            action = "Overwriting" if sz and self._overwrite else \
                     "Appending to" if sz else "Creating"
            ConsolePrinter.debug("%s SQLite output %s%s.", action, self.filename,
                                 (" (%s)" % format_bytes(sz)) if sz else "")
        super(SqliteSink, self)._init_db()


    def _load_schema(self):
        """Populates instance attributes with schema metainfo."""
        super(SqliteSink, self)._load_schema()
        # Message type tables are named like "pkg/MsgType", hence the slash filter
        for row in self.db.execute("SELECT name FROM sqlite_master "
                                   "WHERE type = 'table' AND name LIKE '%/%'"):
            cols = self.db.execute("PRAGMA table_info(%s)" % quote(row["name"])).fetchall()
            typerow = next((x for x in self._types.values()
                            if x["table_name"] == row["name"]), None)
            if not typerow: continue  # for row
            typekey = (typerow["type"], typerow["md5"])
            self._schema[typekey] = collections.OrderedDict([(c["name"], c) for c in cols])


    def _process_message(self, topic, msg, stamp):
        """Inserts message to messages-table, and to pkg/MsgType tables."""
        with api.TypeMeta.make(msg, topic) as m:
            topic_id, typename = self._topics[m.topickey]["id"], m.typename
        margs = dict(dt=api.to_datetime(stamp), timestamp=api.to_nsec(stamp),
                     topic=topic, name=topic, topic_id=topic_id, type=typename,
                     yaml=str(msg) if self._do_yaml else "", data=api.serialize_message(msg))
        self._ensure_execute(self._get_dialect_option("insert_message"), margs)
        super(SqliteSink, self)._process_message(topic, msg, stamp)


    def _connect(self):
        """Returns new database connection, truncating the file first if overwriting."""
        makedirs(os.path.dirname(self.filename))
        if self._overwrite: open(self.filename, "wb").close()
        db = sqlite3.connect(self.filename, check_same_thread=False,
                             detect_types=sqlite3.PARSE_DECLTYPES)
        # isolation_level None puts sqlite3 in autocommit mode, for commit-interval 0
        if not self.COMMIT_INTERVAL: db.isolation_level = None
        db.row_factory = lambda cursor, row: dict(sqlite3.Row(cursor, row))
        return db


    def _execute_insert(self, sql, args):
        """Executes INSERT statement, returns inserted ID."""
        return self._cursor.execute(sql, args).lastrowid


    def _executemany(self, sql, argses):
        """Executes SQL with all args sequences."""
        self._cursor.executemany(sql, argses)


    def _executescript(self, sql):
        """Executes SQL with one or more statements."""
        self._cursor.executescript(sql)


    def _get_next_id(self, table):
        """
        Returns next ID value for table, using simple auto-increment.

        The counter is seeded from MAX(_id) in the table on first use, then kept in memory.
        """
        if not self._id_counters.get(table):
            sql = "SELECT COALESCE(MAX(_id), 0) AS id FROM %s" % quote(table)
            self._id_counters[table] = self.db.execute(sql).fetchone()["id"]
        self._id_counters[table] += 1
        return self._id_counters[table]


    def _make_db_label(self):
        """Returns formatted label for database, with file path and size."""
        sz = self.size
        return "%s (%s)" % (self.filename, "error getting size" if sz is None else sz)
219
220
221
def init(*_, **__):
    """Registers the SQLite sink as write format "sqlite", with its command-line write options."""
    from ... import plugins  # Late import to avoid circular

    # (option syntax, help text) pairs shown for --write-option in SQLite output
    sqlite_options = [
        ("commit-interval=NUM",
         "transaction size for SQLite output\n"
         "(default 1000, 0 is autocommit)"),
        ("dialect-file=path/to/dialects.yaml",
         "load additional SQL dialect options\n"
         "for SQLite output\n"
         "from a YAML or JSON file"),
        ("message-yaml=true|false",
         "whether to populate table field messages.yaml\n"
         "in SQLite output (default true)"),
        ("nesting=array|all",
         "create tables for nested message types\n"
         "in SQLite output,\n"
         'only for arrays if "array" \n'
         "else for any nested types\n"
         "(array fields in parent will be populated \n"
         " with foreign keys instead of messages as JSON)"),
        ("overwrite=true|false",
         "overwrite existing file in SQLite output\n"
         "instead of appending to file (default false)"),
    ]
    all_options = sqlite_options + RolloverSinkMixin.get_write_options("SQLite")
    plugins.add_write_format("sqlite", SqliteSink, "SQLite", all_options)
244
# Public names exported by this module
__all__ = [
    "SqliteSink",
    "init",
]
filename
Current output file path.
Definition outputs.py:403
validate(self)
Returns whether write options are valid, emits error if not, else populates options.
Definition outputs.py:406
size
Returns current file size in bytes, or None if size lookup failed.
Definition outputs.py:527
make_filename(self)
Returns new filename for output, accounting for rollover template and overwrite.
Definition outputs.py:481
_ensure_stamp_index(self, topic, msg, stamp=None, index=None)
Definition outputs.py:131
valid
Result of validate()
Definition outputs.py:53
validate(self)
Returns whether sink prerequisites are met (like ROS environment set if LiveSink).
Definition outputs.py:99
validate(self)
Returns whether args.write_options has valid values, if any.
Definition dbbase.py:112
int COMMIT_INTERVAL
Number of emits between commits; 0 is autocommit.
Definition dbbase.py:49
ENGINE
Database engine name, overridden in subclasses.
Definition dbbase.py:46
validate(self)
Returns whether arguments are valid.
Definition sqlbase.py:63
int MAX_INT
Maximum integer size supported in SQLite, higher values inserted as string.
Definition sqlite.py:58
str ENGINE
Database engine name.
Definition sqlite.py:52
__init__(self, args=None, **kwargs)
Definition sqlite.py:88
emit(self, topic, msg, stamp=None, match=None, index=None)
Writes message to database.
Definition sqlite.py:123
validate(self)
Returns whether "commit-interval" and "nesting" in args.write_options have valid value,...
Definition sqlite.py:101
size
Returns current file size in bytes, including journals, or None if size lookup failed.
Definition sqlite.py:133
init(*_, **__)
Adds SQLite output format support.
Definition sqlite.py:243