fluent / fluent-plugin-opensearch

OpenSearch Plugin for Fluentd

OpenSearchOutputDataStream does not provide error handling #123

Open robinsillem opened 10 months ago

robinsillem commented 10 months ago

Problem

OpenSearchOutputDataStream does not handle cases where OpenSearch returns per-item errors in its response to bulk inserts. This handling is present in OpenSearchOutput.

This is a problem because the offending log message is lost, with no possibility for fluentd to (for instance) send it to some form of dead-letter queue storage for subsequent offline processing.
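
As a rough sketch of the behaviour we are asking for (not the plugin's actual code; the helper name and its placement in the data-stream write path are assumptions), failed bulk items could be re-emitted via Fluentd's standard router.emit_error_event so that they reach <label @ERROR> instead of being dropped:

  # Minimal sketch only. `route_bulk_errors` is a hypothetical helper;
  # router.emit_error_event(tag, time, record, error) is Fluentd's standard
  # way of routing a record to the <label @ERROR> section.
  #
  # response: parsed body of the _bulk call; records: the events that were
  # sent, in the same order as the bulk items.
  def route_bulk_errors(response, tag, time, records, router, log)
    return unless response['errors']

    response['items'].each_with_index do |item, idx|
      op, result = item.first      # e.g. ["create", {"status" => 400, "error" => {...}}]
      next unless result['error']  # this item was indexed successfully

      error = result['error']
      log.warn "bulk item failed (#{op}): #{error['type']}: #{error['reason']}"

      # Re-emit the failed record as an error event so it can be picked up
      # by <label @ERROR> and sent to a dead-letter queue.
      router.emit_error_event(tag, time, records[idx],
                              RuntimeError.new("#{error['type']}: #{error['reason']}"))
    end
  end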

Steps to replicate

Config
  <match **>
    @type opensearch_data_stream
    @log_level ${log_level}

    host "#{ENV['FLUENT_ELASTICSEARCH_HOST']}"
    port "#{ENV['FLUENT_ELASTICSEARCH_PORT']}"
    user "#{ENV['FLUENT_ELASTICSEARCH_USER']}"
    password "#{ENV['FLUENT_ELASTICSEARCH_PASSWORD']}"
    scheme "#{ENV['FLUENT_ELASTICSEARCH_SCHEME'] || 'http'}"
    ssl_verify "#{ENV['FLUENT_ELASTICSEARCH_SSL_VERIFY'] || 'true'}"

    data_stream_name e.$${tag}.%Y.%W
    include_timestamp true

    default_opensearch_version 2
    fail_on_detecting_os_version_retry_exceed false
    max_retry_get_os_version 2
    verify_os_version_at_startup true

    include_tag_key true
    log_os_400_reason true
    reconnect_on_error true
    reload_after 100
    reload_connections true
    sniffer_class_name Fluent::Plugin::OpenSearchSimpleSniffer
    reload_on_failure true
    request_timeout 60s
    resurrect_after 60s
    suppress_type_name true

    with_transporter_log false # debug flushes

    <buffer tag, time>
        @type file
        timekey 5
        path /var/log/fluent/opensearch
        flush_mode immediate
        retry_type exponential_backoff
        flush_thread_count 12
        retry_forever true
        retry_max_interval 30
        total_limit_size ${fluentd_size}
        overflow_action block
    </buffer>
  </match>

  # This was added to provide DLQ functionality for logs that OS cannot parse (unrecoverable errors), sending them to S3
  # It also catches any events added to the fluentd processing to handle errors (requires emit_error_label_event=true)
  <label @ERROR>
    <match **>
      @type s3
      s3_bucket ${s3_access_point_alias}
      s3_region eu-west-2
      path errors/$${tag}/%Y/%m/%d/
      s3_object_key_format %%{path}%%{time_slice}_%%{index}.%%{file_extension}
      store_as text
      <buffer tag,time>
        @type file
        path /var/log/fluent/s3
        timekey 60 # 1 minute partition
        timekey_wait 10s
        timekey_use_utc true # use utc
      </buffer>
    </match>
  </label>
Message

This is a heavily redacted copy of a log message containing a field which OpenSearch attempts, and fails, to parse as a date, leading to a 400 response.

{
  "request": {
    "headers": {
      "taxyearexplicit": "2018-19"
    }
  }
}
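
For context, the failing item inside OpenSearch's bulk response looks roughly like the following (illustrative only; the exact status, field path and reason will differ):

{
  "create": {
    "status": 400,
    "error": {
      "type": "mapper_parsing_exception",
      "reason": "failed to parse field [request.headers.taxyearexplicit] of type [date]"
    }
  }
}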

Expected Behavior or What you need to ask

I expect this message to trigger an error label event from the plugin, which can then be picked up by the <label @ERROR> config above and handled within fluentd, allowing us to configure further action as shown.

Instead, what we see is a log from fluentd noting that "Could not bulk insert to Data Stream: #{data_stream_name} #{response}", but the original log has been dropped. This means we are losing logs. While in this case the root cause may be considered to lie with OpenSearch, we still need a general fallback for unprocessable messages.

Using Fluentd and OpenSearch plugin versions