uken / fluent-plugin-elasticsearch


Duplicate records sent to elasticsearch #796

Open willemveerman opened 4 years ago

willemveerman commented 4 years ago


Problem

We have found that k8s API server audit log records are being duplicated many times over.

We are using the elasticsearch_genid filter to hash each record so that duplicate records do not appear in ES.

However, we have been suffering severe downtime in our ES cluster because fluentd is repeatedly re-sending requests.

We know that it is sending duplicate requests because, if we comment out use_record_as_seed (as below), the records are not hashed; instead a UUID is inserted in the _id field.

We can then see in Kibana hundreds of k8s audit log records with an identical auditID field.

Is it possible that a filter with a ** expression can cause fluentd to emit duplicate records?

    <filter **>
      @type elasticsearch_genid
      hash_id_key generated_hash
      use_entire_record true
      #use_record_as_seed true
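      # with use_record_as_seed commented out, the filter falls back to inserting
      # a random UUID instead of a hash of the record (as described above)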
      separator _
      hash_type sha256
    </filter>

    <source>
      @id in_tail_apiserver_audit_logs
      @type tail
      multiline_flush_interval 5s
      path /var/lib/docker/kubernetes/logs/apiserver-audit*.log
      pos_file /var/log/fluentd-apiserver-audit.log.pos
      tag kube-apiserver-audit
      read_from_head true
      <parse>
        @type json
        time_format %Y-%m-%dT%H:%M:%S.%L
        time_key stageTimestamp
      </parse>
    </source>

    <filter kube-apiserver-audit>
      @type record_transformer
      enable_ruby true
      <record>
        tags kubernetes_audit_filtered
        _s3_output_type kubernetes_audit
        s3Key ${record.dig("objectRef", "namespace") ? record.dig("objectRef", "namespace") : 'cluster_wide'}
      </record>
    </filter>

    <filter kube-apiserver-audit>
      @type record_transformer
      enable_ruby true
      auto_typecast true
      renew_record true
      <record>
        audit_json ${record}
      </record>
    </filter>

    <match kube-apiserver-audit>
      @type copy
      <store>
        @type elasticsearch
        @log_level debug
        include_tag_key true
        type_name doc
        id_key $.audit_json.generated_hash
        remove_keys $.audit_json.generated_hash
        host {{ .Env.elasticsearch.host | default .Common.elasticsearch.host }}
        port {{ .Env.elasticsearch.port | default .Common.elasticsearch.port }}
        user "#{ENV['FLUENT_ELASTICSEARCH_USER']}"
        password "#{ENV['FLUENT_ELASTICSEARCH_PASSWORD']}"
        scheme {{ .Env.elasticsearch.scheme | default .Common.elasticsearch.scheme }}
        ssl_verify {{ .Env.elasticsearch.ssl_verify | default .Common.elasticsearch.ssl_verify }}
        ssl_version {{ .Env.elasticsearch.ssl_version | default .Common.elasticsearch.ssl_version }}
        ca_file {{ .Env.elasticsearch.ca_file | default .Common.elasticsearch.ca_file }}
        reload_connections false
        reconnect_on_error true
        reload_on_failure true
        logstash_format true
        logstash_prefix "kubernetes-audit-{{ .Env.elasticsearch.index_suffix | default .Common.elasticsearch.index_suffix }}"
        request_timeout 600s
        <buffer>
          flush_interval 60s
          retry_max_times 2
          retry_timeout 40s
          retry_wait 10s
          retry_max_interval 30s
          retry_forever false
          overflow_action drop_oldest_chunk
          chunk_limit_size 30MB
          flush_mode interval
          flush_thread_count 8
          retry_type exponential_backoff
        </buffer>
      </store>
    </match>

Steps to replicate

as above

Expected Behavior or What you need to ask

No duplicate records in elasticsearch

Using Fluentd and ES plugin versions

cosmo0920 commented 4 years ago

> We know that it is sending duplicate requests because, if we comment out use_record_as_seed (as below), the records are not hashed; instead a UUID is inserted in the _id field.

> We can then see in Kibana hundreds of k8s audit log records with an identical auditID field.

> Is it possible that a filter with a ** expression can cause fluentd to emit duplicate records?

It is impossible to avoid this under an at-least-once protocol on the Elasticsearch REST API: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html. Elasticsearch itself does not return its record IDs from bulk API responses.

cosmo0920 commented 4 years ago

This is a limitation of the Fluentd mechanism and of the Elasticsearch REST API. If the bulk API does not handle unique IDs, Elasticsearch shouldn't reject duplicated records.

willemveerman commented 4 years ago

So what you're saying is, if one record in the bulk request is rejected, fluentd will re-send the entire bulk request?

willemveerman commented 4 years ago

> This is a limitation of the Fluentd mechanism and of the Elasticsearch REST API. If the bulk API does not handle unique IDs, Elasticsearch shouldn't reject duplicated records.

I realise that if records are given a different unique ID in the _id field, they will be treated as new records in elasticsearch - that's fine, that's expected behaviour.

What's not expected, though, is for fluentd to repeatedly send the same record.

Why is it doing that?

willemveerman commented 4 years ago

Could it be because we have a filter with the <filter **> pattern?

cosmo0920 commented 4 years ago

> So what you're saying is, if one record in the bulk request is rejected, fluentd will re-send the entire bulk request?

Yes.

willemveerman commented 4 years ago

> So what you're saying is, if one record in the bulk request is rejected, fluentd will re-send the entire bulk request?
>
> Yes.

Hold on, doesn't that mean that fluentd can enter a never-ending loop?

cosmo0920 commented 4 years ago

The Elasticsearch plugin can give up sending records using https://github.com/uken/fluent-plugin-elasticsearch#unrecoverable-error-types. If an Elasticsearch response contains one of those error types, the plugin's error handler treats the failure as unrecoverable via unrecoverable_error_types.
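For reference, a minimal sketch of where this option sits in the output configuration; the list below is the plugin's documented default, so you would only set it explicitly to add or remove error types:

    <match kube-apiserver-audit>
      @type elasticsearch
      # ... connection and buffer settings as above ...
      # chunks that fail with these error types are not retried by the buffer
      unrecoverable_error_types ["out_of_memory_error", "es_rejected_execution_exception"]
    </match>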

cosmo0920 commented 4 years ago

If the ES plugin does not catch the rejected-duplicated-records error as an unrecoverable error, Fluentd re-sends the entire chunk. Yeah, your understanding is correct.
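A minimal sketch, mirroring the buffer values already shown in the configuration above, of how the retry settings bound that loop: a chunk that keeps failing is retried a limited number of times and then given up on, rather than re-sent forever.

    <buffer>
      retry_type exponential_backoff
      retry_max_times 2        # give up on a chunk after two retries
      retry_wait 10s
      retry_max_interval 30s
      retry_forever false      # never loop endlessly on the same chunk
      overflow_action drop_oldest_chunk
    </buffer>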

willemveerman commented 4 years ago

OK, thank you for the help.

However, es_rejected_execution_exception is included in unrecoverable_error_types by default. Correct?

Therefore, there is no configuration change I can make in fluent-plugin-elasticsearch that will fix this problem. Correct?

willemveerman commented 4 years ago

Yes, it's the default: https://github.com/uken/fluent-plugin-elasticsearch/blob/master/lib/fluent/plugin/out_elasticsearch.rb#L153

So surely we want to keep es_rejected_execution_exception in unrecoverable_error_types, so that the plugin does not re-send records on an es_rejected_execution_exception error.

cosmo0920 commented 4 years ago

> However, es_rejected_execution_exception is included in unrecoverable_error_types by default. Correct?

Correct.

> Therefore, there is no configuration change I can make in fluent-plugin-elasticsearch that will fix this problem. Correct?

Correct.

willemveerman commented 4 years ago

What about if we set retry_tag es.retry and then insert:

    <match retry.es.*>
      @type stdout
    </match>

at the top of the config file?

cosmo0920 commented 4 years ago

Yes. Re-routing is one approach to prevent re-registering events and exhausting the ES cluster. OpenShift's logging system uses this approach.
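A minimal sketch of that re-routing, assuming the retry tag is named retry.es (whatever name is chosen, the tag set by retry_tag has to match the pattern of the catch-all <match>):

    <match kube-apiserver-audit>
      @type elasticsearch
      # ... connection and buffer settings as above ...
      # events from failed requests are re-emitted with this tag
      retry_tag retry.es
    </match>

    # catch the re-routed events so they are not fed back into the same ES output
    <match retry.es>
      @type stdout
    </match>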

willemveerman commented 4 years ago

We have inserted this; however, we are still seeing a huge bulk write queue on one ES data node (out of 10 data nodes in the cluster), which we believe is still caused by fluentd re-sending records.

Have you seen such a situation before?

cosmo0920 commented 4 years ago

We do not provide data manipulation services, nor do we run a huge Elasticsearch cluster, so I haven't heard of such a high-ingestion-rate environment. Our main target is fluentd-kubernetes-daemonset data ingestion with the fluentd-debian-elasticsearch image.

willemveerman commented 4 years ago

Yes, we're using that daemonset and image.

We may have spotted some other issues, so we will raise another ticket if we can find a reproducible case.

cosmo0920 commented 4 years ago

Oh! You are such an "advanced" user! 😎