tulios / kafkajs

A modern Apache Kafka client for node.js
https://kafka.js.org
MIT License

Re Consuming KafkaJs #1697

Open MAhmadIqbal opened 5 months ago

MAhmadIqbal commented 5 months ago

I need to re-consume messages from already-committed offsets. Previously, the consumer consumed some messages twice after a certain period of time, so I added a unique parameter to the payload in my producers to check whether a message has already been processed. Now I want to re-consume roughly the last 50 committed messages, but `seek` is not working: it fails with an "offset out of range" error, even though I obtained the offsets from this same consumer group and they are visible in Confluent. Why am I facing this issue?
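One way to rule out a genuinely out-of-range offset is to compare the requested offsets against the partition's low/high watermarks before seeking, using the KafkaJS admin client's `fetchTopicOffsets`. Below is a minimal sketch under that assumption; the `kafka` instance, topic, and offset map are placeholders, and `isOffsetInRange` is a hypothetical helper of my own:

```javascript
// Hypothetical helper: an offset is consumable iff low <= offset < high.
function isOffsetInRange(offset, low, high) {
  const o = BigInt(offset);
  return o >= BigInt(low) && o < BigInt(high);
}

// Sketch: verify requested offsets against the broker before seeking.
// `kafka` is assumed to be a configured KafkaJS client instance.
async function verifyOffsets(kafka, topic, specificOffsets) {
  const admin = kafka.admin();
  await admin.connect();
  try {
    // fetchTopicOffsets returns [{ partition, offset, low, high }] per partition.
    const ranges = await admin.fetchTopicOffsets(topic);
    for (const { partition, low, high } of ranges) {
      const wanted = specificOffsets[partition];
      if (wanted !== undefined && !isOffsetInRange(wanted, low, high)) {
        console.warn(
          `Partition ${partition}: offset ${wanted} is outside [${low}, ${high})`
        );
      }
    }
  } finally {
    await admin.disconnect();
  }
}
```

If a requested offset falls below the low watermark, the messages may already have been deleted by retention, which would explain an out-of-range error even though the offsets once existed.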

const ConsumerReminderExpertSms = async ({ topic = "test1" }) => {
  try {
    const consumerReminder = expertKafka.consumer({
      groupId: `Reminder22_Expert_Sms_${ENV}`,
    });
    await consumerReminder.connect();

    // Committed offsets to resume from, per partition
    const specificOffsets = {
      0: '4950', // offset for partition 0
      1: '2450', // offset for partition 1
    };

    await consumerReminder.subscribe({ topic, fromBeginning: false });

    consumerReminder.on(consumerReminder.events.GROUP_JOIN, async () => {
      // Seek to the specific offsets after joining the group
      try {
        for (const [partition, offset] of Object.entries(specificOffsets)) {
          consumerReminder.seek({ topic, partition: Number(partition), offset });
          console.log(`Seeking to offset ${offset} on partition ${partition} for topic ${topic}`);
        }
      } catch (err) {
        console.log("SEEK_ERROR", err);
      }
    });

    await consumerReminder.run({
      autoCommit: false,
      eachMessage: async ({ topic, partition, message }) => {
        try {
          // ... message handling (body truncated in the original post)
        } catch (err) {
          console.log("CONSUME_ERROR", err);
        }
      },
    });
  } catch (err) {
    console.log("CONSUMER_ERROR", err);
  }
};
Environment: