3Common utility classes and functions.
5------------------------------------------------------------------------------
6This file is part of rosros - simple unified interface to ROS1 / ROS2.
7Released under the BSD License.
12------------------------------------------------------------------------------
14## @namespace rosros.util
15import concurrent.futures
28class ThrottledLogger(logging.Logger):
30 Logger wrapper with support
for throttling logged messages per call site.
32 Logging methods (`debug()`, `info()`, etc) accept additional keyword arguments:
33 - `__once__`: whether to log only once
from call site
34 - `__throttle__`: seconds to skip logging
from call site
for
35 - `__throttle_identical__`: whether to skip logging identical consecutive texts
from call site
36 (given log message excluding formatting arguments).
37 Combines
with `__throttle__` to skip a duplicate
for a period.
40 _KEYWORDS = ["__throttle__",
"__throttle_identical__",
"__once__"]
54 Creates a wrapper logger around given logger instance, providing support for throttling.
56 @param `logging.Logger` to wrap
58 super().__init__(logger.name, logger.level)
59 OVERRIDES = ("debug",
"info",
"warning",
"warn",
"error",
"fatal",
"log")
60 for name
in dir(logger):
61 if not name.startswith(
"_")
and name
not in OVERRIDES:
62 setattr(self, name, getattr(logger, name))
66 def debug(self, msg, *args, **kwargs):
68 Logs `msg % args` with severity `DEBUG`.
70 To
pass exception information, use the keyword argument `exc_info=
True`.
72 @param __once__ whether to log only once
from call site
73 @param __throttle__ seconds to skip logging
from call site
for
74 @param __throttle_identical__ whether to skip identical consecutive texts
from call site
80 def info(self, msg, *args, **kwargs):
82 Logs `msg % args` with severity `INFO`. The arguments are interpreted
as for `
debug()`.
88 def warning(self, msg, *args, **kwargs):
90 Logs `msg % args` with severity `WARN`. The arguments are interpreted
as for `
debug()`.
96 def warn(self, msg, *args, **kwargs):
98 Logs `msg % args` with severity `WARNING`. The arguments are interpreted
as for `
debug()`.
104 def error(self, msg, *args, **kwargs):
106 Logs `msg % args` with severity `ERROR`. The arguments are interpreted
as for `
debug()`.
112 def fatal(self, msg, *args, **kwargs):
114 Logs `msg % args` with severity `FATAL`. The arguments are interpreted
as for `
debug()`.
120 def log(self, level, msg, *args, **kwargs):
122 Logs `msg % args` with given severity. The arguments are interpreted
as for `
debug()`.
130 """Drops throttle parameters from kwargs and returns as dict."""
131 return {k.replace(
"__",
""): kwargs.pop(k,
None)
for k
in cls.
_KEYWORDS}
135 def _is_throttled(cls, msg, once=False, throttle=None, throttle_identical=False):
136 """Returns whether message should be skipped."""
138 if once
or throttle_identical
or throttle:
139 f = inspect.currentframe().f_back.f_back
140 caller_id =
":".join(str(x)
for x
in (inspect.getabsfile(f), f.f_lineno, f.f_lasti))
142 result = caller_id
in cls.
_ONCES
144 elif throttle_identical:
145 msg_hash = hashlib.md5(msg.encode()).hexdigest()
146 result = (cls.
_HASHES.get(caller_id) == msg_hash)
148 cls.
_HASHES[caller_id] = msg_hash
150 now, last = time.monotonic(), cls.
_TIMES.get(caller_id)
151 result = result
and last
is not None and now - last < throttle
152 cls.
_TIMES[caller_id] = now
if last
is None or not result
else last
154 now, last = time.monotonic(), cls.
_TIMES.get(caller_id)
155 result = last
is not None and now - last < throttle
156 cls.
_TIMES[caller_id] = now
if last
is None or not result
else last
162 """Drops or replaces trailing zeros and empty decimal separator, if any."""
163 return re.sub(
r"\.?0+$",
lambda x: len(x.group()) * replace, str(v))
166def ensure_object(obj_or_cls, attributes, populate_object=None, *args, **kwargs):
168 Ensures result is an object of specified type.
170 Intended
for wrapping convenience functions giving
or returning
171 object
as attribute list
or dictionary instead of instantiated object.
173 If first positional argument
is an instance of specified
class,
174 it
is populated
from positional
and keyword arguments.
176 If first positional argument
is a dictionary, it
is taken
as keyword arguments.
178 `obj_or_cls` may be a
class to instantiate, or the class instance
179 to populate
if instance
not in first positional argument.
180 Instance attributes will be given
in constructor
as keyword arguments.
184 @param obj_or_cls object
class or instance
185 @param attributes iterable of object attribute names
186 to combine positional arguments
from,
187 or a callable returning iterable
188 @param populate_object function(dict, obj) to populate object
from dictionary
189 @return object of specified
class, populated
from
190 positional
and keyword arguments,
193 iscls = inspect.isclass(obj_or_cls)
194 cls, obj = (obj_or_cls if iscls
else type(obj_or_cls)),
None
195 isarg = args
and isinstance(args[0], cls)
196 if isarg: obj, args = (args[0], args[1:])
197 if args
and isinstance(args[0], dict): args, kwargs = args[1:], dict(args[0], **kwargs)
198 if args
and callable(attributes): attributes = attributes()
199 for k, v
in zip(attributes, args)
if args
else ():
201 raise TypeError(
"%s got multiple values for keyword argument %r" %
204 obj = (cls()
if iscls
else obj_or_cls)
if obj
is None else obj
205 populate_object = populate_object
or (
lambda d, o: [setattr(o, k, v)
for k, v
in d.items()])
206 populate_object(kwargs, obj)
212 Flattens a nested dictionary to a flat dictionary, with nested keys joined
with separator.
214 @param dct the dictionary to flatten
215 @param sep separator between nested keys
216 @return flat dictionary like {
"my.nested.key": value}
218 result, stack = {}, list(dct.items())
221 if isinstance(v, dict):
222 stack.extend((
"%s%s%s" % (k, sep, k2), v2)
for k2, v2
in v.items())
228def format_bytes(size, precision=2, inter=" ", strip=True):
229 """Returns a formatted byte size (like 421.40 MB), trailing zeros optionally removed."""
232 UNITS = [(
"bytes",
"byte")[1 == size]] + [x +
"B" for x
in "KMGTPEZY"]
233 exponent = min(int(math.log(size, 1024)), len(UNITS) - 1)
234 result =
"%.*f" % (precision, size / (1024. ** exponent))
235 result +=
"" if precision > 0
else "."
236 result = (
drop_zeros(result)
if strip
else result) + inter + UNITS[exponent]
240def get_arity(func, positional=True, keyword=False):
242 Returns the maximum number of arguments the function takes, -1 if variable number.
244 @param positional count positional-only
and positional/keyword arguments
245 @param keyword count keyword-only
and positional/keyword arguments
247 POSITIONALS = (inspect.Parameter.POSITIONAL_OR_KEYWORD, inspect.Parameter.POSITIONAL_ONLY)
248 KEYWORDALS = (inspect.Parameter.POSITIONAL_OR_KEYWORD, inspect.Parameter.KEYWORD_ONLY)
250 try: params = inspect.signature(func).parameters
251 except Exception: params =
None
253 or positional
and any(x.kind == inspect.Parameter.VAR_POSITIONAL
for x
in params.values()) \
254 or keyword
and any(x.kind == inspect.Parameter.VAR_KEYWORD
for x
in params.values()):
257 if positional
and keyword:
258 result += sum(x.kind
in POSITIONALS + KEYWORDALS
for x
in params.values())
260 result += sum(x.kind
in POSITIONALS
for x
in params.values())
262 result += sum(x.kind
in KEYWORDALS
for x
in params.values())
268 Returns (nested value, value parent container, key in parent container).
270 Raises
if path
not found.
272 @param obj object
or dictionary
or list,
with arbitrary nesting
273 @param path
"name" or (
"nested",
"path")
or "nested.path", element can be list index
274 @return (value, value parent container, key
in parent container)
277 def getter(obj, key):
278 """Returns dictionary value, or object attribute, or sequence element if numeric key."""
279 key = int(key)
if isinstance(key, str)
and re.match(
"^[0-9]+$", key)
else key
280 return obj[key]
if isinstance(obj, dict)
or isinstance(key, int)
else getattr(obj, key)
282 (*path, leaf) = path.split(
".")
if isinstance(path, str)
else path
286 return getter(ptr, leaf), ptr, leaf
291 Returns object or dictionary
or list value at (nested, path).
293 Raises
if path
not found.
295 @param obj object
or dictionary
or list,
with arbitrary nesting
296 @param path (
"nested",
"path")
or "nested.path", element can be list index
297 @param pathsep string to split scalar key by, like
"." for "nested.path"
299 if not isinstance(path, (list, tuple)):
300 path = path.split(pathsep)
if pathsep
else [path]
301 path, leaf, ptr = path[:-1], path[-1], obj
303 ptr = ptr[p]
if isinstance(ptr, (dict, list, tuple))
else getattr(ptr, p)
305 if isinstance(ptr, (dict, list, tuple)):
307 return getattr(ptr, leaf)
312 Returns a results-caching wrapper for the function.
314 All arguments to function must be hashable.
317 def inner(*args, **kwargs):
318 key = args + sum(kwargs.items(), ())
320 cache[key] = func(*args, **kwargs)
322 return functools.update_wrapper(inner, func)
326 """Returns a nested dictionary from path, like {"nested": {"path": value}}."""
329 ptr = ptr.setdefault(p, {})
330 ptr[path[-1]] = value
336 Merges d2 into d1, recursively for nested dicts.
340 for k, v
in d2.items()
if d2
else ():
341 if k
in d1
and isinstance(v, dict)
and isinstance(d1[k], dict):
349 """Returns arguments joined into a namespace name, starting and separated with "/"."""
350 return "/" +
"/".join(filter(bool, (x.strip(
"/")
for x
in args)))
354 """Returns argument split into (namespace, name), like "/a/b/c" as ("/a/b", "c")."""
355 parts = name.rsplit(
"/", 1)
356 return (
"" if len(parts) < 2
else (parts[0]
or "/")), parts[-1]
359def set_value(obj, path, value, pathsep=None):
361 Sets object or dictionary
or list value at key
or (nested, path).
363 Lists are appended to
if specified index does
not exist.
365 @param obj object
or dictionary
or list,
with arbitrary nesting
366 @param path scalar key
or (
"nested",
"path"), element can be list index
367 @param value value to set
368 @param pathsep string to split scalar key by, like
"." for "nested.path"
370 if not isinstance(path, (list, tuple)):
371 path = path.split(pathsep)
if pathsep
else [path]
372 path, leaf, ptr = path[:-1], path[-1], obj
374 ptr = ptr.setdefault(p, {})
if isinstance(ptr, dict)
else \
375 ptr[p]
if isinstance(ptr, list)
else getattr(ptr, p)
377 if isinstance(ptr, dict):
379 elif isinstance(ptr, list):
380 if 0 <= leaf < len(ptr):
385 setattr(ptr, leaf, value)
390 Returns `concurrent.futures.Future` and invokes function
in a background thread.
392 Future will be done when function returns
or raises. Background thread
is not daemonic.
394 future = concurrent.futures.Future()
396 try: future.set_result(func(*args, **kwargs))
397 except Exception
as e: future.set_exception(e)
398 threading.Thread(target=worker).start()
404 Returns a unique version of the path.
406 If a file or directory
with the same name already exists, returns a unique
407 version (e.g.
"/tmp/my.2.file" if ""/tmp/my.file
" already exists).
409 @param empty_ok whether to ignore existence
if file
is empty
412 if os.path.isfile(result)
and empty_ok
and not os.path.getsize(result):
414 path, name = os.path.split(result)
415 base, ext = os.path.splitext(name)
417 name = base[:255 - len(ext) - 2] +
".." + ext
418 result = os.path.join(path, name)
420 while os.path.exists(result):
421 suffix =
".%s%s" % (counter, ext)
424 name = base[:255 - len(suffix) - 2] +
".." + suffix
425 result = os.path.join(path, name)
432 Returns wrapper for invoking function
with its maximum supported number of arguments.
434 E.g. `
wrap_arity(abs)(-1, -2)` will
return result of `abs(-1)`.
436 Returns original function
if a built-
in with no arity information available.
438 POSITIONALS = (inspect.Parameter.POSITIONAL_OR_KEYWORD, inspect.Parameter.POSITIONAL_ONLY)
439 KEYWORDALS = (inspect.Parameter.POSITIONAL_OR_KEYWORD, inspect.Parameter.KEYWORD_ONLY)
440 try: params = inspect.signature(func).parameters
441 except Exception: params =
None
442 varargs = params
and any(x.kind == inspect.Parameter.VAR_POSITIONAL
for x
in params.values())
443 varkws = params
and any(x.kind == inspect.Parameter.VAR_KEYWORD
for x
in params.values())
444 posargs = []
if params
is None else [x.name
for x
in params.values()
if x.kind
in POSITIONALS]
445 keywords = []
if params
is None else [x.name
for x
in params.values()
if x.kind
in KEYWORDALS]
447 def inner(*args, **kwargs):
448 if not varargs: args = args[:len(posargs)]
449 if not varkws: kwargs = {k: v
for k, v
in kwargs.items()
if k
in keywords}
450 return func(*args, **kwargs)
451 return func
if params
is None else functools.update_wrapper(inner, func)
455 "ensure_object",
"flatten_dict",
"format_bytes",
"get_arity",
"get_value",
"make_dict",
456 "memoize",
"merge_dicts",
"namejoin",
"namesplit",
"set_value",
"start_future",
457 "unique_path",
"wrap_arity",
fatal(self, msg, *args, **kwargs)
Logs `msg % args` with severity `FATAL`.
error(self, msg, *args, **kwargs)
Logs `msg % args` with severity `ERROR`.
_ONCES
Caller IDs registered for throttling by once-only.
debug(self, msg, *args, **kwargs)
Logs `msg % args` with severity `DEBUG`.
_extract_args(cls, kwargs)
Drops throttle parameters from kwargs and returns as dict.
_is_throttled(cls, msg, once=False, throttle=None, throttle_identical=False)
Returns whether message should be skipped.
dict _HASHES
Caller IDs and log message hashes for throttling by identical text.
log(self, level, msg, *args, **kwargs)
Logs `msg % args` with given severity.
info(self, msg, *args, **kwargs)
Logs `msg % args` with severity `INFO`.
__init__(self, logger)
Creates a wrapper logger around given logger instance, providing support for throttling.
warn(self, msg, *args, **kwargs)
Logs `msg % args` with severity `WARNING`.
warning(self, msg, *args, **kwargs)
Logs `msg % args` with severity `WARN`.
dict _TIMES
Caller IDs and last timestamps for throttling by time.
merge_dicts(d1, d2)
Merges d2 into d1, recursively for nested dicts.
ensure_object(obj_or_cls, attributes, populate_object=None, *args, **kwargs)
Ensures result is an object of specified type.
get_value(obj, path, pathsep=None)
Returns object or dictionary or list value at (nested, path).
start_future(func, *args, **kwargs)
Returns `concurrent.futures.Future` and invokes function in a background thread.
make_dict(path, value)
Returns a nested dictionary from path, like {"nested": {"path": value}}.
get_arity(func, positional=True, keyword=False)
Returns the maximum number of arguments the function takes, -1 if variable number.
namejoin(*args)
Returns arguments joined into a namespace name, starting and separated with "/".
set_value(obj, path, value, pathsep=None)
Sets object or dictionary or list value at key or (nested, path).
namesplit(name)
Returns argument split into (namespace, name), like "/a/b/c" as ("/a/b", "c").
flatten_dict(dct, sep=".")
Flattens a nested dictionary to a flat dictionary, with nested keys joined with separator.
drop_zeros(v, replace="")
Drops or replaces trailing zeros and empty decimal separator, if any.
wrap_arity(func)
Returns wrapper for invoking function with its maximum supported number of arguments.
unique_path(pathname, empty_ok=False)
Returns a unique version of the path.
format_bytes(size, precision=2, inter=" ", strip=True)
Returns a formatted byte size (like 421.40 MB), trailing zeros optionally removed.
get_nested(obj, path)
Returns (nested value, value parent container, key in parent container).