aio-libs / aiokafka

asyncio client for kafka
http://aiokafka.readthedocs.io/
Apache License 2.0
1.08k stars 224 forks source link

[QUESTION] Unable connect to node with id: X: [Errno 111]: Connection refused #992

Open OlehDziuba opened 3 months ago

OlehDziuba commented 3 months ago

Hi, I have group of consumers that on startup connect to kafka and infinite consume to messages. And I've encountered a problem that when the address of the kafka changes, the consumers don't get an error, just an infinite error logging. Since the Kafka address is taken from environment variables and I have an automatic reboot if an error occurs, it would be great if the consumer would raise an error

 consumer = AIOKafkaConsumer(
        *kafka_consumer_settings.topics
        bootstrap_servers=kafka_consumer_settings.bootstrap_servers,
        group_id=kafka_consumer_settings.group_id
    )

 try:
     async for msg in consumer:
         # Some message processing 
 finally:
     await consumer.stop()  # <--- it is not executed  in case of 'Unable connect to node with id: X: [Errno 111]: Connection refused'

I found it in source code: https://github.com/aio-libs/aiokafka/blob/256ce17c2d235f417b5302a6046c27879ce610f7/aiokafka/client.py#L475-L481

Do I understand correctly that the error is not triggered only when the consumer is in a group?