Therefore, we have to constantly consume events by the Events()
channel until the producer is Close()ed, therefore its events channel
is closed. We introduced a new goroutine, consumeDeliveries that does
just that, consumes events and logs any failures.
Some refactorings were also performed:
run() is no longer a separate method/goroutine but is embedded in Close()
instead. This simplifies the Producer's internals.
Use a sync.WaitGroup instead of close/done channels
The librdkafka producer events channel should be consumed prior to calling
Flush()
, otherwise we'll hit the flush timeout and outstanding events will be reported byFlush()
(for more info see https://github.com/confluentinc/confluent-kafka-go/blob/v0.11.0/kafka/producer.go#L195-L215)Therefore, we have to constantly consume events by the
Events()
channel until the producer isClose()
ed, therefore its events channel is closed. We introduced a new goroutine,consumeDeliveries
that does just that, consumes events and logs any failures.Some refactorings were also performed:
run()
is no longer a separate method/goroutine but is embedded inClose()
instead. This simplifies the Producer's internals.Fixes #33