twmb / franz-go

franz-go contains a feature complete, pure Go library for interacting with Kafka from 0.8.0 through 3.7+. Producing, consuming, transacting, administrating, etc.
BSD 3-Clause "New" or "Revised" License

Question regarding manual commit example #801

Closed: Iadgarov closed this issue 1 month ago

Iadgarov commented 1 month ago

Hello,

I've been going over the manual commit example provided in the repo.

What I don't fully understand is the purpose of giving each partition consumer's recs input a buffered channel of size 5, where each item in the channel is a batch of messages. I am referring to this line.

It seems that if we give each partition consumer a buffered channel, then in theory the main poll loop could send the same batch of messages to the consumer several times?
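The goroutine-per-partition pattern being asked about can be modelled with the standard library alone. This is a hypothetical simulation, not franz-go code and not the repository's example: `record` and `runPerPartition` are illustrative names, and each "poll" is a pre-built slice rather than a real PollFetches call. Each partition gets one worker goroutine fed by a buffered channel, so the poll loop only blocks once a partition's worker already has `buf` batches queued.

```go
package main

import (
	"fmt"
	"sync"
)

// record is a hypothetical stand-in for a fetched Kafka record.
type record struct {
	partition int32
	offset    int64
}

// runPerPartition simulates goroutine-per-partition consumption. Each element
// of polls is one "poll"; its records are grouped by partition and each group
// is sent as one batch to that partition's worker over a buffered channel.
// The send only blocks once the worker already has buf batches queued.
func runPerPartition(polls [][]record, buf int) map[int32][]int64 {
	var (
		mu        sync.Mutex
		wg        sync.WaitGroup
		processed = make(map[int32][]int64)
		workers   = make(map[int32]chan []record)
	)
	for _, poll := range polls {
		// Group this poll's records by partition, keeping their order.
		byPart := make(map[int32][]record)
		for _, r := range poll {
			byPart[r.partition] = append(byPart[r.partition], r)
		}
		for p, batch := range byPart {
			ch, ok := workers[p]
			if !ok {
				// First batch for this partition: start its worker.
				ch = make(chan []record, buf)
				workers[p] = ch
				wg.Add(1)
				go func(ch <-chan []record) {
					defer wg.Done()
					// Batches arrive in poll order, so offsets stay ordered.
					for b := range ch {
						for _, r := range b {
							mu.Lock()
							processed[r.partition] = append(processed[r.partition], r.offset)
							mu.Unlock()
						}
					}
				}(ch)
			}
			ch <- batch // blocks only if this worker's buffer is full
		}
	}
	// Shutdown: close every channel and wait for the workers to drain.
	for _, ch := range workers {
		close(ch)
	}
	wg.Wait()
	return processed
}

func main() {
	polls := [][]record{
		{{0, 0}, {1, 0}, {0, 1}},
		{{0, 2}, {1, 1}},
	}
	fmt.Println(runPerPartition(polls, 5))
}
```

Because every poll hands out different records, the buffer never holds the same batch twice. It only lets the poll loop queue several batches for a slow partition worker before blocking, and that queued window is where records can sit uncommitted when a rebalance happens.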

Let's say we go into the first iteration of the poll loop. We

Wouldn't this second batch be identical to the first, since nothing may have been committed between the two polls? Also, in that case, wouldn't we be allowing a rebalance to occur before processing is done?

tl;dr: shouldn't fetches.EachPartition be blocking, so that we neither allow a rebalance nor poll again until we have committed what we already polled? And if so, what is the benefit of goroutine-per-partition consumers over a set of goroutines that handle a batch spanning all partitions, with a rebalance and a new poll allowed only after they all finish (since if we have to wait for all of them anyway, preserving partition order shouldn't really matter)?
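The alternative proposed in the tl;dr (process a whole poll concurrently, wait for everything, then commit and poll again) can be sketched the same way. This is again a hypothetical stdlib-only model: `processPoll` and its `workers` parameter are illustrative names, and the returned map merely stands in for what a real commit would send, using the Kafka convention that a committed offset names the next record to consume. Records from one partition may be handled out of order, which is only acceptable if, as the question assumes, nothing depends on per-partition ordering within a poll.

```go
package main

import (
	"fmt"
	"sync"
)

// record is a hypothetical stand-in for a fetched Kafka record.
type record struct {
	partition int32
	offset    int64
}

// processPoll handles one poll's records with `workers` goroutines, ignoring
// partition order, and returns only after every record has been handled.
// It then returns the offsets a commit would use: for each partition, the
// highest processed offset + 1 (the next record to consume).
func processPoll(poll []record, workers int, handle func(record)) map[int32]int64 {
	jobs := make(chan record)
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for r := range jobs {
				handle(r)
			}
		}()
	}
	for _, r := range poll {
		jobs <- r
	}
	close(jobs)
	wg.Wait() // nothing is committed until the whole poll is handled

	commits := make(map[int32]int64)
	for _, r := range poll {
		if r.offset+1 > commits[r.partition] {
			commits[r.partition] = r.offset + 1
		}
	}
	return commits
}

func main() {
	var mu sync.Mutex
	handled := 0
	commits := processPoll([]record{{0, 0}, {1, 0}, {0, 1}}, 4, func(record) {
		mu.Lock()
		handled++
		mu.Unlock()
	})
	fmt.Println(handled, commits)
}
```

With this design the next poll for every partition waits on the slowest record in the batch, whereas buffered per-partition workers let the poll loop keep feeding other partitions until one slow worker's buffer fills.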

I feel like I'm missing something fundamental, would greatly appreciate your input.

twmb commented 1 month ago

> manual commit is not yet triggered, however we can already poll Kafka for more messages, once again sending them down the channel to each of the partition consumers

I think there's a misunderstanding here. The client never re-polls at the same spot; it always advances. If the client gives your application records through offset 12, then internally the next fetch request starts at offset 13, and your next poll will get the records after that.
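The point about the fetch position can be modelled with a tiny in-memory cursor. This is a hypothetical sketch, not franz-go's internals: `fetcher`, `poll`, and `next` are illustrative names. The position advances on every poll regardless of commits, so consecutive polls never return the same records.

```go
package main

import "fmt"

// fetcher is a hypothetical, in-memory model of a client's fetch position
// for one partition. It only illustrates that each poll resumes where the
// previous one stopped, whether or not anything has been committed.
type fetcher struct {
	log  []string // the partition's records; index == offset
	next int64    // offset the next fetch will start at
}

// poll returns up to max records starting at f.next and moves f.next past
// them. Commits play no part here: they matter for where a consumer starts
// after a rebalance or restart, not for this client's running position.
func (f *fetcher) poll(max int) (first int64, recs []string) {
	first = f.next
	end := f.next + int64(max)
	if end > int64(len(f.log)) {
		end = int64(len(f.log))
	}
	recs = f.log[f.next:end]
	f.next = end
	return first, recs
}

func main() {
	f := &fetcher{log: []string{"a", "b", "c", "d", "e"}}
	for i := 0; i < 3; i++ {
		first, recs := f.poll(2)
		fmt.Println(first, recs, "next fetch at", f.next)
	}
}
```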


Separately (and unrelated to this issue), you can see duplicates if you are using a group consumer and a rebalance happens under the hood: if a different client picks up the partitions you were working on before you were able to issue a commit, that other client will process records that your first client was already processing. One way to avoid this is BlockRebalanceOnPoll, but that has its own downside: you must allow the rebalance to continue (via AllowRebalance) before the group's rebalance timeout expires, or you will be kicked out of the group. It's impossible to avoid both downsides, but you at least get to choose which one you want to work with.
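The BlockRebalanceOnPoll trade-off can be illustrated with a small gate. This is a hypothetical stdlib model of the behaviour described above, not franz-go's implementation: `gate`, `poll`, `allowRebalance`, and `rebalance` are illustrative names, and the `timeout` argument is an assumed stand-in for the group's rebalance timeout. After a poll, a rebalance waits for the application to allow it; if processing takes longer than the timeout, the member is treated as kicked out.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// gate is a hypothetical model of BlockRebalanceOnPoll-style behaviour:
// after a poll, a rebalance cannot proceed until the application allows it.
// It is an illustration only, not franz-go's implementation.
type gate struct {
	mu      sync.Mutex
	blocked bool
	release chan struct{} // closed when the application allows rebalancing
}

// poll blocks rebalances, as a poll would with the option enabled.
func (g *gate) poll() {
	g.mu.Lock()
	defer g.mu.Unlock()
	if !g.blocked {
		g.blocked = true
		g.release = make(chan struct{})
	}
}

// allowRebalance lets a pending or future rebalance continue.
func (g *gate) allowRebalance() {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.blocked {
		g.blocked = false
		close(g.release)
	}
}

// rebalance waits for allowRebalance. If that does not happen within
// timeout (a stand-in for the group's rebalance timeout), the member is
// treated as kicked out of the group and rebalance reports false.
func (g *gate) rebalance(timeout time.Duration) bool {
	g.mu.Lock()
	if !g.blocked {
		g.mu.Unlock()
		return true
	}
	release := g.release
	g.mu.Unlock()
	select {
	case <-release:
		return true
	case <-time.After(timeout):
		return false
	}
}

func main() {
	g := &gate{}

	// Processing finishes quickly: rebalancing is allowed in time.
	g.poll()
	go func() {
		time.Sleep(10 * time.Millisecond)
		g.allowRebalance()
	}()
	fmt.Println("stayed in group:", g.rebalance(time.Second))

	// Processing is too slow: the timeout expires first.
	g.poll()
	fmt.Println("stayed in group:", g.rebalance(10*time.Millisecond))
	g.allowRebalance()
}
```

In franz-go itself the corresponding pieces are the kgo.BlockRebalanceOnPoll option and the client's AllowRebalance method; the kgo documentation describes their exact semantics.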