mosquito / aiormq

Pure python AMQP 0.9.1 asynchronous client library
Other
268 stars 58 forks source link

Reject all futures once a FutureStore is closed. #201

Open isra17 opened 1 month ago

isra17 commented 1 month ago

This fix a race condition where a writer_drain future might be created after a disconnection and the FutureStore had all the future rejected. In this case, the new future won't ever be completed since the connection is not writing anymore.

I have seen it happen in production on high concurrency publish, where disconnect can happen while a task is waiting for the publish lock, then after everything get rejected and the lock is released, the waiting task get the lock, send data and wait for the drain that will never come.

An alternative fix would be to add is_closed checks everywhere before we create future, but my current solution seems to be foolproof. The FutureStore is never reopened, so once it's close it should stay close and fail any attempt at creating new futures.

I also added some extra check so the test behavior doesn't change (Otherwise the CancelledError get raised instead of the expected InvalidChannelState)

I'm quickly opening this PR so it can get reviewed, but I will add a test by tomorrow.

isra17 commented 1 month ago

Added a test that fail in master and pass in this PR.