tulios / kafkajs

A modern Apache Kafka client for node.js
https://kafka.js.org
MIT License

Suggestion for auto-reconnect when Kafka brokers are not accessible at app start #1034

Open nenadlazovic opened 3 years ago

nenadlazovic commented 3 years ago

Using KafkaJS 1.15.0.

Issue: ensure the consumer and producer keep reconnecting in all scenarios

Our app is a mobile app that connects to a Kafka cluster. The app has 1 consumer and 1 producer. The network used to connect to the brokers may go down. Connectivity to Kafka has to be maintained and re-established whenever the Kafka cluster becomes accessible.

We noticed that

We would appreciate it if you could answer:

  1. Is the above the expected behavior?
  2. Is there a configuration setting, or suggested code handling (e.g. retrying the connection on KafkaJSNumberOfRetriesExceeded), to ensure the producer and consumer reconnect in this scenario?
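On question 2: as far as I know, KafkaJS's own `retry` settings (`retries`, `initialRetryTime`, `maxRetryTime`) bound how many times the client retries, and once they are used up the call rejects with KafkaJSNumberOfRetriesExceeded. One way to keep trying indefinitely is to retry from application code. A minimal sketch, assuming all you need is "keep calling connect() with growing pauses until it works"; `connectWithRetry` and its options are hypothetical names, not part of KafkaJS:

```javascript
// Hypothetical helper (not part of KafkaJS): keep calling `connect` until it
// resolves, waiting longer after each failure (exponential backoff, capped).
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function connectWithRetry(connect, {
  initialDelayMs = 1000,   // wait after the first failure
  maxDelayMs = 60000,      // upper bound on the wait between attempts
  maxAttempts = Infinity,  // Infinity = never give up
  onError = () => {},      // called with (error, attemptNumber) after each failure
} = {}) {
  let delay = initialDelayMs;
  for (let attempt = 1; ; attempt++) {
    try {
      // `return await` keeps the rejection inside this try block
      return await connect();
    } catch (err) {
      onError(err, attempt);
      if (attempt >= maxAttempts) throw err; // give up only if a limit was set
      await sleep(delay);
      delay = Math.min(delay * 2, maxDelayMs); // double the pause, up to the cap
    }
  }
}
```

For the producer this could replace the bare `runProducer()` call, e.g. `connectWithRetry(() => this.producer.connect(), { onError: (e, n) => logger.error(e.message) })`. Wrapping the consumer's whole connect/subscribe/run sequence the same way is possible, but whether a half-initialized consumer can simply be retried, or needs to be recreated, is worth checking against your KafkaJS version.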

Our current KafkaJS init code is pretty standard:

```js
async init(connStatusListener) {
    this.connStatusListener = connStatusListener;

    this.mqDeviceTopicName =...;
    this.mqGroupId = ...;

    let pwd = fs.readFileSync(registrationRoot + '/' + mSecurity.client.privateKeyPwdFile, 'utf-8');

    //https://kafka.js.org/docs/retry-detailed
    this.kafka = new Kafka({
        logLevel: logLevel[global.config.get("mq.logLevel")],
        brokers: global.config.get("mq.brokers"),
        ssl: {
            rejectUnauthorized: false,
            ca: [fs.readFileSync(registrationRoot + '/' + mSecurity.ca.certificateFile, 'utf-8')],
            key: fs.readFileSync(registrationRoot + '/' + mSecurity.client.privateKeyFile, 'utf-8'),
            passphrase: pwd.trim(),
            cert: fs.readFileSync(registrationRoot + '/' + mSecurity.client.certificateFile, 'utf-8')
        }
    });

    //initialize producer
    this.producer = this.kafka.producer();
    const producerConnectRemoveListener = this.producer.on(this.producer.events.CONNECT, (e) => {
        logger.info(`Producer connect at ${e.timestamp}`);
        if (this.connStatusListener) this.connStatusListener('producer', e);
    });
    const producerDisconnectRemoveListener = this.producer.on(this.producer.events.DISCONNECT, (e) => {
        logger.error(`Producer disconnect at ${e.timestamp}`);
        if (this.connStatusListener) this.connStatusListener('producer', e);
    });
    const runProducer = async () => {
        await this.producer.connect();
    }

    runProducer().catch(e => {
        logger.debug(`MQService, producer error ${e.message}`);
    });

    //initialize consumer
    this.consumer = this.kafka.consumer({groupId: this.mqGroupId});//, allowAutoTopicCreation: false });
    const consumerHeartbeatRemoveListener = this.consumer.on(this.consumer.events.HEARTBEAT, (e) => {
        logger.info(`Consumer heartbeat at ${e.timestamp}`);
        if (this.connStatusListener) this.connStatusListener('consumer', e);
    });
    const consumerConnectRemoveListener = this.consumer.on(this.consumer.events.CONNECT, (e) => {
        logger.info(`Consumer connect at ${e.timestamp}`);
        if (this.connStatusListener) this.connStatusListener('consumer', e);
    });
    const consumerDisconnectRemoveListener = this.consumer.on(this.consumer.events.DISCONNECT, (e) => {
        logger.error(`Consumer disconnect at ${e.timestamp}`);
        if (this.connStatusListener) this.connStatusListener('consumer', e);
    });
    const consumerCrashRemoveListener = this.consumer.on(this.consumer.events.CRASH, (e) => {
        logger.error(`Consumer crash at ${e.timestamp}`);
        if (this.connStatusListener) this.connStatusListener('consumer', e);
    });
    const consumerStopRemoveListener = this.consumer.on(this.consumer.events.STOP, (e) => {
        logger.error(`Consumer stop at ${e.timestamp}`);
        if (this.connStatusListener) this.connStatusListener('consumer', e);
    });

    const runConsumer = async () => {
        await this.consumer.connect();
        await this.consumer.subscribe({ topic: this.mqDeviceTopicName, fromBeginning: true });
        await this.consumer.run({
            eachMessage: async ({topic, partition, message}) => {
                logger.debug({
                    partition,
                    offset: message.offset,
                    value: message.value ? message.value.toString() : "",
                });
                await remoteCommands.processMqMessage(message.value.toString());
            }
        })
    }

    runConsumer().catch(e => {
        logger.debug(`MQService, consumer error ${e.message}`);
    });
}
```
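In the init code above, the DISCONNECT and CRASH listeners only log, and a failed first `connect()` is caught and logged at debug level, so nothing ever tries again. One way to close that gap is a small supervisor that re-runs a reconnect function whenever a chosen event fires. This is a sketch, not a KafkaJS feature: `superviseConnection` and its options are hypothetical names, and it assumes only that the client exposes `on(eventName, listener)` as KafkaJS producers and consumers do. Which events KafkaJS 1.15.0 actually emits on a network drop is an assumption to verify; DISCONNECT may fire only after an explicit `disconnect()`, in which case the consumer's CRASH event is the more likely trigger.

```javascript
// Hypothetical helper, not a KafkaJS API: whenever `client` emits `eventName`,
// wait `retryDelayMs` and call `reconnect()`, repeating until it succeeds.
function superviseConnection(client, eventName, reconnect, {
  retryDelayMs = 5000,     // pause before each reconnect attempt
  onError = () => {},      // called with the error of each failed attempt
} = {}) {
  let stopping = false;    // set by stop() so intentional shutdowns stick
  let timer = null;        // pending retry, if any
  let inFlight = false;    // true while reconnect() is running

  const scheduleReconnect = () => {
    // Skip if shutting down, a retry is already queued, or one is running.
    if (stopping || timer || inFlight) return;
    timer = setTimeout(async () => {
      timer = null;
      inFlight = true;
      try {
        await reconnect();
        inFlight = false;
      } catch (err) {
        inFlight = false;
        onError(err);
        scheduleReconnect(); // still unreachable: queue another attempt
      }
    }, retryDelayMs);
  };

  client.on(eventName, scheduleReconnect);

  return {
    // Call before disconnecting on purpose so the supervisor does not undo it.
    stop() {
      stopping = true;
      if (timer) clearTimeout(timer);
      timer = null;
    },
  };
}
```

For example, `superviseConnection(this.producer, this.producer.events.DISCONNECT, () => this.producer.connect())`, or `superviseConnection(this.consumer, this.consumer.events.CRASH, runConsumer)` to re-run the connect/subscribe/run sequence after a crash. Whether the same consumer instance can be restarted that way, or a new one must be created, should be checked against your KafkaJS version.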

Aweller95 commented 2 years ago

I'm facing the same issue: if the consumer is down when our app starts, the app crashes. I'm looking for (or looking to build) a way to retry Kafka connections periodically while the app runs, but have had no success yet.

Wrapping the connect call in a try/catch block does stop the app from exiting, but I'm looking for a way to attempt a reconnection after a given time period:

```js
async connect() {
  const kafka = new Kafka(this.options.client);
  this.producer = kafka.producer();

  try {
    this.producer.connect();
  } catch (err) {
    setTimeout(async () => {
      await this.producer.connect();
    }, 1000 * 60 * 5); // retry connection every 5 minutes
  }
}
```

This block unfortunately doesn't work as you'd expect.
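The snippet above has two problems: `this.producer.connect()` is not awaited, so its rejection never reaches the `catch` block (it surfaces as an unhandled rejection instead), and `setTimeout` fires once rather than every five minutes. A sketch that fixes both, assuming a producer whose `connect()` returns a promise; `connectInBackground` is a hypothetical name, not a KafkaJS API:

```javascript
// Hypothetical helper: try to connect now, and on failure retry every
// `intervalMs` until it succeeds. Resolves once a connect attempt succeeds.
function connectInBackground(producer, intervalMs = 5 * 60 * 1000, log = console) {
  return new Promise((resolve) => {
    const attempt = async () => {
      try {
        await producer.connect(); // awaited, so a rejection lands in catch
        resolve();
      } catch (err) {
        log.error(`Kafka connect failed: ${err.message}; retrying in ${intervalMs} ms`);
        setTimeout(attempt, intervalMs); // reschedule, so it repeats until success
      }
    };
    attempt();
  });
}
```

Inside `connect()`, calling `connectInBackground(this.producer)` without `await` lets the app start immediately and keeps retrying in the background; awaiting it instead blocks until the first successful connect. Until it resolves, `send()` calls on the producer should be expected to fail.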