5------------------------------------------------------------------------------
6This file is part of grepros - grep for ROS bag files and live topics.
7Released under the BSD License.
12------------------------------------------------------------------------------
14## @namespace grepros.plugins.parquet
23except Exception: pandas = None
25except Exception: pyarrow = None
26try: import pyarrow.parquet
30from .. import api, common
31from .. common import ConsolePrinter
32from .. outputs import Sink
35class ParquetSink(Sink):
36 """Writes messages to Apache Parquet files."""
39 FILE_EXTENSIONS = (
".parquet", )
46 "bool": pyarrow.bool_,
"bool_": pyarrow.bool_,
47 "float16": pyarrow.float16,
"float64": pyarrow.float64,
48 "float32": pyarrow.float32,
"decimal128": pyarrow.decimal128,
49 "int8": pyarrow.int8,
"uint8": pyarrow.uint8,
50 "int16": pyarrow.int16,
"uint16": pyarrow.uint16,
51 "int32": pyarrow.int32,
"uint32": pyarrow.uint32,
52 "int64": pyarrow.int64,
"uint64": pyarrow.uint64,
53 "date32": pyarrow.date32,
"time32": pyarrow.time32,
54 "date64": pyarrow.date64,
"time64": pyarrow.time64,
55 "timestamp": pyarrow.timestamp,
"duration": pyarrow.duration,
56 "binary": pyarrow.binary,
"large_binary": pyarrow.large_binary,
57 "string": pyarrow.string,
"large_string": pyarrow.large_string,
58 "utf8": pyarrow.string,
"large_utf8": pyarrow.large_utf8,
59 "list": pyarrow.list_,
"list_": pyarrow.list_,
60 "large_list": pyarrow.large_list,
62 if hasattr(pyarrow,
"month_day_nano_interval"): ARROW_TYPES.update({
63 "month_day_nano_interval": pyarrow.month_day_nano_interval,
68 "int8": pyarrow.int8(),
"int16": pyarrow.int16(),
"int32": pyarrow.int32(),
69 "uint8": pyarrow.uint8(),
"uint16": pyarrow.uint16(),
"uint32": pyarrow.uint32(),
70 "int64": pyarrow.int64(),
"uint64": pyarrow.uint64(),
"bool": pyarrow.bool_(),
71 "string": pyarrow.string(),
"wstring": pyarrow.string(),
"uint8[]": pyarrow.binary(),
72 "float32": pyarrow.float32(),
"float64": pyarrow.float64(),
76 DEFAULT_TYPE = pyarrow.string()
if pyarrow
else None
79 MESSAGE_TYPE_BASECOLS = [(
"_topic",
"string"),
80 (
"_timestamp",
"time"), ]
83 MESSAGE_TYPE_NESTCOLS = [(
"_id",
"string"),
84 (
"_parent_type",
"string"),
85 (
"_parent_id",
"string"), ]
88 WRITER_ARGS = {
"version":
"2.6"}
91 DEFAULT_ARGS = dict(EMIT_FIELD=(), META=
False, NOEMIT_FIELD=(), WRITE_OPTIONS={}, VERBOSE=
False)
94 def __init__(self, args=None, **kwargs):
96 @param args arguments
as namespace
or dictionary, case-insensitive;
97 or a single path
as the base name of Parquet files to write
98 @param args.emit_field message fields to emit
in output
if not all
99 @param args.noemit_field message fields to skip
in output
100 @param args.write base name of Parquet files to write
101 @param args.write_options ```
102 {
"column": additional columns
as {name: (rostype, value)},
103 "type": {rostype: PyArrow type
or typename like
"uint8"},
104 "writer": dictionary of arguments passed to ParquetWriter,
105 "idgenerator": callable
or iterable
for producing message IDs
106 like uuid.uuid4
or itertools.count();
107 nesting uses UUID values by default,
108 "column-k=rostype:v": one
"column"-argument
110 "type-k=v: one "type
"-argument in flat string form,
111 "writer-k=v": one
"writer"-argument
in flat string form,
112 "nesting":
"array" to recursively insert arrays
113 of nested types,
or "all" for any nesting,
114 "overwrite": whether to overwrite existing file
117 @param args.meta whether to
print metainfo
118 @param args.verbose whether to
print debug information
119 @param kwargs any
and all arguments
as keyword overrides, case-insensitive
121 args = {"WRITE": str(args)}
if isinstance(args, common.PATH_TYPES)
else args
122 args = common.ensure_namespace(args, ParquetSink.DEFAULT_ARGS, **kwargs)
123 super(ParquetSink, self).
__init__(args)
134 self.
_nesting = args.WRITE_OPTIONS.get(
"nesting")
142 Returns whether required libraries are available (pandas and pyarrow)
and overwrite
is valid
143 and file base
is writable.
146 ok = all([Sink.validate(self), self.
_configure()])
147 pandas_ok, pyarrow_ok = bool(pandas), bool(pyarrow)
148 if self.
args.WRITE_OPTIONS.get(
"overwrite")
not in (
None,
True,
False,
"true",
"false"):
149 ConsolePrinter.error(
"Invalid overwrite option for Parquet: %r. "
150 "Choose one of {true, false}.",
151 self.
args.WRITE_OPTIONS[
"overwrite"])
153 if self.
args.WRITE_OPTIONS.get(
"nesting")
not in (
None,
"",
"array",
"all"):
154 ConsolePrinter.error(
"Invalid nesting option for Parquet: %r. "
155 "Choose one of {array,all}.",
156 self.
args.WRITE_OPTIONS[
"nesting"])
159 ConsolePrinter.error(
"pandas not available: cannot write Parquet files.")
161 ConsolePrinter.error(
"PyArrow not available: cannot write Parquet files.")
162 if not common.verify_io(self.
args.WRITE,
"w"):
164 self.
validvalid = ok
and pandas_ok
and pyarrow_ok
166 self.
_overwrite = (self.
args.WRITE_OPTIONS.get(
"overwrite")
in (
True,
"true"))
170 def emit(self, topic, msg, stamp=None, match=None, index=None):
171 """Writes message to a Parquet file."""
180 """Writes out any remaining messages, closes writers, clears structures."""
182 for k, vv
in list(self.
_caches.items()):
189 sizes = {n:
None for n
in self.
_filenames.values()}
191 try: sizes[n] = os.path.getsize(n)
192 except Exception
as e: ConsolePrinter.warn(
"Error getting size of %s: %s", n, e)
193 ConsolePrinter.debug(
"Wrote %s in %s to %s (%s)%s",
196 common.plural(
"Parquet file", sizes),
197 common.format_bytes(sum(filter(bool, sizes.values()))),
198 ":" if self.
args.VERBOSE
else ".")
199 for (t, h), name
in self.
_filenames.items()
if self.
args.VERBOSE
else ():
200 count = sum(c
for (_, t_, h_), c
in self.
_counts_counts.items()
if (t, h) == (t_, h_))
201 ConsolePrinter.debug(
"- %s (%s, %s)", name,
202 "error getting size" if sizes[name]
is None else
203 common.format_bytes(sizes[name]),
204 common.plural(
"message", count))
210 def _process_type(self, topic, msg, rootmsg=None):
211 """Prepares Parquet schema and writer if not existing."""
212 rootmsg = rootmsg
or msg
213 with api.TypeMeta.make(msg, root=rootmsg)
as m:
214 typename, typehash, typekey = (m.typename, m.typehash, m.typekey)
215 if self.
args.VERBOSE
and topic
and (topic, typename, typehash)
not in self.
_counts_counts:
216 ConsolePrinter.debug(
"Adding topic %s in Parquet output.", topic)
219 basedir, basename = os.path.split(self.
_filebase)
220 pathname = os.path.join(basedir, re.sub(
r"\W",
"__",
"%s__%s" % (typename, typehash)))
221 filename = os.path.join(pathname, basename)
223 filename = common.unique_path(filename)
228 for path, value, subtype
in api.iter_message_fields(msg, scalars=scalars, **fltrs):
230 cols += [(
".".join(path), coltype)]
232 cols += [(c, self.
_make_column_type(t, fallback=
"int64" if "time" == t
else None))
235 if self.
args.VERBOSE:
236 sz = os.path.isfile(filename)
and os.path.getsize(filename)
237 action =
"Overwriting" if sz
and self.
_overwrite else "Adding"
238 ConsolePrinter.debug(
"%s type %s in Parquet output.", action, typename)
239 common.makedirs(pathname)
241 schema = pyarrow.schema(cols)
248 nesteds = api.iter_message_fields(msg, messages_only=
True, **fltrs)
if self.
_nesting else ()
249 for path, submsgs, subtype
in nesteds:
250 scalartype = api.scalar(subtype)
251 if subtype == scalartype
and "all" != self.
_nesting:
253 subtypehash =
not submsgs
and self.
source.get_message_type_hash(scalartype)
254 if not isinstance(submsgs, (list, tuple)): submsgs = [submsgs]
255 [submsg] = submsgs[:1]
or [self.
source.get_message_class(scalartype, subtypehash)()]
259 def _process_message(self, topic, index, stamp, msg, match=None,
260 rootmsg=None, parent_type=None, parent_id=None):
262 Converts message to pandas dataframe, adds to cache.
264 Writes cache to disk if length reached chunk size.
266 If nesting
is enabled, processes nested messages
for subtypes
in message.
267 If IDs are used, returns generated ID.
269 data, myid, rootmsg = {}, None, (rootmsg
or None)
271 with api.TypeMeta.make(msg, topic, root=rootmsg)
as m:
272 typename, typekey = m.typename, m.typekey
280 data.update(zip(COLS, [parent_type, parent_id]))
282 self.
_caches[typekey].append(data)
283 super(ParquetSink, self).
emit(topic, msg, stamp, match, index)
286 nesteds = api.iter_message_fields(msg, messages_only=
True, **fltrs)
if self.
_nesting else ()
287 for path, submsgs, subtype
in nesteds:
288 scalartype = api.scalar(subtype)
289 if subtype == scalartype
and "all" != self.
_nesting:
291 if isinstance(submsgs, (list, tuple)):
293 for submsg
in submsgs
if isinstance(submsgs, (list, tuple))
else [submsgs]:
295 rootmsg=rootmsg, parent_type=typename, parent_id=myid)
296 if isinstance(submsgs, (list, tuple)):
297 subids[path].append(subid)
305 def _make_column_type(self, typename, fallback=None):
307 Returns pyarrow type for ROS type.
309 @param fallback fallback typename to use
for lookup
if typename
not found
311 noboundtype = api.canonical(typename, unbounded=True)
312 scalartype = api.scalar(typename)
313 timetype = api.get_ros_time_category(scalartype)
319 if typename != scalartype:
323 if not coltype
and fallback:
330 def _make_column_value(self, value, typename=None):
331 """Returns column value suitable for adding to Parquet file."""
333 if isinstance(v, (list, tuple)):
334 noboundtype = api.canonical(typename, unbounded=
True)
335 if v
and api.is_ros_time(v[0]):
336 v = [api.to_nsec(x)
for x
in v]
337 elif api.scalar(typename)
not in api.ROS_BUILTIN_TYPES:
338 v = str([api.message_to_dict(m)
for m
in v])
341 v = bytes(bytearray(v))
344 elif api.is_ros_time(v):
346 elif typename
and typename
not in api.ROS_BUILTIN_TYPES:
347 v = str(api.message_to_dict(v))
351 def _write_table(self, typekey):
352 """Writes out cached messages for type."""
353 dicts = self.
_caches[typekey][:]
355 mapping = {k: [d[k]
for d
in dicts]
for k
in dicts[0]}
356 with warnings.catch_warnings():
357 warnings.simplefilter(
"ignore", UserWarning)
358 table = pyarrow.Table.from_pydict(mapping, self.
_schemas[typekey])
359 self.
_writers[typekey].write_table(table)
362 def _configure(self):
363 """Parses args.WRITE_OPTIONS, returns success."""
371 def process_column(name, rostype, value):
372 v, myok = value,
True
373 if "string" not in rostype:
377 ConsolePrinter.error(
"Invalid name option in %s=%s:%s", name, rostype, v)
378 if rostype
not in api.ROS_BUILTIN_TYPES:
380 ConsolePrinter.error(
"Invalid type option in %s=%s:%s", name, rostype, v)
386 def process_type(rostype, arrowtype):
388 arrowtype = eval(compile(arrowtype,
"",
"eval"), {
"__builtins__": self.
ARROW_TYPES})
391 for key, vals
in [(
"print", self.
args.EMIT_FIELD), (
"noprint", self.
args.NOEMIT_FIELD)]:
392 self.
_patterns[key] = [(tuple(v.split(
".")), common.path_to_regex(v))
for v
in vals]
396 alias = api.get_type_alias(rostype)
402 for k, v
in self.
args.WRITE_OPTIONS.items():
403 if "column" == k
and v
and isinstance(v, dict):
404 for name, (rostype, value)
in v.items():
405 if not process_column(name, rostype, value): ok =
False
406 elif "type" == k
and v
and isinstance(v, dict):
407 for name, value
in v.items():
408 if not process_type(name, value): ok =
False
409 elif "writer" == k
and v
and isinstance(v, dict):
411 elif isinstance(k, str)
and "-" in k:
412 category, name = k.split(
"-", 1)
413 if category
not in (
"column",
"type",
"writer"):
414 ConsolePrinter.warn(
"Unknown %r option in %s=%s", category, k, v)
417 if not name:
raise Exception(
"empty name")
418 if "column" == category:
419 if not process_column(name, *v.split(
":", 1)): ok =
False
420 elif "type" == category:
421 process_type(name, v)
422 elif "writer" == category:
423 try: v = json.loads(v)
424 except Exception:
pass
426 except Exception
as e:
428 ConsolePrinter.error(
"Invalid %s option in %s=%s: %s", category, k, v, e)
432 def _configure_ids(self):
433 """Configures ID generator from args.WRITE_OPTIONS, returns success."""
438 k, v =
"idgenerator", self.
args.WRITE_OPTIONS.get(
"idgenerator")
439 if k
in self.
args.WRITE_OPTIONS:
440 val, ex, ns = v,
None, dict(self.
ARROW_TYPES, itertools=itertools, uuid=uuid)
441 for root
in v.split(
".", 1)[:1]:
442 try: ns[root] = common.import_item(root)
443 except Exception:
pass
444 try: common.import_item(re.sub(
r"\(.+",
"", v))
445 except Exception:
pass
446 try: val = eval(compile(v,
"",
"eval"), ns)
447 except Exception
as e: ok, ex =
False, e
448 if isinstance(val, (six.binary_type, six.text_type)): ok =
False
452 except Exception
as e:
454 except Exception
as e: ok, ex =
False, e
456 ConsolePrinter.error(
"Invalid value in %s=%s%s", k, v, (
": %s" % ex
if ex
else ""))
462 if api.is_ros_time(fval):
463 typename =
"time" if "time" in str(type(fval)).lower()
else "duration"
464 elif isinstance(fval, six.integer_types):
466 elif isinstance(fval, float):
468 elif not isinstance(fval, str):
469 fval, self.
_idgenerator = str(fval), (str(x)
for x
in generator)
471 repl =
lambda n, t: (n, typename)
if "_id" in n
else (n, t)
480 """Adds Parquet output format support. Raises ImportWarning if libraries not available."""
481 if not pandas
or not pyarrow:
482 ConsolePrinter.error(
"pandas or PyArrow not available: cannot write Parquet files.")
483 raise ImportWarning()
484 from ..
import plugins
485 plugins.add_write_format(
"parquet", ParquetSink,
"Parquet", [
486 (
"column-NAME=ROSTYPE:VALUE",
"additional column to add in Parquet output,\n"
487 "like column-bag_hash=string:26dfba2c"),
488 (
"idgenerator=CALLABLE",
"callable or iterable for producing message IDs \n"
489 "in Parquet output, like 'uuid.uuid4' or 'itertools.count()';\n"
490 "nesting uses UUID values by default"),
491 (
"nesting=array|all",
"create tables for nested message types\n"
492 "in Parquet output,\n"
493 'only for arrays if "array" \n'
494 "else for any nested types\n"
495 "(array fields in parent will be populated \n"
496 " with foreign keys instead of messages as JSON)"),
497 (
"overwrite=true|false",
"overwrite existing file in Parquet output\n"
498 "instead of appending unique counter (default false)"),
499 (
"type-ROSTYPE=ARROWTYPE",
"custom mapping between ROS and pyarrow type\n"
500 "for Parquet output, like type-time=\"timestamp('ns')\"\n"
501 "or type-uint8[]=\"list(uint8())\""),
502 (
"writer-ARGNAME=ARGVALUE",
"additional arguments for Parquet output\n"
503 "given to pyarrow.parquet.ParquetWriter"),
505 plugins.add_output_label(
"Parquet", [
"--emit-field",
"--no-emit-field"])
508__all__ = [
"ParquetSink",
"init"]
_ensure_stamp_index(self, topic, msg, stamp=None, index=None)
source
inputs.Source instance bound to this sink
valid
Result of validate()
validate(self)
Returns whether sink prerequisites are met (like ROS environment set if LiveSink).
dict ARROW_TYPES
Mapping from pyarrow type names and aliases to pyarrow type constructors.
__init__(self, args=None, **kwargs)
int CHUNK_SIZE
Number of dataframes to cache before writing, per type.
emit(self, topic, msg, stamp=None, match=None, index=None)
Writes message to a Parquet file.
DEFAULT_TYPE
Fallback pyarrow type if mapped type not found.
dict WRITER_ARGS
Custom arguments for pyarrow.parquet.ParquetWriter.
dict COMMON_TYPES
Mapping from ROS common type names to pyarrow type constructors.
validate(self)
Returns whether required libraries are available (pandas and pyarrow) and overwrite is valid and file...
close(self)
Writes out any remaining messages, closes writers, clears structures.
list MESSAGE_TYPE_BASECOLS
Default columns for message type tables.
list MESSAGE_TYPE_NESTCOLS
Additional default columns for message type tables with nesting output.
init(*_, **__)
Adds Parquet output format support.