uken / fluent-plugin-elasticsearch


Data is inserted 3 times in elasticsearch - could not push logs to Elasticsearch cluster #769

Closed · afonsoaugusto closed 4 years ago

afonsoaugusto commented 4 years ago

Problem

Hi, I am hitting a problem with td-agent: when my application sends data, each record is inserted 3 times in Elasticsearch.

In my example I am using the Python library fluent-logger==0.9.6, but the same problem occurs when the application uses the Node library.

2020-06-25 14:19:17 +0000 [warn]: #0 failed to flush the buffer. retry_time=0 next_retry_seconds=2020-06-25 14:19:18 +0000 chunk="5a8e94686bb9317b73930850775ea484" error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"elk.example.com.br\", :port=>9200, :scheme=>\"http\"}): read timeout reached"
  2020-06-25 14:19:17 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluent-plugin-elasticsearch-4.0.9/lib/fluent/plugin/out_elasticsearch.rb:995:in `rescue in send_bulk'
  2020-06-25 14:19:17 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluent-plugin-elasticsearch-4.0.9/lib/fluent/plugin/out_elasticsearch.rb:957:in `send_bulk'
  2020-06-25 14:19:17 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluent-plugin-elasticsearch-4.0.9/lib/fluent/plugin/out_elasticsearch.rb:789:in `block in write'
  2020-06-25 14:19:17 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluent-plugin-elasticsearch-4.0.9/lib/fluent/plugin/out_elasticsearch.rb:788:in `each'
  2020-06-25 14:19:17 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluent-plugin-elasticsearch-4.0.9/lib/fluent/plugin/out_elasticsearch.rb:788:in `write'
  2020-06-25 14:19:17 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.11.1/lib/fluent/plugin/output.rb:1133:in `try_flush'
  2020-06-25 14:19:17 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.11.1/lib/fluent/plugin/output.rb:1439:in `flush_thread_run'
  2020-06-25 14:19:17 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.11.1/lib/fluent/plugin/output.rb:461:in `block (2 levels) in start'
  2020-06-25 14:19:17 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.11.1/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
2020-06-25 14:19:23 +0000 [warn]: #0 failed to flush the buffer. retry_time=1 next_retry_seconds=2020-06-25 14:19:24 +0000 chunk="5a8e946d31eeeeb7c8175dc080d29088" error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"elk.example.com.br\", :port=>9200, :scheme=>\"http\"}): read timeout reached"
  2020-06-25 14:19:23 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluent-plugin-elasticsearch-4.0.9/lib/fluent/plugin/out_elasticsearch.rb:995:in `rescue in send_bulk'
  2020-06-25 14:19:23 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluent-plugin-elasticsearch-4.0.9/lib/fluent/plugin/out_elasticsearch.rb:957:in `send_bulk'
  2020-06-25 14:19:23 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluent-plugin-elasticsearch-4.0.9/lib/fluent/plugin/out_elasticsearch.rb:789:in `block in write'
  2020-06-25 14:19:23 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluent-plugin-elasticsearch-4.0.9/lib/fluent/plugin/out_elasticsearch.rb:788:in `each'
  2020-06-25 14:19:23 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluent-plugin-elasticsearch-4.0.9/lib/fluent/plugin/out_elasticsearch.rb:788:in `write'
  2020-06-25 14:19:23 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.11.1/lib/fluent/plugin/output.rb:1133:in `try_flush'
  2020-06-25 14:19:23 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.11.1/lib/fluent/plugin/output.rb:1439:in `flush_thread_run'
  2020-06-25 14:19:23 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.11.1/lib/fluent/plugin/output.rb:461:in `block (2 levels) in start'
  2020-06-25 14:19:23 +0000 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.11.1/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
2020-06-25 14:19:23 +0000 [warn]: #0 failed to flush the buffer. retry_time=2 next_retry_seconds=2020-06-25 14:19:25 +0000 chunk="5a8e94686bb9317b73930850775ea484" error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"elk.example.com.br\", :port=>9200, :scheme=>\"http\"}): read timeout reached"
  2020-06-25 14:19:23 +0000 [warn]: #0 suppressed same stacktrace
2020-06-25 14:19:30 +0000 [warn]: #0 failed to flush the buffer. retry_time=3 next_retry_seconds=2020-06-25 14:19:34 +0000 chunk="5a8e946d31eeeeb7c8175dc080d29088" error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"elk.example.com.br\", :port=>9200, :scheme=>\"http\"}): read timeout reached"
  2020-06-25 14:19:30 +0000 [warn]: #0 suppressed same stacktrace
2020-06-25 14:19:30 +0000 [error]: #0 failed to flush the buffer, and hit limit for retries. dropping all chunks in the buffer queue. retry_times=3 records=10 error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"elk.example.com.br\", :port=>9200, :scheme=>\"http\"}): read timeout reached"
  2020-06-25 14:19:30 +0000 [error]: #0 suppressed same stacktrace
2020-06-25 14:20:26 +0000 [warn]: #0 failed to flush the buffer. retry_time=0 next_retry_seconds=2020-06-25 14:20:27 +0000 chunk="5a8e94aa0e327ca6bbe6519d2a6ce227" error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"elk.example.com.br\", :port=>9200, :scheme=>\"http\"}): read timeout reached"
  2020-06-25 14:20:26 +0000 [warn]: #0 suppressed same stacktrace
2020-06-25 14:20:32 +0000 [warn]: #0 failed to flush the buffer. retry_time=1 next_retry_seconds=2020-06-25 14:20:33 +0000 chunk="5a8e94afc9318378602e6d582fbda7f4" error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"elk.example.com.br\", :port=>9200, :scheme=>\"http\"}): read timeout reached"
  2020-06-25 14:20:32 +0000 [warn]: #0 suppressed same stacktrace
2020-06-25 14:20:32 +0000 [warn]: #0 failed to flush the buffer. retry_time=2 next_retry_seconds=2020-06-25 14:20:34 +0000 chunk="5a8e94aa0e327ca6bbe6519d2a6ce227" error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"elk.example.com.br\", :port=>9200, :scheme=>\"http\"}): read timeout reached"
  2020-06-25 14:20:32 +0000 [warn]: #0 suppressed same stacktrace
2020-06-25 14:20:37 +0000 [warn]: #0 retry succeeded. chunk_id="5a8e94aa0e327ca6bbe6519d2a6ce227"
2020-06-25 14:22:33 +0000 [warn]: #0 failed to flush the buffer. retry_time=0 next_retry_seconds=2020-06-25 14:22:34 +0000 chunk="5a8e95222320dfe8d3d4af93e4b25be4" error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"elk.example.com.br\", :port=>9200, :scheme=>\"http\"}): read timeout reached"
  2020-06-25 14:22:33 +0000 [warn]: #0 suppressed same stacktrace
2020-06-25 14:22:39 +0000 [warn]: #0 failed to flush the buffer. retry_time=1 next_retry_seconds=2020-06-25 14:22:40 +0000 chunk="5a8e95222320dfe8d3d4af93e4b25be4" error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"elk.example.com.br\", :port=>9200, :scheme=>\"http\"}): read timeout reached"
  2020-06-25 14:22:39 +0000 [warn]: #0 suppressed same stacktrace
2020-06-25 14:22:41 +0000 [warn]: #0 retry succeeded. chunk_id="5a8e95222320dfe8d3d4af93e4b25be4"

(screenshot: insert-elasticsearch)
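
The pattern in the log points at what is happening: the bulk request reaches Elasticsearch and the documents get indexed, but the response does not come back within the plugin's request timeout, so fluentd raises RecoverableRequestFailure and re-sends the same chunk on the next retry. Because the records carry no fixed document _id, every re-sent chunk creates fresh copies. One way to confirm the duplicates is to aggregate on a field that should be unique per event. A minimal sketch, assuming the Elasticsearch endpoint from this report, an index pattern matching logstash_prefix ${tag}, and a default dynamic mapping that gives the from field a .keyword sub-field:

# check_duplicates.py -- hypothetical helper, not from the issue.
# Counts how many copies of each 'from' value landed in the index.
import requests

ES = "http://elk.example.com.br:9200"   # assumption: endpoint from this report
INDEX = "program-example*"              # assumption: indexes created by logstash_prefix ${tag}

query = {
    "size": 0,
    "aggs": {
        "copies": {
            "terms": {"field": "from.keyword", "min_doc_count": 2}
        }
    }
}

resp = requests.get(ES + "/" + INDEX + "/_search", json=query).json()
for bucket in resp["aggregations"]["copies"]["buckets"]:
    print(bucket["key"], "indexed", bucket["doc_count"], "times")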

Steps to replicate

cat /etc/td-agent/td-agent.conf

<filter program-example**>
  @type record_transformer
  remove_keys container_name,container_id
</filter>

<match program-example**>
  @type elasticsearch
  host elk.example.com.br
  port 9200
  default_elasticsearch_version 6

  logstash_format true
  logstash_prefix ${tag}
  logstash_dateformat %Y%m%d
  include_tag_key true
  type_name app_log
  tag_key @log_name
  <buffer>
    @type memory
    flush_thread_count 2
    chunk_limit_size 32MB
    retry_max_interval 10
    retry_max_times 3
    flush_interval 5s
  </buffer>
</match>

<source>
  @type forward
  port 5143
  bind 0.0.0.0
</source>
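
Note that this configuration leaves request_timeout at its default of 5 s and sets no id_key; as the fix below shows, that combination is what lets the retries in the log above turn into duplicate documents.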

Testing

# test.py
from fluent import sender
from fluent import event
import time

# Point the logger at the fluentd forward input defined above.
sender.setup('program-example',
             host='fluentd-transporter-cluster.ec2.res.prv.hmg.maxmilhas.com', port=5143)

ts = time.time()

# Send a single event; each one shows up three times in Elasticsearch.
for x in range(1):
    time.sleep(1)
    print(x)
    event.Event('unfollow', {
        'from': 'userA-' + str(x),
        'to':   'userB-' + str(x),
        'ts':   str(ts)
    })
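
To run the reproduction, install the client and execute the script while watching the td-agent log (it sends a single event to the forward input on port 5143): pip install fluent-logger==0.9.6 && python test.py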

Using Fluentd and ES plugin versions

For installation I used the script: https://toolbelt.treasuredata.com/sh/install-redhat-td-agent3.sh

My environment:

fluentd server:
  CentOS Linux release 7.7.1908 (Core)
  /opt/td-agent/embedded/bin/fluentd --version -> fluentd 1.11.1

Elasticsearch:
  version "6.8.1"
  3 master/ingest nodes
  6 data nodes

In front of my Elasticsearch cluster I am using an AWS Classic Load Balancer -> TCP 9200.

afonsoaugusto commented 4 years ago

I fixed the problem by generating a hash id for each record (elasticsearch_genid + id_key) and increasing the request timeout.

<filter **>
  @type elasticsearch_genid
  hash_id_key _hash    
</filter>

<match logs**>
  @type elasticsearch
  host elk.example.com
  port 9200
  default_elasticsearch_version 6
  id_key _hash # specify same key name which is specified in hash_id_key
  remove_keys _hash # Elasticsearch doesn't like keys that start with _
  request_timeout 20s # defaults to 5s

  logstash_format true
  logstash_prefix ${tag}
  logstash_dateformat %Y.%m.%d
  include_tag_key true
  type_name app_log
  tag_key @log_name
  <buffer>
    @type memory
    flush_thread_count 2
    chunk_limit_size 32MB
    retry_max_interval 10
    retry_max_times 3
    flush_interval 5s
  </buffer>
</match>

<source>
  @type forward
  port 5142
  bind 0.0.0.0
</source>
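
Why this works: with id_key, every record is indexed under a deterministic _id, so when a timed-out chunk is re-sent, Elasticsearch updates the existing documents instead of creating new ones. A minimal sketch of the idea (a hypothetical script, not plugin code; the sha1-of-content id is only a stand-in for the _hash that elasticsearch_genid generates, and the index and type names assume the settings above):

# idempotent_write.py -- hypothetical illustration, not part of the fix.
import hashlib
import json
import requests

ES = "http://elk.example.com:9200"   # assumption: endpoint from the fixed config

record = {"from": "userA-0", "to": "userB-0", "@log_name": "logs.unfollow"}

# Derive a stable id from the record content, standing in for the
# _hash field that elasticsearch_genid adds.
doc_id = hashlib.sha1(json.dumps(record, sort_keys=True).encode()).hexdigest()

# Indexing the same record twice under the same _id is harmless:
# the second request is a version bump, not a second document.
for attempt in range(2):
    r = requests.put(ES + "/logs.unfollow-2020.06.25/app_log/" + doc_id,
                     json=record)
    print(attempt, r.json().get("result"))   # "created", then "updated"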

Reference: https://medium.com/redbox-techblog/tuning-fluentd-retries-avoiding-duplicate-documents-in-elasticsearch-e7cb9630a453