grepros 1.2.2
grep for ROS bag files and live topics
Loading...
Searching...
No Matches
sql.py
Go to the documentation of this file.
1# -*- coding: utf-8 -*-
2"""
3SQL schema output plugin.
4
5------------------------------------------------------------------------------
6This file is part of grepros - grep for ROS bag files and live topics.
7Released under the BSD License.
8
9@author Erki Suurjaak
10@created 20.12.2021
11@modified 23.03.2024
12------------------------------------------------------------------------------
13"""
14## @namespace grepros.plugins.sql
15import atexit
16import collections
17import datetime
18import os
19
20from .. import __title__
21from .. import api
22from .. import common
23from .. import main
24from .. common import ConsolePrinter, plural
25from .. outputs import Sink
26from . auto.sqlbase import SqlMixin
27
28
29
class SqlSink(Sink, SqlMixin):
    """
    Writes SQL schema file for message type tables and topic views.

    Output will have:
    - table "pkg/MsgType" for each topic message type, with ordinary columns for
      scalar fields, and structured columns for list fields;
      plus underscore-prefixed fields for metadata, like `_topic` as the topic name.

      If launched with nesting-option, tables will also be created for each
      nested message type.

    - view "/full/topic/name" for each topic, selecting from the message type table
    """

    ## Output file extensions for this format
    FILE_EXTENSIONS = (".sql", )

    ## Default columns for message type tables, as [(column name, ROS type)]
    MESSAGE_TYPE_BASECOLS = [("_topic",     "string"),
                             ("_timestamp", "time"), ]

    ## Constructor argument defaults, merged with given arguments in __init__()
    DEFAULT_ARGS = dict(WRITE_OPTIONS={}, VERBOSE=False)


    def __init__(self, args=None, **kwargs):
        """
        @param   args                 arguments as namespace or dictionary, case-insensitive;
                                      or a single path as the file to write
        @param   args.write           output file path
        @param   args.write_options   ```
                                      {"dialect":   SQL dialect if not default,
                                       "nesting":   "array" to create tables for nested arrays
                                                    of message types, or "all" for any nested types,
                                       "overwrite": whether to overwrite existing file
                                                    (default false)}
                                      ```
        @param   args.meta            whether to emit metainfo
        @param   args.verbose         whether to emit debug information
        @param   kwargs               any and all arguments as keyword overrides, case-insensitive
        """
        args = {"WRITE": str(args)} if isinstance(args, common.PATH_TYPES) else args
        args = common.ensure_namespace(args, SqlSink.DEFAULT_ARGS, **kwargs)
        super(SqlSink, self).__init__(args)
        SqlMixin.__init__(self, args)

        self._filename      = None   # Unique output filename
        self._file          = None   # Open file() object
        self._batch         = None   # Current source batch
        self._nested_types  = {}     # {(typename, typehash): "CREATE TABLE .."}
        self._batch_metas   = []     # [source batch metainfo string, ]
        self._overwrite     = None   # Whether to overwrite existing file, set in validate()
        self._close_printed = False  # Whether closing summary has already been printed

        # Whether to create tables for nested message types,
        # "array" if to do this only for arrays of nested types, or
        # "all" for any nested type, including those fully flattened into parent fields.
        self._nesting = None

        # Ensure the schema file gets its final rewrite even if caller never closes
        atexit.register(self.close)


    def validate(self):
        """
        Returns whether "dialect" and "nesting" and "overwrite" parameters contain supported values
        and file is writable.

        Result is cached in `self.valid`; on success, also stores the parsed
        "overwrite" and "nesting" options for later use.
        """
        if self.valid is not None: return self.valid
        ok, sqlconfig_ok = True, SqlMixin.validate(self)
        if self.args.WRITE_OPTIONS.get("nesting") not in (None, "array", "all"):
            ConsolePrinter.error("Invalid nesting option for SQL: %r. "
                                 "Choose one of {array,all}.",
                                 self.args.WRITE_OPTIONS["nesting"])
            ok = False
        if self.args.WRITE_OPTIONS.get("overwrite") not in (None, True, False, "true", "false"):
            ConsolePrinter.error("Invalid overwrite option for SQL: %r. "
                                 "Choose one of {true, false}.",
                                 self.args.WRITE_OPTIONS["overwrite"])
            ok = False
        if not common.verify_io(self.args.WRITE, "w"):
            ok = False
        self.valid = sqlconfig_ok and ok
        if self.valid:
            self._overwrite = (self.args.WRITE_OPTIONS.get("overwrite") in (True, "true"))
            self._nesting = self.args.WRITE_OPTIONS.get("nesting")
        return self.valid


    def emit(self, topic, msg, stamp=None, match=None, index=None):
        """
        Writes out message type CREATE TABLE statements to SQL schema file.

        @param   topic   topic name
        @param   msg     message instance
        @param   stamp   unused by this sink
        @param   match   unused by this sink
        @param   index   unused by this sink
        @throws  Exception  if validate() fails
        """
        if not self.validate(): raise Exception("invalid")
        batch = self.source.get_batch()
        # Collect source metainfo once per batch, for the file header
        if not self._batch_metas or batch != self._batch:
            self._batch = batch
            self._batch_metas.append(self.source.format_meta())
        self._ensure_open()
        self._process_type(msg)
        self._process_topic(topic, msg)


    def close(self):
        """
        Rewrites out everything to SQL schema file, ensuring all source metas.

        The file handle is released even if rewriting fails.
        """
        try:
            if self._file:
                try:
                    # Rewrite whole file from start: header now covers all batches,
                    # and entities come out in sorted order.
                    self._file.seek(0)
                    self._write_header()
                    for key in sorted(self._types):
                        self._write_entity("table", self._types[key])
                    for key in sorted(self._topics):
                        self._write_entity("view", self._topics[key])
                    # Drop any leftover bytes from the earlier write if rewrite came out shorter
                    self._file.truncate()
                finally:
                    f, self._file = self._file, None
                    f.close()
        finally:
            if not self._close_printed and self._types:
                self._close_printed = True
                try: sz = common.format_bytes(os.path.getsize(self._filename))
                except Exception as e:
                    ConsolePrinter.warn("Error getting size of %s: %s", self._filename, e)
                    sz = "error getting size"
                ConsolePrinter.debug("Wrote %s and %s to SQL %s (%s).",
                                     plural("message type table",
                                            len(self._types) - len(self._nested_types)),
                                     plural("topic view", self._topics), self._filename, sz)
                if self._nested_types:
                    ConsolePrinter.debug("Wrote %s to SQL %s.",
                                         plural("nested message type table", self._nested_types),
                                         self._filename)
            self._nested_types.clear()
            del self._batch_metas[:]
            SqlMixin.close(self)
            super(SqlSink, self).close()


    def _ensure_open(self):
        """Opens output file if not already open, writes header."""
        if self._file: return

        self._filename = self.args.WRITE if self._overwrite else common.unique_path(self.args.WRITE)
        common.makedirs(os.path.dirname(self._filename))
        if self.args.VERBOSE:
            sz = os.path.exists(self._filename) and os.path.getsize(self._filename)
            action = "Overwriting" if sz and self._overwrite else "Creating"
            ConsolePrinter.debug("%s %s.", action, self._filename)
        self._file = open(self._filename, "wb")
        self._write_header()
        self._close_printed = False


    def _process_topic(self, topic, msg):
        """Builds and writes CREATE VIEW statement for topic if not already built."""
        topickey = api.TypeMeta.make(msg, topic).topickey
        if topickey in self._topics:
            return

        self._topics[topickey] = self._make_topic_data(topic, msg)
        self._write_entity("view", self._topics[topickey])


    def _process_type(self, msg, rootmsg=None):
        """
        Builds and writes CREATE TABLE statement for message type if not already built.

        Builds statements recursively for nested types if configured.

        @param   msg      message instance to build table for
        @param   rootmsg  top-level message containing msg, if msg is nested
        @return           built SQL, or None if already built
        """
        rootmsg = rootmsg or msg
        typekey = api.TypeMeta.make(msg, root=rootmsg).typekey
        if typekey in self._types:
            return None

        # "time" columns fall back to int64 where dialect has no time type mapping
        extra_cols = [(c, self._make_column_type(t, fallback="int64" if "time" == t else None))
                      for c, t in self.MESSAGE_TYPE_BASECOLS]
        self._types[typekey] = self._make_type_data(msg, extra_cols, rootmsg)
        self._schema[typekey] = collections.OrderedDict(self._types[typekey].pop("cols"))

        self._write_entity("table", self._types[typekey])
        if self._nesting: self._process_nested(msg, rootmsg)
        return self._types[typekey]["sql"]


    def _process_nested(self, msg, rootmsg):
        """Builds and writes CREATE TABLE statements for nested types."""
        nesteds = api.iter_message_fields(msg, messages_only=True) if self._nesting else ()
        for path, submsgs, subtype in nesteds:
            scalartype = api.scalar(subtype)
            # Non-array nested types get their own table only with nesting "all"
            if subtype == scalartype and "all" != self._nesting: continue  # for path

            subtypehash = self.source.get_message_type_hash(scalartype)
            subtypekey = (scalartype, subtypehash)
            if subtypekey in self._types: continue  # for path

            if not isinstance(submsgs, (list, tuple)): submsgs = [submsgs]
            # Use first actual value if any, else a default-constructed instance of the type
            [submsg] = submsgs[:1] or [self.source.get_message_class(scalartype, subtypehash)()]
            subsql = self._process_type(submsg, rootmsg)
            if subsql: self._nested_types[subtypekey] = subsql


    def _write_header(self):
        """Writes header to current file."""
        values = {"title":   __title__,
                  "dialect": self._dialect,
                  "args":    " ".join(main.CLI_ARGS or []),
                  "source":  "\n\n".join("-- Source:\n" +
                                         "\n".join("-- " + x for x in s.strip().splitlines())
                                         for s in self._batch_metas),
                  "dt":      datetime.datetime.now().strftime("%Y-%m-%d %H:%M"), }
        self._file.write((
            "-- SQL dialect: {dialect}.\n"
            "-- Written by {title} on {dt}.\n"
        ).format(**values).encode("utf-8"))
        if values["args"]:
            self._file.write("-- Command: grepros {args}.\n".format(**values).encode("utf-8"))
        self._file.write("\n{source}\n\n".format(**values).encode("utf-8"))


    def _write_entity(self, category, item):
        """Writes table or view SQL statement to file."""
        self._file.write(b"\n")
        if "table" == category:
            self._file.write(("-- Message type %(type)s (%(md5)s)\n--\n" % item).encode("utf-8"))
            self._file.write(("-- %s\n" % "\n-- ".join(item["definition"].splitlines())).encode("utf-8"))
        else:
            self._file.write(('-- Topic "%(name)s": %(type)s (%(md5)s)\n' % item).encode("utf-8"))
        self._file.write(("%s\n\n" % item["sql"]).encode("utf-8"))
255
256
257
def init(*_, **__):
    """
    Adds SQL schema output format support.

    Registers write format "sql" with SqlSink as its sink class,
    together with help texts for the supported write options.
    """
    from .. import plugins  # Late import to avoid circular

    dialect_choices = "|".join(sorted(filter(bool, SqlSink.DIALECTS)))
    dialect_help = ("use specified SQL dialect in SQL output\n"
                    '(default "%s")' % SqlSink.DEFAULT_DIALECT)
    dialect_file_help = ("load additional SQL dialects\n"
                         "for SQL output, from a YAML or JSON file")
    nesting_help = ("create tables for nested message types\n"
                    "in SQL output,\n"
                    'only for arrays if "array" \n'
                    "else for any nested types")
    overwrite_help = ("overwrite existing file in SQL output\n"
                      "instead of appending unique counter (default false)")

    options = [
        ("dialect=" + dialect_choices,            dialect_help),
        ("dialect-file=path/to/dialects.yaml",    dialect_file_help),
        ("nesting=array|all",                     nesting_help),
        ("overwrite=true|false",                  overwrite_help),
    ]
    plugins.add_write_format("sql", SqlSink, "SQL", options)
275
276
# Public names exported by this plugin module
__all__ = [
    "SqlSink",
    "init",
]
source
inputs.Source instance bound to this sink
Definition outputs.py:55
valid
Result of validate()
Definition outputs.py:53
validate(self)
Returns whether sink prerequisites are met (like ROS environment set if LiveSink).
Definition outputs.py:99
close(self)
Shuts down output, closing any files or connections.
Definition outputs.py:106
validate(self)
Returns whether arguments are valid.
Definition sqlbase.py:63
close(self)
Clears data structures.
Definition sqlbase.py:122
__init__(self, args=None, **kwargs)
Definition sql.py:71
emit(self, topic, msg, stamp=None, match=None, index=None)
Writes out message type CREATE TABLE statements to SQL schema file.
Definition sql.py:119
validate(self)
Returns whether "dialect" and "nesting" and "overwrite" parameters contain supported values and file is writable.
Definition sql.py:97
close(self)
Rewrites out everything to SQL schema file, ensuring all source metas.
Definition sql.py:131
list MESSAGE_TYPE_BASECOLS
Default columns for message type tables, as [(column name, ROS type)].
Definition sql.py:49
init(*_, **__)
Adds SQL schema output format support.
Definition sql.py:271