confluentinc / confluent-kafka-javascript

Confluent's Apache Kafka JavaScript client
https://www.npmjs.com/package/@confluentinc/kafka-javascript
MIT License

KafkaJS.Consumer: Pausing topics during rebalance_cb not supported #62

Open justjake opened 1 month ago

justjake commented 1 month ago

Environment Information

Steps to Reproduce

const LibrdKafka = require("@confluentinc/kafka-javascript")

async function main() {
    /** @type {import("@confluentinc/kafka-javascript").KafkaJS.Consumer} */
    let consumer

    const config = {
        kafkaJS: {
            clientId: `Repro-Client-${Date.now()}`,
            brokers: ["127.0.0.1:9092"],
            connectionTimeout: 7000,
            requestTimeout: 30000,
            retry: { initialRetryTime: 300, retries: 50, maxRetryTime: 600 },
            logLevel: LibrdKafka.KafkaJS.logLevel.ERROR,
            groupId: `Repro-Client-${Date.now()}-CG`,
            rebalanceTimeout: 60000,
            sessionTimeout: 45000,
            allowAutoTopicCreation: true,
            autoCommit: false,
            partitionAssigners: ["cooperative-sticky"],
        },
        "group.instance.id":
            `Repro-Client-${Date.now()}-Instance`,
        rebalance_cb: (err, assignments) => {
            const unflattened = []
            for (const { topic, partition } of assignments) {
                unflattened.push({ topic, partitions: [partition] })
            }
            consumer.pause(unflattened)
            console.log("rebalance_cb: paused during callback:", consumer.paused())
        },
    }

    const kafka = new LibrdKafka.KafkaJS.Kafka(config)
    consumer = kafka.consumer(config)

    await consumer.connect()
    // Fill in some topics you have on hand
    await consumer.subscribe({
        topics: [/^.*something.*/],
    })

    await consumer.run({
        // Note: eachMessage receives the full payload
        // ({ topic, partition, message, heartbeat, pause }), not just the message.
        eachMessage: async payload => {
            console.log("Expected to never be called:", payload)
        },
    })

    await new Promise(resolve => setTimeout(resolve, 50_000))
}

main()

Here's the console output I get:

rebalance_cb: paused during callback: [
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0001',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0002',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0003',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0004',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0005',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0006',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0007',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0008',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0009',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0010',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0011',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0012',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0013',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0014',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0015',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0016',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0017',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0018',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0019',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0020',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0021',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0022',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0023',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0024',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0025',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0026',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0027',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0028',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0029',
    partitions: [ 0 ]
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0030',
    partitions: [ 0 ]
  }
]

INTERNALS: incrementalAssign [
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0001',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0002',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0003',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0004',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0005',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0006',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0007',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0008',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0009',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0010',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0011',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0012',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0013',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0014',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0015',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0016',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0017',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0018',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0019',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0020',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0021',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0022',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0023',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0024',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0025',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0026',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0027',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0028',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0029',
    partition: 0
  },
  {
    topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0030',
    partition: 0
  }
]
Expected to never be called: {
  topic: 'jitl-confluent-sl-materializer-cdc-postgres-space-schema0018',
  partition: 0,
  message: {
    key: <Buffer 7b 22 69 64 22 3a 22 32 32 36 37 31 3a 31 35 34 36 37 30 34 38 38 22 2c 22 5f 5f 64 62 7a 5f 5f 70 68 79 73 69 63 61 6c 54 61 62 6c 65 49 64 65 6e 74 ... 96 more bytes>,
    value: <Buffer 7b 22 73 74 61 74 75 73 22 3a 22 42 45 47 49 4e 22 2c 22 69 64 22 3a 22 32 32 36 37 31 3a 31 35 34 36 37 30 34 38 38 22 2c 22 65 76 65 6e 74 5f 63 6f ... 56 more bytes>,
    timestamp: '1723065962194',
    attributes: 0,
    offset: '1543',
    size: 106,
    leaderEpoch: 21,
    headers: undefined
  },
  heartbeat: [AsyncFunction: heartbeat],
  pause: [Function: bound pause]
}
milindl commented 1 month ago

This is correct; at the moment, pause() can't be called from within the rebalance callback.

The current logic when we receive partition assignments from the broker is: call the user-defined rebalance cb, and then "assign" the partitions to the internal library. Before this assign, those partitions aren't counted as part of the consumer's assignment.

pause() only pauses topic partitions that are within the assignment, so it doesn't work from within the callback.

This ordering is necessary because the user is allowed to change the assignment within the rebalance callback, so assigning before the callback isn't possible.
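
In the meantime, a pause issued once the partitions are actually in the assignment does take effect. A minimal workaround sketch (illustrative only, not an official recommendation; it assumes the connected consumer from the repro above, and it only stops delivery after the first message per partition rather than preventing it entirely):

await consumer.run({
    eachMessage: async ({ topic, partition }) => {
        // Inside eachMessage the current topic-partition is assigned by
        // definition, so pause() accepts it and further delivery stops here.
        consumer.pause([{ topic, partitions: [partition] }])
    },
})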

We're discussing this internally; there are two possible approaches:

  1. Store a list of user-requested paused/resumed partitions and, if we are assigned any such partition, pause it within the library (see the sketch after this list). This gets slightly tricky when pausing/resuming entire topics.
  2. Allow the user to call the assign method themselves within the rebalance cb (if they call it, the library will not call it; if they don't, the library will call it after the rebalance cb). This allows for finer control, but once we expose an API, it's impossible to remove it.
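
Roughly, option 1 would mean remembering pause requests until the partitions show up in an assignment. A conceptual sketch only; the PauseTracker name and shape are illustrative, not actual library internals:

// Conceptual sketch of option 1; illustrative, not actual library internals.
class PauseTracker {
    constructor() {
        // Topic-partitions the user asked to pause before they were assigned.
        this.pendingPauses = new Set(); // keys like "topic:partition"
    }

    // Record a pause request, possibly made before the partitions are assigned.
    requestPause(topicPartitions) {
        for (const { topic, partitions } of topicPartitions) {
            for (const partition of partitions) {
                this.pendingPauses.add(`${topic}:${partition}`);
            }
        }
    }

    // Once an assignment lands, return the subset that should start out paused.
    applyToAssignment(assignment) {
        return assignment.filter(({ topic, partition }) =>
            this.pendingPauses.has(`${topic}:${partition}`));
    }
}

Pausing or resuming an entire topic (with no explicit partition list) would need extra bookkeeping on top of this, which is where it gets tricky.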

I'm also looking into whether we can make the error handling for pause() a little better, e.g. whether there is a way to return a per-partition result in case a partition was not paused.

Thanks for the report!

justjake commented 1 month ago

Ok, that makes sense. I only expected “pause during rebalance_cb” to work because a Confluent support engineer suggested it. Maybe I misunderstood their guidance.

milindl commented 1 month ago

We discussed this within the team; we'll be exposing assign/unassign and their incremental variants to allow for this use case, so you'll be able to pause within the rebalance cb.

I'll check internally about the guidance; maybe there was a misunderstanding, as this sort of operation is supported within librdkafka.
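
For reference, this pattern has long been possible with the classic (non-promisified) API, since there the application calls assign() itself inside the rebalance callback. A rough sketch, assuming the node-rdkafka-style exports at the top level of this package (configuration values are placeholders):

const Kafka = require("@confluentinc/kafka-javascript");

const consumer = new Kafka.KafkaConsumer({
    "group.id": "example-group",              // placeholder
    "metadata.broker.list": "127.0.0.1:9092", // placeholder
    rebalance_cb: function (err, assignment) {
        if (err.code === Kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
            // The application performs the assign, so the partitions are in
            // the assignment before pause() is called.
            this.assign(assignment);
            this.pause(assignment); // assignment is [{ topic, partition }, ...]
        } else if (err.code === Kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
            this.unassign();
        }
    },
}, {});

consumer.connect();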

milindl commented 2 weeks ago

Alright, I merged a PR that allows doing this. Something like so:

rebalance_cb: function (err, assignment, assignmentFns) {
    if (err.code === ErrorCodes.ERR__ASSIGN_PARTITIONS) {

        /* Assign first so we can pause. */
        assignmentFns.assign(assignment);

        /* Convert the assignment into the format expected by pause(). */
        const pausablePartitions = assignment.map(({ topic, partition }) =>
            ({ topic, partitions: [partition] }));
        consumer.pause(pausablePartitions);
    } else { // This isn't strictly necessary - the library will call assign/unassign for you if you don't do it yourself.
        assignmentFns.unassign(assignment);
    }
}

It'll be available in the next release.
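
For the cooperative-sticky setup in the original repro, the incremental variants mentioned earlier would be the ones to use. A sketch of what that rebalance_cb could look like, reusing ErrorCodes and consumer from the snippets above; the incrementalAssign/incrementalUnassign names on assignmentFns are assumptions here:

rebalance_cb: (err, assignment, assignmentFns) => {
    if (err.code === ErrorCodes.ERR__ASSIGN_PARTITIONS) {
        /* Incrementally add the newly assigned partitions, then pause them
         * so eachMessage is not invoked for them. */
        assignmentFns.incrementalAssign(assignment); // assumed name
        consumer.pause(assignment.map(({ topic, partition }) =>
            ({ topic, partitions: [partition] })));
    } else {
        /* Revocation: give up the revoked partitions incrementally. */
        assignmentFns.incrementalUnassign(assignment); // assumed name
    }
}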