mosquito / aiormq

Pure python AMQP 0.9.1 asynchronous client library

Graceful shutdown #188

Closed patstrom closed 9 months ago

patstrom commented 10 months ago

Hello!

I want to use this library to run a consumer as part of a larger application. The consumer will cache a lot of data on disk before flushing it to persistent storage, and I don't want to do a basic_ack on messages until they're in persistent storage (as opposed to as soon as they're in the on-disk cache).

The issue I am having right now is that I can't figure out what actually happens to a callback I registered with basic_consume if I use the same channel to send a basic_cancel to stop messages from coming in.

What I want to happen is to "drain" the in-flight messages but I cannot access the internal buffers where things are stored before invoking the callback. In fact I cannot really figure out if there is an internal buffer at all.

Is there any recommended way to implement this? Should the callback put things onto a queue instead that I then manage manually? Or can I somehow access the internal buffer or file descriptors?

mosquito commented 10 months ago

You can catch CancelledError inside your consumer callback; it means someone stopped your app or the connection is being cancelled.
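
A rough sketch of that idea, using plain asyncio only. The `on_message` callback, its `log` argument, and the `demo` driver are hypothetical names for illustration; the real callback signature and the exact conditions under which the library cancels a callback are not shown here.

```python
import asyncio


async def on_message(body: bytes, log: list) -> None:
    """Hypothetical consumer callback; `log` records what happened."""
    try:
        log.append(f"start {body!r}")
        await asyncio.sleep(10)  # stands in for slow processing
        log.append(f"done {body!r}")
    except asyncio.CancelledError:
        # Cleanup goes here. The message is not acked, so the broker
        # is free to redeliver it later.
        log.append(f"cancelled {body!r}")
        raise  # re-raise so the task is still marked as cancelled


async def demo() -> list:
    log: list = []
    task = asyncio.create_task(on_message(b"payload", log))
    await asyncio.sleep(0)  # let the callback start running
    task.cancel()           # simulates app shutdown or a closed connection
    try:
        await task
    except asyncio.CancelledError:
        pass
    return log
```

Re-raising after cleanup keeps cancellation visible to whatever is awaiting the task.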

patstrom commented 10 months ago

But how can I ensure that the currently in-flight messages are still consumed?

patstrom commented 9 months ago

I solved this using an asyncio.Queue to buffer messages.
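
A minimal sketch of that approach, under some loud assumptions: `FakeChannel` is a hypothetical stand-in so the example runs without a broker, and its simplified `basic_consume`, `basic_cancel`, and `basic_ack` signatures are not the aiormq API. The point is only the shape: the callback enqueues, a worker acks after storing, and shutdown cancels the consumer and then drains the queue.

```python
import asyncio


class FakeChannel:
    """Hypothetical stand-in for a broker channel (not the aiormq API)."""

    def __init__(self):
        self.acked = []
        self.consuming = False
        self._callback = None

    async def basic_consume(self, callback):
        self._callback = callback
        self.consuming = True

    async def basic_cancel(self):
        self.consuming = False

    async def basic_ack(self, delivery_tag):
        self.acked.append(delivery_tag)

    async def deliver(self, delivery_tag, body):
        # Simulates the broker pushing a message to the consumer.
        if self.consuming:
            await self._callback(delivery_tag, body)


async def run_consumer(channel, messages):
    buffer = asyncio.Queue()
    stored = []  # stands in for persistent storage

    async def on_message(delivery_tag, body):
        # The callback only enqueues; no ack happens here.
        await buffer.put((delivery_tag, body))

    async def worker():
        while True:
            delivery_tag, body = await buffer.get()
            try:
                stored.append(body)                    # "flush" first
                await channel.basic_ack(delivery_tag)  # ack only afterwards
            finally:
                buffer.task_done()

    worker_task = asyncio.create_task(worker())
    await channel.basic_consume(on_message)
    for delivery_tag, body in messages:
        await channel.deliver(delivery_tag, body)

    # Graceful shutdown: stop new deliveries, then drain what is buffered.
    await channel.basic_cancel()
    await buffer.join()
    worker_task.cancel()
    await asyncio.gather(worker_task, return_exceptions=True)
    return stored
```

Because `buffer.join()` returns only after every enqueued item has been marked done, every message received before the cancel is stored and acked before the worker is stopped.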