nestjs / terminus

Terminus module for Nest framework (node.js) :robot:
https://nestjs.com/
MIT License

Kafka Microservice Consumer Healthcheck #2550

Open edeesis opened 3 months ago

edeesis commented 3 months ago

Is there an existing issue that is already proposing this?

Is your feature request related to a problem? Please describe it

The Microservice Health Indicator only checks whether a producer can connect to Kafka, but I’m also interested in the state of the consumer. If the consumer has crashed, then I want the health indicator to fail.

Unfortunately, while implementing this myself I ran into a few issues:

1) Accessing the ServerKafka instance, which contains the consumer, doesn’t seem possible in the context of a health indicator.
2) There’s no easy way to subscribe to instrumentation events; see https://github.com/nestjs/nest/issues/11616

Describe the solution you'd like

I’d like the Microservice Health Indicator to also check the status of the Server side, not just the client side.

Teachability, documentation, adoption, migration strategy

The usage should be the same as the existing MicroserviceHealthIndicator, except that it shouldn’t require instantiating a new ServerKafka; it should use the existing instance.

What is the motivation / use case for changing the behavior?

I need to be able to know if the consumer has crashed and include that information in a liveness check.

edeesis commented 3 months ago

The workaround I've found is to move the custom microservice implementation into a module in the application context, add a public method that exposes the consumer, and then use app.resolve to fetch the instance and pass it to connectMicroservice:

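import { ServerKafka } from '@nestjs/microservices';
import { Consumer } from 'kafkajs';

export class KafkaMicroserviceServer extends ServerKafka {
  // The consumer is not public on ServerKafka, so expose it through a getter.
  getConsumer(): Consumer {
    return this.consumer;
  }
}
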
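import { Injectable, OnModuleInit } from '@nestjs/common';
import { HealthIndicator } from '@nestjs/terminus';
import { ConsumerCrashEvent } from 'kafkajs';

@Injectable()
export class KafkaConsumerHealthIndicator extends HealthIndicator implements OnModuleInit {
  // Last crash event seen per consumer group; undefined means no crash so far.
  private readonly crashEvents: { [groupId: string]: ConsumerCrashEvent | undefined } = {};

  constructor(
    private readonly kafkaMicroserviceServer: KafkaMicroserviceServer
  ) {
    super();
  }

  async onModuleInit(): Promise<void> {
    const consumer = this.kafkaMicroserviceServer.getConsumer();
    const { groupId } = await consumer.describeGroup();
    this.crashEvents[groupId] = undefined;
    // Record crash events so that a later health check can report them.
    consumer.on(consumer.events.CRASH, (event) => {
      this.crashEvents[event.payload.groupId] = event;
    });
  }
}
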
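// Application bootstrap: fetch the server from the application context
// and register that same instance as the microservice strategy.
const kafkaMicroserviceServer = await app.resolve(KafkaMicroserviceServer);
app.connectMicroservice({
  strategy: kafkaMicroserviceServer,
});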
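
The snippets above record crash events but stop before showing how they become a health result. Below is a minimal, self-contained sketch of that step. It is not part of the original workaround: `CrashEventLike` is a hand-written stand-in for the kafkajs `ConsumerCrashEvent` type (assuming its payload carries `error`, `groupId` and `restart`), and `evaluateCrashEvents` is a hypothetical helper name.

```typescript
// Stand-in for kafkajs's ConsumerCrashEvent (assumed payload shape).
interface CrashEventLike {
  timestamp: number;
  payload: { error: Error; groupId: string; restart: boolean };
}

type CrashEvents = { [groupId: string]: CrashEventLike | undefined };

interface ConsumerHealth {
  isHealthy: boolean;
  details: { [groupId: string]: { status: 'up' | 'down'; message?: string } };
}

// Hypothetical helper: a consumer group counts as "down" once a crash
// event has been recorded for it, and as "up" otherwise.
function evaluateCrashEvents(crashEvents: CrashEvents): ConsumerHealth {
  const details: ConsumerHealth['details'] = {};
  let isHealthy = true;
  for (const groupId of Object.keys(crashEvents)) {
    const event = crashEvents[groupId];
    if (event === undefined) {
      details[groupId] = { status: 'up' };
    } else {
      isHealthy = false;
      details[groupId] = { status: 'down', message: event.payload.error.message };
    }
  }
  return { isHealthy, details };
}

// Usage with hand-built data: one group without a crash, one with.
const result = evaluateCrashEvents({
  'orders-group': undefined,
  'billing-group': {
    timestamp: 0,
    payload: { error: new Error('broker unreachable'), groupId: 'billing-group', restart: false },
  },
});
console.log(result.isHealthy); // false
```

Inside the indicator, a check method could presumably pass `isHealthy` and `details` to the inherited `getStatus(key, isHealthy, data)` and throw a `HealthCheckError` when the result is unhealthy, following the usual Terminus custom-indicator pattern. Whether a crash with `restart: true` should fail a liveness check is a design choice this sketch does not make.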