Polyconseil / aioamqp

AMQP implementation using asyncio
Other
280 stars 88 forks source link

Channel publish requires a lock #205

Open adamhooper opened 5 years ago

adamhooper commented 5 years ago

Steps to reproduce:

  1. Open a channel
  2. Call channel.basic_publish(...) or channel.publish() 100 times simultaneously

My naive expected results: 100 messages get written to the server, one after another. Actual results: messages are interwoven, leading to undefined behavior (and undelivered messages).

The problem: each AMQP message consists of 3+ frames. (It can be more than 3 if the message is so large it needs to be split up.) After sending frame 1 for message A, frames 2+3 for message A must be written before frame 1 for message B is written. This is kinda intuitive on synchronous libraries (e.g., Java docs say "one channel per thread"); but it isn't intuitive in asyncio-land.

In a nutshell: this didn't feel intuitive.

I have three ideas for making aioamqp easier to work with:

  1. Add docstrings to channel.publish and channel.basic_publish, explaining that they can't be used asynchronously.
  2. Create an asyncio.Lock "send mutex" per channel, so concurrent callers can call publish() and basic_publish() (and all other methods) without interleaving frames. I haven't thought through whether there are negative consequences to this.
  3. Add assertions instead of an asyncio.Lock: if users end up calling publish() or basic_publish() within another publish, raise a RuntimeError with instructions on how to fix the calling code.

This relates to #145. (I don't think #145 should be merged, though, because writes should be async so buffers don't overflow. That's not the case today, which is fine for most people; but I don't think the API should "lock in" buffer overflows.)

My vote is to implement both 1 and 3. I'm willing to do this, if there's agreement from maintainers?