tulios / kafkajs

A modern Apache Kafka client for node.js
https://kafka.js.org
MIT License

Is it possible to synchronously consume stored messages from kafka using kafkajs? #1518

Open impriyadev opened 1 year ago

impriyadev commented 1 year ago

I'm trying to consume Kafka messages and push them to an array using eachMessage in kafkajs on AWS Lambda. I'm using EventBridge to trigger the Lambda, which should then consume messages from a Kafka topic. However, it prints only one message, even when the fromBeginning parameter is set. On AWS Lambda it also fails with the error "Co-ordinator is not member of group". I tried to solve this by setting heartbeatInterval and awaiting heartbeat() inside eachMessage, but it still doesn't push the messages to the array, and the Lambda process exits.

Is it possible to handle this synchronously, so that I can consume all Kafka messages and push them to an array for further processing? Any help would be much appreciated. Thank you.

Code:

    const { Kafka } = require("kafkajs");
    const consumer = kafka.consumer({ groupId: 'test-group', heartbeatInterval: 8000 })
    await consumer.connect()
    await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })

    await consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        console.log({
          partition,
          offset: message.offset,
          value: message.value.toString(),
        })
        await heartbeat();
      },
    })

Note: In this case the consumer reads from a Kafka topic that already contains stored messages. The producer is triggered only in error cases and stores its message in a single Kafka topic; this consumer has to read those messages on a daily basis.


bjornmolin commented 1 year ago

The fromBeginning option is only used when the group's committed offset is invalid or not defined. Give consumer.seek or admin.resetOffsets a try if you would like to re-process messages.
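
A minimal sketch of both suggestions, in case it helps. It rests on several assumptions: `kafka` is an already configured KafkaJS client, the function names `rewindGroup` and `readStoredMessages` are made up for illustration, the consumer is the only member of its group, and the topic is neither compacted nor transactional (otherwise the last offset may never be delivered and the promise would not resolve). There is no timeout, which a Lambda handler would probably want to add.

```javascript
// Sketch only. `kafka` is assumed to be an existing `new Kafka({...})` instance.
// `rewindGroup` and `readStoredMessages` are hypothetical helper names.

// Option 1: rewind the committed offsets of a consumer group.
// KafkaJS rejects this if the group still has running members.
async function rewindGroup(kafka, groupId, topic) {
  const admin = kafka.admin();
  await admin.connect();
  try {
    await admin.resetOffsets({ groupId, topic, earliest: true });
  } finally {
    await admin.disconnect();
  }
}

// Option 2: seek every partition to its earliest offset and collect messages
// until the end offsets observed at start-up have been reached.
async function readStoredMessages(kafka, groupId, topic) {
  const admin = kafka.admin();
  await admin.connect();
  // Entries look like { partition, offset, high, low }; offsets are strings.
  const offsets = await admin.fetchTopicOffsets(topic);
  await admin.disconnect();

  // Last offset to wait for, per partition that currently holds messages.
  const lastOffset = new Map();
  for (const { partition, high, low } of offsets) {
    if (BigInt(high) > BigInt(low)) lastOffset.set(partition, BigInt(high) - 1n);
  }
  const messages = [];
  if (lastOffset.size === 0) return messages;

  const consumer = kafka.consumer({ groupId });
  await consumer.connect();
  await consumer.subscribe({ topic, fromBeginning: true });

  await new Promise((resolve, reject) => {
    consumer
      .run({
        eachMessage: async ({ partition, message }) => {
          const last = lastOffset.get(partition);
          if (last === undefined) return; // partition already drained
          messages.push({
            partition,
            offset: message.offset,
            value: message.value ? message.value.toString() : null,
          });
          if (BigInt(message.offset) >= last) {
            lastOffset.delete(partition);
            if (lastOffset.size === 0) resolve();
          }
        },
      })
      .catch(reject);
    // seek() has to be called after run(); it overrides the committed offset.
    for (const { partition, low } of offsets) {
      consumer.seek({ topic, partition, offset: low });
    }
  });

  await consumer.disconnect();
  return messages;
}
```

With a real client this could be called as `const messages = await readStoredMessages(kafka, 'test-group', 'test-topic')` from inside the Lambda handler. Messages produced after the end offsets were fetched are ignored on purpose, so that the function has a well-defined point at which to stop.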