tulios / kafkajs

A modern Apache Kafka client for node.js
https://kafka.js.org
MIT License

Multiple consumers with the same consumer group ID for the same topic are not working together #904

Closed jaiswarvipin closed 4 years ago

jaiswarvipin commented 4 years ago

Hi Team, we are running 10 Node instances in the production environment, and each instance subscribes to the same topic with a common consumer group ID. However, we have noticed that only one running consumer instance reads messages from the subscribed topic, while the other 9 remain idle. This is causing us a huge performance issue. Kindly assist...

Consumer Code

/***************************************************************************
 * Purpose   : Reading Message from topics.
 * Input     : pStrConsumerGroup :: Topic consumer Group,
 *           : pStrTopicName :: Topic name.
 * Response  : Message Detail
 * Created By: Jaiswar Vipin Kumar R.
 ***************************************************************************/
function getMessageFromTopic(pStrConsumerGroup, pStrTopicName, callback){
    try{
        /* Getting the available Kafka message broker reference */
        const { Kafka, logLevel }       = require('kafkajs')
        const kafakBrokerendPointArr    = (process.env.KAFAK_BROKER_ENDPOINT).split(',');
        const kafka                     = new Kafka({clientId: process.env.KAFAK_DEFAULT_CONSUMER_GROUP,brokers: kafakBrokerendPointArr,logLevel: logLevel.INFO});
            /* Creating the consumer */
            const consumerObj = kafka.consumer({ groupId: pStrConsumerGroup,maxWaitTimeInMs:100});

            /* Creating the non-blocking async consumer request */
            const runs = async () => {
                //console.log("Consumer Init 1 : "+process.env.KAFAK_OUTBOUND_TOPIC_PREFIX+pStrTopicName+process.env.KAFAK_OUTBOUND_TOPIC_POSTFIX);
                strTopicConnectedArr[pStrTopicName] = pStrTopicName;
                /* Connecting to the consumer */
                return consumerObj.connect()
                /* Subscribing consumer to the topic; wait for it to finish before calling run() */
                .then(() => consumerObj.subscribe({ topic: process.env.KAFAK_OUTBOUND_TOPIC_PREFIX+pStrTopicName+process.env.KAFAK_OUTBOUND_TOPIC_POSTFIX, fromBeginning: false}))
                .then(() => {
                    console.log(pStrConsumerGroup+" === "+pStrTopicName);
                    /* Running the loop for getting the messages */
                    return consumerObj.run({
                        //autoCommitInterval: 10,
                        partitionsConsumedConcurrently: 10,
                        /* autoCommit: false,
                        autoCommitInterval: 60000, */
                        eachMessage: async ({ topic, partition, message }) => {
                            try{
                                if(message.value.toString() !=''){
                                    /* Send Message */
                                    sendMessage(message.value.toString(), message.timestamp);
                                }
                            }catch(Exception){                          
                                console.log(Exception);
                            }

                        }
                    })
                })
                .catch(excp =>{
                    delete strTopicConnectedArr[pStrTopicName];
                    console.log("-------------Consumer---------------")
                    console.log(excp);
                }).finally(fObj=>{

                })
                //consumerObj.disconnect();
            }
            runs().catch(e => console.error(`[example/consumer] ${e.message}`, e))
    }catch(exception){
        console.log(exception);
    }
}

Where:
KAFAK_BROKER_ENDPOINT: {my kafka cluster IP}
KAFAK_DEFAULT_CONSUMER_GROUP: KafkaConsumer
pStrConsumerGroup: customerdetails
pStrTopicName: customerdemo

ankon commented 4 years ago

How is your topic configured? In particular, how many partitions do you have?
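
For context on why the partition count matters: within a single consumer group, Kafka assigns each partition to at most one consumer, so a topic with one partition can only ever keep one of the 10 instances busy. The sketch below is only an illustration, not kafkajs internals. `countPartitions`, `assignPartitions`, and `idleMembers` are hypothetical helper names; `countPartitions` assumes it is handed an already connected kafkajs admin client (from `kafka.admin()`), and the round-robin function is a simplified model of group assignment rather than the library's actual assigner.

```javascript
// Hypothetical helper: `admin` is assumed to be a connected kafkajs admin client.
// fetchTopicMetadata returns { topics: [{ name, partitions: [...] }] }.
async function countPartitions(admin, topic) {
  const { topics } = await admin.fetchTopicMetadata({ topics: [topic] });
  return topics[0].partitions.length;
}

// Simplified round-robin model of how partitions are spread over group members.
// Each partition goes to exactly one member of the group.
function assignPartitions(partitionCount, memberIds) {
  const assignment = new Map(memberIds.map((id) => [id, []]));
  for (let p = 0; p < partitionCount; p++) {
    assignment.get(memberIds[p % memberIds.length]).push(p);
  }
  return assignment;
}

// Members that received no partition never get any messages.
function idleMembers(assignment) {
  return [...assignment]
    .filter(([, partitions]) => partitions.length === 0)
    .map(([id]) => id);
}

const members = Array.from({ length: 10 }, (_, i) => `instance-${i + 1}`);
console.log(idleMembers(assignPartitions(1, members)).length);  // 9
console.log(idleMembers(assignPartitions(10, members)).length); // 0
```

If the topic turns out to have a single partition, that would match the behaviour described above (one active instance, nine idle), and increasing the partition count to at least the number of instances is the usual way to let all of them consume.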

Nevon commented 4 years ago

Closing this as inactive.