twmb / franz-go

franz-go contains a feature complete, pure Go library for interacting with Kafka from 0.8.0 through 3.6+. Producing, consuming, transacting, administrating, etc.
BSD 3-Clause "New" or "Revised" License

Use EOS sessions with BlockRebalanceOnPoll #754

Open kahartma opened 1 month ago

kahartma commented 1 month ago

Hi, with the current session implementation, it seems like sessions will always abort the next transaction after a rebalance, regardless of whether the rebalance actually happened after a poll or session.Begin(). A rebalance can happen completely outside the consume-produce loop, and the next session.End() will still fail. I suspect this is because of the revoked flag that is set on a revoke and only reset after a session.End() call: https://github.com/twmb/franz-go/blob/master/pkg/kgo/txn.go#L104
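To make the reported behavior concrete, here is a minimal sketch of a sticky revoked flag. It is a toy model, not franz-go's actual code: the `session` type, `onRevoked`, and this `End` signature are all hypothetical stand-ins for the behavior described above.

```go
package main

import "fmt"

// session is a toy stand-in for a transact session. It is NOT franz-go's
// GroupTransactSession, only a model of the flag behavior described above.
type session struct {
	revoked bool // set by the revoke callback, cleared only in End
}

// onRevoked models the group's revoke callback, which can fire at any time,
// including while no transaction is in progress.
func (s *session) onRevoked() { s.revoked = true }

// End reports whether the transaction committed. Any revoke seen since the
// previous End forces an abort, even if it happened outside a transaction.
func (s *session) End(tryCommit bool) (committed bool) {
	committed = tryCommit && !s.revoked
	s.revoked = false
	return committed
}

func main() {
	s := &session{}
	s.onRevoked() // rebalance while idle, before any records are polled

	fmt.Println("first End committed:", s.End(true))  // false: flag was still set
	fmt.Println("second End committed:", s.End(true)) // true: flag was cleared
}
```

In this model the first End after any revoke aborts, no matter when the revoke happened, which matches the symptom described.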

Is there a way to make EOS sessions work with BlockRebalanceOnPoll and not abort on the next session.End()? I would like to prevent aborted transactions, for example, during controlled deployment rollouts.

Thanks!

twmb commented 4 weeks ago

My default answer here is "no", but I have a proposal that may make the situation better, if you're open.

GroupTransactSession takes a really pessimistic approach to transactions to try to ensure as much as possible that no duplicates happen. There are still some pretty extreme edge cases that could cause GroupTransactSession to fail, but they'd be really quite extreme. You can read some details on the approach here: https://github.com/twmb/franz-go/blob/master/docs/transactions.md

The reason the client internally forces an abort whenever a revoke happens is that it is possible for you to take so long processing records that the rebalance actually completes by the time you're ready to commit. A new consumer may already own partitions that you were consuming, and that new consumer is processing records starting from the previous commit. If the client allowed you to commit, then it is possible you'd have records processed twice.
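The race described above can be replayed in a small single-partition simulation. This is only an illustrative sketch: `simulate` and its parameter are hypothetical names, and it ignores every broker-side protection, so it shows the worst case the client is guarding against rather than what Kafka necessarily does.

```go
package main

import "fmt"

// simulate replays the race on one partition. allowCommitAfterRevoke selects
// whether the old consumer may still commit its transaction after the
// partition has moved. It returns how many times each record offset ended up
// in committed output.
func simulate(allowCommitAfterRevoke bool) map[int]int {
	processed := map[int]int{}
	committedOffset := 0 // last committed group offset

	// Consumer A polls offsets 0..2 and is slow to process them.
	batchA := []int{0, 1, 2}

	// The rebalance completes. Consumer B now owns the partition, resumes
	// from the last commit (still 0), and commits its own output.
	for off := committedOffset; off < 3; off++ {
		processed[off]++
	}

	// Consumer A finally reaches the end of its transaction.
	if allowCommitAfterRevoke {
		for _, off := range batchA {
			processed[off]++
		}
	}
	return processed
}

func main() {
	fmt.Println("abort after revoke, offset 0 count:", simulate(false)[0])  // 1
	fmt.Println("commit after revoke, offset 0 count:", simulate(true)[0]) // 2
}
```

Forcing the abort keeps each record at exactly one committed result, at the cost of discarding consumer A's work.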

Kafka itself eventually fixed this problem with KIP-447. If a rebalance happens while there are outstanding transactions, Kafka actually blocks the new consumer from fetching commits until the prior consumer commits, aborts, or times out. This is the whole RequireStable thing. HOWEVER, this behavior is for the final consumer -- the EOS consumer/producer can't assume that downstream consumers are actually going to use this option. So, pessimistically, right now the client assumes the worst case.

What I can do here is introduce an option: AssumeConsumersRequireStable(). If used, the client does not automatically switch to aborting if a rebalance happens.
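A sketch of how such an opt-in might look, using a functional option on a toy session. Only the option name comes from the proposal above; the `config`, `Opt`, `session`, and `newSession` names and the exact abort logic are assumptions for illustration, not an implemented franz-go API.

```go
package main

import "fmt"

// config holds the toy session settings.
type config struct {
	assumeRequireStable bool
}

// Opt is a functional option for the toy session.
type Opt func(*config)

// AssumeConsumersRequireStable borrows the name proposed above. The behavior
// modeled here is a guess at the proposal, not library code.
func AssumeConsumersRequireStable() Opt {
	return func(c *config) { c.assumeRequireStable = true }
}

// session is a toy model of a transact session with a revoked flag.
type session struct {
	cfg     config
	revoked bool
}

func newSession(opts ...Opt) *session {
	s := &session{}
	for _, opt := range opts {
		opt(&s.cfg)
	}
	return s
}

// onRevoked models the group's revoke callback.
func (s *session) onRevoked() { s.revoked = true }

// End forces an abort after a revoke unless the caller opted in to assuming
// that every consumer in the group requires stable offsets.
func (s *session) End(tryCommit bool) bool {
	forceAbort := s.revoked && !s.cfg.assumeRequireStable
	s.revoked = false
	return tryCommit && !forceAbort
}

func main() {
	pessimistic := newSession()
	pessimistic.onRevoked()
	fmt.Println("default committed:", pessimistic.End(true)) // false

	optimistic := newSession(AssumeConsumersRequireStable())
	optimistic.onRevoked()
	fmt.Println("with option committed:", optimistic.End(true)) // true
}
```

The trade-off is that correctness then depends on every consumer in the group actually requiring stable offsets, which the client cannot verify.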

What do you think?

twmb commented 1 week ago

Ping -- wdyt of the idea proposed above?