tulios / kafkajs

A modern Apache Kafka client for node.js
https://kafka.js.org
MIT License

[Question] How to restart the consumer on the crash event handler? #1443

Open FelipeEmerim opened 2 years ago

FelipeEmerim commented 2 years ago

Is your feature request related to a problem? Please describe.

We are using KafkaJS through the NestJS microservices package. We have a hybrid app with multiple microservices, some using Kafka while others use Azure Service Bus. Non-retriable errors work fine in most cases, as they indicate a failure in the configuration and cannot be fixed by simply retrying.

However, there is a problem with UNKNOWN_SERVER_ERROR. Our broker throws this error from time to time (we are investigating why that happens with the Azure support team), and restarting the consumer does fix the issue. As of KafkaJS 1.16 the restart does not happen because, according to the Kafka protocol, this is a non-retriable error.

Note that NestJS crashes if an error happens while the application is initializing. Since this error happens after all consumers have already successfully started, the application keeps running without a Kafka consumer and never recovers.

Describe the solution you'd like

I would like to know if there is a way to cleanly restart the consumer on a non-retriable error using the CRASH event and, if possible, do that only on that specific error type. This is mentioned in the instrumentation events docs, but I couldn't find an example of how to do it.

If you think our issue is worth a change in KafkaJS, I think the restartOnFailure callback could be changed to always be invoked, and its default value could be to return true on retriable errors only, allowing users of this lib to change that behavior.

Additional context

We tried to perform the restart manually using this code:

this.consumer.on('consumer.crash', async (error) => {
  if (error.payload.error instanceof KafkaJSNonRetriableError) {
    this.logger.error({
      message: 'Consumer crashed on non-retriable error: restarting',
      context: ServerKafka.name,
    });
    this.logger.error({
      ...error.payload.error,
      context: ServerKafka.name,
    });
    await this.consumer.connect();
  }
});

Sadly, it did not work the way we intended, as the consumer does not restart. I also checked how the lib itself handles restarts. From what I saw, it invokes the start function through a setTimeout call to apply the retry timeout. Since we cannot use that function, I have no clue how to achieve a clean consumer restart.

FelipeEmerim commented 2 years ago

We managed to make it work with the following code:

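import { KafkaJSNonRetriableError, KafkaJSNumberOfRetriesExceeded } from 'kafkajs';

// Restart manually on non-retriable errors, except when the retry limit was exceeded.
export function shouldRestart(error: Error): boolean {
  const isNonRetriableError = error instanceof KafkaJSNonRetriableError;
  const isNumberOfRetriesExceeded =
    error instanceof KafkaJSNumberOfRetriesExceeded;

  return isNonRetriableError && !isNumberOfRetriesExceeded;
}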

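this.consumer.on('consumer.crash', async (payload) => {
  // Only handle the crashes selected by shouldRestart.
  if (!shouldRestart(payload.payload.error)) {
    return;
  }

  this.logger.error({
    message: 'Consumer crashed on non-retriable error: restarting',
    context: ClientKafka.name,
  });
  this.logger.error({
    ...payload.payload.error,
    context: ClientKafka.name,
  });

  try {
    await this.consumer.disconnect();
  } finally {
    // Wait a fixed 5 seconds before reconnecting, even if disconnect() failed.
    setTimeout(async () => {
      try {
        await this.consumer.connect();
        // Subscribe to the topics again and call consumer.run.
        await this.bindTopics();
        this.logger.warn({
          context: ClientKafka.name,
          message: 'Restarted Consumer on non-retriable error',
        });
      } catch (restartError) {
        // Without this catch, a failed reconnect becomes an unhandled rejection.
        this.logger.error({
          message: 'Failed to restart consumer',
          context: ClientKafka.name,
        });
      }
    }, 5000);
  }
});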

Apparently, after a reconnect you have to subscribe to the topics again and call consumer.run, which is what the bindTopics function does.
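The restart sequence described above (disconnect, wait, connect, bind topics again) can be pulled into a small helper. The following is only a sketch under stated assumptions: `ReconnectableConsumer`, `createRestarter`, and `sleep` are hypothetical names introduced here, the local interface merely mirrors the `connect` and `disconnect` methods used in the snippet above, and `bindTopics` stands in for whatever re-subscribes and calls `consumer.run` in your application. The `restarting` flag is an addition that is not in the original code; it guards against two crash events triggering overlapping restarts.

```typescript
// Hypothetical local interface: only the two consumer methods this sketch needs.
interface ReconnectableConsumer {
  connect(): Promise<void>;
  disconnect(): Promise<void>;
}

// Promise-based delay, used instead of a bare setTimeout callback.
const sleep = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Returns a restart function that resolves to true if it performed a restart
// and false if another restart was already in progress.
function createRestarter(
  consumer: ReconnectableConsumer,
  bindTopics: () => Promise<void>, // assumed to subscribe again and call consumer.run
  delayMs: number,
): () => Promise<boolean> {
  let restarting = false;

  return async function restart(): Promise<boolean> {
    if (restarting) {
      return false;
    }
    restarting = true;
    try {
      // A failed disconnect should not prevent the reconnect attempt.
      await consumer.disconnect().catch(() => undefined);
      await sleep(delayMs);
      await consumer.connect();
      await bindTopics();
      return true;
    } finally {
      restarting = false;
    }
  };
}
```

With this shape, the crash handler only has to decide whether to restart and then call the returned function; errors from `connect` or `bindTopics` reject the returned promise, so the caller can log them in one place.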

pinturic commented 2 years ago

Hi,

I have similar logic in the way I am reconnecting, and sometimes it does not work correctly, so to understand it better I have a couple of questions:

  1. You are computing whether you need to restart by calling a custom shouldRestart function. The documentation says that there is a parameter called crash that could be useful in this case: https://kafka.js.org/docs/instrumentation-events. This is the code raising the event: https://github.com/tulios/kafkajs/blob/v2.2.2/src/consumer/index.js#L286.
  2. Why are you delaying the restart by 5 seconds?

Thank you very much

FelipeEmerim commented 2 years ago

Hi,

We made that shouldRestart function so that we could restart the consumer on non-retriable errors. Since we have a hybrid application (http + kafka + rabbitMQ), it is undesirable to restart the whole application when Kafka crashes because of an unknown server error.

We don't have access to the crash parameter in the custom shouldRestart; we only have access to what KafkaJS sends in the crash event payload, since it is this handler that calls the custom shouldRestart logic. This also explains the 5 second delay.

Ideally we would honor the configured retry timeouts for the consumer, but since we cannot access them in the crash event we used a magic number of 5 seconds. This works in our case because Azure Event Hubs has a problem that causes an unknown server error and, after 5 seconds, we can safely reconnect to the broker.
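If a fixed 5 seconds is too rigid, the delay can be derived from the same values the application already passes to the client's retry configuration. The sketch below is an illustration, not KafkaJS's internal algorithm: `restartDelay` and `BackoffOptions` are hypothetical names, the option names are chosen to match the `initialRetryTime`, `multiplier`, and `maxRetryTime` retry settings, and the randomization factor is deliberately left out so the result is deterministic.

```typescript
// Hypothetical options object; keep one copy in the application and pass the
// same values to the client's retry configuration and to this helper.
interface BackoffOptions {
  initialRetryTime: number; // milliseconds before the first restart attempt
  multiplier: number; // growth factor applied per attempt
  maxRetryTime: number; // upper bound in milliseconds
}

// Exponential backoff without jitter: initial * multiplier^attempt, capped.
function restartDelay(attempt: number, opts: BackoffOptions): number {
  const raw = opts.initialRetryTime * Math.pow(opts.multiplier, attempt);
  return Math.min(raw, opts.maxRetryTime);
}
```

The crash handler would then keep a counter of consecutive manual restarts, pass it as `attempt`, and reset it once the consumer has been running normally again.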

pinturic commented 2 years ago

The payload object contains a restart key that you could use instead of having your shouldRestart function ...

FelipeEmerim commented 2 years ago

We could restart on !restart, I believe it would give us the same results. We've made the custom function because we wanted to restart on specific non-retriable errors. It is not desirable to restart on configuration errors as those cannot be fixed by a restart.

Since we did not find a way to know which non-retriable error caused the crash, the function in the code snippet behaves the same as using !restart.
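One possible direction for narrowing the restart to specific errors is to look for a type string on the crash error or on the errors it wraps. This is a sketch built on assumptions that need to be checked against the KafkaJS version in use: it assumes the error in the crash payload may carry a `cause` property pointing at the original error and that protocol errors expose a `type` string. `findErrorType`, `isRestartableType`, and `ErrorLike` are hypothetical names, and the list of allowed types is something the application would have to supply after inspecting real crash payloads in its logs.

```typescript
// Hypothetical view of an error object; both fields are assumptions.
interface ErrorLike {
  type?: unknown;
  cause?: unknown;
}

// Walks the assumed `cause` chain and returns the first string `type` found.
function findErrorType(error: unknown, maxDepth: number = 10): string | undefined {
  let current: unknown = error;
  for (let depth = 0; depth < maxDepth; depth++) {
    if (current === null || typeof current !== 'object') {
      return undefined;
    }
    const candidate = current as ErrorLike;
    if (typeof candidate.type === 'string') {
      return candidate.type;
    }
    current = candidate.cause;
  }
  return undefined;
}

// True only when a type was found and it is in the caller's allow-list.
function isRestartableType(error: unknown, allowedTypes: readonly string[]): boolean {
  const type = findErrorType(error);
  return type !== undefined && allowedTypes.indexOf(type) !== -1;
}
```

If no type can be found, the helper returns false, so the handler falls back to not restarting rather than restarting on an error it cannot identify.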

pinturic commented 2 years ago

Oki oki, understood! Thanks

stravi commented 1 year ago

Hi, thank you for providing your solution. I am wondering how you tested that it works as expected?
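One way to exercise a crash handler without a broker is to drive it with a hand-written fake consumer. The sketch below is not how the authors tested their code, which the thread does not say. `FakeConsumer`, `emitCrash`, and `registerRestartOnCrash` are hypothetical names; the event shape uses only the `error` and `restart` payload keys mentioned in this thread, and the restart delay is omitted to keep the check fast.

```typescript
type CrashEvent = { payload: { error: Error; restart: boolean } };
type CrashListener = (event: CrashEvent) => Promise<void> | void;

// Hand-written stand-in that records which consumer methods were called.
class FakeConsumer {
  calls: string[] = [];
  private listeners: CrashListener[] = [];

  on(eventName: 'consumer.crash', listener: CrashListener): void {
    this.listeners.push(listener);
  }

  // Test-only helper: delivers a crash event to every registered listener.
  async emitCrash(event: CrashEvent): Promise<void> {
    for (const listener of this.listeners) {
      await listener(event);
    }
  }

  async connect(): Promise<void> {
    this.calls.push('connect');
  }

  async disconnect(): Promise<void> {
    this.calls.push('disconnect');
  }
}

// Handler under test: restart manually only when the payload's restart flag is false.
function registerRestartOnCrash(
  consumer: FakeConsumer,
  bindTopics: () => Promise<void>,
): void {
  consumer.on('consumer.crash', async (event) => {
    if (event.payload.restart) {
      return;
    }
    await consumer.disconnect();
    await consumer.connect();
    await bindTopics();
  });
}

// Emits one crash with restart: true and one with restart: false,
// then returns the recorded calls for inspection.
async function runCrashScenario(): Promise<string[]> {
  const consumer = new FakeConsumer();
  registerRestartOnCrash(consumer, async () => {
    consumer.calls.push('bindTopics');
  });
  await consumer.emitCrash({ payload: { error: new Error('retriable'), restart: true } });
  await consumer.emitCrash({ payload: { error: new Error('fatal'), restart: false } });
  return consumer.calls;
}
```

A fake like this only verifies the handler's own logic and call order. Whether the real consumer resumes fetching after the reconnect still has to be checked against an actual broker, for example by forcing the error in a staging environment.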