Open gauravbaranwal opened 6 years ago
If you are using Consumer Groups (the newer kafka feature), you should be using a ConsumerGroup instance, not High Level Consumer.
HLC was for the older zookeeper based consumers.
I am also facing same issue and using consumerGroup instance using kafka-node library in nodejs
autoCommit:false
this.consumerGroup.on('message', (message) => { console.log('message', message) const offset = { topic: message.topic, partition: message.partition, offset: message.offset, // Adjust as needed tried offset: message.offset+1 also }; this.consumerGroup.commit(offset,(error, data) => { if (error) { console.error(error); } else { console.log('response>>>', data); } }); });
response>>>{ 'topic1': { partition: 0, errorCode: 0 }, 'topic2': { partition: 0, errorCode: 0 }, 'topic3': { partition: 0, errorCode: 0 } }
and message is not committed , i received again when i am restarting consumer.
I have set autocommit to false, as I want to manually commit the offset after my task completion. I am using a highlevel consumer, and use its commit method to manually commit the offset. The commit method is not giving any error, It is used as : consumer.commit( function(err, data) { if(err) console.log('ERROR IN consumer committed',err);
where consumer is an instance of highlevelconsumer.Now, console.log depicts that commits are done fine, but kafka-consumer-groups.sh --zookeeper localhost:2181 --describe command shows we are still at complete lag and current-offset is zero. How can I commit offsets ?
thing tried after google search : 1.> using consumer.commit({topic: msg.topic, partition: msg.partition, offset: msg.offset } 2.> using consumer.commit({topic: msg.topic, partition: msg.partition, offset: msg.offset + 1 } I also tried using consumerstream and consumergroup, but in these two cases the consumerstream instance and consumergroup instance didn't get the message event .
one more doubt : when autocommit is false, the consumer gets message event for all consumers, should the consumer get just one message corresponding to the current offset. (as autocommit is set to false)