confluentinc / confluent-kafka-python

Confluent's Kafka Python Client
http://docs.confluent.io/current/clients/confluent-kafka-python

`Consumer.poll` goes into infinite loop #1767

Open r-priyam opened 3 months ago

r-priyam commented 3 months ago

Description

`Consumer.poll` goes into an infinite loop when Kafka is running on Kubernetes. When the cluster is rescheduled onto a new node, the client gets stuck on the error below:

Screenshot 2024-06-20 at 4 02 28 PM

As the screenshot shows, the service stayed stuck and only recovered after about 12 hours.

How to reproduce

import socket
from confluent_kafka import Consumer, Message

consumer = Consumer(
    {
        "bootstrap.servers": "",
        "client.id": socket.gethostname(),
        "auto.offset.reset": "earliest",
        "enable.auto.commit": False,
        "group.id": "",
    }
)
consumer.subscribe(topics=[...])
msg: Message = consumer.poll(timeout=10)  # Goes into infinite loop here and gets stuck

Run the code above, then restart the Kafka cluster or move it to a different node in k8s.


pranavrth commented 3 months ago

It looks like the host is unreachable or not up.

r-priyam commented 3 months ago

@pranavrth the host comes back up within 2 minutes at most. k8s just schedules it on another node, which changes the IP. The service container works fine once it's restarted.

r-priyam commented 3 months ago

@pranavrth, any luck, please? It appears that the package handles this issue at a very low level and doesn't raise an exception?

r-priyam commented 3 months ago

@pranavrth, after digging further into the source code, we found that we can set "error_cb" in the config to get a callback whenever an error occurs. It would be good if the client raised this directly instead of going through a callback.
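
For anyone landing here, a minimal sketch of the "error_cb" approach might look like the following. The names `on_error`, `client_failed`, `build_config` and `consume` are hypothetical, and treating `_ALL_BROKERS_DOWN` as a reason to stop the loop is an assumption about what this service wants, not something the client does by itself. The broker address, group id and topics are left as parameters because they are not given above.

```python
import socket
import threading

# Set by the callback so the polling loop can decide to stop (hypothetical name).
client_failed = threading.Event()


def on_error(err):
    """error_cb: receives a KafkaError for client-level errors.

    Many of these are informational and the client keeps retrying on its own.
    Assumption: this service wants to give up when the error is fatal or when
    the client reports that all brokers are down.
    """
    print(f"Kafka client error: {err}")
    if err.fatal() or err.name() == "_ALL_BROKERS_DOWN":
        client_failed.set()


def build_config(bootstrap_servers, group_id):
    """Return the consumer config from the report plus the error callback."""
    return {
        "bootstrap.servers": bootstrap_servers,
        "client.id": socket.gethostname(),
        "auto.offset.reset": "earliest",
        "enable.auto.commit": False,
        "group.id": group_id,
        "error_cb": on_error,
    }


def consume(bootstrap_servers, group_id, topics):
    """Poll until the callback flags a failure, then close the consumer."""
    # Imported here so the helpers above can be used without the client installed.
    from confluent_kafka import Consumer

    consumer = Consumer(build_config(bootstrap_servers, group_id))
    consumer.subscribe(topics)
    try:
        while not client_failed.is_set():
            msg = consumer.poll(timeout=10.0)
            if msg is None:
                continue  # nothing arrived within the timeout
            if msg.error():
                print(f"Consumer error: {msg.error()}")
                continue
            print(f"Received message at offset {msg.offset()}")
    finally:
        consumer.close()
```

Once `consume(...)` returns, the caller can exit the process so that k8s restarts the container, which matches the observation that a restarted container reconnects fine.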

pranavrth commented 3 months ago

Can you enable debug logging by using 'debug': 'all' in the config and send us the logs to investigate?
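
As a rough sketch, the setting can be layered on top of the existing config like this. The helper name `with_debug` is hypothetical; only the 'debug' key and its 'all' value come from the request above.

```python
def with_debug(config, contexts="all"):
    """Return a copy of the client config with debug logging enabled.

    `contexts` is passed through as the 'debug' property; 'all' is the value
    requested in this thread.
    """
    debug_config = dict(config)  # copy so the original config is left untouched
    debug_config["debug"] = contexts
    return debug_config


# Same shape as the config in the report; the empty values are left as given there.
base_config = {
    "bootstrap.servers": "",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,
    "group.id": "",
}
debug_config = with_debug(base_config)
```

Passing `debug_config` to `Consumer(...)` in place of the original dict should produce the logs. By default the client writes them to stderr, so capture the container's stderr while reproducing the node change.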