5------------------------------------------------------------------------------
6This file is part of grepros - grep for ROS bag files and live topics.
7Released under the BSD License.
12------------------------------------------------------------------------------
14## @namespace grepros.plugins.auto.csv
15from __future__ import absolute_import
25from ... common import ConsolePrinter, plural
26from ... outputs import Sink
30 """Writes messages to CSV files, each topic to a separate file."""
33 FILE_EXTENSIONS = (
".csv", )
36 DEFAULT_ARGS = dict(EMIT_FIELD=(), META=
False, NOEMIT_FIELD=(), WRITE_OPTIONS={}, VERBOSE=
False)
38 def __init__(self, args=None, **kwargs):
40 @param args arguments
as namespace
or dictionary, case-insensitive;
41 or a single path
as the base name of CSV files to write
42 @param args.emit_field message fields to emit
in output
if not all
43 @param args.noemit_field message fields to skip
in output
44 @param args.write base name of CSV files to write,
45 will add topic name like
"name.__my__topic.csv" for "/my/topic",
46 will add counter like
"name.__my__topic.2.csv" if exists
47 @param args.write_options {
"overwrite": whether to overwrite existing files
49 @param args.meta whether to emit metainfo
50 @param args.verbose whether to emit debug information
51 @param kwargs any
and all arguments
as keyword overrides, case-insensitive
53 args = {"WRITE": str(args)}
if isinstance(args, common.PATH_TYPES)
else args
54 args = common.ensure_namespace(args, CsvSink.DEFAULT_ARGS, **kwargs)
64 for key, vals
in [(
"print", args.EMIT_FIELD), (
"noprint", args.NOEMIT_FIELD)]:
65 self.
_patterns[key] = [(tuple(v.split(
".")), common.path_to_regex(v))
for v
in vals]
68 def emit(self, topic, msg, stamp=None, match=None, index=None):
69 """Writes message to output file."""
73 metadata = [api.to_sec(stamp), api.to_datetime(stamp), api.get_message_type(msg)]
74 self.
_make_writer(topic, msg).writerow(itertools.chain(metadata, data))
76 super(CsvSink, self).
emit(topic, msg, stamp, match, index)
79 """Returns whether arguments and overwrite option are valid, and file base is writable."""
81 result = Sink.validate(self)
82 if self.
args.WRITE_OPTIONS.get(
"overwrite")
not in (
None,
True,
False,
"true",
"false"):
83 ConsolePrinter.error(
"Invalid overwrite option for CSV: %r. "
84 "Choose one of {true, false}.",
85 self.
args.WRITE_OPTIONS[
"overwrite"])
87 if not common.verify_io(self.
args.WRITE,
"w"):
91 self.
_overwrite = self.
args.WRITE_OPTIONS.get(
"overwrite")
in (
True,
"true")
95 """Closes output file(s), if any."""
97 names = {k: f.name
for k, f
in self.
_files.items()}
105 sizes = {k:
None for k
in names.values()}
106 for k, n
in names.items():
107 try: sizes[k] = os.path.getsize(n)
108 except Exception
as e: ConsolePrinter.warn(
"Error getting size of %s: %s", n, e)
109 ConsolePrinter.debug(
"Wrote %s in %s to CSV (%s)%s",
112 common.format_bytes(sum(filter(bool, sizes.values()))),
113 ":" if self.
args.VERBOSE
else ".")
114 for topickey, name
in names.items()
if self.
args.VERBOSE
else ():
115 ConsolePrinter.debug(
"- %s (%s, %s)", name,
116 "error getting size" if sizes[topickey]
is None else
117 common.format_bytes(sizes[topickey]),
119 super(CsvSink, self).
close()
121 def _make_writer(self, topic, msg):
123 Returns a csv.writer for writing topic data.
125 File
is populated
with header
if not created during this session.
127 topickey = api.TypeMeta.make(msg, topic).topickey
129 common.makedirs(os.path.dirname(self.
_filebase))
132 if topickey
not in self.
_files or self.
_files[topickey].closed:
133 name = self.
_files[topickey].name
if topickey
in self.
_files else None
136 base, ext = os.path.splitext(self.
_filebase)
137 name =
"%s.%s%s" % (base, topic.lstrip(
"/").replace(
"/",
"__"), ext)
139 if os.path.isfile(name)
and os.path.getsize(name): action =
"Overwriting"
140 open(name,
"wb").
close()
141 else: name = common.unique_path(name)
142 flags = {
"mode":
"ab"}
if six.PY2
else {
"mode":
"a",
"newline":
"",
"encoding":
"utf-8"}
143 f = open(name, **flags)
145 if topickey
not in self.
_files:
146 if self.
args.VERBOSE:
147 ConsolePrinter.debug(
"%s %s.", action, name)
148 header = (topic +
"/" +
".".join(map(str, p))
for p, _
in self.
_iter_fields(msg))
149 metaheader = [
"__time",
"__datetime",
"__type"]
150 w.writerow(itertools.chain(metaheader, header))
155 def _iter_fields(self, msg, top=()):
157 Yields ((nested, path), scalar value) from ROS message.
159 Lists are returned
as ((nested, path, index), value), e.g. ((
"data", 0), 666).
162 fieldmap, identity = api.get_message_fields(msg),
lambda x: x
163 fieldmap = api.filter_fields(fieldmap, top, include=prints, exclude=noprints)
164 for k, t
in fieldmap.items()
if fieldmap != msg
else ():
165 v, path, baset = api.get_message_value(msg, k, t), top + (k, ), api.scalar(t)
166 is_sublist = isinstance(v, (list, tuple))
and baset
not in api.ROS_BUILTIN_TYPES
167 cast = api.to_sec
if baset
in api.ROS_TIME_TYPES
else identity
168 if isinstance(v, (list, tuple))
and not is_sublist:
169 for i, lv
in enumerate(v):
170 yield path + (i, ), cast(lv)
172 for i, lmsg
in enumerate(v):
175 elif api.is_ros_message(v, ignore_time=
True):
183 """Wraps csv.writer with bool conversion, iterator support, and lesser memory use."""
185 def __init__(self, csvfile, dialect="excel", **fmtparams):
187 @param csvfile file-like object
with `write()` method
188 @param dialect CSV dialect to use, one
from `csv.list_dialects()`
189 @param fmtparams override individual format parameters
in dialect
192 self._buffer = six.BytesIO() if "b" in csvfile.mode
else six.StringIO()
193 self.
_writer = csv.writer(self.
_buffer, dialect, **dict(fmtparams, lineterminator=
""))
195 self.
_format =
lambda v: int(v)
if isinstance(v, bool)
else v
197 self.
_format =
lambda v: int(v)
if isinstance(v, bool)
else \
198 v.encode(
"utf-8")
if isinstance(v, six.text_type)
else v
202 """A read-only description of the dialect in use by the writer."""
207 Writes the row to the writer’s file object.
209 Fields will be formatted according to the current dialect.
211 @param row iterable of field values
212 @return return value of the call to the write method of the underlying file object
214 def write_columns(cols, inter):
215 """Writes columns to file, returns number of bytes written."""
216 count = len(inter)
if inter
else 0
217 if inter: self.
_file.write(inter)
225 result, chunk, inter, DELIM, STEP = 0, [],
"", self.
dialectdialect.delimiter, 10000
229 if len(chunk) >= STEP:
230 result += write_columns(chunk, inter)
231 chunk, inter = [], DELIM
232 if chunk: result += write_columns(chunk, inter)
239 Writes the rows to the writer’s file object.
241 Fields will be formatted according to the current dialect.
243 @param rows iterable of iterables of field values
250 """Adds CSV output format support."""
251 from ...
import plugins
252 plugins.add_write_format(
"csv", CsvSink,
"CSV", [
253 (
"overwrite=true|false",
"overwrite existing files in CSV output\n"
254 "instead of appending unique counter (default false)")
256 plugins.add_output_label(
"CSV", [
"--emit-field",
"--no-emit-field"])
259__all__ = [
"CsvSink",
"CsvWriter",
"init"]
_ensure_stamp_index(self, topic, msg, stamp=None, index=None)
valid
Result of validate()
validate(self)
Returns whether sink prerequisites are met (like ROS environment set if LiveSink).
close(self)
Shuts down output, closing any files or connections.
__init__(self, args=None, **kwargs)
emit(self, topic, msg, stamp=None, match=None, index=None)
Writes message to output file.
validate(self)
Returns whether arguments and overwrite option are valid, and file base is writable.
close(self)
Closes output file(s), if any.
Wraps csv.writer with bool conversion, iterator support, and lesser memory use.
dialect
A read-only description of the dialect in use by the writer.
writerows(self, rows)
Writes the rows to the writer’s file object.
writerow(self, row)
Writes the row to the writer’s file object.
init(*_, **__)
Adds CSV output format support.