Blizzard / node-rdkafka

Node.js bindings for librdkafka
MIT License

Kafka consumer reads same message again after consumer group rebalances #1079

Open subhamKumar04 opened 1 month ago

subhamKumar04 commented 1 month ago

Environment Information

Steps to Reproduce

I have two consumers running in my application, both in the same consumer group (group ID G1).

consumer1 is reading from topic1 and consumer2 is reading from topic2. autoCommit is disabled. I am manually committing each offset after message processing.

Both topics have only one partition.

MAX_MESSAGE_POLL is set to 1, so I am polling only one message at a time from the broker.
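For reference, the poll-one, process, then commit flow described above can be sketched roughly as below. This is a minimal illustration, not my actual application code: `consumeOne` and `handleMessage` are hypothetical names, and `consumer` is assumed to be a connected node-rdkafka `KafkaConsumer` in non-flowing mode (only `consume(count, cb)` and `commitMessage(message)` are used).

```javascript
// Hypothetical helper: `consumer` is assumed to behave like a connected
// node-rdkafka KafkaConsumer; `handleMessage` is the application's own logic.
function consumeOne(consumer, handleMessage, done) {
  // Ask for at most one message (the MAX_MESSAGE_POLL = 1 case).
  consumer.consume(1, (err, messages) => {
    if (err) return done(err);
    if (messages.length === 0) return done(null, null); // nothing available yet

    const message = messages[0];
    try {
      handleMessage(message);
      // Commit only after processing succeeded. commitMessage commits
      // message.offset + 1 for that topic/partition.
      consumer.commitMessage(message);
      done(null, message);
    } catch (processingOrCommitError) {
      // Processing or the commit call failed, so nothing is reported as done;
      // the message stays eligible for redelivery.
      done(processingOrCommitError);
    }
  });
}
```

The callback style keeps the sketch close to the non-flowing `consume` API; a real loop would call `consumeOne` again from `done`.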

What I am noticing is that if consumer1 is in the middle of processing a message and consumer2 comes up in the meantime, consumer1 re-reads the same message. I enabled rebalance_cb in the Kafka consumer config and found that the broker rebalances the consumer group whenever a new consumer joins the group. It revokes and then re-assigns the same partition (since there is only one partition).
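One possible explanation, stated as an assumption rather than a confirmed diagnosis: after a partition is re-assigned, fetching resumes from the group's last committed offset, so a message that was polled but not yet committed when the rebalance happened would be delivered again. The toy model below only illustrates that idea; `ToyPartition` is a made-up class and does not use node-rdkafka at all.

```javascript
// Toy model, not node-rdkafka code: one partition's log plus the
// consumer group's committed offset.
class ToyPartition {
  constructor(messages) {
    this.messages = messages;
    this.committed = 0; // next offset the group resumes from
    this.position = 0;  // this consumer's in-memory fetch position
  }

  // Return the next message and advance the in-memory position.
  poll() {
    if (this.position >= this.messages.length) return null;
    const offset = this.position;
    this.position += 1;
    return { offset, value: this.messages[offset] };
  }

  // Record that everything up to and including `message` is done.
  commit(message) {
    this.committed = message.offset + 1;
  }

  // Revoke + re-assign: the fetch position falls back to the committed offset.
  rebalance() {
    this.position = this.committed;
  }
}

const partition = new ToyPartition(['m0', 'm1']);
const first = partition.poll();   // m0 is being processed, not yet committed
partition.rebalance();            // another consumer joins the group
const again = partition.poll();   // m0 is delivered a second time
partition.commit(again);
partition.rebalance();
const next = partition.poll();    // after the commit, delivery moves on to m1
```

In this model the duplicate only appears when the rebalance lands between the poll and the commit, which matches the timing I described.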

Another thing I noticed is what happens when I provide the default implementation of rebalance_cb from https://www.npmjs.com/package/node-rdkafka#rebalancing, that is, the code below:

'rebalance_cb': (err, assignment) => {
  if (err.code === Kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
    // Note: this can throw when you are disconnected. Take care and wrap it in
    // a try catch if that matters to you
    this.consumer.assign(assignment);
  } else if (err.code === Kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
    // Same as above
    this.consumer.unassign();
  } else {
    // We had a real error
    console.error(err);
  }
}

In this case, the message is NOT read twice.
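The comments in that snippet suggest wrapping `assign` and `unassign` in a try/catch. A rough sketch of what that could look like is below; `makeRebalanceCb`, `codes`, `getConsumer` and `log` are hypothetical names introduced here, with `codes` standing in for `Kafka.CODES.ERRORS` and `getConsumer` returning the `KafkaConsumer` instance once it exists.

```javascript
// Hypothetical factory: `codes` stands in for Kafka.CODES.ERRORS and
// `getConsumer` returns the KafkaConsumer once it has been created.
function makeRebalanceCb(codes, getConsumer, log = console.error) {
  return function rebalanceCb(err, assignment) {
    const consumer = getConsumer();
    try {
      if (err.code === codes.ERR__ASSIGN_PARTITIONS) {
        // Take ownership of the partitions handed to this consumer.
        consumer.assign(assignment);
      } else if (err.code === codes.ERR__REVOKE_PARTITIONS) {
        // Give up the current assignment.
        consumer.unassign();
      } else {
        // Anything else is a real error.
        log(err);
      }
    } catch (e) {
      // assign()/unassign() can throw when the client is disconnected.
      log(e);
    }
  };
}
```

It would be passed as `'rebalance_cb': makeRebalanceCb(Kafka.CODES.ERRORS, () => consumer)` in the consumer config, which also avoids relying on `this` inside an arrow function.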

My expectation is that after a successful commit, the same message should never be read again by the same consumer group.

node-rdkafka Configuration Settings

brokerConfig: {
  'metadata.broker.list': 'localhost:9092',
  'group.id': `group-name`,
  'event_cb': true,
  'compression.codec': 'snappy',
  'socket.keepalive.enable': true,
  'enable.auto.commit': false,
  'heartbeat.interval.ms': 250,
  'queued.min.messages': 100,
  'fetch.error.backoff.ms': 250,
  'queued.max.messages.kbytes': 50,
},
topicConfig: {
  'auto.offset.reset': 'earliest',
  'request.required.acks': 1
}