agoragames / haigha

AMQP Python client
BSD 3-Clause "New" or "Revised" License

event example #18

Open altaurog opened 12 years ago

altaurog commented 12 years ago

I'm evaluating RabbitMQ and Python clients for it. This library looks much more mature than pika, but I need a little help figuring out how to use it. I am interested in asynchronous operation because I need an event loop in which I can schedule various tasks. However, I prefer event over gevent: we haven't been using gevent, and it seems to add more complexity than I want at the moment. I guess my other option is to use the 'socket' transport and implement my own loop in which I call connection.read_frames().

Here's my attempt at a publisher:

#!/usr/bin/env python
import logging
import sys
import event
from haigha.connection import Connection
from haigha.message import Message

class Client(object):
    def __init__(self):
        print('instantiating Connection')
        self.connection = Connection(transport='event', port=5673, open_cb=self.on_connect)

    def on_connect(self):
        print('on_connect')
        self.connection.channel().add_open_listener(self.on_open)

    def on_open(self, channel):
        print('on_open: %r' % channel)
        self.channel = channel
        self.channel.queue.declare(queue='hello', cb=self.on_declare)

    def on_declare(self):
        print('on_declare')
        self.channel.basic.publish(Message('Hello World!'),
                              exchange='',
                              routing_key='hello',
                              cb=self.on_publish
                              )

    def on_publish(self):
        print('on_publish')

if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    c = Client()
    event.signal(2, sys.exit)
    event.dispatch()

Here's what I see when I run the code:

$ python send.py 
instantiating Connection
on_connect
on_open: <haigha.channel.Channel object at 0x7f45feab9dd0>
WARNING:root:error on connection to localhost:5673: error processing socket input buffer

Using the tracer supplied with the rabbit java client, I see the following:


1337545216819: <Tracer-0> ch#0 <- {#method<connection.start>(version-major=0, version-minor=9, server-properties={product=RabbitMQ, information=Licensed under the MPL.  See http://www.rabbitmq.com/, platform=Erlang/OTP, capabilities={exchange_exchange_bindings=true, consumer_cancel_notify=true, basic.nack=true, publisher_confirms=true}, copyright=Copyright (C) 2007-2012 VMware, Inc., version=2.8.2}, mechanisms=PLAIN AMQPLAIN, locales=en_US), null, ""}
1337545216819: <Tracer-0> ch#0 -> {#method<connection.start-ok>(client-properties={library_version=0.5.3, library=Haigha}, mechanism=AMQPLAIN, response=LOGINSguesPASSWORDSguest, locale=en_US), null, ""}
1337545216821: <Tracer-0> ch#0 <- {#method<connection.tune>(channel-max=0, frame-max=131072, heartbeat=0), null, ""}
1337545216856: <Tracer-0> ch#0 -> {#method<connection.tune-ok>(channel-max=65535, frame-max=131072, heartbeat=0), null, ""}
1337545216856: <Tracer-0> ch#0 -> {#method<connection.open>(virtual-host=/, capabilities=, insist=true), null, ""}
1337545216896: <Tracer-0> ch#0 <- {#method<connection.open-ok>(known-hosts=), null, ""}
1337545216936: <Tracer-0> ch#1 -> {#method<channel.open>(out-of-band=), null, ""}
1337545216976: <Tracer-0> ch#1 <- {#method<channel.open-ok>(channel-id=), null, ""}
1337545217016: <Tracer-0> ch#1 -> {#method<queue.declare>(ticket=0, queue=hello, passive=false, durable=false, exclusive=false, auto-delete=true, nowait=false, arguments={}), null, ""}
1337545217056: <Tracer-0> ch#1 <- {#method<queue.declare-ok>(queue=hello, message-count=0, consumer-count=1), null, ""}
1337545217099: <Tracer-0> ch#1 -> {#method<channel.close>(reply-code=500, reply-text=Failed to dispatch MethodFrame[channel: 1, class_id: 50, method_id: 11, args: \x00\x32\x00\x0b\x05\x68\x65\x6c\x6c\x6f\x00\x00\x00\x00\x00\x00\x00\x01], class-id=0, method-id=0), null, ""}
1337545217138: <Tracer-0> ch#1 <- {#method<channel.close-ok>(), null, ""}
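The args bytes quoted in the channel.close reply-text can be decoded by hand to see which frame the client failed to dispatch. The following is a minimal sketch using only the standard library (Python 3); the function name is hypothetical and is not part of haigha. In AMQP 0-9-1, class 50 / method 11 is queue.declare-ok, whose fields are a short string followed by two 32-bit unsigned integers.

```python
import struct

# Payload copied from the reply-text of the channel.close frame in the trace.
ARGS = b'\x00\x32\x00\x0b\x05\x68\x65\x6c\x6c\x6f\x00\x00\x00\x00\x00\x00\x00\x01'


def decode_queue_declare_ok(payload):
    """Decode class id, method id, and the queue.declare-ok fields.

    Hypothetical helper for illustration only; it is not a haigha API.
    """
    # Class id and method id are big-endian unsigned 16-bit integers.
    class_id, method_id = struct.unpack_from('>HH', payload, 0)
    offset = 4
    # A short string is one length byte followed by that many bytes.
    name_len = payload[offset]
    offset += 1
    queue = payload[offset:offset + name_len].decode('ascii')
    offset += name_len
    # message-count and consumer-count are big-endian unsigned 32-bit integers.
    message_count, consumer_count = struct.unpack_from('>II', payload, offset)
    return class_id, method_id, queue, message_count, consumer_count


result = decode_queue_declare_ok(ARGS)
print(result)
```

The decoded values match the queue.declare-ok line in the trace (queue=hello, message-count=0, consumer-count=1), so the broker's reply arrived intact and the failure happened on the client side while handling it. The trace alone does not show what was raised inside the client.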

How do I make it work?

altaurog commented 12 years ago

I've made a little progress on this. I'm realizing that callbacks work very differently under haigha than pika. The fact that callbacks operate in a synchronous manner is somewhat counter-intuitive, but I imagine that under gevent this doesn't affect performance. Perhaps I should re-evaluate using gevent.

In any case, the code below seems to work, but I have two questions:

1. How can I close the connection and exit cleanly after sending the message?
2. Is there a mailing list for discussing this project?

#!/usr/bin/env python
import logging
import sys
import event
from haigha.connection import Connection
from haigha.message import Message

class Client(object):
    def __init__(self):
        self.connection = Connection(transport='event')
        self.channel = self.connection.channel()
        self.channel.queue.declare(queue='hello',)
        self.channel.basic.publish(Message('Hello World!'),
                              exchange='',
                              routing_key='hello',
                              )

    def loop(self):
        while not self.connection.close_info:
            self.connection.read_frames()

    def close(self):
        print("closing")
        self.connection.close()

if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    c = Client()
    event.signal(2, sys.exit)
    event.timeout(0, c.loop)
    event.timeout(1, c.close)
    event.dispatch()

awestendorf commented 12 years ago

There isn't a dedicated libevent example, but scripts/stress_test uses libevent and you can pattern after that. You can pass a logger and debug handle into the Connection constructor and see similar trace output; I just added info to the DOCUMENTATION file regarding that.
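As a rough sketch of the logger suggestion, the snippet below builds a standard-library logger and collects constructor arguments in a dict. The keyword names `logger` and `debug` are assumptions inferred from the comment above, not confirmed in this thread, so check them against the DOCUMENTATION file. The helper name and logger name are hypothetical.

```python
import logging


def build_connection_kwargs(name='haigha.trace'):
    """Return keyword arguments intended for haigha's Connection constructor.

    The 'logger' and 'debug' keys are assumed names; verify them against
    the haigha DOCUMENTATION file before relying on them.
    """
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    # Attach a handler only once so repeated calls do not duplicate output.
    if not logger.handlers:
        logger.addHandler(logging.StreamHandler())
    return {'transport': 'event', 'logger': logger, 'debug': True}


kwargs = build_connection_kwargs()

# With haigha installed and a broker running, the dict would be used as:
#   from haigha.connection import Connection
#   connection = Connection(**kwargs)
```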

You don't need to run your own read_frames loop with libevent. If you do run one, it needs to yield; the while loop in your loop() method will block and essentially void the benefits of libevent.
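To illustrate why a loop inside a callback has to yield, here is a toy sketch. `ToyLoop` is a hypothetical stand-in for an event loop, not pyevent or libevent, and the frame names are made up. The reader handles one pending frame per invocation and then reschedules itself, so a second callback gets a turn in between.

```python
from collections import deque


class ToyLoop(object):
    """Minimal run-to-completion callback queue (not pyevent or libevent)."""

    def __init__(self):
        self._ready = deque()

    def call_soon(self, fn, *args):
        # Queue a callback to run after everything already queued.
        self._ready.append((fn, args))

    def run(self):
        # Run callbacks one at a time until none are left.
        while self._ready:
            fn, args = self._ready.popleft()
            fn(*args)


def make_reader(loop, frames, log):
    """Build a callback that handles one frame, then yields to the loop."""
    def read_one():
        if frames:
            log.append('read ' + frames.popleft())
            # Reschedule instead of looping here, so other callbacks can run.
            loop.call_soon(read_one)
    return read_one


def run_demo():
    loop = ToyLoop()
    log = []
    frames = deque(['f1', 'f2', 'f3'])
    loop.call_soon(make_reader(loop, frames, log))
    loop.call_soon(log.append, 'timer fired')
    loop.run()
    return log


print(run_demo())
```

Had the reader drained every frame inside a single `while` loop, the timer callback could not run until the reader returned. With a loop that waits on a connection that never closes, it would never run at all.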

I like libevent; it was the first implementation of an async AMQP client that we had, and the modern EventTransport-based haigha implementation is still in production in some of our applications. That said, I've fallen in love with the simplicity and power of gevent; it's our preferred transport these days, and so it is the one that receives the most testing.

The stress_test script will give you a reasonable comparison between the two. I think libevent is still faster, but only by a bit, and the extra complexity may nullify the small gain in performance. I'm happy to help in your libevent pursuit as best I can. I recommend playing with the stress_test script to get a feel for the options and performance characteristics.

There isn't a mailing list yet, and GitHub issues are a reasonable place to ask for assistance. It's tough to measure the uptake, so I haven't considered a mailing list necessary.