36 """Writes messages to Apache Parquet files."""
## File extensions recognized/produced by this sink.
FILE_EXTENSIONS = (".parquet", )
46 "bool": pyarrow.bool_,
"bool_": pyarrow.bool_,
47 "float16": pyarrow.float16,
"float64": pyarrow.float64,
48 "float32": pyarrow.float32,
"decimal128": pyarrow.decimal128,
49 "int8": pyarrow.int8,
"uint8": pyarrow.uint8,
50 "int16": pyarrow.int16,
"uint16": pyarrow.uint16,
51 "int32": pyarrow.int32,
"uint32": pyarrow.uint32,
52 "int64": pyarrow.int64,
"uint64": pyarrow.uint64,
53 "date32": pyarrow.date32,
"time32": pyarrow.time32,
54 "date64": pyarrow.date64,
"time64": pyarrow.time64,
55 "timestamp": pyarrow.timestamp,
"duration": pyarrow.duration,
56 "binary": pyarrow.binary,
"large_binary": pyarrow.large_binary,
57 "string": pyarrow.string,
"large_string": pyarrow.large_string,
58 "utf8": pyarrow.string,
"large_utf8": pyarrow.large_utf8,
59 "list": pyarrow.list_,
"list_": pyarrow.list_,
60 "large_list": pyarrow.large_list,
62 if hasattr(pyarrow,
"month_day_nano_interval"): ARROW_TYPES.update({
63 "month_day_nano_interval": pyarrow.month_day_nano_interval,
68 "int8": pyarrow.int8(),
"int16": pyarrow.int16(),
"int32": pyarrow.int32(),
69 "uint8": pyarrow.uint8(),
"uint16": pyarrow.uint16(),
"uint32": pyarrow.uint32(),
70 "int64": pyarrow.int64(),
"uint64": pyarrow.uint64(),
"bool": pyarrow.bool_(),
71 "string": pyarrow.string(),
"wstring": pyarrow.string(),
"uint8[]": pyarrow.binary(),
72 "float32": pyarrow.float32(),
"float64": pyarrow.float64(),
## Fallback PyArrow data type for anything without a specific mapping
## (None when pyarrow is not installed).
DEFAULT_TYPE = None if not pyarrow else pyarrow.string()
## Extra columns added to every message row: source topic and message timestamp.
MESSAGE_TYPE_BASECOLS = [
    ("_topic",     "string"),
    ("_timestamp", "time"),
]
## Extra columns added when message nesting is enabled: row identity plus
## links back to the parent message row.
MESSAGE_TYPE_NESTCOLS = [
    ("_id",          "string"),
    ("_parent_type", "string"),
    ("_parent_id",   "string"),
]
## Default keyword arguments passed to pyarrow.parquet.ParquetWriter.
WRITER_ARGS = dict(version="2.6")
## Constructor argument defaults, applied when an option is not supplied.
DEFAULT_ARGS = {
    "EMIT_FIELD":    (),
    "META":          False,
    "NOEMIT_FIELD":  (),
    "WRITE_OPTIONS": {},
    "VERBOSE":       False,
}
# NOTE(review): the `def __init__(...)` header and the docstring quotes are
# outside this excerpt, as are several body statements (e.g. initialization of
# caches, schemas, writers, and the filebase attribute).
@param args arguments as namespace or dictionary, case-insensitive;
            or a single path as the base name of Parquet files to write
@param args.emit_field message fields to emit in output if not all
@param args.noemit_field message fields to skip in output
@param args.write base name of Parquet files to write
@param args.write_options ```
                          {"column": additional columns as {name: (rostype, value)},
                           "type": {rostype: PyArrow type or typename like "uint8"},
                           "writer": dictionary of arguments passed to ParquetWriter,
                           "idgenerator": callable or iterable for producing message IDs
                                          like uuid.uuid4 or itertools.count();
                                          nesting uses UUID values by default,
                           "column-k=rostype:v": one "column"-argument in flat string form,
                           "type-k=v: one "type"-argument in flat string form,
                           "writer-k=v": one "writer"-argument in flat string form,
                           "nesting": "array" to recursively insert arrays
                                      of nested types, or "all" for any nesting,
                           "overwrite": whether to overwrite existing file
@param args.meta whether to print metainfo
@param args.verbose whether to print debug information
@param kwargs any and all arguments as keyword overrides, case-insensitive
# A bare path argument is shorthand for {"WRITE": path}.
args = {"WRITE": str(args)} if isinstance(args, common.PATH_TYPES) else args
args = common.ensure_namespace(args, ParquetSink.DEFAULT_ARGS, **kwargs)
super(ParquetSink, self).__init__(args)

# Nesting mode from write options: None/"" (off), "array", or "all";
# validated later in validate().
self._nesting = args.WRITE_OPTIONS.get("nesting")
# NOTE(review): the `def validate(self):` header, docstring quotes, and several
# statements (the `ok = False` assignments after each error, the
# `if not pandas_ok:` / `if not pyarrow_ok:` guards, and the return) are not
# visible in this excerpt.
Returns whether required libraries are available (pandas and pyarrow) and overwrite is valid
and file base is writable.

# Base-class validation and option parsing must both succeed.
ok = all([Sink.validate(self), self._configure()])
pandas_ok, pyarrow_ok = bool(pandas), bool(pyarrow)
# "overwrite" must be a boolean or its lowercase string form.
if self.args.WRITE_OPTIONS.get("overwrite") not in (None, True, False, "true", "false"):
    ConsolePrinter.error("Invalid overwrite option for Parquet: %r. "
                         "Choose one of {true, false}.",
                         self.args.WRITE_OPTIONS["overwrite"])
# "nesting" must be empty, "array", or "all".
if self.args.WRITE_OPTIONS.get("nesting") not in (None, "", "array", "all"):
    ConsolePrinter.error("Invalid nesting option for Parquet: %r. "
                         "Choose one of {array,all}.",
                         self.args.WRITE_OPTIONS["nesting"])
ConsolePrinter.error("pandas not available: cannot write Parquet files.")
ConsolePrinter.error("PyArrow not available: cannot write Parquet files.")
if not common.verify_io(self.args.WRITE, "w"):
# Normalize the overwrite flag to a real boolean for later use.
self._overwrite = (self.args.WRITE_OPTIONS.get("overwrite") in (True, "true"))
def emit(self, topic, msg, stamp=None, match=None, index=None):
    """Writes message to a Parquet file."""
    # NOTE(review): the method body is not visible in this excerpt.
180 """Writes out any remaining messages, closes writers, clears structures."""
182 for k, vv
in list(self.
_caches.items()):
189 sizes = {n:
None for n
in self.
_filenames.values()}
191 try: sizes[n] = os.path.getsize(n)
192 except Exception
as e: ConsolePrinter.warn(
"Error getting size of %s: %s", n, e)
193 ConsolePrinter.debug(
"Wrote %s in %s to %s (%s)%s",
196 common.plural(
"Parquet file", sizes),
197 common.format_bytes(sum(filter(bool, sizes.values()))),
198 ":" if self.
args.VERBOSE
else ".")
199 for (t, h), name
in self.
_filenames.items()
if self.
args.VERBOSE
else ():
200 count = sum(c
for (_, t_, h_), c
in self.
_counts_counts.items()
if (t, h) == (t_, h_))
201 ConsolePrinter.debug(
"- %s (%s, %s)", name,
202 "error getting size" if sizes[name]
is None else
203 common.format_bytes(sizes[name]),
204 common.plural(
"message", count))
def _process_type(self, topic, msg, rootmsg=None):
    """Prepares Parquet schema and writer if not existing."""
    # NOTE(review): several statements are elided from this excerpt
    # (e.g. initialization of `cols`, `scalars`, `fltrs`, `coltype`, the
    # ParquetWriter creation, and registration of nested subtypes).
    rootmsg = rootmsg or msg
    with api.TypeMeta.make(msg, root=rootmsg) as m:
        typename, typehash, typekey = (m.typename, m.typehash, m.typekey)
    # NOTE(review): "_counts_counts" looks garbled -- possibly self._counts; verify.
    if self.args.VERBOSE and topic and (topic, typename, typehash) not in self._counts_counts:
        ConsolePrinter.debug("Adding topic %s in Parquet output.", topic)

    # One output directory per message type: <basedir>/<typename>__<typehash>/<basename>,
    # with non-word characters replaced to keep the path filesystem-safe.
    basedir, basename = os.path.split(self._filebase)
    pathname = os.path.join(basedir, re.sub(r"\W", "__", "%s__%s" % (typename, typehash)))
    filename = os.path.join(pathname, basename)
    filename = common.unique_path(filename)

    # Build schema columns from message fields, then append extra columns;
    # "time" columns fall back to int64 lookup.
    for path, value, subtype in api.iter_message_fields(msg, scalars=scalars, **fltrs):
        cols += [(".".join(path), coltype)]
    cols += [(c, self._make_column_type(t, fallback="int64" if "time" == t else None))

    if self.args.VERBOSE:
        sz = os.path.isfile(filename) and os.path.getsize(filename)
        action = "Overwriting" if sz and self._overwrite else "Adding"
        ConsolePrinter.debug("%s type %s in Parquet output.", action, typename)
    common.makedirs(pathname)
    schema = pyarrow.schema(cols)

    # When nesting is enabled, walk nested message-typed fields as well.
    nesteds = api.iter_message_fields(msg, messages_only=True, **fltrs) if self._nesting else ()
    for path, submsgs, subtype in nesteds:
        scalartype = api.scalar(subtype)
        # Presumably skips scalar (non-array) submessages unless nesting is
        # "all" -- branch body elided in this excerpt; confirm in full file.
        if subtype == scalartype and "all" != self._nesting:
        # Hash is needed only when there is no concrete submessage to inspect.
        subtypehash = not submsgs and self.source.get_message_type_hash(scalartype)
        if not isinstance(submsgs, (list, tuple)): submsgs = [submsgs]
        # Use the first submessage, or instantiate an empty one for schema discovery.
        [submsg] = submsgs[:1] or [self.source.get_message_class(scalartype, subtypehash)()]
def _process_message(self, topic, index, stamp, msg, match=None,
                     rootmsg=None, parent_type=None, parent_id=None):
    Converts message to pandas dataframe, adds to cache.

    Writes cache to disk if length reached chunk size.

    If nesting is enabled, processes nested messages for subtypes in message.
    If IDs are used, returns generated ID.
    # NOTE(review): docstring quotes and several statements are not visible in
    # this excerpt (e.g. population of `data` from message fields, generation
    # of `myid`, `COLS`/`fltrs`/`subids` setup, the recursive
    # _process_message call producing `subid`, and the return).
    # NOTE(review): `(rootmsg or None)` is a no-op for falsy rootmsg --
    # possibly garbled from `rootmsg or msg`; verify against full file.
    data, myid, rootmsg = {}, None, (rootmsg or None)
    with api.TypeMeta.make(msg, topic, root=rootmsg) as m:
        typename, typekey = m.typename, m.typekey
    # Attach parent-linkage columns when processing a nested submessage.
    data.update(zip(COLS, [parent_type, parent_id]))
    self._caches[typekey].append(data)
    super(ParquetSink, self).emit(topic, msg, stamp, match, index)

    # Recurse into nested submessages when nesting is enabled.
    nesteds = api.iter_message_fields(msg, messages_only=True, **fltrs) if self._nesting else ()
    for path, submsgs, subtype in nesteds:
        scalartype = api.scalar(subtype)
        # Presumably skips scalar submessages unless nesting is "all" --
        # branch body elided in this excerpt.
        if subtype == scalartype and "all" != self._nesting:
        if isinstance(submsgs, (list, tuple)):
        for submsg in submsgs if isinstance(submsgs, (list, tuple)) else [submsgs]:
            rootmsg=rootmsg, parent_type=typename, parent_id=myid)
        # Collect generated child IDs for array-valued submessage fields.
        if isinstance(submsgs, (list, tuple)):
            subids[path].append(subid)
def _make_column_type(self, typename, fallback=None):
    Returns pyarrow type for ROS type.

    @param fallback fallback typename to use for lookup if typename not found
    # NOTE(review): docstring quotes and most of the lookup logic (the
    # COMMON_TYPES lookups setting `coltype`, array/time handling, and the
    # return statement) are elided from this excerpt.
    # Presumably strips array bounds, e.g. "uint8[4]" -> "uint8[]" -- confirm
    # api.canonical(unbounded=True) semantics.
    noboundtype = api.canonical(typename, unbounded=True)
    scalartype  = api.scalar(typename)
    timetype    = api.get_ros_time_category(scalartype)
    if typename != scalartype:
    if not coltype and fallback:
def _make_column_value(self, value, typename=None):
    """Returns column value suitable for adding to Parquet file."""
    # NOTE(review): several statements are elided from this excerpt (e.g. the
    # initial `v = value` binding, the condition guarding the bytes() branch,
    # the is_ros_time(v) branch body, and the return).
    if isinstance(v, (list, tuple)):
        noboundtype = api.canonical(typename, unbounded=True)
        # Lists of ROS times flatten to nanosecond integers.
        if v and api.is_ros_time(v[0]):
            v = [api.to_nsec(x) for x in v]
        # Lists of non-builtin (message) types serialize to a dict-repr string.
        elif api.scalar(typename) not in api.ROS_BUILTIN_TYPES:
            v = str([api.message_to_dict(m) for m in v])
        # Byte-array values are stored as raw binary.
        v = bytes(bytearray(v))
    elif api.is_ros_time(v):
        # branch body elided in this excerpt -- presumably converts to nanoseconds
    elif typename and typename not in api.ROS_BUILTIN_TYPES:
        # Single non-builtin (message) value serializes to a dict-repr string.
        v = str(api.message_to_dict(v))
def _write_table(self, typekey):
    """Writes out cached messages for type.

    Takes a snapshot of the rows cached for `typekey`, converts them to a
    PyArrow table against the prepared schema, and appends the table to the
    type's ParquetWriter.
    """
    dicts = self._caches[typekey][:]
    # Clear the cache after snapshotting: otherwise every flush would rewrite
    # all previously written rows, duplicating data in the output file.
    del self._caches[typekey][:]
    if not dicts:  # nothing cached: avoid IndexError on dicts[0] below
        return
    # Column-major mapping {column: [values]}, keyed by the first row's columns.
    mapping = {k: [d[k] for d in dicts] for k in dicts[0]}
    with warnings.catch_warnings():
        # Suppress spurious UserWarnings emitted during table construction.
        warnings.simplefilter("ignore", UserWarning)
        table = pyarrow.Table.from_pydict(mapping, self._schemas[typekey])
    self._writers[typekey].write_table(table)
def _configure(self):
    """Parses args.WRITE_OPTIONS, returns success."""
    # NOTE(review): numerous statements are elided from this excerpt
    # (e.g. `ok = True` initialization, validation bodies inside the helpers,
    # the "writer" dict branch body, the try-block wrapping the flat-string
    # handling, and the return statement).

    def process_column(name, rostype, value):
        # Validates and registers one additional column given as name/rostype/value.
        v, myok = value, True
        if "string" not in rostype:
        ConsolePrinter.error("Invalid name option in %s=%s:%s", name, rostype, v)
        if rostype not in api.ROS_BUILTIN_TYPES:
        ConsolePrinter.error("Invalid type option in %s=%s:%s", name, rostype, v)

    def process_type(rostype, arrowtype):
        # Resolves an Arrow type given as a string expression (e.g. "uint8",
        # "list_(int8())"), evaluated with ARROW_TYPES as the only builtins.
        # SECURITY NOTE(review): eval() on option text -- acceptable only
        # because WRITE_OPTIONS come from the local command line / caller,
        # not from untrusted input; flagging for awareness.
        arrowtype = eval(compile(arrowtype, "", "eval"), {"__builtins__": self.ARROW_TYPES})

    # Compile emit/noemit field patterns for message field filtering.
    for key, vals in [("print", self.args.EMIT_FIELD), ("noprint", self.args.NOEMIT_FIELD)]:
        self._patterns[key] = [(tuple(v.split(".")), common.path_to_regex(v)) for v in vals]

    alias = api.get_type_alias(rostype)

    # Process structured options ("column"/"type"/"writer" dicts) and
    # flat-string options ("column-NAME=rostype:value", "type-NAME=v", "writer-NAME=v").
    for k, v in self.args.WRITE_OPTIONS.items():
        if "column" == k and v and isinstance(v, dict):
            for name, (rostype, value) in v.items():
                if not process_column(name, rostype, value): ok = False
        elif "type" == k and v and isinstance(v, dict):
            for name, value in v.items():
                if not process_type(name, value): ok = False
        elif "writer" == k and v and isinstance(v, dict):
        elif isinstance(k, str) and "-" in k:
            category, name = k.split("-", 1)
            if category not in ("column", "type", "writer"):
                ConsolePrinter.warn("Unknown %r option in %s=%s", category, k, v)
            if not name: raise Exception("empty name")
            if "column" == category:
                if not process_column(name, *v.split(":", 1)): ok = False
            elif "type" == category:
                process_type(name, v)
            elif "writer" == category:
                # Writer argument values may be JSON; fall back to the raw string.
                try: v = json.loads(v)
                except Exception: pass
    except Exception as e:
        ConsolePrinter.error("Invalid %s option in %s=%s: %s", category, k, v, e)
def _configure_ids(self):
    """Configures ID generator from args.WRITE_OPTIONS, returns success."""
    # NOTE(review): many statements are elided from this excerpt (e.g. `ok`
    # initialization, the try-blocks that the bare `except` clauses below
    # belong to, derivation of `generator` and sample value `fval`, and the
    # column retyping that consumes `repl`); the method also continues past
    # the end of this excerpt.
    k, v = "idgenerator", self.args.WRITE_OPTIONS.get("idgenerator")
    if k in self.args.WRITE_OPTIONS:
        # Evaluate the option as a Python expression, with itertools/uuid and
        # the Arrow type factories in scope, plus a best-effort import of the
        # expression's dotted root (e.g. "uuid" from "uuid.uuid4()").
        val, ex, ns = v, None, dict(self.ARROW_TYPES, itertools=itertools, uuid=uuid)
        for root in v.split(".", 1)[:1]:
            try: ns[root] = common.import_item(root)
            except Exception: pass
        # Also try importing the full dotted path with any call suffix stripped.
        try: common.import_item(re.sub(r"\(.+", "", v))
        except Exception: pass
        # SECURITY NOTE(review): eval() of option text -- acceptable only for
        # trusted, locally supplied options; flagging for awareness.
        try: val = eval(compile(v, "", "eval"), ns)
        except Exception as e: ok, ex = False, e
        # A bare string result is not a usable generator.
        if isinstance(val, (six.binary_type, six.text_type)): ok = False
    except Exception as e:
    except Exception as e: ok, ex = False, e
    ConsolePrinter.error("Invalid value in %s=%s%s", k, v, (": %s" % ex if ex else ""))

    # Choose a ROS type name for the ID columns from a sample generated value.
    if api.is_ros_time(fval):
        typename = "time" if "time" in str(type(fval)).lower() else "duration"
    elif isinstance(fval, six.integer_types):
    elif isinstance(fval, float):
    elif not isinstance(fval, str):
        # Non-string sample: stringify all generated IDs.
        fval, self._idgenerator = str(fval), (str(x) for x in generator)
    # Retype "_id"/"_parent_id" columns to the detected ID type, leave others as-is.
    repl = lambda n, t: (n, typename) if "_id" in n else (n, t)