------------------------------------------------------------------------------
This file is part of grepros - grep for ROS bag files and live topics.
Released under the BSD License.
------------------------------------------------------------------------------
## @namespace grepros.plugins.auto.postgres
20 import psycopg2.extensions
21 import psycopg2.extras
26from ... common
import ConsolePrinter
27from . dbbase
import BaseDataSink, quote
Writes messages to a Postgres database.

Output will contain:
- table "topics", with topic and message type names
- table "types", with message type definitions
- table "pkg/MsgType" for each topic message type, with detailed fields,
  BYTEA fields for uint8[], array fields for scalar list attributes,
  and JSONB fields for lists of ROS messages;
  with foreign keys if nesting else subtype values as JSON dictionaries;
  plus underscore-prefixed fields for metadata, like `_topic` as the topic name.
  If not nesting, only topic message type tables are created.
- view "/full/topic/name" for each topic, selecting from the message type table

If a message type table already exists but for a type with a different MD5 hash,
the new table will have its MD5 hash appended to end, as "pkg/MsgType (hash)".
## Sequence length per table to reserve for inserted message IDs
ID_SEQUENCE_STEP = 100
58 SELECT_TYPE_COLUMNS =
"""
59 SELECT c.table_name, c.column_name, c.data_type
60 FROM information_schema.columns c INNER JOIN information_schema.tables t
61 ON t.table_name = c.table_name
62 WHERE c.table_schema = current_schema() AND t.table_type = 'BASE TABLE' AND
63 c.table_name LIKE
'%/%'
64 ORDER BY c.table_name, CAST(c.dtd_identifier AS INTEGER)
## Topic-related metadata columns for pkg/MsgType tables, as [(column name, SQL type)]
MESSAGE_TYPE_TOPICCOLS = [
    ("_topic",    "TEXT"),
    ("_topic_id", "BIGINT"),
]

## Default columns for pkg/MsgType tables, as [(column name, SQL type)]
MESSAGE_TYPE_BASECOLS = [
    ("_dt",        "TIMESTAMP"),
    ("_timestamp", "NUMERIC"),
    ("_id",        "BIGSERIAL PRIMARY KEY"),
]

## Parent-reference columns for pkg/MsgType tables, as [(column name, SQL type)];
## presumably only used when nesting is enabled - confirm against the base class
MESSAGE_TYPE_NESTCOLS = [
    ("_parent_type", "TEXT"),
    ("_parent_id",   "BIGINT"),
]
def __init__(self, args=None, **kwargs):
    """
    @param   args                 arguments as namespace or dictionary, case-insensitive;
                                  or a single item as the database connection string
    @param   args.write           Postgres connection string like "postgresql://user@host/db"
    @param   args.write_options   ```
                                  {"commit-interval": transaction size (0 is autocommit),
                                   "nesting": "array" to recursively insert arrays
                                              of nested types, or "all" for any nesting}
                                  ```
    @param   args.meta            whether to emit metainfo
    @param   args.verbose         whether to emit debug information
    @param   kwargs               any and all arguments as keyword overrides, case-insensitive
    """
    super(PostgresSink, self).__init__(args, **kwargs)
    # Per-key deques created on first access; presumably the per-table ID cache
    # that _get_next_id() refills from the database sequence - confirm
    self._id_queue = collections.defaultdict(collections.deque)
99 Returns whether Postgres driver is available,
100 and "commit-interval" and "nesting" in args.write_options have valid value,
if any,
101 and database
is connectable.
104 db_ok, driver_ok, config_ok =
False, bool(psycopg2), super(PostgresSink, self).
validate()
106 ConsolePrinter.error(
"psycopg2 not available: cannot write to Postgres.")
110 except Exception
as e:
111 ConsolePrinter.error(
"Error connecting Postgres: %s", e)
112 self.
validvalid = db_ok
and driver_ok
and config_ok
117 """Opens the database file, and populates schema if not already existing."""
118 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
119 psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY)
120 psycopg2.extras.register_default_jsonb(globally=
True, loads=json.loads)
121 super(PostgresSink, self)._init_db()
124 def _load_schema(self):
125 """Populates instance attributes with schema metainfo."""
126 super(PostgresSink, self)._load_schema()
130 if x[
"table_name"] == row[
"table_name"]),
None)
131 if not typerow:
continue
132 typekey = (typerow[
"type"], typerow[
"md5"])
133 self.
_schema.setdefault(typekey, collections.OrderedDict())
134 self.
_schema[typekey][row[
"column_name"]] = row[
"data_type"]
138 """Returns new database connection."""
139 factory = psycopg2.extras.RealDictCursor
140 return psycopg2.connect(self.
args.WRITE, cursor_factory=factory)
143 def _execute_insert(self, sql, args):
144 """Executes INSERT statement, returns inserted ID."""
149 def _executemany(self, sql, argses):
150 """Executes SQL with all args sequences."""
154 def _executescript(self, sql):
155 """Executes SQL with one or more statements."""
159 def _get_next_id(self, table):
160 """Returns next cached ID value, re-populating empty cache from sequence."""
164 if MAXLEN: seqbase = seqbase[:MAXLEN - len(seqsuffix)]
165 sql =
"SELECT nextval('%s') AS id" % quote(seqbase + seqsuffix)
172 def _make_column_value(self, value, typename=None):
173 """Returns column value suitable for inserting to database."""
175 plaintype = api.scalar(typename)
178 replace = {float(
"inf"):
None, float(
"-inf"):
None, float(
"nan"):
None}
180 v = psycopg2.extras.Json(v, json.dumps)
181 elif isinstance(v, (list, tuple)):
182 scalartype = api.scalar(typename)
183 if scalartype
in api.ROS_TIME_TYPES:
185 elif scalartype
not in api.ROS_BUILTIN_TYPES:
187 else: v = psycopg2.extras.Json([api.message_to_dict(m, replace)
188 for m
in v], json.dumps)
189 elif "BYTEA" in (TYPES.get(typename),
190 TYPES.get(api.canonical(typename, unbounded=
True))):
191 v = psycopg2.Binary(bytes(bytearray(v)))
194 elif api.is_ros_time(v):
196 elif plaintype
not in api.ROS_BUILTIN_TYPES:
197 v = psycopg2.extras.Json(api.message_to_dict(v, replace), json.dumps)
203 def _make_db_label(self):
204 """Returns formatted label for database."""
205 target = self.
args.WRITE
206 if not target.startswith((
"postgres://",
"postgresql://")): target = repr(target)
212 """Returns true if target is recognizable as a Postgres connection string."""
213 return (target
or "").startswith((
"postgres://",
"postgresql://"))
218 """Adds Postgres output format support."""
219 from ...
import plugins
220 plugins.add_write_format(
"postgres", PostgresSink,
"Postgres", [
221 (
"commit-interval=NUM",
"transaction size for Postgres output\n"
222 "(default 1000, 0 is autocommit)"),
223 (
"dialect-file=path/to/dialects.yaml",
224 "load additional SQL dialect options\n"
225 "for Postgres output\n"
226 "from a YAML or JSON file"),
227 (
"nesting=array|all",
"create tables for nested message types\n"
228 "in Postgres output,\n"
229 'only for arrays if "array" \n'
230 "else for any nested types\n"
231 "(array fields in parent will be populated \n"
232 " with foreign keys instead of messages as JSON)"),
# Public API of this module
__all__ = ["PostgresSink", "init"]
valid
Result of validate()
Base class for writing messages to a database.
list MESSAGE_TYPE_BASECOLS
Default columns for pkg/MsgType tables.
Writes messages to a Postgres database.
__init__(self, args=None, **kwargs)
autodetect(cls, target)
Returns true if target is recognizable as a Postgres connection string.
validate(self)
Returns whether Postgres driver is available, and "commit-interval" and "nesting" in args.write_options have valid value, if any, and database is connectable.
list MESSAGE_TYPE_BASECOLS
Default columns for pkg/MsgType tables.
str SELECT_TYPE_COLUMNS
SQL statement for selecting metainfo on pkg/MsgType table columns.
int ID_SEQUENCE_STEP
Sequence length per table to reserve for inserted message IDs.
init(*_, **__)
Adds Postgres output format support.