rajeshkumarbehura / ts-nestjs-kafka

Example to integrate kafkajs with nestjs & kafka
75 stars 24 forks

Can not use a custom provider inside subscription handle methods #6

Open rodrigo2604 opened 3 years ago

rodrigo2604 commented 3 years ago

Hello Rajesh,

I tried to log the incoming KafkaMessage with my custom logger provider. Because decorators run before NestJS initialization, this.logger is undefined: the decorator holds the first reference to the method, which does not have the injected logger yet. I'm new to TS and NestJS, so I may be missing a concept here, but do you know how I could use custom providers inside a @SubscribeTo method?

// consumer.controller.ts

constructor(private logger: MyLogger) {
  this.logger.log('It logs normally');
}

@SubscribeTo('myTopic')
handleMessage(kafkaMessage: KafkaMessage) {
  console.log('Incoming message', kafkaMessage) // This line works
  this.logger.log(kafkaMessage); // this.logger is undefined
}

Thank you.

rajeshkumarbehura commented 3 years ago

Can you check out the previous commit 73f9102cc5b739c77216c50e5069b8e8d0ce3752 and test with the logger? I think my new changes could be the cause. Please try that commit and let me know.

rodrigo2604 commented 3 years ago

I've just forked the repo and tested this on master and on the commit you suggested, and I still have the same issue. I think it is related to the subscribe decorators. From what I've learned of NestJS so far, those decorators could instead be interceptors that pick up microservice requests via switchToRpc. Done that way, the message handlers might have their injected services.

aashutoshrathi commented 3 years ago

Hi @rajeshkumarbehura, I'm facing the same issue. I wanted to inject another service and use it inside the handler function, but this inside that function is not the same this as in the rest of the class.
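The symptom described in this thread can be reproduced without NestJS or Kafka. The sketch below is not the repo's code: the handlers map, the string message type, and the class names are hypothetical stand-ins. It assumes the decorator keeps a reference to the method taken from the class prototype, which is all a method decorator can see, since it runs at class-definition time before any instance exists. Calling that stored reference loses this, while a reference bound to the instance keeps the injected logger.

```typescript
// Hypothetical stand-ins for illustration; none of these names come from the repo.
type Handler = (message: string) => void;

class MyLogger {
  readonly lines: string[] = [];

  log(line: string): void {
    this.lines.push(line);
  }
}

// Registry that a subscribe-style decorator might fill at class-definition time.
const handlers = new Map<string, Handler>();

class ConsumerController {
  constructor(private readonly logger: MyLogger) {}

  handleMessage(message: string): void {
    this.logger.log(message);
  }
}

// A method decorator only has access to the function on the prototype,
// not to an instance, so this is effectively what it can store.
handlers.set('myTopic', ConsumerController.prototype.handleMessage);

const logger = new MyLogger();
const controller = new ConsumerController(logger);

// Invoking the stored function directly: `this` is not the controller,
// so reading this.logger.log fails.
let unboundFailed = false;
try {
  handlers.get('myTopic')!('first');
} catch {
  unboundFailed = true;
}

// Re-registering the method bound to the live instance restores `this`.
handlers.set('myTopic', controller.handleMessage.bind(controller));
handlers.get('myTopic')!('second');
```

Under these assumptions, a fix has to get hold of the instance at some point after the container has created it, for example by binding the handler once the controller exists.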

hvitoi commented 3 years ago

I'm facing the same issue. Any updates? I'm gonna try @rodrigo2604's suggestion of switching the decorators to interceptors.

aashutoshrathi commented 3 years ago

@hvitoi I spent around 6 hours reading about this and trying to fix it. The root cause is that the module (since it is global) gets initialized before the service we want to inject in the constructor. (So a decorator on any method effectively breaks that method's this.)

I had a few solutions in mind:

I went for the second one for now. The official NestJS Kafka docs make consuming easy, while publishing is easy with a setup like this repo, so I went for the best of both. Hope this helps!

rajeshkumarbehura commented 3 years ago

@Module({
    imports: [
        ConfigModule.forRoot({
            isGlobal: true,
            envFilePath: `./env/.env.${process.env.APP_ENV}`,
        }),
        RavenModule,
        WinstonConfigModule.forConnection(),
        DBProviderModule.forConnection(),
        GraphqlConfigModule.forConnection(),
        MqttConfigModule.forConnection(),
        KafkaModule.register({
            clientId: 'test-app-client',
            brokers: ['localhost:9092'],
            groupId: 'test-app-group',
        }),
    ],
})
export class AppConfigModule {}

Here is a sample of my configuration and how I use it. I put all configuration in a separate module, which must be initialized before your application's business modules. Within this app config module, the order of the modules really matters: for example, the logger module is initialized before the Kafka module.

hvitoi commented 3 years ago

@rajeshkumarbehura Thanks for the ideas! Could you share the whole project here? I couldn't get the SubscribeTo() decorator to work, even with two separate Kafka instances (one for the producer and another for the consumer).

nK2708 commented 3 years ago

I'm facing the same issue. Any updates?

StarryO1224 commented 1 year ago

The module is initialized before the providers you want to inject, so the injected value is undefined because it cannot be resolved from the IoC container yet. You can use an EventEmitter to pass the data along, as my repo Animal-Go does, though I think that is a bit clumsy. Or, as aashutoshrathi said, create a microservice and use event pub/sub to transport the data; see the official NestJS docs.
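A minimal sketch of the EventEmitter idea, using Node's built-in events module. It is not taken from Animal-Go or from this repo: kafkaEvents, MessageLogService, and the string message type are hypothetical names chosen for illustration. The handler never touches this. It only re-emits the message on a module-level emitter, and a service constructed later with its dependencies listens on that emitter.

```typescript
import { EventEmitter } from 'node:events';

// Hypothetical logger standing in for an injected provider.
class MyLogger {
  readonly lines: string[] = [];

  log(line: string): void {
    this.lines.push(line);
  }
}

// Module-level emitter: reachable from a handler even when `this` is unusable.
const kafkaEvents = new EventEmitter();

// Stand-in for a decorated subscription handler. It avoids `this` entirely
// and only forwards the message.
function handleMessage(message: string): void {
  kafkaEvents.emit('myTopic', message);
}

// A service created after its dependencies exist, so its logger is available.
class MessageLogService {
  constructor(private readonly logger: MyLogger) {
    kafkaEvents.on('myTopic', (message: string) => this.logger.log(message));
  }
}

const sharedLogger = new MyLogger();
new MessageLogService(sharedLogger);

// EventEmitter delivers synchronously, so the listener has run once this returns.
handleMessage('hello');
```

The trade-off is an extra layer of indirection through shared module state, which is probably what makes the approach feel clumsy.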