Open kuckjwi0928 opened 3 months ago
Describe the bug
When a BufferChunkOverflowError occurs, in_kafka_group handles it as an unexpected error and keeps restarting the consumer until the BufferChunkOverflowError is resolved.
I think BufferChunkOverflowError should be treated as a BufferError and retried without reconnecting the consumer. Is this a bug?
To Reproduce
Trigger a BufferChunkOverflowError (the configuration in this report sets chunk_limit_size 1 to force one), then check the fluent-plugin-kafka log.
Expected behavior
BufferChunkOverflowError should be treated as a BufferError and retried without restarting the consumer.
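The expected behavior could look roughly like the sketch below. It is not the plugin's actual code: the `Sketch*` error classes and the `emit_with_retry` helper are hypothetical stand-ins, so the snippet runs without Fluentd or ruby-kafka installed. It only illustrates rescuing both buffer errors around the emit call and retrying in place, so the error never reaches the consumer loop. The retry is bounded here on the assumption that a record larger than chunk_limit_size would keep failing until the configuration changes.

```ruby
# Stand-in error classes (hypothetical; they only mimic the shape of
# Fluentd's buffer errors so the sketch runs on its own).
class SketchBufferError < StandardError; end
class SketchBufferOverflowError < SketchBufferError; end
class SketchBufferChunkOverflowError < SketchBufferError; end

# Hypothetical helper: retry the emit in place for either buffer error,
# so the caller does not have to stop and restart the consumer.
def emit_with_retry(retry_limit:, wait: 0)
  retries = 0
  begin
    yield
  rescue SketchBufferOverflowError, SketchBufferChunkOverflowError => e
    # Give up once the limit is reached; this raises a RuntimeError.
    raise "exceeded retry limit (#{retry_limit}): #{e.class}" if retries >= retry_limit

    retries += 1
    sleep wait
    retry
  end
end

# Usage: the block fails twice with a chunk overflow, then succeeds.
attempts = 0
result = emit_with_retry(retry_limit: 3) do
  attempts += 1
  raise SketchBufferChunkOverflowError, "record too large" if attempts < 3

  :emitted
end
puts "result=#{result} attempts=#{attempts}"
```

Listing both classes in the `rescue` clause is one option; rescuing the shared parent class would be another, if the real classes share one.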
Your Environment
- Fluentd version: 1.16.2
- TD Agent version:
- fluent-plugin-kafka version: v0.19.2
- ruby-kafka version: 1.5.0
- Operating system:
- Kernel version:
Your Configuration
<source>
  @type kafka_group
  consumer_group fluentd-consumer
  brokers kafka-controller-0.kafka-controller-headless.kafka.svc.cluster.local:9092,kafka-controller-1.kafka-controller-headless.kafka.svc.cluster.local:9092,kafka-controller-2.kafka-controller-headless.kafka.svc.cluster.local:9092
  topics test-topic
  format json
  offset_commit_interval 5
  offset_commit_threshold 100
</source>
<match test-topic>
  @type stdout
  <buffer>
    @type memory
    retry_max_times 3
    flush_mode interval
    flush_interval 1s
    flush_thread_interval 0.1
    flush_thread_burst_interval 0.01
    flush_thread_count 5
    chunk_full_threshold 0.1
    chunk_limit_size 1 # This is just a setup to intentionally throw a BufferChunkOverflowError.
  </buffer>
</match>
Your Error Log
2024-07-04 00:52:37 +0000 [info]: #1 Subscribe to topics matching the regex test-topic
2024-07-04 00:52:37 +0000 [warn]: #1 Re-starting consumer 2024-07-04 00:52:37 +0000
2024-07-04 00:52:44 +0000 [warn]: #1 emit transaction failed: error_class=Fluent::Plugin::Buffer::BufferChunkOverflowError error="a 3268 bytes record (nth: 0) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 1) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 2) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 3) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 4) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 5) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 6) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 7) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 8) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 9) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 10) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 11) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 12) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 13) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 14) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 15) is larger than buffer chunk limit size (1)" location="/usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/plugin/buffer.rb:457:in `write'" tag="telemetry.ab"
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/event_router.rb:198:in `rescue in emit_events'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/event_router.rb:195:in `emit_events'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/event_router.rb:115:in `emit_stream'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-rewrite-tag-filter-2.4.0/lib/fluent/plugin/out_rewrite_tag_filter.rb:111:in `block (2 levels) in process'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-rewrite-tag-filter-2.4.0/lib/fluent/plugin/out_rewrite_tag_filter.rb:110:in `each'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-rewrite-tag-filter-2.4.0/lib/fluent/plugin/out_rewrite_tag_filter.rb:110:in `block in process'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-rewrite-tag-filter-2.4.0/lib/fluent/plugin/out_rewrite_tag_filter.rb:108:in `each'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-rewrite-tag-filter-2.4.0/lib/fluent/plugin/out_rewrite_tag_filter.rb:108:in `process'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/plugin/output.rb:885:in `emit_sync'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/event_router.rb:196:in `emit_events'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/event_router.rb:115:in `emit_stream'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/in_kafka_group.rb:376:in `emit_events'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/in_kafka_group.rb:347:in `process_batch'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/in_kafka_group.rb:358:in `block in run'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:336:in `block (3 levels) in each_batch'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:23:in `instrument'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:35:in `instrument'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:334:in `block (2 levels) in each_batch'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:314:in `each'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:314:in `block in each_batch'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:419:in `block in consumer_loop'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:23:in `instrument'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:35:in `instrument'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:417:in `consumer_loop'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:311:in `each_batch'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/in_kafka_group.rb:354:in `run'
2024-07-04 00:52:44 +0000 [warn]: #1 emit transaction failed: error_class=Fluent::Plugin::Buffer::BufferChunkOverflowError error="a 3268 bytes record (nth: 0) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 1) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 2) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 3) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 4) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 5) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 6) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 7) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 8) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 9) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 10) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 11) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 12) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 13) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 14) is larger than buffer chunk limit size (1), a 3268 bytes record (nth: 15) is larger than buffer chunk limit size (1)" location="/usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/plugin/buffer.rb:457:in `write'" tag="test-topic"
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/event_router.rb:198:in `rescue in emit_events'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/event_router.rb:195:in `emit_events'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluentd-1.16.2/lib/fluent/event_router.rb:115:in `emit_stream'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/in_kafka_group.rb:376:in `emit_events'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/in_kafka_group.rb:347:in `process_batch'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/in_kafka_group.rb:358:in `block in run'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:336:in `block (3 levels) in each_batch'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:23:in `instrument'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:35:in `instrument'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:334:in `block (2 levels) in each_batch'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:314:in `each'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:314:in `block in each_batch'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:419:in `block in consumer_loop'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:23:in `instrument'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:35:in `instrument'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:417:in `consumer_loop'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:311:in `each_batch'
2024-07-04 00:52:44 +0000 [warn]: #1 /usr/local/bundle/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/in_kafka_group.rb:354:in `run'
2024-07-04 00:52:45 +0000 [error]: #1 unexpected error during consuming events from kafka. Re-fetch events. error="Kafka::ProcessingError"
2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:345:in `rescue in block (3 levels) in each_batch'
2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:338:in `block (3 levels) in each_batch'
2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:23:in `instrument'
2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:35:in `instrument'
2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:334:in `block (2 levels) in each_batch'
2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:314:in `each'
2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:314:in `block in each_batch'
2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:419:in `block in consumer_loop'
2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:23:in `instrument'
2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/instrumenter.rb:35:in `instrument'
2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:417:in `consumer_loop'
2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:311:in `each_batch'
2024-07-04 00:52:45 +0000 [error]: #1 /usr/local/bundle/gems/fluent-plugin-kafka-0.19.2/lib/fluent/plugin/in_kafka_group.rb:354:in `run'
2024-07-04 00:52:45 +0000 [warn]: #1 Stopping Consumer
2024-07-04 00:52:45 +0000 [warn]: #1 Could not connect to broker. retry_time:0. Next retry will be in 30 seconds
Additional context
No response