gmr / rabbitpy

A pure python, thread-safe, minimalistic and pythonic RabbitMQ client library
http://rabbitpy.readthedocs.org
BSD 3-Clause "New" or "Revised" License

Connections are not thread-safe #56

Closed: paradis closed this issue 9 years ago

paradis commented 9 years ago

Hello,

Based on https://rabbitpy.readthedocs.org/en/latest/threads.html, it seems that a Connection is thread-safe but a Channel is not. However, if several threads use the same connection to create channels simultaneously, some channels will eventually get the same channel id, and a thread will block waiting for a frame that another thread has already received:

#4 file '/usr/lib64/python2.6/threading.py', in 'wait'
#8 file '/usr/lib64/python2.6/Queue.py', in 'get'
#12 file '/usr/lib/python2.6/site-packages/rabbitpy/base.py', in '_read_from_queue'
#15 file '/usr/lib/python2.6/site-packages/rabbitpy/base.py', in '_wait_on_frame'
#19 file '/usr/lib/python2.6/site-packages/rabbitpy/channel.py', in 'open'
#22 file '/usr/lib/python2.6/site-packages/rabbitpy/connection.py', in 'channel'
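The race described above comes down to two threads picking the same "next" channel id before either one registers its channel. A minimal sketch of a lock-protected allocator follows; `ChannelIdAllocator` and `allocate_many` are hypothetical names for illustration only and are not rabbitpy's actual implementation or its fix:

```python
import threading


class ChannelIdAllocator(object):
    """Hypothetical id allocator; NOT rabbitpy's real code."""

    def __init__(self):
        self._lock = threading.Lock()
        self._next_id = 1
        self._channels = {}

    def allocate(self, channel):
        # Picking the id and registering the channel happen under one
        # lock, so two threads can never be handed the same id.
        with self._lock:
            channel_id = self._next_id
            self._next_id += 1
            self._channels[channel_id] = channel
            return channel_id


def allocate_many(allocator, count, results):
    """Allocate `count` ids and record them in the shared list."""
    for _ in range(count):
        results.append(allocator.allocate(object()))


allocator = ChannelIdAllocator()
results = []  # list.append is safe to call from several threads in CPython
threads = [
    threading.Thread(target=allocate_many, args=(allocator, 200, results))
    for _ in range(5)
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
```

Without the lock, the read of `_next_id` and the write back to it could interleave between threads, which is the kind of duplicate id reported here.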

For example, the code below may lead to blocked threads:

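import threading

import rabbitpy

# MESSAGE_COUNT, EXCHANGE and ROUTING_KEY are constants defined by the caller.


def publisher(connection, name):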
    for index in range(0, MESSAGE_COUNT):
        with connection.channel() as channel:
            print('[%s] message %s' % (name, index))
            message = rabbitpy.Message(channel, '[%s] Message #%i' % (name, index))
            message.publish(EXCHANGE, ROUTING_KEY)

with rabbitpy.Connection(<>) as connection:
    with connection.channel() as channel:
        exchange = rabbitpy.Exchange(channel, EXCHANGE)
        exchange.declare()

    # Start the publisher threads
    publisher_threads = {}
    for i in xrange(5):
        publisher_threads[i] = threading.Thread(target=publisher, kwargs={'connection': connection, 'name': i})
        publisher_threads[i].start()

    for v in publisher_threads.values():
        v.join()
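On affected versions, one possible caller-side workaround is to serialize calls to `connection.channel()` with a lock shared by all threads, since the traceback shows the hang inside that call. This is only a sketch under that assumption; `open_channel` and `_channel_lock` are hypothetical names, not part of rabbitpy:

```python
import threading

# Hypothetical module-level lock shared by every thread that opens channels.
_channel_lock = threading.Lock()


def open_channel(connection):
    """Open a channel while holding the shared lock.

    Only one thread at a time runs connection.channel(), so id
    allocation and the channel open handshake cannot overlap.
    """
    with _channel_lock:
        return connection.channel()
```

In the publisher above, `with open_channel(connection) as channel:` would replace `with connection.channel() as channel:`. Publishing still happens outside the lock, so only channel creation is serialized.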

Reproducing this issue is easier if the channel id is printed:

diff --git a/rabbitpy/connection.py b/rabbitpy/connection.py
index e5b97ce..7a7e18a 100644
--- a/rabbitpy/connection.py
+++ b/rabbitpy/connection.py
@@ -157,6 +157,7 @@ class Connection(base.StatefulObject):

         """
         channel_id = self._get_next_channel_id()
+        print('channel id = %s' % channel_id)
         channel_frames = queue.Queue()
         self._channels[channel_id] = channel.Channel(channel_id,
                                                      self._events,

gmr commented 9 years ago

Thanks for the example, it was quite helpful.

gmr commented 9 years ago

This will go out in 0.25.0 today.