uken / fluent-plugin-elasticsearch


[Elasticsearch Data Stream] Catch errors with 400 status #981

Open didrikseni opened 2 years ago

didrikseni commented 2 years ago

Problem

Hello, my team and I encountered a problem where logs were lost when trying to index them: the ES Bulk API responded with status 400 due to mapping parsing exceptions.

Steps to replicate

I tried to replicate the problem with the following minimal configuration:

<source>
  @type sample
  tag main
  size 1
  auto_increment_key id
  dummy {"message": {"asd": "1"}}
</source>

<match main>
  @type rewrite_tag_filter
  @label @ROUTE
  <rule>
    key message
    pattern /.*/
    tag general
  </rule>
</match>

<label @ROUTE>
  <match **>
      @type elasticsearch_data_stream
      data_stream_name logstash
      host elasticsearch
      port 9200
      include_tag_key true
      tag_key @log_name
      @label @RETRY_ES
      logstash_format true
      log_es_400_reason true
      <buffer>
        @type memory
        flush_mode immediate
      </buffer>
  </match>
</label>

<label @RETRY_ES>
  <match **>
    @type stdout
  </match>
</label>

<label @ERROR>
  <match **>
    @type stdout
  </match>
</label>

Got the following message:

{"time":"2022-07-21 19:54:25 +0000","level":"error","message":"Could not bulk insert to Data Stream: logstash {\"took\"=>1, \"errors\"=>true, \"items\"=>[{\"create\"=>{\"_index\"=>\".ds-logstash-2022.07.20-000001\", \"_type\"=>\"_doc\", \"_id\"=>\"PXZTIoIBYBu9OKH0XV-I\", \"status\"=>400, \"error\"=>{\"type\"=>\"mapper_parsing_exception\", \"reason\"=>\"failed to parse field [message] of type [text] in document with id 'PXZTIoIBYBu9OKH0XV-I'. Preview of field's value: '{asd=1}'\", \"caused_by\"=>{\"type\"=>\"illegal_state_exception\", \"reason\"=>\"Can't get text on a START_OBJECT at 1:12\"}}}}]}","worker_id":0}

Expected Behavior or What you need to ask

We want to capture these errors and process them so they can be retried and indexed into a different index, but we can't capture them with the "@ERROR" or "@RETRY_ES" labels.

We tried sending them using @type elasticsearch instead of elasticsearch_data_stream, and it worked: we could capture the logs that failed to be indexed under the "@ERROR" label. However, using data streams is a requirement for the team.

My question is whether it is possible at all, given that the mapping parser error is a logical error, to capture and process these records; a rough sketch of what we mean follows.
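A minimal sketch of the behavior we are asking for, assuming the write path shown above could inspect the per-item statuses and hand rejected records back to Fluentd's error router (router.emit_error_event is what the built-in @ERROR label hooks into; original_events is a hypothetical lookup, shown only for illustration):

  # Sketch: route per-item bulk failures (status >= 400) to the @ERROR label.
  if response['errors']
    response['items'].each_with_index do |item, i|
      status = item.dig('create', 'status')
      next unless status && status >= 400
      # Hypothetical: map the bulk position back to the original event.
      tag, time, record = original_events[i]
      reason = item.dig('create', 'error', 'reason').to_s
      # emit_error_event delivers the record to the <label @ERROR> section.
      router.emit_error_event(tag, time, record, StandardError.new(reason))
    end
  end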

PS: We know that the data does not match the index mapping, but we can't change how the data is sent to us.

Using Fluentd and ES plugin versions

bgruszka commented 9 months ago

We have exactly the same issue. Can anyone confirm that this is how elasticsearch_data_stream works? Is there a chance of any development in this regard? Unfortunately, it seems that many features are missing for elasticsearch_data_stream, e.g., https://github.com/uken/fluent-plugin-elasticsearch/issues/1027. Can I please ask for any help from the project maintainers (@cosmo0920 @kenhys)? 🙂

cosmo0920 commented 9 months ago

@kenhys Can you take a look at this?

Floppe commented 2 days ago

There seem to be many things that do not work with @type elasticsearch_data_stream. This is one of the features I also miss. Any progress?