fluent / fluent-plugin-kafka

Kafka input and output plugin for Fluentd

Occasional FFI error #422

Closed davide-bolcioni closed 3 years ago

davide-bolcioni commented 3 years ago

Describe the bug

An occasional FFI error (see log below) appears for about 0.5% of messages. I suspect it is the underlying cause of a persistent stream of retries that I cannot otherwise explain.

To Reproduce

Not easily reproduced: fluentd is being forwarded records (in JSON format from Kubernetes) and the error is sporadic.

Expected behavior

No FFI exception.

Your Environment

- Fluentd version: 1.13.3
- TD Agent version:
- fluent-plugin-kafka version: 0.17.0
- ruby-kafka version: rdkafka 0.8.1
- Operating system: debian stable (docker container under kubernetes)
- Kernel version: unavailable

Your Configuration

Configuration:

        <match kube.**>
          @id kafka
          @type rdkafka2
          @log_level debug
          brokers ...

          topic messages_topic
          <format>
            @type json
          </format>

          # The following is escaped JSON inside a Ruby double-quoted string. Fluentd's
          # config parser rejects blank lines, trailing whitespace, and a closing } on
          # a line of its own.
          rdkafka_options "{
            \"log_level\" : 7,
            \"compression.codec\":  \"gzip\",
            \"security.protocol\":  \"sasl_ssl\",
            \"linger.ms\": 2000,
            \"ssl.ca.location\":    \"/etc/instaclustr/kafka/cluster-ca-certificate.pem\",
            \"sasl.mechanism\":     \"SCRAM-SHA-256\",
            \"sasl.username\":      \"#{ENV['KAFKA_USERNAME']}\",
            \"sasl.password\":      \"#{ENV['KAFKA_PASSWORD']}\"}"

          <buffer []>
            flush_mode             interval
            flush_interval         30s
            flush_thread_count     2
            retry_max_interval     180s
            retry_wait             2s
            retry_timeout          96h
            chunk_limit_size       300K
            delayed_commit_timeout 150s
            overflow_action        throw_exception
          </buffer>
        </match>
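For reference, the rdkafka_options value above works because fluentd interpolates the `#{ENV[...]}` references while reading the quoted string, and the plugin then parses the resulting text as JSON. A minimal sketch of that two-step process (the credential values here are placeholders, not real ones):

```ruby
require 'json'

# Hypothetical stand-ins for the credentials fluentd would read from the
# container environment.
ENV['KAFKA_USERNAME'] ||= 'example-user'
ENV['KAFKA_PASSWORD'] ||= 'example-pass'

# A Ruby double-quoted string interpolates #{ENV[...]} immediately, so by
# the time the plugin sees the value it is plain JSON text.
raw = "{
  \"log_level\" : 7,
  \"compression.codec\": \"gzip\",
  \"sasl.username\":     \"#{ENV['KAFKA_USERNAME']}\",
  \"sasl.password\":     \"#{ENV['KAFKA_PASSWORD']}\"}"

options = JSON.parse(raw)
```

After parsing, `options` is an ordinary Hash of librdkafka settings, with the environment values already substituted in.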

Your Error Log

The following appears in fluentd logs, for about 0.5% of messages:

{"time":"2021-09-09 19:04:47 +0000","level":"warn","worker_id":0,"message":"/usr/lib/ruby/gems/2.7.0/gems/ffi-1.15.4/lib/ffi/variadic.rb:47:in `invoke'\n/usr/lib/ruby/gems/2.7.0/gems/ffi-1.15.4/lib/ffi/variadic.rb:47:in `call'\n(eval):3:in `rd_kafka_producev'\n/usr/lib/ruby/gems/2.7.0/gems/rdkafka-0.8.1/lib/rdkafka/producer.rb:155:in `produce'\n/usr/lib/ruby/gems/2.7.0/gems/fluent-plugin-kafka-0.17.0/lib/fluent/plugin/out_rdkafka2.rb:342:in `block in enqueue_with_retry'\n/usr/lib/ruby/gems/2.7.0/gems/fluent-plugin-kafka-0.17.0/lib/fluent/plugin/out_rdkafka2.rb:340:in `loop'\n/usr/lib/ruby/gems/2.7.0/gems/fluent-plugin-kafka-0.17.0/lib/fluent/plugin/out_rdkafka2.rb:340:in `enqueue_with_retry'\n/usr/lib/ruby/gems/2.7.0/gems/fluent-plugin-kafka-0.17.0/lib/fluent/plugin/out_rdkafka2.rb:319:in `block in write'\n/usr/lib/ruby/gems/2.7.0/gems/fluentd-1.13.3/lib/fluent/event.rb:315:in `each'\n/usr/lib/ruby/gems/2.7.0/gems/fluentd-1.13.3/lib/fluent/event.rb:315:in `block in each'\n/usr/lib/ruby/gems/2.7.0/gems/fluentd-1.13.3/lib/fluent/plugin/buffer/memory_chunk.rb:81:in `open'\n/usr/lib/ruby/gems/2.7.0/gems/fluentd-1.13.3/lib/fluent/plugin/buffer/memory_chunk.rb:81:in `open'\n/usr/lib/ruby/gems/2.7.0/gems/fluentd-1.13.3/lib/fluent/event.rb:314:in `each'\n/usr/lib/ruby/gems/2.7.0/gems/fluent-plugin-kafka-0.17.0/lib/fluent/plugin/out_rdkafka2.rb:297:in `write'\n/usr/lib/ruby/gems/2.7.0/gems/fluentd-1.13.3/lib/fluent/plugin/output.rb:1138:in `try_flush'\n/usr/lib/ruby/gems/2.7.0/gems/fluentd-1.13.3/lib/fluent/plugin/output.rb:1450:in `flush_thread_run'\n/usr/lib/ruby/gems/2.7.0/gems/fluentd-1.13.3/lib/fluent/plugin/output.rb:462:in `block (2 levels) in start'\n/usr/lib/ruby/gems/2.7.0/gems/fluentd-1.13.3/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'"}


Additional context

_No response_
davide-bolcioni commented 3 years ago

The only parameter in

    return producer.produce(topic: topic, payload: record_buf, key: message_key, partition: partition, headers: headers)

that looks like a candidate for conversion to an integer is partition. Maybe a configuration setting.
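If partition is indeed the culprit, a hypothetical guard (not the plugin's actual code) illustrates why a non-integer value only fails at the FFI boundary: Ruby carries the value untouched until rd_kafka_producev needs a C int. A coercion sketch:

```ruby
# Hypothetical helper: coerce the partition argument before it would reach
# the FFI varargs call. Kernel#Integer raises for values that cannot become
# an int, which is roughly what the FFI layer rejects less gracefully.
def coerce_partition(partition)
  return nil if partition.nil?   # nil lets librdkafka pick the partition
  Integer(partition)             # raises ArgumentError for non-numeric values
end
```

With this guard, coerce_partition("7") returns 7, while coerce_partition("kube") raises an ArgumentError with a Ruby backtrace pointing at the bad value, instead of an opaque error inside ffi/variadic.rb.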

davide-bolcioni commented 3 years ago

False alarm; the error went away after setting

    partition_key nil
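A plausible reading of why this setting helped: partition_key names a record field whose value is used as the partition, and records arriving as JSON from Kubernetes tend to carry numbers as strings. A minimal sketch of the suspected difference (the field name and the record-lookup behavior are assumptions, not the plugin's verified code):

```ruby
# JSON records often carry numbers-as-strings.
record = { "partition" => "2" }

partition_key = nil   # the fix reported above

partition =
  if partition_key
    record[partition_key]   # a String here would blow up inside the FFI call
  else
    nil                     # librdkafka assigns the partition itself
  end
```

With partition_key unset, produce receives partition: nil and librdkafka's own partitioner takes over, so no record value can poison the FFI call.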