golevelup / nestjs

A collection of badass modules and utilities to help you level up your NestJS applications 🚀
MIT License

[RabbitMQ] How do I configure Dead Letter Queues? (Or retry policy) #138

Closed: saarikivit closed this issue 2 years ago

saarikivit commented 4 years ago

Hey! Nice lib :ok_hand: :ok_hand: :ok_hand:

Today I ran into an issue: an infinite loop of very fast retries when a "RabbitSubscribe" handler throws and keeps on throwing.

How do I configure a good retry policy with this library?

I'd optimally want something like "every 10 seconds for 6 rounds -> then every 15 minutes" though I understand that might be impossible.

Thanks for your time! :pray:

rkalkani commented 4 years ago

Yes, nice lib, and the helpers module that simplifies writing other NestJS libs is great as well.

@saarikivit You can write custom error handling/retry behavior and set it as a custom handler on the errorHandler property:

@RabbitSubscribe({
    ......
    errorHandler: (channel: amqplib.Channel, msg: amqplib.ConsumeMessage, error: any) => Promise<void>
  })
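
For example, a minimal sketch of a custom handler that rejects without requeueing, so the message is dropped or dead-lettered (if the queue has a DLX configured) instead of looping; the exchange/queue names here are illustrative:

import { Channel, ConsumeMessage } from 'amqplib';
import { RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';

// Sketch: reject without requeue so the broker dead-letters the message
// (if the queue has a DLX configured) instead of redelivering it immediately.
const nackWithoutRequeue = async (
  channel: Channel,
  msg: ConsumeMessage,
  error: any,
): Promise<void> => {
  console.error('Handler failed, rejecting message without requeue', error);
  channel.nack(msg, false, false);
};

export class ExampleConsumer {
  @RabbitSubscribe({
    exchange: 'example.exchange',   // illustrative names
    routingKey: 'example.key',
    queue: 'example.queue',
    errorHandler: nackWithoutRequeue,
  })
  async handleMessage(payload: unknown) {
    // business logic that may throw
  }
}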

I just started writing some sophisticated retry policies in one of my projects today. I am using https://github.com/guidesmiths/rascal#chaining-recovery-strategies as a reference. There are also some simpler strategies there that you can use to avoid infinite retries.

I hope I will be able to post more progress or a solution soon.

saarikivit commented 4 years ago

I found "RabbitSubscribe" to have "queueOptions" that includes "deadLetterExchange" :thinking:

But implementing something like https://medium.com/@igkuz/ruby-retry-scheduled-tasks-with-dead-letter-exchange-in-rabbitmq-9e38aa39089b does not seem doable automatically. Creating a DLX and DLQ with the decorators would bind them to a handler function, which would result in the same behavior as before: hammering my service until I fix the bug.

I would have to create the DLQ manually and configure a TTL on it, and the DLQ's dead-letter exchange would have to point back to the working queue. That way everything happens on the Rabbit side, and my service is only called for messages on the worker queue.
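
A rough sketch of that Rabbit-side topology with plain amqplib, for illustration (queue/exchange names and the 10-second TTL are placeholders, and none of this is provided by the lib out of the box):

import { connect } from 'amqplib';

async function assertRetryTopology(uri: string) {
  const connection = await connect(uri);
  const channel = await connection.createChannel();

  await channel.assertExchange('work.exchange', 'topic');
  await channel.assertExchange('work.dlx', 'topic');

  // Working queue: failed (nacked) messages are dead-lettered to the DLX.
  await channel.assertQueue('work.queue', {
    deadLetterExchange: 'work.dlx',
  });
  await channel.bindQueue('work.queue', 'work.exchange', 'work.#');

  // Wait queue: has no consumers; messages sit here until the TTL expires
  // and are then dead-lettered back to the working exchange for another try.
  await channel.assertQueue('work.wait', {
    messageTtl: 10_000,
    deadLetterExchange: 'work.exchange',
  });
  await channel.bindQueue('work.wait', 'work.dlx', 'work.#');

  await channel.close();
  await connection.close();
}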

Any ideas how this could be implemented with and in the lib?

Currently I resort to having this in my module.forRoot:

  defaultRpcTimeout: 4000,
  defaultRpcErrorBehavior: MessageHandlerErrorBehavior.NACK,
  defaultSubscribeErrorBehavior: MessageHandlerErrorBehavior.NACK

I'm perhaps adding Bull (https://docs.nestjs.com/techniques/queues) as the service that handles exponential backoff for the things I really need to work at some point.
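
For reference, a hedged sketch of what exponential backoff with Bull could look like in a NestJS service (queue name, attempt count, and delay are illustrative assumptions):

import { InjectQueue } from '@nestjs/bull';
import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';

@Injectable()
export class RetryProducer {
  constructor(@InjectQueue('retries') private readonly queue: Queue) {}

  async scheduleWithBackoff(payload: unknown) {
    // Bull retries the job up to 6 times, doubling the delay starting at 10s.
    await this.queue.add(payload, {
      attempts: 6,
      backoff: { type: 'exponential', delay: 10_000 },
    });
  }
}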

rkalkani commented 4 years ago

Yes, I am also stuck at the same step: how to assert a queue when the service starts so that I do not need to create it manually.

That DLX and DLQ example is similar to the chaining-recovery-strategies explanation in the rascal repo: https://github.com/guidesmiths/rascal#chaining-recovery-strategies

In the Medium article, the usage of the 'CC' header is not explained, but it may also be needed.

I am trying to implement it as a custom error handler.

If you already have Redis in your deployment, Bull can be a good solution for handling the redelivered messages. Or use it instead of RabbitMQ if your requirement is a work queue only.

If you do not need a time delay between two tries and do not care about ordering, then you can try re-queueing with an attempt counter. You can add a delay in code before re-queueing or when handling a redelivered message, but that delay does not survive a service restart (in that case the message is handled as soon as the service is back up). Still, most of the time the error is due to an external service, and this approach is the simplest to implement.
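
A rough sketch of that simplest approach as a custom errorHandler with an in-process delay before re-queueing (the 5-second delay is illustrative, and as noted it does not survive a restart):

import { Channel, ConsumeMessage } from 'amqplib';

const delayedRequeueErrorHandler = async (
  channel: Channel,
  msg: ConsumeMessage,
  error: any,
): Promise<void> => {
  // Wait a bit in-process, then put the message back on the queue.
  await new Promise((resolve) => setTimeout(resolve, 5_000));
  channel.nack(msg, false, true); // requeue = true
};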

saarikivit commented 4 years ago

Yep. Sounds too hacky for my taste :smile:. Maybe there is a better tool for this job.

Bull seems good for the retries, and it is supported natively by Nest. But before an item enters Bull, I need some event-based fan-out mechanism.

Google Cloud Pub/Sub might solve my needs, though it does not have a ready-made solution either.

WonderPanda commented 4 years ago

Hey, loving the back and forth on this one. With the introduction of the new errorHandler functionality I was definitely hoping to start developing some better community patterns for things like backoff and retry with DLQs. The link that @rkalkani provided for the rascal repo looks really interesting; I'll spend some more time looking through it and see if there's anything we can pull into the NestJS ecosystem.

saarikivit commented 4 years ago

Let me know if I can help! 🙏

WonderPanda commented 4 years ago

@saarikivit I'm curious about your thoughts around Google Pub/Sub. What functionality does it provide that you think might make these scenarios easier? I actually started a half-finished Google Pub/Sub module a while back that I could potentially revisit.

saarikivit commented 4 years ago

@WonderPanda actually that was my mistake, a mix-up with GC Tasks.

Currently GC Pub/Sub provides DLQs, so the implementation would be very similar to RabbitMQ.

https://cloud.google.com/pubsub/docs/dead-letter-topics

The good thing about their API is that they include the retry attempt number in the message object, which allows simple retry policies (e.g. retry 5 times before the message is sent to the DLQ). I have not found the same property in AMQP. 🤔

iamonkey commented 3 years ago

Hey @saarikivit @WonderPanda and @rkalkani, where did you get to with this? I'm facing the same challenge :-)

saarikivit commented 3 years ago

I decided to make the error handling policy not retry on error, thus removing the infinite loops.

Currently I'm removing this from my stack and using NestJS events with Bull. I realized I really don't need this yet at all.

WonderPanda commented 3 years ago

@sz-adminuser At the moment my best answer is to include appropriate error handling inside your message handlers so that errors don't bubble up and cause a retry.

There are so many different ways people may want to handle error setup that I never put anything opinionated in the library.

You have full access to the AMQP connection, so it should be possible, for example, to use a NestJS Interceptor to catch errors and automatically issue a new message to any queue of your choosing (for a DLQ), or to do error reporting/handling as makes sense for your implementation.
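
For example, a rough sketch of such an interceptor (the exchange name, routing key, and the assumption that the message payload is the first handler argument are all illustrative, not an official pattern from the library):

import { CallHandler, ExecutionContext, Injectable, NestInterceptor } from '@nestjs/common';
import { AmqpConnection } from '@golevelup/nestjs-rabbitmq';
import { Observable, throwError } from 'rxjs';
import { catchError } from 'rxjs/operators';

@Injectable()
export class DeadLetterInterceptor implements NestInterceptor {
  constructor(private readonly amqpConnection: AmqpConnection) {}

  intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
    const payload = context.getArgByIndex(0);

    return next.handle().pipe(
      catchError((err) => {
        // Forward the failed payload to a dead-letter exchange of our choosing,
        // then rethrow so the configured error behavior still applies.
        this.amqpConnection.publish('app.dlx', 'dead-letter', payload);
        return throwError(() => err);
      }),
    );
  }
}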

iamonkey commented 3 years ago

Hi, thanks for responding. This is what I have done, and it seems to work well:

Created a Dead Letter Exchange (DLX1) that rejected events go to. DLX1 puts the events back for retrying, or removes them after a certain number of retries and persists them in a DB.
Those put back go to a delay exchange which has no subscribers but has a second Dead Letter Exchange (DLX2). Messages on the delay exchange are added with incrementally increasing expiry times; given there are no subscribers, they all expire and are rejected. DLX2 picks these up and republishes them back on the original route.
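
One way to sketch the "put back with increasing expiry" step is as a custom errorHandler that republishes the failed message to the delay exchange with a per-message expiration; the exchange name, retry header, attempt limit, and delays here are illustrative:

import { Channel, ConsumeMessage } from 'amqplib';

const delayedRetryErrorHandler = async (
  channel: Channel,
  msg: ConsumeMessage,
  error: any,
): Promise<void> => {
  const attempt = Number(msg.properties.headers?.['x-attempt'] ?? 0) + 1;

  if (attempt > 6) {
    // Too many attempts: reject without requeue so DLX1 can persist it in the DB.
    channel.nack(msg, false, false);
    return;
  }

  // The delay exchange's queue has no consumers and uses DLX2 as its
  // dead-letter exchange, bound back to the original routing key.
  channel.publish('delay.exchange', msg.fields.routingKey, msg.content, {
    ...msg.properties,
    expiration: String(attempt * 10_000), // 10s, 20s, 30s, ...
    headers: { ...msg.properties.headers, 'x-attempt': attempt },
  });
  channel.ack(msg);
};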

I managed it all with your library :-), apart from creating the queue without any subscribers.

More details on a similar setup here: https://www.cloudamqp.com/docs/delayed-messages.html and https://blog.idempotent.ca/starred/2015-04-30-use-rabbitmq-dlx-to-implement-delayed-retry/

iamonkey commented 3 years ago

One quick q... is there a way to specify centrally (i.e. in RabbitMQConfig) that all queues default to not requeueing in the event of an error (defaultNackErrorHandler)? At the moment, whenever an exception occurs that is not caught, the event is put back into the queue. We fix this by using errorHandler ... but it worries me that other members of the team will miss including it and it will kill the server very quickly.

@RabbitSubscribe({ errorHandler: defaultNackErrorHandler, ... })

iamonkey commented 3 years ago

For anyone following up on how to set a default so that messages are not automatically requeued, the option is defaultSubscribeErrorBehavior:

  createModuleConfig(): RabbitMQConfig {
    return {
      uri: this.CLOUDAMQP_URL,
      defaultSubscribeErrorBehavior: MessageHandlerErrorBehavior.NACK,
      exchanges: [
        {
          name: GlobalEventSettings.EXCHANGE,
          type: 'topic',
        },
      ],
    };
  }

iamonkey commented 3 years ago

Another excellent feature that has really helped us is prefetchCount. We have a Puppeteer microservice that was crashing if it tried to handle too many requests at once. prefetchCount limits the number of unacked messages a consumer can hold at a time. More info here.

@WonderPanda it would be even more helpful to be able to set this on a per-subscriber basis, rather than globally?

  createModuleConfig(): RabbitMQConfig {
    return {
      prefetchCount: 3,
      defaultSubscribeErrorBehavior: MessageHandlerErrorBehavior.NACK,
      exchanges: [
        {
          name: GlobalEventSettings.EXCHANGE,
          type: 'topic',
        },
        {
          name: GlobalEventSettings.DLX,
          type: 'topic',
        },
      ],
    };
  }

renato-spaka commented 3 years ago

@iamonkey & @WonderPanda, this discussion is so rich, thank you.

Nevertheless, I have my own questions about this: Where and when should I set queueOptions (expires, deadLetterExchange, deadLetterRoutingKey)? And where is the definition of amqplib.Options.AssertExchange (what are the possible values?) for RabbitMQExchangeConfig?
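
For what it's worth, queueOptions are normally passed per handler on the @RabbitSubscribe decorator and map onto amqplib's Options.AssertQueue (and an exchange's options in RabbitMQExchangeConfig onto Options.AssertExchange). A hedged sketch with illustrative names:

import { RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';

export class ExampleConsumer {
  @RabbitSubscribe({
    exchange: 'example.exchange',
    routingKey: 'example.key',
    queue: 'example.queue',
    queueOptions: {
      // These fields come from amqplib's Options.AssertQueue.
      expires: 60_000,
      deadLetterExchange: 'example.dlx',
      deadLetterRoutingKey: 'example.dead',
    },
  })
  async handle(payload: unknown) {
    // business logic
  }
}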

sergei-pankratov commented 2 years ago

I found "RabbitSubscribe" to have "queueOptions" that includes "deadLetterExchange" 🤔

But implementing some type of thing like: https://medium.com/@igkuz/ruby-retry-scheduled-tasks-with-dead-letter-exchange-in-rabbitmq-9e38aa39089b seems to be not doable automatically. Making a DLX and DLQ with the decorators would bind them to a function, which would result in the same thing as without = bursting my service until I fix the bug.

I have to create the DLQ manually and configure a TTL to it + the DLQ's DLQ would have to be the working queue. That is how everything would happen Rabbit-side and calls to my server would only happen with the worker queue.

Any ideas how this could be implemented with and in the lib?

Currently I resort to having this in my module.forRoot:

  defaultRpcTimeout: 4000,
  defaultRpcErrorBehavior: MessageHandlerErrorBehavior.NACK,
  defaultSubscribeErrorBehavior: MessageHandlerErrorBehavior.NACK

I'm perhaps adding https://docs.nestjs.com/techniques/queues Bull as the service who handles an exponential backoff for my stuff that I really need to work at some point.

I also read that article and wanted to implement something like that. What I ended up with is an inherited module with an overridden onApplicationBootstrap method, which scans the handlers and configures additional DLX queues and exchanges before calling the parent method. I also added an adapter for the RabbitSubscribe decorator, something like @RabbitSubscribe(MessageToQueueDefAdapter(JobCreatedV1, 'processing')). In the adapter I set up a handler that acks the message and redirects it to a different queue if it has exceeded the retry count. For those interested, I posted an example repo here: https://github.com/tehnetaholic/netsjs-rabbitmq-with-retries/blob/main/src/rabbit/rabbitModule.ts