dpkp / kafka-python

Python client for Apache Kafka
http://kafka-python.readthedocs.io/
Apache License 2.0
5.53k stars 1.39k forks

Help: KafkaConsumer iterator ignores consumer_timeout_ms if entire cluster is down #1322

Open KunTjz opened 6 years ago

KunTjz commented 6 years ago

Hi! Forgive my poor English, but I need your help. @dpkp While stress-testing Kafka, I found that the consumer can block forever when all brokers are down. Once every broker is down, the consumer falls into a loop trying to fetch metadata from the brokers, and this loop never breaks until one or more brokers restart.
Is there a way for the consumer to break out of this loop? I do not want to block here; I want to do other things when the brokers are down.

My code looks like this:

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers=['xxxxx', 'xxxxx'],
                         group_id="test1",
                         consumer_timeout_ms=2000)
partition = TopicPartition("topic1", 0)
consumer.assign([partition])
consumer.seek_to_end()
for message in consumer:  # it will block here when all brokers are down
    print message

After the consumer blocked, this is what I got on KeyboardInterrupt: [screenshot]

Other info: kafka-python 1.3.2, Python 2.6.3, kafka_2.11-0.10.2.1

KunTjz commented 6 years ago

I have also found that the KafkaConsumer iterator ignores consumer_timeout_ms if the consumer is assigned to a non-existent topic. It seems to be the same issue.
Is this a bug, or is my configuration wrong?
@dpkp thanks a lot

stigok commented 6 years ago

I experience the same thing, and the loop drives the CPU to 100%.

dpkp commented 6 years ago

@RechardTheGrey I can't reproduce this with the latest release. Have you tried upgrading?

JamesMackerel commented 6 years ago

I am experiencing the same problem; here are the steps to reproduce:

First, save this piece of code to test_error.py:

from kafka import KafkaConsumer

config = {
    'bootstrap_servers': 'localhost:9092',
    'group_id': 'test',
    'consumer_timeout_ms': 1000
}

consumer = KafkaConsumer('test', **config)

while True:
    for message in consumer:
        print(message)

    print('no message')

Second, start a Kafka server, create a topic named test, and run test_error.py. You should see "no message" printed to the terminal every second.

Third, kill the Kafka server and turn back to your terminal: the "no message" output stops appearing, even though consumer_timeout_ms is set.


So I followed the kafka-python code and finally stepped in here. The only conditions that break the infinite loop are the future variable being set to None or future.is_done becoming True, but nothing in the loop ever modifies the future variable.

Maybe the call stack would help you:

poll (/home/jm/tmp/test/kafka/client_async.py:544)
ensure_coordinator_ready (/home/jm/tmp/test/kafka/coordinator/base.py:260)
poll (/home/jm/tmp/test/kafka/coordinator/consumer.py:262)
_message_generator (/home/jm/tmp/test/kafka/consumer/group.py:1053)
__next__ (/home/jm/tmp/test/kafka/consumer/group.py:1119)
<module> (/home/jm/tmp/test/test_error.py:23)

And thank you for this great open-source project; you are doing a great job.

JamesMackerel commented 6 years ago

After further testing, I found a surprise: it blocks until the cluster comes back. But I would still like to know whether there is any way for the consumer to detect that the cluster is down.