Open iciclespider opened 6 years ago
Thanks @iciclespider for the detailed report.
A race condition can also trigger this for asynchronous producers. Producer._update
is called from an OwnedBroker
worker thread in response to a SocketDisconnectError
. The calling application can produce()
another message from its own thread while the update is taking place. If this happens before the new OwnedBroker
is created, then that message will be enqueued on the old OwnedBroker
, which will then be destroyed before the message is sent. Even if flush()
did not return an empty list, the enqueue could happen after the flush, but before the OwnedBroker
is replaced with the new instance, dropping the message.
To reliably reproduce this behaviour, configure a kafka broker with an artificially low idle client disconnect time (say connections.max.idle.ms=2000
), then asynchronously produce batches of messages, sleeping long enough between each batch for the broker to disconnect the client. I find ~10% of batches will result in a dropped message, though that's clearly highly dependent on the environment.
This effects any real world system that sporadically sends batches of messages, since the default idle timeout is 10 minutes.
Thanks @andreweland for clarifying. This is one of the highest-priority issues at the moment. If anyone reading this would like to take a stab at fixing it, this is a great opportunity for contribution.
@emmett9001 do you remember why the early return was necessary (ie this commit)? The fix should be a case of removing that, and then locking the OwnedBroker
replacement, but it'd be good to understand why that was added.
Looks like https://github.com/Parsely/pykafka/pull/595 is the original PR containing that commit. The description has a good outline of my thinking at that time.
Already queued messages are lost when a SocketDisconnectError is detected here.
This is occurring because the
Producer._update
method first closes all OwnedBrokers here, which results in the OwnedBroker settingself.running
to False here. Then,Producer._update
callsProducer._setup_owned_brokers
to reestablish the broker connection.Producer._setup_owned_brokers
(which also does it's ownOwnedBroker.stop
call) callsOwnedBrokers.flush
to collect all currently queued messages here.The attempt to collect all currently queued messages will never return any messages, because
OwnedBroker.flush
checks to see ifself.running
is False and if so, returns an empty list here.self.running
will be False, because that OwnedBroker was stopped.I discovered this in a long running test that in the course of queuing 3,200,000 messages, a couple of dozen messages would go missing.