confluentinc / confluent-kafka-javascript

Confluent's Apache Kafka JavaScript client
https://www.npmjs.com/package/@confluentinc/kafka-javascript
MIT License

Changes to the current assignment must be made using incremental_assign() or incremental_unassign() when rebalance protocol type is COOPERATIVE #32

Closed serj026 closed 2 months ago

serj026 commented 3 months ago

Hello, I noticed that COOPERATIVE rebalancing (the incremental assign/unassign bindings) has been implemented in the confluent-kafka-javascript library; node-rdkafka does not currently support it. I made a small POC to see how it works in Node.js, but ran into the following error during a rebalance:

Consumer [1|poc_test]: rebalance.error Error: Local: Erroneous state
        at KafkaConsumer.assign (/Users/s.franchuk/github/confluent-kafka-javascript/lib/kafka-consumer.js:266:16)
        at KafkaConsumer.conf.rebalance_cb (/Users/s.franchuk/github/confluent-kafka-javascript/lib/kafka-consumer.js:65:16)
[2024-04-01T13:05:27.192Z]  WARN: poc-confluent-kafka/36080 on s-franchuk:
    Consumer [1|poc_test]: event.log {
      severity: 4,
      fac: 'ASSIGN',
      message: '[thrd:main]: Group "poc_test": application *assign() call failed: Changes to the current assignment must be made using incremental_assign() or incremental_unassign() when rebalance protocol type is COOPERATIVE'
    }

Debugging showed that the KafkaConsumer::IncrementalAssign method in kafka-consumer.cc is indeed called, and that consumer->incremental_assign(partitions) is called with the correct arguments. I can't tell what happens after that inside librdkafka's C code, but rko->rko_u.assign.method is neither RD_KAFKA_ASSIGN_METHOD_INCR_ASSIGN nor RD_KAFKA_ASSIGN_METHOD_INCR_UNASSIGN here: https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka_cgrp.c#L4842

Kafka consumer:

const { CODES, KafkaConsumer } = require('@confluentinc/kafka-javascript');

this._consumer = new KafkaConsumer({
  'bootstrap.servers': KAFKA_BROKERS,
  'client.id': clientId,
  'group.id': groupId,
  'auto.offset.reset': 'latest',
  'session.timeout.ms': 10000,
  'heartbeat.interval.ms': 100,
  'enable.auto.commit': false,
  'partition.assignment.strategy': 'cooperative-sticky',
  'fetch.wait.max.ms': 100,
  rebalance_cb: true,
  // debug: 'consumer,topic',
});

Rebalance callback:

this._consumer.on('rebalance', (err, assignments) => {
  const partitions = assignments.map((assignment) => assignment.partition);
  const type = RebalanceEventType[err.code] || 'ERROR';

  logger.info(`${this._who()}: rebalance happened - ${type} | [${partitions.join(',')}]`);

  switch (err.code) {
    case CODES.ERRORS.ERR__ASSIGN_PARTITIONS:
      this._consumer.incrementalAssign(assignments);
      break;
    case CODES.ERRORS.ERR__REVOKE_PARTITIONS:
      this._consumer.incrementalUnassign(assignments);
      break;
    default:
      logger.error(`${this._who()}: rebalance error`, err);
  }
});

this._consumer.on('rebalance.error', (err) => {
  logger.error(`${this._who()}: rebalance.error`, err);
});

For testing, I used library version 0.1.11-devel from npm, as well as a manual build of the project from the dev_early_access_development_branch branch.

What could be the reason for this problem?


milindl commented 3 months ago

Thanks for trying this out - I'll give this a look.

milindl commented 3 months ago

Alright, I think I figured it out. The case where rebalance_cb is a function in the config is handled correctly, but the case where it's a boolean is not. I pushed a fix to the dev_early_access_development_branch, and it'll be included in the next release. I'll be adding a test for it too, but you can continue with your POC.
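For readers following along, the function form of rebalance_cb mentioned above might look roughly like the sketch below. It assumes the node-rdkafka convention that the callback is invoked with the consumer bound as `this`, and it hardcodes librdkafka's numeric error codes only so the snippet stands alone; real code would read them from CODES.ERRORS. The name cooperativeRebalanceCb is hypothetical.

```javascript
// librdkafka error codes, hardcoded here only to keep the sketch self-contained.
// In real code, use CODES.ERRORS from '@confluentinc/kafka-javascript'.
const ERR__ASSIGN_PARTITIONS = -175;
const ERR__REVOKE_PARTITIONS = -174;

// Hypothetical function-form rebalance_cb for the cooperative protocol.
// Assumption: the library calls it with the consumer as `this`, so it must be
// a regular function rather than an arrow function.
function cooperativeRebalanceCb(err, assignments) {
  if (err.code === ERR__ASSIGN_PARTITIONS) {
    // Add only the newly assigned partitions to the current assignment.
    this.incrementalAssign(assignments);
  } else if (err.code === ERR__REVOKE_PARTITIONS) {
    // Remove only the revoked partitions from the current assignment.
    this.incrementalUnassign(assignments);
  } else {
    console.error('rebalance error', err);
  }
}
```

It would be passed in the consumer config as `rebalance_cb: cooperativeRebalanceCb` in place of `rebalance_cb: true`, in which case the callback itself is responsible for the assign/unassign calls.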

Also, one more thing: when rebalance_cb is a boolean, the library already calls incrementalAssign/incrementalUnassign (or, for that matter, assign/unassign) itself. For example, in the case of an assign, the sequence of events is:

  1. The library dispatches the rebalance event.
  2. Your code calls incrementalAssign.
  3. The library code calls incrementalAssign, too. (This gives you an error of type ERR__CONFLICT in rebalance.error, saying that whatever you're trying to assign is already assigned.)

You could ignore this error. Alternatively, if you want to modify the assignment, change the values contained in the assignment object that is passed to the rebalance event handler rather than calling incrementalAssign yourself.
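A minimal sketch of the "ignore this error" option, assuming the error delivered to rebalance.error carries a numeric `code` property and that ERR__CONFLICT has librdkafka's value of -173 (in real code, take it from CODES.ERRORS.ERR__CONFLICT). The helper names here are hypothetical and not part of the library.

```javascript
// librdkafka's RD_KAFKA_RESP_ERR__CONFLICT, hardcoded to keep the sketch standalone.
const ERR__CONFLICT = -173;

// Hypothetical helper: true when the error only reports that the partitions
// were already assigned (or unassigned) by the other caller.
function isDuplicateAssignError(err) {
  return Boolean(err) && err.code === ERR__CONFLICT;
}

// Hypothetical helper: log every rebalance error except the expected conflict.
// `consumer` is anything with an EventEmitter-style on(); `log` defaults to console.
function attachRebalanceErrorHandler(consumer, log = console) {
  consumer.on('rebalance.error', (err) => {
    if (isDuplicateAssignError(err)) {
      return; // expected when both user code and the library assign
    }
    log.error('rebalance.error', err);
  });
}
```

With `rebalance_cb: true`, the simpler route is probably to drop the incrementalAssign/incrementalUnassign calls from the rebalance handler entirely and use it only for logging, since the library performs the assignment itself.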

serj026 commented 3 months ago

@milindl Thanks for the explanation!

milindl commented 2 months ago

The fix is included in the latest release.