grepros 1.3.0
grep for ROS bag files and live topics
Loading...
Searching...
No Matches
sqlite.py
Go to the documentation of this file.
1# -*- coding: utf-8 -*-
2"""
3SQLite output plugin.
4
5------------------------------------------------------------------------------
6This file is part of grepros - grep for ROS bag files and live topics.
7Released under the BSD License.
8
9@author Erki Suurjaak
10@created 03.12.2021
11@modified 14.02.2026
12------------------------------------------------------------------------------
13"""
14
15import collections
16import datetime
17import json
18import os
19import sqlite3
20
21import six
22
23from ... import api
24from ... common import ConsolePrinter, format_bytes, makedirs, verify_io
25from ... outputs import RolloverSinkMixin
26from . dbbase import BaseDataSink, quote
27
28
# NOTE(review): the `class SqliteSink(...)` statement that owns this docstring and these
# constants is absent from this listing (source line numbering skips it) - restore it
# from upstream; everything below belongs to the class body.
"""
Writes messages to an SQLite database.

Output will have:
- table "messages", with all messages as serialized binary data
- table "types", with message definitions
- table "topics", with topic information

plus:
- table "pkg/MsgType" for each message type, with detailed fields,
  and JSON fields for arrays of nested subtypes,
  with foreign keys if nesting else subtype values as JSON dictionaries;
  plus underscore-prefixed fields for metadata, like `_topic` as the topic name.

  If launched with nesting-option, tables will also be created for each
  nested message type.

- view "/topic/full/name" for each topic,
  selecting from the message type table
"""

# Database engine name
ENGINE = "SQLite"

# File extensions associated with this output format
FILE_EXTENSIONS = (".sqlite", ".sqlite3")

# Maximum integer size supported in SQLite, higher values inserted as string
MAX_INT = 2**63 - 1

# Default values for the constructor arguments documented in __init__
DEFAULT_ARGS = dict(META=False, WRITE_OPTIONS={}, VERBOSE=False)
63
64
def __init__(self, args=None, **kwargs):
    """
    @param   args                 arguments as namespace or dictionary, case-insensitive;
                                  or a single path as the name of SQLite file to write
    @param   args.write           name of SQLite file to write, will be appended to if exists
    @param   args.write_options   ```
                                  {"commit-interval": transaction size (0 is autocommit),
                                   "message-yaml": populate messages.yaml (default true),
                                   "nesting": "array" to recursively insert arrays
                                              of nested types, or "all" for any nesting,
                                   "overwrite": whether to overwrite existing file
                                                (default false),
                                   "rollover-size": bytes limit for individual output files,
                                   "rollover-count": message limit for individual output files,
                                   "rollover-duration": time span limit for individual output files,
                                                        as ROS duration or convertible seconds,
                                   "rollover-template": output filename template, supporting
                                                        strftime format codes like "%H-%M-%S"
                                                        and "%(index)s" as output file index}
                                  ```
    @param   args.meta            whether to emit metainfo
    @param   args.verbose         whether to emit debug information
    @param   kwargs               any and all arguments as keyword overrides, case-insensitive
    """
    super(SqliteSink, self).__init__(args, **kwargs)
    # The rollover mixin is initialized explicitly, after the base sink.
    RolloverSinkMixin.__init__(self, args)

    self._do_yaml     = None  # Whether to populate messages.yaml, parsed in validate()
    self._overwrite   = None  # Whether to overwrite existing file, parsed in validate()
    self._id_counters = {}    # {table name: latest ID issued by _get_next_id()}
95
96
def validate(self):
    """
    Returns whether "commit-interval" and "nesting" in args.write_options have valid value, if any,
    and file is writable; parses "message-yaml" and "overwrite" from args.write_options.

    The outcome is cached in self.valid and returned directly on subsequent calls.
    Both options accept booleans as well as the strings "true" and "false".
    """
    if self.valid is not None:
        return self.valid

    # Results are collected into a list first, so that both validators always run.
    ok = all([super(SqliteSink, self).validate(), RolloverSinkMixin.validate(self)])
    options = self.args.WRITE_OPTIONS
    for name in ("message-yaml", "overwrite"):
        if options.get(name) not in (None, True, False, "true", "false"):
            ConsolePrinter.error("Invalid %s option for %s: %r. "
                                 "Choose one of {true, false}.",
                                 name, self.ENGINE, options[name])
            ok = False
    if not verify_io(self.args.WRITE, "w"):
        ok = False
    self.valid = ok
    if self.valid:
        # A boolean False must disable YAML just like the string "false" does;
        # comparing against the string alone would silently ignore it.
        self._do_yaml   = options.get("message-yaml") not in (False, "false")
        self._overwrite = options.get("overwrite") in (True, "true")
    return self.valid
121
122
def emit(self, topic, msg, stamp=None, match=None, index=None):
    """
    Writes message to database.

    Rollover is checked via RolloverSinkMixin.ensure_rollover() before the message
    is passed on to the parent sink.

    @throws  Exception  if sink arguments do not validate
    """
    if not self.validate():
        raise Exception("invalid")
    stamp, index = self._ensure_stamp_index(topic, msg, stamp, index)
    RolloverSinkMixin.ensure_rollover(self, topic, msg, stamp)
    super(SqliteSink, self).emit(topic, msg, stamp, match, index)
129
130
@property
def size(self):
    """
    Returns current file size in bytes, including journals,
    or None if no output file exists yet or size lookup failed.
    """
    sizes = []
    # The main database file plus the rollback-journal and write-ahead-log side files.
    for suffix in ("", "-journal", "-wal"):
        path = "%s%s" % (self.filename, suffix)
        try:
            if os.path.isfile(path):
                sizes.append(os.path.getsize(path))
        except Exception as e:
            # Best-effort: a failure on one file is reported but does not abort the lookup.
            ConsolePrinter.warn("Error getting size of %s: %s", path, e)
    return sum(sizes) if sizes else None
139
140
def _init_db(self):
    """
    Opens the database file and populates schema if not already existing.

    Registers process-wide sqlite3 adapters and converters before handing over
    to the parent implementation.
    """
    sqlite3.register_adapter(datetime.datetime, datetime.datetime.isoformat)
    for t in (dict, list, tuple):
        sqlite3.register_adapter(t, json.dumps)
    for t in six.integer_types:
        # Integers beyond MAX_INT are stored as strings.
        sqlite3.register_adapter(t, lambda x: str(x) if abs(x) > self.MAX_INT else x)
    sqlite3.register_converter("JSON", json.loads)
    # NOTE(review): the listing this method was restored from skips one source line
    # at this point (original line 148) - confirm against upstream.
    if self.args.VERBOSE:
        sz = self.size
        if sz and self._overwrite:
            action = "Overwriting"
        elif sz:
            action = "Appending to"
        else:
            action = "Creating"
        ConsolePrinter.debug("%s SQLite output %s%s.", action, self.filename,
                             (" (%s)" % format_bytes(sz)) if sz else "")
    super(SqliteSink, self)._init_db()
156
157
def _load_schema(self):
    """
    Populates instance attributes with schema metainfo.

    After the parent implementation has run, records the columns of each existing
    table with a slash in its name into self._schema, keyed by (typename, typehash)
    of the message type using that table.
    """
    super(SqliteSink, self)._load_schema()
    for row in self.db.execute("SELECT name FROM sqlite_master "
                               "WHERE type = 'table' AND name LIKE '%/%'"):
        typerow = next((x for x in self._types.values()
                        if x["table_name"] == row["name"]), None)
        if not typerow:
            continue  # for row: no known message type uses this table
        cols = self.db.execute("PRAGMA table_info(%s)" % quote(row["name"])).fetchall()
        typekey = (typerow["type"], typerow["md5"])
        self._schema[typekey] = collections.OrderedDict([(c["name"], c) for c in cols])
169
170
def _process_message(self, topic, msg, stamp):
    """
    Inserts message to messages-table, and to pkg/MsgType tables.

    The messages-table row is written here, with serialized binary data and optional
    YAML text; the parent implementation is then invoked for the rest.
    """
    with api.TypeMeta.make(msg, topic) as m:
        topic_id, typename = self._topics[m.topickey]["id"], m.typename
    margs = dict(dt=api.to_datetime(stamp), timestamp=api.to_nsec(stamp),
                 topic=topic, name=topic, topic_id=topic_id, type=typename,
                 yaml=str(msg) if self._do_yaml else "", data=api.serialize_message(msg))
    self._ensure_execute(self._get_dialect_option("insert_message"), margs)
    super(SqliteSink, self)._process_message(topic, msg, stamp)
180
181
def _connect(self):
    """
    Returns new database connection.

    Creates the output directory if needed, and empties any existing file first
    if the overwrite-option is on. Rows are returned as plain dictionaries.
    """
    makedirs(os.path.dirname(self.filename))
    if self._overwrite:
        # Opening for binary write truncates the file to zero length.
        with open(self.filename, "wb"):
            pass
    db = sqlite3.connect(self.filename, check_same_thread=False,
                         detect_types=sqlite3.PARSE_DECLTYPES)
    if not self.COMMIT_INTERVAL:
        db.isolation_level = None  # Zero commit interval means autocommit
    db.row_factory = lambda cursor, row: dict(sqlite3.Row(cursor, row))
    return db
191
192
def _execute_insert(self, sql, args):
    """
    Executes INSERT statement, returns inserted ID.

    @param   sql   INSERT statement to run on the sink cursor
    @param   args  parameters bound to the statement
    @return        `lastrowid` of the cursor after execution
    """
    return self._cursor.execute(sql, args).lastrowid
196
197
def _executemany(self, sql, argses):
    """
    Executes SQL with all args sequences.

    @param   sql     statement to run on the sink cursor, once per args sequence
    @param   argses  iterable of parameter sequences
    """
    self._cursor.executemany(sql, argses)
201
202
def _executescript(self, sql):
    """
    Executes SQL with one or more statements.

    @param   sql  script text, run on the sink cursor
    """
    self._cursor.executescript(sql)
206
207
def _get_next_id(self, table):
    """
    Returns next ID value for table, using simple auto-increment.

    The counter is seeded from the largest `_id` in the table (0 if the table is empty)
    the first time the table is seen, and only incremented in memory afterwards.
    """
    if not self._id_counters.get(table):
        sql = "SELECT COALESCE(MAX(_id), 0) AS id FROM %s" % quote(table)
        self._id_counters[table] = self.db.execute(sql).fetchone()["id"]
    self._id_counters[table] += 1
    return self._id_counters[table]
215
216
def _make_db_label(self):
    """
    Returns formatted label for database, with file path and size.

    Size is given as the value of the `size` property, or as the text
    "error getting size" if that is None.
    """
    sz = self.size
    return "%s (%s)" % (self.filename, "error getting size" if sz is None else sz)
221
222
223
def init(*_, **__):
    """
    Adds SQLite output format support.

    Registers the "sqlite" write format with its sink class and the help texts
    of its write-options, including the rollover options. Arguments are ignored.
    """
    from ... import plugins  # Late import to avoid circular
    plugins.add_write_format("sqlite", SqliteSink, "SQLite", [
        ("commit-interval=NUM",      "transaction size for SQLite output\n"
                                     "(default 1000, 0 is autocommit)"),
        ("dialect-file=path/to/dialects.yaml",
                                     "load additional SQL dialect options\n"
                                     "for SQLite output\n"
                                     "from a YAML or JSON file"),
        ("message-yaml=true|false",  "whether to populate table field messages.yaml\n"
                                     "in SQLite output (default true)"),
        ("nesting=array|all",        "create tables for nested message types\n"
                                     "in SQLite output,\n"
                                     'only for arrays if "array" \n'
                                     "else for any nested types\n"
                                     "(array fields in parent will be populated \n"
                                     " with foreign keys instead of messages as JSON)"),
        ("overwrite=true|false",     "overwrite existing file in SQLite output\n"
                                     "instead of appending to file (default false)")
    ] + RolloverSinkMixin.get_write_options("SQLite"))
245
246
# Public names of this module
__all__ = ["SqliteSink", "init"]
filename
Current output file path.
Definition outputs.py:396
_ensure_stamp_index(self, topic, msg, stamp=None, index=None)
Definition outputs.py:127
valid
Result of validate()
Definition outputs.py:52
int COMMIT_INTERVAL
Number of emits between commits; 0 is autocommit.
Definition dbbase.py:49
ENGINE
Database engine name, overridden in subclasses.
Definition dbbase.py:46
int MAX_INT
Maximum integer size supported in SQLite, higher values inserted as string.
Definition sqlite.py:59
str ENGINE
Database engine name.
Definition sqlite.py:53
__init__(self, args=None, **kwargs)
Definition sqlite.py:65
emit(self, topic, msg, stamp=None, match=None, index=None)
Definition sqlite.py:123