confluentinc / confluent-kafka-go

Confluent's Apache Kafka Golang client
Apache License 2.0

Producer messages stuck flushing even though there weren't any writes. #1194

Open ylhan opened 3 months ago

ylhan commented 3 months ago

Description

Scenario:

  1. I have a producer that has been running for 4 days
  2. No writes at all
  3. Calling Flush on this producer (timeout = 10 seconds) reports 2 unflushed messages

I'm at my wits' end here. I didn't write anything using this producer, yet it reports two unflushed messages.

I dug into the library code a bit; could this be an issue with the Len() method? Why does it add up the lengths of three different queues? And why are there unflushed messages when I never wrote to the producer at all?

How to reproduce

Instantiate a Kafka producer with the configuration map below, do not write anything, and then flush the producer.

Checklist

Please provide the following information:

extract-2024-05-20T21_40_38.964Z.csv

milindl commented 3 months ago

Hi @ylhan, is there a goroutine or something else reading from the producer.Events() channel? That channel needs to be read continuously, because administrative/control events may arrive on it, and until it is drained, Flush will keep reporting outstanding items.

It's necessary to read from this channel while running a producer, even if you choose to set per-message delivery channels.

ylhan commented 2 months ago

Yes, we have a goroutine running in the background that consumes result messages asynchronously.

guotie commented 1 month ago

Same problem.

When I produce a message and receive a kafka.ErrQueueFull error, I call Flush(), but it gets stuck.

guotie commented 1 month ago

If I produce a message as follows:

```go
err = p.Produce(&kafka.Message{
	TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: 0},
	Value:          buf,
}, delivery)
if err == nil {
	// Wait for this message's delivery report on the per-message channel.
	<-delivery
}
```

Then, when I call p.Flush(10000), it gets stuck.

If I set delivery to nil, Flush works fine.

Why?