confluentinc / confluent-kafka-python

Confluent's Kafka Python Client
http://docs.confluent.io/current/clients/confluent-kafka-python

Title: confluent_kafka.Consumer.consume Method Takes Excessive Time to Respond #1753

Closed: abdelghanimeliani closed this issue 1 month ago

abdelghanimeliani commented 1 month ago

Description

I am writing a Flask API that retrieves data from Apache Kafka topics. On each request, the consumer subscribes to the topic named in the API path and calls confluent_kafka.Consumer.consume to fetch the requested number of events. Here is the consumer configuration I am using:

import socket

consumerConfig = {
    'bootstrap.servers': "localhost:9092,0.0.0.0:9092",
    'client.id': socket.gethostname(),
    'group.id': "group",
    'auto.offset.reset': "latest",  # used only when the group has no committed offset for a partition
    'enable.auto.commit': False,  # offsets are committed only if commit() is called explicitly
    'fetch.max.bytes': 52428800,  # 50 MiB
    'fetch.min.bytes': 1,
    'max.poll.interval.ms': 300000,
    'session.timeout.ms': 300000,
}

The code to consume events from the given topic is as follows:

def consumeEvent(topic, consumer, messages_number=1, time_out=60):
    # subscribe() expects a list of topic names; assignment_callback is defined elsewhere
    consumer.subscribe([topic], on_assign=assignment_callback)
    try:
        # Blocks until messages_number messages arrive or time_out seconds elapse
        messages = consumer.consume(num_messages=messages_number, timeout=time_out)
        if not messages:
            return {
                "status": "fail",
                "message": "topic empty or timeout exceeded"
            }
        elif len(messages) < messages_number:
            print(messages)
            return {
                "status": "fail",
                "message": "topic does not contain this number of messages or timeout exceeded"
            }
        else:
            return {
                "status": "success",
                "message": "request completed successfully",
                "data": messages_parser(messages)  # messages_parser is defined elsewhere
            }
    except KeyboardInterrupt:
        print('Canceled by user.')

The assignment_callback and messages_parser functions are other helpers I wrote. The problem is that consumeEvent takes a long time (1-2 minutes) to return a response. Is this normal behavior, or is something wrong with my configuration or usage?

Additional information: I am running Kafka in a Docker container as an experimental environment.
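One detail the length checks in consumeEvent do not account for: the list returned by consume() can include Message objects whose error() is set (for example, partition-EOF events when enable.partition.eof is enabled), and these count toward len(messages). A minimal sketch of separating them before comparing counts; the helper name split_messages is hypothetical, and it only assumes each element exposes error() the way confluent_kafka.Message does.

```python
from typing import Any, List, Tuple


def split_messages(messages: List[Any]) -> Tuple[List[Any], List[Any]]:
    """Separate data records from error events in a consume() result.

    Assumes each element has an error() method that returns None for a
    normal record, as confluent_kafka.Message does.
    """
    data: List[Any] = []
    errors: List[Any] = []
    for msg in messages:
        if msg.error() is None:
            data.append(msg)    # a real record with key/value
        else:
            errors.append(msg)  # an event such as partition EOF or a fetch error
    return data, errors
```

In consumeEvent this would look like `data, errors = split_messages(consumer.consume(...))`, with the success and failure branches comparing `len(data)` against messages_number.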

pranavrth commented 1 month ago

A delay of 1-2 minutes is not normal, even if you subscribe to a new topic every time. Can you try with the same topic, subscribing only once?
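The suggestion to subscribe only once can be sketched as a small registry that creates one consumer per topic the first time a route asks for it and reuses that consumer afterwards, so subscribe() and the resulting group join happen once per topic rather than on every request. This is a sketch under assumptions: the class name ConsumerRegistry is hypothetical, factory would be something like `lambda: Consumer(consumerConfig)`, and the lock only protects the registry dictionary, not concurrent consume() calls on the same consumer from several Flask threads.

```python
import threading
from typing import Any, Callable, Dict


class ConsumerRegistry:
    """Hypothetical helper: one consumer per topic, subscribed exactly once."""

    def __init__(self, factory: Callable[[], Any]) -> None:
        # factory returns an object with subscribe(list_of_topics) and close()
        self._factory = factory
        self._consumers: Dict[str, Any] = {}
        self._lock = threading.Lock()  # guards _consumers only

    def get(self, topic: str) -> Any:
        with self._lock:
            consumer = self._consumers.get(topic)
            if consumer is None:
                consumer = self._factory()
                consumer.subscribe([topic])  # called once per topic; later requests reuse it
                self._consumers[topic] = consumer
            return consumer

    def close_all(self) -> None:
        with self._lock:
            for consumer in self._consumers.values():
                consumer.close()  # leave the consumer group cleanly
            self._consumers.clear()
```

A route would then call `registry.get(topic).consume(...)` instead of subscribing inside consumeEvent, and close_all() would run at application shutdown.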

abdelghanimeliani commented 1 month ago

Hey @pranavrth, the issue was that with auto.offset.reset set to latest, the consumer cannot fetch events produced before it subscribed, so consume() waits until the producer writes a new event. Setting 'auto.offset.reset' to earliest lets the consumer read the older events, but it then returns the oldest events in the topic, which does not suit my use case. I will close this issue for now, since it is not a confluent-kafka-python problem, and will continue addressing it in this discussion: https://github.com/confluentinc/confluent-kafka-python/discussions/1752.
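For the use case described in the last comment (the most recent N events, without waiting for new ones and without starting from the oldest), one approach is to bypass auto.offset.reset: read each partition's watermarks with get_watermark_offsets(), compute start offsets of high minus N, and assign() those offsets explicitly. A minimal sketch, assuming contiguous offsets (no log compaction or transaction markers, which would make fewer records available than the offset arithmetic suggests) and a config like consumerConfig above; the function names tail_start_offsets and consume_last_n are hypothetical.

```python
from typing import Dict, Tuple


def tail_start_offsets(watermarks: Dict[int, Tuple[int, int]], n: int) -> Dict[int, int]:
    """Map partition id -> offset from which at most n records remain.

    watermarks maps partition id -> (low, high); high is the offset the next
    produced record will receive. Assumes contiguous offsets.
    """
    return {p: max(low, high - n) for p, (low, high) in watermarks.items()}


def consume_last_n(config: dict, topic: str, n: int, timeout: float = 10.0) -> list:
    """Return up to n of the newest records in topic without waiting for new ones."""
    # Imported here so tail_start_offsets() works where the client is not installed.
    from confluent_kafka import Consumer, TopicPartition

    consumer = Consumer(config)
    try:
        metadata = consumer.list_topics(topic, timeout=timeout)
        partition_ids = metadata.topics[topic].partitions.keys()
        watermarks = {
            p: consumer.get_watermark_offsets(TopicPartition(topic, p), timeout=timeout)
            for p in partition_ids
        }
        starts = tail_start_offsets(watermarks, n)
        available = sum(high - starts[p] for p, (_, high) in watermarks.items())
        if available <= 0:
            return []
        # Explicit offsets: auto.offset.reset is not consulted and no group rebalance occurs.
        consumer.assign([TopicPartition(topic, p, off) for p, off in starts.items()])
        batch = consumer.consume(num_messages=available, timeout=timeout)
        records = [m for m in batch if m.error() is None]
        # Each partition contributed up to n records; keep the n newest by timestamp.
        records.sort(key=lambda m: m.timestamp()[1])
        return records[-n:]
    finally:
        consumer.close()
```

Usage might look like `consume_last_n(consumerConfig, "my-topic", 10)`, where "my-topic" is a placeholder. A variant that keeps subscribe() could instead set the same offsets on the partitions passed to the on_assign callback and call consumer.assign() with them inside that callback.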