fluent / fluent-bit

Fast and Lightweight Logs and Metrics processor for Linux, BSD, OSX and Windows
https://fluentbit.io
Apache License 2.0

Batch processing is required in in_kafka. #8030

Open dgdsingen opened 1 year ago

dgdsingen commented 1 year ago

Is your feature request related to a problem? Please describe.

The fluentd Kafka input plugin (rdkafka) performs better than Fluent Bit's in_kafka plugin.

Below is part of the debug log recorded during testing. It shows that Fluent Bit commits the offset of every fetched message individually (fetch => commit each offset => output), whereas fluentd commits only once, after all fetched messages have been processed (fetch => output => commit the last offset).
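To put a number on the difference, here is a rough Python model (not Fluent Bit or librdkafka code; the function names are hypothetical) that counts OffsetCommitRequests for the same messages under both patterns. The fetch sizes (1, 11 and 3 messages at offsets 100 to 114) are taken from the `Enqueue ... message(s)` lines of the Fluent Bit log that follows, and the committed value is the next offset to read, which matches the `stored offset 101` committed after message 100.

```python
from dataclasses import dataclass


@dataclass
class CommitStats:
    commit_requests: int  # OffsetCommitRequests sent to the group coordinator
    last_committed: int   # next offset to consume, as Kafka stores it


def per_message_commits(fetch_batches: list[list[int]]) -> CommitStats:
    """Models the observed in_kafka pattern: one commit per consumed message."""
    requests, last = 0, -1
    for batch in fetch_batches:
        for offset in batch:
            requests += 1       # one round trip per message
            last = offset + 1   # commit the offset after the one just consumed
    return CommitStats(requests, last)


def per_batch_commits(fetch_batches: list[list[int]]) -> CommitStats:
    """Models the requested pattern: process a whole fetch, then commit once."""
    requests, last = 0, -1
    for batch in fetch_batches:
        if batch:
            requests += 1
            last = batch[-1] + 1
    return CommitStats(requests, last)


# Fetches from the Fluent Bit log: offsets 100, 101-111 and 112-114.
batches = [[100], list(range(101, 112)), list(range(112, 115))]
print(per_message_commits(batches))  # CommitStats(commit_requests=15, last_committed=115)
print(per_batch_commits(batches))    # CommitStats(commit_requests=3, last_committed=115)
```

Both patterns end at the same committed position; only the number of coordinator round trips differs.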

  1. log of (fluentbit + in_kafka)
# ---- fetch ---- #
fluentbit  | %7|1697108577.859|SEND|fluent-bit#consumer-1| [thrd:kafka:29092/bootstrap]: kafka:29092/1001: Sent FetchRequest (v11, 103 bytes @ 0, CorrId 29)
fluentbit  | %7|1697108577.859|BROADCAST|fluent-bit#consumer-1| [thrd:kafka:29092/bootstrap]: Broadcasting state change
fluentbit  | [2023/10/12 11:02:57] [debug] [input chunk] skip ingesting data with 0 bytes
fluentbit  | [2023/10/12 11:02:58] [debug] [input chunk] skip ingesting data with 0 bytes
fluentbit  | %7|1697108578.517|RECV|fluent-bit#consumer-1| [thrd:kafka:29092/bootstrap]: kafka:29092/1001: Received FetchResponse (v11, 140 bytes, CorrId 29, rtt 658.13ms)
fluentbit  | %7|1697108578.517|FETCH|fluent-bit#consumer-1| [thrd:kafka:29092/bootstrap]: kafka:29092/1001: Topic test-topic [0] MessageSet size 68, error "Success", MaxOffset 101, LSO 101, Ver 4/4
fluentbit  | %7|1697108578.517|CONSUME|fluent-bit#consumer-1| [thrd:kafka:29092/bootstrap]: kafka:29092/1001: Enqueue 1 message(s) (0 bytes, 1 ops) on test-topic [0] fetch queue (qlen 0, v4, last_offset 100, 0 ctrl msgs, 0 aborted msgsets, uncompressed)
fluentbit  | %7|1697108578.517|FETCH|fluent-bit#consumer-1| [thrd:kafka:29092/bootstrap]: kafka:29092/1001: Fetch topic test-topic [0] at offset 101 (leader epoch 0, current leader epoch -1, v4)
fluentbit  | %7|1697108578.517|FETCH|fluent-bit#consumer-1| [thrd:kafka:29092/bootstrap]: kafka:29092/1001: Fetch 1/1/1 toppar(s)
fluentbit  | %7|1697108578.517|SEND|fluent-bit#consumer-1| [thrd:kafka:29092/bootstrap]: kafka:29092/1001: Sent FetchRequest (v11, 103 bytes @ 0, CorrId 30)
fluentbit  | %7|1697108578.517|BROADCAST|fluent-bit#consumer-1| [thrd:kafka:29092/bootstrap]: Broadcasting state change
fluentbit  | %7|1697108578.520|RECV|fluent-bit#consumer-1| [thrd:kafka:29092/bootstrap]: kafka:29092/1001: Received FetchResponse (v11, 820 bytes, CorrId 30, rtt 2.89ms)
fluentbit  | %7|1697108578.520|FETCH|fluent-bit#consumer-1| [thrd:kafka:29092/bootstrap]: kafka:29092/1001: Topic test-topic [0] MessageSet size 748, error "Success", MaxOffset 112, LSO 112, Ver 4/4
fluentbit  | %7|1697108578.520|CONSUME|fluent-bit#consumer-1| [thrd:kafka:29092/bootstrap]: kafka:29092/1001: Enqueue 11 message(s) (0 bytes, 11 ops) on test-topic [0] fetch queue (qlen 1, v4, last_offset 111, 0 ctrl msgs, 0 aborted msgsets, uncompressed)
fluentbit  | %7|1697108578.520|FETCH|fluent-bit#consumer-1| [thrd:kafka:29092/bootstrap]: kafka:29092/1001: Fetch topic test-topic [0] at offset 112 (leader epoch 0, current leader epoch -1, v4)
fluentbit  | %7|1697108578.520|FETCH|fluent-bit#consumer-1| [thrd:kafka:29092/bootstrap]: kafka:29092/1001: Fetch 1/1/1 toppar(s)
fluentbit  | %7|1697108578.520|SEND|fluent-bit#consumer-1| [thrd:kafka:29092/bootstrap]: kafka:29092/1001: Sent FetchRequest (v11, 103 bytes @ 0, CorrId 31)
fluentbit  | %7|1697108578.520|BROADCAST|fluent-bit#consumer-1| [thrd:kafka:29092/bootstrap]: Broadcasting state change
fluentbit  | %7|1697108578.521|RECV|fluent-bit#consumer-1| [thrd:kafka:29092/bootstrap]: kafka:29092/1001: Received FetchResponse (v11, 276 bytes, CorrId 31, rtt 1.03ms)
fluentbit  | %7|1697108578.521|FETCH|fluent-bit#consumer-1| [thrd:kafka:29092/bootstrap]: kafka:29092/1001: Topic test-topic [0] MessageSet size 204, error "Success", MaxOffset 115, LSO 115, Ver 4/4
fluentbit  | %7|1697108578.521|CONSUME|fluent-bit#consumer-1| [thrd:kafka:29092/bootstrap]: kafka:29092/1001: Enqueue 3 message(s) (0 bytes, 3 ops) on test-topic [0] fetch queue (qlen 12, v4, last_offset 114, 0 ctrl msgs, 0 aborted msgsets, uncompressed)

# ---- commit ---- #
fluentbit  | %7|1697109139.611|FETCH|fluent-bit#consumer-1| [thrd:kafka:29092/bootstrap]: kafka:29092/1001: Fetch topic test-topic [0] at offset 200 (leader epoch 0, current leader epoch -1, v4)
fluentbit  | %7|1697109139.611|FETCH|fluent-bit#consumer-1| [thrd:kafka:29092/bootstrap]: kafka:29092/1001: Fetch 1/1/1 toppar(s)
fluentbit  | %7|1697109139.611|SEND|fluent-bit#consumer-1| [thrd:kafka:29092/bootstrap]: kafka:29092/1001: Sent FetchRequest (v11, 103 bytes @ 0, CorrId 88)
fluentbit  | %7|1697109139.611|BROADCAST|fluent-bit#consumer-1| [thrd:kafka:29092/bootstrap]: Broadcasting state change
fluentbit  | %7|1697109139.970|OFFSET|fluent-bit#consumer-1| [thrd:main]: Topic test-topic [0]: stored offset INVALID (leader epoch -1), committed offset INVALID (leader epoch -1): not including in commit
fluentbit  | %7|1697109139.970|DUMP|fluent-bit#consumer-1| [thrd:main]: Assignment dump (started_cnt=1, wait_stop_cnt=0)
fluentbit  | %7|1697109139.970|DUMP_ALL|fluent-bit#consumer-1| [thrd:main]: List with 1 partition(s):
fluentbit  | %7|1697109139.970|DUMP_ALL|fluent-bit#consumer-1| [thrd:main]:  test-topic [0] offset STORED
fluentbit  | %7|1697109139.970|DUMP_PND|fluent-bit#consumer-1| [thrd:main]: List with 0 partition(s):
fluentbit  | %7|1697109139.970|DUMP_QRY|fluent-bit#consumer-1| [thrd:main]: List with 0 partition(s):
fluentbit  | %7|1697109139.970|DUMP_REM|fluent-bit#consumer-1| [thrd:main]: List with 0 partition(s):
fluentbit  | %7|1697109139.970|ASSIGNDONE|fluent-bit#consumer-1| [thrd:main]: Group "fluent-bit-1": assignment operations done in join-state steady (rebalance rejoin=false)
fluentbit  | %7|1697109139.979|CGRPOP|fluent-bit#consumer-1| [thrd:main]: Group "fluent-bit-1" received op OFFSET_COMMIT in state up (join-state steady)
fluentbit  | %7|1697109139.979|OFFSET|fluent-bit#consumer-1| [thrd:main]: Topic test-topic [0]: stored offset 101 (leader epoch 0), committed offset INVALID (leader epoch -1): setting stored offset 101 (leader epoch 0)  for commit
fluentbit  | %7|1697109139.979|COMMIT|fluent-bit#consumer-1| [thrd:main]: GroupCoordinator/1001: Committing offsets for 1 partition(s) with generation-id 1 in join-state steady: manual
fluentbit  | %7|1697109139.979|OFFSET|fluent-bit#consumer-1| [thrd:main]: GroupCoordinator/1001: Enqueue OffsetCommitRequest(v7, 1/1 partition(s))): manual
fluentbit  | %7|1697109139.979|SEND|fluent-bit#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1001: Sent OffsetCommitRequest (v7, 131 bytes @ 0, CorrId 10)
fluentbit  | [2023/10/12 11:12:19] [debug] [input:kafka:kafka.0] kafka message received
fluentbit  | %7|1697109139.990|RECV|fluent-bit#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1001: Received OffsetCommitResponse (v7, 30 bytes, CorrId 10, rtt 10.96ms)
fluentbit  | %7|1697109139.990|COMMIT|fluent-bit#consumer-1| [thrd:main]: GroupCoordinator/1001: OffsetCommit for 1 partition(s) in join-state steady: manual: returned: Success
fluentbit  | %7|1697109139.990|DUMP|fluent-bit#consumer-1| [thrd:main]: Assignment dump (started_cnt=1, wait_stop_cnt=0)
fluentbit  | %7|1697109139.990|DUMP_ALL|fluent-bit#consumer-1| [thrd:main]: List with 1 partition(s):
fluentbit  | %7|1697109139.990|DUMP_ALL|fluent-bit#consumer-1| [thrd:main]:  test-topic [0] offset STORED
fluentbit  | %7|1697109139.990|DUMP_PND|fluent-bit#consumer-1| [thrd:main]: List with 0 partition(s):
fluentbit  | %7|1697109139.990|DUMP_QRY|fluent-bit#consumer-1| [thrd:main]: List with 0 partition(s):
fluentbit  | %7|1697109139.990|DUMP_REM|fluent-bit#consumer-1| [thrd:main]: List with 0 partition(s):
fluentbit  | %7|1697109139.990|ASSIGNDONE|fluent-bit#consumer-1| [thrd:main]: Group "fluent-bit-1": assignment operations done in join-state steady (rebalance rejoin=false)
fluentbit  | [2023/10/12 11:12:19] [debug] [input:kafka:kafka.0] kafka message received
fluentbit  | %7|1697109139.991|CGRPOP|fluent-bit#consumer-1| [thrd:main]: Group "fluent-bit-1" received op OFFSET_COMMIT in state up (join-state steady)
fluentbit  | %7|1697109139.991|OFFSET|fluent-bit#consumer-1| [thrd:main]: Topic test-topic [0]: stored offset 102 (leader epoch 0), committed offset 101 (leader epoch 0): setting stored offset 102 (leader epoch 0)  for commit
fluentbit  | %7|1697109139.991|COMMIT|fluent-bit#consumer-1| [thrd:main]: GroupCoordinator/1001: Committing offsets for 1 partition(s) with generation-id 1 in join-state steady: manual
fluentbit  | %7|1697109139.991|OFFSET|fluent-bit#consumer-1| [thrd:main]: GroupCoordinator/1001: Enqueue OffsetCommitRequest(v7, 1/1 partition(s))): manual
fluentbit  | %7|1697109139.991|SEND|fluent-bit#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1001: Sent OffsetCommitRequest (v7, 131 bytes @ 0, CorrId 11)
fluentbit  | %7|1697109139.993|RECV|fluent-bit#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1001: Received OffsetCommitResponse (v7, 30 bytes, CorrId 11, rtt 2.13ms)
fluentbit  | %7|1697109139.993|COMMIT|fluent-bit#consumer-1| [thrd:main]: GroupCoordinator/1001: OffsetCommit for 1 partition(s) in join-state steady: manual: returned: Success
fluentbit  | %7|1697109139.993|DUMP|fluent-bit#consumer-1| [thrd:main]: Assignment dump (started_cnt=1, wait_stop_cnt=0)
fluentbit  | %7|1697109139.993|DUMP_ALL|fluent-bit#consumer-1| [thrd:main]: List with 1 partition(s):
fluentbit  | %7|1697109139.993|DUMP_ALL|fluent-bit#consumer-1| [thrd:main]:  test-topic [0] offset STORED
fluentbit  | %7|1697109139.993|DUMP_PND|fluent-bit#consumer-1| [thrd:main]: List with 0 partition(s):
fluentbit  | %7|1697109139.993|DUMP_QRY|fluent-bit#consumer-1| [thrd:main]: List with 0 partition(s):
fluentbit  | %7|1697109139.993|DUMP_REM|fluent-bit#consumer-1| [thrd:main]: List with 0 partition(s):
fluentbit  | %7|1697109139.993|ASSIGNDONE|fluent-bit#consumer-1| [thrd:main]: Group "fluent-bit-1": assignment operations done in join-state steady (rebalance rejoin=false)
fluentbit  | %7|1697109139.993|CGRPOP|fluent-bit#consumer-1| [thrd:main]: Group "fluent-bit-1" received op OFFSET_COMMIT in state up (join-state steady)
fluentbit  | %7|1697109139.993|OFFSET|fluent-bit#consumer-1| [thrd:main]: Topic test-topic [0]: stored offset 103 (leader epoch 0), committed offset 102 (leader epoch 0): setting stored offset 103 (leader epoch 0)  for commit

# ---- output ---- #
fluentbit  | [97] kafka.fluentbit: [[1697108579.150028373, {}], {"topic"=>"test-topic", "partition"=>0, "offset"=>197, "error"=>nil, "key"=>nil, "payload"=>nil}]
fluentbit  | [98] kafka.fluentbit: [[1697108579.151361130, {}], {"topic"=>"test-topic", "partition"=>0, "offset"=>198, "error"=>nil, "key"=>nil, "payload"=>nil}]
fluentbit  | [99] kafka.fluentbit: [[1697108579.152458511, {}], {"topic"=>"test-topic", "partition"=>0, "offset"=>199, "error"=>nil, "key"=>nil, "payload"=>nil}]
fluentbit  | [2023/10/12 11:02:59] [debug] [out flush] cb_destroy coro_id=1
fluentbit  | [2023/10/12 11:02:59] [debug] [input chunk] skip ingesting data with 0 bytes
fluentbit  | [2023/10/12 11:02:59] [debug] [task] destroy task=0xffffade212e0 (task_id=0)
fluentbit  | [2023/10/12 11:03:00] [debug] [input chunk] skip ingesting data with 0 bytes

  2. log of (fluentd + rdkafka_group)
# ---- fetch ---- #
fluentd    | D, [2023-10-12T10:58:32.209737 #18] DEBUG -- : rdkafka: [thrd:kafka:29092/bootstrap]: kafka:29092/1001: Sent FetchRequest (v11, 100 bytes @ 0, CorrId 30)
fluentd    | D, [2023-10-12T10:58:32.475595 #18] DEBUG -- : rdkafka: [thrd:main]: Topic test-topic [0]: stored offset -1001, committed offset -1001: not including in commit
fluentd    | D, [2023-10-12T10:58:32.475640 #18] DEBUG -- : rdkafka: [thrd:main]: Assignment dump (started_cnt=1, wait_stop_cnt=0)
fluentd    | D, [2023-10-12T10:58:32.475647 #18] DEBUG -- : rdkafka: [thrd:main]: List with 1 partition(s):
fluentd    | D, [2023-10-12T10:58:32.475651 #18] DEBUG -- : rdkafka: [thrd:main]:  test-topic [0] offset STORED
fluentd    | D, [2023-10-12T10:58:32.475655 #18] DEBUG -- : rdkafka: [thrd:main]: List with 0 partition(s):
fluentd    | D, [2023-10-12T10:58:32.475658 #18] DEBUG -- : rdkafka: [thrd:main]: List with 0 partition(s):
fluentd    | D, [2023-10-12T10:58:32.475662 #18] DEBUG -- : rdkafka: [thrd:main]: List with 0 partition(s):
fluentd    | D, [2023-10-12T10:58:32.475666 #18] DEBUG -- : rdkafka: [thrd:main]: Group "fluentd": assignment operations done in join-state steady (rebalance rejoin=false)
fluentd    | D, [2023-10-12T10:58:32.546002 #18] DEBUG -- : rdkafka: [thrd:main]: GroupCoordinator/1001: Heartbeat for group "fluentd" generation id 1
fluentd    | D, [2023-10-12T10:58:32.546215 #18] DEBUG -- : rdkafka: [thrd:GroupCoordinator]: GroupCoordinator/1001: Sent HeartbeatRequest (v3, 82 bytes @ 0, CorrId 12)
fluentd    | D, [2023-10-12T10:58:32.547815 #18] DEBUG -- : rdkafka: [thrd:GroupCoordinator]: GroupCoordinator/1001: Received HeartbeatResponse (v3, 6 bytes, CorrId 12, rtt 1.72ms)
fluentd    | D, [2023-10-12T10:58:32.710588 #18] DEBUG -- : rdkafka: [thrd:kafka:29092/bootstrap]: kafka:29092/1001: Received FetchResponse (v11, 72 bytes, CorrId 30, rtt 500.93ms)
fluentd    | D, [2023-10-12T10:58:32.710627 #18] DEBUG -- : rdkafka: [thrd:kafka:29092/bootstrap]: kafka:29092/1001: Topic test-topic [0] MessageSet size 0, error "Success", MaxOffset 99, LSO 99, Ver 4/4
fluentd    | D, [2023-10-12T10:58:32.710635 #18] DEBUG -- : rdkafka: [thrd:kafka:29092/bootstrap]: kafka:29092/1001: Fetch topic test-topic [0] at offset 99 (v4)
fluentd    | D, [2023-10-12T10:58:32.710640 #18] DEBUG -- : rdkafka: [thrd:kafka:29092/bootstrap]: kafka:29092/1001: Fetch 1/1/1 toppar(s)
fluentd    | D, [2023-10-12T10:58:32.710645 #18] DEBUG -- : rdkafka: [thrd:kafka:29092/bootstrap]: kafka:29092/1001: Sent FetchRequest (v11, 100 bytes @ 0, CorrId 31)
fluentd    | D, [2023-10-12T10:58:33.185979 #18] DEBUG -- : rdkafka: [thrd:kafka:29092/bootstrap]: kafka:29092/1001: Received FetchResponse (v11, 140 bytes, CorrId 31, rtt 475.14ms)
fluentd    | D, [2023-10-12T10:58:33.186032 #18] DEBUG -- : rdkafka: [thrd:kafka:29092/bootstrap]: kafka:29092/1001: Topic test-topic [0] MessageSet size 68, error "Success", MaxOffset 100, LSO 100, Ver 4/4
fluentd    | D, [2023-10-12T10:58:33.186041 #18] DEBUG -- : rdkafka: [thrd:kafka:29092/bootstrap]: kafka:29092/1001: Enqueue 1 message(s) (0 bytes, 1 ops) on test-topic [0] fetch queue (qlen 2, v4, last_offset 99, 0 ctrl msgs, 0 aborted msgsets, uncompressed)
fluentd    | D, [2023-10-12T10:58:33.186045 #18] DEBUG -- : rdkafka: [thrd:kafka:29092/bootstrap]: kafka:29092/1001: Fetch topic test-topic [0] at offset 100 (v4)
fluentd    | D, [2023-10-12T10:58:33.186049 #18] DEBUG -- : rdkafka: [thrd:kafka:29092/bootstrap]: kafka:29092/1001: Fetch 1/1/1 toppar(s)
fluentd    | D, [2023-10-12T10:58:33.186053 #18] DEBUG -- : rdkafka: [thrd:kafka:29092/bootstrap]: kafka:29092/1001: Sent FetchRequest (v11, 100 bytes @ 0, CorrId 32)
fluentd    | D, [2023-10-12T10:58:33.187827 #18] DEBUG -- : rdkafka: [thrd:kafka:29092/bootstrap]: kafka:29092/1001: Received FetchResponse (v11, 684 bytes, CorrId 32, rtt 1.26ms)
fluentd    | D, [2023-10-12T10:58:33.188074 #18] DEBUG -- : rdkafka: [thrd:kafka:29092/bootstrap]: kafka:29092/1001: Topic test-topic [0] MessageSet size 612, error "Success", MaxOffset 109, LSO 109, Ver 4/4
fluentd    | D, [2023-10-12T10:58:33.188480 #18] DEBUG -- : rdkafka: [thrd:kafka:29092/bootstrap]: kafka:29092/1001: Enqueue 9 message(s) (0 bytes, 9 ops) on test-topic [0] fetch queue (qlen 2, v4, last_offset 108, 0 ctrl msgs, 0 aborted msgsets, uncompressed)
fluentd    | D, [2023-10-12T10:58:33.188511 #18] DEBUG -- : rdkafka: [thrd:kafka:29092/bootstrap]: kafka:29092/1001: Fetch topic test-topic [0] at offset 109 (v4)
fluentd    | D, [2023-10-12T10:58:33.188645 #18] DEBUG -- : rdkafka: [thrd:kafka:29092/bootstrap]: kafka:29092/1001: Fetch 1/1/1 toppar(s)
fluentd    | D, [2023-10-12T10:58:33.188669 #18] DEBUG -- : rdkafka: [thrd:kafka:29092/bootstrap]: kafka:29092/1001: Sent FetchRequest (v11, 100 bytes @ 0, CorrId 33)
fluentd    | D, [2023-10-12T10:58:33.188676 #18] DEBUG -- : rdkafka: [thrd:kafka:29092/bootstrap]: kafka:29092/1001: Received FetchResponse (v11, 276 bytes, CorrId 33, rtt 1.19ms)
fluentd    | D, [2023-10-12T10:58:33.188680 #18] DEBUG -- : rdkafka: [thrd:kafka:29092/bootstrap]: kafka:29092/1001: Topic test-topic [0] MessageSet size 204, error "Success", MaxOffset 112, LSO 112, Ver 4/4
fluentd    | D, [2023-10-12T10:58:33.188683 #18] DEBUG -- : rdkafka: [thrd:kafka:29092/bootstrap]: kafka:29092/1001: Enqueue 3 message(s) (0 bytes, 3 ops) on test-topic [0] fetch queue (qlen 2, v4, last_offset 111, 0 ctrl msgs, 0 aborted msgsets, uncompressed)

# ---- output ---- #
fluentd    | 2023-10-12 10:58:33.496517546 +0000 kafka.fluentd.test-topic: null
fluentd    | 2023-10-12 10:58:33.496518379 +0000 kafka.fluentd.test-topic: null
fluentd    | 2023-10-12 10:58:33.496522129 +0000 kafka.fluentd.test-topic: null

# ---- commit ---- #
fluentd    | D, [2023-10-12T10:58:37.476419 #18] DEBUG -- : rdkafka: [thrd:main]: Topic test-topic [0]: stored offset 198, committed offset -1001: setting stored offset 198 for commit
fluentd    | D, [2023-10-12T10:58:37.476465 #18] DEBUG -- : rdkafka: [thrd:main]: GroupCoordinator/1001: Committing offsets for 1 partition(s) with generation-id 1 in join-state steady: cgrp auto commit timer
fluentd    | D, [2023-10-12T10:58:37.476473 #18] DEBUG -- : rdkafka: [thrd:main]: GroupCoordinator/1001: Enqueue OffsetCommitRequest(v7, 1/1 partition(s))): cgrp auto commit timer
fluentd    | D, [2023-10-12T10:58:37.476688 #18] DEBUG -- : rdkafka: [thrd:GroupCoordinator]: GroupCoordinator/1001: Sent OffsetCommitRequest (v7, 120 bytes @ 0, CorrId 14)
fluentd    | D, [2023-10-12T10:58:37.486752 #18] DEBUG -- : rdkafka: [thrd:GroupCoordinator]: GroupCoordinator/1001: Received OffsetCommitResponse (v7, 30 bytes, CorrId 14, rtt 10.09ms)
fluentd    | D, [2023-10-12T10:58:37.486907 #18] DEBUG -- : rdkafka: [thrd:main]: GroupCoordinator/1001: OffsetCommit for 1 partition(s) in join-state steady: cgrp auto commit timer: returned: Success
fluentd    | D, [2023-10-12T10:58:37.486925 #18] DEBUG -- : rdkafka: [thrd:main]: Assignment dump (started_cnt=1, wait_stop_cnt=0)
fluentd    | D, [2023-10-12T10:58:37.486930 #18] DEBUG -- : rdkafka: [thrd:main]: List with 1 partition(s):
fluentd    | D, [2023-10-12T10:58:37.486934 #18] DEBUG -- : rdkafka: [thrd:main]:  test-topic [0] offset STORED
fluentd    | D, [2023-10-12T10:58:37.486938 #18] DEBUG -- : rdkafka: [thrd:main]: List with 0 partition(s):
fluentd    | D, [2023-10-12T10:58:37.486942 #18] DEBUG -- : rdkafka: [thrd:main]: List with 0 partition(s):
fluentd    | D, [2023-10-12T10:58:37.486946 #18] DEBUG -- : rdkafka: [thrd:main]: List with 0 partition(s):
fluentd    | D, [2023-10-12T10:58:37.486951 #18] DEBUG -- : rdkafka: [thrd:main]: Group "fluentd": assignment operations done in join-state steady (rebalance rejoin=false)
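A quick way to compare the two runs is to count commit requests in the debug output. The sketch below is an illustrative helper (not part of either project) that tallies the `Enqueue OffsetCommitRequest(...): <reason>` lines by their trigger. In the excerpts above, Fluent Bit's commits are tagged `manual` (the plugin commits explicitly), while fluentd's come from librdkafka's `cgrp auto commit timer`.

```python
import re
from collections import Counter

# librdkafka debug lines end with the commit trigger, e.g.
# "... Enqueue OffsetCommitRequest(v7, 1/1 partition(s))): manual"
ENQUEUE_COMMIT = re.compile(r"Enqueue OffsetCommitRequest\(.*\)\): (?P<reason>.+)$")


def commit_reasons(log_lines):
    """Count OffsetCommitRequests in librdkafka debug output, grouped by trigger."""
    reasons = Counter()
    for line in log_lines:
        match = ENQUEUE_COMMIT.search(line)
        if match:
            reasons[match.group("reason").strip()] += 1
    return reasons


# Lines copied from the two logs above.
fluentbit_log = [
    "fluentbit  | %7|1697109139.979|OFFSET|fluent-bit#consumer-1| [thrd:main]: GroupCoordinator/1001: Enqueue OffsetCommitRequest(v7, 1/1 partition(s))): manual",
    "fluentbit  | %7|1697109139.991|OFFSET|fluent-bit#consumer-1| [thrd:main]: GroupCoordinator/1001: Enqueue OffsetCommitRequest(v7, 1/1 partition(s))): manual",
]
fluentd_log = [
    "fluentd    | D, [2023-10-12T10:58:37.476473 #18] DEBUG -- : rdkafka: [thrd:main]: GroupCoordinator/1001: Enqueue OffsetCommitRequest(v7, 1/1 partition(s))): cgrp auto commit timer",
]
print(commit_reasons(fluentbit_log))  # Counter({'manual': 2})
print(commit_reasons(fluentd_log))    # Counter({'cgrp auto commit timer': 1})
```

Fed the complete logs from two runs that consume the same number of messages, this should show roughly one `manual` commit per message for Fluent Bit and one timer-driven commit per interval for fluentd. librdkafka's `auto.commit.interval.ms` defaults to 5000 ms, which is consistent with the 5-second gap between the 10:58:32 and 10:58:37 commit checks in the fluentd log.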

I looked for an option to enable batch processing but could not find one in:

https://docs.fluentbit.io/manual/pipeline/inputs/kafka
https://github.com/fluent/fluent-bit/blob/master/plugins/in_kafka/in_kafka.c
https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md
https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md

Describe the solution you'd like

Add an option, similar to fluentd's behavior, to process fetched messages in batches and commit the offset only once per batch.
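A minimal sketch of the requested behavior, assuming a hypothetical consumer loop that can hand a group of records to the output stage before committing. It is plain Python with stand-in callbacks (`flush`, `commit`, `max_batch` are illustrative names, not the in_kafka API): records are buffered up to `max_batch`, flushed, and then a single commit is issued for the offset after the last record in the batch.

```python
from typing import Callable, Iterable

# Hypothetical stand-in for a Kafka record: (offset, payload).
Message = tuple[int, bytes]


def consume_in_batches(
    messages: Iterable[Message],
    max_batch: int,
    flush: Callable[[list[Message]], None],
    commit: Callable[[int], None],
) -> None:
    """Buffer up to max_batch records, pass them to the output stage,
    then commit once for the whole batch."""
    batch: list[Message] = []
    for msg in messages:
        batch.append(msg)
        if len(batch) >= max_batch:
            flush(batch)
            commit(batch[-1][0] + 1)  # Kafka stores the *next* offset to read
            batch = []
    if batch:  # drain the final partial batch
        flush(batch)
        commit(batch[-1][0] + 1)


# Usage: 15 records at offsets 100-114, batches of at most 10.
flushed: list[int] = []
commits: list[int] = []
records = [(offset, b"{}") for offset in range(100, 115)]
consume_in_batches(records, 10, lambda b: flushed.append(len(b)), commits.append)
print(flushed)  # [10, 5]
print(commits)  # [110, 115]
```

Committing only after `flush` returns keeps at-least-once delivery: if the process dies between the two calls, the batch is re-read rather than lost. A real streaming consumer would also need a time-based trigger (similar to the auto-commit timer seen in the fluentd log) so that a partial batch is not held indefinitely when traffic is slow.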

Describe alternatives you've considered

Additional context

Below are the fluentd and Fluent Bit configuration files used in the test.

  1. fluentd config
<source>
  @type rdkafka_group
  brokers kafka:29092
  topics test-topic
  add_prefix kafka.fluentd
  format json
  kafka_configs {
    "bootstrap.servers": "kafka:29092",
    "group.id": "fluentd",
    "log_level" : 7,
    "debug" : "all"
  }
</source>

<match kafka.fluentd.**>
  @type stdout
</match>

  2. fluentbit config
[SERVICE]
  log_level debug
  flush 1

[INPUT]
  Name kafka
  tag kafka.fluentbit
  brokers kafka:29092
  topics test-topic
  group_id fluent-bit-1
  rdkafka.debug all

[OUTPUT]
  name  stdout
  match *
  workers 1

github-actions[bot] commented 9 months ago

This issue is stale because it has been open 90 days with no activity. Remove stale label or comment or this will be closed in 5 days. Maintainers can add the exempt-stale label.

github-actions[bot] commented 9 months ago

This issue was closed because it has been stalled for 5 days with no activity.

edsiper commented 1 week ago

cc: @lecaros