dblite  1.3.0
Simple query interface for SQL databases
postgres.py
Go to the documentation of this file.
1 # -*- coding: utf-8 -*-
2 """
3 Simple convenience wrapper for Postgres, via psycopg2.
4 
5 ------------------------------------------------------------------------------
6 This file is part of dblite - simple query interface for SQL databases.
7 Released under the MIT License.
8 
9 @author Erki Suurjaak
10 @created 08.05.2020
11 @modified 26.03.2023
12 ------------------------------------------------------------------------------
13 """
14 import collections
15 from contextlib import contextmanager
16 import inspect
17 import logging
18 import re
19 import sys
20 import threading
21 
22 from six.moves import urllib_parse
23 from six import binary_type, integer_types, string_types, text_type
24 
25 try:
26  import psycopg2
27  import psycopg2.extensions
28  import psycopg2.extras
29  import psycopg2.pool
30 except ImportError: psycopg2 = None
31 
32 from .. import api, util
33 
34 logger = logging.getLogger(__name__)
35 
36 
37 
# Upper-case SQL keywords treated as reserved in Postgres.
# NOTE(review): presumably consulted by the module-level quote() helper, which is
# documented as quoting reserved names - confirm against its definition.
RESERVED_KEYWORDS = [
    "ALL", "ANALYSE", "ANALYZE", "AND", "ANY", "ARRAY", "AS", "ASC", "ASYMMETRIC", "AUTHORIZATION",
    "BINARY", "BOTH", "CASE", "CAST", "CHECK", "COLLATE", "COLLATION", "COLUMN", "CONCURRENTLY",
    "CONSTRAINT", "CREATE", "CROSS", "CURRENT_CATALOG", "CURRENT_DATE", "CURRENT_ROLE",
    "CURRENT_SCHEMA", "CURRENT_TIME", "CURRENT_TIMESTAMP", "CURRENT_USER", "DEFAULT", "DEFERRABLE",
    "DESC", "DISTINCT", "DO", "ELSE", "END", "EXCEPT", "FALSE", "FETCH", "FOR", "FOREIGN", "FREEZE",
    "FROM", "FULL", "GRANT", "GROUP", "HAVING", "ILIKE", "IN", "INITIALLY", "INNER", "INTERSECT",
    "INTO", "IS", "ISNULL", "JOIN", "LATERAL", "LEADING", "LEFT", "LIKE", "LIMIT", "LOCALTIME",
    "LOCALTIMESTAMP", "NATURAL", "NOT", "NOTNULL", "NULL", "OFFSET", "ON", "ONLY", "OR", "ORDER",
    "OUTER", "OVERLAPS", "PLACING", "PRIMARY", "REFERENCES", "RETURNING", "RIGHT", "SELECT",
    "SESSION_USER", "SIMILAR", "SOME", "SYMMETRIC", "TABLE", "TABLESAMPLE", "THEN", "TO",
    "TRAILING", "TRUE", "UNION", "UNIQUE", "USER", "USING", "VARIADIC", "VERBOSE",
    "WHEN", "WHERE", "WINDOW", "WITH"
]
52 
53 
class Identifier(object):
    """
    Wrapper for table and column names from data objects.

    Compares equal to other Identifier instances and to plain strings of the same name,
    and hashes like the plain name; string conversion yields the name quoted for SQL.
    """

    def __init__(self, name):
        """
        @param   name  raw unquoted table or column name
        """
        self.name = name

    def __eq__(self, other):
        """Supports comparing to strings or other Identifier instances."""
        if isinstance(other, type(self)): return self.name == other.name
        return isinstance(other, string_types) and (self.name == other)

    def __ne__(self, other):
        """Returns inverse of equality (Python 2 does not derive `!=` from `__eq__`)."""
        return not self.__eq__(other)

    def __hash__(self):
        """
        Returns hash of the plain name.

        Defining `__eq__` alone makes instances unhashable in Python 3; hashing by name
        keeps the hash consistent with equality to both identifiers and strings.
        """
        return hash(self.name)

    def __str__(self):
        """Returns quoted name."""
        return self.quote(self.name)

    @staticmethod
    def quote(name):
        """Returns quoted name."""
        return quote(name, force=not name.islower())  # Must quote if not lowercase
68 
69 
71 
72 
# Operators recognized by makeSQL() in WHERE clauses given as ("col", ("op", value)):
# the upper-cased, stripped first member is matched against this collection
OPS = ("!=", "!~", "!~*", "#", "%", "&", "*", "+", "-", "/", "<", "<<",
       "<=", "<>", "<@", "=", ">", ">=", ">>", "@>", "^", "|", "||", "&&", "~",
       "~*", "ANY", "ILIKE", "IN", "IS", "IS NOT", "LIKE", "NOT ILIKE", "NOT IN",
       "NOT LIKE", "NOT SIMILAR TO", "OR", "OVERLAPS", "SIMILAR TO", "SOME")


# Name of the database engine this implementation targets
ENGINE = "postgres"
80 
81 
def insert(self, table, values=(), **kwargs):
    """
    Runs a single database INSERT and gives back the ID of the inserted row.

    Keyword arguments are added to VALUES.

    @return  first value of the row returned by the statement if that row
             is a non-empty dictionary, else `None`
    """
    statement, params = self.makeSQL("INSERT", table, values=values, kwargs=kwargs)
    cur = self.execute(statement, params)
    if cur.description is None:  # Statement produced no result set
        return None
    inserted = next(cur, None)
    if inserted and isinstance(inserted, dict):
        return next(iter(inserted.values()))
    return None
91 
92 
def insertmany(self, table, rows=(), **kwargs):
    """
    Convenience wrapper for database multiple INSERTs, returns list of inserted row IDs.
    Keyword arguments are added to VALUES of every single row, overriding individual row values.

    @param   table   table name or data class
    @param   rows    iterable of rows to insert, as dictionaries or data objects
    @return          list with one entry per row: first value of the row returned
                     by the statement if it is a non-empty dictionary, else `None`
    """
    result = []

    # Name matching and primary key lookup below read the schema directly,
    # before the first makeSQL() call would have had a chance to load it
    self._load_schema()

    sqlcache = {}  # {tuple(col, ): "INSERT .."}
    n = util.nameify(table, self._wrapper(column=False))
    tablename = n.name if isinstance(n, Identifier) else n
    pk = self._structure.get(tablename, {}).get("key")

    commons = [(self._column(k, table=table, tablename=tablename), v) for k, v in kwargs.items()]
    for row in rows:
        cols, values, valueidx = [], [], {}
        knotnull = pk if pk and util.is_dataobject(row) else None  # Skip NULL pk if data object
        for k, v in util.keyvalues(row, self._wrapper(tablename=tablename)):
            k = self._column(k, table=table, tablename=tablename)
            if k != knotnull or v is not None:
                cols.append(k), values.append((k, v)), valueidx.update({k: len(valueidx)})
        for k, v in commons:  # Keyword arguments override row values
            if k in valueidx: values[valueidx[k]] = (k, v)
            else: cols.append(k), values.append((k, v))
        # Uniform column order, so that rows with the same columns share one cached statement
        cols.sort(key=lambda x: x.lower()), values.sort(key=lambda x: x[0].lower())
        cachekey = tuple(cols)

        sql = sqlcache.get(cachekey)
        if not sql:
            sql, args = self.makeSQL("INSERT", tablename, values=values)
            sqlcache[cachekey] = sql
        else:
            # Statement reused from cache: build parameters with makeSQL's naming scheme
            keys = ["%sI%s" % (re.sub(r"\W+", "_", k), i) for i, (k, _) in enumerate(values)]
            args = {a: self._cast(k, v, table, tablename) for a, (k, v) in zip(keys, values)}

        cursor = self.execute(sql, args)
        res = None if cursor.description is None else next(cursor, None)
        result.append(next(iter(res.values())) if res and isinstance(res, dict) else None)
    return result
132 
133 
def makeSQL(self, action, table, cols="*", where=(), group=(), order=(), limit=(), values=(),
            kwargs=None):
    """
    Returns (SQL statement string, parameter dict).

    Loads database schema first if not yet loaded.

    @param   action  "SELECT", "DELETE", "INSERT" or "UPDATE", case-insensitive
    @param   table   table name or data class
    @param   cols    columns to select, defaults to "*"
    @param   where   filter clauses, each one of: raw SQL string, `(raw SQL, )`,
                     `(col, value)`, `(col, (operator or SQL with ? placeholders, value))`,
                     `(col, operator or SQL with ? placeholders, value)`;
                     a value of `None` yields a NULL-check
    @param   group   columns for GROUP BY
    @param   order   columns for ORDER BY, optionally as `(col, direction)`
                     where direction is a string, or a boolean with false meaning DESC
    @param   limit   LIMIT value, or `(limit, offset)`; `None` and negative integers are skipped
    @param   values  column values for INSERT or UPDATE
    @param   kwargs  additional items, added to `values` for INSERT and to `where` otherwise
    """

    def parse_members(i, col, op, val):
        """Returns (col, op, val, argkey)."""
        if isinstance(col, Identifier): col, colsql, pure = col.name, text_type(col), False
        elif not isinstance(col, string_types):
            col = self._match_name(util.nameify(col, parent=table), tablename)
            colsql, pure = Identifier.quote(col), False
        else: colsql, pure = col, True  # Plain string: may be a column name or raw SQL
        key = "%sW%s" % (re.sub(r"\W+", "_", col), i)
        if "EXPR" == col.upper() and pure:
            # ("EXPR", ("SQL", val))
            colsql, op, val, key = val[0], "EXPR", val[1], "EXPRW%s" % i
        elif col.count("?") == argcount(val) and pure:
            # ("any SQL with ? placeholders", val)
            op, val, key = "EXPR", listify(val), "EXPRW%s" % i
        elif isinstance(val, (list, tuple)) and len(val) == 2 \
        and isinstance(val[0], string_types):
            tmp = val[0].strip().upper()
            if tmp in self.OPS:
                # ("col", ("binary op like >=", val))
                op, val = tmp, val[1]
            elif val[0].count("?") == argcount(val[1]):
                # ("col", ("SQL with ? placeholders", val))
                colsql, val, op = "%s = %s" % (col, val[0]), listify(val[1]), "EXPR"
        if op in ("IN", "NOT IN") and not val:  # IN -> ANY, to avoid error on empty array
            colsql = "%s%s = ANY('{}')" % ("" if "IN" == op else "NOT ", colsql)
            op = "EXPR"
        return colsql, op, self._cast(col, val, table, tablename), key
    # Number of placeholder arguments a value stands for
    def argcount(x): return len(x) if isinstance(x, (list, set, tuple)) else 1
    # Value as list or tuple
    def listify(x): return x if isinstance(x, (list, tuple)) else \
                           list(x) if isinstance(x, set) else [x]
    # Value as list or tuple, dictionaries yielding their keys
    def keylistify(x): return x if isinstance(x, (list, tuple)) else \
                              list(x) if isinstance(x, (dict, set)) else [x]
    # Shorthands bound to the current table
    cast = lambda col, val: self._cast(col, val, table, tablename)
    column = lambda col, sql=False: self._column(col, sql, table, tablename)
    wrapper = lambda column=True, sql=False: self._wrapper(column, sql, tablename)

    self._load_schema()
    values0 = values  # Original input, for data object check on INSERT
    action = action.upper()
    where, group, order, limit, values = (() if x is None else x
                                          for x in (where, group, order, limit, values))
    n = util.nameify(table, self._wrapper(column=False))
    tablename, tablesql = (n.name, text_type(n)) if isinstance(n, Identifier) else (n, n)
    namefmt = wrapper(sql=True)

    cols = ", ".join(util.nameify(x, namefmt, table) for x in keylistify(cols)) or "*"
    group = ", ".join(util.nameify(x, namefmt, table) for x in keylistify(group))
    where = util.keyvalues(where, wrapper())
    order = list(order.items()) if isinstance(order, dict) else listify(order)
    # A single (col, bool) pair is one ordering item, not two columns
    order = [order] if isinstance(order, (list, tuple)) \
            and len(order) == 2 and isinstance(order[1], bool) else order
    limit = [limit] if isinstance(limit, string_types + integer_types) else limit
    values = util.keyvalues(values, wrapper())
    sql = "SELECT %s FROM %s" % (cols, tablesql) if "SELECT" == action else ""
    sql = "DELETE FROM %s" % (tablesql) if "DELETE" == action else sql
    sql = "INSERT INTO %s" % (tablesql) if "INSERT" == action else sql
    sql = "UPDATE %s" % (tablesql) if "UPDATE" == action else sql
    args = {}
    if kwargs and action in ("SELECT", "DELETE", "UPDATE"): where += list(kwargs.items())
    if kwargs and action in ("INSERT", ): values += list(kwargs.items())

    if "INSERT" == action:
        pk = self._structure.get(tablename, {}).get("key")
        if pk and util.is_dataobject(values0):  # Can't avoid giving primary key if data object
            values = [(k, v) for k, v in values if k != pk or v is not None]  # Discard NULL pk
        # Parameter names as sanitized column name + "I" + index
        keys = ["%sI%s" % (re.sub(r"\W+", "_", column(k)), i)
                for i, (k, _) in enumerate(values)]
        args.update((a, cast(k, v)) for i, (a, (k, v)) in enumerate(zip(keys, values)))
        cols = ", ".join(column(k, sql=True) for k, _ in values)
        vals = ", ".join("%%(%s)s" % s for s in keys)
        sql += " (%s) VALUES (%s)" % (cols, vals)
        if pk: sql += " RETURNING %s AS id" % Identifier.quote(pk)
    if "UPDATE" == action:
        sql += " SET "
        for i, (col, val) in enumerate(values):
            key = "%sU%s" % (re.sub(r"\W+", "_", column(col)), i)
            sql += (", " if i else "") + "%s = %%(%s)s" % (column(col, sql=True), key)
            args[key] = cast(col, val)
    if where:
        sql += " WHERE "
        for i, clause in enumerate(where):
            if isinstance(clause, string_types):  # "raw SQL with no arguments"
                clause = (clause, )

            if len(clause) == 1:  # ("raw SQL with no arguments", )
                col, op, val, key = clause[0], "EXPR", [], None
            elif len(clause) == 2:  # ("col", val) or ("col", ("op" or "expr with ?", val))
                col, op, val, key = parse_members(i, clause[0], "=", clause[1])
            else:  # ("col", "op" or "expr with ?", val)
                col, op, val, key = parse_members(i, *clause)

            if "EXPR" == op:
                # Swap each ? for a named placeholder, one at a time
                for j in range(col.count("?")):
                    col = col.replace("?", "%%(%s_%s)s" % (key, j), 1)
                    args["%s_%s" % (key, j)] = cast(None, val[j])
                sql += (" AND " if i else "") + "(%s)" % col
            elif val is None:
                op = {"=": "IS", "!=": "IS NOT", "<>": "IS NOT"}.get(op, op)
                sql += (" AND " if i else "") + "%s %s NULL" % (col, op)
            else:
                args[key] = val
                sql += (" AND " if i else "") + "%s %s %%(%s)s" % (col, op, key)
    if group:
        sql += " GROUP BY " + group
    if order:
        sql += " ORDER BY "
        for i, col in enumerate(order):
            name = util.nameify(col[0] if isinstance(col, (list, tuple)) else col, quote, table)
            sort = col[1] if name != col and isinstance(col, (list, tuple)) and len(col) > 1 \
                   else ""
            # Non-string direction is a flag: true for default order, false for descending
            if not isinstance(sort, string_types): sort = "" if sort else "DESC"
            sql += (", " if i else "") + name + (" " if sort else "") + sort
    if limit:
        limit = [None if isinstance(v, integer_types) and v < 0 else v for v in limit]
        for k, v in zip(("limit", "offset"), limit):
            if v is None: continue  # for k, v
            sql += " %s %%(%s)s" % (k.upper(), k)
            args[k] = v

    logger.log(logging.DEBUG // 2, sql)  # Level below DEBUG
    return sql, args
259 
260 
@classmethod
def quote(cls, value, force=False):
    """
    Returns identifier in quotes and proper-escaped for queries,
    if value needs quoting (has non-alphanumerics, starts with number, or is reserved).

    Delegates to the module-level `quote` function.

    @param   value  the value to quote, returned as-is if not string
    @param   force  whether to quote value even if not required
    """
    return quote(value, force)
271 
272 
def _adapt_value(self, value, typename):
    """
    Wraps value as JSON for JSON-type fields, unless the value type has an adapter registered.

    @param   value     column value
    @param   typename  column type name
    @return            `psycopg2.extras.Json` wrapper if type name is "json" or "jsonb"
                       and the value type is not among registered adapters,
                       else value unchanged
    """
    is_json = typename in ("json", "jsonb")
    if is_json and type(value) not in self.ADAPTERS.values():
        return psycopg2.extras.Json(value, dumps=util.json_dumps)
    return value
281 
282 
def _cast(self, col, val, table=None, tablename=None):
    """
    Returns column value converted to a form suitable for use in psycopg.

    Values for array fields are given as lists, values for other known fields go through
    `_adapt_value`, and list or set values are given as tuples.
    """
    if isinstance(col, Identifier):
        col = col.name
    elif not isinstance(col, string_types):
        col = self._match_name(util.nameify(col, parent=table), tablename)
    field = tablename in self._structure and self._structure[tablename]["fields"].get(col)

    if field and "array" == field["type"]:
        # Array fields always take a list, a lone value becomes a one-element list
        if isinstance(val, (list, set, tuple)):
            return list(val)
        return [val]
    if field and val is not None:
        val = self._adapt_value(val, field["type"])
    # Sequence parameters for IN etc must be tuples
    return tuple(val) if isinstance(val, (list, set)) else val
296 
297 
def _column(self, col, sql=False, table=None, tablename=None):
    """
    Returns column name as string, from a string or data descriptor or Identifier.

    Identifier names are returned quoted if `sql`, else plain.
    """
    if inspect.isdatadescriptor(col):  # Property of a data class: resolve to name
        col = util.nameify(col, self._wrapper(sql=sql, tablename=tablename), table)
    if isinstance(col, Identifier):
        if sql:
            return text_type(col)
        return col.name
    if isinstance(col, string_types):
        return col
    return text_type(col)
304 
305 
def _load_schema(self, force=False):
    """
    Fills in table structure from database, if not yet populated or if forced.

    The cursor row factory is cleared for the duration of the schema query
    and put back afterwards.
    """
    if self._structure is not None and not force:
        return
    self._structure = {}  # Set before querying, to avoid recursion on first query

    previous = self.cursor.factory
    self.cursor.factory = None  # Ensure dict rows
    try:
        self._structure.update(query_schema(self, keys=True))
    finally:
        self.cursor.factory = previous
314 
315 
def _match_name(self, name, table=None):
    """
    Returns proper-cased name from schema lookup, or same value if no match.

    A table unknown to the schema, or a schema not yet loaded, yields the name unchanged.

    @param   name   name of table/view or field, from data object or property
    @param   table  name of table to match field for, if not matching table name
    """
    structure = self._structure or {}
    # Fields of unknown tables come up as None: fall back to empty lookup
    container = (structure.get(table, {}).get("fields") if table else structure) or {}
    if name in container:
        return name

    # Check for case differences
    namelc = name.lower()
    if namelc in container:  # Normal lower-case name present
        return namelc
    if namelc == name:  # Name from data is lowercase, check for single cased
        variants = [n for n in container if n.lower() == namelc]
        if len(variants) == 1:
            return variants[0]
    return name
333 
334 
def _wrapper(self, column=True, sql=False, tablename=None):
    """
    Returns function(name) that gives the schema-matched name as Identifier,
    or as SQL-ready string if `sql`.

    Names are matched as fields of `tablename` if `column`, else as table names.
    """
    lookup_table = tablename if column else None

    def inner(name):
        ident = Identifier(self._match_name(name, table=lookup_table))
        if sql:
            return text_type(ident)
        return ident
    return inner
341 
342 
344  """
345  Convenience wrapper around psycopg2.ConnectionPool and Cursor.
346 
347  Queries directly on the Database object use autocommit mode.
348  """
349 
350 
# Registered adapters; values whose type is among the dictionary values
# are not wrapped as JSON for JSON fields (see _adapt_value)
ADAPTERS = {}


# Registered converters as {Postgres type name: function(value)}, see _apply_converters
CONVERTERS = {}


# Re-entrant locks by Database instance, created on first access
MUTEX = collections.defaultdict(threading.RLock)


# Default (minimum, maximum) number of connections in pool
POOL_SIZE = (1, 4)


# Connection pools by Database instance
POOLS = {}


# Default row factory, used if instance has no row factory set (see _cursor_factory)
ROW_FACTORY = None
367 
368 
def __init__(self, opts, **kwargs):
    """
    Creates a new Database instance for Postgres.

    By default uses a pool of 1..4 connections.

    Connection parameters can also be specified in OS environment,
    via standard Postgres environment variables like `PGUSER` and `PGPASSWORD`.

    @param   opts    Postgres connection string, or options dictionary as
                     `dict(dbname=.., user=.., password=.., host=.., port=.., ..)`
    @param   kwargs  additional arguments given to engine constructor,
                     e.g. `minconn=1, maxconn=4`
    """
    self.dsn = make_db_url(opts)

    # Connection pool arguments, used at open()
    self._kwargs = kwargs

    # Live cursor and its context manager, set at open()
    self._cursor = None
    self._cursorctx = None

    # Open transactions, as [Transaction, ]
    self._txs = []

    # None if default, False if explicitly default, or func(cur, row)
    self._row_factory = None

    # Database schema as {table or view name: {"fields": {..}, ..}}
    self._structure = None
392 
393 
def __enter__(self):
    """Enters context: makes sure the database is open, and gives back this Database."""
    self.open()  # No-op if already open
    return self
398 
399 
def __exit__(self, exc_type, exc_val, exc_trace):
    """
    Exits context: closes pending transactions and the database.

    Transactions are rolled back if the block raised,
    else closed according to their own commit flag.

    @return  whether the block exited without exception
    """
    succeeded = exc_type is None
    pending = list(self._txs)
    del self._txs[:]
    for tx in pending:
        tx.close(commit=None if succeeded else False)
    self.close()
    return succeeded
406 
407 
def execute(self, sql, args=()):
    """
    Runs the SQL statement on the database cursor, and gives back the cursor.

    @param   sql   SQL statement to execute, with psycopg-specific parameter bindings, if any
    @param   args  dictionary for %(name)s placeholders,
                   or a sequence for positional %s placeholders, or None
    @raise   RuntimeError if database is not open
    """
    cur = self._cursor
    if not cur:
        raise RuntimeError("Database not open.")
    cur.execute(sql, args or None)  # Empty arguments are given to psycopg as None
    return cur
419 
420 
def executemany(self, sql, args):
    """
    Runs the SQL statement once for each set of parameters, as a psycopg batch.

    @param   sql   SQL statement to execute, with psycopg-specific parameter bindings
    @param   args  iterable of query parameters, as dictionaries for %(name)s placeholders
                   or sequences for positional %s placeholders
    @raise   RuntimeError if database is not open
    """
    if not self._cursor:
        raise RuntimeError("Database not open.")
    batch = list(args)
    psycopg2.extras.execute_batch(self._cursor, sql, batch)
431 
432 
def executescript(self, sql):
    """
    Runs the SQL as a script of any number of statements, and gives back the cursor.

    Drops the cached schema structure, so that it gets reloaded on next query.

    @param   sql  script with one or more SQL statements
    """
    cur = self.execute(sql)
    self._structure = None
    return cur
444 
445 
def open(self):
    """
    Opens database connection if not already open.

    Initializes the connection pool for this instance with constructor keyword arguments,
    and enters an autocommit cursor context kept until close().
    """
    if self._cursor: return
    self.init_pool(self, **self._kwargs)
    self._cursorctx = self.make_cursor(autocommit=True)
    self._cursor = self._cursorctx.__enter__()
453 
454 
def close(self, commit=None):
    """
    Closes pending transactions and the database, if open;
    discards the lock and closes the connection pool of this instance.

    @param   commit  `True` for explicit commit on open transactions,
                     `False` for explicit rollback on open transactions,
                     `None` defaults to `commit` flag from transaction creations
    """
    pending = list(self._txs)
    del self._txs[:]
    for tx in pending:
        tx.close(commit)

    if self._cursor:
        self._cursorctx.__exit__(None, None, None)
        self._cursor = None
        self._cursorctx = None

    self.MUTEX.pop(self, None)
    pool = self.POOLS.pop(self, None)
    if pool:
        pool.closeall()
472 
473 
@property
def closed(self):
    """Whether the database has no open cursor at the moment."""
    return False if self._cursor else True
478 
479 
@property
def cursor(self):
    """The engine cursor object of the open database, `None` if database is closed."""
    return self._cursor
484 
485 
@property
def row_factory(self):
    """Custom row factory as `function(cursor, row tuple)`, `None` if not set."""
    factory = self._row_factory
    return None if factory in (False, None) else factory


@row_factory.setter
def row_factory(self, row_factory):
    """
    Sets custom row factory, as `function(cursor, row tuple)`, or `None` to reset to default.

    Takes effect on the open cursor at once, if any.

    `cursor.description` is a sequence of 7-element tuples,
    as `(name, type_code, display_size, internal_size, precision, scale, null_ok)`.
    """
    # Explicit reset is remembered as False, apart from the initial None
    self._row_factory = row_factory if row_factory is not None else False
    if self._cursor:
        self._cursor.factory = row_factory
502 
503 
def transaction(self, commit=True, exclusive=False, **kwargs):
    """
    Creates a transaction context manager, registered as pending on this database.

    Context is breakable by raising Rollback.

    @param   commit     whether transaction commits at exiting with-block
    @param   exclusive  whether entering a with-block is exclusive
                        over other Transaction instances on this Database
    @param   kwargs     engine-specific arguments, like `schema="other", lazy=True` for Postgres
    @return             new Transaction instance
    """
    created = Transaction(self, commit, exclusive, **kwargs)
    self._txs.append(created)
    return created
518 
519 
@contextmanager
def make_cursor(self, commit=False, autocommit=False, schema=None, lazy=False, itersize=None):
    """
    Context manager for psycopg connection cursor.
    Creates a new cursor on an unused connection and closes it when exiting
    context, committing changes if specified.

    The connection is taken from the pool of this Database and always put back at exit.
    Exceptions from the with-block are re-raised, logged first if not Rollback.

    @param   commit      commit at the end on success
    @param   autocommit  connection autocommit mode
    @param   schema      name of Postgres schema to use, if not using default `"public"`
    @param   lazy        if true, returns a named server-side cursor that fetches rows
                         iteratively in batches; only supports making a single query
    @param   itersize    batch size in rows for server-side cursor
    @return              psycopg2 Cursor
    """
    connection = self.POOLS[self].getconn()
    try:
        connection.autocommit = autocommit
        cursor, namedcursor = None, None
        if "public" == schema: schema = None  # Default, no need to set

        # If using schema, schema tables are queried first, fallback to public.
        # Need two cursors if schema+lazy, as named cursor only does one query.
        if schema or not lazy: cursor = connection.cursor()
        if schema: cursor.execute('SET search_path TO %s,public' % quote(schema))
        if lazy: namedcursor = connection.cursor("name_%s" % id(connection))
        if lazy and itersize is not None: namedcursor.itersize = itersize

        try:
            yield namedcursor or cursor
            if commit: connection.commit()
        except GeneratorExit: pass  # Caller consumed nothing
        except Exception as e:
            if not isinstance(e, api.Rollback):  # Rollback is regular control flow
                logger.exception("SQL error on %s:", (namedcursor or cursor).query)
            raise
        finally:
            connection.rollback()  # If not already committed, must rollback here
            try: namedcursor and namedcursor.close()
            except Exception: pass  # Best-effort close of server-side cursor
            if schema:  # Restore default search path on this connection
                cursor.execute("SET search_path TO public")
                connection.commit()
            if cursor: cursor.close()
    finally: self.POOLS[self].putconn(connection)
565 
566 
@classmethod
def init_pool(cls, db, minconn=POOL_SIZE[0], maxconn=POOL_SIZE[1], **kwargs):
    """
    Sets up the connection pool for given Database, unless it already has one.

    Runs under the lock of the Database instance.

    @param   db       Database instance
    @param   minconn  minimum number of connections in pool
    @param   maxconn  maximum number of connections in pool
    @param   kwargs   additional keyword arguments for the pool constructor
    """
    with cls.MUTEX[db]:
        if db not in cls.POOLS:
            dsn = db.dsn
            kwargs.update(cursor_factory=db._cursor_factory)
            pool = psycopg2.pool.ThreadedConnectionPool(minconn, maxconn, dsn, **kwargs)
            cls.POOLS[db] = pool
576 
577 
def _apply_converters(self):
    """
    Registers converters with psycopg, if any, resolving type OIDs on a live cursor.

    Pending converters are taken over and `CONVERTERS` on this instance is set empty.
    """
    if not self.CONVERTERS:
        return

    pending = dict(self.CONVERTERS)
    self.CONVERTERS = {}
    with self.make_cursor() as cur:
        for typename, transformer in pending.items():
            cur.execute("SELECT NULL::%s" % typename)
            oid = cur.description[0][1]  # description is [(name, type_code, ..)]

            # psycopg invokes callback(value, cursor); transformer is bound per iteration
            def convert(x, c, f=transformer): return f(x)

            newtype = psycopg2.extensions.new_type((oid, ), typename, convert)
            psycopg2.extensions.register_type(newtype)
590 
591 
def _cursor_factory(self, *args, **kwargs):
    """
    Creates a new RowFactoryCursor with the row factory in effect for this database.

    Row factory explicitly reset on instance means no factory;
    row factory never set on instance means the class default `ROW_FACTORY`.
    """
    factory = self._row_factory
    if factory in (False, None):
        if factory is False:
            factory = None
        else:
            factory = self.ROW_FACTORY
    return RowFactoryCursor(factory, *args, **kwargs)
597 
598 
def _notify(self, tx):
    """Takes note of a transaction closing: drops it from pending transactions, if present."""
    try:
        self._txs.remove(tx)
    except ValueError:  # Not among pending transactions
        pass
602 
603 
604 
606  """
607  Transaction context manager, provides convenience methods for queries.
608 
609  Supports server-side cursors; those can only be used for making a single query.
610 
611  Must be closed explicitly if not used as context manager in a with-block.
612  Block can be exited early by raising Rollback.
613  """
614 
def __init__(self, db, commit=True, exclusive=False,
             schema=None, lazy=False, itersize=None, **__):
    """
    Creates a transaction context manager.

    Context is breakable by raising Rollback.

    The cursor context is created here but entered only on first use.
    Unrecognized keyword arguments are ignored.

    @param   db         Database instance
    @param   commit     whether transaction commits automatically at exiting with-block
    @param   exclusive  whether entering a with-block is exclusive over other
                        Transaction instances on this Database
    @param   schema     search_path to use in this transaction
    @param   lazy       if true, uses a server-side cursor to fetch results from server
                        iteratively in batches instead of all at once,
                        supports one single query only
    @param   itersize   batch size for server-side cursor (defaults to 2000 rows)
    """
    cursor_options = dict(schema=schema, lazy=lazy, itersize=itersize)

    self._db = db
    self._lazy = lazy
    self._cursor = None
    self._cursorctx = db.make_cursor(commit, **cursor_options)
    self._exclusive = exclusive
    self._exitcommit = commit

    # Number of levels the transaction context is nested at
    self._enterstack = 0

    # Database schema as {table or view name: {"fields": {..}, ..}}
    self._structure = None
640 
def __enter__(self):
    """
    Enters context: opens cursor if not yet open, and gives back this Transaction.

    Acquires the lock of the parent Database first if transaction is exclusive,
    releasing it again if opening the cursor fails.

    @raise   RuntimeError if transaction is already closed
    """
    if self.closed:
        raise RuntimeError("Transaction already closed")

    exclusive = self._exclusive
    if exclusive:
        Database.MUTEX[self._db].acquire()
    try:
        if not self._cursor:
            self._cursor = self._cursorctx.__enter__()
        self._enterstack += 1
    except Exception:
        if exclusive:
            Database.MUTEX[self._db].release()
        raise
    return self
653 
def __exit__(self, exc_type, exc_val, exc_trace):
    """
    Context manager exit, closes cursor, commits or rolls back as specified on creation.

    Only the outermost nesting level closes the cursor context; inner levels
    do an intermediary commit or rollback. Lock of exclusive transaction is released
    at every level.

    @return  whether the block exited with no exception or with Rollback
    """
    depth = self._enterstack = self._enterstack - 1
    try:
        if self._cursor and depth < 1:  # Last level: close properly
            self._cursorctx.__exit__(exc_type, exc_val, exc_trace)
        elif self._cursor:  # Still some depth: intermediary commit/rollback
            self.commit() if self._exitcommit and exc_type is None else self.rollback()
        return exc_type in (None, api.Rollback)
    finally:
        if depth < 1:  # Outermost level: transaction is done, tell parent database
            self._cursor = None
            self._cursorctx = None
            self._db._notify(self)
        if self._exclusive: Database.MUTEX[self._db].release()
669 
def close(self, commit=None):
    """
    Closes the transaction, performing commit or rollback as specified,
    and releases database connection back to connection pool.
    Required if not using transaction as context manager in a with-block.

    A transaction that never opened its cursor is marked closed as well,
    so that it can no longer acquire a connection afterwards.

    @param   commit  `True` for explicit commit, `False` for explicit rollback,
                     `None` defaults to `commit` flag from creation
    """
    if not self._cursor:
        # Cursor context was never entered: nothing to release, only mark as closed
        self._cursorctx = None
        self._db._notify(self)
        return
    if commit is False: self.rollback()
    elif commit: self.commit()
    try: self._cursorctx.__exit__(None, None, None)
    finally:
        self._cursor = None
        self._cursorctx = None
        self._db._notify(self)
689 
def execute(self, sql, args=()):
    """
    Runs the SQL statement on the transaction cursor, and gives back the cursor.

    Opens the cursor first if not yet open.

    @param   sql   SQL statement to execute, with psycopg-specific parameter bindings, if any
    @param   args  dictionary for %(name)s placeholders,
                   or a sequence for positional %s placeholders, or None
    @raise   RuntimeError if transaction is already closed
    """
    if self.closed:
        raise RuntimeError("Transaction already closed")
    cur = self._cursor
    if not cur:
        cur = self._cursor = self._cursorctx.__enter__()
    cur.execute(sql, args or None)  # Empty arguments are given to psycopg as None
    return cur
702 
def executemany(self, sql, args):
    """
    Runs the SQL statement once for each set of parameters, as a psycopg batch.

    Opens the cursor first if not yet open.

    @param   sql   SQL statement to execute, with psycopg-specific parameter bindings
    @param   args  iterable of query parameters, as dictionaries for %(name)s placeholders
                   or sequences for positional %s placeholders
    @raise   RuntimeError if transaction is already closed
    """
    if self.closed:
        raise RuntimeError("Transaction already closed")
    if not self._cursor:
        self._cursor = self._cursorctx.__enter__()
    batch = list(args)
    psycopg2.extras.execute_batch(self._cursor, sql, batch)
714 
def executescript(self, sql):
    """
    Runs the SQL as a script of any number of statements, and gives back the cursor.

    Drops the cached schema structure, so that it gets reloaded on next query.

    @param   sql  script with one or more SQL statements
    """
    cur = self.execute(sql)
    self._structure = None
    return cur
726 
def commit(self):
    """Commits pending actions on the cursor connection; does nothing without open cursor."""
    if not self._cursor:
        return
    self._cursor.connection.commit()
730 
def rollback(self):
    """Rolls back pending actions on the cursor connection; does nothing without open cursor."""
    if not self._cursor:
        return
    self._cursor.connection.rollback()
734 
@property
def closed(self):
    """Whether the transaction has no cursor context any more."""
    return False if self._cursorctx else True
739 
@property
def cursor(self):
    """
    The engine cursor object of the transaction, `None` if transaction is closed.

    Opens the cursor first if not yet open.
    """
    if not self._cursorctx:
        return None
    cur = self._cursor
    if not cur:
        cur = self._cursor = self._cursorctx.__enter__()
    return cur
746 
@property
def database(self):
    """The Database instance this transaction was created on."""
    return self._db
751 
def _load_schema(self, force=False):
    """
    Fills in table structure from database, if not yet populated or if forced.

    A transaction on a lazy cursor hands the loading over to its parent Database.
    """
    if not self._lazy:
        return super(Transaction, self)._load_schema(force=force)
    return self._db._load_schema(force=force)
760 
761 
762 
class RowFactoryCursor(psycopg2.extensions.cursor):
    """
    Cursor producing result rows through a factory callable,
    or as dictionaries of column name to value if no factory is given.
    """

    def __init__(self, row_factory, *args, **kwargs):
        """
        @param   row_factory  `function(cursor, row tuple)`, or `None` or `False` for dict rows
        @param   args         positional arguments for the psycopg cursor constructor
        @param   kwargs       keyword arguments for the psycopg cursor constructor
        """
        self.factory = row_factory
        # Insertion-ordered mapping type for default rows
        self.rowtype = dict if sys.version_info > (3, ) else collections.OrderedDict
        super(RowFactoryCursor, self).__init__(*args, **kwargs)

    def fetchone(self):
        """Returns next row of the result as produced by row factory, or `None` at end."""
        row = super(RowFactoryCursor, self).fetchone()
        if row is None:
            return row
        return self.row_factory(row)

    def fetchmany(self, size=None):
        """Returns list of next rows of the result as produced by row factory."""
        fetched = super(RowFactoryCursor, self).fetchmany(size)
        return list(map(self.row_factory, fetched))

    def fetchall(self):
        """Returns list of all remaining rows of the result as produced by row factory."""
        fetched = super(RowFactoryCursor, self).fetchall()
        return list(map(self.row_factory, fetched))

    def __next__(self):
        """Returns next row in iteration as produced by row factory."""
        row = super(RowFactoryCursor, self).__next__()
        return self.row_factory(row)

    def next(self):
        """Returns next row in iteration, as alias for `__next__`."""
        return self.__next__()

    def row_factory(self, row):
        """Returns value constructed with custom row factory, or as dict if no factory."""
        if self.factory in (False, None):
            names = [x[0] for x in self.description]
            return self.rowtype(zip(names, row))
        return self.factory(self, row)
790 
791 
792 
def autodetect(opts):
    """
    Tells whether input can be recognized as Postgres connection options.

    @param   opts  expected as URL string `"postgresql://user@localhost/mydb"`
                   or keyword=value format string like `"host=localhost dbname=.."`
                   or a dictionary of `dict(host="localhost", dbname=..)`
    @return        true if psycopg accepts the options, false if it raises on them
                   or if input is neither string nor dictionary
    """
    if not isinstance(opts, string_types + (dict, )):
        return False
    try:
        # Parse result itself is irrelevant and may well be empty,
        # like "" for {} or {} for "postgresql://": only parsing without error counts
        if isinstance(opts, dict):
            psycopg2.extensions.make_dsn(**opts)
        else:
            psycopg2.extensions.parse_dsn(opts)
    except Exception:
        return False
    return True
807 
808 
def make_db_url(opts):
    """
    Returns Postgres connection options as URL, like `"postgresql://host/dbname"`.

    All values are fully percent-encoded, including forward slashes,
    so that reserved characters in user, password, host or database name
    cannot be mistaken for URL structure.

    @param   opts  connection options as URL string, or keyword=value format string,
                   or a dictionary like `dict(host="localhost", dbname=..)`;
                   strings are parsed to dictionary via psycopg2
    @return        URL string starting with `"postgresql://"`; options other than
                   user, password, host, port and dbname are added as query string
    """
    # Basic URL components in order of appearance, as {option name: prefix in URL}
    BASICS = collections.OrderedDict([("user", ""), ("password", ":"), ("host", ""),
                                      ("port", ":"), ("dbname", "/")])
    result, creds = "", False
    if isinstance(opts, string_types):
        opts = psycopg2.extensions.parse_dsn(opts)
    for i, (k, prefix) in enumerate(BASICS.items()):
        if creds and i > 1: result, creds = result + "@", False  # Either user or password set
        if opts.get(k) is not None:
            # Add named placeholder for now, values get substituted below
            result, creds = result + prefix + "%%(%s)s" % k, (i < 2)
    # safe="" ensures "/" in values gets encoded as well instead of breaking the URL
    result %= {k: urllib_parse.quote(text_type(opts[k]), safe="") for k in opts}
    if any(k not in BASICS for k in opts):
        # Query string needs a path separator in front even without database name
        result += "/" if opts.get("dbname") is None else ""
        result += "?" + urllib_parse.urlencode({k: opts[k] for k in opts if k not in BASICS})
    return "postgresql://" + result
825 
826 
def query_schema(queryable, keys=False, views=False, inheritance=False):
    """
    Returns database table structure populated from given database.

    Queries are limited to schema "public".

    @param   queryable    Database or Transaction instance
    @param   keys         whether to include primary and foreign key information
    @param   views        whether to include views
    @param   inheritance  whether to include parent-child table information
                          and populate inherited foreign keys
    @return  ```{table or view name: {
                     "fields": OrderedDict({
                         column name: {
                             "name": column name,
                             "type": column type name,
                             ?"pk":  True,
                             ?"fk":  foreign table name,
                         }
                     }),
                     ?"key":      primary key column name,
                     ?"parent":   parent table name,
                     ?"children": [child table name, ],
                     "type":      "table" or "view",
                 }
             }```
    """
    result = {}

    # Retrieve column names
    for v in queryable.fetchall("information_schema.columns", table_schema="public",
                                order="table_name, ordinal_position"):
        t, c, d = v["table_name"], v["column_name"], v["data_type"]
        if t not in result: result[t] = {"type": "table",
                                         "fields": collections.OrderedDict()}
        result[t]["fields"][c] = {"name": c, "type": d.lower()}

    # Retrieve primary and foreign keys
    for v in queryable.fetchall(
        "information_schema.table_constraints tc "
        "JOIN information_schema.key_column_usage kcu "
          "ON tc.constraint_name = kcu.constraint_name "
        "JOIN information_schema.constraint_column_usage ccu "
          "ON ccu.constraint_name = tc.constraint_name ",
        cols="DISTINCT tc.table_name, kcu.column_name, tc.constraint_type, "
             "ccu.table_name AS table_name2", where={"tc.table_schema": "public"}
    ) if keys else ():
        t, c, t2 = v["table_name"], v["column_name"], v["table_name2"]
        if "PRIMARY KEY" == v["constraint_type"]:
            result[t]["fields"][c]["pk"], result[t]["key"] = True, c
        elif "FOREIGN KEY" == v["constraint_type"]:
            # Other key constraints like UNIQUE must not get flagged as foreign keys
            result[t]["fields"][c]["fk"] = t2

    # Retrieve inheritance information, copy foreign key flags from parent.
    # System catalogs reside in schema pg_catalog, not information_schema.
    for v in queryable.fetchall(
        "pg_catalog.pg_inherits i JOIN pg_catalog.pg_class c ON inhrelid=c.oid "
        "JOIN pg_catalog.pg_class p ON inhparent = p.oid "
        "JOIN pg_catalog.pg_namespace pn ON pn.oid = p.relnamespace "
        "JOIN pg_catalog.pg_namespace cn "
          "ON cn.oid = c.relnamespace AND cn.nspname = pn.nspname",
        cols="c.relname AS child, p.relname AS parent",
        where={"pn.nspname": "public"}
    ) if inheritance else ():
        result[v["parent"]].setdefault("children", []).append(v["child"])
        result[v["child"]]["parent"] = v["parent"]
        for f, opts in result[v["parent"]]["fields"].items() if keys else ():
            if not opts.get("fk"): continue  # for f, opts
            result[v["child"]]["fields"][f]["fk"] = opts["fk"]

    # Retrieve view column names; relkind "v" is view and "m" is materialized view
    for v in queryable.fetchall(
        "pg_catalog.pg_attribute a "
        "JOIN pg_catalog.pg_class c ON a.attrelid = c.oid "
        "JOIN pg_catalog.pg_namespace s ON c.relnamespace = s.oid "
        "JOIN pg_catalog.pg_type t ON a.atttypid = t.oid "
        "JOIN pg_catalog.pg_proc p ON t.typname = p.proname ",
        cols="DISTINCT c.relname, a.attname, pg_get_function_result(p.oid) AS data_type",
        where={"a.attnum": (">", 0), "a.attisdropped": False,
               "s.nspname": "public", "c.relkind": ("IN", ("v", "m"))}
    ) if views else ():
        t, c, d = v["relname"], v["attname"], v["data_type"]
        if t not in result: result[t] = {"type": "view",
                                         "fields": collections.OrderedDict()}
        result[t]["fields"][c] = {"name": c, "type": d.lower()}

    return result
910 
911 
def quote(value, force=False):
    """
    Returns identifier in quotes and proper-escaped for queries,
    if value needs quoting (has non-alphanumerics, starts with number, or is reserved).

    Identifiers containing characters outside printable ASCII are returned
    as Unicode escape strings like `U&"\\+0000E4"`.

    @param   value  the value to quote, returned as-is if not string
    @param   force  whether to quote value even if not required
    @return         quoted and escaped identifier, or value unchanged
                    if not a string or if quoting not needed
    """
    if not isinstance(value, string_types):
        return value
    RGX_INVALID, RGX_UNICODE = r"(^[\W\d])|(?=\W)", r"[^\x01-\x7E]"
    result = value.decode() if isinstance(value, binary_type) else value
    if force or result.upper() in RESERVED_KEYWORDS or re.search(RGX_INVALID, result):
        if re.search(RGX_UNICODE, result):  # Convert to Unicode escape U&"\+ABCDEF"
            # Backslash is the escape character in U&"" strings and needs doubling;
            # substitution must run on the escaped text to retain the escaping
            result = result.replace("\\", r"\\").replace('"', '""')
            result = 'U&"%s"' % re.sub(RGX_UNICODE, lambda m: r"\+%06X" % ord(m.group(0)), result)
        else:
            result = '"%s"' % result.replace('"', '""')
    return result
931 
932 
def register_adapter(transformer, typeclasses):
    """
    Registers function to auto-adapt given Python types to Postgres types in query parameters.

    @param   transformer  callable invoked with Python value, returning value for query
    @param   typeclasses  iterable of Python types to register the transformer for
    """
    def adapt(x):
        """Wraps transformed value in psycopg protocol object."""
        adapted = transformer(x)
        if not isinstance(adapted, binary_type):
            # Anything other than bytes gets stringified and encoded
            adapted = text_type(adapted).encode()
        return psycopg2.extensions.AsIs(adapted)

    for typeclass in typeclasses:
        psycopg2.extensions.register_adapter(typeclass, adapt)
        Database.ADAPTERS[typeclass] = transformer
943 
944 
def register_converter(transformer, typenames):
    """
    Registers function to auto-convert given Postgres types to Python types in query results.

    JSON and JSONB types are registered in psycopg2 globally right away,
    all other types are stored in `Database.CONVERTERS`.

    @param   transformer  callable invoked with database value, returning Python value
    @param   typenames    iterable of Postgres type names, case-insensitive
    """
    names = [name.upper() for name in typenames]

    if "JSON" in names:
        psycopg2.extras.register_default_json(globally=True, loads=transformer)
    if "JSONB" in names:
        psycopg2.extras.register_default_jsonb(globally=True, loads=transformer)

    pending = {}
    for name in names:
        if name not in ("JSON", "JSONB"):
            pending[name] = transformer
    Database.CONVERTERS.update(pending)
953 
954 
def register_row_factory(row_factory):
    """
    Registers custom row factory in `Database.ROW_FACTORY`.

    @param   row_factory  callable to produce result rows with,
                          or `None` to reset to default
    """
    Database.ROW_FACTORY = row_factory
958 
959 
960 
# Module-level driver setup, skipped if psycopg2 failed to import
if psycopg2:
    try:
        # Register psycopg2 UNICODE and UNICODEARRAY typecasters; no scope is given
        psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
        psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY)
    # Best-effort: a failure is logged with traceback and the module still loads
    except Exception: logger.exception("Error configuring psycopg.")


# Names exported by `from module import *`
__all__ = [
    "RESERVED_KEYWORDS", "Database", "Transaction",
    "autodetect", "quote", "register_adapter", "register_converter", "register_row_factory",
]
dblite.engines.postgres.Queryable
Definition: postgres.py:70
dblite.engines.postgres.Queryable._structure
_structure
Definition: postgres.py:317
dblite.engines.postgres.Database._kwargs
_kwargs
Definition: postgres.py:398
dblite.engines.postgres.Database.CONVERTERS
dictionary CONVERTERS
Registered converters for SQL->Python pending application, as {typename: converter}.
Definition: postgres.py:367
dblite.engines.postgres.Transaction._exitcommit
_exitcommit
Definition: postgres.py:657
dblite.engines.postgres.Queryable.insertmany
def insertmany(self, table, rows=(), **kwargs)
Convenience wrapper for database multiple INSERTs, returns list of inserted row IDs.
Definition: postgres.py:97
dblite.engines.postgres.Identifier.name
name
Definition: postgres.py:56
dblite.engines.postgres.Queryable._column
def _column(self, col, sql=False, table=None, tablename=None)
Returns column name from string/property/Identifier, quoted if object and `sql`.
Definition: postgres.py:305
dblite.api.Transaction.rollback
def rollback(self)
Rolls back pending actions, if any.
Definition: api.py:491
dblite.api.Queryable.fetchone
def fetchone(self, table, cols="*", where=(), group=(), order=(), limit=(), **kwargs)
Convenience wrapper for database SELECT and fetch one.
Definition: api.py:256
dblite.engines.postgres.Queryable.quote
def quote(cls, value, force=False)
Returns identifier in quotes and proper-escaped for queries, if value needs quoting (has non-alphanum...
Definition: postgres.py:269
dblite.engines.postgres.register_row_factory
def register_row_factory(row_factory)
Registers custom row factory, as `function(cursor, row tuple)`, or `None` to reset to default.
Definition: postgres.py:979
dblite.engines.postgres.Transaction.rollback
def rollback(self)
Rolls back pending actions, if any.
Definition: postgres.py:749
dblite.engines.postgres.Transaction.__init__
def __init__(self, db, commit=True, exclusive=False, schema=None, lazy=False, itersize=None, **__)
Creates a transaction context manager.
Definition: postgres.py:651
dblite.engines.postgres.Queryable._adapt_value
def _adapt_value(self, value, typename)
Returns value as JSON if field is a JSON type and no adapter registered for value type,...
Definition: postgres.py:279
dblite.engines.postgres.Queryable.insert
def insert(self, table, values=(), **kwargs)
Convenience wrapper for database INSERT, returns inserted row ID.
Definition: postgres.py:86
dblite.engines.postgres.RowFactoryCursor
A cursor that generates result rows via given factory callable.
Definition: postgres.py:789
dblite.engines.postgres.Database.close
def close(self, commit=None)
Closes the database and any pending transactions, if open.
Definition: postgres.py:471
dblite.engines.postgres.Database.cursor
cursor
Database engine cursor object, or `None` if closed.
Definition: postgres.py:493
dblite.engines.postgres.Database.__init__
def __init__(self, opts, **kwargs)
Creates a new Database instance for Postgres.
Definition: postgres.py:394
dblite.engines.postgres.Identifier.__str__
def __str__(self)
Returns quoted name.
Definition: postgres.py:62
dblite.engines.postgres.Transaction.database
database
Returns transaction Database instance.
Definition: postgres.py:771
dblite.engines.postgres.Transaction._lazy
_lazy
Definition: postgres.py:653
dblite.engines.postgres.Database.execute
def execute(self, sql, args=())
Executes SQL statement, returns psycopg cursor.
Definition: postgres.py:426
dblite.engines.postgres.Database.dsn
dsn
Data Source Name, as URL like `"postgresql://user@host/dbname"`.
Definition: postgres.py:397
dblite.engines.postgres.Queryable.OPS
tuple OPS
Recognized binary operators for makeSQL()
Definition: postgres.py:73
dblite.engines.postgres.Transaction.commit
def commit(self)
Commits pending actions, if any.
Definition: postgres.py:745
dblite.engines.postgres.autodetect
def autodetect(opts)
Returns true if input is recognizable as Postgres connection options.
Definition: postgres.py:825
dblite.engines.postgres.Transaction._db
_db
Definition: postgres.py:652
dblite.engines.postgres.quote
def quote(value, force=False)
Returns identifier in quotes and proper-escaped for queries, if value needs quoting (has non-alphanum...
Definition: postgres.py:943
dblite.engines.postgres.Database._txs
_txs
Definition: postgres.py:401
dblite.engines.postgres.Transaction.executescript
def executescript(self, sql)
Executes the SQL as script of any number of statements.
Definition: postgres.py:740
dblite.engines.postgres.Transaction
Transaction context manager, provides convenience methods for queries.
Definition: postgres.py:634
dblite.engines.postgres.RowFactoryCursor.rowtype
rowtype
Definition: postgres.py:794
dblite.api.Rollback
Definition: api.py:507
dblite.engines.postgres.Transaction.__enter__
def __enter__(self)
Context manager entry, opens cursor, returns Transaction object.
Definition: postgres.py:662
dblite.api.Queryable.closed
closed
Whether currently not open.
Definition: api.py:345
dblite.api.Database
Database instance.
Definition: api.py:379
dblite.engines.postgres.Transaction._exclusive
_exclusive
Definition: postgres.py:656
dblite.api.Database.open
def open(self)
Opens database connection if not already open.
Definition: api.py:409
dblite.engines.postgres.Transaction.executemany
def executemany(self, sql, args)
Executes the SQL statement against all parameter sequences.
Definition: postgres.py:728
dblite.engines.postgres.Queryable._cast
def _cast(self, col, val, table=None, tablename=None)
Returns column value cast to correct type for use in psycopg.
Definition: postgres.py:288
dblite.engines.postgres.Database._row_factory
_row_factory
Definition: postgres.py:402
dblite.engines.postgres.Transaction.cursor
cursor
Database engine cursor object, or `None` if closed.
Definition: postgres.py:762
dblite.api.Queryable.fetchall
def fetchall(self, table, cols="*", where=(), group=(), order=(), limit=(), **kwargs)
Convenience wrapper for database SELECT and fetch all.
Definition: api.py:248
dblite.engines.postgres.Database.row_factory
row_factory
The custom row factory, if any, as `function(cursor, row tuple)`.
Definition: postgres.py:501
dblite.engines.postgres.Database
Convenience wrapper around psycopg2.ConnectionPool and Cursor.
Definition: postgres.py:361
dblite.engines.postgres.Queryable._load_schema
def _load_schema(self, force=False)
Populates table structure from database if uninitialized or forced.
Definition: postgres.py:315
dblite.engines.postgres.Transaction.__exit__
def __exit__(self, exc_type, exc_val, exc_trace)
Context manager exit, closes cursor, commits or rolls back as specified on creation.
Definition: postgres.py:675
dblite.engines.postgres.Database._cursorctx
_cursorctx
Definition: postgres.py:400
dblite.api.Queryable.cursor
cursor
Database engine cursor object, or `None` if closed.
Definition: api.py:353
dblite.engines.postgres.Identifier.__eq__
def __eq__(self, other)
Supports comparing to strings or other Identifier instances.
Definition: postgres.py:58
dblite.api.Database.close
def close(self, commit=None)
Closes the database and any pending transactions, if open.
Definition: api.py:419
dblite.engines.postgres.Identifier
Wrapper for table and column names from data objects.
Definition: postgres.py:54
dblite.engines.postgres.Database.transaction
def transaction(self, commit=True, exclusive=False, **kwargs)
Returns a transaction context manager.
Definition: postgres.py:530
dblite.engines.postgres.Database.make_cursor
def make_cursor(self, commit=False, autocommit=False, schema=None, lazy=False, itersize=None)
Context manager for psycopg connection cursor.
Definition: postgres.py:549
dblite.engines.postgres.Database.POOLS
dictionary POOLS
Connection pools, as {Database: psycopg2.pool.ConnectionPool}.
Definition: postgres.py:376
dblite.engines.postgres.Queryable._wrapper
def _wrapper(self, column=True, sql=False, tablename=None)
Returns function(name) producing Identifier or SQL-ready name string.
Definition: postgres.py:349
dblite.api.Queryable.makeSQL
def makeSQL(self, action, table, cols="*", where=(), group=(), order=(), limit=(), values=(), kwargs=None)
Returns (SQL statement string, parameter dict).
Definition: api.py:337
dblite.engines.postgres.Database.closed
closed
Whether database connection is currently not open.
Definition: postgres.py:485
dblite.engines.postgres.Database.__enter__
def __enter__(self)
Context manager entry, opens database if not already open, returns Database object.
Definition: postgres.py:406
dblite.engines.postgres.register_converter
def register_converter(transformer, typenames)
Registers function to auto-convert given Postgres types to Python types in query results.
Definition: postgres.py:969
dblite.engines.postgres.make_db_url
def make_db_url(opts)
Returns Postgres connection options as URL, like `"postgresql://host/dbname"`.
Definition: postgres.py:834
dblite.engines.postgres.Transaction._cursorctx
_cursorctx
Definition: postgres.py:655
dblite.api.Queryable.execute
def execute(self, sql, args=())
Executes the SQL statement and returns database cursor.
Definition: api.py:311
dblite.api.Queryable.update
def update(self, table, values, where=(), **kwargs)
Convenience wrapper for database UPDATE, returns affected row count.
Definition: api.py:291
dblite.engines.postgres.Transaction.closed
closed
Whether transaction is currently not open.
Definition: postgres.py:755
dblite.api.Transaction.commit
def commit(self)
Commits pending actions, if any.
Definition: api.py:487
dblite.engines.postgres.Identifier.__init__
def __init__(self, name)
Definition: postgres.py:56
dblite.engines.postgres.Queryable.makeSQL
def makeSQL(self, action, table, cols="*", where=(), group=(), order=(), limit=(), values=(), kwargs=None)
Returns (SQL statement string, parameter dict).
Definition: postgres.py:134
dblite.engines.postgres.Database.__exit__
def __exit__(self, exc_type, exc_val, exc_trace)
Context manager exit, closes database and any pending transactions if open.
Definition: postgres.py:412
dblite.engines.postgres.Transaction.close
def close(self, commit=None)
Closes the transaction, performing commit or rollback as specified, and releases database connection ...
Definition: postgres.py:698
dblite.api.Transaction
Transaction context manager, breakable by raising Rollback.
Definition: api.py:450
dblite.engines.postgres.Identifier.quote
def quote(name)
Returns quoted name.
Definition: postgres.py:66
dblite.engines.postgres.Database.executescript
def executescript(self, sql)
Executes the SQL as script of any number of statements.
Definition: postgres.py:450
dblite.engines.postgres.Transaction.execute
def execute(self, sql, args=())
Executes SQL statement, returns psycopg cursor.
Definition: postgres.py:716
dblite.engines.postgres.Database._apply_converters
def _apply_converters(self)
Applies registered converters, if any, looking up type OIDs on live cursor.
Definition: postgres.py:596
dblite.engines.postgres.Transaction._cursor
_cursor
Definition: postgres.py:654
dblite.api.Queryable
Abstract base for Database and Transaction.
Definition: api.py:237
dblite.engines.postgres.query_schema
def query_schema(queryable, keys=False, views=False, inheritance=False)
Returns database table structure populated from given database.
Definition: postgres.py:875
dblite.engines.postgres.Transaction._enterstack
_enterstack
Definition: postgres.py:658
dblite.engines.postgres.Database.open
def open(self)
Opens database connection if not already open.
Definition: postgres.py:456
dblite.engines.postgres.Queryable._match_name
def _match_name(self, name, table=None)
Returns proper-cased name from schema lookup, or same value if no match.
Definition: postgres.py:333
dblite.engines.postgres.Database.MUTEX
MUTEX
Mutexes for exclusive transactions, as {Database instance: lock}.
Definition: postgres.py:370
dblite.engines.postgres.Database.ROW_FACTORY
ROW_FACTORY
Registered row factory.
Definition: postgres.py:379
dblite.engines.postgres.register_adapter
def register_adapter(transformer, typeclasses)
Registers function to auto-adapt given Python types to Postgres types in query parameters.
Definition: postgres.py:957
dblite.engines.postgres.Database.executemany
def executemany(self, sql, args)
Executes the SQL statement against all parameter sequences.
Definition: postgres.py:438
dblite.engines.postgres.Database.init_pool
def init_pool(cls, db, minconn=POOL_SIZE[0], maxconn=POOL_SIZE[1], **kwargs)
Initializes connection pool for Database if not already initialized.
Definition: postgres.py:583
dblite.engines.postgres.Database._cursor
_cursor
Definition: postgres.py:399