3SQL schema output plugin.
5------------------------------------------------------------------------------
6This file is part of grepros - grep for ROS bag files and live topics.
7Released under the BSD License.
12------------------------------------------------------------------------------
14## @namespace grepros.plugins.sql
20from .. import __title__
24from .. common import ConsolePrinter, plural
25from .. outputs import Sink
26from . auto.sqlbase import SqlMixin
30class SqlSink(Sink, SqlMixin):
32 Writes SQL schema file for message type tables
and topic views.
35 - table
"pkg/MsgType" for each topic message type,
with ordinary columns
for
36 scalar fields,
and structured columns
for list fields;
37 plus underscore-prefixed fields
for metadata, like `_topic`
as the topic name.
39 If launched
with nesting-option, tables will also be created
for each
42 - view
"/full/topic/name" for each topic, selecting
from the message type table
46 FILE_EXTENSIONS = (".sql", )
49 MESSAGE_TYPE_BASECOLS = [(
"_topic",
"string"),
50 (
"_timestamp",
"time"), ]
53 DEFAULT_ARGS = dict(WRITE_OPTIONS={}, VERBOSE=
False)
56 def __init__(self, args=None, **kwargs):
58 @param args arguments
as namespace
or dictionary, case-insensitive;
59 or a single path
as the file to write
60 @param args.write output file path
61 @param args.write_options ```
62 {
"dialect": SQL dialect
if not default,
63 "nesting": true|false to created nested type tables,
64 "overwrite": whether to overwrite existing file
67 @param args.meta whether to emit metainfo
68 @param args.verbose whether to emit debug information
69 @param kwargs any
and all arguments
as keyword overrides, case-insensitive
71 args = {"WRITE": str(args)}
if isinstance(args, common.PATH_TYPES)
else args
72 args = common.ensure_namespace(args, SqlSink.DEFAULT_ARGS, **kwargs)
74 SqlMixin.__init__(self, args)
94 Returns whether "dialect" and "nesting" and "overwrite" parameters contain supported values
98 ok, sqlconfig_ok =
True, SqlMixin.validate(self)
99 if self.
args.WRITE_OPTIONS.get(
"nesting")
not in (
None,
"array",
"all"):
100 ConsolePrinter.error(
"Invalid nesting option for SQL: %r. "
101 "Choose one of {array,all}.",
102 self.
args.WRITE_OPTIONS[
"nesting"])
104 if self.
args.WRITE_OPTIONS.get(
"overwrite")
not in (
None,
True,
False,
"true",
"false"):
105 ConsolePrinter.error(
"Invalid overwrite option for SQL: %r. "
106 "Choose one of {true, false}.",
107 self.
args.WRITE_OPTIONS[
"overwrite"])
109 if not common.verify_io(self.
args.WRITE,
"w"):
113 self.
_overwrite = (self.
args.WRITE_OPTIONS.get(
"overwrite")
in (
True,
"true"))
118 def emit(self, topic, msg, stamp=None, match=None, index=None):
119 """Writes out message type CREATE TABLE statements to SQL schema file."""
121 batch = self.
source.get_batch()
131 """Rewrites out everything to SQL schema file, ensuring all source metas."""
145 try: sz = common.format_bytes(os.path.getsize(self.
_filename))
146 except Exception
as e:
147 ConsolePrinter.warn(
"Error getting size of %s: %s", self.
_filename, e)
148 sz =
"error getting size"
149 ConsolePrinter.debug(
"Wrote %s and %s to SQL %s (%s).",
150 plural(
"message type table",
154 ConsolePrinter.debug(
"Wrote %s to SQL %s.",
160 super(SqlSink, self).
close()
163 def _ensure_open(self):
164 """Opens output file if not already open, writes header."""
165 if self.
_file:
return
168 common.makedirs(os.path.dirname(self.
_filename))
169 if self.
args.VERBOSE:
171 action =
"Overwriting" if sz
and self.
_overwrite else "Creating"
172 ConsolePrinter.debug(
"%s %s.", action, self.
_filename)
178 def _process_topic(self, topic, msg):
179 """Builds and writes CREATE VIEW statement for topic if not already built."""
180 topickey = api.TypeMeta.make(msg, topic).topickey
188 def _process_type(self, msg, rootmsg=None):
190 Builds and writes CREATE TABLE statement
for message type
if not already built.
192 Builds statements recursively
for nested types
if configured.
194 @return built SQL,
or None if already built
196 rootmsg = rootmsg or msg
197 typekey = api.TypeMeta.make(msg, root=rootmsg).typekey
201 extra_cols = [(c, self.
_make_column_type(t, fallback=
"int64" if "time" == t
else None))
211 def _process_nested(self, msg, rootmsg):
212 """Builds anr writes CREATE TABLE statements for nested types."""
213 nesteds = api.iter_message_fields(msg, messages_only=
True)
if self.
_nesting_nesting else ()
214 for path, submsgs, subtype
in nesteds:
215 scalartype = api.scalar(subtype)
218 subtypehash = self.
source.get_message_type_hash(scalartype)
219 subtypekey = (scalartype, subtypehash)
222 if not isinstance(submsgs, (list, tuple)): submsgs = [submsgs]
223 [submsg] = submsgs[:1]
or [self.
source.get_message_class(scalartype, subtypehash)()]
228 def _write_header(self):
229 """Writes header to current file."""
230 values = {
"title": __title__,
232 "args":
" ".join(main.CLI_ARGS
or []),
233 "source":
"\n\n".join(
"-- Source:\n" +
234 "\n".join(
"-- " + x
for x
in s.strip().splitlines())
236 "dt": datetime.datetime.now().strftime(
"%Y-%m-%d %H:%M"), }
238 "-- SQL dialect: {dialect}.\n"
239 "-- Written by {title} on {dt}.\n"
240 ).format(**values).encode(
"utf-8"))
242 self.
_file.write(
"-- Command: grepros {args}.\n".format(**values).encode(
"utf-8"))
243 self.
_file.write(
"\n{source}\n\n".format(**values).encode(
"utf-8"))
246 def _write_entity(self, category, item):
247 """Writes table or view SQL statement to file."""
248 self.
_file.write(b
"\n")
249 if "table" == category:
250 self.
_file.write((
"-- Message type %(type)s (%(md5)s)\n--\n" % item).encode(
"utf-8"))
251 self.
_file.write((
"-- %s\n" %
"\n-- ".join(item[
"definition"].splitlines())).encode(
"utf-8"))
253 self.
_file.write((
'-- Topic "%(name)s": %(type)s (%(md5)s)\n' % item).encode(
"utf-8"))
254 self.
_file.write((
"%s\n\n" % item[
"sql"]).encode(
"utf-8"))
259 """Adds SQL schema output format support."""
260 from ..
import plugins
261 plugins.add_write_format(
"sql", SqlSink,
"SQL", [
262 (
"dialect=" +
"|".join(sorted(filter(bool, SqlSink.DIALECTS))),
263 "use specified SQL dialect in SQL output\n"
264 '(default "%s")' % SqlSink.DEFAULT_DIALECT),
265 (
"dialect-file=path/to/dialects.yaml",
266 "load additional SQL dialects\n"
267 "for SQL output, from a YAML or JSON file"),
268 (
"nesting=array|all",
"create tables for nested message types\n"
270 'only for arrays if "array" \n'
271 "else for any nested types"),
272 (
"overwrite=true|false",
"overwrite existing file in SQL output\n"
273 "instead of appending unique counter (default false)")
277__all__ = [
"SqlSink",
"init"]
source
inputs.Source instance bound to this sink
valid
Result of validate()
validate(self)
Returns whether sink prerequisites are met (like ROS environment set if LiveSink).
close(self)
Shuts down output, closing any files or connections.
validate(self)
Returns whether arguments are valid.
close(self)
Clears data structures.
__init__(self, args=None, **kwargs)
emit(self, topic, msg, stamp=None, match=None, index=None)
Writes out message type CREATE TABLE statements to SQL schema file.
validate(self)
Returns whether "dialect" and "nesting" and "overwrite" parameters contain supported values and file ...
close(self)
Rewrites out everything to SQL schema file, ensuring all source metas.
list MESSAGE_TYPE_BASECOLS
Default columns for message type tables, as [(column name, ROS type)].
init(*_, **__)
Adds SQL schema output format support.