32 Writes messages to a Postgres database.
35 - table "topics", with topic and message type names
36 - table "types", with message type definitions
39 - table "pkg/MsgType" for each topic message type, with detailed fields,
40 BYTEA fields for uint8[], array fields for scalar list attributes,
41 and JSONB fields for lists of ROS messages;
42 with foreign keys if nesting, else subtype values as JSON dictionaries;
43 plus underscore-prefixed fields for metadata, like `_topic` as the topic name.
44 If not nesting, only topic message type tables are created.
45 - view "/full/topic/name" for each topic, selecting from the message type table
47 If a message type table already exists but for a type with a different MD5 hash,
48 the new table will have its MD5 hash appended to the end of its name, as "pkg/MsgType (hash)".
## Sequence length per table to reserve for inserted message IDs.
ID_SEQUENCE_STEP = 100
58 SELECT_TYPE_COLUMNS =
"""
59 SELECT c.table_name, c.column_name, c.data_type
60 FROM information_schema.columns c INNER JOIN information_schema.tables t
61 ON t.table_name = c.table_name
62 WHERE c.table_schema = current_schema() AND t.table_type = 'BASE TABLE' AND
63 c.table_name LIKE '%/%'
64 ORDER BY c.table_name, CAST(c.dtd_identifier AS INTEGER)
## Topic metadata columns for pkg/MsgType tables, as (column name, column type).
MESSAGE_TYPE_TOPICCOLS = [
    ("_topic", "TEXT"),
    ("_topic_id", "BIGINT"),
]
## Default columns for pkg/MsgType tables, as (column name, column type).
MESSAGE_TYPE_BASECOLS = [
    ("_dt", "TIMESTAMP"),
    ("_timestamp", "NUMERIC"),
    ("_id", "BIGSERIAL PRIMARY KEY"),
]
## Parent-reference columns for pkg/MsgType tables, as (column name, column type).
## NOTE(review): presumably added only when nesting is enabled - confirm at usage site.
MESSAGE_TYPE_NESTCOLS = [
    ("_parent_type", "TEXT"),
    ("_parent_id", "BIGINT"),
]
81 @param args arguments as namespace or dictionary, case-insensitive;
82 or a single item as the database connection string
83 @param args.write Postgres connection string like "postgresql://user@host/db"
84 @param args.write_options ```
85 {"commit-interval": transaction size (0 is autocommit),
86 "nesting": "array" to recursively insert arrays
87 of nested types, or "all" for any nesting}
89 @param args.meta whether to emit metainfo
90 @param args.verbose whether to emit debug information
91 @param kwargs any and all arguments as keyword overrides, case-insensitive
93 super(PostgresSink, self).
__init__(args, **kwargs)
94 self.
_id_queue = collections.defaultdict(collections.deque)
99 Returns whether Postgres driver is available,
100 and "commit-interval" and "nesting" in args.write_options have valid values, if any,
101 and database is connectable.
104 db_ok, driver_ok, config_ok =
False, bool(psycopg2), super(PostgresSink, self).
validate()
106 ConsolePrinter.error(
"psycopg2 not available: cannot write to Postgres.")
110 except Exception
as e:
111 ConsolePrinter.error(
"Error connecting Postgres: %s", e)
117 """Opens the database connection, and populates schema if not already existing."""
118 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
119 psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY)
120 psycopg2.extras.register_default_jsonb(globally=
True, loads=json.loads)
121 super(PostgresSink, self)._init_db()
124 def _load_schema(self):
125 """Populates instance attributes with schema metainfo."""
126 super(PostgresSink, self)._load_schema()
130 if x[
"table_name"] == row[
"table_name"]),
None)
131 if not typerow:
continue
132 typekey = (typerow[
"type"], typerow[
"md5"])
133 self.
_schema.setdefault(typekey, collections.OrderedDict())
134 self.
_schema[typekey][row[
"column_name"]] = row[
"data_type"]
138 """Returns new database connection."""
139 factory = psycopg2.extras.RealDictCursor
140 return psycopg2.connect(self.
args.WRITE, cursor_factory=factory)
143 def _execute_insert(self, sql, args):
144 """Executes INSERT statement, returns inserted ID."""
149 def _executemany(self, sql, argses):
150 """Executes SQL with all args sequences."""
154 def _executescript(self, sql):
155 """Executes SQL with one or more statements."""
159 def _get_next_id(self, table):
160 """Returns next cached ID value, re-populating empty cache from sequence."""
164 if MAXLEN: seqbase = seqbase[:MAXLEN - len(seqsuffix)]
165 sql =
"SELECT nextval('%s') AS id" % quote(seqbase + seqsuffix)
172 def _make_column_value(self, value, typename=None):
173 """Returns column value suitable for inserting to database."""
175 plaintype = api.scalar(typename)
178 replace = {float(
"inf"):
None, float(
"-inf"):
None, float(
"nan"):
None}
180 v = psycopg2.extras.Json(v, json.dumps)
181 elif isinstance(v, (list, tuple)):
182 scalartype = api.scalar(typename)
183 if scalartype
in api.ROS_TIME_TYPES:
185 elif scalartype
not in api.ROS_BUILTIN_TYPES:
187 else: v = psycopg2.extras.Json([api.message_to_dict(m, replace)
188 for m
in v], json.dumps)
189 elif "BYTEA" in (TYPES.get(typename),
190 TYPES.get(api.canonical(typename, unbounded=
True))):
191 v = psycopg2.Binary(bytes(bytearray(v)))
194 elif api.is_ros_time(v):
196 elif plaintype
not in api.ROS_BUILTIN_TYPES:
197 v = psycopg2.extras.Json(api.message_to_dict(v, replace), json.dumps)
203 def _make_db_label(self):
204 """Returns formatted label for database."""
205 target = self.
args.WRITE
206 if not target.startswith((
"postgres://",
"postgresql://")): target = repr(target)
212 """Returns true if target is recognizable as a Postgres connection string."""
213 return (target
or "").startswith((
"postgres://",
"postgresql://"))
__init__(self, args=None, **kwargs)
list MESSAGE_TYPE_BASECOLS
Default columns for pkg/MsgType tables.
str SELECT_TYPE_COLUMNS
SQL statement for selecting metainfo on pkg/MsgType table columns.
int ID_SEQUENCE_STEP
Sequence length per table to reserve for inserted message IDs.