nestjs / nest

A progressive Node.js framework for building efficient, scalable, and enterprise-grade server-side applications with TypeScript/JavaScript 🚀
https://nestjs.com
MIT License
67.47k stars 7.6k forks source link

Expose Producer in KafkaContext #10263

Closed Papooch closed 2 years ago

Papooch commented 2 years ago

Is there an existing issue that is already proposing this?

Is your feature request related to a problem? Please describe it

When implementing various error handling strategies using Kafka (retry topic, dead letter queue) using an Exception filter, I need some way of producing messages in there - one way is injecting the client, which IMHO violates the separation of concerns and requires to import the clients module where the exception filter is used.

I found it useful to put the producer to the context object, so I can retrieve it in the exception filter and put the message to a retry topic or to the DLQ based on the error severity.

For that to work, I have to create a custom transporter by extending the base KafkaServer and overwriting a great deal of the code - basically, I have to copy-and-paste the entirety of the handleMessage method only to change the context class creation.

Describe the solution you'd like

I see two solutions here: 1) Just include the producer in the KafkaContext 2) Create a protected createContext method that can be overriden by child classes

Teachability, documentation, adoption, migration strategy

If the producer is added as the last parameter to the KafkaContextArgs, then the change will not be breaking at all (but it wouldn't look pretty) and it would immediately be easier to create retry strategies based on producing messages.

What is the motivation / use case for changing the behavior?

Most serious Kafka implementations will require some kind of retry and dead letter topics, so exposing the producer in the context would make it easier to adopt the practice instead of working around the framework limitations.

Papooch commented 2 years ago

To clarify, what I'm proposing is to be able to do this:

@Catch(UnrecoverableException)
export class AllExceptionsFilter implements ExceptionFilter {
   catch(exception: UnrecoverableException, host: ArgumentsHost): void {
       const context = host.switchToRpc().getContext()
       const producer = context.getProducer() // <-- this is new
       await producer.emit({ topic: `${context.getTopic()}.dlq`, messages: [/* ... */] })
                        // ^ here we can emit to a different topic based on the exception
   }
}

Which would require adding a new parameter to the KafkaContext

type KafkaContextArgs = [
  message: KafkaMessage,
  partition: number,
  topic: string,
  consumer: Consumer,
  heartbeat: () => Promise<void>,
+ producer: Producer
];
kamilmysliwiec commented 2 years ago

Sounds good. Would you like to create a PR for this @Papooch?

kamilmysliwiec commented 2 years ago

Let's track this here https://github.com/nestjs/nest/pull/10272