tulios / kafkajs

A modern Apache Kafka client for node.js
https://kafka.js.org
MIT License

KafkaJs consumer taking longer time if keeping groupID same #807

Open jaiswarvipin opened 4 years ago

jaiswarvipin commented 4 years ago

Hi team, we are facing the same issue, although the delay is unpredictable. We use the same KafkaJS code in production, so changing the consumer groupID after each call is not an option: it must stay the same so that we read only the messages that have not yet been read by that group. If we change the groupID on each request/interval, we receive all messages again and again, although doing so does improve the connection and message fetch time every time.

Current stats:
Keeping the same consumer group ID: 20-30 seconds
Changing the groupID each time: 2-5 milliseconds

We are looking for your help on how to improve the connection and message fetch time while keeping the consumer groupID the same.

My code base is:

.env

DEFAULT_MESSAGE_POLLING_TIME=700
KAFAK_BROKER_ENDPOINT=localhost:9094
KAFAK_OUTBOUND_TOPIC_PREFIX=
KAFAK_OUTBOUND_TOPIC_POSTFIX=_OUT

I have set the consumer group name and topic name in the database. In this case: consumer_group_name = "customer_demo_group", outbound_topic = "customer_demo_OUT"

/***************************************************************************/
/*Purpose   : Polling the messages from the topics.
/*Input     : None.
/*Response  : None. 
/*Created By: Jaiswar Vipin Kumar R.
/***************************************************************************/
setInterval(function(){
    /* Iterating the operation array loop */
    Object.keys(strTopicArr).forEach(function(key, index) {
        //console.log(this[key].inbound_topic);
        /* get the message of each outbound topic */
        resetTheConsumer(this[key].consumer_group_name, this[key].outbound_topic);
    }, strTopicArr);
},process.env.DEFAULT_MESSAGE_POLLING_TIME);

The consumer code is:

/***************************************************************************/
/*Purpose   : Reading Message from topics.
/*Input     : pStrConsumerGroup :: Topic consumer Group,
            : pStrTopicName :: Topic name.
/*Response  : Message Detail
/*Created By: Jaiswar Vipin Kumar R.
/***************************************************************************/
function getMessageFromTopic(pStrConsumerGroup, pStrTopicName, callback){
    try{
        /* Getting the available Kafka message broker reference */
        const { Kafka }                 = require('kafkajs')
        const kafakBrokerendPointArr    = (process.env.KAFAK_BROKER_ENDPOINT).split(',');
        const kafka                     = new Kafka({clientId: pStrConsumerGroup+'_client',brokers: kafakBrokerendPointArr,logLevel:0});

        /* Creating the consumer */
        //const consumerObj = kafka.consumer({ groupId: pStrConsumerGroup+"_"+getRandomInt(10)});
        const consumerObj = kafka.consumer({ groupId: pStrConsumerGroup});

            /* Creating the non-blocking async consumer request */
            const runs = async () => {
                /* Connecting to the consumer */
                consumerObj.connect()
                .then(returnObj =>{
                    /* Subscribing consumer to the topic */
                    consumerObj.subscribe({ topic: process.env.KAFAK_OUTBOUND_TOPIC_PREFIX+pStrTopicName+process.env.KAFAK_OUTBOUND_TOPIC_POSTFIX, fromBeginning: true})
                    //console.log(pStrConsumerGroup+" === "+pStrTopicName);
                    /* Running the loop for getting the messages */
                    consumerObj.run({
                        partitionsConsumedConcurrently: 3,
                        eachMessage: async ({ topic, partition, message }) => {
                            try{
                                //console.log(message.value.toString());
                                if(message.value.toString() !=''){
                                    /* return callback({
                                        partition,
                                        offset: message.offset,
                                        value: message.value.toString(),
                                    }) */
                                    //console.log(message.value.toString());
                                    sendMessage(message.value.toString());
                                }
                            }catch(Exception){                          
                                console.log(Exception);
                            }

                        }
                    })
                })
                .catch(excp =>{
                    console.log(excp);
                }).finally(fObj=>{

                })
                consumerObj.disconnect();
            }
            runs().catch(e => console.error(`[example/consumer] ${e.message}`, e))

    }catch(exception){
        console.log(exception);
    }
}

We expect 0.1-0.2 billion messages within 15 minutes to be consumed and sent to the requester.

@tulios @Nevon Kindly assist

Originally posted by @jaiswarvipin in https://github.com/tulios/kafkajs/issues/260#issuecomment-654489802

ankon commented 4 years ago

setInterval(() => { ... resetTheConsumer(...).then(...) }, process.env.DEFAULT_MESSAGE_POLLING_TIME)

I don't see that you posted your resetTheConsumer function, but the name is very, very suspicious: are you really reconnecting each time? Kafka uses connections and long-polling to be efficient. This also affects how consumer groups work, and there are timeouts that Kafka must obey to know all members of a consumer group. You should read up a bit on that topic.

const kafakBrokerendPointArr = (process.env.KAFAK_BROKER_ENDPOINT).split(',');
consumerObj.subscribe({ topic: process.env.KAFAK_OUTBOUND_TOPIC_PREFIX+pStrTopicName+process.env.KAFAK_OUTBOUND_TOPIC_POSTFIX, fromBeginning: true})

Typos: It's "kafka", not "kafak" :)

goriunov commented 4 years ago

https://github.com/tulios/kafkajs/issues/701 is probably related. Most likely the slow connection is due to the session timeout: if you disconnect from the consumer group (CG), it may take some time for the server to expire your previous connection. I also noticed that consumerObj.disconnect(); is not awaited. I think Kafka is better used with a single persistent connection rather than periodic reconnection.
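
For reference, the "connect once and keep the connection" approach suggested in the two replies above might look roughly like this. It is a minimal sketch, not the original poster's final code: `startConsumer` and `handleMessage` are hypothetical names, and `kafka` is assumed to be a client created with `new Kafka({ clientId, brokers })` from `kafkajs`.

```javascript
// Minimal sketch: one long-lived consumer per group/topic pair.
// `startConsumer` and `handleMessage` are hypothetical names; `kafka` is
// assumed to be an instance created with `new Kafka({ clientId, brokers })`.
async function startConsumer(kafka, groupId, topic, handleMessage) {
  const consumer = kafka.consumer({ groupId })

  // Connect and subscribe exactly once, awaiting each step in order.
  await consumer.connect()
  await consumer.subscribe({ topic, fromBeginning: true })

  // run() starts the fetch loop, so no setInterval-based polling is needed.
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const value = message.value ? message.value.toString() : ''
      if (value !== '') {
        await handleMessage({ topic, partition, offset: message.offset, value })
      }
    },
  })

  // Return a stop function so the caller can await the disconnect on shutdown.
  return async () => {
    await consumer.disconnect()
  }
}
```

The function would be called once per consumer group at application start, and the returned stop function awaited only when the process shuts down, instead of reconnecting on every polling interval.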

jaiswarvipin commented 4 years ago

Thanks @ankon and @goriunov for your views on the code. Yes, I have corrected it and now call the consumer only once, and that helped. My response time for the first and subsequent messages is now:

Current stats, keeping the same consumer group ID: 3-6 milliseconds

@ankon resetTheConsumer is just the method that calls the getMessageFromTopic method, and thanks for the spelling correction :)

However, I need your view on the producer as well. Some background: we are currently testing with 10K messages in 3 seconds, and this will go up to 0.1 billion...

The producer code is below. (Currently the producer creates the inbound and outbound topics automatically. A scheduler on the other side picks the message from the inbound topic and puts its response into the outbound topic, from where our consumer (the getMessageFromTopic method above) consumes the message and sends it back to the requester. The producer's only job is to put the incoming message into the inbound topic.)

/***************************************************************************/
    /*Purpose   : Send Message into the topics.
    /*Input     : pStrConsumerGroup :: Consumer Group name,
                : pStrTopicName :: Topic name,
                : pStrOutBoundTopicName :: Outbound topic name.
                : pStrMessageBody   :: Message body
    /*Response  : Message send status :: TRUE/FALSE
    /*Created By: Jaiswar Vipin Kumar R.
    /***************************************************************************/
    setMessageInTopic(pStrConsumerGroup, pStrTopicName,pStrOutBoundTopicName, pStrMessageBody, callback){
        try{
            //console.log(pStrConsumerGroup+" === "+pStrTopicName+" === "+pStrOutBoundTopicName);
            /* Variable initilization */
            const { Kafka }                 = require('kafkajs')
            const kafakBrokerendPointArr    = (process.env.KAFAK_BROKER_ENDPOINT).split(',');
            const kafka                     = new Kafka({clientId: pStrConsumerGroup+'_client',brokers: kafakBrokerendPointArr,logLevel:0});
            const producer                  = kafka.producer()

            /* Creating the non-blocking async producer request */
            const run = async () => {
                try{
                    /* Creating the topic and sending the message  */
                    await producer.send(
                                        {
                                            topic: pStrTopicName,
                                            messages: [
                                                        { 
                                                            value: JSON.stringify(pStrMessageBody)
                                                        },
                                                    ],
                                        }
                                    );

                    /* Creating the outbound topic */
                    await producer.send(
                                        {
                                            topic: process.env.KAFAK_OUTBOUND_TOPIC_PREFIX+pStrOutBoundTopicName+process.env.KAFAK_OUTBOUND_TOPIC_POSTFIX,
                                            messages: [
                                                        { 
                                                            value: ""
                                                        },
                                                    ],
                                        }
                                    );
                    /* Return the response */
                    return callback(true);
                }catch(exception){
                    /* Print the exception */
                    console.log(exception);
                    /* Return the response */
                    return callback(false);
                }
            }
            /* Log any exception that occurred while performing the producer operation */
            run().catch(console.error);
        }catch(exception){
            /* Print the exception */
            console.log(exception);
            /* Return the response */
            return callback(false);
        }
    }

One observation: when we send 10K+ messages in 3 seconds, some (about 30%) of the messages become available only after 1 minute, and because of that the TAT is impacted.

Kindly suggest

Thanks.
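
One thing visible in setMessageInTopic above is that it builds a new client and producer on every call and never calls `producer.connect()`. A hedged sketch of reusing a single connected producer and sending messages in batches is shown below. `createSender` is a hypothetical helper, and `kafka` is assumed to be an instance created with `new Kafka({ clientId, brokers })`. Whether this explains the delayed messages reported above is not established in this thread.

```javascript
// Minimal sketch: connect one producer once and reuse it for every send.
// `createSender` is a hypothetical helper; `kafka` is assumed to be an
// instance created with `new Kafka({ clientId, brokers })` from kafkajs.
async function createSender(kafka) {
  const producer = kafka.producer()
  await producer.connect() // connect once, before the first send

  return {
    // Send a batch of payloads to one topic in a single request.
    async send(topic, payloads) {
      const messages = payloads.map((body) => ({ value: JSON.stringify(body) }))
      return producer.send({ topic, messages })
    },
    // Close the connection when the application shuts down.
    async close() {
      await producer.disconnect()
    },
  }
}
```

The sender would be created once at startup and shared by all request handlers, so each incoming message costs one `send` call rather than a new client, producer, and connection.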

Mylucci commented 3 years ago

I have the same problem when using nodemon for debugging, and it may be the same as #260. Every time I edit a file, nodemon restarts my program. I added consumer.disconnect() like this:

  const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2']

  signalTraps.map(type => 
        process.once(type, async () => {
          await this.consumer.disconnect();
          logger.info("kafka consumer disconnect");
        })
      );
    }

And when it restarts, it outputs:

[start:server] [2021-03-01T08:48:39.349+0100] [INFO] service - kafka consumer disconnect
[start:server] [2021-03-01T08:48:39.349+0100] [INFO] service - kafka consumer disconnect
[start:server] [2021-03-01T08:48:39.349+0100] [INFO] service - kafka consumer disconnect
[start:server] [2021-03-01T08:48:39.349+0100] [INFO] service - kafka consumer disconnect
[start:server] [2021-03-01T08:48:39.349+0100] [INFO] service - kafka consumer disconnect
[start:server] [2021-03-01T08:48:39.349+0100] [INFO] service - kafka consumer disconnect
[start:server] [2021-03-01T08:48:39.350+0100] [INFO] service - kafka consumer disconnect
[start:server] [2021-03-01T08:48:39.350+0100] [INFO] service - kafka consumer disconnect
[start:server] [2021-03-01T08:48:39.350+0100] [INFO] service - kafka consumer disconnect
[start:server] [2021-03-01T08:48:39.350+0100] [INFO] service - kafka consumer disconnect
[start:server] [2021-03-01T08:48:39.350+0100] [INFO] service - kafka consumer disconnect

It shows that the consumers did disconnect. But consumers still take a long time to connect when keeping the same groupID; the delay seems to match the sessionTimeout value exactly. Changing the groupID is not a graceful solution. It gets better if I set:

heartbeatInterval: 2000,
sessionTimeout: 6000

It doesn't occur if I use a different groupID, or if I use Ctrl+C to exit the process and restart it manually. So I wonder why reconnecting takes the sessionTimeout with nodemon, when it should behave the same as with node. How can I avoid such a long wait? Is there anything incorrect in how I disconnect my consumer?

Nevon commented 3 years ago

What you need to keep in mind is that the consumer group exists outside of your node instances. Whenever a consumer joins or leaves the group, all the members of the group have to re-join and sync before they can start consuming again.

When you start your node process, your consumer will join that consumer group, and when you quit the process (with graceful disconnect), your consumer will leave the consumer group. Like mentioned above, this means the group has to rebalance (re-join and sync).

When you just exit the process without disconnecting, your consumer doesn't actually leave the group - so initially there won't be a rebalance. However, after sessionTimeout that consumer will be considered unhealthy and will be kicked out of the group, which will trigger a rebalance. In the in-between time, no processing will happen on any partitions that are assigned to that consumer.

If you are developing with nodemon, causing frequent restarts, a solution is probably to generate a random groupid on each restart. When static membership (#884) becomes available, that could be another option, but it's not available yet so for now a random group id is probably the best bet. Either that, or you have to wait for the rebalance to happen.
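
The development-only random group id suggested above could be sketched as follows. `resolveGroupId` is a hypothetical helper, and using `NODE_ENV === 'development'` as the switch is an assumption. Note that a fresh group has no committed offsets, so where it starts reading depends on the `fromBeginning` setting of the subscription.

```javascript
const { randomBytes } = require('crypto')

// Hypothetical helper: keep the stable group id everywhere except local
// development, where a random suffix makes every restart join a new group.
function resolveGroupId(baseGroupId, env = process.env.NODE_ENV) {
  if (env !== 'development') {
    return baseGroupId
  }
  // 4 random bytes -> 8 hex characters, e.g. "Monitor-3fa9c1d2"
  return `${baseGroupId}-${randomBytes(4).toString('hex')}`
}
```

The result would then be passed as `groupId` when creating the consumer, so production deployments keep their committed offsets.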

Mylucci commented 3 years ago

Thanks for your explanation~ I still have a question: I added consumer.disconnect in the code, so my consumers should disconnect when the process restarts with nodemon. Is it normal for a rebalance to take such a long time? Does consumer.disconnect work when developing with nodemon?

Nevon commented 3 years ago

It will take up to rebalanceTimeout ms if you don't gracefully disconnect.

From the documentation:

rebalanceTimeout: The maximum time that the coordinator will wait for each member to rejoin when rebalancing the group. Default: 60000

Mylucci commented 3 years ago

Yeah, I get it. What if I set this value to less than 10000? Would there be any side effects? I know this option, but there is no detailed description in the documentation. Thanks a lot! 🙇

Nevon commented 3 years ago

It's pretty important that you understand how heartbeatInterval, sessionTimeout and rebalanceTimeout fit together. If rebalanceTimeout is less than heartbeatInterval, for example, it would mean that if you have multiple consumers in the group, when a new instance joins and triggers a rebalance, the other consumers might get kicked out of the group because they don't become aware that there is a rebalance going on until they heartbeat.

This talk https://www.confluent.io/online-talks/everything-you-always-wanted-to-know-about-kafkas-rebalance-protocol-but-were-afraid-to-ask-on-demand explains everything very well. There are also loads of other resources on the internet explaining this.
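
As a rough illustration of how the three options relate, here is a small sanity check. `checkConsumerTimeouts` is a hypothetical helper, not part of KafkaJS. It encodes only two rules: `heartbeatInterval` must be lower than `sessionTimeout` (as the KafkaJS consumer documentation states), and `rebalanceTimeout` should not be lower than `heartbeatInterval` (the case described above). The default values used are believed to match the KafkaJS defaults, with 60000 for `rebalanceTimeout` confirmed by the documentation excerpt quoted earlier.

```javascript
// Hypothetical sanity check for the three consumer timing options (all in ms).
// Defaults are assumed to mirror the KafkaJS consumer defaults.
function checkConsumerTimeouts({
  heartbeatInterval = 3000,
  sessionTimeout = 30000,
  rebalanceTimeout = 60000,
} = {}) {
  const problems = []
  // A member that heartbeats less often than the session allows is expired.
  if (heartbeatInterval >= sessionTimeout) {
    problems.push('heartbeatInterval must be lower than sessionTimeout')
  }
  // Members learn about a rebalance when they heartbeat, so they need at
  // least one heartbeat interval to notice it and rejoin.
  if (rebalanceTimeout < heartbeatInterval) {
    problems.push('rebalanceTimeout is lower than heartbeatInterval')
  }
  return problems
}
```

For example, the settings tried earlier in this thread (`heartbeatInterval: 2000`, `sessionTimeout: 6000`) produce an empty list under these two rules, while `rebalanceTimeout: 1000` with the default heartbeat interval is flagged.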

Mylucci commented 3 years ago

Hi Nevon,

After several weeks I'm working on this problem again, since a random group id leads to problems we don't expect, e.g. with a different consumer group id, our new consumer may consume messages that the old consumer has already consumed.

I am confused about graceful disconnection. Currently I just have this piece of code when exiting:

  const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2']

  signalTraps.map(type => {
    process.on(type, async () => {
      logger.info(`Kafka client: process prepare to exit.`)

      await Promise.all(this.allConsumer.map(
        async (consumer)=>{ 
          await consumer.disconnect();
        } 
      ));
      await this.producer.disconnect();
      logger.info(`Kafka client: producer disconnected`);
      await this.admin.disconnect();
      logger.info(`Kafka client admin disconnected`);
      process.exit(0);
    }) 
  });

It shows in the log that the consumer indeed stopped:

[2021-03-22T08:42:49.886+0100] [INFO] service - Kafka client: process prepare to exit.
{"level":"INFO","timestamp":"2021-03-22T07:42:49.887Z","logger":"kafkajs","message":"[Consumer] Stopped","groupId":"Monitor"}
{"level":"INFO","timestamp":"2021-03-22T07:42:51.285Z","logger":"kafkajs","message":"[Consumer] Stopped","groupId":"Monitor"}
[2021-03-22T08:42:52.345+0100] [INFO] service - Kafka client: producer disconnected
[2021-03-22T08:42:52.346+0100] [INFO] service - Kafka client admin disconnected

Is this a graceful disconnect? On the Kafka server, I still see that the consumer was kicked out after the session timeout.

[2021-03-22 07:43:01,282] INFO [GroupCoordinator 0]: Member kafkajs-2d4e3bd3-d7ca-41bd-b1e6-05a4bb5d1210 in group Monitor has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2021-03-22 07:43:01,282] INFO [GroupCoordinator 0]: Preparing to rebalance group Monitor in state PreparingRebalance with old generation 6 (__consumer_offsets-38) (reason: removing member kafkajs-2d4e3bd3-d7ca-41bd-b1e6-05a4bb5d1210 on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
[2021-03-22 07:43:01,282] INFO [GroupCoordinator 0]: Group Monitor with generation 7 is now empty (__consumer_offsets-38) (kafka.coordinator.group.GroupCoordinator)

Does it mean that consumer.disconnect doesn't work?

Thanks in advance.

Nevon commented 3 years ago

If you enable debug logs, you can see exactly what happens. What should be happening is that the consumer(s) wait for ongoing work, send a GROUP_LEAVE request and then disconnect.
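
A shutdown hook that exits only after every `disconnect()` promise has settled could be sketched as below. `registerShutdown` is a hypothetical helper; `proc` would normally be the global `process`, and `clients` is any list of objects exposing an async `disconnect()` (consumers, producer, admin). This only ensures the process does not exit before the disconnect calls finish. It is not confirmed in this thread to resolve the heartbeat expiration seen in the broker log.

```javascript
// Minimal sketch of a shutdown hook. `registerShutdown` is a hypothetical
// helper; `proc` is normally the global `process`, and every entry in
// `clients` is anything exposing an async disconnect().
function registerShutdown(proc, clients, signals = ['SIGTERM', 'SIGINT', 'SIGUSR2']) {
  let shuttingDown = false

  const shutdown = async () => {
    if (shuttingDown) return // ignore repeated signals while disconnecting
    shuttingDown = true
    // allSettled: one failing disconnect must not prevent the others.
    const results = await Promise.allSettled(clients.map((client) => client.disconnect()))
    const failed = results.filter((result) => result.status === 'rejected').length
    // Exit only after every disconnect() promise has settled.
    proc.exit(failed === 0 ? 0 : 1)
  }

  signals.forEach((signal) => proc.once(signal, shutdown))
  return shutdown
}
```

It would be wired up once at startup, for example `registerShutdown(process, [...consumers, producer, admin])`, where those names stand for the application's own client objects.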

Mylucci commented 3 years ago

Yes, I see it like this:

{"level":"INFO","timestamp":"2021-03-22T09:08:14.053Z","logger":"kafkajs","message":"[Consumer] Stopped","groupId":"Monitor"}
{"level":"DEBUG","timestamp":"2021-03-22T09:08:14.054Z","logger":"kafkajs","message":"[Consumer] consumer has stopped, disconnecting","groupId":"Monitor"}
{"level":"DEBUG","timestamp":"2021-03-22T09:08:14.054Z","logger":"kafkajs","message":"[Connection] disconnecting...","broker":"127.0.0.1:9092","clientId":"kafkajs"}
{"level":"DEBUG","timestamp":"2021-03-22T09:08:14.054Z","logger":"kafkajs","message":"[RequestQueue] Waiting for pending requests","clientId":"kafkajs","broker":"127.0.0.1:9092","currentInflightRequests":1,"currentPendingQueueSize":0}
{"level":"DEBUG","timestamp":"2021-03-22T09:08:17.160Z","logger":"kafkajs","message":"[Runner] consumer not running, exiting","error":"Not connected","groupId":"Monitor","memberId":"kafkajs-527d1166-cdb4-4da4-97c8-b4d193e47623"}
{"level":"INFO","timestamp":"2021-03-22T09:08:17.160Z","logger":"kafkajs","message":"[Consumer] Stopped","groupId":"Monitor"}
{"level":"DEBUG","timestamp":"2021-03-22T09:08:17.160Z","logger":"kafkajs","message":"[Consumer] consumer has stopped, disconnecting","groupId":"Monitor"}
{"level":"DEBUG","timestamp":"2021-03-22T09:08:17.160Z","logger":"kafkajs","message":"[Connection] disconnecting...","broker":"127.0.0.1:9092","clientId":"kafkajs"}
{"level":"DEBUG","timestamp":"2021-03-22T09:08:17.160Z","logger":"kafkajs","message":"[Connection] disconnected","broker":"127.0.0.1:9092","clientId":"kafkajs"}
{"level":"DEBUG","timestamp":"2021-03-22T09:08:17.160Z","logger":"kafkajs","message":"[Connection] disconnecting...","broker":"127.0.0.1:9092","clientId":"kafkajs"}
{"level":"DEBUG","timestamp":"2021-03-22T09:08:17.160Z","logger":"kafkajs","message":"[Connection] disconnected","broker":"127.0.0.1:9092","clientId":"kafkajs"}
{"level":"DEBUG","timestamp":"2021-03-22T09:08:17.170Z","logger":"kafkajs","message":"[Connection] disconnected","broker":"127.0.0.1:9092","clientId":"kafkajs"}
{"level":"DEBUG","timestamp":"2021-03-22T09:08:17.170Z","logger":"kafkajs","message":"[Connection] disconnecting...","broker":"127.0.0.1:9092","clientId":"kafkajs"}
{"level":"DEBUG","timestamp":"2021-03-22T09:08:17.170Z","logger":"kafkajs","message":"[Connection] disconnected","broker":"127.0.0.1:9092","clientId":"kafkajs"}
{"level":"DEBUG","timestamp":"2021-03-22T09:08:17.170Z","logger":"kafkajs","message":"[Connection] Response Fetch(key: 1, version: 11)","broker":"127.0.0.1:9092","clientId":"kafkajs","correlationId":25,"size":134,"data":"[filtered]"}

I wonder why the Kafka server still kicked out the consumer after the session timeout, even though I disconnect the consumer when the process exits. Based on your explanation above, I think this may not be normal.

sidyag commented 2 years ago

I believe I am facing a similar issue to this. Is there any update on the fix? Based on #701 it seems that this is a known issue, and the workaround for now is to use different groupIds. Is that correct?

MishaTsypkin commented 2 years ago

@sidyag I faced the same issue in e2e tests. I had multiple test scenarios creating consumers with the same group id and despite calling consumer.disconnect() it was taking rebalanceTimeout to run a new consumer. The solution that seems to be working for me is to create a new instance of Kafka for each test scenario to isolate consumers from one another. I hope it helps.

KunalBurangi commented 1 year ago

@jaiswarvipin Hi, I didn't understand what you changed to reduce the time the consumer takes to join the group?

orange1337 commented 10 months ago

I have the same issue with a NestJS Kafka application: reconnecting to Kafka takes 15-30 seconds when I use hot reload, which is really annoying while developing an app. My app gracefully disconnects from Kafka and then connects...

jewel109 commented 2 months ago

So how did you solve it?

orange1337 commented 2 months ago

Unfortunately, I haven't found a solution yet :(