grepros 1.2.2
grep for ROS bag files and live topics
csv.py
1# -*- coding: utf-8 -*-
2"""
3CSV output plugin.
4
5------------------------------------------------------------------------------
6This file is part of grepros - grep for ROS bag files and live topics.
7Released under the BSD License.
8
9@author Erki Suurjaak
10@created 03.12.2021
11@modified 21.04.2024
12------------------------------------------------------------------------------
13"""
14## @namespace grepros.plugins.auto.csv
15from __future__ import absolute_import
16import atexit
17import csv
18import itertools
19import os
20
21import six
22
23from ... import api
24from ... import common
25from ... common import ConsolePrinter, plural
26from ... outputs import Sink
27
28
29class CsvSink(Sink):
30 """Writes messages to CSV files, each topic to a separate file."""
31
32 ## Supported output file extensions
33 FILE_EXTENSIONS = (".csv", )
34
35 ## Constructor argument defaults
36 DEFAULT_ARGS = dict(EMIT_FIELD=(), META=False, NOEMIT_FIELD=(), WRITE_OPTIONS={}, VERBOSE=False)
37
38 def __init__(self, args=None, **kwargs):
39 """
40 @param args arguments as namespace or dictionary, case-insensitive;
41 or a single path as the base name of CSV files to write
42 @param args.emit_field message fields to emit in output if not all
43 @param args.noemit_field message fields to skip in output
44 @param args.write base name of CSV files to write,
45 will add topic name like "name.__my__topic.csv" for "/my/topic",
46 will add counter like "name.__my__topic.2.csv" if exists
47 @param args.write_options {"overwrite": whether to overwrite existing files
48 (default false)}
49 @param args.meta whether to emit metainfo
50 @param args.verbose whether to emit debug information
51 @param kwargs any and all arguments as keyword overrides, case-insensitive
52 """
53 args = {"WRITE": str(args)} if isinstance(args, common.PATH_TYPES) else args
54 args = common.ensure_namespace(args, CsvSink.DEFAULT_ARGS, **kwargs)
55 super(CsvSink, self).__init__(args)
56 self._filebase = args.WRITE # Filename base, will be made unique
57 self._files = {} # {(topic, typename, typehash): file()}
58 self._writers = {} # {(topic, typename, typehash): CsvWriter}
59 self._patterns = {} # {key: [(() if any field else ('path', ), re.Pattern), ]}
60 self._lasttopickey = None # Last (topic, typename, typehash) emitted
61 self._overwrite = None
62 self._close_printed = False
63
64 for key, vals in [("print", args.EMIT_FIELD), ("noprint", args.NOEMIT_FIELD)]:
65 self._patterns[key] = [(tuple(v.split(".")), common.path_to_regex(v)) for v in vals]
66 atexit.register(self.close)
67
68 def emit(self, topic, msg, stamp=None, match=None, index=None):
69 """Writes message to output file."""
70 if not self.validate(): raise Exception("invalid")
71 stamp, index = self._ensure_stamp_index(topic, msg, stamp, index)
72 data = (v for _, v in self._iter_fields(msg))
73 metadata = [api.to_sec(stamp), api.to_datetime(stamp), api.get_message_type(msg)]
74 self._make_writer(topic, msg).writerow(itertools.chain(metadata, data))
75 self._close_printed = False
76 super(CsvSink, self).emit(topic, msg, stamp, match, index)
77
78 def validate(self):
79 """Returns whether arguments and overwrite option are valid, and file base is writable."""
80 if self.valid is not None: return self.valid
81 result = Sink.validate(self)
82 if self.args.WRITE_OPTIONS.get("overwrite") not in (None, True, False, "true", "false"):
83 ConsolePrinter.error("Invalid overwrite option for CSV: %r. "
84 "Choose one of {true, false}.",
85 self.args.WRITE_OPTIONS["overwrite"])
86 result = False
87 if not common.verify_io(self.args.WRITE, "w"):
88 result = False
89 self.valid = result
90 if self.valid:
91 self._overwrite = self.args.WRITE_OPTIONS.get("overwrite") in (True, "true")
92 return self.valid
93
94 def close(self):
95 """Closes output file(s), if any."""
96 try:
97 names = {k: f.name for k, f in self._files.items()}
98 for k in names:
99 self._files.pop(k).close()
100 self._writers.clear()
101 self._lasttopickey = None
102 finally:
103 if not self._close_printed and self._counts:
104 self._close_printed = True
105 sizes = {k: None for k in names.values()}
106 for k, n in names.items():
107 try: sizes[k] = os.path.getsize(n)
108 except Exception as e: ConsolePrinter.warn("Error getting size of %s: %s", n, e)
109 ConsolePrinter.debug("Wrote %s in %s to CSV (%s)%s",
110 plural("message", sum(self._counts.values())),
111 plural("topic", self._counts),
112 common.format_bytes(sum(filter(bool, sizes.values()))),
113 ":" if self.args.VERBOSE else ".")
114 for topickey, name in names.items() if self.args.VERBOSE else ():
115 ConsolePrinter.debug("- %s (%s, %s)", name,
116 "error getting size" if sizes[topickey] is None else
117 common.format_bytes(sizes[topickey]),
118 plural("message", self._counts[topickey]))
119 super(CsvSink, self).close()
120
121 def _make_writer(self, topic, msg):
122 """
123 Returns a csv.writer for writing topic data.
124
125 File is populated with header if not created during this session.
126 """
127 topickey = api.TypeMeta.make(msg, topic).topickey
128 if not self._lasttopickey:
129 common.makedirs(os.path.dirname(self._filebase))
130 if self._lasttopickey and topickey != self._lasttopickey:
131 self._files[self._lasttopickey].close() # Avoid hitting ulimit
132 if topickey not in self._files or self._files[topickey].closed:
133 name = self._files[topickey].name if topickey in self._files else None
134 action = "Creating" # Or "Overwriting"
135 if not name:
136 base, ext = os.path.splitext(self._filebase)
137 name = "%s.%s%s" % (base, topic.lstrip("/").replace("/", "__"), ext)
138 if self._overwrite:
139 if os.path.isfile(name) and os.path.getsize(name): action = "Overwriting"
140 open(name, "wb").close()
141 else: name = common.unique_path(name)
142 flags = {"mode": "ab"} if six.PY2 else {"mode": "a", "newline": "", "encoding": "utf-8"}
143 f = open(name, **flags)
144 w = CsvWriter(f)
145 if topickey not in self._files:
146 if self.args.VERBOSE:
147 ConsolePrinter.debug("%s %s.", action, name)
148 header = (topic + "/" + ".".join(map(str, p)) for p, _ in self._iter_fields(msg))
149 metaheader = ["__time", "__datetime", "__type"]
150 w.writerow(itertools.chain(metaheader, header))
151 self._files[topickey], self._writers[topickey] = f, w
152 self._lasttopickey = topickey
153 return self._writers[topickey]
154
155 def _iter_fields(self, msg, top=()):
156 """
157 Yields ((nested, path), scalar value) from ROS message.
158
159 Lists are returned as ((nested, path, index), value), e.g. (("data", 0), 666).
160 """
161 prints, noprints = self._patterns["print"], self._patterns["noprint"]
162 fieldmap, identity = api.get_message_fields(msg), lambda x: x
163 fieldmap = api.filter_fields(fieldmap, top, include=prints, exclude=noprints)
164 for k, t in fieldmap.items() if fieldmap != msg else ():
165 v, path, baset = api.get_message_value(msg, k, t), top + (k, ), api.scalar(t)
166 is_sublist = isinstance(v, (list, tuple)) and baset not in api.ROS_BUILTIN_TYPES
167 cast = api.to_sec if baset in api.ROS_TIME_TYPES else identity
168 if isinstance(v, (list, tuple)) and not is_sublist:
169 for i, lv in enumerate(v):
170 yield path + (i, ), cast(lv)
171 elif is_sublist:
172 for i, lmsg in enumerate(v):
173 for lp, lv in self._iter_fields(lmsg, path + (i, )):
174 yield lp, lv
175 elif api.is_ros_message(v, ignore_time=True):
176 for mp, mv in self._iter_fields(v, path):
177 yield mp, mv
178 else:
179 yield path, cast(v)
180
181
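# A hypothetical usage sketch (not part of csv.py), based on the constructor and
# emit() signatures above; `msg` stands for any ROS message instance obtained
# elsewhere, and the keyword name follows the case-insensitive overrides
# described in __init__:
#
#     sink = CsvSink("output.csv", write_options={"overwrite": "true"})
#     if sink.validate():                  # checks overwrite option and writability
#         sink.emit("/my/topic", msg)      # appends one row to the per-topic CSV file
#         sink.close()                     # closes open files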
182class CsvWriter(object):
183 """Wraps csv.writer with bool conversion, iterator support, and lesser memory use."""
184
185 def __init__(self, csvfile, dialect="excel", **fmtparams):
186 """
187 @param csvfile file-like object with `write()` method
188 @param dialect CSV dialect to use, one from `csv.list_dialects()`
189 @param fmtparams override individual format parameters in dialect
190 """
191 self._file = csvfile
192 self._buffer = six.BytesIO() if "b" in csvfile.mode else six.StringIO()
193 self._writer = csv.writer(self._buffer, dialect, **dict(fmtparams, lineterminator=""))
194 self._dialect = csv.writer(self._buffer, dialect, **fmtparams).dialect
195 self._format = lambda v: int(v) if isinstance(v, bool) else v
196 if six.PY2: # In Py2, CSV is written in binary mode
197 self._format = lambda v: int(v) if isinstance(v, bool) else \
198 v.encode("utf-8") if isinstance(v, six.text_type) else v
199
200 @property
201 def dialect(self):
202 """A read-only description of the dialect in use by the writer."""
203 return self._dialect
204
205 def writerow(self, row):
206 """
207 Writes the row to the writer’s file object.
208
209 Fields will be formatted according to the current dialect.
210
211 @param row iterable of field values
212 @return return value of the call to the write method of the underlying file object
213 """
214 def write_columns(cols, inter):
215 """Writes columns to file, returns number of bytes written."""
216 count = len(inter) if inter else 0
217 if inter: self._file.write(inter)
218 # Hack: use csv.writer to format a slice at a time; huge rows cause excessive memory use
219 self._writer.writerow(cols)
220 self._file.write(self._buffer.getvalue())
221 count += self._buffer.tell()
222 self._buffer.seek(0); self._buffer.truncate()
223 return count
224
225 result, chunk, inter, DELIM, STEP = 0, [], "", self.dialect.delimiter, 10000
226 if "b" in self._file.mode: DELIM = six.binary_type(self.dialect.delimiter)
227 for v in row:
228 chunk.append(self._format(v))
229 if len(chunk) >= STEP:
230 result += write_columns(chunk, inter)
231 chunk, inter = [], DELIM
232 if chunk: result += write_columns(chunk, inter)
233 self._file.write(self.dialect.lineterminator)
234 result += len(self.dialect.lineterminator)
235 return result
236
237 def writerows(self, rows):
238 """
239 Writes the rows to the writer’s file object.
240
241 Fields will be formatted according to the current dialect.
242
243 @param rows iterable of iterables of field values
244 """
245 for row in rows: self.writerow(row)
246
247
248
249def init(*_, **__):
250 """Adds CSV output format support."""
251 from ... import plugins # Late import to avoid circular
252 plugins.add_write_format("csv", CsvSink, "CSV", [
253 ("overwrite=true|false", "overwrite existing files in CSV output\n"
254 "instead of appending unique counter (default false)")
255 ])
256 plugins.add_output_label("CSV", ["--emit-field", "--no-emit-field"])
257
258
259__all__ = ["CsvSink", "CsvWriter", "init"]
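
For reference, a minimal standalone sketch of using CsvWriter directly (file name and values here are illustrative, assuming Python 3): booleans are written as integers, and rows may be arbitrary iterators.

    from grepros.plugins.auto.csv import CsvWriter

    with open("demo.csv", "w", newline="", encoding="utf-8") as f:
        writer = CsvWriter(f)
        writer.writerow(["topic", "enabled", "value"])     # header row
        writer.writerow(iter(["/my/topic", True, 1.5]))    # True is written as 1

Rows are formatted through an internal csv.writer in slices of 10000 columns at a time, so very wide rows do not need to be materialized in memory all at once.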