wandgitlabbot opened 3 years ago
In GitLab, by Daniel Oosterwijk on 2020-05-22
runners.AmpMeasurementsToCsv.scala uses Measurement's toCsvFormat and a StreamingFileSink to output CSV files in quite a manual way, and without column headers.
SpikeRunner uses a ScalaCsvOutputFormat and an unapply method with a map to replace an enum with an int.
A good approach for output would be to use toCsvFormat, which returns a Seq[String] (though it arguably should return an Iterator[String]), along with the ScalaCsvOutputFormat. We would need some additional processing to determine the dimensionality of the Seq while converting it to a tuple, since the tuple's arity becomes ScalaCsvOutputFormat's type argument.
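For a single measurement type of known arity the conversion is simple enough; a minimal sketch, assuming a hypothetical three-field measurement:

```scala
import org.apache.flink.api.scala.operators.ScalaCsvOutputFormat
import org.apache.flink.core.fs.Path

object TupleConversionSketch {
  // Hypothetical measurement with a fixed, known number of CSV fields.
  case class IcmpMeasurement(stream: String, rtt: Int, loss: Double) {
    def toCsvFormat: Seq[String] = Seq(stream, rtt.toString, loss.toString)
  }

  // The tuple arity is baked into ScalaCsvOutputFormat's type argument, so
  // a generic version would need a case per dimensionality, each with a
  // different output type.
  val format = new ScalaCsvOutputFormat[(String, String, String)](
    new Path("/tmp/icmp.csv"))

  def toTuple(m: IcmpMeasurement): (String, String, String) =
    m.toCsvFormat match {
      case Seq(a, b, c) => (a, b, c)
    }
}
```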
Additionally, Tuples can only have up to 22 fields. We have a few measurement types with more than that many fields. It would be best to create a custom SinkFunction or ideally a FileOutputFormat. It may be impossible to extend the ScalaCsvOutputFormat due to this limitation, so we might have to do it manually like in AmpMeasurementsToCsv. Doing it manually would allow us to include column headers if desired. It could also be interesting to include a measurement type header field so that we can automatically ingest any type of measurement-stream file without knowing what it is in advance.
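As a rough sketch of the manual direction (the class name, header handling, and Seq[String] record type are assumptions, and proper CSV quoting/escaping is elided):

```scala
import java.io.PrintWriter
import org.apache.flink.api.common.io.FileOutputFormat
import org.apache.flink.core.fs.Path

// Hypothetical generic CSV sink: writing Seq[String] rows directly avoids
// the 22-field tuple limit, and open() can emit a header row (which could
// also carry a measurement-type field to make files self-describing).
class MeasurementCsvOutputFormat(
    outputPath: Path,
    header: Seq[String]) extends FileOutputFormat[Seq[String]](outputPath) {

  @transient private var writer: PrintWriter = _

  override def open(taskNumber: Int, numTasks: Int): Unit = {
    super.open(taskNumber, numTasks) // opens the protected `stream` field
    writer = new PrintWriter(stream)
    writer.println(header.mkString(","))
  }

  override def writeRecord(record: Seq[String]): Unit =
    writer.println(record.mkString(","))

  override def close(): Unit = {
    if (writer != null) writer.flush()
    super.close()
  }
}
```

Writing Seq[String] rather than a tuple sidesteps the arity problem entirely.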
LatencyTSFileInputFormat is quite short, since it's easy to parse a CSV for a single measurement type. Column headers and knowledge of the expected measurement type would make it fairly simple to parse generic CSVs of supported measurement types. This should be achievable using a FileInputFormat. Notably, LatencyTSAmpFileInputFormat extends GenericCsvInputFormat. This has no support for parsing column headers, instead allowing users to optionally skip them. The flexibility of this class to interpret column headers should be investigated.
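A sketch of the header-aware ingest direction, assuming a hypothetical format that exposes each row as a header-keyed map; real code would also need to restrict header consumption to the first split of each file:

```scala
import org.apache.flink.api.common.io.DelimitedInputFormat

// Hypothetical header-aware CSV reader: the first line seen is treated as
// the column-header row, and later lines are keyed by those headers.
class HeaderCsvInputFormat extends DelimitedInputFormat[Map[String, String]] {
  private var headers: Option[Array[String]] = None

  override def readRecord(
      reuse: Map[String, String],
      bytes: Array[Byte],
      offset: Int,
      numBytes: Int): Map[String, String] = {
    val fields = new String(bytes, offset, numBytes, "UTF-8").split(",", -1)
    headers match {
      case None =>
        headers = Some(fields)
        null // returning null skips the header line rather than emitting it
      case Some(h) =>
        h.zip(fields).toMap
    }
  }
}
```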
In GitLab, by Daniel Oosterwijk on 2020-08-21
I've tried a couple of implementations of a CSV file sink. One is based on the recommended StreamingFileSink, but that retains the problems arising from the pipeline finishing early when using bounded sources. It also depends on multiple checkpoints completing after the data is fully processed before the files are properly committed to disk, and it offers little flexibility in the filenames used.
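For reference, that attempt was roughly this shape (the path and encoder are placeholders):

```scala
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink

object StreamingSinkSketch {
  // Part files only reach their final names once checkpoints complete, and
  // the sink generates those names itself rather than letting us choose.
  val sink: StreamingFileSink[String] = StreamingFileSink
    .forRowFormat(new Path("/tmp/measurements"),
      new SimpleStringEncoder[String]("UTF-8"))
    .build()
}
```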
Another is based directly on FileOutputFormat, and appears to function well. Unfortunately, the Java interfaces for using OutputFormats (writeUsingOutputFormat and OutputFormatSinkFunction) are deprecated. Curiously, the Scala interface for writeUsingOutputFormat is not deprecated. The replacement is the StreamingFileSink. It appears that discussions are ongoing as to how to properly implement batch processing, especially around sinks.
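The non-deprecated Scala call looks something like the following, here using Flink's built-in TextOutputFormat as a stand-in for the custom header-writing format sketched earlier:

```scala
import org.apache.flink.api.java.io.TextOutputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.scala._

object OutputFormatSinkSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Stand-in for a measurement stream already rendered to CSV lines.
    val rows: DataStream[String] =
      env.fromElements("stream1,12,0.0", "stream1,15,0.0")

    // The Scala writeUsingOutputFormat is the one that isn't deprecated;
    // any FileOutputFormat works here, including a custom header-writing one.
    rows.writeUsingOutputFormat(
      new TextOutputFormat[String](new Path("/tmp/measurements.csv")))

    env.execute("output-format-sink-sketch")
  }
}
```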
The approach in AmpMeasurementsToCsv still works: a source that can reasonably determine when it has finished reading the file, paired with a dummy source that waits for the file to finish being read, plus a couple of extra checkpoints to allow the StreamingFileSink to finalise. However, I'm not comfortable implementing this solution in the YamlDagRunner, as it involves a fair bit of uncertainty and messy behaviour, such as the dummy source needing to throw an exception to end the program.
This issue will go on hold again while we cross our fingers and wait for improvements.
I never noticed that Jackson has a CSV backend. That could save us from needing to implement to/from-CSV conversions for every supported type.
https://github.com/FasterXML/jackson-dataformats-text/tree/master/csv
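A minimal sketch of round-tripping a measurement row, assuming jackson-dataformat-csv and jackson-module-scala are on the classpath (IcmpRow is a hypothetical stand-in):

```scala
import com.fasterxml.jackson.dataformat.csv.{CsvMapper, CsvSchema}
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Hypothetical measurement row; jackson-module-scala lets Jackson
// introspect the case class fields.
case class IcmpRow(stream: String, rtt: Int, loss: Double)

object JacksonCsvSketch {
  def main(args: Array[String]): Unit = {
    val mapper = new CsvMapper()
    mapper.registerModule(DefaultScalaModule)

    // The schema carries the column order and emits a header row, so we
    // don't hand-roll toCsvFormat/parse logic per measurement type.
    val schema: CsvSchema = mapper.schemaFor(classOf[IcmpRow]).withHeader()

    val csv = mapper.writer(schema).writeValueAsString(IcmpRow("s1", 12, 0.0))
    print(csv) // header line followed by the data row

    val readBack: IcmpRow = mapper
      .readerFor(classOf[IcmpRow])
      .`with`(schema)
      .readValue(csv)
    println(readBack)
  }
}
```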
In GitLab, by Daniel Oosterwijk on 2020-04-07
AMP measurements are often expressed in CSV format. Some work has been done to output our measurement representations in that format, but the ingest side is currently a little weak. By using CSV column headers, we should be able to make this process fairly transparent. If they're not included, we should just let the user specify a column order.
vodafone-amp-http-csv is a branch with some really basic CSV ingest.

The branch has since been deleted, as it is very stale and not as useful as the other research in this thread.