logstash-plugins / logstash-codec-csv

This codec takes care of decoding and encoding CSV data.
Apache License 2.0

[WIP] support line delimited data #8

Closed · colinsurprenant closed this 4 years ago

colinsurprenant commented 4 years ago

TODO

colinsurprenant commented 4 years ago

@yaauie good catch about the line break that can be part of a quoted CSV value. This is a tricky one; this behaviour is not standardized, and it seems like many implementations do not support it. The Ruby CSV library does support it, though.

The problem we face here, as you pointed out (and this relates to the long-standing streaming vs. line-oriented data question), is that with the BufferedTokenizer a column containing a line break will be split into 2 lines, so line breaks in columns will not work with the BufferedTokenizer. On the other hand, if we don't use the BufferedTokenizer, this codec will not work with a streaming input like the tcp input. And as you also pointed out, when used with the file input, line breaks will already have been processed (note that with the file input, line breaks in columns will not work either, regardless of the csv codec implementation).
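To make the failure mode concrete, here is a minimal Ruby illustration, assuming a "\n"-delimited tokenizer (which is what the BufferedTokenizer defaults to):

```ruby
require "csv"

record = %Q{1,"hello\nworld",3\n}

# Ruby's CSV handles the quoted line break when given the whole record:
CSV.parse_line(record)
# => ["1", "hello\nworld", "3"]

# But splitting on "\n" first, as a line tokenizer does, produces two
# fragments that are each invalid CSV:
record.split("\n")
# => ["1,\"hello", "world\",3"]

CSV.parse_line('1,"hello')
# raises CSV::MalformedCSVError (unclosed quoted field)
```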

This is in fact very similar to the problem described in https://github.com/logstash-plugins/logstash-codec-multiline/pull/63 where I suggested introducing a streaming_input config option to deal with this.

colinsurprenant commented 4 years ago

@yaauie let me recap the problems & alternatives we have:

Use cases

  1. Line-oriented inputs (e.g. the file input), where the data is already split on line delimiters before it reaches the codec.
  2. Streaming inputs (e.g. the tcp or stdin inputs), where the codec receives arbitrary chunks and has to handle the delimiters itself.

Solutions

  1. We introduce a new streaming_input option (as also suggested in logstash-plugins/logstash-codec-multiline#63) which controls whether or not the BufferedTokenizer is used, to support the 2 use-cases above (see the config sketch below this list).
  2. We split this codec in 2, csv and csv_lines for example, to support the 2 use-cases (similar to json/json_lines).
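For illustration, option 1 could look like this in a pipeline configuration; streaming_input is only the proposed name from this thread, not an existing option:

```
input {
  file {
    path => "/tmp/data.csv"
    # line-oriented input: the input already consumed the delimiters
    codec => csv { streaming_input => false }
  }
}

input {
  tcp {
    port => 5000
    # streaming input: the codec tokenizes on the delimiter itself
    codec => csv { streaming_input => true }
  }
}
```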

Any other solution suggestions?

If we decide to move forward with something like (1), there is follow-up work we could also plan.

WDYT? Depending on what we decide here, I'll move the discussion into a new Logstash issue to follow up.

yaauie commented 4 years ago

It is unfortunate that BufferedTokenizer consumes the delimiter, and that our codecs cannot rely on the completeness of the hunk of data arriving. In my mind, BufferedTokenizer should behave more like String#each_line than String.split, allowing the codecs to determine whether they need to chomp a trailing record separator, but that is not where we are.
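The distinction in one line of Ruby, since each_line retains the separator while split consumes it:

```ruby
"a,b\nc,d\n".each_line.to_a  # => ["a,b\n", "c,d\n"]  separator retained
"a,b\nc,d\n".split("\n")     # => ["a,b", "c,d"]      separator consumed
```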

The streaming_input solution certainly provides a way for users to configure a codec to re-introduce the delimiter to each chunk they receive, but it comes with its own edge-cases (e.g., closing flush on EOF introducing a newline that was never there). In general, I find that boolean flags end up overloading terms, and would recommend something like input_orientation => stream and input_orientation => line.

The other option is to push this new config to the "offending" inputs, providing them a way to declaratively include the delimiters.


Since the current behaviour of this codec is to handle each line as an event, emitting a _csvparsefailure-tagged event with the entire line when we encounter a failure, we will need to define what its behaviour should be when a multi-record hunk is given to it, especially if we have already emitted several events before encountering an error.

Here, I think CSV#readline may be helpful too, especially combined with StringIO, since we can effectively "checkpoint" at each successful parse and emit the remainder as the _csvparsefailure-tagged event.

Something like:

diff --git a/lib/logstash/codecs/csv.rb b/lib/logstash/codecs/csv.rb
index 07d6416..186c8d5 100644
--- a/lib/logstash/codecs/csv.rb
+++ b/lib/logstash/codecs/csv.rb
@@ -21,6 +21,8 @@ class LogStash::Codecs::CSV < LogStash::Codecs::Base
   # Optional.
   config :separator, :validate => :string, :default => ","

+  config :delimiter, :validate => :string, :default => "\n"
+
   # Define the character used to quote CSV fields. If this is not specified
   # the default is a double quote `"`.
   # Optional.
@@ -109,14 +111,20 @@ class LogStash::Codecs::CSV < LogStash::Codecs::Base
   end

   def decode(data)
-    data = @converter.convert(data)
-    begin
-      values = CSV.parse_line(data, :col_sep => @separator, :quote_char => @quote_char)
+    data_io = StringIO.new(@converter.convert(data))
+    data_io.close_write
+    ack_position = 0
+    csv = CSV.new(data_io, :col_sep => @separator, :row_sep => @delimiter, :quote_char => @quote_char)
+
+    loop do
+      values = csv.readline
+      ack_position = data_io.pos
+      break if values.nil?

       if (@autodetect_column_names && @columns.empty?)
         @columns = values
         @logger.debug? && @logger.debug("Auto detected the following columns", :columns => @columns.inspect)
-        return
+        next
       end

       decoded = {}
@@ -130,10 +138,11 @@ class LogStash::Codecs::CSV < LogStash::Codecs::Base
       end

       yield LogStash::Event.new(decoded)
-    rescue CSV::MalformedCSVError => e
-      @logger.error("CSV parse failure. Falling back to plain-text", :error => e, :data => data)
-      yield LogStash::Event.new("message" => data, "tags" => ["_csvparsefailure"])
     end
+  rescue CSV::MalformedCSVError => e
+    data_io.seek(ack_position)
+    @logger.error("CSV parse failure. Falling back to plain-text", :error => e, :data => data)
+    yield LogStash::Event.new("message" => data_io.read, "tags" => ["_csvparsefailure"])
   end

   def encode(event)
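For illustration, the same behaviour (emit each record that parses, fall back to a tagged event for the unparseable remainder) can be sketched outside the plugin. Note this standalone sketch accumulates physical lines and retries the parse instead of checkpointing StringIO positions, since CSV implementations that read ahead may leave io.pos past the last parsed row; every name in it is made up for the example:

```ruby
require "csv"

# Split a hunk into records while respecting quoted line breaks: retry the
# parse with the next physical line appended whenever CSV reports an error
# (typically an unclosed quote spanning lines).
def each_record(data)
  pending = ""
  data.each_line do |line|
    pending << line
    begin
      row = CSV.parse_line(pending)
      yield [:row, row] unless row.nil?
      pending = ""
    rescue CSV::MalformedCSVError
      # keep accumulating; the record may continue on the next line
    end
  end
  yield [:failure, pending] unless pending.empty?
end

each_record(%Q{a,b\n"x\ny",z\n"broken\n}) { |kind, payload| p [kind, payload] }
# [:row, ["a", "b"]]
# [:row, ["x\ny", "z"]]
# [:failure, "\"broken\n"]
```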
colinsurprenant commented 4 years ago

@yaauie +1 input_orientation.

I don't think we actually need to re-add line breaks. You probably saw it in logstash-plugins/logstash-codec-multiline#63, but I think a better way would be to simply use the BufferedTokenizer or not, depending on input_orientation.

Do we agree on this strategy? If we do, I'll go ahead and refactor for this, and then we can iterate on the implementation details in review.

yaauie commented 4 years ago

The subject of this PR is "support line delimited data"; from the specs you added, I take this to mean "when CSV#decode is passed a body representing multiple records, emit each record as an event".

This goal can be achieved without adding a BufferedTokenizer, by using CSV#parse or by repeatedly calling CSV#readline; we can use CSV's row_sep directive and let it do the contextual tokenizing that respects quoted values in a way BufferedTokenizer cannot.
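Concretely, CSV's row_sep-aware tokenizing keeps a quoted line break inside its field while still splitting records on the delimiter:

```ruby
require "csv"

CSV.parse(%Q{a,b\n"x\ny",z\n}, col_sep: ",", row_sep: "\n")
# => [["a", "b"], ["x\ny", "z"]]
```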

From what I can see, the only scenario in which adding a BufferedTokenizer would be useful is the opposite: when the codec is independently given multiple fragments of a single row spread across multiple CSV#decode calls, and is expected to retain and reassemble those fragments into a single row. And because many of our inputs use BufferedTokenizers of their own and swallow line-delimiters, this introduces more edge-cases.

colinsurprenant commented 4 years ago

@yaauie obviously the understanding of the problem has evolved and the description and specs have not yet followed; that's why I tried to recap my understanding of the problem and submit possible solutions.

As I tried to explain, with an input_orientation option, which should certainly default to line, the BufferedTokenizer is not required and will not be used. The problem is that a streaming input such as stdin or tcp will not work correctly unless we use the BufferedTokenizer, by configuring input_orientation => stream. The other option would be to add an extra csv_lines codec, similar to the json/json_lines and plain/line codecs. My opinion is that we should eventually get rid of these duplicated codecs that serve the line-oriented and streaming input cases, and instead move toward an input_orientation option, which could also eventually be hinted by the input plugin, which knows what type of input it is providing (and also get rid of the weird fix_streaming_codecs method).
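A sketch of that dispatch, assuming the proposed input_orientation option; FileWatch::BufferedTokenizer is the tokenizer Logstash codecs such as json_lines already use, while input_orientation and parse_records are hypothetical names for this sketch:

```ruby
require "logstash/util/buftok"

# Hypothetical wiring inside the codec's register:
def register
  # Only streaming inputs need the codec to reassemble delimited records.
  @buffer = FileWatch::BufferedTokenizer.new(@delimiter) if @input_orientation == "stream"
end

def decode(data, &block)
  if @buffer
    # Streaming input (tcp, stdin): extract complete records first.
    # Caveat: the tokenizer splits blindly on the delimiter, so quoted
    # line breaks still won't survive this path.
    @buffer.extract(data).each { |record| parse_records(record, &block) }
  else
    # Line-oriented input (file): data already arrives one record at a time.
    parse_records(data, &block)
  end
end
```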

This IMO would provide a simpler and cleaner path forward with what we have today, until we come up with a new processing framework (milling or otherwise) at some point in the future.

colinsurprenant commented 4 years ago

Let me know if there are any objections with this plan.

colinsurprenant commented 4 years ago

Opened https://github.com/elastic/logstash/issues/11885 for the broader discussion

colinsurprenant commented 4 years ago

This is on hold until we conclude https://github.com/elastic/logstash/issues/11885