dblite  1.3.0
Simple query interface for SQL databases
util.py
Go to the documentation of this file.
1 # -*- coding: utf-8 -*-
2 """
3 Utility classes and functions.
4 
5 ------------------------------------------------------------------------------
6 This file is part of dblite - simple query interface for SQL databases.
7 Released under the MIT License.
8 
9 @author Erki Suurjaak
10 @created 28.11.2022
11 @modified 23.03.2023
12 ------------------------------------------------------------------------------
13 """
14 import collections
15 import datetime
16 import decimal
17 import glob
18 import importlib
19 import inspect
20 import json
21 import logging
22 import os
23 import re
24 
25 import six
26 
27 logger = logging.getLogger(__name__)
28 
29 
30 class StaticTzInfo(datetime.tzinfo):
31  """datetime.tzinfo class representing a constant offset from UTC."""
32  ZERO = datetime.timedelta(0)
33 
34  def __init__(self, name, delta):
35  """Constructs a new static zone info, with specified name and time delta."""
36  self._name = name
37  self._offset = delta
38 
39  def utcoffset(self, dt): return self._offset
40  def dst(self, dt): return self.ZERO
41  def tzname(self, dt): return self._name
42  def __ne__(self, other): return not self.__eq__(other)
43  def __repr__(self): return "%s(%s)" % (self.__class__.__name__, self._name)
44  def __eq__(self, other):
45  return isinstance(other, self.__class__) and self._offset == other._offset
46 
47 UTC = StaticTzInfo("UTC", StaticTzInfo.ZERO)
48 
49 
50 
51 def factory(ctor, data):
52  """
53  Returns object constructed with data dictionary.
54 
55  @param ctor callable like a class, declared args are matched case-insensitively
56  for positional arguments if keyword argument invocation fails
57  @param data data dictionary with string keys
58  @return (result, [error strings])
59  """
60  errors = []
61  try: return ctor(**data), [] # Constructed with keyword args as data keys-values
62  except Exception as e: errors.append(e)
63  try: return ctor(*data.values()), [] # Constructed with positional args as data values
64  except Exception as e: errors.append(e)
65  try: return ctor(data), [] # Constructed with data as single arg
66  except Exception as e: errors.append(e)
67  if is_namedtuple(ctor): # Populate any missing fields with None
68  try: return ctor(**dict({k: None for k in ctor._fields}, **data)), []
69  except Exception as e: errors.append(e)
70  try: return ctor(*map(data.get, ctor._fields)), []
71  except Exception as e: errors.append(e)
72  return data, errors
73 
74 
75 def is_dataobject(obj):
76  """Returns whether input is a data object: namedtuple, or has attributes or slots."""
77  if is_namedtuple(obj):
78  return True # collections.namedtuple
79  if getattr(obj, "__slots__", None):
80  return True # __slots__
81  if any(isinstance(v, property) for _, v in inspect.getmembers(type(obj))):
82  return True # Declared properties
83  if getattr(obj, "__dict__", None):
84  return True # Plain object
85  return False
86 
87 
88 def is_namedtuple(obj):
89  """Returns whether input is a namedtuple class or instance."""
90  return (isinstance(obj, tuple) or inspect.isclass(obj) and issubclass(obj, tuple)) \
91  and hasattr(obj, "_asdict") and hasattr(obj, "_fields")
92 
93 
94 def json_dumps(data, indent=2, sort_keys=True):
95  """
96  Returns JSON string, with datetime types converted to ISO-8601 strings
97  (in UTC if no timezone set), sets converted to lists,
98  and Decimal objects converted to float or int. Returns None if data is None.
99  """
100  if data is None: return None
101  def encoder(x):
102  if isinstance(x, set): return list(x)
103  if isinstance(x, (datetime.datetime, datetime.date, datetime.time)):
104  if x.tzinfo is None: x = x.replace(tzinfo=UTC)
105  return x.isoformat()
106  if isinstance(x, decimal.Decimal):
107  return float(x) if x.as_tuple().exponent else int(x)
108  return None
109  return json.dumps(data, default=encoder, indent=indent, sort_keys=sort_keys)
110 
111 
112 def json_loads(s):
113  """
114  Returns deserialized JSON, with datetime/date strings converted to objects.
115 
116  Returns original input if loading as JSON failed.
117  """
118  def convert_recursive(data):
119  """Converts ISO datetime strings to objects in nested dicts or lists."""
120  result = []
121  pairs = enumerate(data) if isinstance(data, list) \
122  else data.items() if isinstance(data, dict) else []
123  rgx = r"^\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}(\.\d+)?(([+-]\d{2}:?\d{2})|Z)?$"
124  for k, v in pairs:
125  if isinstance(v, (dict, list)): v = convert_recursive(v)
126  elif isinstance(v, six.string_types) and len(v) > 18 and re.match(rgx, v):
127  v = parse_datetime(v)
128  result.append((k, v))
129  return [x for _, x in result] if isinstance(data, list) \
130  else dict(result) if isinstance(data, dict) else data
131  try:
132  return None if s is None else json.loads(s, object_hook=convert_recursive)
133  except Exception:
134  fails = getattr(json_loads, "__fails", set())
135  if hash(s) not in fails: # Avoid spamming logs
136  logger.warning("Failed to parse JSON from %r.", s, exc_info=True)
137  setattr(json_loads, "__fails", fails | set([hash(s)]))
138  return s
139 
140 
141 def keyvalues(obj, namefmt=None):
142  """
143  Returns a list of keys and values, or [given object] if not applicable.
144 
145  @param obj mapping or namedtuple or list|set|tuple or object with attributes or slots
146  @param namefmt function(key) to apply on extracted keys, if any
147  @return [(key, value)] if available,
148  else original argument as list if list/set/tuple,
149  else list with a single item
150  """
151  namefmt = namefmt if callable(namefmt) else lambda x: x
152  if type(obj) in (dict, collections.defaultdict, collections.OrderedDict):
153  return list(obj.items()) # dictionary
154  if isinstance(obj, (list, set)):
155  return list(obj) # list/set
156  if is_namedtuple(obj):
157  return [(namefmt(k), getattr(obj, k)) for k in obj._fields] # collections.namedtuple
158  if isinstance(obj, tuple):
159  return list(obj) # tuple
160  if getattr(obj, "__slots__", None):
161  return [(namefmt(k), getattr(obj, k)) for k in obj.__slots__
162  if hasattr(obj, k)] # __slots__
163  if any(isinstance(v, property) for _, v in inspect.getmembers(type(obj))):
164  return [(namefmt(k), getattr(obj, k)) for k, v in inspect.getmembers(type(obj))
165  if isinstance(v, property)] # Declared properties
166  if getattr(obj, "__dict__", None):
167  return [(namefmt(k), v) for k, v in vars(obj).items()] # Plain object
168  if isinstance(obj, six.moves.collections_abc.Mapping):
169  return list(obj.items()) # dictionary
170  return [obj]
171 
172 
173 def load_modules():
174  """Returns db engines loaded from file directory, as {name: module}."""
175  result = {}
176  for n in sorted(glob.glob(os.path.join(os.path.dirname(__file__), "engines", "*"))):
177  name = os.path.splitext(os.path.basename(n))[0]
178  if name.startswith("__") or os.path.isfile(n) and not re.match(".*pyc?$", n) \
179  or os.path.isdir(n) and not any(glob.glob(os.path.join(n, x)) for x in ("*.py", "*.pyc")):
180  continue # for n
181 
182  modulename = "%s.%s.%s" % (__package__, "engines", name)
183  module = importlib.import_module(modulename)
184  result[name] = module
185  return result
186 
187 
188 def nameify(val, namefmt=None, parent=None):
189  """
190  Returns value as table or column name string.
191 
192  @param val a primitive like string, or a named object like a class,
193  or a class property or member or data descriptor
194  @param namefmt function(name) to apply on name extracted from class or object, if any
195  @param parent the parent class object if value is a class member or property
196  @return string
197  """
198  if isinstance(val, six.string_types):
199  return val
200  namefmt = namefmt if callable(namefmt) else lambda x: x
201  if inspect.isclass(val):
202  return namefmt(val.__name__)
203  if inspect.isdatadescriptor(val):
204  if hasattr(val, "__name__"): return namefmt(val.__name__) # __slots__ entry
205  return next(namefmt(k) for k, v in inspect.getmembers(parent) if v is val)
206  return six.text_type(val)
207 
208 
209 def parse_datetime(s):
210  """
211  Tries to parse string as ISO8601 datetime, returns input on error.
212  Supports "YYYY-MM-DD[ T]HH:MM:SS(.micros)?(Z|[+-]HH(:MM)?)?".
213  All returned datetimes are timezone-aware, falling back to UTC.
214  """
215  if len(s) < 18: return s
216  rgx = r"^\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}(\.\d+)?(([+-]\d{2}(:?\d{2})?)|Z)?$"
217  result, match = s, re.match(rgx, s)
218  if match:
219  millis, _, offset, _ = match.groups()
220  minimal = re.sub(r"\D", "", s[:match.span(2)[0]] if offset else s)
221  fmt = "%Y%m%d%H%M%S" + ("%f" if millis else "")
222  try:
223  result = datetime.datetime.strptime(minimal, fmt)
224  if offset: # Support timezones like 'Z' or '+03:00'
225  hh, mm = map(int, [offset[1:3], offset[4:]])
226  delta = datetime.timedelta(hours=hh, minutes=mm)
227  if offset.startswith("-"): delta = -delta
228  result = result.replace(tzinfo=StaticTzInfo(offset, delta))
229  except ValueError: pass
230  if isinstance(result, datetime.datetime) and result.tzinfo is None:
231  result = result.replace(tzinfo=UTC) # Force UTC timezone on unaware values
232  return result
233 
234 
235 __all__ = [
236  "StaticTzInfo", "UTC",
237  "factory", "is_dataobject", "is_namedtuple", "json_dumps", "json_loads",
238  "keyvalues", "load_modules", "nameify", "parse_datetime",
239 ]
dblite.util.StaticTzInfo.__repr__
def __repr__(self)
Definition: util.py:43
dblite.util.StaticTzInfo.tzname
def tzname(self, dt)
Definition: util.py:41
dblite.util.StaticTzInfo.utcoffset
def utcoffset(self, dt)
Definition: util.py:39
dblite.util.load_modules
def load_modules()
Returns db engines loaded from file directory, as {name: module}.
Definition: util.py:171
dblite.util.StaticTzInfo._offset
_offset
Definition: util.py:37
dblite.util.json_loads
def json_loads(s)
Returns deserialized JSON, with datetime/date strings converted to objects.
Definition: util.py:116
dblite.util.nameify
def nameify(val, namefmt=None, parent=None)
Returns value as table or column name string.
Definition: util.py:194
dblite.util.StaticTzInfo.__init__
def __init__(self, name, delta)
Constructs a new static zone info, with specified name and time delta.
Definition: util.py:34
dblite.util.json_dumps
def json_dumps(data, indent=2, sort_keys=True)
Returns JSON string, with datetime types converted to ISO-8601 strings (in UTC if no timezone set),...
Definition: util.py:98
dblite.util.parse_datetime
def parse_datetime(s)
Tries to parse string as ISO8601 datetime, returns input on error.
Definition: util.py:211
dblite.util.is_namedtuple
def is_namedtuple(obj)
Returns whether input is a namedtuple class or instance.
Definition: util.py:87
dblite.util.keyvalues
def keyvalues(obj, namefmt=None)
Returns a list of keys and values, or [given object] if not applicable.
Definition: util.py:148
dblite.util.StaticTzInfo.dst
def dst(self, dt)
Definition: util.py:40
dblite.util.StaticTzInfo.__ne__
def __ne__(self, other)
Definition: util.py:42
dblite.util.StaticTzInfo
datetime.tzinfo class representing a constant offset from UTC.
Definition: util.py:30
dblite.util.factory
def factory(ctor, data)
Returns object constructed with data dictionary.
Definition: util.py:58
dblite.util.StaticTzInfo.__eq__
def __eq__(self, other)
Definition: util.py:44
dblite.util.is_dataobject
def is_dataobject(obj)
Returns whether input is a data object: namedtuple, or has attributes or slots.
Definition: util.py:74
dblite.util.StaticTzInfo._name
_name
Definition: util.py:36