confluentinc / confluent-kafka-python

Confluent's Kafka Python Client
http://docs.confluent.io/current/clients/confluent-kafka-python

A consumer joining the consumer group takes 45s to get the first message #1770

Open persona94 opened 6 days ago

persona94 commented 6 days ago

Description

The consumer subscribes to a topic as follows:

from confluent_kafka import Consumer


def setup_consumer(topic_list):
    try:
        consumer = Consumer(
            {
                # <options for broker>
                'auto.offset.reset': 'earliest',
                'fetch.min.bytes': '200000',
            }
        )

        consumer.subscribe(topic_list)
    except Exception as e:
        app.logger.error(
            f'Exception during creation of active calls consumer - {e}'
        )
        return

    start_consumer(consumer)

Inside start_consumer(), the main loop looks like this:

import gevent


def start_consumer(consumer):
    while True:
        message = consumer.poll(0.1)
        if message is None:
            print('Waiting for message')
            gevent.sleep(0.1)
            continue
        elif message.error():
            print(f'Error in loop {message.error()}')
            break
        else:
            print('Message received')
            ...  # process the message

I see that the consumer takes about 45s to get from "Waiting for message" to "Message received".
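To put a number on this delay independently of the print output, the loop can record a start time and report the elapsed time when the first message (or error event) comes back from poll(). This is a minimal sketch: the helper name time_to_first_message and its max_wait and clock parameters are hypothetical, and it only assumes an object with a confluent-kafka-style poll(timeout) that returns None when nothing is ready. A fake clock and consumer are included so it runs without a broker.

```python
import time
from typing import Any, Callable, Optional, Tuple


def time_to_first_message(
    consumer: Any,
    max_wait: float = 120.0,
    poll_timeout: float = 0.1,
    clock: Callable[[], float] = time.monotonic,
) -> Tuple[Optional[Any], float]:
    """Poll until the first message (or error event) arrives, or max_wait passes.

    Returns (message_or_None, elapsed_seconds). `consumer` only needs a
    confluent-kafka-style poll(timeout) that returns None when nothing is ready.
    """
    start = clock()
    while clock() - start < max_wait:
        msg = consumer.poll(poll_timeout)
        if msg is not None:
            # The caller decides what to do with msg.error(), as in the original loop.
            return msg, clock() - start
    return None, clock() - start


# --- Broker-free demo: a fake clock and consumer (both hypothetical) ---
class FakeClock:
    def __init__(self) -> None:
        self.now = 0.0

    def __call__(self) -> float:
        return self.now


class FakeConsumer:
    """Returns None for the first `empty_polls` polls, then a placeholder message."""

    def __init__(self, clock: FakeClock, empty_polls: int) -> None:
        self.clock = clock
        self.empty_polls = empty_polls

    def poll(self, timeout: float) -> Optional[str]:
        self.clock.now += timeout  # simulate poll() blocking for the full timeout
        if self.empty_polls > 0:
            self.empty_polls -= 1
            return None
        return "message"


if __name__ == "__main__":
    fake_clock = FakeClock()
    msg, elapsed = time_to_first_message(FakeConsumer(fake_clock, 450), clock=fake_clock)
    print(f"first message after {elapsed:.1f}s")
```

With a real Consumer, the helper would be called right after consumer.subscribe(topic_list), before entering the regular loop.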

I turned up the debug logs in the Kafka client and saw this:

"APIVERSION [my-kafka-consumer-ac#consumer-2] [thrd:GroupCoordinator]: GroupCoordinator/0: Enabling feature SaslAuthReq"}                                                                                                                                                                              
"FEATURE [my-kafka-consumer-ac#consumer-2] [thrd:GroupCoordinator]: GroupCoordinator/0: Updated enabled protocol features to MsgVer1,ApiVersion,BrokerBalancedConsumer,ThrottleTime,Sasl,SaslHandshake,BrokerGroupCoordinator,LZ4,OffsetTime,MsgVer2,IdempotentProducer,ZSTD,SaslAuthReq"}             
"STATE [my-kafka-consumer-ac#consumer-2] [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state APIVERSION_QUERY -> UP"}                                                                                                                                                                      
"BROADCAST [my-kafka-consumer-ac#consumer-2] [thrd:GroupCoordinator]: Broadcasting state change"}                                                                                                                                                                                                      
"METADATA [my-kafka-consumer-ac#consumer-2] [thrd:GroupCoordinator]: Skipping metadata refresh of 1 topic(s): connected: already being requested"}                                                                                                                                                     
"CGRPSTATE [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\" changed state wait-broker-transport -> up (join-state init)"}                                                                                                                                             
"BROADCAST [my-kafka-consumer-ac#consumer-2] [thrd:main]: Broadcasting state change"}                                                                                                                                                                                                                  
"JOIN [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\": join with 0 subscribed topic(s)"}                                                                                                                                                                             
"METADATA [my-kafka-consumer-ac#consumer-2] [thrd:main]: Hinted cache of 1/1 topic(s) being queried"}                                                                                                                                                                                                  
"CGRPMETADATA [my-kafka-consumer-ac#consumer-2] [thrd:main]: consumer join: metadata for subscription only available for 0/1 topics (-1ms old)"}                                                                                                                                                       
"METADATA [my-kafka-consumer-ac#consumer-2] [thrd:main]: kafka-controller-headless:9092/bootstrap: Request metadata for 1 topic(s): consumer join"}                                                                                                                                                    
"JOIN [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\": postponing join until up-to-date metadata is available"}                                                                                                                                                      
"CGRPJOINSTATE [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\" changed join state init -> wait-metadata (state up)"}                                                                                                                                                 
"SEND [my-kafka-consumer-ac#consumer-2] [thrd:kafka-controller-headless:9092/bootstrap]: kafka-controller-headless:9092/bootstrap: Sent MetadataRequest (v12, 88 bytes @ 0, CorrId 4)"}                                                                                                                
"DUMP [my-kafka-consumer-ac#consumer-2] [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)"}                                                                                                                                                                                                
"DUMP_ALL [my-kafka-consumer-ac#consumer-2] [thrd:main]: List with 0 partition(s):"}                                                                                                                                                                                                                   
"DUMP_PND [my-kafka-consumer-ac#consumer-2] [thrd:main]: List with 0 partition(s):"}                                                                                                                                                                                                                   
"DUMP_QRY [my-kafka-consumer-ac#consumer-2] [thrd:main]: List with 0 partition(s):"}                                                                                                                                                                                                                   
"DUMP_REM [my-kafka-consumer-ac#consumer-2] [thrd:main]: List with 0 partition(s):"}                                                                                                                                                                                                                   
"RECV [my-kafka-consumer-ac#consumer-2] [thrd:kafka-controller-headless:9092/bootstrap]: kafka-controller-headless:9092/bootstrap: Received MetadataResponse (v12, 196 bytes, CorrId 4, rtt 0.19ms)"}                                                                                                  
"ASSIGNDONE [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\": assignment operations done in join-state wait-metadata (rebalance rejoin=false)"}                                                                                                                       
"METADATA [my-kafka-consumer-ac#consumer-2] [thrd:main]: kafka-controller-headless:9092/bootstrap: ===== Received metadata (for 1 requested topics): consumer join ====="}                                                                                                                             
"METADATA [my-kafka-consumer-ac#consumer-2] [thrd:main]: kafka-controller-headless:9092/bootstrap: ClusterId: XJWpYEvfk0PCISvF51RubA, ControllerId: 0"}                                                                                                                                                
"METADATA [my-kafka-consumer-ac#consumer-2] [thrd:main]: kafka-controller-headless:9092/bootstrap: 1 brokers, 1 topics"}                                                                                                                                                                               
"METADATA [my-kafka-consumer-ac#consumer-2] [thrd:main]: kafka-controller-headless:9092/bootstrap:   Broker #0/1: kafka-controller-0.kafka-controller-headless.kube-system.svc.cluster.local:9092 NodeId 0"}                                                                                           
"METADATA [my-kafka-consumer-ac#consumer-2] [thrd:main]: kafka-controller-headless:9092/bootstrap:   Topic kafka_test_topic with 1 partitions"}                                                                                                                                                
"METADATA [my-kafka-consumer-ac#consumer-2] [thrd:main]: kafka-controller-headless:9092/bootstrap: 1/1 requested topic(s) seen in metadata (lookup by name)"}                                                                                                                                          
"CLUSTERID [my-kafka-consumer-ac#consumer-2] [thrd:main]: kafka-controller-headless:9092/bootstrap: ClusterId update \"\" -> \"XJWpYEvfk0PCISvF51RubA\""}                                                                                                                                              
"CONTROLLERID [my-kafka-consumer-ac#consumer-2] [thrd:main]: kafka-controller-headless:9092/bootstrap: ControllerId update -1 -> 0"}                                                                                                                                                                   
"BROADCAST [my-kafka-consumer-ac#consumer-2] [thrd:main]: Broadcasting state change"}                                                                                                                                                                                                                  
"SUBSCRIPTION [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\": effective subscription list changed from 0 to 1 topic(s):"}                                                                                                                                           
"SUBSCRIPTION [my-kafka-consumer-ac#consumer-2] [thrd:main]:  Topic kafka_test_topic with 1 partition(s)"}                                                                                                                                                                                     
"REJOIN [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\": subscription updated from metadata change: rejoining group in state wait-metadata"}                                                                                                                         
"GRPLEADER [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\": resetting group leader info: group (re)join"}                                                                                                                                                            
"REJOIN [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\" (re)joining in join-state wait-metadata with 0 assigned partition(s): Metadata for subscribed topic(s) has changed"}                                                                                         
"REBALANCE [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\" initiating rebalance (NONE) in state up (join-state wait-metadata) with 0 assigned partition(s): Metadata for subscribed topic(s) has changed"}                                                           
"REJOIN [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\": Rejoining group without an assignment: Metadata for subscribed topic(s) has changed"}                                                                                                                       
"CGRPJOINSTATE [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\" changed join state wait-metadata -> init (state up)"}                                                                                                                                                 
"JOIN [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\": join with 1 subscribed topic(s)"}                                                                                                                                                                             
"CGRPMETADATA [my-kafka-consumer-ac#consumer-2] [thrd:main]: consumer join: metadata for subscription is up to date (0ms old)"}                                                                                                                                                                        
"JOIN [my-kafka-consumer-ac#consumer-2] [thrd:main]: kafka-controller-0.kafka-controller-headless.kube-system.svc.cluster.local:9092/0: Joining group \"my-kafka-consumer\" with 1 subscribed topic(s) and member id \"\""}                                                                     
"CGRPJOINSTATE [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\" changed join state init -> wait-join (state up)"}                                                                                                                                                     
"SEND [my-kafka-consumer-ac#consumer-2] [thrd:GroupCoordinator]: GroupCoordinator/0: Sent JoinGroupRequest (v5, 212 bytes @ 0, CorrId 2)"}                                                                                                                                                             
"BROADCAST [my-kafka-consumer-ac#consumer-2] [thrd:GroupCoordinator]: Broadcasting state change"}                                                                                                                                                                                                      
"RECV [my-kafka-consumer-ac#consumer-2] [thrd:GroupCoordinator]: GroupCoordinator/0: Received JoinGroupResponse (v5, 84 bytes, CorrId 2, rtt 1.20ms)"}                                                                                                                                                 
"JOINGROUP [my-kafka-consumer-ac#consumer-2] [thrd:main]: JoinGroup response: GenerationId -1, Protocol , LeaderId , my MemberId my-kafka-consumer-ac-fda83935-3fda-4922-b7b6-8c6ff89e02dc, member metadata count 0: Broker: Group member needs a valid member ID"}                             
"REQERR [my-kafka-consumer-ac#consumer-2] [thrd:main]: GroupCoordinator/0: JoinGroupRequest failed: Broker: Group member needs a valid member ID: explicit actions Ignore"}                                                                                                                            
"MEMBERID [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\": updating member id \"\" -> \"my-kafka-consumer-ac-fda83935-3fda-4922-b7b6-8c6ff89e02dc\""}                                                                                                         
"REJOIN [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\": Rejoining group without an assignment: JoinGroup error: Broker: Group member needs a valid member ID"}                                                                                                      
"CGRPJOINSTATE [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\" changed join state wait-join -> init (state up)"}                                                                                                                                                     
"JOIN [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\": join with 1 subscribed topic(s)"}                                                                                                                                                                             
"CGRPMETADATA [my-kafka-consumer-ac#consumer-2] [thrd:main]: consumer join: metadata for subscription is up to date (2ms old)"}                                                                                                                                                                        
"JOIN [my-kafka-consumer-ac#consumer-2] [thrd:main]: kafka-controller-0.kafka-controller-headless.x.svc.cluster.local:9092/0: Joining group \"my-kafka-consumer\" with 1 subscribed topic(s) and member id \"my-kafka-consumer-ac-fda83935-3fda-4922-b7b6-8c6ff89e02dc\""}     
"CGRPJOINSTATE [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\" changed join state init -> wait-join (state up)"}                                                                                                                                                     
"SEND [my-kafka-consumer-ac#consumer-2] [thrd:GroupCoordinator]: GroupCoordinator/0: Sent JoinGroupRequest (v5, 276 bytes @ 0, CorrId 3)"}                                                                                                                                                             
"BROADCAST [my-kafka-consumer-ac#consumer-2] [thrd:GroupCoordinator]: Broadcasting state change"}                                                                                                                                                                                                      
"METADATA [my-kafka-consumer-ac#consumer-2] [thrd:main]: Expired 1 entries from metadata cache (0 entries remain)"}
"Waiting for message"
"Waiting for message"
"Waiting for message"
"Waiting for message"
<snip><42s of repeated prints></snip>
"RECV [my-kafka-consumer-ac#consumer-2] [thrd:GroupCoordinator]: GroupCoordinator/0: Received JoinGroupResponse (v5, 271 bytes, CorrId 3, rtt 42039.28ms)"}                                                                                                                                                                      
"JOINGROUP [my-kafka-consumer-ac#consumer-2] [thrd:main]: JoinGroup response: GenerationId 14, Protocol range, LeaderId my-kafka-consumer-ac-fda83935-3fda-4922-b7b6-8c6ff89e02dc (me), my MemberId my-kafka-consumer-ac-fda83935-3fda-4922-b7b6-8c6ff89e02dc, member metadata count 1: (no error)"}               
"JOINGROUP [my-kafka-consumer-ac#consumer-2] [thrd:main]: I am elected leader for group \"my-kafka-consumer\" with 1 member(s)"}                                                                                                                                                                                          
"GRPLEADER [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\": resetting group leader info: JoinGroup response clean-up"}                                                                                                                                                                                                                                                                                                                                                                                                                      
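In this log the wait sits between sending the second JoinGroupRequest (CorrId 3) and receiving its response, so no partitions can be assigned during those 42s. To confirm this from the application side, the time until the group assigns partitions can be recorded separately from the time until the first message. A minimal sketch: JoinTimer is a hypothetical helper, and it relies on the on_assign callback accepted by confluent-kafka's Consumer.subscribe().

```python
import time
from typing import Any, Callable, List, Optional


class JoinTimer:
    """Measures the time from subscribe() to the first partition assignment."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self.clock = clock
        self.subscribed_at: Optional[float] = None
        self.join_seconds: Optional[float] = None

    def subscribe(self, consumer: Any, topics: List[str]) -> None:
        self.subscribed_at = self.clock()
        # on_assign is invoked from inside consumer.poll() after a rebalance
        consumer.subscribe(topics, on_assign=self.on_assign)

    def on_assign(self, consumer: Any, partitions: List[Any]) -> None:
        # Only the first assignment counts; later rebalances are ignored.
        if self.join_seconds is None and self.subscribed_at is not None:
            self.join_seconds = self.clock() - self.subscribed_at
            print(f"assigned {len(partitions)} partition(s) after {self.join_seconds:.1f}s")
```

If join_seconds is close to the total time to the first message, the delay is in joining the group rather than in fetching.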


persona94 commented 5 days ago

Is there any other information I can provide? @pranavrth

pranavrth commented 3 days ago

Can you also include timestamps in the logs? Human-readable logs would be better. Your app logger is not logging the time.
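One way to get timestamps into both the application prints and the client's debug output is to route them through Python's standard logging module with %(asctime)s in the format. A minimal sketch: the helper name make_timestamped_logger is hypothetical, and the commented usage assumes the logger argument that confluent-kafka's Consumer accepts (as in the project's examples/consumer.py) plus librdkafka's debug property; the exact debug contexts shown are an illustrative choice.

```python
import logging
import sys
from typing import IO


def make_timestamped_logger(name: str, stream: IO[str] = sys.stderr) -> logging.Logger:
    """Return a logger whose lines start with a human-readable timestamp."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    logger.propagate = False  # avoid duplicate lines via the root logger
    if not logger.handlers:  # calling twice should not add a second handler
        handler = logging.StreamHandler(stream)
        # The default %(asctime)s format is "YYYY-MM-DD HH:MM:SS,mmm"
        handler.setFormatter(
            logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s")
        )
        logger.addHandler(handler)
    return logger


# Assumed usage with confluent-kafka (not executed here):
#   from confluent_kafka import Consumer
#   log = make_timestamped_logger("kafka")
#   conf = {..., "debug": "cgrp,consumer,protocol"}  # broker options elided
#   consumer = Consumer(conf, logger=log)
#   log.info("Waiting for message")  # in place of print() in the poll loop
```

Client log lines forwarded this way are emitted while poll() is being called, so they interleave with the loop's own messages in time order.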

pranavrth commented 3 days ago

JoinGroupRequest is taking around 42 seconds.

"RECV [my-kafka-consumer-ac#consumer-2] [thrd:GroupCoordinator]: GroupCoordinator/0: Received JoinGroupResponse (v5, 271 bytes, CorrId 3, rtt 42039.28ms)"}

How many consumers is this coordinator serving? Is there some issue with the coordinator? I don't know what is happening on the coordinator side that makes it take 42s to respond.
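The 42s figure comes from the rtt field of the JoinGroupResponse debug line. In a long log, such as the full attachment, slow broker responses can be pulled out mechanically. A minimal sketch, assuming the "Received XxxResponse (vN, N bytes, CorrId N, rtt N.NNms)" line format quoted above; the function name slow_responses and the 1000 ms default threshold are hypothetical choices.

```python
import re
from typing import Iterable, List, Tuple

# Matches e.g. "Received JoinGroupResponse (v5, 271 bytes, CorrId 3, rtt 42039.28ms)"
RESPONSE_RE = re.compile(
    r"Received (?P<api>\w+Response) \(v\d+, \d+ bytes, "
    r"CorrId (?P<corr>\d+), rtt (?P<rtt>[\d.]+)ms\)"
)


def slow_responses(
    lines: Iterable[str], threshold_ms: float = 1000.0
) -> List[Tuple[str, int, float]]:
    """Return (api, corr_id, rtt_seconds) for every response slower than threshold_ms."""
    found = []
    for line in lines:
        m = RESPONSE_RE.search(line)
        if m and float(m.group("rtt")) > threshold_ms:
            found.append((m.group("api"), int(m.group("corr")), float(m.group("rtt")) / 1000.0))
    return found


# Example usage against a saved log file:
#   with open("book.log") as f:
#       for api, corr, secs in slow_responses(f):
#           print(f"{api} CorrId {corr}: {secs:.1f}s")
```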

persona94 commented 2 days ago

> Can you also provide time in the logs? Better to show human readable logs. Your app logger is not logging time.

Sure. I had trimmed the logs because there were a lot of them; I've attached the unmodified file from this morning: book.log

persona94 commented 2 days ago

> JoinGroupRequest is taking around 42 seconds.
>
> "RECV [my-kafka-consumer-ac#consumer-2] [thrd:GroupCoordinator]: GroupCoordinator/0: Received JoinGroupResponse (v5, 271 bytes, CorrId 3, rtt 42039.28ms)"}
>
> How many consumers is this coordinator serving? Is there some issue in the coordinator? I don't know what is happening at the coordinator side which is taking 42s to respond.

This coordinator serves 2 consumers, each in its own consumer group. I run this consumer on 3 separate clusters (i.e. independent Kafka instances), and all of them show the same issue. As far as I can tell, the other consumer, which uses a C++ driver, does not have this issue. Also, if I use kafka-python as my Kafka driver, I don't see this delay; it connects instantly.