fluent / fluent-plugin-kafka

Kafka input and output plugin for Fluentd

Seeing "MessageSizeTooLarge" error when forwarding logs/events to Kafka streams. #433

Closed · raonelakurti closed this 2 years ago

raonelakurti commented 3 years ago

Describe the bug

I collect logs from different compute nodes (via fluent-bit/td-agent-bit) and forward them to a centralized td-agent (Fluentd), which then sends the logs/events on to Kafka.

Smaller messages are sent continuously without issue, but messages larger than about 10 MB fail to process: they pile up in td-agent's buffer path and are never delivered to Kafka.

Error:

2021-10-18 20:38:12 +0000 [debug]: #0 7015 messages send.
2021-10-18 20:38:13 +0000 [warn]: #0 Send exception occurred: Kafka::MessageSizeTooLarge
2021-10-18 20:38:13 +0000 [warn]: #0 Exception Backtrace : /opt/td-agent/lib/ruby/gems/2.7.0/gems/ruby-kafka-1.3.0/lib/kafka/protocol.rb:160:in `handle_error'
/opt/td-agent/lib/ruby/gems/2.7.0/gems/ruby-kafka-1.3.0/lib/kafka/produce_operation.rb:153:in `block in handle_response'
/opt/td-agent/lib/ruby/gems/2.7.0/gems/ruby-kafka-1.3.0/lib/kafka/protocol/produce_response.rb:36:in `block (2 levels) in each_partition'
/opt/td-agent/lib/ruby/gems/2.7.0/gems/ruby-kafka-1.3.0/lib/kafka/protocol/produce_response.rb:35:in `each'
/opt/td-agent/lib/ruby/gems/2.7.0/gems/ruby-kafka-1.3.0/lib/kafka/protocol/produce_response.rb:35:in `block in each_partition'
/opt/td-agent/lib/ruby/gems/2.7.0/gems/ruby-kafka-1.3.0/lib/kafka/protocol/produce_response.rb:34:in `each'
/opt/td-agent/lib/ruby/gems/2.7.0/gems/ruby-kafka-1.3.0/lib/kafka/protocol/produce_response.rb:34:in `each_partition'
/opt/td-agent/lib/ruby/gems/2.7.0/gems/ruby-kafka-1.3.0/lib/kafka/produce_operation.rb:144:in `handle_response'
/opt/td-agent/lib/ruby/gems/2.7.0/gems/ruby-kafka-1.3.0/lib/kafka/produce_operation.rb:133:in `block in send_buffered_messages'
/opt/td-agent/lib/ruby/gems/2.7.0/gems/ruby-kafka-1.3.0/lib/kafka/produce_operation.rb:105:in `each'
/opt/td-agent/lib/ruby/gems/2.7.0/gems/ruby-kafka-1.3.0/lib/kafka/produce_operation.rb:105:in `send_buffered_messages'
/opt/td-agent/lib/ruby/gems/2.7.0/gems/ruby-kafka-1.3.0/lib/kafka/produce_operation.rb:62:in `block in execute'
/opt/td-agent/lib/ruby/gems/2.7.0/gems/ruby-kafka-1.3.0/lib/kafka/instrumenter.rb:23:in `instrument'
/opt/td-agent/lib/ruby/gems/2.7.0/gems/ruby-kafka-1.3.0/lib/kafka/produce_operation.rb:53:in `execute'
/opt/td-agent/lib/ruby/gems/2.7.0/gems/fluent-plugin-kafka-0.16.3/lib/fluent/plugin/kafka_producer_ext.rb:212:in `block in deliver_messages_with_retries'
/opt/td-agent/lib/ruby/gems/2.7.0/gems/fluent-plugin-kafka-0.16.3/lib/fluent/plugin/kafka_producer_ext.rb:202:in `loop'
/opt/td-agent/lib/ruby/gems/2.7.0/gems/fluent-plugin-kafka-0.16.3/lib/fluent/plugin/kafka_producer_ext.rb:202:in `deliver_messages_with_retries'
/opt/td-agent/lib/ruby/gems/2.7.0/gems/fluent-plugin-kafka-0.16.3/lib/fluent/plugin/kafka_producer_ext.rb:128:in `deliver_messages'
/opt/td-agent/lib/ruby/gems/2.7.0/gems/fluent-plugin-kafka-0.16.3/lib/fluent/plugin/out_kafka2.rb:279:in `write'
/opt/td-agent/lib/ruby/gems/2.7.0/gems/fluentd-1.13.3/lib/fluent/plugin/output.rb:1138:in `try_flush'
/opt/td-agent/lib/ruby/gems/2.7.0/gems/fluentd-1.13.3/lib/fluent/plugin/output.rb:1450:in `flush_thread_run'
/opt/td-agent/lib/ruby/gems/2.7.0/gems/fluentd-1.13.3/lib/fluent/plugin/output.rb:462:in `block (2 levels) in start'
/opt/td-agent/lib/ruby/gems/2.7.0/gems/fluentd-1.13.3/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
2021-10-18 20:38:13 +0000 [info]: #0 initialized kafka producer: fluentd
2021-10-18 20:38:13 +0000 [debug]: #0 taking back chunk for errors. chunk="5cea40997d426f4cf16dcb054c9881ae"
2021-10-18 20:38:13 +0000 [warn]: #0 failed to flush the buffer. retry_time=7 next_retry_seconds=2021-10-18 20:38:40 113632620852088348799/274877906944000000000 +0000 chunk="5cea40997d426f4cf16dcb054c9881ae" error_class=Kafka::MessageSizeTooLarge error="Kafka::MessageSizeTooLarge"
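For context, Kafka::MessageSizeTooLarge is ruby-kafka's mapping of the broker's MESSAGE_TOO_LARGE response: the broker rejects any produce request whose message set exceeds the topic's max.message.bytes, which defaults to the broker-wide message.max.bytes (roughly 1 MB). If 10 MB records genuinely need to reach Kafka, the limit also has to be raised on the topic or broker side. A minimal sketch, assuming the broker address and topic name from the configuration below:

# Hypothetical: raise the topic's per-message-set limit to ~10 MB.
# The topic-level setting is max.message.bytes; the broker-wide default
# it overrides is message.max.bytes. Consumers may then also need
# fetch.max.bytes / max.partition.fetch.bytes raised to match.
kafka-configs.sh --bootstrap-server xyz.com:9092 --alter \
  --entity-type topics --entity-name test_topic \
  --add-config max.message.bytes=10485760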

To Reproduce

td-agent.conf:

<match **>
  @type kafka2
  # list of seed brokers
  brokers xyz.com:9092
  use_event_time true
  username "xyz"
  password "xyz"
  ssl_ca_cert /etc/td-agent/certs/ca-cert.pem

  # topic settings
  topic test_topic
  topic_key test_topic

  # producer settings
  required_acks -1
  compression_codec gzip
  @log_level debug
  max_send_limit_bytes 1000000

  # buffer settings
  <buffer>
    @type file
    path /var/log/td-agent/buffer/osquery
    flush_interval 60s
    timekey 14400
    chunk_limit_size 64MB
    total_limit_size 1024MB
    flush_mode interval
    flush_thread_count 4
    retry_type exponential_backoff
    retry_max_interval 30
    retry_forever
  </buffer>

  # data type settings
  <format>
    @type json
  </format>
</match>
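Note that max_send_limit_bytes 1000000 tells the plugin to drop records larger than ~1 MB rather than send them, so a single 10 MB record should never reach the broker with this setting. One plausible reason the error still appears is that this limit is checked per record, while the broker enforces max.message.bytes against the whole compressed message set for a partition, so a batch of records that are each under the limit can still exceed it. A sketch of the corresponding adjustments, with hypothetical values that assume the topic limit has been raised to 10 MB as shown earlier:

# Inside the <match **> block above:
max_send_limit_bytes 10485760   # allow records up to the new topic limit

<buffer>
  # Hypothetical: smaller chunks keep each produce batch
  # further below the broker's per-partition limit.
  chunk_limit_size 8MB
</buffer>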

Expected behavior

I expect all of my logs to be processed without failure. Please let me know if I'm missing something.

Your Environment

- Fluentd version: 
- TD Agent version: td-agent v4
- Operating system: CentOS 7
- Kernel version:

Your Configuration

(Identical to the td-agent.conf shown under "To Reproduce" above.)

Your Error Log

2021-11-03 16:26:49 +0000 [warn]: #0 Send exception occurred: Kafka::MessageSizeTooLarge
2021-11-03 16:26:49 +0000 [warn]: #0 Exception Backtrace : /opt/td-agent/lib/ruby/gems/2.7.0/gems/ruby-kafka-1.3.0/lib/kafka/protocol.rb:160:in `handle_error'
(remaining backtrace identical to the one in the bug description above)
2021-11-03 16:26:49 +0000 [info]: #0 initialized kafka producer: fluentd
2021-11-03 16:26:49 +0000 [warn]: #0 failed to flush the buffer. retry_time=2 next_retry_seconds=2021-11-03 16:26:51 1339415856744267578541/4398046511104000000000 +0000 chunk="5cfe4c63430650ff67cade9c0d9bce76" error_class=Kafka::MessageSizeTooLarge error="Kafka::MessageSizeTooLarge"
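A further note on the retry behavior: with retry_forever set, a chunk that the broker keeps rejecting with MessageSizeTooLarge is retried indefinitely and can hold up the chunks queued behind it. A minimal sketch of a bounded retry policy with a fallback output, assuming retry_max_times and a <secondary> section are acceptable here (both are standard Fluentd buffered-output features, not settings taken from this issue):

<buffer>
  @type file
  path /var/log/td-agent/buffer/osquery
  retry_type exponential_backoff
  retry_max_interval 30
  retry_max_times 17    # give up after a bounded number of attempts instead of retry_forever
</buffer>
<secondary>
  @type secondary_file  # dump chunks that never flushed to local files for inspection
  directory /var/log/td-agent/failed
</secondary>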

Additional context

No response

github-actions[bot] commented 2 years ago

This issue has been automatically marked as stale because it has been open 90 days with no activity. Remove the stale label or comment, or this issue will be closed in 30 days.

github-actions[bot] commented 2 years ago

This issue was automatically closed because it remained stale for 30 days.