dpkp / kafka-python

Python client for Apache Kafka
http://kafka-python.readthedocs.io/
Apache License 2.0
5.61k stars 1.41k forks source link

infinite loop in ensure_coordinator_ready when coordinator is unknown #2373

Open pe55a opened 1 year ago

pe55a commented 1 year ago

we're facing an issue with kafka poll functionality and in particular we suspect that the culprit is ensure_coordinator_ready function called by the _coordinator.poll()

we're using robot framework so unfortunately we're not able to have a good amount of logs, but got these messages printed in an infinite loop:

10:36:19.658 INFO <BrokerConnection node_id= host= [IPv4 ('', )]>: connecting to [('', ) IPv4] 10:36:19.765 INFO <BrokerConnection node_id= host= [IPv4 ('', )]>: Connection complete. 10:36:19.886 ERROR <BrokerConnection node_id= host= [IPv4 ('', )]>: socket disconnected 10:36:19.900 INFO <BrokerConnection node_id= host= [IPv4 ('', )]>: Closing connection. KafkaConnectionError: socket disconnected 10:36:19.905 ERROR Error sending GroupCoordinatorRequest_v0 to node [KafkaConnectionError: socket disconnected]

After checking the kafka python code we noticed that the functions here https://github.com/dpkp/kafka-python/blob/master/kafka/coordinator/base.py#L241C9-L241C33 doesn't have an exit point from the while loop and neither have an option to pass a timeout parameter.

Can this be improved/fixed?

mro-rhansen2 commented 11 months ago

I've traced an intermittent hang in our processes to this exact same section of code. We have processes where the only thing keeping them alive is that we're consuming messages using the KafkaConsumer as an iterator. I came upon the code in question after confirming that we have properly configured the consumer to timeout if it has not received any data after a period of time.

The order of operations for us is as follows:

1) We get the last message from the consumer. 2) We handle the message. 3) Some network error occurs. 4) We try to manually commit the last message. 5) Main thread hangs. 6) After a little more than 9 minutes, a worker thread within Kafka indicates that idle broker connections are being destroyed. 7) HeartbeatThread indicates that the heartbeat poll has expired (in accordance with our config settings) and then proceeds to try and exit the consumer group.

Nothing happens after that until we restart the processes. It is worth mentioning that we have around 20 processes that are using the exact same boilerplate code for connecting to Kafka, but only a small handful of them will fail to recover and it is almost always a different subset of the overall population that exhibit this odd behavior when a network failure occurs.

ethiebautgeorge-nasuni commented 8 months ago

We hit the same issue and have implemented a fix. I have a branch I'd like to push with a fix for this. Can an admin give me access?

ethiebautgeorge-nasuni commented 8 months ago

I believe this should also fix https://github.com/dpkp/kafka-python/issues/1322