30 """Writes messages to CSV files, each topic to a separate file."""
33 FILE_EXTENSIONS = (
".csv", )
36 DEFAULT_ARGS = dict(EMIT_FIELD=(), META=
False, NOEMIT_FIELD=(), WRITE_OPTIONS={}, VERBOSE=
False)
40 @param args arguments as namespace or dictionary, case-insensitive;
41 or a single path as the base name of CSV files to write
42 @param args.emit_field message fields to emit in output if not all
43 @param args.noemit_field message fields to skip in output
44 @param args.write base name of CSV files to write,
45 will add topic name like "name.__my__topic.csv" for "/my/topic",
46 will add counter like "name.__my__topic.2.csv" if exists
47 @param args.write_options {"overwrite": whether to overwrite existing files
49 @param args.meta whether to emit metainfo
50 @param args.verbose whether to emit debug information
51 @param kwargs any and all arguments as keyword overrides, case-insensitive
53 args = {
"WRITE": str(args)}
if isinstance(args, common.PATH_TYPES)
else args
54 args = common.ensure_namespace(args, CsvSink.DEFAULT_ARGS, **kwargs)
64 for key, vals
in [(
"print", args.EMIT_FIELD), (
"noprint", args.NOEMIT_FIELD)]:
65 self.
_patterns[key] = [(tuple(v.split(
".")), common.path_to_regex(v))
for v
in vals]
68 def emit(self, topic, msg, stamp=None, match=None, index=None):
69 """Writes message to output file."""
73 metadata = [api.to_sec(stamp), api.to_datetime(stamp), api.get_message_type(msg)]
74 self.
_make_writer(topic, msg).writerow(itertools.chain(metadata, data))
76 super(CsvSink, self).
emit(topic, msg, stamp, match, index)
79 """Returns whether arguments and overwrite option are valid, and file base is writable."""
81 result = Sink.validate(self)
82 if self.
args.WRITE_OPTIONS.get(
"overwrite")
not in (
None,
True,
False,
"true",
"false"):
83 ConsolePrinter.error(
"Invalid overwrite option for CSV: %r. "
84 "Choose one of {true, false}.",
85 self.
args.WRITE_OPTIONS[
"overwrite"])
87 if not common.verify_io(self.
args.WRITE,
"w"):
91 self.
_overwrite = self.
args.WRITE_OPTIONS.get(
"overwrite")
in (
True,
"true")
95 """Closes output file(s), if any."""
97 names = {k: f.name
for k, f
in self.
_files.items()}
105 sizes = {k:
None for k
in names.values()}
106 for k, n
in names.items():
107 try: sizes[k] = os.path.getsize(n)
108 except Exception
as e: ConsolePrinter.warn(
"Error getting size of %s: %s", n, e)
109 ConsolePrinter.debug(
"Wrote %s in %s to CSV (%s)%s",
112 common.format_bytes(sum(filter(bool, sizes.values()))),
113 ":" if self.
args.VERBOSE
else ".")
114 for topickey, name
in names.items()
if self.
args.VERBOSE
else ():
115 ConsolePrinter.debug(
"- %s (%s, %s)", name,
116 "error getting size" if sizes[topickey]
is None else
117 common.format_bytes(sizes[topickey]),
119 super(CsvSink, self).
close()
121 def _make_writer(self, topic, msg):
123 Returns a csv.writer for writing topic data.
125 File is populated with header if not created during this session.
127 topickey = api.TypeMeta.make(msg, topic).topickey
129 common.makedirs(os.path.dirname(self.
_filebase))
132 if topickey
not in self.
_files or self.
_files[topickey].closed:
133 name = self.
_files[topickey].name
if topickey
in self.
_files else None
136 base, ext = os.path.splitext(self.
_filebase)
137 name =
"%s.%s%s" % (base, topic.lstrip(
"/").replace(
"/",
"__"), ext)
139 if os.path.isfile(name)
and os.path.getsize(name): action =
"Overwriting"
140 open(name,
"wb").
close()
141 else: name = common.unique_path(name)
142 flags = {
"mode":
"ab"}
if six.PY2
else {
"mode":
"a",
"newline":
"",
"encoding":
"utf-8"}
143 f = open(name, **flags)
145 if topickey
not in self.
_files:
146 if self.
args.VERBOSE:
147 ConsolePrinter.debug(
"%s %s.", action, name)
148 header = (topic +
"/" +
".".join(map(str, p))
for p, _
in self.
_iter_fields(msg))
149 metaheader = [
"__time",
"__datetime",
"__type"]
150 w.writerow(itertools.chain(metaheader, header))
155 def _iter_fields(self, msg, top=()):
157 Yields ((nested, path), scalar value) from ROS message.
159 Lists are returned as ((nested, path, index), value), e.g. (("data", 0), 666).
162 fieldmap, identity = api.get_message_fields(msg),
lambda x: x
163 fieldmap = api.filter_fields(fieldmap, top, include=prints, exclude=noprints)
164 for k, t
in fieldmap.items()
if fieldmap != msg
else ():
165 v, path, baset = api.get_message_value(msg, k, t), top + (k, ), api.scalar(t)
166 is_sublist = isinstance(v, (list, tuple))
and baset
not in api.ROS_BUILTIN_TYPES
167 cast = api.to_sec
if baset
in api.ROS_TIME_TYPES
else identity
168 if isinstance(v, (list, tuple))
and not is_sublist:
169 for i, lv
in enumerate(v):
170 yield path + (i, ), cast(lv)
172 for i, lmsg
in enumerate(v):
175 elif api.is_ros_message(v, ignore_time=
True):