deviceinsight / kafkactl

Command Line Tool for managing Apache Kafka
https://deviceinsight.github.io/kafkactl/
Apache License 2.0

consume as part of consumer group consumes more than max-messages #198

Open bzakhar opened 3 months ago

bzakhar commented 3 months ago

Current behavior:

kafkactl consume -g group_name --max-messages 1 topic.name

advances the group_name consumer group offset by more than 1 while outputting only one message. Simple reproducer:

$ kafkactl create topic test.topic -p 1
$ for i in {1..100}; do kafkactl produce test.topic -s -v $i; done
$ kafkactl consume -g test_cg --from-beginning --max-messages 1 test.topic
$ kafkactl describe cg test_cg
$ kafkactl consume -g test_cg --max-messages 1 test.topic
$ kafkactl describe cg test_cg
# repeat a few times
# ...

You will notice that messages are "skipped" and the consumer group offset advances by more than 1 after each invocation of kafkactl consume. In my case:

$ kafkactl create topic test.topic -p 1
$ for i in {1..100}; do kafkactl produce test.topic -s -v $i; done
$ kafkactl consume --from-beginning -g test_cg --max-messages 1 test.topic
1
$ kafkactl describe cg test_cg
TOPIC          PARTITION     NEWEST_OFFSET     OLDEST_OFFSET     CONSUMER_OFFSET     LEAD     LAG
test.topic     0             100               0                 2                   2        98

CLIENT_HOST     CLIENT_ID     TOPIC     ASSIGNED_PARTITIONS

$ kafkactl consume -g test_cg --max-messages 1 test.topic
3
$ kafkactl describe cg test_cg
TOPIC          PARTITION     NEWEST_OFFSET     OLDEST_OFFSET     CONSUMER_OFFSET     LEAD     LAG
test.topic     0             100               0                 4                   4        96

CLIENT_HOST     CLIENT_ID     TOPIC     ASSIGNED_PARTITIONS

$ kafkactl consume -g test_cg --max-messages 1 test.topic
5
$ kafkactl describe cg test_cg
TOPIC          PARTITION     NEWEST_OFFSET     OLDEST_OFFSET     CONSUMER_OFFSET     LEAD     LAG
test.topic     0             100               0                 7                   7        93

CLIENT_HOST     CLIENT_ID     TOPIC     ASSIGNED_PARTITIONS

$ kafkactl consume -g test_cg --max-messages 1 test.topic
8
# with -V, unimportant output skipped, but notice the "drop message" logs
[kafkactl] 2024/05/23 18:18:56 group consumer initialized
[kafkactl] 2024/05/23 18:18:56 waiting for group consumer
[sarama  ] 2024/05/23 18:18:56 consumer/broker/2 accumulated 1 new subscriptions
[sarama  ] 2024/05/23 18:18:56 consumer/broker/2 added subscription to test.topic/0
10
[kafkactl] 2024/05/23 18:18:56 drop message
[kafkactl] 2024/05/23 18:18:56 stop consume claim via channel
[kafkactl] 2024/05/23 18:18:56 drop message
[sarama  ] 2024/05/23 18:18:56 consumergroup/test_cg loop check partition number goroutine will exit, topics [test.topic]
[sarama  ] 2024/05/23 18:18:56 consumer/broker/2 closed dead subscription to test.topic/0
[sarama  ] 2024/05/23 18:18:56 consumergroup/session/kafkactl-minio-57df5166-e975-42f5-99dc-f30d0fd4eaeb/9 heartbeat loop stopped
[sarama  ] 2024/05/23 18:18:56 consumergroup/session/kafkactl-minio-57df5166-e975-42f5-99dc-f30d0fd4eaeb/9 released
[kafkactl] 2024/05/23 18:18:56 waiting for deserialization
[kafkactl] 2024/05/23 18:18:56 deserialization finished
[kafkactl] 2024/05/23 18:18:56 closing consumer

Expected behavior:

Each invocation of kafkactl consume with the --max-messages parameter and a consumer group advances that consumer group's offset by no more than the value of --max-messages, so that no messages are skipped during consumption.

Version tested:

cmd.info{version:"v5.0.6", buildTime:"2024-03-14T10:08:45Z", gitCommit:"d7f78e0", goVersion:"go1.21.8", compiler:"gc", platform:"linux/amd64"}

bzakhar commented 2 days ago

Is there a plan to address this bug any time soon? I do not know enough Go to figure out how to fix it myself, but it looks like draining the channel after consuming max-messages is intentional (hence the "drop message" log entries), and simply not draining it might fix the issue.
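
For illustration only, here is a minimal sarama-based sketch of that direction: mark only the messages that were actually emitted and return without draining the claim channel. The maxMessagesHandler type, broker address, and hard-coded group/topic are placeholders, and this is not kafkactl's actual consumer code.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/IBM/sarama"
)

// maxMessagesHandler stops after emitting `remaining` messages and marks
// only the messages it actually emitted. (Hypothetical name; with multiple
// partitions the counter would have to be shared safely, e.g. via atomics.)
type maxMessagesHandler struct {
	remaining int
}

func (h *maxMessagesHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (h *maxMessagesHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (h *maxMessagesHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		fmt.Printf("%s\n", msg.Value) // stand-in for kafkactl's output path

		// Mark only messages that were actually emitted: the committed
		// offset becomes msg.Offset+1, so nothing is skipped.
		session.MarkMessage(msg, "")

		h.remaining--
		if h.remaining <= 0 {
			// Return without draining claim.Messages(); anything still
			// buffered by sarama stays unmarked, so the group offset
			// does not advance past the last emitted message.
			return nil
		}
	}
	return nil
}

func main() {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_8_0_0
	cfg.Consumer.Offsets.Initial = sarama.OffsetOldest

	// Broker address, group and topic taken from the reproducer above.
	group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "test_cg", cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer group.Close()

	h := &maxMessagesHandler{remaining: 1}
	if err := group.Consume(context.Background(), []string{"test.topic"}, h); err != nil {
		log.Fatal(err)
	}
}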

d-rk commented 1 day ago

The problem is not that easy to solve: Suppose you have a topic with 4 partitions and you want to read only a single message. In order to know on which partition the next message was produced, you have to read one message from each partition and then compare their timestamps. Finally, you have to mark only one of these messages as consumed.

This would require a bigger change in how message consumption is implemented, so I cannot predict when I will have time to look into it.
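
As a rough sketch of the bookkeeping this would involve: buffer one pending "head" message per partition, emit the one with the oldest timestamp, and advance only that partition's offset. The pickNext helper and the dummy data below are purely illustrative, not kafkactl code.

package main

import (
	"fmt"
	"time"

	"github.com/IBM/sarama"
)

// pickNext chooses, from at most one buffered head message per partition,
// the message with the oldest timestamp as the next one to emit.
func pickNext(heads map[int32]*sarama.ConsumerMessage) *sarama.ConsumerMessage {
	var oldest *sarama.ConsumerMessage
	for _, msg := range heads {
		if oldest == nil || msg.Timestamp.Before(oldest.Timestamp) {
			oldest = msg
		}
	}
	return oldest
}

func main() {
	now := time.Now()
	// One pending head message per partition (dummy data for illustration).
	heads := map[int32]*sarama.ConsumerMessage{
		0: {Partition: 0, Offset: 7, Timestamp: now.Add(2 * time.Second)},
		1: {Partition: 1, Offset: 3, Timestamp: now},
	}

	next := pickNext(heads)
	fmt.Printf("emit partition %d offset %d\n", next.Partition, next.Offset)

	// Only this partition's offset may then be advanced, e.g.
	//   session.MarkOffset(next.Topic, next.Partition, next.Offset+1, "")
	// while the other partitions' heads stay unmarked so they are
	// re-delivered on the next invocation.
}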

bzakhar commented 1 day ago

"In order to know on which partition the next message was produced, you will have to read one message from all partitions and then compare their timestamps. Finally, you have to mark only one of these messages as consumed." - that is not how it works. Broker decides where a returning consumer should pick up based on where they left off and order of messages in the topic. In other words, the logic you describe is implemented on broker side, consumers don't have to bother with that. If there's just one consumer in the consumer group, it will consume messages from a topic in order or arrival regardless if it's partitioned or not, and if there are multiple consumers in a group then order on topic level simply can't be guaranteed (but it's still guaranteed within each partition). Ordered consumption from a topic requires single partition or a key which constrains keyed messages to single partition where order is guaranteed, it's a well-known behavior. And the use case here is really simple: read messages from a topic in whatever order the broker returns them and stop after reading N. There is no assumption or expectation that this will be done concurrently in more than one invocation of the program, but if that's the case, the invoker is expected to understand the relationships between multiple consumers in a group and multiple partitions in a topic and how this affects order of consumption.