grepros 1.2.2
grep for ROS bag files and live topics
Loading...
Searching...
No Matches
parquet.py
Go to the documentation of this file.
1# -*- coding: utf-8 -*-
2"""
3Parquet output plugin.
4
5------------------------------------------------------------------------------
6This file is part of grepros - grep for ROS bag files and live topics.
7Released under the BSD License.
8
9@author Erki Suurjaak
10@created 14.12.2021
11@modified 21.04.2024
12------------------------------------------------------------------------------
13"""
14## @namespace grepros.plugins.parquet
15import itertools
16import json
17import os
18import re
19import uuid
20import warnings
21
22try: import pandas
23except Exception: pandas = None
24try: import pyarrow
25except Exception: pyarrow = None
26try: import pyarrow.parquet
27except Exception: pass
28import six
29
30from .. import api, common
31from .. common import ConsolePrinter
32from .. outputs import Sink
33
34
class ParquetSink(Sink):
    """Writes messages to Apache Parquet files, one file per message type."""

    ## Supported output file extensions
    FILE_EXTENSIONS = (".parquet", )

    ## Number of cached message rows to accumulate before writing a table chunk, per type
    CHUNK_SIZE = 100

    ## Mapping from pyarrow type names and aliases to pyarrow type constructors
    ARROW_TYPES = {
        "bool":         pyarrow.bool_,     "bool_":        pyarrow.bool_,
        "float16":      pyarrow.float16,   "float64":      pyarrow.float64,
        "float32":      pyarrow.float32,   "decimal128":   pyarrow.decimal128,
        "int8":         pyarrow.int8,      "uint8":        pyarrow.uint8,
        "int16":        pyarrow.int16,     "uint16":       pyarrow.uint16,
        "int32":        pyarrow.int32,     "uint32":       pyarrow.uint32,
        "int64":        pyarrow.int64,     "uint64":       pyarrow.uint64,
        "date32":       pyarrow.date32,    "time32":       pyarrow.time32,
        "date64":       pyarrow.date64,    "time64":       pyarrow.time64,
        "timestamp":    pyarrow.timestamp, "duration":     pyarrow.duration,
        "binary":       pyarrow.binary,    "large_binary": pyarrow.large_binary,
        "string":       pyarrow.string,    "large_string": pyarrow.large_string,
        "utf8":         pyarrow.string,    "large_utf8":   pyarrow.large_utf8,
        "list":         pyarrow.list_,     "list_":        pyarrow.list_,
        "large_list":   pyarrow.large_list,
    } if pyarrow else {}
    if hasattr(pyarrow, "month_day_nano_interval"): ARROW_TYPES.update({  # Py3
        "month_day_nano_interval": pyarrow.month_day_nano_interval,
    })

    ## Mapping from ROS common type names to pyarrow types
    COMMON_TYPES = {
        "int8":    pyarrow.int8(),    "int16":   pyarrow.int16(),   "int32":   pyarrow.int32(),
        "uint8":   pyarrow.uint8(),   "uint16":  pyarrow.uint16(),  "uint32":  pyarrow.uint32(),
        "int64":   pyarrow.int64(),   "uint64":  pyarrow.uint64(),  "bool":    pyarrow.bool_(),
        "string":  pyarrow.string(),  "wstring": pyarrow.string(),  "uint8[]": pyarrow.binary(),
        "float32": pyarrow.float32(), "float64": pyarrow.float64(),
    } if pyarrow else {}

    ## Fallback pyarrow type if mapped type not found
    DEFAULT_TYPE = pyarrow.string() if pyarrow else None

    ## Default columns for message type tables
    MESSAGE_TYPE_BASECOLS = [("_topic",       "string"),
                             ("_timestamp",   "time"), ]

    ## Additional default columns for message type tables with nesting output
    MESSAGE_TYPE_NESTCOLS = [("_id",          "string"),
                             ("_parent_type", "string"),
                             ("_parent_id",   "string"), ]

    ## Custom arguments for pyarrow.parquet.ParquetWriter
    WRITER_ARGS = {"version": "2.6"}

    ## Constructor argument defaults
    DEFAULT_ARGS = dict(EMIT_FIELD=(), META=False, NOEMIT_FIELD=(), WRITE_OPTIONS={}, VERBOSE=False)


    def __init__(self, args=None, **kwargs):
        """
        @param   args                 arguments as namespace or dictionary, case-insensitive;
                                      or a single path as the base name of Parquet files to write
        @param   args.emit_field      message fields to emit in output if not all
        @param   args.noemit_field    message fields to skip in output
        @param   args.write           base name of Parquet files to write
        @param   args.write_options   ```
                                      {"column": additional columns as {name: (rostype, value)},
                                       "type": {rostype: PyArrow type or typename like "uint8"},
                                       "writer": dictionary of arguments passed to ParquetWriter,
                                       "idgenerator": callable or iterable for producing message IDs
                                                      like uuid.uuid4 or itertools.count();
                                                      nesting uses UUID values by default,
                                       "column-k=rostype:v": one "column"-argument
                                                             in flat string form,
                                       "type-k=v": one "type"-argument in flat string form,
                                       "writer-k=v": one "writer"-argument in flat string form,
                                       "nesting": "array" to recursively insert arrays
                                                  of nested types, or "all" for any nesting,
                                       "overwrite": whether to overwrite existing file
                                                    (default false)}
                                      ```
        @param   args.meta            whether to print metainfo
        @param   args.verbose         whether to print debug information
        @param   kwargs               any and all arguments as keyword overrides, case-insensitive
        """
        args = {"WRITE": str(args)} if isinstance(args, common.PATH_TYPES) else args
        args = common.ensure_namespace(args, ParquetSink.DEFAULT_ARGS, **kwargs)
        super(ParquetSink, self).__init__(args)

        self._filebase       = args.WRITE
        self._overwrite      = None  # Whether to overwrite existing files, set in validate()
        self._filenames      = {}    # {(typename, typehash): Parquet file path}
        self._caches         = {}    # {(typename, typehash): [{data}, ]}
        self._schemas        = {}    # {(typename, typehash): pyarrow.Schema}
        self._writers        = {}    # {(typename, typehash): pyarrow.parquet.ParquetWriter}
        self._extra_basecols = []    # [(name, rostype)]
        self._extra_basevals = []    # [(name, value)]
        self._patterns       = {}    # {key: [(() if any field else ('path', ), re.Pattern), ]}
        self._nesting        = args.WRITE_OPTIONS.get("nesting")
        # iter(callable, sentinel): sentinel self is never produced, so this yields UUIDs forever
        self._idgenerator    = iter(lambda: str(uuid.uuid4()), self) if self._nesting else None

        self._close_printed  = False


    def validate(self):
        """
        Returns whether required libraries are available (pandas and pyarrow) and overwrite is valid
        and file base is writable.
        """
        if self.valid is not None: return self.valid  # Cached result from previous call
        ok = all([Sink.validate(self), self._configure()])
        pandas_ok, pyarrow_ok = bool(pandas), bool(pyarrow)
        if self.args.WRITE_OPTIONS.get("overwrite") not in (None, True, False, "true", "false"):
            ConsolePrinter.error("Invalid overwrite option for Parquet: %r. "
                                 "Choose one of {true, false}.",
                                 self.args.WRITE_OPTIONS["overwrite"])
            ok = False
        if self.args.WRITE_OPTIONS.get("nesting") not in (None, "", "array", "all"):
            ConsolePrinter.error("Invalid nesting option for Parquet: %r. "
                                 "Choose one of {array,all}.",
                                 self.args.WRITE_OPTIONS["nesting"])
            ok = False
        if not pandas_ok:
            ConsolePrinter.error("pandas not available: cannot write Parquet files.")
        if not pyarrow_ok:
            ConsolePrinter.error("PyArrow not available: cannot write Parquet files.")
        if not common.verify_io(self.args.WRITE, "w"):
            ok = False
        self.valid = ok and pandas_ok and pyarrow_ok
        if self.valid:
            self._overwrite = (self.args.WRITE_OPTIONS.get("overwrite") in (True, "true"))
        return self.valid


    def emit(self, topic, msg, stamp=None, match=None, index=None):
        """Writes message to a Parquet file."""
        if not self.validate(): raise Exception("invalid")
        stamp, index = self._ensure_stamp_index(topic, msg, stamp, index)
        self._process_type(topic, msg)
        self._process_message(topic, index, stamp, msg, match)
        self._close_printed = False


    def close(self):
        """Writes out any remaining messages, closes writers, clears structures."""
        try:
            for k, vv in list(self._caches.items()):
                vv and self._write_table(k)  # Flush only non-empty caches
            for k in list(self._writers):
                self._writers.pop(k).close()
        finally:
            if not self._close_printed and self._counts:
                self._close_printed = True
                sizes = {n: None for n in self._filenames.values()}
                for n in self._filenames.values():
                    try: sizes[n] = os.path.getsize(n)
                    except Exception as e: ConsolePrinter.warn("Error getting size of %s: %s", n, e)
                ConsolePrinter.debug("Wrote %s in %s to %s (%s)%s",
                                     common.plural("message", sum(self._counts.values())),
                                     common.plural("topic", self._counts),
                                     common.plural("Parquet file", sizes),
                                     common.format_bytes(sum(filter(bool, sizes.values()))),
                                     ":" if self.args.VERBOSE else ".")
                # Per-file breakdown only in verbose mode (conditional iterable)
                for (t, h), name in self._filenames.items() if self.args.VERBOSE else ():
                    count = sum(c for (_, t_, h_), c in self._counts.items() if (t, h) == (t_, h_))
                    ConsolePrinter.debug("- %s (%s, %s)", name,
                                         "error getting size" if sizes[name] is None else
                                         common.format_bytes(sizes[name]),
                                         common.plural("message", count))
            self._caches.clear()
            self._schemas.clear()
            self._filenames.clear()


    def _process_type(self, topic, msg, rootmsg=None):
        """Prepares Parquet schema and writer if not existing, recursing into nested types."""
        rootmsg = rootmsg or msg
        with api.TypeMeta.make(msg, root=rootmsg) as m:
            typename, typehash, typekey = (m.typename, m.typehash, m.typekey)
        if self.args.VERBOSE and topic and (topic, typename, typehash) not in self._counts:
            ConsolePrinter.debug("Adding topic %s in Parquet output.", topic)
        if typekey in self._writers: return

        # One file per type, in a subdirectory named for type and hash
        basedir, basename = os.path.split(self._filebase)
        pathname = os.path.join(basedir, re.sub(r"\W", "__", "%s__%s" % (typename, typehash)))
        filename = os.path.join(pathname, basename)
        if not self._overwrite:
            filename = common.unique_path(filename)

        cols = []
        scalars = set(x for x in self.COMMON_TYPES if "[" not in x)
        fltrs = dict(include=self._patterns["print"], exclude=self._patterns["noprint"])
        for path, value, subtype in api.iter_message_fields(msg, scalars=scalars, **fltrs):
            coltype = self._make_column_type(subtype)
            cols += [(".".join(path), coltype)]
        # NOTE(review): reconstructed line — meta columns include nesting columns when nesting
        # is enabled; confirm against upstream source.
        MSGCOLS = self.MESSAGE_TYPE_BASECOLS + (self.MESSAGE_TYPE_NESTCOLS if self._nesting else [])
        cols += [(c, self._make_column_type(t, fallback="int64" if "time" == t else None))
                 for c, t in MSGCOLS + self._extra_basecols]

        if self.args.VERBOSE:
            sz = os.path.isfile(filename) and os.path.getsize(filename)
            action = "Overwriting" if sz and self._overwrite else "Adding"
            ConsolePrinter.debug("%s type %s in Parquet output.", action, typename)
        common.makedirs(pathname)

        schema = pyarrow.schema(cols)
        writer = pyarrow.parquet.ParquetWriter(filename, schema, **self.WRITER_ARGS)
        self._caches[typekey]    = []
        self._filenames[typekey] = filename
        self._schemas[typekey]   = schema
        self._writers[typekey]   = writer

        # Recurse into nested message types if nesting output enabled
        nesteds = api.iter_message_fields(msg, messages_only=True, **fltrs) if self._nesting else ()
        for path, submsgs, subtype in nesteds:
            scalartype = api.scalar(subtype)
            if subtype == scalartype and "all" != self._nesting:
                continue  # for path
            subtypehash = not submsgs and self.source.get_message_type_hash(scalartype)
            if not isinstance(submsgs, (list, tuple)): submsgs = [submsgs]
            # Use first available submessage, or construct an empty one for schema inspection
            [submsg] = submsgs[:1] or [self.source.get_message_class(scalartype, subtypehash)()]
            self._process_type(None, submsg, rootmsg)


    def _process_message(self, topic, index, stamp, msg, match=None,
                         rootmsg=None, parent_type=None, parent_id=None):
        """
        Converts message to pandas dataframe, adds to cache.

        Writes cache to disk if length reached chunk size.

        If nesting is enabled, processes nested messages for subtypes in message.
        If IDs are used, returns generated ID.
        """
        # NOTE(review): fallback changed from (rootmsg or None) to (rootmsg or msg),
        # matching _process_type; (rootmsg or None) would be a no-op.
        data, myid, rootmsg = {}, None, (rootmsg or msg)
        if self._idgenerator: myid = next(self._idgenerator)
        with api.TypeMeta.make(msg, topic, root=rootmsg) as m:
            typename, typekey = m.typename, m.typekey
        fltrs = dict(include=self._patterns["print"], exclude=self._patterns["noprint"])
        for p, v, t in api.iter_message_fields(msg, scalars=set(self.COMMON_TYPES), **fltrs):
            data[".".join(p)] = self._make_column_value(v, t)
        data.update(_topic=topic, _timestamp=self._make_column_value(stamp, "time"))
        if self._idgenerator: data.update(_id=myid)
        if self._nesting:
            COLS = [k for k, _ in self.MESSAGE_TYPE_NESTCOLS if "parent" in k]
            data.update(zip(COLS, [parent_type, parent_id]))
        data.update(self._extra_basevals)
        self._caches[typekey].append(data)
        super(ParquetSink, self).emit(topic, msg, stamp, match, index)

        subids = {}  # {message field path: [ids]}
        nesteds = api.iter_message_fields(msg, messages_only=True, **fltrs) if self._nesting else ()
        for path, submsgs, subtype in nesteds:
            scalartype = api.scalar(subtype)
            if subtype == scalartype and "all" != self._nesting:
                continue  # for path
            if isinstance(submsgs, (list, tuple)):
                subids[path] = []
            for submsg in submsgs if isinstance(submsgs, (list, tuple)) else [submsgs]:
                subid = self._process_message(topic, index, stamp, submsg,
                                              rootmsg=rootmsg, parent_type=typename, parent_id=myid)
                if isinstance(submsgs, (list, tuple)):
                    subids[path].append(subid)
        # Mutates the dict already appended to cache: fills in child IDs for array fields
        data.update(subids)

        if len(self._caches[typekey]) >= self.CHUNK_SIZE:
            self._write_table(typekey)
        return myid


    def _make_column_type(self, typename, fallback=None):
        """
        Returns pyarrow type for ROS type.

        @param  fallback  fallback typename to use for lookup if typename not found
        """
        noboundtype = api.canonical(typename, unbounded=True)
        scalartype  = api.scalar(typename)
        timetype    = api.get_ros_time_category(scalartype)
        coltype     = self.COMMON_TYPES.get(typename) or self.COMMON_TYPES.get(noboundtype)

        if not coltype and scalartype in self.COMMON_TYPES:
            coltype = pyarrow.list_(self.COMMON_TYPES[scalartype])  # Array of known scalars
        if not coltype and timetype in self.COMMON_TYPES:
            if typename != scalartype:  # Array of time values
                coltype = pyarrow.list_(self.COMMON_TYPES[timetype])
            else:
                coltype = self.COMMON_TYPES[timetype]
        if not coltype and fallback:
            coltype = self._make_column_type(fallback)
        if not coltype:
            coltype = self.DEFAULT_TYPE
        return coltype


    def _make_column_value(self, value, typename=None):
        """Returns column value suitable for adding to Parquet file."""
        v = value
        if isinstance(v, (list, tuple)):
            noboundtype = api.canonical(typename, unbounded=True)
            if v and api.is_ros_time(v[0]):
                v = [api.to_nsec(x) for x in v]
            elif api.scalar(typename) not in api.ROS_BUILTIN_TYPES:
                v = str([api.message_to_dict(m) for m in v])  # Nested messages as JSON-like text
            elif pyarrow.binary() in (self.COMMON_TYPES.get(typename),
                                      self.COMMON_TYPES.get(noboundtype)):
                v = bytes(bytearray(v))  # Py2/Py3 compatible
            else:
                v = list(v)  # Ensure lists not tuples
        elif api.is_ros_time(v):
            v = api.to_nsec(v)
        elif typename and typename not in api.ROS_BUILTIN_TYPES:
            v = str(api.message_to_dict(v))
        return v


    def _write_table(self, typekey):
        """Writes out cached messages for type. Cache must be non-empty."""
        dicts = self._caches[typekey][:]
        del self._caches[typekey][:]
        mapping = {k: [d[k] for d in dicts] for k in dicts[0]}
        with warnings.catch_warnings():  # PyArrow can raise UserWarning about pandas version
            warnings.simplefilter("ignore", UserWarning)
            table = pyarrow.Table.from_pydict(mapping, self._schemas[typekey])
        self._writers[typekey].write_table(table)


    def _configure(self):
        """Parses args.WRITE_OPTIONS, returns success."""
        # Work on instance copies so configuration does not alter class-level defaults
        self.COMMON_TYPES = type(self).COMMON_TYPES.copy()
        self.WRITER_ARGS  = type(self).WRITER_ARGS.copy()
        del self._extra_basecols[:]
        del self._extra_basevals[:]
        self._patterns.clear()
        ok = self._configure_ids()

        def process_column(name, rostype, value):  # Parse "column-name=rostype:value"
            v, myok = value, True
            if "string" not in rostype:
                v = json.loads(v)
            if not name:
                myok = False
                ConsolePrinter.error("Invalid name option in %s=%s:%s", name, rostype, v)
            if rostype not in api.ROS_BUILTIN_TYPES:
                myok = False
                ConsolePrinter.error("Invalid type option in %s=%s:%s", name, rostype, v)
            if myok:
                self._extra_basecols.append((name, rostype))
                self._extra_basevals.append((name, value))
            return myok

        def process_type(rostype, arrowtype):  # Eval pyarrow datatype from value like "float64()"
            if arrowtype not in self.ARROW_TYPES.values():
                # Eval restricted to ARROW_TYPES namespace; input comes from user config
                arrowtype = eval(compile(arrowtype, "", "eval"), {"__builtins__": self.ARROW_TYPES})
            self.COMMON_TYPES[rostype] = arrowtype
            return True  # Failures raise, caught by caller

        for key, vals in [("print", self.args.EMIT_FIELD), ("noprint", self.args.NOEMIT_FIELD)]:
            self._patterns[key] = [(tuple(v.split(".")), common.path_to_regex(v)) for v in vals]

        # Populate ROS type aliases like "byte" and "char"
        for rostype in list(self.COMMON_TYPES):
            alias = api.get_type_alias(rostype)
            if alias:
                self.COMMON_TYPES[alias] = self.COMMON_TYPES[rostype]
            if alias and rostype + "[]" in self.COMMON_TYPES:
                self.COMMON_TYPES[alias + "[]"] = self.COMMON_TYPES[rostype + "[]"]

        for k, v in self.args.WRITE_OPTIONS.items():
            if "column" == k and v and isinstance(v, dict):
                for name, (rostype, value) in v.items():
                    if not process_column(name, rostype, value): ok = False
            elif "type" == k and v and isinstance(v, dict):
                for name, value in v.items():
                    if not process_type(name, value): ok = False
            elif "writer" == k and v and isinstance(v, dict):
                self.WRITER_ARGS.update(v)
            elif isinstance(k, str) and "-" in k:  # Flat string form like "type-time=..."
                category, name = k.split("-", 1)
                if category not in ("column", "type", "writer"):
                    ConsolePrinter.warn("Unknown %r option in %s=%s", category, k, v)
                    continue  # for k, v
                try:
                    if not name: raise Exception("empty name")
                    if "column" == category:    # column-name=rostype:value
                        if not process_column(name, *v.split(":", 1)): ok = False
                    elif "type" == category:    # type-rostype=arrowtype
                        process_type(name, v)
                    elif "writer" == category:  # writer-argname=argvalue
                        try: v = json.loads(v)
                        except Exception: pass
                        self.WRITER_ARGS[name] = v
                except Exception as e:
                    ok = False
                    ConsolePrinter.error("Invalid %s option in %s=%s: %s", category, k, v, e)
        return ok


    def _configure_ids(self):
        """Configures ID generator from args.WRITE_OPTIONS, returns success."""
        ok = True
        # Instance copies, as ID column types may get rewritten below
        self.MESSAGE_TYPE_BASECOLS = type(self).MESSAGE_TYPE_BASECOLS[:]
        self.MESSAGE_TYPE_NESTCOLS = type(self).MESSAGE_TYPE_NESTCOLS[:]

        k, v = "idgenerator", self.args.WRITE_OPTIONS.get("idgenerator")
        if k in self.args.WRITE_OPTIONS:
            val, ex, ns = v, None, dict(self.ARROW_TYPES, itertools=itertools, uuid=uuid)
            for root in v.split(".", 1)[:1]:
                try: ns[root] = common.import_item(root)  # Provide root module
                except Exception: pass
            try: common.import_item(re.sub(r"\(.+", "", v))  # Ensure nested imports
            except Exception: pass
            try: val = eval(compile(v, "", "eval"), ns)
            except Exception as e: ok, ex = False, e
            # A plain string evaluates to itself: not a usable generator
            if isinstance(val, (six.binary_type, six.text_type)): ok = False

            if ok:
                try: self._idgenerator = iter(val)
                except Exception as e:
                    try: self._idgenerator = iter(val, self)  # (callable=val, sentinel=self)
                    except Exception as e: ok, ex = False, e
            if not ok:
                ConsolePrinter.error("Invalid value in %s=%s%s", k, v, (": %s" % ex if ex else ""))
            elif not self._nesting:
                self.MESSAGE_TYPE_BASECOLS.append(("_id", "string"))

        if ok and self._idgenerator:  # Detect given ID column type
            fval, typename, generator = next(self._idgenerator), None, self._idgenerator
            if api.is_ros_time(fval):
                typename = "time" if "time" in str(type(fval)).lower() else "duration"
            elif isinstance(fval, six.integer_types):
                typename = "int64"
            elif isinstance(fval, float):
                typename = "float64"
            elif not isinstance(fval, str):  # Cast whatever it is to string
                fval, self._idgenerator = str(fval), (str(x) for x in generator)
            if typename:
                # NOTE(review): reconstructed lines — retype "_id" columns from string
                # to detected type; confirm against upstream source.
                repl = lambda n, t: (n, typename) if "_id" in n else (n, t)
                self.MESSAGE_TYPE_BASECOLS = [repl(*c) for c in self.MESSAGE_TYPE_BASECOLS]
                self.MESSAGE_TYPE_NESTCOLS = [repl(*c) for c in self.MESSAGE_TYPE_NESTCOLS]
            # Put the consumed first value back in front of the generator
            self._idgenerator = itertools.chain([fval], self._idgenerator)
        return ok
476
477
478
def init(*_, **__):
    """Adds Parquet output format support. Raises ImportWarning if libraries not available."""
    if not (pandas and pyarrow):
        ConsolePrinter.error("pandas or PyArrow not available: cannot write Parquet files.")
        raise ImportWarning()
    from .. import plugins  # Late import to avoid circular
    write_options = [
        ("column-NAME=ROSTYPE:VALUE",  "additional column to add in Parquet output,\n"
                                       "like column-bag_hash=string:26dfba2c"),
        ("idgenerator=CALLABLE",       "callable or iterable for producing message IDs \n"
                                       "in Parquet output, like 'uuid.uuid4' or 'itertools.count()';\n"
                                       "nesting uses UUID values by default"),
        ("nesting=array|all",          "create tables for nested message types\n"
                                       "in Parquet output,\n"
                                       'only for arrays if "array" \n'
                                       "else for any nested types\n"
                                       "(array fields in parent will be populated \n"
                                       " with foreign keys instead of messages as JSON)"),
        ("overwrite=true|false",       "overwrite existing file in Parquet output\n"
                                       "instead of appending unique counter (default false)"),
        ("type-ROSTYPE=ARROWTYPE",     "custom mapping between ROS and pyarrow type\n"
                                       "for Parquet output, like type-time=\"timestamp('ns')\"\n"
                                       "or type-uint8[]=\"list(uint8())\""),
        ("writer-ARGNAME=ARGVALUE",    "additional arguments for Parquet output\n"
                                       "given to pyarrow.parquet.ParquetWriter"),
    ]
    plugins.add_write_format("parquet", ParquetSink, "Parquet", write_options)
    plugins.add_output_label("Parquet", ["--emit-field", "--no-emit-field"])
506
507
508__all__ = ["ParquetSink", "init"]
_ensure_stamp_index(self, topic, msg, stamp=None, index=None)
Definition outputs.py:131
source
inputs.Source instance bound to this sink
Definition outputs.py:55
valid
Result of validate()
Definition outputs.py:53
validate(self)
Returns whether sink prerequisites are met (like ROS environment set if LiveSink).
Definition outputs.py:99
dict ARROW_TYPES
Mapping from pyarrow type names and aliases to pyarrow type constructors.
Definition parquet.py:45
__init__(self, args=None, **kwargs)
Definition parquet.py:121
int CHUNK_SIZE
Number of dataframes to cache before writing, per type.
Definition parquet.py:42
emit(self, topic, msg, stamp=None, match=None, index=None)
Writes message to a Parquet file.
Definition parquet.py:171
DEFAULT_TYPE
Fallback pyarrow type if mapped type not found.
Definition parquet.py:76
dict WRITER_ARGS
Custom arguments for pyarrow.parquet.ParquetWriter.
Definition parquet.py:88
dict COMMON_TYPES
Mapping from ROS common type names to pyarrow type constructors.
Definition parquet.py:67
validate(self)
Returns whether required libraries are available (pandas and pyarrow) and overwrite is valid and file...
Definition parquet.py:145
close(self)
Writes out any remaining messages, closes writers, clears structures.
Definition parquet.py:180
list MESSAGE_TYPE_BASECOLS
Default columns for message type tables.
Definition parquet.py:79
list MESSAGE_TYPE_NESTCOLS
Additional default columns for message type tables with nesting output.
Definition parquet.py:83
init(*_, **__)
Adds Parquet output format support.
Definition parquet.py:494