Blizzard / node-rdkafka

Node.js bindings for librdkafka
MIT License
2.08k stars 391 forks source link

Frequent disconnect error LibrdKafkaError: Local: Timed out #4399 #1036

Open jayprakash941 opened 11 months ago

jayprakash941 commented 11 months ago

Environment Information

node-rdkafka Configuration Settings

Same as below Reference : https://github.com/confluentinc/librdkafka/issues/1101

we have tow producer instance running inside k8 pods, publishing messages to 1 topics and we don't have delivery report callback. we are seeing "Local:Message Timed Out" error frequently after some producer ideal time.

We are using latest stable version "node-rdkafka": "^2.17.0" in our nodejs project. "version": "v2.17.0", "description": "Node.js bindings for librdkafka", "librdkafka": "2.2.0", "main": "lib/index.js", we have 3 partition for each topics. messages to topics are grouped by our key.

Below is the my configs and code.

let kafkaConfig = { 'client.id': config.clientId, 'metadata.broker.list': config.hosts.split(','), 'compression.codec': 'gzip', 'retry.backoff.ms': 200, 'message.send.max.retries': 10, 'request.timeout.ms':900000, 'queue.buffering.max.messages': 100000, 'queue.buffering.max.ms': 1000, 'batch.num.messages': 1000000, 'message.timeout.ms': 10000000, 'linger.ms':100, 'batch.size':1000, 'dr_cb': false } let producer = new Kafka.Producer(kafkaConfig);

module.exports = {producer:producer}

producer.js

const producer = require('./index').producer; var config = require('../config').get('kafka');

let isProducerReady = false; let producerPromise = null;

async function createProducer() { if (!producerPromise) { producerPromise = new Promise(resolve => { producer.connect(); //logging all errors producer.on('event.error', function (err) { console.error('Error from producer', err); producer.disconnect(); producerPromise = null; createProducer(); });

  producer.on('ready', function (arg) {
    console.log('producer ready.' + JSON.stringify(arg));
    isProducerReady = true;
    resolve();
  });

  producer.on('disconnected', function (arg) {
    console.log('producer disconnected. ' + JSON.stringify(arg));
    producerPromise = null;
    createProducer();
  });
});

} return producerPromise; }

createProducer();

exports.add = async function (topic, messages) { try { console.log("Topic and message received-------", topic, messages); let topicName = topic`; if (!isProducerReady) { console.log('Producer is not ready yet. Please wait...'); await createProducer(); } const value = Buffer.from(JSON.stringify(messages)); producer.produce(topicName, config.kafka_partition, value, null, Date.now(), undefined); producer.poll(0); } catch (error) { console.error('Error occurred while publishing message:', error); throw error; } }

End of producer.js

Thanks in advance

Additional context PFA for server logs. error_kafka_producer_activity.log