"""
Simple convenience wrapper for Postgres, via psycopg2.

------------------------------------------------------------------------------
This file is part of dblite - simple query interface for SQL databases.
Released under the MIT License.
------------------------------------------------------------------------------
"""
import collections
from contextlib import contextmanager
import inspect
import logging
import re
import sys
import threading

from six.moves import urllib_parse
from six import binary_type, integer_types, string_types, text_type

try:
    import psycopg2
    import psycopg2.extensions
    import psycopg2.extras
    import psycopg2.pool
except ImportError: psycopg2 = None

from .. import api, util

logger = logging.getLogger(__name__)
39 "ALL",
"ANALYSE",
"ANALYZE",
"AND",
"ANY",
"ARRAY",
"AS",
"ASC",
"ASYMMETRIC",
"AUTHORIZATION",
40 "BINARY",
"BOTH",
"CASE",
"CAST",
"CHECK",
"COLLATE",
"COLLATION",
"COLUMN",
"CONCURRENTLY",
41 "CONSTRAINT",
"CREATE",
"CROSS",
"CURRENT_CATALOG",
"CURRENT_DATE",
"CURRENT_ROLE",
42 "CURRENT_SCHEMA",
"CURRENT_TIME",
"CURRENT_TIMESTAMP",
"CURRENT_USER",
"DEFAULT",
"DEFERRABLE",
43 "DESC",
"DISTINCT",
"DO",
"ELSE",
"END",
"EXCEPT",
"FALSE",
"FETCH",
"FOR",
"FOREIGN",
"FREEZE",
44 "FROM",
"FULL",
"GRANT",
"GROUP",
"HAVING",
"ILIKE",
"IN",
"INITIALLY",
"INNER",
"INTERSECT",
45 "INTO",
"IS",
"ISNULL",
"JOIN",
"LATERAL",
"LEADING",
"LEFT",
"LIKE",
"LIMIT",
"LOCALTIME",
46 "LOCALTIMESTAMP",
"NATURAL",
"NOT",
"NOTNULL",
"NULL",
"OFFSET",
"ON",
"ONLY",
"OR",
"ORDER",
47 "OUTER",
"OVERLAPS",
"PLACING",
"PRIMARY",
"REFERENCES",
"RETURNING",
"RIGHT",
"SELECT",
48 "SESSION_USER",
"SIMILAR",
"SOME",
"SYMMETRIC",
"TABLE",
"TABLESAMPLE",
"THEN",
"TO",
49 "TRAILING",
"TRUE",
"UNION",
"UNIQUE",
"USER",
"USING",
"VARIADIC",
"VERBOSE",
50 "WHEN",
"WHERE",
"WINDOW",
"WITH"
55 """Wrapper for table and column names from data objects."""
58 """Supports comparing to strings or other Identifier instances."""
59 if isinstance(other, type(self)):
return self.
name == other.name
60 return isinstance(other, string_types)
and (self.
name == other)
62 """Returns quoted name."""
66 """Returns quoted name."""
67 return quote(name, force=
not name.islower())
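

# Illustrative sketch of intended Identifier behaviour (exact output depends on quote() below):
#
#     ident = Identifier("PriceList")
#     ident == "PriceList"          # True: compares equal to plain strings
#     str(ident)                    # '"PriceList"': non-lowercase names get force-quoted
#     str(Identifier("prices"))     # 'prices': all-lowercase name needs no quotes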
class Queryable(api.Queryable):

    ## Recognized operators for makeSQL condition clauses
    OPS = ("!=", "!~", "!~*", "#", "%", "&", "*", "+", "-", "/", "<", "<<",
           "<=", "<>", "<@", "=", ">", ">=", ">>", "@>", "^", "|", "||", "&&", "~",
           "~*", "ANY", "ILIKE", "IN", "IS", "IS NOT", "LIKE", "NOT ILIKE", "NOT IN",
           "NOT LIKE", "NOT SIMILAR TO", "OR", "OVERLAPS", "SIMILAR TO", "SOME")
    def insert(self, table, values=(), **kwargs):
        """
        Convenience wrapper for database INSERT, returns inserted row ID.
        Keyword arguments are added to VALUES.
        """
        sql, args = self.makeSQL("INSERT", table, values=values, kwargs=kwargs)
        cursor = self.execute(sql, args)
        row = None if cursor.description is None else next(cursor, None)
        return next(iter(row.values())) if row and isinstance(row, dict) else None
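
    # Usage sketch (illustrative; assumes an "events" table with a serial "id" primary key):
    #
    #     new_id = db.insert("events", {"name": "start"}, category="system")
    #     # builds roughly: INSERT INTO events (name, category)
    #     #                 VALUES (%(nameI0)s, %(categoryI1)s) RETURNING id AS id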
    def insertmany(self, table, rows=(), **kwargs):
        """
        Convenience wrapper for database multiple INSERTs, returns list of inserted row IDs.
        Keyword arguments are added to VALUES of every single row, overriding individual row values.
        """
        result = []

        sqlcache = {}  # {tuple(column, ): SQL statement}
        n = util.nameify(table, self._wrapper(column=False))
        tablename = n.name if isinstance(n, Identifier) else n
        pk = self._structure.get(tablename, {}).get("key")

        commons = [(self._column(k, table=table, tablename=tablename), v)
                   for k, v in kwargs.items()]
        for row in rows:
            cols, values, valueidx = [], [], {}
            knotnull = pk if pk and util.is_dataobject(row) else None
            for k, v in util.keyvalues(row, self._wrapper(tablename=tablename)):
                k = self._column(k, table=table, tablename=tablename)
                if k != knotnull or v is not None:
                    cols.append(k), values.append((k, v)), valueidx.update({k: len(valueidx)})
            for k, v in commons:
                if k in valueidx: values[valueidx[k]] = (k, v)
                else: cols.append(k), values.append((k, v))
            cols.sort(key=lambda x: x.lower()), values.sort(key=lambda x: x[0].lower())
            cachekey = tuple(cols)

            sql = sqlcache.get(cachekey)
            if not sql:
                sql, args = self.makeSQL("INSERT", tablename, values=values)
                sqlcache[cachekey] = sql
            else:
                keys = ["%sI%s" % (re.sub(r"\W+", "_", k), i) for i, (k, _) in enumerate(values)]
                args = {a: self._cast(k, v, table, tablename)
                        for i, (a, (k, v)) in enumerate(zip(keys, values))}

            cursor = self.execute(sql, args)
            res = None if cursor.description is None else next(cursor, None)
            result.append(next(iter(res.values())) if res and isinstance(res, dict) else None)

        return result
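
    # Usage sketch (illustrative): one INSERT per row, common keyword values overriding
    # per-row values, one inserted ID returned per row.
    #
    #     rows = [{"name": "a"}, {"name": "b", "category": "manual"}]
    #     ids = db.insertmany("events", rows, category="system")  # "system" overrides "manual"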
    def makeSQL(self, action, table, cols="*", where=(), group=(), order=(), limit=(), values=(),
                kwargs=None):
        """Returns (SQL statement string, parameter dict)."""

        def parse_members(i, col, op, val):
            """Returns (col, op, val, argkey)."""
            if isinstance(col, Identifier): col, colsql, pure = col.name, text_type(col), False
            elif not isinstance(col, string_types):
                col = self._match_name(util.nameify(col, parent=table), tablename)
                colsql, pure = Identifier.quote(col), False
            else: colsql, pure = col, True
            key = "%sW%s" % (re.sub(r"\W+", "_", col), i)
            if "EXPR" == col.upper() and pure:
                # ("EXPR", ("SQL expression with ? placeholders", [args]))
                colsql, op, val, key = val[0], "EXPR", val[1], "EXPRW%s" % i
            elif col.count("?") == argcount(val) and pure:
                # ("SQL expression with ? placeholders", [args])
                op, val, key = "EXPR", listify(val), "EXPRW%s" % i
            elif isinstance(val, (list, tuple)) and len(val) == 2 \
            and isinstance(val[0], string_types):
                tmp = val[0].strip().upper()
                if tmp in self.OPS:  # ("col", ("binary op like >=", val))
                    op, val = tmp, val[1]
                elif val[0].count("?") == argcount(val[1]):  # ("col", ("SQL with ?", val))
                    colsql, val, op = "%s = %s" % (col, val[0]), listify(val[1]), "EXPR"
            if op in ("IN", "NOT IN") and not val:  # Avoid error on empty IN-list
                colsql = "%s%s = ANY('{}')" % ("" if "IN" == op else "NOT ", colsql)
            return colsql, op, self._cast(col, val, table, tablename), key

        def argcount(x)  : return len(x) if isinstance(x, (list, set, tuple)) else 1
        def listify(x)   : return x if isinstance(x, (list, tuple)) else \
                                  list(x) if isinstance(x, set) else [x]
        def keylistify(x): return x if isinstance(x, (list, tuple)) else \
                                  list(x) if isinstance(x, (dict, set)) else [x]

        cast    = lambda col, val: self._cast(col, val, table, tablename)
        column  = lambda col, sql=False: self._column(col, sql, table, tablename)
        wrapper = lambda column=True, sql=False: self._wrapper(column, sql, tablename)

        args, values0 = {}, values
        action = action.upper()
        where, group, order, limit, values = (() if x is None else x
                                              for x in (where, group, order, limit, values))
        n = util.nameify(table, self._wrapper(column=False))
        tablename, tablesql = (n.name, text_type(n)) if isinstance(n, Identifier) else (n, n)
        namefmt = wrapper(sql=True)

        cols   = ", ".join(util.nameify(x, namefmt, table) for x in keylistify(cols)) or "*"
        group  = ", ".join(util.nameify(x, namefmt, table) for x in keylistify(group))
        where  = util.keyvalues(where, wrapper())
        order  = list(order.items()) if isinstance(order, dict) else listify(order)
        order  = [order] if isinstance(order, (list, tuple)) \
                 and len(order) == 2 and isinstance(order[1], bool) else order
        limit  = [limit] if isinstance(limit, string_types + integer_types) else limit
        values = util.keyvalues(values, wrapper())
        sql = "SELECT %s FROM %s" % (cols, tablesql) if "SELECT" == action else ""
        sql = "DELETE FROM %s"    % (tablesql)       if "DELETE" == action else sql
        sql = "INSERT INTO %s"    % (tablesql)       if "INSERT" == action else sql
        sql = "UPDATE %s"         % (tablesql)       if "UPDATE" == action else sql

        if kwargs and action in ("SELECT", "DELETE", "UPDATE"): where  += list(kwargs.items())
        if kwargs and action in ("INSERT", ):                   values += list(kwargs.items())

        if "INSERT" == action:
            pk = self._structure.get(tablename, {}).get("key")
            if pk and util.is_dataobject(values0):  # Skip auto-generated primary key if unset
                values = [(k, v) for k, v in values if k != pk or v is not None]
            keys = ["%sI%s" % (re.sub(r"\W+", "_", column(k)), i)
                    for i, (k, _) in enumerate(values)]
            args.update((a, cast(k, v)) for i, (a, (k, v)) in enumerate(zip(keys, values)))
            cols = ", ".join(column(k, sql=True) for k, _ in values)
            vals = ", ".join("%%(%s)s" % s for s in keys)
            sql += " (%s) VALUES (%s)" % (cols, vals)
            if pk: sql += " RETURNING %s AS id" % Identifier.quote(pk)
        if "UPDATE" == action:
            sql += " SET "
            for i, (col, val) in enumerate(values):
                key = "%sU%s" % (re.sub(r"\W+", "_", column(col)), i)
                sql += (", " if i else "") + "%s = %%(%s)s" % (column(col, sql=True), key)
                args[key] = cast(col, val)

        if where: sql += " WHERE "
        for i, clause in enumerate(where):
            if isinstance(clause, string_types):  # "raw SQL with no arguments"
                clause = (clause, )

            if len(clause) == 1:    # ("raw SQL with no arguments", )
                col, op, val, key = clause[0], "EXPR", [], None
            elif len(clause) == 2:  # ("col", val) or ("col", ("op" or "expr with ?", val))
                col, op, val, key = parse_members(i, clause[0], "=", clause[1])
            else:                   # ("col", "op" or "expr with ?", val)
                col, op, val, key = parse_members(i, *clause)

            if "EXPR" == op:
                for j in range(col.count("?")):
                    col = col.replace("?", "%%(%s_%s)s" % (key, j), 1)
                    args["%s_%s" % (key, j)] = cast(None, val[j])
                sql += (" AND " if i else "") + "(%s)" % col
            elif val is None:
                op = {"=": "IS", "!=": "IS NOT", "<>": "IS NOT"}.get(op, op)
                sql += (" AND " if i else "") + "%s %s NULL" % (col, op)
            else:
                args[key] = val
                sql += (" AND " if i else "") + "%s %s %%(%s)s" % (col, op, key)

        if group: sql += " GROUP BY " + group
        if order: sql += " ORDER BY "
        for i, col in enumerate(order):
            name = util.nameify(col[0] if isinstance(col, (list, tuple)) else col, quote, table)
            sort = col[1] if name != col and isinstance(col, (list, tuple)) and len(col) > 1 \
                   else ""
            if not isinstance(sort, string_types): sort = "" if sort else "DESC"
            sql += (", " if i else "") + name + (" " if sort else "") + sort

        limit = [None if isinstance(v, integer_types) and v < 0 else v for v in limit]
        for k, v in zip(("limit", "offset"), limit):
            if v is None: continue
            sql += " %s %%(%s)s" % (k.upper(), k)
            args[k] = v

        logger.log(logging.DEBUG // 2, sql)
        return sql, args
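
    # Rough illustration of statements this method builds (generated parameter keys are
    # derived from column names, so the exact names below are only indicative):
    #
    #     makeSQL("SELECT", "events", where={"category": "system", "id": ("IN", [1, 2, 3])})
    #     # -> ('SELECT * FROM events WHERE category = %(categoryW0)s AND id IN %(idW1)s', {..})
    #
    #     makeSQL("UPDATE", "events", values={"name": "x"}, where={"id": 1})
    #     # -> ('UPDATE events SET name = %(nameU0)s WHERE id = %(idW0)s', {..})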
    @classmethod
    def quote(cls, value, force=False):
        """
        Returns identifier in quotes and proper-escaped for queries,
        if value needs quoting (has non-alphanumerics, starts with number, or is reserved).

        @param   value  the value to quote, returned as-is if not string
        @param   force  whether to quote value even if not required
        """
        return quote(value, force)
    def _adapt_value(self, value, typename):
        """
        Returns value as JSON if field is a JSON type and no adapter registered for value type,
        else value as-is.
        """
        if typename in ("json", "jsonb") and type(value) not in self.ADAPTERS:
            return psycopg2.extras.Json(value, dumps=util.json_dumps)
        return value
    def _cast(self, col, val, table=None, tablename=None):
        """Returns column value cast to correct type for use in psycopg."""
        col = col.name if isinstance(col, Identifier) else \
              col if isinstance(col, string_types) else \
              self._match_name(util.nameify(col, parent=table), tablename)
        field = self._structure.get(tablename, {}).get("fields", {}).get(col) if tablename else None
        if field and "array" == field["type"]:
            return list(val) if isinstance(val, (list, set, tuple)) else [val]
        elif field and val is not None:
            val = self._adapt_value(val, field["type"])

        if isinstance(val, (list, set)):  # Sequence parameters for IN etc. need to be tuples
            return tuple(val)
        return val
    def _column(self, col, sql=False, table=None, tablename=None):
        """Returns column name from string/property/Identifier, quoted if object and `sql`."""
        if inspect.isdatadescriptor(col):
            col = util.nameify(col, self._wrapper(sql=sql, tablename=tablename), table)
        if isinstance(col, Identifier): return text_type(col) if sql else col.name
        return col if isinstance(col, string_types) else text_type(col)
    def _load_schema(self, force=False):
        """Populates table structure from database if uninitialized or forced."""
        if self._structure is None or force:
            self._structure = {}  # Avoid recursion on first query
            self.cursor.factory, factory0 = None, self.cursor.factory
            try: self._structure.update(query_schema(self, keys=True))
            finally: self.cursor.factory = factory0
    def _match_name(self, name, table=None):
        """
        Returns proper-cased name from schema lookup, or same value if no match.

        @param   name   name of table/view or field, from data object or property
        @param   table  name of table to match field for, if not matching table name
        """
        container = self._structure.get(table, {}).get("fields", {}) if table \
                    else (self._structure or {})
        if name not in (container or {}):
            namelc = name.lower()
            if namelc in container: name = namelc
            else:
                variants = [n for n in container if n.lower() == namelc]
                if len(variants) == 1: name = variants[0]
        return name
    def _wrapper(self, column=True, sql=False, tablename=None):
        """Returns function(name) producing Identifier or SQL-ready name string."""
        def inner(name):
            val = Identifier(self._column(name, tablename=tablename) if column else name)
            return text_type(val) if sql else val
        return inner
class Database(api.Database, Queryable):
    """
    Convenience wrapper around psycopg2.ConnectionPool and Cursor.

    Queries directly on the Database object use autocommit mode.
    """

    ## Connection pool size (minconn, maxconn), and pools as {Database: ConnectionPool}
    POOL_SIZE, POOLS = (1, 4), {}

    ## Mutexes for exclusive transactions, as {Database instance: lock}
    MUTEX = collections.defaultdict(threading.RLock)

    ## Registered adapters, converters, and default custom row factory
    ADAPTERS, CONVERTERS, ROW_FACTORY = {}, {}, None

    def __init__(self, opts, **kwargs):
        """
        Creates a new Database instance for Postgres.

        By default uses a pool of 1..4 connections.

        Connection parameters can also be specified in OS environment,
        via standard Postgres environment variables like `PGUSER` and `PGPASSWORD`.

        @param   opts    Postgres connection string, or options dictionary as
                         `dict(dbname=.., user=.., password=.., host=.., port=.., ..)`
        @param   kwargs  additional arguments given to engine constructor,
                         e.g. `minconn=1, maxconn=4`
        """
        self._kwargs = kwargs
        self._cursor, self._cursorctx = None, None
        self._row_factory, self._structure = None, None
        self._txs = []  # Open transactions
        self.dsn = psycopg2.extensions.make_dsn(**opts) if isinstance(opts, dict) else opts
395 """Context manager entry, opens database if not already open, returns Database object."""
400 def __exit__(self, exc_type, exc_val, exc_trace):
401 """Context manager exit, closes database and any pending transactions if open."""
402 txs, self.
_txs[:] = self.
_txs[:], []
403 for tx
in txs: tx.close(commit=
None if exc_type
is None else False)
405 return exc_type
is None
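
    # Usage sketch (illustrative): the Database as a context manager; queries made directly
    # on it run in autocommit mode, and exiting closes any transactions still pending.
    #
    #     db = Database("postgresql://user@localhost/mydb")   # or dict(host=.., dbname=..)
    #     with db:
    #         db.execute("CREATE TABLE IF NOT EXISTS events (id BIGSERIAL PRIMARY KEY, name TEXT)")
    #         db.insert("events", name="first")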
    def execute(self, sql, args=()):
        """
        Executes SQL statement, returns psycopg cursor.

        @param   sql   SQL statement to execute, with psycopg-specific parameter bindings, if any
        @param   args  dictionary for %(name)s placeholders,
                       or a sequence for positional %s placeholders, or None
        """
        if not self._cursor: raise RuntimeError("Database not open.")
        self._cursor.execute(sql, args or None)
        return self._cursor
    def executemany(self, sql, args):
        """
        Executes the SQL statement against all parameter sequences.

        @param   sql   SQL statement to execute, with psycopg-specific parameter bindings
        @param   args  iterable of query parameters, as dictionaries for %(name)s placeholders
                       or sequences for positional %s placeholders
        """
        if not self._cursor: raise RuntimeError("Database not open.")
        psycopg2.extras.execute_batch(self._cursor, sql, list(args))
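
    # Usage sketch (illustrative): the same statement run once per parameter dictionary,
    # batched through psycopg2.extras.execute_batch.
    #
    #     db.executemany("UPDATE events SET name = %(name)s WHERE id = %(id)s",
    #                    [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}])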
    def executescript(self, sql):
        """
        Executes the SQL as script of any number of statements.

        Reloads internal schema structure from database.

        @param   sql  script with one or more SQL statements
        """
        cursor = self.execute(sql)
        self._load_schema(force=True)
        return cursor

    def open(self):
        """Opens database connection if not already open."""
        if self._cursor: return
        self.init_pool(self, **self._kwargs)
        self._apply_converters()
        self._cursorctx = self.make_cursor(autocommit=True)
        self._cursor = self._cursorctx.__enter__()
    def close(self, commit=None):
        """
        Closes the database and any pending transactions, if open.

        @param   commit  `True` for explicit commit on open transactions,
                         `False` for explicit rollback on open transactions,
                         `None` defaults to `commit` flag from transaction creations
        """
        txs, self._txs[:] = self._txs[:], []
        for tx in txs: tx.close(commit)
        if self._cursor:
            self._cursorctx.__exit__(None, None, None)
            self._cursor, self._cursorctx = None, None
        self.MUTEX.pop(self, None)
        pool = self.POOLS.pop(self, None)
        if pool: pool.closeall()
476 """Whether database connection is currently not open."""
482 """Database engine cursor object, or `None` if closed."""
488 """The custom row factory, if any, as `function(cursor, row tuple)`."""
495 Sets custom row factory, as `function(cursor, row tuple)`, or `None` to reset to default.
497 `cursor.description` is a sequence of 7-element tuples,
498 as `(name, type_code, display_size, internal_size, precision, scale, null_ok)`.
500 self.
_row_factory =
False if row_factory
is None else row_factory
    def transaction(self, commit=True, exclusive=False, **kwargs):
        """
        Returns a transaction context manager.

        Context is breakable by raising Rollback.

        @param   commit     whether transaction commits at exiting with-block
        @param   exclusive  whether entering a with-block is exclusive
                            over other Transaction instances on this Database
        @param   kwargs     engine-specific arguments, like `schema="other", lazy=True` for Postgres
        """
        tx = Transaction(self, commit, exclusive, **kwargs)
        self._txs.append(tx)
        return tx
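
    # Usage sketch (illustrative): raising Rollback inside the with-block discards changes
    # without propagating an error.
    #
    #     with db.transaction() as tx:
    #         tx.insert("events", name="queued")
    #         if cancel_requested:          # hypothetical flag
    #             raise api.Rollback        # exits the block, rolling the insert back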
    @contextmanager
    def make_cursor(self, commit=False, autocommit=False, schema=None, lazy=False, itersize=None):
        """
        Context manager for psycopg connection cursor.
        Creates a new cursor on an unused connection and closes it when exiting
        context, committing changes if specified.

        @param   commit      commit at the end on success
        @param   autocommit  connection autocommit mode
        @param   schema      name of Postgres schema to use, if not using default `"public"`
        @param   lazy        if true, returns a named server-side cursor that fetches rows
                             iteratively in batches; only supports making a single query
        @param   itersize    batch size in rows for server-side cursor
        @return              psycopg2 Cursor
        """
        connection = self.POOLS[self].getconn()
        try:
            connection.autocommit = autocommit
            cursor, namedcursor = None, None
            if "public" == schema: schema = None

            if schema or not lazy: cursor = connection.cursor()
            if schema: cursor.execute('SET search_path TO %s,public' % quote(schema))
            if lazy: namedcursor = connection.cursor("name_%s" % id(connection))
            if lazy and itersize is not None: namedcursor.itersize = itersize

            try:
                yield namedcursor or cursor
                if commit: connection.commit()
            except GeneratorExit: pass  # Caller consumed nothing
            except Exception as e:
                if not isinstance(e, api.Rollback):
                    logger.exception("SQL error on %s:", (namedcursor or cursor).query)
                raise
            finally:
                connection.rollback()  # If not already committed, must roll back here
                try: namedcursor and namedcursor.close()
                except Exception: pass
                if schema:  # Restore default search path on connection
                    cursor.execute("SET search_path TO public")
                    connection.commit()
                if cursor: cursor.close()
        finally: self.POOLS[self].putconn(connection)
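
    # Usage sketch (illustrative): a lazy transaction uses a named server-side cursor,
    # streaming rows in batches of `itersize` rather than loading the result set at once.
    #
    #     with db.transaction(lazy=True, itersize=1000) as tx:
    #         for row in tx.execute("SELECT * FROM big_table"):
    #             process(row)              # hypothetical consumer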
    @classmethod
    def init_pool(cls, db, minconn=POOL_SIZE[0], maxconn=POOL_SIZE[1], **kwargs):
        """Initializes connection pool for Database if not already initialized."""
        if db in cls.POOLS: return

        args = minconn, maxconn, db.dsn
        kwargs.update(cursor_factory=db._cursor_factory)
        cls.POOLS[db] = psycopg2.pool.ThreadedConnectionPool(*args, **kwargs)
    def _apply_converters(self):
        """Applies registered converters, if any, looking up type OIDs on live cursor."""
        if not self.CONVERTERS: return

        regs = dict(self.CONVERTERS)
        with self.make_cursor() as cursor:
            for typename, transformer in regs.items():
                cursor.execute("SELECT NULL::%s" % typename)
                oid = cursor.description[0][1]
                wrap = lambda x, c, f=transformer: f(x)
                TYPE = psycopg2.extensions.new_type((oid, ), typename, wrap)
                psycopg2.extensions.register_type(TYPE)
    def _cursor_factory(self, *args, **kwargs):
        """Returns a new RowFactoryCursor."""
        factory = self._row_factory
        if factory in (False, None): factory = None if factory is False else self.ROW_FACTORY
        return RowFactoryCursor(factory, *args, **kwargs)
    def _notify(self, tx):
        """Notifies database of transaction closing."""
        if tx in self._txs: self._txs.remove(tx)
class Transaction(api.Transaction, Queryable):
    """
    Transaction context manager, provides convenience methods for queries.

    Supports server-side cursors; those can only be used for making a single query.

    Must be closed explicitly if not used as context manager in a with-block.
    Block can be exited early by raising Rollback.
    """

    def __init__(self, db, commit=True, exclusive=False,
                 schema=None, lazy=False, itersize=None, **__):
        """
        Creates a transaction context manager.

        Context is breakable by raising Rollback.

        @param   db         Database instance
        @param   commit     whether transaction commits automatically at exiting with-block
        @param   exclusive  whether entering a with-block is exclusive over other
                            Transaction instances on this Database
        @param   schema     search_path to use in this transaction
        @param   lazy       if true, uses a server-side cursor to fetch results from server
                            iteratively in batches instead of all at once,
                            supports one single query only
        @param   itersize   batch size for server-side cursor (defaults to 2000 rows)
        """
        self._db, self._lazy, self._exclusive = db, lazy, exclusive
        self._cursor, self._structure = None, None
        self._cursorctx = db.make_cursor(commit, schema=schema, lazy=lazy, itersize=itersize)
642 """Context manager entry, opens cursor, returns Transaction object."""
643 if self.
closed:
raise RuntimeError(
"Transaction already closed")
    def __exit__(self, exc_type, exc_val, exc_trace):
        """Context manager exit, closes cursor, commits or rolls back as specified on creation."""
        try:
            self.close(commit=None if exc_type is None else False)
        finally:
            if self._exclusive: Database.MUTEX[self._db].release()
            self._db._notify(self)
        return exc_type in (None, api.Rollback)
    def close(self, commit=None):
        """
        Closes the transaction, performing commit or rollback as specified,
        and releases database connection back to connection pool.
        Required if not using transaction as context manager in a with-block.

        @param   commit  `True` for explicit commit, `False` for explicit rollback,
                         `None` defaults to `commit` flag from creation
        """
        if self.closed:
            self._db._notify(self)
            return
        if commit is False: self.rollback()
        elif commit: self.commit()
        try: self._cursorctx.__exit__(None, None, None)
        finally:
            self._cursor, self._cursorctx = None, None
            self._db._notify(self)
    def execute(self, sql, args=()):
        """
        Executes SQL statement, returns psycopg cursor.

        @param   sql   SQL statement to execute, with psycopg-specific parameter bindings, if any
        @param   args  dictionary for %(name)s placeholders,
                       or a sequence for positional %s placeholders, or None
        """
        if self.closed: raise RuntimeError("Transaction already closed")
        if not self._cursor: self._cursor = self._cursorctx.__enter__()
        self._cursor.execute(sql, args or None)
        return self._cursor
    def executemany(self, sql, args):
        """
        Executes the SQL statement against all parameter sequences.

        @param   sql   SQL statement to execute, with psycopg-specific parameter bindings
        @param   args  iterable of query parameters, as dictionaries for %(name)s placeholders
                       or sequences for positional %s placeholders
        """
        if self.closed: raise RuntimeError("Transaction already closed")
        if not self._cursor: self._cursor = self._cursorctx.__enter__()
        psycopg2.extras.execute_batch(self._cursor, sql, list(args))
    def executescript(self, sql):
        """
        Executes the SQL as script of any number of statements.

        Reloads internal schema structure from database.

        @param   sql  script with one or more SQL statements
        """
        cursor = self.execute(sql)
        self._load_schema(force=True)
        return cursor

    def commit(self):
        """Commits pending actions, if any."""
        if self._cursor: self._cursor.connection.commit()

    def rollback(self):
        """Rolls back pending actions, if any."""
        if self._cursor: self._cursor.connection.rollback()

    @property
    def closed(self):
        """Whether transaction is currently not open."""
        return not self._cursorctx

    @property
    def cursor(self):
        """Database engine cursor object, or `None` if closed."""
        return self._cursor

    @property
    def database(self):
        """Returns transaction Database instance."""
        return self._db
    def _load_schema(self, force=False):
        """
        Populates database table structure from database if uninitialized or forced.

        Uses parent Database for lookup if lazy cursor.
        """
        if self._lazy: return self._db._load_schema(force=force)
        return super(Transaction, self)._load_schema(force=force)
764 """A cursor that generates result rows via given factory callable."""
766 def __init__(self, row_factory, *args, **kwargs):
767 self.factory = row_factory
768 self.rowtype = dict
if sys.version_info > (3, )
else collections.OrderedDict
769 super(RowFactoryCursor, self).
__init__(*args, **kwargs)
772 row = super(RowFactoryCursor, self).
fetchone()
773 return row
if row
is None else self.row_factory(row)
775 def fetchmany(self, size=None):
776 rows = super(RowFactoryCursor, self).fetchmany(size)
777 return [self.row_factory(row)
for row
in rows]
780 rows = super(RowFactoryCursor, self).
fetchall()
781 return [self.row_factory(row)
for row
in rows]
783 def __next__(self):
return self.row_factory(super(RowFactoryCursor, self).__next__())
784 def next(self):
return self.__next__()
786 def row_factory(self, row):
787 """Returns value constructed with custom row factory, or as dict if no factory."""
788 if self.factory
not in (
False,
None):
return self.factory(self, row)
789 return self.
rowtype(zip([x[0]
for x
in self.description], row))
def autodetect(opts):
    """
    Returns true if input is recognizable as Postgres connection options.

    @param   opts  expected as URL string `"postgresql://user@localhost/mydb"`
                   or keyword=value format string like `"host=localhost dbname=.."`
                   or a dictionary of `dict(host="localhost", dbname=..)`
    """
    if not isinstance(opts, string_types + (dict, )): return False
    if isinstance(opts, dict):
        try: return bool(psycopg2.extensions.make_dsn(**opts) or True)
        except Exception: return False
    try: return bool(psycopg2.extensions.parse_dsn(opts) or True)
    except Exception: return False
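

# Illustrative behaviour, assuming psycopg2 is available:
#
#     autodetect("postgresql://user@localhost/mydb")        # True
#     autodetect("host=localhost dbname=mydb")              # True
#     autodetect({"host": "localhost", "dbname": "mydb"})   # True
#     autodetect("/path/to/database.sqlite3")               # False: not parseable as a DSN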
810 """Returns Postgres connection options as URL, like `"postgresql://host/dbname"`."""
    BASICS = collections.OrderedDict([("user", ""), ("password", ":"), ("host", ""),
                                      ("port", ":"), ("dbname", "/")])
    result, creds = "", False
    if isinstance(opts, string_types):
        opts = psycopg2.extensions.parse_dsn(opts)
    for i, (k, prefix) in enumerate(BASICS.items()):
        if creds and i > 1: result, creds = result + "@", False
        if opts.get(k) is not None:
            result, creds = result + prefix + "%%(%s)s" % k, (i < 2)
    result %= {k: urllib_parse.quote(text_type(opts[k])) for k in opts}
    if any(k not in BASICS for k in opts):
        result += "/" if opts.get("dbname") is None else ""
        result += "?" + urllib_parse.urlencode({k: opts[k] for k in opts if k not in BASICS})
    return "postgresql://" + result
def query_schema(queryable, keys=False, views=False, inheritance=False):
    """
    Returns database table structure populated from given database.

    @param   queryable    Database or Transaction instance
    @param   views        whether to include views
    @param   keys         whether to include primary and foreign key information
    @param   inheritance  whether to include parent-child table information
                          and populate inherited foreign keys
    @return               ```{table or view name: {
                               "fields": OrderedDict({
                                   column name: {
                                       "name": column name,
                                       "type": column type name,
                                       ?"pk":  True,
                                       ?"fk":  foreign table name,
                                   }
                               }),
                               ?"key":      primary key column name,
                               ?"parent":   parent table name,
                               ?"children": [child table name, ],
                               "type":      "table" or "view",
                           }}```
    """
    result = {}

    # Retrieve column names of tables
    for v in queryable.fetchall("information_schema.columns", table_schema="public",
                                order="table_name, ordinal_position"):
        t, c, d = v["table_name"], v["column_name"], v["data_type"]
        if t not in result: result[t] = {"type": "table",
                                         "fields": collections.OrderedDict()}
        result[t]["fields"][c] = {"name": c, "type": d.lower()}

    # Retrieve primary and foreign keys
    for v in queryable.fetchall(
        "information_schema.table_constraints tc "
        "JOIN information_schema.key_column_usage kcu "
        "ON tc.constraint_name = kcu.constraint_name "
        "JOIN information_schema.constraint_column_usage ccu "
        "ON ccu.constraint_name = tc.constraint_name ",
        cols="DISTINCT tc.table_name, kcu.column_name, tc.constraint_type, "
             "ccu.table_name AS table_name2", where={"tc.table_schema": "public"}
    ) if keys else ():
        t, c, t2 = v["table_name"], v["column_name"], v["table_name2"]
        if "PRIMARY KEY" == v["constraint_type"]:
            result[t]["fields"][c]["pk"], result[t]["key"] = True, c
        else: result[t]["fields"][c]["fk"] = t2

    # Retrieve inheritance information, and copy foreign key flags from parent to child
    for v in queryable.fetchall(
        "pg_catalog.pg_inherits i JOIN pg_catalog.pg_class c ON inhrelid=c.oid "
        "JOIN pg_catalog.pg_class p ON inhparent = p.oid "
        "JOIN pg_catalog.pg_namespace pn ON pn.oid = p.relnamespace "
        "JOIN pg_catalog.pg_namespace cn "
        "ON cn.oid = c.relnamespace AND cn.nspname = pn.nspname",
        cols="c.relname AS child, p.relname AS parent",
        where={"pn.nspname": "public"}
    ) if inheritance else ():
        result[v["parent"]].setdefault("children", []).append(v["child"])
        result[v["child"]]["parent"] = v["parent"]
        for f, opts in result[v["parent"]]["fields"].items() if keys else ():
            if not opts.get("fk"): continue
            result[v["child"]]["fields"][f]["fk"] = opts["fk"]

    # Retrieve columns of views
    for v in queryable.fetchall(
        "pg_catalog.pg_attribute a "
        "JOIN pg_catalog.pg_class c ON a.attrelid = c.oid "
        "JOIN pg_catalog.pg_namespace s ON c.relnamespace = s.oid "
        "JOIN pg_catalog.pg_type t ON a.atttypid = t.oid "
        "JOIN pg_catalog.pg_proc p ON t.typname = p.proname ",
        cols="DISTINCT c.relname, a.attname, pg_get_function_result(p.oid) AS data_type",
        where={"a.attnum": (">", 0), "a.attisdropped": False,
               "s.nspname": "public", "c.relkind": ("IN", ("v", "m"))}
    ) if views else ():
        t, c, d = v["relname"], v["attname"], v["data_type"]
        if t not in result: result[t] = {"type": "view",
                                         "fields": collections.OrderedDict()}
        result[t]["fields"][c] = {"name": c, "type": d.lower()}

    return result
def quote(value, force=False):
    """
    Returns identifier in quotes and proper-escaped for queries,
    if value needs quoting (has non-alphanumerics, starts with number, or is reserved).

    @param   value  the value to quote, returned as-is if not string
    @param   force  whether to quote value even if not required
    """
    if not isinstance(value, string_types):
        return value
    RGX_INVALID, RGX_UNICODE = r"(^[\W\d])|(?=\W)", r"[^\x01-\x7E]"
    result = value.decode() if isinstance(value, binary_type) else value
    if force or result.upper() in RESERVED_KEYWORDS or re.search(RGX_INVALID, result):
        if re.search(RGX_UNICODE, value):  # Use Unicode escape literal for non-ASCII names
            result = result.replace("\\", r"\\").replace('"', '""')
            result = 'U&"%s"' % re.sub(RGX_UNICODE, lambda m: r"\+%06X" % ord(m.group(0)), result)
        else:
            result = '"%s"' % result.replace('"', '""')
    return result
934 """Registers function to auto-adapt given Python types to Postgres types in query parameters."""
936 """Wraps transformed value in psycopg protocol object."""
938 return psycopg2.extensions.AsIs(v
if isinstance(v, binary_type)
else text_type(v).encode())
940 for t
in typeclasses:
941 psycopg2.extensions.register_adapter(t, adapt)
942 Database.ADAPTERS[t] = transformer
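

# Usage sketch (illustrative): auto-adapt a custom Python type in query parameters,
# here a hypothetical enum stored as its integer value.
#
#     import enum
#     class Severity(enum.Enum):
#         INFO  = 1
#         ERROR = 2
#
#     register_adapter(lambda v: v.value, [Severity])
#     # db.execute("INSERT INTO logs (severity) VALUES (%s)", [Severity.ERROR]) now passes 2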
946 """Registers function to auto-convert given Postgres types to Python types in query results."""
947 typenames = [n.upper()
for n
in typenames]
948 if "JSON" in typenames:
949 psycopg2.extras.register_default_json(globally=
True, loads=transformer)
950 if "JSONB" in typenames:
951 psycopg2.extras.register_default_jsonb(globally=
True, loads=transformer)
952 Database.CONVERTERS.update({n: transformer
for n
in typenames
if n
not in (
"JSON",
"JSONB")})
956 """Registers custom row factory, as or `None` to reset to default."""
957 Database.ROW_FACTORY = row_factory
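

# Usage sketch (illustrative): return rows as namedtuples instead of dictionaries;
# the factory receives the live cursor and the raw row tuple.
#
#     import collections
#     def namedtuple_factory(cursor, row):
#         Row = collections.namedtuple("Row", [c[0] for c in cursor.description])
#         return Row(*row)
#
#     register_row_factory(namedtuple_factory)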
if psycopg2:
    try:
        psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
        psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY)
    except Exception: logger.exception("Error configuring psycopg.")
969 "RESERVED_KEYWORDS",
"Database",
"Transaction",
970 "autodetect",
"quote",
"register_adapter",
"register_converter",
"register_row_factory",