Blizzard / node-rdkafka

Node.js bindings for librdkafka
MIT License
2.1k stars, 390 forks

Consumer throws `Error: Need to specify a callback` #1004

Open PavelLoparev opened 1 year ago

PavelLoparev commented 1 year ago

Environment Information

Steps to Reproduce

I have a wrapper around the consumer.consume() method like this:

...
async consumeEvents(amount: number): Promise<Array<KafkaRawEvent>> {
    const { consumer } = this;

    return new Promise((resolve, reject) => {
        consumer.consume(amount, (err, messages) => {
            if (err) {
                return reject(err);
            }

            if (messages.length > 0) {
                return resolve(messages);
            }

            return resolve([]);
        });
    });
}
...

Then just call await this.consumeEvents(100);

node-rdkafka Configuration Settings

"client.id": "test",
"metadata.broker.list": "ip1,ip2,etc",
"group.id": "test_rdkafka",
"fetch.wait.max.ms": 50,
"socket.keepalive.enable": true,
rebalance_cb: true

Additional context

I ran into a strange issue that reproduces only in a Docker container. The wrapper described above is what I use to consume events in non-flowing mode.

Local env (described above):

  1. Run await this.consumeEvents(100);
  2. It does connect to the broker and receives messages just fine.

Docker env

  1. Run await this.consumeEvents(100);
  2. It throws:
    Error: Need to specify a callback
    at KafkaConsumer._consumeNum (/app/node_modules/node-rdkafka/lib/kafka-consumer.js:466:16)
    at KafkaConsumer.consume (/app/node_modules/node-rdkafka/lib/kafka-consumer.js:399:10)
    at /app/dist/kafka/kafka.service.js:52:22
    at new Promise (<anonymous>)
    at KafkaService.consumeEvents (/app/dist/kafka/kafka.service.js:51:16)
    at KafkaService.onReady (/app/dist/kafka/kafka.service.js:35:46)
    at processTicksAndRejections (internal/process/task_queues.js:93:5)

/node-rdkafka/lib/kafka-consumer.js:466:16: (screenshot of the source, not preserved)

I also found the C++ code where this error is actually thrown. I think it's here: (screenshot, not preserved)

As far as I understand, it validates the 3rd parameter, and for some reason that parameter is not a function, even though, as this code shows, a callback function is actually passed.
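
One way to test that assumption would be to log the runtime types of both arguments just before they reach consumer.consume(). The sketch below is illustrative only: describeConsumeArgs is a hypothetical helper, not part of node-rdkafka. TypeScript annotations such as `amount: number` are erased at compile time, so they do not guarantee what the compiled code in dist/ actually receives.

```typescript
// Hypothetical debugging helper (not part of node-rdkafka): reports the
// runtime types of the values about to be passed to consumer.consume().
function describeConsumeArgs(amount: unknown, callback: unknown): string {
    return `amount: ${typeof amount} (${String(amount)}), callback: ${typeof callback}`;
}

// A correct call site passes a number and a function.
console.log(describeConsumeArgs(100, () => {}));
// prints "amount: number (100), callback: function"

// A value read from an environment variable or a config file is a string at runtime.
console.log(describeConsumeArgs("100", () => {}));
// prints "amount: string (100), callback: function"
```

If the Docker run reports anything other than `number` and `function`, the problem is likely in how the arguments are produced rather than in the callback itself.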

I also tried different combinations of Node versions and node-rdkafka versions; nothing works in a Docker container.

Additionally, I tried to build and run this code in a container on an Intel-based Mac and on an AWS EC2 instance, with the same result: it just throws the error described above.

Please help to investigate further.

marrinn commented 12 months ago

I had this issue and discovered that the number of messages passed to the consume method was a string rather than a number. The value was read from an external config file and parsed as a string, which resulted in the "Need to specify a callback" error being thrown. Looking at your example above, though, I can't see how that would occur in this instance.
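
A minimal sketch of guarding against that case, assuming the message count may arrive as a string (for example from a config file or an environment variable). toMessageCount is a hypothetical helper, not a node-rdkafka API; it converts the value to a positive integer or throws a descriptive error before consume() is ever called.

```typescript
// Hypothetical helper (not part of node-rdkafka): normalizes a message count
// that may have been loaded as a string into a positive integer.
function toMessageCount(value: unknown): number {
    // Strings are converted with Number(); anything else is checked as-is.
    const count = typeof value === "string" ? Number(value.trim()) : value;
    if (typeof count !== "number" || !Number.isInteger(count) || count <= 0) {
        throw new TypeError(
            `Message count must be a positive integer, got ${JSON.stringify(value)}`
        );
    }
    return count;
}

console.log(toMessageCount("100")); // 100
console.log(toMessageCount(100));   // 100
```

In a wrapper like the one in the original post, the call would then look like consumer.consume(toMessageCount(amount), callback), so a badly typed value fails with a clear message instead of the misleading callback error.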