uken / fluent-plugin-elasticsearch

Apache License 2.0

error field name like ^#1 #965

Closed frankcrc closed 2 years ago

frankcrc commented 2 years ago


#### Problem

When send_bulk encounters an error such as es_rejected_execution_exception (raised when ES is busy), subsequent records may contain strange fields, like "^#1" => "xxx".

I debugged this and found that it is related to handle_error: the same chunk is iterated with msgpack_each again while the first iteration is still running. I wrote some code to test this, and I think calling msgpack_each on the same chunk in a nested fashion is unsafe.

I also submitted an issue about this to msgpack-ruby: https://github.com/msgpack/msgpack-ruby/issues/277
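
To illustrate why a nested iteration can disturb the outer one, here is a toy model in plain Ruby. It assumes that both iterations share a single read cursor. That is my assumption for illustration only, not a confirmed description of Fluentd or msgpack-ruby internals. `ToyChunk` and `each_record` are hypothetical names.

```ruby
require "stringio"

# Toy stand-in for a buffer chunk: every iteration shares ONE IO cursor.
# ToyChunk and each_record are hypothetical, not Fluentd or msgpack APIs.
class ToyChunk
  def initialize(records)
    @io = StringIO.new(records.map { |r| "#{r}\n" }.join)
  end

  # Rewind the shared IO, then yield one record per line.
  def each_record
    @io.rewind
    while (line = @io.gets)
      yield line.chomp
    end
  end
end

chunk = ToyChunk.new((1..500).map { |i| "record-#{i}" })

outer_count = 0
inner_count = 0
chunk.each_record do |_record|
  outer_count += 1
  # Simulate an error handler that re-reads the same chunk on the first record.
  chunk.each_record { inner_count += 1 } if outer_count == 1
end

puts "outer loop saw #{outer_count} records, nested loop saw #{inner_count}"
```

In this model the nested loop consumes the shared cursor up to the end, so the outer loop stops after its first record instead of seeing all 500. The real symptom (corrupted field names) is different, but the pattern of an outer iteration being disturbed by an inner one is the same.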

#### Steps to replicate

  1. Modify out_elasticsearch.rb as in this commit: https://github.com/frankcrc/fluent-plugin-elasticsearch/commit/5c2a12920bd7603332e0bfde652dc5c2a0321068
  2. Use the following Fluentd configuration:
    
    <source>
    @type forward
    @id out_fwd
    bind 0.0.0.0
    port 24224
    </source>

    <match caih-**>
      @type copy
      <store>
        @type elasticsearch
        @id out_es
        with_transporter_log true
        @log_level info
        log_es_400_reason true
        hosts 127.0.0.1:9200
        user elastic
        password abcd1234
        scheme http
        request_timeout 30s
        ssl_verify false
        bulk_message_request_threshold 200K
        <buffer tag, time>
          @type file
          path /opt/apps/es_data/buffer/forward
          chunk_limit_size 90M
          overflow_action block
          timekey 1h
          timekey_wait 0s
          timekey_use_utc false
          timekey_zone +0800
          flush_thread_count 1
          flush_mode interval
          flush_interval 5s
        </buffer>
        index_name ${tag}-error-handler
        include_timestamp true
        type_name _doc
        id_key _hash # specify same key name which is specified in hash_id_key
        remove_keys _hash
      </store>
    </match>



3. copy [buffer.q5dc863c51ab364f71ce240ccdcbf8699.log](https://github.com/frankcrc/fluent-plugin-elasticsearch/blob/branch_checkout_from_v4.2.2/problem_to_reproduce/buffer.q5dc863c51ab364f71ce240ccdcbf8699.log) and [buffer.q5dc863c51ab364f71ce240ccdcbf8699.log.meta](https://github.com/frankcrc/fluent-plugin-elasticsearch/blob/branch_checkout_from_v4.2.2/problem_to_reproduce/buffer.q5dc863c51ab364f71ce240ccdcbf8699.log.meta) to /opt/apps/es_data/buffer/forward.
4. start fluentd.

In addition, [mock.dat](https://github.com/frankcrc/fluent-plugin-elasticsearch/blob/branch_checkout_from_v4.2.2/problem_to_reproduce/mock.dat) contains the original records that correspond to buffer.q5dc863c51ab364f71ce240ccdcbf8699.log. I guess the problem could also be reproduced with the in_tail plugin.

#### Expected Behavior or What you need to ask

If `msgpack_each` allowed nested calls, msg_count would be 500 when execution reaches line 826, https://github.com/frankcrc/fluent-plugin-elasticsearch/blob/5c2a12920bd7603332e0bfde652dc5c2a0321068/lib/fluent/plugin/out_elasticsearch.rb#L826.

However, msg_count actually ends up less than 500.

I think we could create a temporary array to hold the unpacked messages and then pass it to the `handle_error` method, so that the chunk itself is only iterated once.
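
A minimal sketch of that idea, using the same kind of toy chunk with a shared cursor (an assumption for illustration, not the plugin's real code). `ToyChunk`, `each_record`, and `handle_error_sketch` are hypothetical names. The real `handle_error` takes different arguments.

```ruby
require "stringio"

# Toy chunk with one shared read cursor (hypothetical, not a Fluentd API).
class ToyChunk
  def initialize(records)
    @io = StringIO.new(records.map { |r| "#{r}\n" }.join)
  end

  def each_record
    @io.rewind
    while (line = @io.gets)
      yield line.chomp
    end
  end
end

# Hypothetical stand-in for handle_error: it walks a plain array,
# so it never touches the chunk's shared cursor.
def handle_error_sketch(records)
  seen = 0
  records.each { seen += 1 }
  seen
end

chunk = ToyChunk.new((1..500).map { |i| "record-#{i}" })

# Single pass over the chunk: copy every record into a temporary array.
unpacked = []
chunk.each_record { |record| unpacked << record }

msg_count = 0
seen_by_handler = 0
unpacked.each do |_record|
  msg_count += 1
  # Re-reading the records while the outer loop is running is now safe.
  seen_by_handler = handle_error_sketch(unpacked) if msg_count == 1
end

puts "msg_count=#{msg_count}, handler saw #{seen_by_handler}"
```

Here both the outer loop and the handler see all 500 records. The cost is that the whole chunk is held in memory as Ruby objects, which may matter with a large chunk_limit_size.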

#### Using Fluentd and ES plugin versions

* UOS 20
* Laptop
* Fluentd v1.11.5/td-agent 3.8.1
* ES plugin 4.2.2
* ES version 7.6.1