tulios / kafkajs

A modern Apache Kafka client for node.js
https://kafka.js.org
MIT License

How to fix the mistake? #1294

Open datehoer opened 2 years ago

datehoer commented 2 years ago

Hello everyone, I think I need some help.

KafkaJSNonRetriableError: Consumer group was not initialized, consumer#run must be called first
    at Object.commitOffsets (/opt/nodework/node_modules/kafkajs/src/consumer/index.js:362:13)
    at Runner.eachMessage (/opt/nodework/pincong-img-download-2.js:105:40)
    at runMicrotasks (<anonymous>)
    at processTicksAndRejections (internal/process/task_queues.js:93:5)
    at async Runner.processEachMessage (/opt/nodework/node_modules/kafkajs/src/consumer/runner.js:151:9)
    at async onBatch (/opt/nodework/node_modules/kafkajs/src/consumer/runner.js:287:9)
    at async /opt/nodework/node_modules/kafkajs/src/consumer/runner.js:339:15 {
  retriable: false,
  helpUrl: undefined
}

"kafkajs": "1.15.0"
"node": "14.15.0"
"server": "Linux version 4.15.0-124-generic (buildd@lgw01-amd64-029) (gcc version 7.5.0 (Ubuntu 7.5.0-3ubuntu1~18.04)) #127-Ubuntu SMP Fri Nov 6 10:54:43 UTC 2020"

async function main(){
  const groupId = "message";
  const topic = "data";
  let index = 0;
  const consumer = kafka.consumer({ groupId });
  await consumer.subscribe({ topic, fromBeginning: true });
  try{
    await consumer.run({
      eachMessage: async ({topic,partition,message}) =>{
        const data = JSON.parse(message.value.toString("utf-8"));
        try{
          await consumer.commitOffsets([{
            topic: topic,
            partition: partition,
            offset: Number(message.offset) + 1
          }])
          await insert(data);
        }catch(e){
          console.error(e);
        }
        return;
      }
    })
  }catch (e){
    console.log(e)
    return
  }
}

So, what should I do?

Nevon commented 2 years ago

There's no need to call consumer.commitOffsets from within eachMessage. As soon as eachMessage resolves, the offset will be committed automatically:

await consumer.run({
  eachMessage: async ({topic,partition,message}) => {
    await insert(JSON.parse(message.value.toString("utf-8")));
  }
})

I'm not entirely sure how to trigger this behavior though. The only thing I can think of is stopping the consumer, but even then I would expect that to have exited out without calling eachMessage.

datehoer commented 2 years ago

Thanks. But I have turned off Kafka autocommit, so does it still not need to commit?
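
For readers who land here with autocommit turned off: in that case KafkaJS does not commit on its own, so offsets have to be committed explicitly. The following is only a minimal sketch of one way to do that, not a confirmed fix for the error above. It assumes the consumer is run with `autoCommit: false`, and the names `nextOffset`, `makeEachMessage`, and `handle` are hypothetical helpers introduced for illustration. It processes the message first and commits afterwards, and it passes the offset as a string, one higher than the offset of the message just handled.

```javascript
// Kafka offsets are 64-bit integers that KafkaJS delivers as strings.
// BigInt keeps them exact; Number loses precision above 2^53.
function nextOffset(offset) {
  return (BigInt(offset) + 1n).toString();
}

// Hypothetical helper: builds an eachMessage handler that processes the
// message first and only then commits the next offset to read.
// `consumer` is a KafkaJS consumer; `handle` is your own async function
// (for example the `insert` function from the question).
function makeEachMessage(consumer, handle) {
  return async ({ topic, partition, message }) => {
    await handle(JSON.parse(message.value.toString("utf-8")));
    await consumer.commitOffsets([
      { topic, partition, offset: nextOffset(message.offset) },
    ]);
  };
}

// Assumed usage, inside an async function, with `kafka` and `insert`
// defined as in the question:
//
//   const consumer = kafka.consumer({ groupId: "message" });
//   await consumer.connect();
//   await consumer.subscribe({ topic: "data", fromBeginning: true });
//   await consumer.run({
//     autoCommit: false,
//     eachMessage: makeEachMessage(consumer, insert),
//   });
```

Committing after `handle` resolves means a crash during processing leads to the message being redelivered rather than skipped, so `handle` should tolerate duplicates.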