bsm / sarama-cluster

Cluster extensions for Sarama, the Go client library for Apache Kafka 0.9 [DEPRECATED]
MIT License

How to start a consumer group consuming from the latest committed offsets? #269

Closed: vdobler closed this issue 6 years ago

vdobler commented 6 years ago

We have a simple setup: One topic with 4 partitions and one consumer group consuming this topic. This works fine as long as at least one consumer is running.

If we stop the whole consumer group and new messages arrive before the consumer group is restarted, these messages are not consumed. This is presumably exactly what #263 and #106 are about.

If we set Config.Consumer.Offsets.Initial to sarama.OffsetNewest, consumption only starts once a new message arrives (as described in the README). For example, directly before the restart:

Partition 2: Start 0, End 580, Offset 550, Lag 30

Then one new message is produced and immediately consumed from partition 2, and the offsets jump to:

Partition 2: Start 0, End 581, Offset 581, Lag 0

So we "lose" messages 551-580.

If we set Config.Consumer.Offsets.Initial to sarama.OffsetOldest, consumption starts immediately (no new message needed), but from the start of the partition. Before the restart:

Partition 2: Start 0, End 580, Offset 550, Lag 30

After restarting the consumer group, consumption starts from 0 and we re-consume messages 0-550, which had already been consumed and their offsets committed.

How to consume exactly the uncommitted messages (and any new messages afterward) after starting a consumer group with all consumers being "brand new" (having never committed anything)?

Please tell me that we have just overlooked something obvious, and that we do not need to start from sarama.OffsetNewest and somehow check against the last committed offsets.

dim commented 6 years ago

@vdobler Sorry, but sarama-cluster has been deprecated in favour of https://github.com/Shopify/sarama/pull/1099 (which was merged today). Nevertheless, Config.Consumer.Offsets.Initial is only used when there is no stored offset. If you have consumed messages up to offset 550, you need to mark them so that this offset is persisted. After a restart, the stored offset will be used and consumption should resume from 551, not from 581. You can see this working correctly in the tests and the examples.
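To make the rule above concrete, here is a minimal stdlib-only sketch of how a restarted consumer picks its starting offset. It is a model, not sarama-cluster's actual code: the constants only mirror sarama's OffsetNewest (-1) and OffsetOldest (-2) so the sketch compiles without importing sarama, and `startOffset` is a hypothetical helper. It assumes the Kafka convention that the stored offset is the next offset to read, so "consumed and marked up to 550" is stored as 551.

```go
package main

import "fmt"

// Sentinels mirroring sarama.OffsetNewest and sarama.OffsetOldest.
// Redeclared here (an assumption for the sketch) to avoid the sarama import.
const (
	OffsetNewest int64 = -1
	OffsetOldest int64 = -2
)

// startOffset is a hypothetical model of the rule described in the comment:
// the Initial setting is consulted only when the group has no stored offset
// for the partition. oldest and newest are the partition's start and end.
func startOffset(stored int64, hasStored bool, initial, oldest, newest int64) int64 {
	if hasStored {
		// A stored offset always wins, whatever Initial says.
		return stored
	}
	if initial == OffsetOldest {
		return oldest
	}
	return newest
}

func main() {
	// Partition 2 from the issue: Start 0, End 580.
	fmt.Println(startOffset(551, true, OffsetNewest, 0, 580)) // 551: stored offset used
	fmt.Println(startOffset(551, true, OffsetOldest, 0, 580)) // 551: stored offset used
	fmt.Println(startOffset(0, false, OffsetNewest, 0, 580))  // 580: nothing stored, jump to end
	fmt.Println(startOffset(0, false, OffsetOldest, 0, 580))  // 0: nothing stored, start at beginning
}
```

In other words, both behaviours reported in the issue are what you would expect when no offset was ever stored for the group, which is why marking processed messages is the fix rather than choosing a different Initial value.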

dim commented 6 years ago

please see https://godoc.org/github.com/bsm/sarama-cluster#Consumer.MarkOffset
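A rough, stdlib-only model of what marking buys you on restart is sketched below. The `offsetStore` type and its methods are hypothetical stand-ins for the group's committed offsets, not the sarama-cluster API; a real consumer calls Consumer.MarkOffset on each processed message and the library persists the offsets to Kafka. The sketch assumes the Kafka convention that a marked message at offset N is stored as N+1 (the next offset to read) and that marking an older offset never rewinds the stored value.

```go
package main

import "fmt"

// offsetStore is a hypothetical in-memory stand-in for a consumer group's
// stored offsets, keyed by partition.
type offsetStore map[int32]int64

// markOffset records that the message at offset was processed. The stored
// value is the next offset to read, and it only ever moves forward.
func (s offsetStore) markOffset(partition int32, offset int64) {
	if next := offset + 1; next > s[partition] {
		s[partition] = next
	}
}

// resume returns where a restarted consumer would begin on the partition,
// using fallback (the Initial behaviour) only when nothing was marked.
func (s offsetStore) resume(partition int32, fallback int64) int64 {
	if off, ok := s[partition]; ok {
		return off
	}
	return fallback
}

func main() {
	store := offsetStore{}
	// Process and mark messages 0..550 on partition 2, then stop the group.
	for off := int64(0); off <= 550; off++ {
		store.markOffset(2, off)
	}
	// On restart, partition 2 resumes after the last marked message,
	// while partition 3 (never marked) falls back to the end of the log.
	fmt.Println(store.resume(2, 580)) // 551
	fmt.Println(store.resume(3, 580)) // 580
}
```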