agoragames / haigha

AMQP Python client
BSD 3-Clause "New" or "Revised" License

Message frames might be interleaved when sending from multiple greenlets #84

Open liorn opened 9 years ago

liorn commented 9 years ago

When publishing a large message, BasicClass.publish creates multiple frames and sends them one by one. With async transports such as gevent, or the currently unsupported eventlet, there is no guarantee that we won't switch to another greenlet between frames. When multiple greenlets use the same connection to publish large messages, frames can get interleaved. Today this happens not to occur with gevent, but only by chance; future changes in gevent's greenlet-switching mechanism might break haigha.

I've come across this problem with an eventlet transport (which I will later contribute to haigha). It's easy to reproduce: I spawn multiple publishers which continuously publish large messages, and very soon afterwards I get disconnected from RabbitMQ with this message (or a similar one):

Connection closed: 505 : UNEXPECTED_FRAME - expected content body, got non content body frame instead

Tracing the operations shows the exact problem: multiple greenlets are waiting on the single EventletTransport._write_lock (analogous to GeventTransport._write_lock), and send_frame calls are interleaved.
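For reference, a minimal repro sketch along these lines (assuming haigha's gevent transport; the host, exchange, routing key, and message size are placeholders):

```python
import gevent
from haigha.connection import Connection
from haigha.message import Message

# Placeholder connection settings; transport='gevent' selects the
# gevent-based transport.
connection = Connection(transport='gevent', host='localhost')
channel = connection.channel()

# A body much larger than the negotiated frame_max forces publish() to
# split the message into many content-body frames.
big_body = 'x' * (1024 * 1024)

def publisher():
    while True:
        # Each publish emits a method frame, a header frame, and many body
        # frames; a greenlet switch between the individual send_frame calls
        # lets another publisher's frames slip in between.
        channel.basic.publish(Message(big_body), 'test-exchange', 'test-key')

gevent.joinall([gevent.spawn(publisher) for _ in range(5)])
```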

A possible solution would be to acquire and release a semaphore per message rather than per frame... but the locking mechanism today lives in BasicClass, not Transport, and so can't be library-specific.
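As a sketch of that per-message idea (the lock and wrapper here are application-level and illustrative, not part of haigha's API):

```python
from gevent.lock import Semaphore

# One application-level lock per channel (or per connection).
publish_lock = Semaphore()

def publish_whole_message(channel, message, exchange, routing_key):
    # Holding the lock across the entire publish keeps the method, header,
    # and body frames of one message contiguous on the wire, instead of
    # locking around each individual frame.
    with publish_lock:
        channel.basic.publish(message, exchange, routing_key)
```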

liorn commented 9 years ago

@awestendorf, I'll be happy to hear your opinion about this.

  1. I'm worried about the possibility that a future gevent change will break haigha.
  2. I want to add an EventletTransport, but can't because of this issue.

Thanks!

awestendorf commented 9 years ago

This should only be a problem when you're using the same channel across greenlets/eventlets/threads though, right?

liorn commented 9 years ago

True. It's a common use case, no?

awestendorf commented 9 years ago

It is, I just wanted to make sure I understood the problem correctly. The simplest approach would be to create a channel, or pull one from a pool, for each publish on each thread. Although there's a limit to the number of channel ids, channels are relatively light on both sides of the connection. There's haigha.channel_pool, but that's old code for a transactional use case that I don't use any more. I see 3 possible ways to go with this:

  1. Document that the user should publish on a channel per thread/greenlet, however they manage that.
  2. Along with 1., improve the ChannelPool so that it supports the non-transactional use case by implementing a lock around the publish.
  3. Encode that lock within BasicClass, which is an instance per channel and so can maintain its own lock relatively easily.

In either case where we handle a lock, I think we shouldn't assume that the user has monkey-patched the thread library, so there will have to be some abstraction that allows BasicClass and/or ChannelPool to use a lock constructor defined within a Transport implementation.
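A rough sketch of what that abstraction could look like (none of these hooks exist in haigha today; all names are illustrative):

```python
import threading

class SocketTransportLocks(object):
    """Blocking-socket transport: a plain thread lock is appropriate."""
    def new_lock(self):
        return threading.Lock()

class GeventTransportLocks(object):
    """Gevent transport: hand back a gevent semaphore, so callers don't
    depend on the thread module being monkey-patched."""
    def new_lock(self):
        from gevent.lock import Semaphore
        return Semaphore()

class PooledChannel(object):
    """Illustrative ChannelPool entry that serializes publishes with a
    lock constructed by the channel's transport."""
    def __init__(self, channel, transport_locks):
        self._channel = channel
        self._lock = transport_locks.new_lock()

    def publish(self, message, exchange, routing_key):
        # One lock per channel: the frames of a single message stay
        # contiguous on the wire.
        with self._lock:
            self._channel.basic.publish(message, exchange, routing_key)
```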

awestendorf commented 9 years ago

Actually, with regard to option 3: that only solves the case where someone tries to publish on multiple threads. If they perform any other operation, that too could result in a frame error. So the only way to really make that work well is to grab a lock inside every method that wants to call send_frame, which I can see quickly leading to a lot of bugs (e.g. synchronous transports, try/finally debugging, etc.) and likely slowing things down seriously. That in turn means I think most users would want the option to control locking themselves, leading to even more complications.

I'm leaning in the direction of a helper such as ChannelPool and documentation. An important but subtle detail such as this could easily be lost in documentation though.

awestendorf commented 9 years ago

Perhaps a way around all those complications, and a real fix for the bug, is to change send_frame to send_frames, so that a single logical "transaction" is carried all the way down to the transport layer, where the lock can be held while iterating over the whole list of frames to be sent.
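Something like this, assuming an async transport with a _write_lock like GeventTransport's (the class and the low-level write are hypothetical, mirroring the proposal rather than shipped code):

```python
from gevent.lock import Semaphore

class GreenTransport(object):
    """Illustrative async transport built around the send_frames idea."""

    def __init__(self):
        self._write_lock = Semaphore()

    def send_frames(self, frames):
        # Hold the lock for the whole logical "transaction": every frame of
        # one message hits the socket before another greenlet can write.
        with self._write_lock:
            for frame in frames:
                self._write(frame)

    def _write(self, frame):
        # Hypothetical low-level write of a single serialized frame.
        raise NotImplementedError
```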

liorn commented 9 years ago

That sounds simpler. I'll take a look at the code this weekend and try and submit a patch for this. Thanks!

liorn commented 9 years ago

Short update: the refactoring is more significant than I expected. For now we've worked around the issue with application-level semaphores; I hope to get back to this soon.