grepros 1.2.2
grep for ROS bag files and live topics
postgres.py
# -*- coding: utf-8 -*-
"""
Postgres output plugin.

------------------------------------------------------------------------------
This file is part of grepros - grep for ROS bag files and live topics.
Released under the BSD License.

@author    Erki Suurjaak
@created   02.12.2021
@modified  28.06.2023
------------------------------------------------------------------------------
"""
## @namespace grepros.plugins.auto.postgres
import collections
import json

try:
    import psycopg2
    import psycopg2.extensions
    import psycopg2.extras
except ImportError:
    psycopg2 = None

from ... import api
from ... common import ConsolePrinter
from . dbbase import BaseDataSink, quote

class PostgresSink(BaseDataSink):
    """
    Writes messages to a Postgres database.

    Output will have:
    - table "topics", with topic and message type names
    - table "types", with message type definitions

    plus:
    - table "pkg/MsgType" for each topic message type, with detailed fields,
      BYTEA fields for uint8[], array fields for scalar list attributes,
      and JSONB fields for lists of ROS messages;
      with foreign keys if nesting else subtype values as JSON dictionaries;
      plus underscore-prefixed fields for metadata, like `_topic` as the topic name.
      If not nesting, only topic message type tables are created.
    - view "/full/topic/name" for each topic, selecting from the message type table

    If a message type table already exists but for a type with a different MD5 hash,
    the new table will have its MD5 hash appended to end, as "pkg/MsgType (hash)".
    """
    ## Database engine name
    ENGINE = "Postgres"

    ## Sequence length per table to reserve for inserted message IDs
    ID_SEQUENCE_STEP = 100

    ## SQL statement for selecting metainfo on pkg/MsgType table columns
    SELECT_TYPE_COLUMNS = """
    SELECT c.table_name, c.column_name, c.data_type
    FROM information_schema.columns c INNER JOIN information_schema.tables t
    ON t.table_name = c.table_name
    WHERE c.table_schema = current_schema() AND t.table_type = 'BASE TABLE' AND
          c.table_name LIKE '%/%'
    ORDER BY c.table_name, CAST(c.dtd_identifier AS INTEGER)
    """

    ## Default topic-related columns for pkg/MsgType tables
    MESSAGE_TYPE_TOPICCOLS = [("_topic",       "TEXT"),
                              ("_topic_id",    "BIGINT"), ]
    ## Default columns for pkg/MsgType tables
    MESSAGE_TYPE_BASECOLS  = [("_dt",          "TIMESTAMP"),
                              ("_timestamp",   "NUMERIC"),
                              ("_id",          "BIGSERIAL PRIMARY KEY"), ]
    ## Additional columns for pkg/MsgType tables when nesting
    MESSAGE_TYPE_NESTCOLS  = [("_parent_type", "TEXT"),
                              ("_parent_id",   "BIGINT"), ]


    def __init__(self, args=None, **kwargs):
        """
        @param   args                arguments as namespace or dictionary, case-insensitive;
                                     or a single item as the database connection string
        @param   args.write          Postgres connection string like "postgresql://user@host/db"
        @param   args.write_options  ```
                                     {"commit-interval": transaction size (0 is autocommit),
                                      "nesting": "array" to recursively insert arrays
                                                 of nested types, or "all" for any nesting}
                                     ```
        @param   args.meta           whether to emit metainfo
        @param   args.verbose        whether to emit debug information
        @param   kwargs              any and all arguments as keyword overrides, case-insensitive
        """
        super(PostgresSink, self).__init__(args, **kwargs)
        self._id_queue = collections.defaultdict(collections.deque)  # {table name: [next ID, ]}
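    # Construction sketch (argument names follow the @param docs above; the
    # message-feeding step stands in for the inherited Sink interface and is
    # an assumption, not part of this class):
    #
    #   sink = PostgresSink("postgresql://user@host/db",
    #                       write_options={"commit-interval": 100, "nesting": "array"})
    #   if sink.validate():
    #       ...           # pass messages to the sink via the base Sink interface
    #       sink.close()  # flush pending transaction and close the connection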

    def validate(self):
        """
        Returns whether Postgres driver is available,
        and "commit-interval" and "nesting" in args.write_options have valid value, if any,
        and database is connectable.
        """
        if self.valid is not None: return self.valid
        db_ok, driver_ok, config_ok = False, bool(psycopg2), super(PostgresSink, self).validate()
        if not driver_ok:
            ConsolePrinter.error("psycopg2 not available: cannot write to Postgres.")
        else:
            try:
                with self._connect(): db_ok = True
            except Exception as e:
                ConsolePrinter.error("Error connecting Postgres: %s", e)
        self.valid = db_ok and driver_ok and config_ok
        return self.valid


    def _init_db(self):
        """Opens the database file, and populates schema if not already existing."""
        psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
        psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY)
        psycopg2.extras.register_default_jsonb(globally=True, loads=json.loads)
        super(PostgresSink, self)._init_db()


    def _load_schema(self):
        """Populates instance attributes with schema metainfo."""
        super(PostgresSink, self)._load_schema()
        self._cursor.execute(self.SELECT_TYPE_COLUMNS)
        for row in self._cursor.fetchall():
            typerow = next((x for x in self._types.values()
                            if x["table_name"] == row["table_name"]), None)
            if not typerow: continue  # for row
            typekey = (typerow["type"], typerow["md5"])
            self._schema.setdefault(typekey, collections.OrderedDict())
            self._schema[typekey][row["column_name"]] = row["data_type"]


    def _connect(self):
        """Returns new database connection."""
        factory = psycopg2.extras.RealDictCursor
        return psycopg2.connect(self.args.WRITE, cursor_factory=factory)


    def _execute_insert(self, sql, args):
        """Executes INSERT statement, returns inserted ID."""
        self._cursor.execute(sql, args)
        return self._cursor.fetchone()["id"]


    def _executemany(self, sql, argses):
        """Executes SQL with all args sequences."""
        psycopg2.extras.execute_batch(self._cursor, sql, argses)


    def _executescript(self, sql):
        """Executes SQL with one or more statements."""
        self._cursor.execute(sql)


    def _get_next_id(self, table):
        """Returns next cached ID value, re-populating empty cache from sequence."""
        if not self._id_queue.get(table):
            MAXLEN = self._get_dialect_option("maxlen_entity")
            seqbase, seqsuffix = table, "_%s_seq" % self.MESSAGE_TYPE_BASECOLS[-1][0]
            if MAXLEN: seqbase = seqbase[:MAXLEN - len(seqsuffix)]
            sql = "SELECT nextval('%s') AS id" % quote(seqbase + seqsuffix)
            for _ in range(self.ID_SEQUENCE_STEP):
                self._cursor.execute(sql)
                self._id_queue[table].append(self._cursor.fetchone()["id"])
        return self._id_queue[table].popleft()


    def _make_column_value(self, value, typename=None):
        """Returns column value suitable for inserting to database."""
        TYPES = self._get_dialect_option("types")
        plaintype = api.scalar(typename)  # "string<=10" -> "string"
        v = value
        # Common in JSON but disallowed in Postgres
        replace = {float("inf"): None, float("-inf"): None, float("nan"): None}
        if not typename:
            v = psycopg2.extras.Json(v, json.dumps)
        elif isinstance(v, (list, tuple)):
            scalartype = api.scalar(typename)
            if scalartype in api.ROS_TIME_TYPES:
                v = [self._convert_time_value(x, scalartype) for x in v]
            elif scalartype not in api.ROS_BUILTIN_TYPES:
                if self._nesting: v = None
                else: v = psycopg2.extras.Json([api.message_to_dict(m, replace)
                                                for m in v], json.dumps)
            elif "BYTEA" in (TYPES.get(typename),
                             TYPES.get(api.canonical(typename, unbounded=True))):
                v = psycopg2.Binary(bytes(bytearray(v)))  # Py2/Py3 compatible
            else:
                v = list(self._convert_column_value(v, typename))  # Ensure not-tuple for psycopg2
        elif api.is_ros_time(v):
            v = self._convert_time_value(v, typename)
        elif plaintype not in api.ROS_BUILTIN_TYPES:
            v = psycopg2.extras.Json(api.message_to_dict(v, replace), json.dumps)
        else:
            v = self._convert_column_value(v, plaintype)
        return v
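    # Illustrative conversions (assuming the active SQL dialect maps "uint8[]" to BYTEA;
    # exact results depend on dialect options and _convert_column_value()):
    #   _make_column_value([72, 105], "uint8[]")            -> psycopg2.Binary(b"Hi")
    #   _make_column_value([1.5, 2.5], "float64[]")         -> [1.5, 2.5]
    #   _make_column_value(msg.pose, "geometry_msgs/Pose")  -> psycopg2.extras.Json({...})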

    def _make_db_label(self):
        """Returns formatted label for database."""
        target = self.args.WRITE
        if not target.startswith(("postgres://", "postgresql://")): target = repr(target)
        return target


    @classmethod
    def autodetect(cls, target):
        """Returns true if target is recognizable as a Postgres connection string."""
        return (target or "").startswith(("postgres://", "postgresql://"))

def init(*_, **__):
    """Adds Postgres output format support."""
    from ... import plugins  # Late import to avoid circular
    plugins.add_write_format("postgres", PostgresSink, "Postgres", [
        ("commit-interval=NUM",  "transaction size for Postgres output\n"
                                 "(default 1000, 0 is autocommit)"),
        ("dialect-file=path/to/dialects.yaml",
                                 "load additional SQL dialect options\n"
                                 "for Postgres output\n"
                                 "from a YAML or JSON file"),
        ("nesting=array|all",    "create tables for nested message types\n"
                                 "in Postgres output,\n"
                                 'only for arrays if "array" \n'
                                 "else for any nested types\n"
                                 "(array fields in parent will be populated \n"
                                 " with foreign keys instead of messages as JSON)"),
    ])
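# Typical use once registered is from the grepros command line, e.g. (a sketch;
# see the grepros documentation for the exact --write option syntax):
#
#   grepros --write postgresql://user@host/dbname commit-interval=100 nesting=array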

__all__ = ["PostgresSink", "init"]