golevelup / nestjs

A collection of badass modules and utilities to help you level up your NestJS applications 🚀
MIT License

Consumer won't re-attach to a queue when RabbitMQ cluster gets restarted in 1/5 cases #691

Open Loki-Afro opened 9 months ago

Loki-Afro commented 9 months ago

Hey, we run a 3-node RabbitMQ cluster. When we restart the cluster, there is roughly a 1 in 5 chance that our one consumer will not reconnect properly.

It seems that under certain circumstances the consumer gets stuck: there is a valid TCP connection and a valid channel (or two, I'm not sure; it's an RPC consumer), but no consumer is attached to the queue, as observed in the RabbitMQ management UI.

The consumer is just that: an app that only consumes from the queue and does nothing else.

2024-02-05T10:02:40.071620174Z [NestWinston] Error    2024-02-05 11:02:40.071 unhandledRejection: Channel ended, no reply will be forthcoming
2024-02-05T10:02:40.071637334Z Error: Channel ended, no reply will be forthcoming
2024-02-05T10:02:40.071641620Z     at rej (/schulcloud-server/node_modules/amqplib/lib/channel.js:201:7)
2024-02-05T10:02:40.071646532Z     at ConfirmChannel.C._rejectPending (/schulcloud-server/node_modules/amqplib/lib/channel.js:207:42)
2024-02-05T10:02:40.071650197Z     at ConfirmChannel.C.toClosed (/schulcloud-server/node_modules/amqplib/lib/channel.js:171:8)
2024-02-05T10:02:40.071653460Z     at Connection.C._closeChannels (/schulcloud-server/node_modules/amqplib/lib/connection.js:394:18)
2024-02-05T10:02:40.071656725Z     at Connection.C.toClosed (/schulcloud-server/node_modules/amqplib/lib/connection.js:401:8)
2024-02-05T10:02:40.071659930Z     at Connection.C.onSocketError (/schulcloud-server/node_modules/amqplib/lib/connection.js:355:10)
2024-02-05T10:02:40.071663518Z     at Connection.emit (node:events:517:28)
2024-02-05T10:02:40.071666965Z     at Socket.go (/schulcloud-server/node_modules/amqplib/lib/connection.js:481:12)
2024-02-05T10:02:40.071670268Z     at Socket.emit (node:events:517:28)
2024-02-05T10:02:40.071673667Z     at emitReadable_ (node:internal/streams/readable:633:12)
2024-02-05T10:02:40.071688708Z     at processTicksAndRejections (node:internal/process/task_queues:81:21) - {"error":{"cause":{},"isOperational":true},"stack":"Error: Channel ended, no reply will be forthcoming\n    at rej (/schulcloud-server/node_modules/amqplib/lib/channel.js:201:7)\n    at ConfirmChannel.C._rejectPending (/schulcloud-server/node_modules/amqplib/lib/channel.js:207:42)\n    at ConfirmChannel.C.toClosed (/schulcloud-server/node_modules/amqplib/lib/channel.js:171:8)\n    at Connection.C._closeChannels (/schulcloud-server/node_modules/amqplib/lib/connection.js:394:18)\n    at Connection.C.toClosed (/schulcloud-server/node_modules/amqplib/lib/connection.js:401:8)\n    at Connection.C.onSocketError (/schulcloud-server/node_modules/amqplib/lib/connection.js:355:10)\n    at Connection.emit (node:events:517:28)\n    at Socket.go (/schulcloud-server/node_modules/amqplib/lib/connection.js:481:12)\n    at Socket.emit (node:events:517:28)\n    at emitReadable_ (node:internal/streams/readable:633:12)\n    at processTicksAndRejections (node:internal/process/task_queues:81:21)","exception":true,"date":"Mon Feb 05 2024 11:02:40 GMT+0100 (Central European Standard 
Time)","process":{"pid":17,"uid":1000,"gid":1000,"cwd":"/schulcloud-server","execPath":"/usr/local/bin/node","version":"v18.19.0","argv":["/usr/local/bin/node","/schulcloud-server/dist/apps/server/apps/preview-generator-consumer.app"],"memoryUsage":{"rss":98619392,"heapTotal":41897984,"heapUsed":37594488,"external":1600487,"arrayBuffers":223086}},"os":{"loadavg":[1.64,2.64,6.72],"uptime":149252.74},"trace":[{"column":7,"file":"/schulcloud-server/node_modules/amqplib/lib/channel.js","function":"rej","line":201,"method":null,"native":false},{"column":42,"file":"/schulcloud-server/node_modules/amqplib/lib/channel.js","function":"ConfirmChannel.C._rejectPending","line":207,"method":"_rejectPending","native":false},{"column":8,"file":"/schulcloud-server/node_modules/amqplib/lib/channel.js","function":"ConfirmChannel.C.toClosed","line":171,"method":"toClosed","native":false},{"column":18,"file":"/schulcloud-server/node_modules/amqplib/lib/connection.js","function":"Connection.C._closeChannels","line":394,"method":"_closeChannels","native":false},{"column":8,"file":"/schulcloud-server/node_modules/amqplib/lib/connection.js","function":"Connection.C.toClosed","line":401,"method":"toClosed","native":false},{"column":10,"file":"/schulcloud-server/node_modules/amqplib/lib/connection.js","function":"Connection.C.onSocketError","line":355,"method":"onSocketError","native":false},{"column":28,"file":"node:events","function":"Connection.emit","line":517,"method":"emit","native":false},{"column":12,"file":"/schulcloud-server/node_modules/amqplib/lib/connection.js","function":"Socket.go","line":481,"method":"go","native":false},{"column":28,"file":"node:events","function":"Socket.emit","line":517,"method":"emit","native":false},{"column":12,"file":"node:internal/streams/readable","function":"emitReadable_","line":633,"method":null,"native":false},{"column":21,"file":"node:internal/process/task_queues","function":"processTicksAndRejections","line":81,"method":null,"native":false}]} 
+0ms
2024-02-05T10:02:45.125243487Z [Nest] 17  - 02/05/2024, 11:02:45 AM     LOG [AmqpConnection] Successfully connected to RabbitMQ broker (default)
2024-02-05T10:02:45.153468493Z [Nest] 17  - 02/05/2024, 11:02:45 AM     LOG [AmqpConnection] Successfully connected a RabbitMQ channel "AmqpConnection"

You can see at the very end that it reconnected successfully, but only the connection and the channel were restored. No "resumeConsumer" or "basicConsume" was called afterwards, so the consumer was never re-attached.
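For readers following along, here is a minimal sketch of what "re-attaching" would mean in terms of amqp-connection-manager, which provides the managed channel. It assumes the channel wrapper exposes `addSetup`, whose setup functions run again on each newly created channel, and that the underlying amqplib channel exposes `consume`. The interfaces and the name `consumeOnEveryChannel` are hypothetical stand-ins for illustration, not the library's internal code, and this is not a confirmed fix for the issue.

```typescript
// Minimal shapes assumed for this sketch; the real types live in amqplib and
// amqp-connection-manager.
interface ConsumeChannel {
  consume(queue: string, onMessage: (msg: unknown) => void): Promise<unknown>;
}

type SetupFn = (channel: ConsumeChannel) => Promise<void>;

interface ChannelWrapperLike {
  addSetup(setup: SetupFn): Promise<void>;
}

// Hypothetical helper: registers the consumer as a setup function so that
// basic.consume is issued again on every new underlying channel, not only on
// the first one.
function consumeOnEveryChannel(
  wrapper: ChannelWrapperLike,
  queue: string,
  onMessage: (msg: unknown) => void,
): Promise<void> {
  return wrapper.addSetup((channel) =>
    channel.consume(queue, onMessage).then(() => undefined),
  );
}
```

If a consumer is instead attached once, outside of a setup function, a reconnect would produce exactly the state described above: a live connection and channel with nothing consuming from the queue.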

By the way, I had the same problem with an older Java Spring version, where I was able to write a health indicator that observed the consumer's status with something like isRunning(). Unfortunately, there seems to be nothing similar here for such a workaround, or is there?
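One possible workaround, sketched under assumptions: amqplib's `checkQueue` resolves with queue information that includes `consumerCount`, so a health check could report unhealthy when the queue has no consumers. The interface, function names, and `minConsumers` parameter below are hypothetical. The count covers all consumers on the queue, so this only detects a stuck consumer when the app is the queue's sole consumer, as in the setup described here.

```typescript
// Minimal shape assumed for this sketch; amqplib's Channel.checkQueue resolves
// with an object that includes consumerCount.
interface QueueInspector {
  checkQueue(queue: string): Promise<{ consumerCount: number }>;
}

interface ConsumerHealth {
  healthy: boolean;
  detail: string;
}

// Pure decision step: healthy only if at least minConsumers are attached.
function evaluateConsumerCount(
  queue: string,
  consumerCount: number,
  minConsumers = 1,
): ConsumerHealth {
  return {
    healthy: consumerCount >= minConsumers,
    detail: `queue "${queue}" has ${consumerCount} consumer(s), expected at least ${minConsumers}`,
  };
}

// Hypothetical health check: asks the broker for the queue's consumer count
// and treats any failure of the check itself as unhealthy.
async function checkConsumerHealth(
  channel: QueueInspector,
  queue: string,
  minConsumers = 1,
): Promise<ConsumerHealth> {
  try {
    const { consumerCount } = await channel.checkQueue(queue);
    return evaluateConsumerCount(queue, consumerCount, minConsumers);
  } catch (err) {
    return { healthy: false, detail: `checkQueue failed for "${queue}": ${String(err)}` };
  }
}
```

The result could be wired into whatever liveness probe the deployment uses, so that the orchestrator restarts the app when the consumer is missing. Because a failed `checkQueue` (for example on a missing queue) closes the channel in amqplib, it is probably safer to run the check on a dedicated channel rather than on the one used for consuming.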

underfisk commented 1 month ago

@Loki-Afro Would you be able to help us with a PR? There may indeed be a missing feature to "restore" the queues when you restart the cluster.