With our own flushing implementation we were flushing until rd_kakfa_outq_len
reached 0. However rd_kakfa_outq_len also takes other events such
as statistics into account which led to a race where we were flushing
for a very long time because more and more other events were produced to
the queue while we were flushing.
Modifications:
KafkaClient:
remove var outgoingQueueSize
add new method flush(timeoutMilliseconds:) that executes the
blocking rd_kafka_flush call on a DispatchQueue but vends this
as an async func
KafkaProducer:
rename KafkaProducer.StateMachine.State.flushing to
KafkaProducer.StateMachine.State.finishing
invoke KafkaClient.flush before terminating poll loop
Motivation:
With our own flushing implementation we were flushing until
rd_kakfa_outq_len
reached0
. Howeverrd_kakfa_outq_len
also takes other events such as statistics into account which led to a race where we were flushing for a very long time because more and more other events were produced to the queue while we were flushing.Modifications:
KafkaClient
:var outgoingQueueSize
flush(timeoutMilliseconds:)
that executes the blockingrd_kafka_flush
call on aDispatchQueue
but vends this as anasync func
KafkaProducer
:KafkaProducer.StateMachine.State.flushing
toKafkaProducer.StateMachine.State.finishing
KafkaClient.flush
before terminating poll loop