golevelup / nestjs

A collection of badass modules and utilities to help you level up your NestJS applications 🚀
MIT License

RabbitMQ: Queues not recreated in all circumstances #628

Open ssilve1989 opened 1 year ago

ssilve1989 commented 1 year ago

As originally outlined in https://github.com/golevelup/nestjs/issues/239, with an updated comment from me at https://github.com/golevelup/nestjs/issues/239#issuecomment-1653912986.

There is an issue with queue creation when connected to a Rabbit cluster.

I can consistently reproduce the following case:

When these replicas reconnect to Rabbit (confirmed both by a k8s health check and by checking managedConnection.isConnected()), they never create a new queue binding for the service.

Interestingly, if you restart all nodes in the Rabbit cluster as a rolling restart, all queues are recreated once the final node has restarted. Until then, replicas that reconnect do not create any new queues.

ssilve1989 commented 1 year ago

Upon further investigation, it looks like what's happening is:

I would have expected an error event to be emitted on the Channel, but even if it were, it wouldn't do much, since the library's handler for that event only logs it.

underfisk commented 1 year ago

@ssilve1989 Thanks for the repro and thorough investigation. I probably don't have much free time right now to provide a fix, as this might also be an issue with the underlying library or with how we set up and configure the connection manager. If you're willing to provide a fix, that would be very much appreciated 🙏

ssilve1989 commented 1 year ago

Implementing a fix would be possible if the error event were actually emitted on the channel. My naive understanding is that it should emit an error when this happens, but it doesn't. Maybe that's a problem with amqplib. I was also able to reproduce this on a single-node instance just by deleting the queue from the Management console; no error event was emitted.

My naive workaround at the moment is to patch the @golevelup/nestjs-rabbitmq library to track the queues it creates and, as part of a k8s health check, query those queues and call checkQueue to make sure they still exist. Maybe the library already has a way to get the auto-generated queue names? I didn't see one, though.
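A minimal sketch of that workaround, assuming the application records every queue name it asserts (including server-generated names) in a hypothetical `QueueRegistry`; `QueueRegistry`, `findMissing`, and `CheckChannel` are illustrative names, not part of @golevelup/nestjs-rabbitmq. The check itself relies on amqplib's `checkQueue`, which rejects when the queue is gone. Because the broker closes the channel on that failure, each check here opens its own short-lived channel and attaches an `'error'` listener so the expected channel error does not go unhandled.

```typescript
// Structural subset of an amqplib Channel. checkQueue resolves if the queue
// exists and rejects (404 NOT_FOUND) if it does not; in that case the broker
// closes the channel and amqplib also emits 'error' on it.
interface CheckChannel {
  checkQueue(queue: string): Promise<unknown>;
  close(): Promise<void>;
  on(event: "error", listener: (err: Error) => void): unknown;
}

// Hypothetical registry of queue names the app has asserted, including
// server-generated names returned by assertQueue("").
class QueueRegistry {
  private readonly names = new Set<string>();

  track(queueName: string): void {
    this.names.add(queueName);
  }

  // Returns the tracked queues that no longer exist on the broker.
  async findMissing(openChannel: () => Promise<CheckChannel>): Promise<string[]> {
    const missing: string[] = [];
    for (const name of Array.from(this.names)) {
      // A failed check closes its channel, so each queue gets a fresh one.
      const channel = await openChannel();
      channel.on("error", () => undefined); // expected on a miss; don't crash
      try {
        await channel.checkQueue(name);
      } catch {
        missing.push(name);
      } finally {
        try {
          await channel.close();
        } catch {
          // Closing an already-closed channel fails; nothing left to clean up.
        }
      }
    }
    return missing;
  }
}
```

In a health check this might be called as `registry.findMissing(() => connection.createChannel())` with an amqplib `Connection`, treating a non-empty result as unhealthy so the pod is restarted and its queues are asserted again.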

I'll see if I can figure something out with amqplib when I get some time.

underfisk commented 1 year ago

@ssilve1989 Thank you!

ssilve1989 commented 1 year ago

As pointed out in https://github.com/amqp-node/amqplib/issues/736, it looks like we'd need to treat a null message as the signal that consumption has stopped and then re-create the queue. The library could (or should) probably expose options for what to do in that case, such as throwing or implicitly re-creating the queue.
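The null-message signal can be sketched as a small wrapper around a consume callback. amqplib's documentation for `consume` states that if RabbitMQ cancels the consumer (for example because its queue was deleted), the message callback is invoked with `null`. `guardConsumer` and its `onCancelled` hook are hypothetical names; what `onCancelled` should do (throw, flag a health check, re-assert the queue) is the open design question above.

```typescript
// Hypothetical wrapper: routes real messages to onMessage and treats the
// null delivery that amqplib uses for broker-side cancellation as a signal.
function guardConsumer<M>(
  queueName: string,
  onMessage: (msg: M) => void,
  onCancelled: (queueName: string) => void,
): (msg: M | null) => void {
  return (msg) => {
    if (msg === null) {
      // The broker cancelled this consumer; no further deliveries will arrive.
      onCancelled(queueName);
      return;
    }
    onMessage(msg);
  };
}
```

With a real channel this would be passed as `channel.consume(queue, guardConsumer(queue, handleMessage, onCancelled))`.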

WonderPanda commented 1 year ago


Nice, this is a great discovery! Are you interested in helping to add this functionality to the library?

ssilve1989 commented 1 year ago

@WonderPanda Yeah, I'm looking into adding an option similar to the other error handlers, called consumerCancellationHandler, and letting the consumer decide what to do. For my use case I'll probably just throw an error, but passing along the channel and queue name might be OK for other use cases.
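A sketch of how such an option might be wired up. `consumerCancellationHandler` is the name proposed in the comment above; the `(channel, queueName)` signature, `ConsumerCancelledError`, and `handleCancellation` are hypothetical illustrations, not the published @golevelup/nestjs-rabbitmq API. When no handler is configured, the sketch falls back to throwing, matching the "just throw" use case.

```typescript
// Hypothetical option shape; C is whatever channel type the library passes.
type ConsumerCancellationHandler<C> = (channel: C, queueName: string) => void | Promise<void>;

interface CancellationOptions<C> {
  consumerCancellationHandler?: ConsumerCancellationHandler<C>;
}

class ConsumerCancelledError extends Error {
  constructor(readonly queueName: string) {
    super(`Consumer for queue "${queueName}" was cancelled by the broker`);
    this.name = "ConsumerCancelledError";
  }
}

// Default behaviour: surface the cancellation as an error.
const throwOnCancellation: ConsumerCancellationHandler<unknown> = (_channel, queueName) => {
  throw new ConsumerCancelledError(queueName);
};

// Called when the consume callback receives null for queueName.
async function handleCancellation<C>(
  options: CancellationOptions<C>,
  channel: C,
  queueName: string,
): Promise<void> {
  const handler: ConsumerCancellationHandler<C> =
    options.consumerCancellationHandler ?? throwOnCancellation;
  await handler(channel, queueName);
}
```

Passing the channel and queue name keeps the library agnostic about recovery: a user-supplied handler can log, re-assert the queue, or rethrow.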

ssilve1989 commented 1 year ago

I have something started here but haven't thoroughly tested it yet. I probably won't have time to test it until the end of the week.

I'm not sure this approach is suitable, though, for cases where the consumer wants the library to automatically re-create and resubscribe to the queue: providing just the channel and queue name doesn't let them bind back to the decorated methods, right?
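One way a library could support automatic re-creation is to keep the whole subscription (exchange, routing key, queue name, and the decorated handler already bound to its instance) and replay it when the broker cancels the consumer. A minimal sketch, assuming amqplib's `assertQueue`, `bindQueue`, and `consume` channel methods and the null-message cancellation signal; `Subscription`, `ResubscribeChannel`, and `ResubscribingConsumer` are hypothetical names, not library API.

```typescript
// Structural subset of an amqplib Channel used for (re)subscribing.
interface ResubscribeChannel<M> {
  assertQueue(queue: string): Promise<{ queue: string }>;
  bindQueue(queue: string, exchange: string, routingKey: string): Promise<unknown>;
  consume(queue: string, onMessage: (msg: M | null) => void): Promise<{ consumerTag: string }>;
}

// Everything needed to replay a subscription, including the bound handler.
interface Subscription<M> {
  exchange: string;
  routingKey: string;
  queueName: string; // "" lets the broker generate a name
  handler: (msg: M) => void;
}

class ResubscribingConsumer<M> {
  constructor(
    private readonly channel: ResubscribeChannel<M>,
    private readonly onError: (err: unknown) => void = () => undefined,
  ) {}

  // Asserts and binds the queue, starts consuming, and returns the queue name.
  async subscribe(sub: Subscription<M>): Promise<string> {
    const { queue } = await this.channel.assertQueue(sub.queueName);
    await this.channel.bindQueue(queue, sub.exchange, sub.routingKey);
    await this.channel.consume(queue, (msg) => {
      if (msg === null) {
        // Broker cancelled us (e.g. queue deleted): replay the stored subscription.
        this.subscribe(sub).catch(this.onError);
        return;
      }
      sub.handler(msg);
    });
    return queue;
  }
}
```

Because the handler is stored next to the queue settings, the replayed subscription reaches the same decorated method, which a bare `(channel, queueName)` callback cannot do on its own. For server-named queues the replay produces a new queue name, so anything that recorded the old name would need updating.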