agoragames / haigha

AMQP Python client
BSD 3-Clause "New" or "Revised" License

gevent_transport is broken in v0.5.1 - v0.5.3 #22

Closed vitaly-krugl closed 12 years ago

vitaly-krugl commented 12 years ago

The code below worked great with Haigha 0.4.1. When I tried to upgrade to Haigha 0.5.3, I noticed that the test_haigha() function (in the code snippet below) never returns. I investigated GeventTransport and found that it now relies on SocketTransport to create the socket. Since SocketTransport creates a regular built-in socket.socket (not gevent.socket), GeventTransport is no longer gevent-compatible.
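To make the failure mode concrete, here is a minimal sketch of a transport whose socket implementation can be swapped by a subclass. The names `SketchSocketTransport`, `socket_module`, and `create_socket` are hypothetical and are not Haigha's actual API; the sketch only illustrates why a base class that hard-codes `socket.socket` leaves a gevent subclass with a blocking socket.

```python
import socket as std_socket


class SketchSocketTransport(object):
    """Hypothetical base transport; NOT haigha's actual class."""

    # Assumption for illustration: subclasses override this attribute to
    # swap in a different (e.g. cooperative) socket implementation.
    socket_module = std_socket

    def create_socket(self):
        # Build the socket from whichever module the class points at,
        # instead of hard-coding the built-in socket.socket.
        return self.socket_module.socket(
            std_socket.AF_INET, std_socket.SOCK_STREAM)


# A gevent flavour would only need to point at gevent's socket module:
#
#   import gevent.socket
#
#   class SketchGeventTransport(SketchSocketTransport):
#       socket_module = gevent.socket
```

With the default attribute, `SketchSocketTransport().create_socket()` returns a plain blocking `socket.socket`, which is the situation described above.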

The gevent_transport unit test doesn't test for gevent hub compatibility. Something like the following should do the trick: start a read from a Greenlet, gevent.sleep() for a second, gevent.kill() that greenlet, and finally wait for it to finish via greenlet.get(). Also, could you please incorporate the code snippet below into a unit test so that we can test Haigha with real gevent concurrency? Thank you.
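A rough sketch of that hub-compatibility check, independent of Haigha, might look like the following. It assumes gevent is installed and uses a `gevent.socket.socketpair()` as a stand-in for the broker connection; the helper names `blocking_read` and `read_is_cooperative` are made up for illustration.

```python
import gevent
from gevent import socket as gsocket


def blocking_read(sock):
    # Nothing is ever written to the peer, so recv() blocks. With a
    # cooperative socket it yields to the gevent hub while waiting.
    return sock.recv(1024)


def read_is_cooperative(delay=0.1, timeout=1.0):
    """Return True if a greenlet blocked in recv() can be killed."""
    reader, writer = gsocket.socketpair()
    try:
        g = gevent.spawn(blocking_read, reader)
        gevent.sleep(delay)   # let the greenlet start and block in recv()
        gevent.kill(g)        # schedules GreenletExit in the greenlet
        # A greenlet killed with GreenletExit counts as finished, and
        # get() returns the GreenletExit instance instead of raising.
        result = g.get(timeout=timeout)
        return isinstance(result, gevent.GreenletExit)
    finally:
        reader.close()
        writer.close()
```

With a non-cooperative socket in place of the gevent one, the `gevent.sleep()` call would not be expected to return at all, because `recv()` would block the whole thread rather than just the greenlet.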

import gevent
from haigha.connection import Connection
from haigha.message import Message

def test_haigha():
  """
  A simple test to check Haigha's NuPIC installation
  Note that Rabbit MQ must be running

  Upon success this test should print out:

  CONSUMER: MESSAGE RECEIVED:  Message[body: body, delivery_info: {'exchange': 'test_exchange', 'consumer_tag': 'channel-1-1', 'routing_key': 'test_key', 'redelivered': 0, 'delivery_tag': 1, 'channel': <haigha.channel.Channel object at 0x10ac87b50>}, properties: {'application_headers': {'hello': 'world'}}]
  readFrames result:  GreenletExit()
  """
  connection = Connection(
    user='guest', password='guest',
    vhost='/', host='localhost',
    heartbeat=None, debug=True, transport="gevent")

  def consumer(msg):
    print "CONSUMER: MESSAGE RECEIVED: ", msg
    gevent.kill(g)
    # NOTE: haigha's hello-world example had connection.close()
    # here (instead of our gevent.kill(g) statement). The close()
    # resulted in a deadlock when running with gevent, as we are here.
    # Even Ctrl-C wouldn't break this deadlock!

  ch = connection.channel()

  ch.exchange.declare('test_exchange', 'direct')

  ch.queue.declare('test_queue', auto_delete=True)

  ch.queue.bind('test_queue', 'test_exchange', 'test_key')

  ch.basic.consume('test_queue', consumer)

  ch.basic.publish(Message('body', application_headers={'hello': 'world'}),
                   'test_exchange', 'test_key')

  def readFrames(conn):
    while True:
      conn.read_frames()

  g = gevent.spawn(readFrames, conn=connection)

  result = g.get()
  print "readFrames result: ", repr(result)

  connection.close()

if __name__ == '__main__':
  test_haigha()

vitaly-krugl commented 12 years ago

Hi Aaron, thank you for knocking these out so quickly!