24from ... common
import ConsolePrinter, format_bytes, makedirs, verify_io
Writes messages to an SQLite database.

- table "messages", with all messages as serialized binary data
- table "types", with message definitions
- table "topics", with topic information
- table "pkg/MsgType" for each message type, with detailed fields,
  and JSON fields for arrays of nested subtypes,
  with foreign keys if nesting else subtype values as JSON dictionaries;
  plus underscore-prefixed fields for metadata, like `_topic` as the topic name.
  If launched with nesting-option, tables will also be created for each
  nested message type.
- view "/topic/full/name" for each topic,
  selecting from the message type table
# File name extensions associated with SQLite output files
FILE_EXTENSIONS = (".sqlite", ".sqlite3")
# Default values for sink arguments: no metainfo, no write options, not verbose
DEFAULT_ARGS = dict(META=False, WRITE_OPTIONS={}, VERBOSE=False)
@param   args                 arguments as namespace or dictionary, case-insensitive;
                              or a single path as the name of SQLite file to write
@param   args.write           name of SQLite file to write, will be appended to if exists
@param   args.write_options   ```
                              {"commit-interval": transaction size (0 is autocommit),
                               "message-yaml": populate messages.yaml (default true),
                               "nesting": "array" to recursively insert arrays
                                          of nested types, or "all" for any nesting,
                               "overwrite": whether to overwrite existing file
                                            (default false),
                               "rollover-size": bytes limit for individual output files,
                               "rollover-count": message limit for individual output files,
                               "rollover-duration": time span limit for individual output files,
                                                    as ROS duration or convertible seconds,
                               "rollover-template": output filename template, supporting
                                                    strftime format codes like "%H-%M-%S"
                                                    and "%(index)s" as output file index}
                              ```
@param   args.meta            whether to emit metainfo
@param   args.verbose         whether to emit debug information
@param   kwargs               any and all arguments as keyword overrides, case-insensitive
# Both bases are initialised explicitly: the super() call receives `args` plus
# the keyword overrides, while RolloverSinkMixin.__init__ is given `args` only.
# NOTE(review): the enclosing `def __init__` line is not visible in this part
# of the file - confirm these calls sit inside it.
89 super(SqliteSink, self).
__init__(args, **kwargs)
90 RolloverSinkMixin.__init__(self, args)
99 Returns whether "commit-interval" and "nesting" in args.write_options have valid value, if any,
100 and file is writable; parses "message-yaml" and "overwrite" from args.write_options.
# Validation body. A list (not a generator) is passed to all(), so both the
# parent validate() and RolloverSinkMixin.validate() run even if the first fails.
# NOTE(review): the enclosing `def` line is not visible in this part of the file.
103 ok = all([super(SqliteSink, self).
validate(), RolloverSinkMixin.validate(self)])
# "message-yaml" may be absent (None), a boolean, or the string "true"/"false";
# any other value is reported as an error.
# NOTE(review): the values filling %s and %r in the error call are not visible here.
104 if self.
args.WRITE_OPTIONS.get(
"message-yaml")
not in (
None,
True,
False,
"true",
"false"):
105 ConsolePrinter.error(
"Invalid message-yaml option for %s: %r. "
106 "Choose one of {true, false}.",
# Same accepted value set for "overwrite".
# NOTE(review): the values filling %s and %r in the error call are not visible here.
109 if self.
args.WRITE_OPTIONS.get(
"overwrite")
not in (
None,
True,
False,
"true",
"false"):
110 ConsolePrinter.error(
"Invalid overwrite option for %s: %r. "
111 "Choose one of {true, false}.",
# Checks the output path given in args.WRITE with mode "w".
# NOTE(review): the branch taken when verify_io() fails is not visible here.
114 if not verify_io(self.
args.WRITE,
"w"):
# YAML population is switched off only by the exact string "false".
# NOTE(review): a boolean False passes the check above but compares unequal to
# "false", so it leaves _do_yaml True - confirm whether that is intended.
118 self.
_do_yaml = (self.
args.WRITE_OPTIONS.get(
"message-yaml") !=
"false")
# Overwriting is enabled by boolean True or the string "true"; otherwise off.
119 self.
_overwrite = (self.
args.WRITE_OPTIONS.get(
"overwrite")
in (
True,
"true"))
def emit(self, topic, msg, stamp=None, match=None, index=None):
    """
    Writes message to database.

    Invokes the rollover check of RolloverSinkMixin first,
    then passes the message on to the parent class.

    @param   topic  passed to rollover check and to parent emit
    @param   msg    passed to rollover check and to parent emit
    @param   stamp  passed to rollover check and to parent emit
    @param   match  passed to parent emit only
    @param   index  passed to parent emit only
    """
    # NOTE(review): numbering embedded in the original text jumps from 124 to 127,
    # so statements may have been lost ahead of the rollover check - confirm.
    RolloverSinkMixin.ensure_rollover(self, topic, msg, stamp)
    super(SqliteSink, self).emit(topic, msg, stamp, match, index)
133 """Returns current file size in bytes, including journals, or None if size lookup failed."""
# Appends the size of `f` to `sizes` when `f` is an existing file; any failure
# is reported as a warning rather than raised.
# NOTE(review): the enclosing `def`, the definitions of `f` and `sizes`, and any
# surrounding loop are not visible in this part of the file.
136 try: os.path.isfile(f)
and sizes.append(os.path.getsize(f))
137 except Exception
as e: ConsolePrinter.warn(
"Error getting size of %s: %s", f, e)
# None (rather than 0) when no size was collected at all.
138 return sum(sizes)
if sizes
else None
142 """Opens the database file and populates schema if not already existing."""
# Registers sqlite3 adapters before the parent initialises the database:
# datetimes are stored via isoformat(), dicts/lists/tuples via json.dumps.
# NOTE(review): the enclosing `def` line is not visible in this part of the file.
143 sqlite3.register_adapter(datetime.datetime, datetime.datetime.isoformat)
144 for t
in (dict, list, tuple): sqlite3.register_adapter(t, json.dumps)
# Integers whose magnitude exceeds self.MAX_INT are passed to SQLite as strings.
145 for t
in six.integer_types:
146 sqlite3.register_adapter(t,
lambda x: str(x)
if abs(x) > self.
MAX_INT else x)
# Values of the declared type "JSON" are converted back with json.loads.
147 sqlite3.register_converter(
"JSON", json.loads)
# Debug message naming the action taken on the output file, with size if any.
# NOTE(review): `sz` is not assigned in the visible lines, and `filenamefilename`
# looks like a duplicated `filename` - confirm both against the full file.
149 if self.
args.VERBOSE:
151 action =
"Overwriting" if sz
and self.
_overwrite else \
152 "Appending to" if sz
else "Creating"
153 ConsolePrinter.debug(
"%s SQLite output %s%s.", action, self.
filenamefilename,
154 (
" (%s)" % format_bytes(sz))
if sz
else "")
# Remaining initialisation is delegated to the parent class.
155 super(SqliteSink, self)._init_db()
# Extends the parent schema loading: after the parent call, reads column
# metainfo of message type tables into self._schema.
158 def _load_schema(self):
159 """Populates instance attributes with schema metainfo."""
160 super(SqliteSink, self)._load_schema()
# Selects tables whose name contains "/" - presumably the "pkg/MsgType" tables.
161 for row
in self.
db.execute(
"SELECT name FROM sqlite_master "
162 "WHERE type = 'table' AND name LIKE '%/%'"):
# Column descriptions of the table; name is passed through quote() since
# it is interpolated into the statement.
163 cols = self.
db.execute(
"PRAGMA table_info(%s)" % quote(row[
"name"])).fetchall()
# NOTE(review): the start of the expression that assigns `typerow` is not
# visible here; only its filter on "table_name" and its None default remain.
165 if x[
"table_name"] == row[
"name"]),
None)
# Tables without a matching type row are skipped.
166 if not typerow:
continue
# Schema entry is keyed by (type, md5) and maps column name to its info row,
# in the order returned by the PRAGMA.
167 typekey = (typerow[
"type"], typerow[
"md5"])
168 self.
_schema[typekey] = collections.OrderedDict([(c[
"name"], c)
for c
in cols])
# Builds the values for the message row, then delegates to the parent class.
# NOTE(review): `margs` is not used in the visible lines, so a statement consuming
# it appears to be missing; `_topics_topics` looks like a duplicated `_topics`.
171 def _process_message(self, topic, msg, stamp):
172 """Inserts message to messages-table, and to pkg/MsgType tables."""
173 with api.TypeMeta.make(msg, topic)
as m:
174 topic_id, typename = self.
_topics_topics[m.topickey][
"id"], m.typename
# Timestamp is given both as datetime and as nanoseconds; topic name is given
# under both "topic" and "name"; yaml is an empty string when self._do_yaml is off.
175 margs = dict(dt=api.to_datetime(stamp), timestamp=api.to_nsec(stamp),
176 topic=topic, name=topic, topic_id=topic_id, type=typename,
177 yaml=str(msg)
if self.
_do_yaml else "", data=api.serialize_message(msg))
179 super(SqliteSink, self)._process_message(topic, msg, stamp)
183 """Returns new database connection."""
# PARSE_DECLTYPES makes sqlite3 apply registered converters by declared column type.
# NOTE(review): the enclosing `def` and the start of this call are not visible here.
187 detect_types=sqlite3.PARSE_DECLTYPES)
# Rows are returned as plain dictionaries keyed by column name.
189 db.row_factory =
lambda cursor, row: dict(sqlite3.Row(cursor, row))
193 def _execute_insert(self, sql, args):
194 """Executes INSERT statement, returns inserted ID."""
195 return self.
_cursor.execute(sql, args).lastrowid
198 def _executemany(self, sql, argses):
199 """Executes SQL with all args sequences."""
200 self.
_cursor.executemany(sql, argses)
203 def _executescript(self, sql):
204 """Executes SQL with one or more statements."""
205 self.
_cursor.executescript(sql)
# The query yields the highest existing `_id` in the table, or 0 when MAX() is
# NULL; the table name is interpolated via quote() rather than bound as a parameter.
# NOTE(review): execution of `sql` and the return statement are not visible in
# this part of the file.
208 def _get_next_id(self, table):
209 """Returns next ID value for table, using simple auto-increment."""
211 sql =
"SELECT COALESCE(MAX(_id), 0) AS id FROM %s" % quote(table)
# Label has the form "<file> (<size>)", with the text "error getting size" in
# place of the size when `sz` is None.
# NOTE(review): `sz` is not assigned in the visible lines, and `filenamefilename`
# looks like a duplicated `filename` - confirm both against the full file.
217 def _make_db_label(self):
218 """Returns formatted label for database, with file path and size."""
220 return "%s (%s)" % (self.
filenamefilename,
"error getting size" if sz
is None else sz)