rosros 0.2.5
Simple unified interface to ROS1 / ROS2 Python API
Loading...
Searching...
No Matches
util.py
Go to the documentation of this file.
1# -*- coding: utf-8 -*-
2"""
3Common utility classes and functions.
4
5------------------------------------------------------------------------------
6This file is part of rosros - simple unified interface to ROS1 / ROS2.
7Released under the BSD License.
8
9@author Erki Suurjaak
10@created 11.02.2022
11@modified 07.12.2023
12------------------------------------------------------------------------------
13"""
14## @namespace rosros.util
15import concurrent.futures
16import functools
17import hashlib
18import inspect
19import logging
20import math
21import os
22import re
23import threading
24import time
25
26
27
class ThrottledLogger(logging.Logger):
    """
    Logger wrapper with support for throttling logged messages per call site.

    Logging methods (`debug()`, `info()`, etc) accept additional keyword arguments:
    - `__once__`:               whether to log only once from call site
    - `__throttle__`:           seconds to skip logging from call site for
    - `__throttle_identical__`: whether to skip logging identical consecutive texts from call site
                                (given log message excluding formatting arguments).
                                Combines with `__throttle__` to skip a duplicate for a period.

    Throttling state is kept on the class, so it is shared by all wrapper instances.
    """

    ## Names of the throttling keyword arguments accepted by logging methods
    _KEYWORDS = ["__throttle__", "__throttle_identical__", "__once__"]

    ## Caller IDs registered for throttling by once-only
    _ONCES = set()

    ## Caller IDs and last timestamps for throttling by time
    _TIMES = {}

    ## Caller IDs and log message hashes for throttling by identical text
    _HASHES = {}


    def __init__(self, logger):
        """
        Creates a wrapper logger around given logger instance, providing support for throttling.

        @param   logger  `logging.Logger` to wrap
        """
        super().__init__(logger.name, logger.level)
        OVERRIDES = ("debug", "info", "warning", "warn", "error", "fatal", "log")
        for name in dir(logger):  # Monkey-patch all other attributes from wrapped
            if not name.startswith("_") and name not in OVERRIDES:
                setattr(self, name, getattr(logger, name))
        self.__logger = logger


    def debug(self, msg, *args, **kwargs):
        """
        Logs `msg % args` with severity `DEBUG`.

        To pass exception information, use the keyword argument `exc_info=True`.

        @param   __once__                whether to log only once from call site
        @param   __throttle__            seconds to skip logging from call site for
        @param   __throttle_identical__  whether to skip identical consecutive texts from call site
        """
        if not self._is_throttled(msg, **self._extract_args(kwargs)):
            self.__logger.debug(msg, *args, **kwargs)


    def info(self, msg, *args, **kwargs):
        """
        Logs `msg % args` with severity `INFO`. The arguments are interpreted as for `debug()`.
        """
        if not self._is_throttled(msg, **self._extract_args(kwargs)):
            self.__logger.info(msg, *args, **kwargs)


    def warning(self, msg, *args, **kwargs):
        """
        Logs `msg % args` with severity `WARNING`. The arguments are interpreted as for `debug()`.
        """
        if not self._is_throttled(msg, **self._extract_args(kwargs)):
            self.__logger.warning(msg, *args, **kwargs)


    def warn(self, msg, *args, **kwargs):
        """
        Logs `msg % args` via `warn()` of the wrapped logger.

        The arguments are interpreted as for `debug()`.
        """
        if not self._is_throttled(msg, **self._extract_args(kwargs)):
            self.__logger.warn(msg, *args, **kwargs)


    def error(self, msg, *args, **kwargs):
        """
        Logs `msg % args` with severity `ERROR`. The arguments are interpreted as for `debug()`.
        """
        if not self._is_throttled(msg, **self._extract_args(kwargs)):
            self.__logger.error(msg, *args, **kwargs)


    def fatal(self, msg, *args, **kwargs):
        """
        Logs `msg % args` with severity `FATAL`. The arguments are interpreted as for `debug()`.
        """
        if not self._is_throttled(msg, **self._extract_args(kwargs)):
            self.__logger.fatal(msg, *args, **kwargs)


    def log(self, level, msg, *args, **kwargs):
        """
        Logs `msg % args` with given severity. The arguments are interpreted as for `debug()`.
        """
        if not self._is_throttled(msg, **self._extract_args(kwargs)):
            self.__logger.log(level, msg, *args, **kwargs)


    @classmethod
    def _extract_args(cls, kwargs):
        """
        Drops throttle parameters from kwargs and returns as dict.

        @param   kwargs  keyword arguments dictionary, modified in-place
        @return          {"once": .., "throttle": .., "throttle_identical": ..},
                         values being None for keywords not given
        """
        return {k.replace("__", ""): kwargs.pop(k, None) for k in cls._KEYWORDS}


    @classmethod
    def _is_throttled(cls, msg, once=False, throttle=None, throttle_identical=False):
        """
        Returns whether message should be skipped.

        Must be invoked directly from a logging method of this class: the call site
        is taken as the frame two levels up from here.

        @param   msg                 log message, excluding formatting arguments;
                                     any object with a string representation
        @param   once                whether to log only once from call site
        @param   throttle            seconds to skip logging from call site for
        @param   throttle_identical  whether to skip identical consecutive texts from call site
        """
        result = False
        if once or throttle_identical or throttle:
            # Skip over this method and the logging method that invoked it
            f = inspect.currentframe().f_back.f_back
            caller_id = ":".join(str(x) for x in (inspect.getabsfile(f), f.f_lineno, f.f_lasti))
        if once:
            result = caller_id in cls._ONCES
            cls._ONCES.add(caller_id)
        elif throttle_identical:
            # Message can be an arbitrary object, e.g. an exception instance: hash its text
            msg_hash = hashlib.md5(str(msg).encode()).hexdigest()
            result = (cls._HASHES.get(caller_id) == msg_hash)
            if not result:
                cls._HASHES[caller_id] = msg_hash
            if throttle:
                # Duplicate is skipped only until throttle period has passed
                now, last = time.monotonic(), cls._TIMES.get(caller_id)
                result = result and last is not None and now - last < throttle
                cls._TIMES[caller_id] = now if last is None or not result else last
        elif throttle:
            now, last = time.monotonic(), cls._TIMES.get(caller_id)
            result = last is not None and now - last < throttle
            cls._TIMES[caller_id] = now if last is None or not result else last
        return result
158
159
160
def drop_zeros(v, replace=""):
    """
    Drops or replaces trailing zeros and empty decimal separator, if any.

    Only zeros in the fractional part are affected, integer zeros are retained:
    "1.50" becomes "1.5" and "1.00" becomes "1", but "100" stays "100".

    @param   v        value to process, converted to string
    @param   replace  text substituted for each dropped character
    @return           value as string, with trailing decimal zeros dropped or replaced
    """
    def substitute(match):
        """Returns replacement for matched decimal separator, digits and trailing zeros."""
        kept, zeros = match.group(1), match.group(2)
        if kept:  # Significant decimals remain: retain separator and digits
            return "." + kept + len(zeros) * replace
        return (1 + len(zeros)) * replace  # Fraction is all zeros: drop separator as well

    return re.sub(r"\.(\d*[1-9])?(0+)$", substitute, str(v))
164
165
def ensure_object(obj_or_cls, attributes, populate_object=None, *args, **kwargs):
    """
    Returns an object of specified type, populated from given arguments.

    Intended for wrapping convenience functions that take or return an object
    as attribute list or dictionary instead of an instantiated object.

    If first positional argument in `args` is an instance of the specified class,
    that instance is the one populated from remaining positional and keyword arguments.

    If the next positional argument is a dictionary, it is taken as keyword arguments.

    `obj_or_cls` may be a class to instantiate, or a class instance
    to populate if no instance was given in first positional argument.
    A class is instantiated with no arguments; attributes are set afterwards.

    E.g. `ensure_object(std_msgs.msg.Bool, ["data"], None, True)`.

    @param   obj_or_cls       object class or instance
    @param   attributes       iterable of object attribute names
                              to match positional arguments with,
                              or a callable returning iterable
    @param   populate_object  function(dict, obj) to populate object from dictionary,
                              defaults to setting each dictionary item as object attribute
    @return                   object of specified class, populated from
                              positional and keyword arguments,
                              created if not given
    """
    given_class = inspect.isclass(obj_or_cls)
    cls = obj_or_cls if given_class else type(obj_or_cls)

    target = None
    if args and isinstance(args[0], cls):  # Object in first positional arg
        target, args = args[0], args[1:]
    if args and isinstance(args[0], dict):  # Dictionary acts as keyword arguments
        kwargs = dict(args[0], **kwargs)
        args = args[1:]

    if args:  # Attribute names are only resolved if there are positionals to name
        names = attributes() if callable(attributes) else attributes
        for name, value in zip(names, args):
            if name in kwargs:
                raise TypeError("%s got multiple values for keyword argument %r" %
                                (cls.__name__, name))
            kwargs[name] = value

    if target is None:
        target = cls() if given_class else obj_or_cls

    if populate_object:
        populate_object(kwargs, target)
    else:
        for name, value in kwargs.items():
            setattr(target, name, value)
    return target
208
209
def flatten_dict(dct, sep="."):
    """
    Flattens a nested dictionary to a flat dictionary, with nested keys joined with separator.

    Values are visited breadth-first: entries of shallower nesting come first in result.
    Empty nested dictionaries produce no entry.

    @param   dct  the dictionary to flatten
    @param   sep  separator between nested keys
    @return       flat dictionary like {"my.nested.key": value}
    """
    from collections import deque  # Local import: queue pops from the left in constant time

    result, queue = {}, deque(dct.items())
    while queue:
        k, v = queue.popleft()
        if isinstance(v, dict):
            # Nested entries go to the end of the queue, under their full joined key
            queue.extend(("%s%s%s" % (k, sep, k2), v2) for k2, v2 in v.items())
        else:
            result[k] = v
    return result
226
227
def format_bytes(size, precision=2, inter=" ", strip=True):
    """
    Returns a formatted byte size (like 421.40 MB), trailing zeros optionally removed.

    @param   size       size in bytes; negative sizes are formatted with a leading minus
    @param   precision  number of decimals to format value with
    @param   inter      text between value and unit
    @param   strip      whether to drop trailing decimal zeros, like "1.50 KB" to "1.5 KB"
    @return             size as text with unit, "0 bytes" if size is zero or empty
    """
    result = "0 bytes"
    if size:
        sign, size = ("-" if size < 0 else ""), abs(size)  # Logarithm needs positive value
        UNITS = [("bytes", "byte")[1 == size]] + [x + "B" for x in "KMGTPEZY"]
        # Clamp to available units: sizes under 1/1024 would give a negative exponent
        exponent = max(0, min(int(math.log(size, 1024)), len(UNITS) - 1))
        result = "%.*f" % (precision, size / (1024. ** exponent))
        if strip and precision > 0:  # Do not strip integer zeroes
            result = drop_zeros(result)
        result = sign + result + inter + UNITS[exponent]
    return result
238
239
def get_arity(func, positional=True, keyword=False):
    """
    Returns the maximum number of arguments the function takes, -1 if variable number.

    Also returns -1 if function signature is not available, like for some built-ins.
    Each parameter is counted once, also when counting both positionals and keywords.

    @param   func        callable to inspect
    @param   positional  count positional-only and positional/keyword arguments
    @param   keyword     count keyword-only and positional/keyword arguments
    @return              number of matching parameters, or -1
    """
    POSITIONALS = (inspect.Parameter.POSITIONAL_OR_KEYWORD, inspect.Parameter.POSITIONAL_ONLY)
    KEYWORDALS  = (inspect.Parameter.POSITIONAL_OR_KEYWORD, inspect.Parameter.KEYWORD_ONLY)
    try: params = inspect.signature(func).parameters  # Raises if built-in
    except Exception: return -1

    kinds = [x.kind for x in params.values()]
    if positional and inspect.Parameter.VAR_POSITIONAL in kinds \
    or keyword and inspect.Parameter.VAR_KEYWORD in kinds:
        return -1

    if positional and keyword:
        counted = POSITIONALS + KEYWORDALS
    elif positional:
        counted = POSITIONALS
    elif keyword:
        counted = KEYWORDALS
    else:
        counted = ()
    return sum(kind in counted for kind in kinds)
264
265
def get_nested(obj, path):
    """
    Looks up a nested value, returning it along with its parent container and key.

    Raises if path not found.

    @param   obj   object or dictionary or list, with arbitrary nesting
    @param   path  "name" or ("nested", "path") or "nested.path", element can be list index
    @return        (value, value parent container, key in parent container);
                   key is returned as given in path, without numeric conversion
    """

    def resolve(container, key):
        """Returns dictionary value, or sequence element if numeric key, or object attribute."""
        if isinstance(key, str) and re.match("^[0-9]+$", key):
            key = int(key)  # All-digit text is taken as index
        if isinstance(container, dict) or isinstance(key, int):
            return container[key]
        return getattr(container, key)

    steps = path.split(".") if isinstance(path, str) else path
    *parents, leaf = steps
    parent = obj
    for step in parents:
        parent = resolve(parent, step)
    return resolve(parent, leaf), parent, leaf
287
def get_value(obj, path, pathsep=None):
    """
    Looks up the value at key or (nested, path) in object or dictionary or list.

    Dictionaries, lists and tuples are indexed with path element,
    anything else is accessed by attribute.

    Raises if path not found.

    @param   obj      object or dictionary or list, with arbitrary nesting
    @param   path     ("nested", "path") or "nested.path", element can be list index
    @param   pathsep  string to split scalar key by, like "." for "nested.path"
    @return           value at path
    """
    INDEXABLES = (dict, list, tuple)

    if isinstance(path, (list, tuple)):
        keys = path
    elif pathsep:
        keys = path.split(pathsep)
    else:
        keys = [path]

    leaf, node = keys[-1], obj
    for key in keys[:-1]:
        if isinstance(node, INDEXABLES):
            node = node[key]
        else:
            node = getattr(node, key)

    return node[leaf] if isinstance(node, INDEXABLES) else getattr(node, leaf)
308
309
def memoize(func):
    """
    Returns a results-caching wrapper for the function.

    Results are cached for the lifetime of the wrapper, by positional and keyword
    arguments given. Order of keyword arguments does not matter.

    All arguments to function must be hashable.

    @param   func  function to cache the results of
    @return        wrapper function, carrying the name and documentation of original
    """
    cache = {}

    @functools.wraps(func)
    def inner(*args, **kwargs):
        # Keywords are kept apart from positionals so that f(1, "a", 2) and f(1, a=2)
        # get distinct keys; sorting by unique name makes the key order-independent.
        key = (args, tuple(sorted(kwargs.items())))
        if key not in cache:
            cache[key] = func(*args, **kwargs)
        return cache[key]
    return inner
323
324
def make_dict(path, value):
    """
    Returns a nested dictionary from path, like {"nested": {"path": value}}.

    @param   path   sequence of keys, outermost first; raises if empty
    @param   value  value to set under the last key in path
    """
    nested = {path[-1]: value}
    for key in reversed(path[:-1]):  # Wrap from the innermost level outwards
        nested = {key: nested}
    return nested
332
333
def merge_dicts(d1, d2):
    """
    Merges d2 into d1 in-place, descending into dictionaries present in both.

    Values in d2 overwrite values in d1, unless both are dictionaries.

    @param   d1  dictionary to update
    @param   d2  dictionary to merge from, may be empty or None
    @return      updated d1
    """
    if not d2:
        return d1
    for key, incoming in d2.items():
        mergeable = key in d1 and isinstance(d1[key], dict) and isinstance(incoming, dict)
        if mergeable:
            merge_dicts(d1[key], incoming)
        else:
            d1[key] = incoming
    return d1
346
347
def namejoin(*args):
    """
    Returns arguments joined into a namespace name, starting and separated with "/".

    Leading and trailing slashes are stripped from each argument, and empty arguments skipped.
    """
    parts = [part.strip("/") for part in args]
    return "/" + "/".join(part for part in parts if part)
351
def namesplit(name):
    """
    Returns argument split into (namespace, name), like "/a/b/c" as ("/a/b", "c").

    Namespace is "" if argument contains no "/", and "/" for a name directly under root.
    """
    namespace, slash, basename = name.rpartition("/")
    if not slash:
        return "", basename
    return (namespace or "/"), basename
357
358
def set_value(obj, path, value, pathsep=None):
    """
    Assigns value at key or (nested, path) in object or dictionary or list.

    Missing intermediate dictionary keys are created as empty dictionaries.
    Lists are appended to if specified index does not exist.

    @param   obj      object or dictionary or list, with arbitrary nesting
    @param   path     scalar key or ("nested", "path"), element can be list index
    @param   value    value to set
    @param   pathsep  string to split scalar key by, like "." for "nested.path"
    """
    if isinstance(path, (list, tuple)):
        keys = path
    elif pathsep:
        keys = path.split(pathsep)
    else:
        keys = [path]

    leaf, target = keys[-1], obj
    for key in keys[:-1]:
        if isinstance(target, dict):
            target = target.setdefault(key, {})
        elif isinstance(target, list):
            target = target[key]
        else:
            target = getattr(target, key)

    if isinstance(target, dict):
        target[leaf] = value
    elif not isinstance(target, list):
        setattr(target, leaf, value)
    elif 0 <= leaf < len(target):
        target[leaf] = value
    else:  # Index outside current list range
        target.append(value)
386
387
def start_future(func, *args, **kwargs):
    """
    Invokes function in a new background thread, returning `concurrent.futures.Future`.

    Future will be done when function returns or raises: function return value is set
    as future result, and a raised `Exception` as future exception.
    Background thread is not daemonic.

    @param   func    function to invoke
    @param   args    positional arguments to function
    @param   kwargs  keyword arguments to function
    @return          future for function result
    """
    promise = concurrent.futures.Future()

    def run():
        """Invokes function and populates future with its outcome."""
        try:
            promise.set_result(func(*args, **kwargs))
        except Exception as error:
            promise.set_exception(error)

    runner = threading.Thread(target=run)
    runner.start()
    return promise
400
401
def unique_path(pathname, empty_ok=False):
    """
    Returns a version of the path that does not exist in the file system.

    If a file or directory with the same name already exists, returns a unique
    version with a counter added before the extension
    (e.g. "/tmp/my.2.file" if "/tmp/my.file" already exists).

    File names longer than 255 characters are shortened, with ".." marking the cut.

    @param   pathname  path to make unique
    @param   empty_ok  whether to ignore existence if file is empty
    @return            given path if not existing, or if an empty file and `empty_ok`,
                       else path with a counter in the name
    """
    MAXLEN = 255  # Filesystem limitation

    if empty_ok and os.path.isfile(pathname) and not os.path.getsize(pathname):
        return pathname

    folder, filename = os.path.split(pathname)
    stem, ext = os.path.splitext(filename)

    def compose(tail):
        """Returns path for stem plus tail, stem cut to fit the name length limit."""
        name = stem + tail
        if len(name) > MAXLEN:
            name = stem[:MAXLEN - len(tail) - 2] + ".." + tail
        return os.path.join(folder, name)

    candidate = compose(ext) if len(filename) > MAXLEN else pathname
    counter = 2
    while os.path.exists(candidate):
        candidate = compose(".%s%s" % (counter, ext))
        counter += 1
    return candidate
428
429
def wrap_arity(func):
    """
    Returns wrapper that invokes function with only as many arguments as it supports.

    Surplus positional arguments and unknown keyword arguments are dropped,
    unless function accepts variable positionals or keywords, respectively.

    E.g. `wrap_arity(abs)(-1, -2)` will return result of `abs(-1)`.

    Returns original function if a built-in with no arity information available.

    @param   func  function to wrap
    @return        wrapper function, or original function if signature not available
    """
    try:
        parameters = inspect.signature(func).parameters  # Raises if built-in
    except Exception:
        return func

    Kind = inspect.Parameter
    kinds = [x.kind for x in parameters.values()]
    takes_varargs = Kind.VAR_POSITIONAL in kinds
    takes_varkws = Kind.VAR_KEYWORD in kinds
    max_positionals = sum(k in (Kind.POSITIONAL_OR_KEYWORD, Kind.POSITIONAL_ONLY) for k in kinds)
    keyword_names = [name for name, x in parameters.items()
                     if x.kind in (Kind.POSITIONAL_OR_KEYWORD, Kind.KEYWORD_ONLY)]

    def inner(*args, **kwargs):
        """Invokes wrapped function with supported arguments only."""
        if not takes_varargs:
            args = args[:max_positionals]
        if not takes_varkws:
            kwargs = {k: v for k, v in kwargs.items() if k in keyword_names}
        return func(*args, **kwargs)
    return functools.update_wrapper(inner, func)
452
453
# Names made available on wildcard import of this module.
# NOTE(review): ThrottledLogger, drop_zeros and get_nested are defined in this module
# but not listed here - confirm whether leaving them out is intentional.
__all__ = [
    "ensure_object", "flatten_dict", "format_bytes", "get_arity", "get_value", "make_dict",
    "memoize", "merge_dicts", "namejoin", "namesplit", "set_value", "start_future",
    "unique_path", "wrap_arity",
]
fatal(self, msg, *args, **kwargs)
Logs `msg % args` with severity `FATAL`.
Definition util.py:127
error(self, msg, *args, **kwargs)
Logs `msg % args` with severity `ERROR`.
Definition util.py:119
_ONCES
Caller IDs registered for throttling by once-only.
Definition util.py:49
debug(self, msg, *args, **kwargs)
Logs `msg % args` with severity `DEBUG`.
Definition util.py:87
_extract_args(cls, kwargs)
Drops throttle parameters from kwargs and returns as dict.
Definition util.py:144
_is_throttled(cls, msg, once=False, throttle=None, throttle_identical=False)
Returns whether message should be skipped.
Definition util.py:152
dict _HASHES
Caller IDs and log message hashes for throttling by identical text.
Definition util.py:61
log(self, level, msg, *args, **kwargs)
Logs `msg % args` with given severity.
Definition util.py:135
info(self, msg, *args, **kwargs)
Logs `msg % args` with severity `INFO`.
Definition util.py:95
__init__(self, logger)
Creates a wrapper logger around given logger instance, providing support for throttling.
Definition util.py:69
warn(self, msg, *args, **kwargs)
Logs `msg % args` with severity `WARNING`.
Definition util.py:111
warning(self, msg, *args, **kwargs)
Logs `msg % args` with severity `WARN`.
Definition util.py:103
dict _TIMES
Caller IDs and last timestamps for throttling by time.
Definition util.py:55
merge_dicts(d1, d2)
Merges d2 into d1, recursively for nested dicts.
Definition util.py:352
ensure_object(obj_or_cls, attributes, populate_object=None, *args, **kwargs)
Ensures result is an object of specified type.
Definition util.py:207
get_value(obj, path, pathsep=None)
Returns object or dictionary or list value at (nested, path).
Definition util.py:311
start_future(func, *args, **kwargs)
Returns `concurrent.futures.Future` and invokes function in a background thread.
Definition util.py:406
make_dict(path, value)
Returns a nested dictionary from path, like {"nested": {"path": value}}.
Definition util.py:338
get_arity(func, positional=True, keyword=False)
Returns the maximum number of arguments the function takes, -1 if variable number.
Definition util.py:260
namejoin(*args)
Returns arguments joined into a namespace name, starting and separated with "/".
Definition util.py:361
set_value(obj, path, value, pathsep=None)
Sets object or dictionary or list value at key or (nested, path).
Definition util.py:382
namesplit(name)
Returns argument split into (namespace, name), like "/a/b/c" as ("/a/b", "c").
Definition util.py:366
flatten_dict(dct, sep=".")
Flattens a nested dictionary to a flat dictionary, with nested keys joined with separator.
Definition util.py:231
drop_zeros(v, replace="")
Drops or replaces trailing zeros and empty decimal separator, if any.
Definition util.py:177
wrap_arity(func)
Returns wrapper for invoking function with its maximum supported number of arguments.
Definition util.py:450
unique_path(pathname, empty_ok=False)
Returns a unique version of the path.
Definition util.py:423
format_bytes(size, precision=2, inter=" ", strip=True)
Returns a formatted byte size (like 421.40 MB), trailing zeros optionally removed.
Definition util.py:242
get_nested(obj, path)
Returns (nested value, value parent container, key in parent container).
Definition util.py:288