twmb / franz-go

franz-go contains a feature complete, pure Go library for interacting with Kafka from 0.8.0 through 3.7+. Producing, consuming, transacting, administrating, etc.
BSD 3-Clause "New" or "Revised" License

Deadlock in Produce() / TryProduce() when kgo.MaxBufferedBytes() is configured #777

Closed. pracucci closed this issue 2 months ago

pracucci commented 2 months ago

In Mimir we found a deadlock in Produce() and TryProduce() when kgo.MaxBufferedBytes() is configured and the limit is hit.

The root cause is that there's no guarantee that all goroutines waiting on p.waitBuffer will be released: under some conditions, the number of messages sent to the p.waitBuffer channel is lower than the number of goroutines waiting on it.

Details

Produce() and TryProduce() wait for p.waitBuffer when the buffer limit is reached: https://github.com/twmb/franz-go/blob/a5f2b710830e32fc3f90374be47ec59849807342/pkg/kgo/producer.go#L439-L449

A message is published to p.waitBuffer each time a record completes, if the client detects that the buffer was over the bytes limit before that record completed: https://github.com/twmb/franz-go/blob/a5f2b710830e32fc3f90374be47ec59849807342/pkg/kgo/producer.go#L520-L534

In the case of the "max buffered records" limit (which does not suffer from this issue), each record has the same "weight": each record accounts for +1 in the p.bufferedRecords accumulator. However, in the case of the "max buffered bytes" limit, records don't have the same "weight", because each record has a different userSize(). This means that the value computed for wasOverMaxBytes depends on which record completes first, so a completion can fail to signal p.waitBuffer even though goroutines are still waiting.

Example

Let's assume we set kgo.MaxBufferedBytes() to 100 bytes and the following events happen in order:

  1. Produce() record A (100 bytes)
  2. Produce() record B (50 bytes), waiting for buffer to free
  3. Produce() record C (50 bytes), waiting for buffer to free
  4. Record A is produced, finishRecordPromise() gets called, detects the buffer was over the limit, so it publishes 1 message to waitBuffer
  5. Record B is unblocked and produced, finishRecordPromise() gets called, does not detect the buffer was over the limit (only 50 bytes), so no message is published: record C is never unblocked and waits indefinitely on waitBuffer
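
The steps above can be replayed with a small single-goroutine model. This is only a sketch of the accounting, not franz-go's code: stuckWaiters and its variables are hypothetical names, and it assumes that a record's bytes are added to the buffered total before the producer decides to wait (which is consistent with step 4, where record A's completion sees the buffer over the limit).

```go
package main

import "fmt"

// stuckWaiters models the wake-up accounting described above.
// It is a simplified, hypothetical model, not franz-go's implementation.
// Records are produced in order; a record whose bytes push the buffered
// total over limit has to wait. In-flight records then complete oldest
// first, and a completion wakes exactly one waiter only if the total was
// over limit just before that record finished.
// It returns how many records are still waiting once nothing is in flight.
func stuckWaiters(limit int64, sizes []int64) int {
	var buffered int64
	var inFlight, waiting []int64

	for _, size := range sizes {
		buffered += size // assumption: bytes are counted before waiting
		if buffered > limit {
			waiting = append(waiting, size)
		} else {
			inFlight = append(inFlight, size)
		}
	}

	for len(inFlight) > 0 {
		size := inFlight[0]
		inFlight = inFlight[1:]

		wasOver := buffered > limit
		buffered -= size

		if wasOver && len(waiting) > 0 {
			// One message on the channel releases one waiter.
			inFlight = append(inFlight, waiting[0])
			waiting = waiting[1:]
		}
	}
	return len(waiting)
}

func main() {
	// Records A (100), B (50), C (50) with a 100 byte limit: C stays stuck.
	fmt.Println(stuckWaiters(100, []int64{100, 50, 50})) // 1
	// Equal weights: every completion that matters wakes the next waiter.
	fmt.Println(stuckWaiters(100, []int64{100, 100, 100})) // 0
}
```

In this model, the equal-size case never strands a waiter, while the mixed-size case leaves record C waiting with nothing left in flight to wake it.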

Reproduction test

I've written a test reproducing the deadlock: https://github.com/twmb/franz-go/compare/pracucci:reproduce-produce-deadlock

twmb commented 2 months ago

Good find. Offhand, I think the fix is to convert this to a sync.Cond and have every waiter be notified and inspect the conditions, but I'll have to stare a bit at how to do this.
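
For illustration only, here is a minimal sketch of the sync.Cond idea: every waiter is woken on each completion and re-checks the limit itself, so the outcome no longer depends on counting wake-ups. This is not the fix that landed in franz-go. byteLimiter, acquire, release, and runScenario are hypothetical names, and the sketch assumes that no single record is larger than the limit and that a record's bytes are reserved only once they fit.

```go
package main

import (
	"fmt"
	"sync"
)

// byteLimiter is a hypothetical helper, not part of franz-go.
type byteLimiter struct {
	mu       sync.Mutex
	cond     *sync.Cond
	limit    int64
	buffered int64
}

func newByteLimiter(limit int64) *byteLimiter {
	l := &byteLimiter{limit: limit}
	l.cond = sync.NewCond(&l.mu)
	return l
}

// acquire blocks until size bytes fit under the limit, then reserves them.
// Assumption: size <= limit, otherwise this would block forever.
func (l *byteLimiter) acquire(size int64) {
	l.mu.Lock()
	defer l.mu.Unlock()
	for l.buffered+size > l.limit {
		l.cond.Wait() // releases mu while waiting, re-locks on wake-up
	}
	l.buffered += size
}

// release returns size bytes and wakes every waiter so that each one
// re-checks the condition under the lock.
func (l *byteLimiter) release(size int64) {
	l.mu.Lock()
	l.buffered -= size
	l.mu.Unlock()
	l.cond.Broadcast()
}

// runScenario replays records A (100), B (50), C (50) against a 100 byte
// limit and returns the bytes still buffered once all records completed.
func runScenario() int64 {
	l := newByteLimiter(100)
	l.acquire(100) // record A fills the buffer

	var wg sync.WaitGroup
	for _, size := range []int64{50, 50} { // records B and C
		wg.Add(1)
		go func(size int64) {
			defer wg.Done()
			l.acquire(size)
			l.release(size) // the record "completes" right away
		}(size)
	}

	l.release(100) // record A completes
	wg.Wait()      // returns only if neither B nor C is stuck

	l.mu.Lock()
	defer l.mu.Unlock()
	return l.buffered
}

func main() {
	fmt.Println("buffered bytes after all records completed:", runScenario())
}
```

Because the condition is always checked while holding the mutex, a waiter cannot miss a release, at the cost of waking goroutines that may have to go back to waiting.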

twmb commented 2 months ago

If you're able to see why #787 fails in CI, that may help accelerate a fix here. So far, the error is not reproducing locally for me.