Open marinakurtin opened 1 year ago
I would like to see this implemented too. I had to hack around it:
import { ServerKafka } from '@nestjs/microservices';

export class CustomKafkaServer extends ServerKafka {
  async start(callback): Promise<void> {
    const consumerOptions = Object.assign(this.options.consumer || {}, {
      groupId: this.groupId,
    });
    this.consumer = this.client.consumer(consumerOptions);
    this.producer = this.client.producer(this.options.producer);
    this.registerEvents(this.instrumentationEvents); // <--- add events here
    await this.consumer.connect();
    await this.producer.connect();
    await this.bindEvents(this.consumer);
    callback();
  }
}
I created a PR to address crashing consumers based on the Instrumentation Events. Maybe it could be handled more generally.
Here is the related PR: https://github.com/nestjs/nest/pull/11910
So far the only way to accomplish this is to use a custom strategy that extends the built-in transporter. Example:
import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions, ServerKafka } from '@nestjs/microservices';
import { Consumer } from '@nestjs/microservices/external/kafka.interface';

class MyCustomStrategy extends ServerKafka {
  async bindEvents(consumer: Consumer) {
    // Register instrumentation listeners before the built-in bindings run.
    consumer.on('consumer.heartbeat', () => console.log('...'));
    await super.bindEvents(consumer);
  }
}

// and later
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  strategy: new MyCustomStrategy({ /* ...kafka options here */ }),
});
Hey @micalevisk @johnbiundo @jmcdo29 @BrunnerLivio @MarkPieszak @valorkin, We're big fans of NestJS at our organization! :rocket: We've submitted a PR that adds kafkajs instrumentation events to NestJS, addressing a current gap. We'd be grateful if you could review it when you have time. Thanks!
Any updates regarding the review of the PR? This feature is crucial for a production environment, because you can end up in a state where the consumer has died but your Nest service keeps running as if nothing were wrong. When a consumer crashes, the service stops consuming messages from Kafka, yet the service itself stays alive. I would not recommend using the Kafka consumer implementation in production until you are capable of listening to consumer CRASH events, or at least until the Nest service carries over the failure.
I tried to address the consumer crash problem in a different PR, where the NestJS Kafka consumer always listens to these events, in #11910
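To make the crash scenario above concrete, here is a minimal sketch of a crash listener that could be attached to the consumer, for example from the `bindEvents` override of a custom strategy. The `ConsumerLike` and `CrashEvent` types and the `failOnConsumerCrash` helper are hypothetical stand-ins written for this illustration; they mirror only the small part of the kafkajs consumer that the sketch needs, and they assume the `consumer.crash` event carries `error`, `groupId` and `restart` in its payload, as described in the kafkajs instrumentation events documentation.

```typescript
// Hypothetical structural types: just enough of a kafkajs consumer for this sketch.
interface CrashEvent {
  payload: { error: Error; groupId: string; restart: boolean };
}

interface ConsumerLike {
  on(eventName: 'consumer.crash', listener: (event: CrashEvent) => void): unknown;
}

// Hypothetical helper: invokes onFatal only when the consumer crashed and
// the payload says it will NOT be restarted automatically.
function failOnConsumerCrash(
  consumer: ConsumerLike,
  onFatal: (error: Error) => void,
): void {
  consumer.on('consumer.crash', (event) => {
    if (!event.payload.restart) {
      onFatal(event.payload.error);
    }
  });
}
```

One possible use is to pass a callback that logs the error and exits the process, so that an orchestrator restarts the service instead of leaving it alive without a working consumer. Whether exiting is the right reaction depends on your deployment.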
This feature isn't crucial as there's currently a different way to register event listeners. We're still debating if the approach/API proposed in this PR is actually simpler than what you can do now @esahin90 (https://github.com/nestjs/nest/issues/11616#issuecomment-1610883351)
I think that we shouldn't add new features related to kafka.js due to #13223
Is there an existing issue that is already proposing this?
Is your feature request related to a problem? Please describe it
I need an ability to monitor the events of my Kafka consumer
As far as I know, it's already supported in Kafka.js
I added the documentation reference in the documentation section below
I managed to find a workaround. On receiving the first message from Kafka, I can extract the reference to the Consumer object and add listeners to it.
Here is a code example
But in my opinion, it's a bad solution, because it requires checking on every message whether it has already been handled or not.
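The kind of per-message check this workaround implies can be sketched as follows. This is not the original code from the issue, only an illustration of the pattern: the `ListenableConsumer` type and the `instrumentOnce` helper are hypothetical names, and the sketch assumes the consumer exposes an `on(eventName, listener)` method as kafkajs consumers do.

```typescript
// Hypothetical structural type: the only consumer capability this sketch relies on.
interface ListenableConsumer {
  on(eventName: string, listener: (event: unknown) => void): unknown;
}

// Remembers which consumer objects already have listeners attached.
const instrumentedConsumers = new WeakSet<object>();

// Attaches listeners the first time a given consumer is seen and does nothing
// on later calls. Returns true only when listeners were actually attached.
function instrumentOnce(
  consumer: ListenableConsumer,
  eventNames: string[],
  listener: (eventName: string, event: unknown) => void,
): boolean {
  if (instrumentedConsumers.has(consumer)) {
    return false;
  }
  instrumentedConsumers.add(consumer);
  for (const eventName of eventNames) {
    consumer.on(eventName, (event) => listener(eventName, event));
  }
  return true;
}
```

A message handler would call `instrumentOnce` with the consumer it extracts from the incoming message context, which is exactly the repeated check on every message that makes this approach unattractive compared with registering listeners once at startup.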
Describe the solution you'd like
I would like the ability to connect to the Instrumentation events.
I would like to have a decorator, which would receive the name of the event and connect to it.
Or
I would like to have the ability to add it to the options object of the NestFactory.createMicroservice method
Teachability, documentation, adoption, migration strategy
https://kafka.js.org/docs/instrumentation-events
What is the motivation / use case for changing the behavior?
The ability to monitor the consumer behaviour