Blizzard / node-rdkafka

Node.js bindings for librdkafka
MIT License

Exception When Flushing Producer #1080

Closed by bliu13 3 months ago

bliu13 commented 3 months ago

Environment Information

Steps to Reproduce

  1. Instantiate Kafka producer client
  2. Encode message in AVRO
  3. Send encoded message
  4. Flush Kafka producer client

When producer.flush() is called, the client crashes with the message:

Error: Need to specify a timeout and a callback
    at Producer.flush (/app/node_modules/node-rdkafka/lib/producer.js:252:16)
    at file:///app/helpers/getKafkaProducer.js:76:16
    at new Promise (<anonymous>)
    at flushedClose (file:///app/helpers/getKafkaProducer.js:74:10)
    at handler (file:///app/index.js:52:11)
    at process.processTicksAndRejections (node:internal/process/task_queues:95:5)

Here's a snippet of getKafkaProducer.js:

export async function flushedClose (producer) {
  return new Promise((resolve, reject) => {
    if (producer && producer.isConnected()) {
      producer.flush(5000, (e) => {
        if (e) {
          return reject(e)
        }
      }).disconnect(5000, (e, metrics) => {
        if (e) {
          return reject(e)
        }

        if (metrics) {
          console.log('Kafka disconnect metrics', metrics)
        }

        return resolve(metrics)
      })
    }
  })
}

node-rdkafka Configuration Settings

{
    'metadata.broker.list': 'Some Brokers',
    'client.id': 'Some Id',
    'group.id': 'Some Group Id',
    'retry.backoff.ms': 200,
    'message.send.max.retries': 5,
    'socket.keepalive.enable': true,
    'compression.codec': 'lz4'
}

Additional context

I followed the index.d.ts type definition for flush in the Producer class:

export class Producer extends Client<KafkaProducerEvents> {
  ...

  flush(timeout?: NumberNullUndefined, cb?: (err: LibrdKafkaError) => void): this;
}

bliu13 commented 3 months ago

I think I may have found two issues. First, I should call disconnect() from inside the producer.flush() callback rather than chaining it onto flush(). The code would look like this:

export async function flushedClose (producer) {
  return new Promise((resolve, reject) => {
    if (producer && producer.isConnected()) {
      producer.flush(5000, (e) => {
        if (e) {
          return reject(e)
        }

        producer.disconnect(5000, (e, metrics) => {
          if (e) {
            return reject(e)
          }

          if (metrics) {
            console.debug('Kafka disconnect metrics', metrics)
          }

          return resolve(metrics)
        })
      })
    }
  })
}
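
One gap remains in the version above: if the producer is missing or not connected, the promise never settles. Below is a minimal sketch that resolves in that case as well. The `makeFakeProducer` stub is hypothetical and not part of node-rdkafka; it only mimics the `isConnected`, `flush`, and `disconnect` calls used above so the flow can be run without a broker. The `timeoutMs` parameter is also an assumption added for illustration.

```javascript
// Hypothetical stand-in for a connected producer; not part of node-rdkafka.
function makeFakeProducer () {
  const calls = []
  return {
    calls,
    isConnected: () => true,
    flush (timeout, cb) {
      calls.push(['flush', timeout])
      setImmediate(cb, null)
      return this
    },
    disconnect (timeout, cb) {
      calls.push(['disconnect', timeout])
      setImmediate(cb, null, { connectionOpened: 0 })
      return this
    }
  }
}

function flushedClose (producer, timeoutMs = 5000) {
  return new Promise((resolve, reject) => {
    // Nothing to flush: settle right away instead of leaving the promise pending.
    if (!producer || !producer.isConnected()) {
      return resolve(undefined)
    }

    producer.flush(timeoutMs, (flushErr) => {
      if (flushErr) {
        return reject(flushErr)
      }
      // Disconnect only after the flush callback has fired.
      producer.disconnect(timeoutMs, (disconnectErr, metrics) => {
        if (disconnectErr) {
          return reject(disconnectErr)
        }
        resolve(metrics)
      })
    })
  })
}
```

With the stub, `flushedClose(makeFakeProducer(), 100)` records a `flush` call followed by a `disconnect` call, and `flushedClose(null)` resolves immediately.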

The second problem was that the snippet I posted was simplified. In the real code the timeouts came from environment variables, and the values were not parsed into numbers, so flush() was given a string instead of a number.
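
Values read from `process.env` are always strings, so the timeout has to be converted before it reaches `flush()`. A minimal sketch of that conversion follows. Both the `parseTimeoutMs` helper and the `KAFKA_FLUSH_TIMEOUT_MS` variable name are hypothetical, chosen for illustration; neither comes from node-rdkafka.

```javascript
// Hypothetical helper: turn an environment value into a usable timeout in ms.
function parseTimeoutMs (raw, fallbackMs) {
  const parsed = Number.parseInt(raw, 10)
  // Fall back on NaN (unset or non-numeric input) and on negative values.
  return Number.isFinite(parsed) && parsed >= 0 ? parsed : fallbackMs
}

// KAFKA_FLUSH_TIMEOUT_MS is an assumed variable name, used for illustration only.
const flushTimeoutMs = parseTimeoutMs(process.env.KAFKA_FLUSH_TIMEOUT_MS, 5000)
```

The resulting `flushTimeoutMs` is a number, so it can be passed as the first argument to `producer.flush()` and `producer.disconnect()`.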