ebin123456 / py-amqplib

Automatically exported from code.google.com/p/py-amqplib
GNU Lesser General Public License v2.1

make basic_consume useful without callback #10

GoogleCodeExporter closed 9 years ago

GoogleCodeExporter commented 9 years ago
Note: this is an enhancement request, not a bug per se. 

Which version of amqplib are you using?

0.6.1

Have you checked to see if there is a newer version in the "Featured
Downloads" section of the front page of this project?

Yes.

Which broker are you using (RabbitMQ?) which version?

RabbitMQ 1.5.4-0ubuntu1

Which version of Python?

2.6.2

What steps will reproduce the problem?
1. Call basic_consume without a callback.
2. Watch in wild wonder as nothing happens. 

What is the expected output? What do you see instead?

Well, the actual behavior of basic_consume with callback=None matches the
documented behavior. I just don't really see how that functionality is useful.
Is there some use case in which you would want to call basic_consume and do
nothing with the messages there?

Maybe my thinking is poisoned by the positions I tend to find myself in with
amqp. This is the second project in as many months where I *really* want to
act upon the messages within the same function or method basic_consume() is
called in.

It seems like it might be reasonable to either make basic_consume a generator
(so I could say 'for msg in basic_consume(blah)', for example), or to just
return the Message object upon arrival at the consumer's doorstep if
callback=None.

Here's a scenario that shows the type of issue I'm running into: 

I have a main module that launches and spawns a few threads. Each thread talks
to a different queue, and wants to process messages from it. I've decided to
abstract out the low-level queuing details to clean up the core code, so the
core code just creates a QueueConnector object, and calls that object's
'consume()' method. My call to basic_consume() lives inside of that consume()
method.

Now, it would seem reasonable that I might be able to pass my processing
function as a callback, and I can, but the problem then is that *my core code
still needs a copy of the Message object*, because it's going to be *requeued*
to another queue (but only if the processing was successful!). So, in
pseudocode (and if basic_consume were a generator):

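def consume(chan, queue):
    # Pseudocode: assumes basic_consume were a generator, which it is not.
    for msg in chan.basic_consume(queue):
        try:
            process(msg.body)
        except Exception as out:
            print(out)
            raise
        else:
            # Ack only after successful processing, then hand msg to the caller.
            chan.basic_ack(msg.delivery_tag)
            yield msg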

Another alternative is to tell basic_deliver to 'return msg' if func is None
(on line 1973:
http://code.google.com/p/py-amqplib/source/browse/amqplib/client_0_8/channel.py#1971).
It would be returned to basic_consume() at that point, and then basic_consume
would need to pass it back to my calling code. I understand the consumer_tag
is already being returned, but I wonder if there isn't some other logical
place to get the consumer tag?

I suppose there's also the issue of chan.wait(), but I'm not sure if there's
another use case outside of basic_consume where that would ever be called. Is
there? If there isn't, why not just make basic_consume a blocking (or
optionally blocking) procedure? If there *is* another use case for using
chan.wait() outside of basic_consume, perhaps understanding that better would
help me solve my problems. As it stands, using chan.wait with basic_consume is
pretty confusing when it comes time to figure out how this program is going to
finally end.
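
On the question of how such a program finally ends, one possible pattern is to keep calling wait() only while a consumer is still registered, and to cancel the consumer from inside the callback. The sketch below uses a toy StubChannel so it runs without a broker; its `callbacks` dict, `basic_cancel` and `pending` list are assumptions made for illustration, and the 'quit' sentinel is hypothetical. Whether the real channel exposes matching names should be checked against the library.

```python
class StubChannel(object):
    """Hypothetical stand-in for a channel; not amqplib itself."""

    def __init__(self, bodies):
        self.pending = list(bodies)   # messages "waiting at the broker"
        self.callbacks = {}           # consumer tag -> callback

    def basic_consume(self, queue, callback):
        tag = 'ctag-1'
        self.callbacks[tag] = callback
        return tag

    def basic_cancel(self, consumer_tag):
        self.callbacks.pop(consumer_tag, None)

    def wait(self):
        # Deliver one pending message to every registered consumer.
        body = self.pending.pop(0)
        for callback in list(self.callbacks.values()):
            callback(body)


received = []
chan = StubChannel(['a', 'b', 'quit', 'never seen'])


def on_message(body):
    if body == 'quit':
        # Cancelling the consumer is what lets the loop below finish.
        chan.basic_cancel(tag)
    else:
        received.append(body)


tag = chan.basic_consume('q', callback=on_message)

# Keep waiting only while at least one consumer is registered.
while chan.callbacks:
    chan.wait()

print(received)
```

The loop exits as soon as the callback cancels its own consumer, so the message queued after 'quit' is never delivered.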

Thanks for input or advice (or an enhancement!)
brian

Original issue reported on code.google.com by bkjo...@gmail.com on 24 Dec 2009 at 3:52

GoogleCodeExporter commented 9 years ago
I'm not sure this is realistically possible. basic_consume issues a
basic.consume method to the broker and associates a callback with the
resulting consumer tag, but doesn't actually process any messages.
Conversely, when you call wait() on a channel, any server-initiated message
gets handled, not just message delivery for any specific consumer. This is a
little easier to see if you call basic_consume twice (with different queues
and callbacks) -- wait() will dispatch a message to either one, depending on
which queue receives a message first.
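
The split between registering a consumer and dispatching messages can be illustrated with a toy model. ToyChannel and its `deliver` helper are hypothetical and are not amqplib's implementation; they only mimic the behavior described above, where basic_consume records a callback and wait() hands the next delivery to whichever consumer it belongs to.

```python
from collections import deque


class ToyChannel(object):
    """Hypothetical model of a channel; not amqplib's real implementation."""

    def __init__(self):
        self.callbacks = {}       # consumer tag -> callback
        self.incoming = deque()   # (consumer tag, body) pairs from the "broker"

    def basic_consume(self, queue, callback):
        # Only registers the callback; no message is processed here.
        tag = 'ctag-%s' % queue
        self.callbacks[tag] = callback
        return tag

    def deliver(self, tag, body):
        # Simulates the broker pushing a message for one consumer.
        self.incoming.append((tag, body))

    def wait(self):
        # Handles the next delivery, whichever consumer it is for.
        tag, body = self.incoming.popleft()
        self.callbacks[tag](body)


chan = ToyChannel()
seen = []
tag_a = chan.basic_consume('a', callback=lambda body: seen.append(('a', body)))
tag_b = chan.basic_consume('b', callback=lambda body: seen.append(('b', body)))

# Queue 'b' receives a message before queue 'a' does.
chan.deliver(tag_b, 'first arrival')
chan.deliver(tag_a, 'second arrival')

chan.wait()
chan.wait()
print(seen)
```

Because wait() is not tied to a single consumer, there is no obvious one message for basic_consume itself to return.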

Can you turn your loop inside out? Consider something like

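def mainloop(chan):
    # The first yield receives the first message passed in via send().
    msg = yield
    while msg:
        print(msg.body)
        chan.basic_ack(msg.delivery_tag)
        msg = yield

# ...

# Kick off generator: advance it to the first yield so send() can deliver
m = mainloop(chan)
next(m)

# Each delivered message is passed to m.send(), which resumes mainloop
chan.basic_consume('q', callback=m.send)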

would let you write your main loop in a mostly-imperative style.
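
A self-contained version of this inside-out loop, including the wait() calls that drive it, might look like the sketch below. StubChannel and StubMessage are hypothetical stand-ins so the example runs without a broker, and the `handled` list replaces the print statement so the result can be inspected.

```python
class StubMessage(object):
    """Hypothetical stand-in for a delivered message."""

    def __init__(self, body, delivery_tag):
        self.body = body
        self.delivery_tag = delivery_tag


class StubChannel(object):
    """Hypothetical stand-in for a channel; not amqplib itself."""

    def __init__(self, bodies):
        self.pending = list(bodies)
        self.callback = None
        self.acked = []
        self.next_tag = 0

    def basic_consume(self, queue, callback):
        self.callback = callback
        return 'ctag-1'

    def basic_ack(self, delivery_tag):
        self.acked.append(delivery_tag)

    def wait(self):
        # Simulates one delivery: build a message and run the callback.
        self.next_tag += 1
        self.callback(StubMessage(self.pending.pop(0), self.next_tag))


def mainloop(chan, handled):
    msg = yield                    # paused here until the first send()
    while msg:
        handled.append(msg.body)
        chan.basic_ack(msg.delivery_tag)
        msg = yield                # paused again until the next send()


chan = StubChannel(['one', 'two'])
handled = []

m = mainloop(chan, handled)
next(m)                            # prime the generator up to its first yield
chan.basic_consume('q', callback=m.send)

# Each wait() call resumes the generator with exactly one message.
while chan.pending:
    chan.wait()

print(handled)
```

The generator body reads like an ordinary loop, while control still returns to the caller after every message.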

Original comment by angrybal...@gmail.com on 24 Dec 2009 at 4:36

GoogleCodeExporter commented 9 years ago
Sorry, I'm not sure I clearly understand how that code would fit into the 
context of my program. I've put together a code sample that I hope will make my 
situation clearer: 

#!/usr/bin/env python
from amqplib import client_0_8 as amqp

class Processor(object):
    def __init__(self):
        self.qhost = 'localhost'
        self.qport = '5672'
        self.uname = 'guest'
        self.pwd = 'guest'
        self.ssl = False
        self.vhost = '/' 
        self.exch_name = 'fooX'
        self.exch_type = 'direct'
        self.qname = 'fooQ'
        self.conn = amqp.Connection(userid=self.uname, password=self.pwd, host=self.qhost,
                                    virtual_host=self.vhost, ssl=self.ssl)
        self.chan = self.conn.channel()
        self.chan.exchange_declare(self.exch_name, type=self.exch_type)
        self.chan.queue_declare(self.qname)
        self.chan.queue_bind(self.qname, self.exch_name)

    def consume(self, callback):
        self.chan.basic_consume(self.qname, callback=callback)
        while True: 
            self.chan.wait()

class Munger(object):
    def munge(self,msg):
        if msg % 2 == 0: 
            yield msg

class Sender(object):
    def run(self):
        p = Processor()
        m = Munger()
        for msg in p.consume(m.munge):
            """   
            I know this doesn't work right now. This 
            piece of the code should send 'msg' to another queue. 
            """
            pass  

if __name__ == '__main__':
    s = Sender()
    s.run()

A couple of quick notes: Sender() in real life is a thread. Munger.munge()
actually parses the message and tries to put it in a database. Processor
doesn't look much different from what you see here. The goal is that Processor
and Munger are actually pluggable -- so users can set things up such that
Processor talks to a STOMP queue (that's already supported, actually), or
Munger talks to MongoDB or whatever. I hope to release the code as soon as I
can get amqp working.

I've played with setting up an observer-like 'notify' in Processor and using
that as the callback for consume(), but it still doesn't get the msg back
where I need it. I've thought of a couple of other things I might be able to
do, but I really haven't gotten to a solution. If your code example somehow
fits somewhere in here, even if it forces me to rewrite large sections of my
actual program, I'll do it. I just need to understand. Appreciate your help
and your work.
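
One possible way to get each message back to the calling code is to let the callback only buffer messages, and to make consume() a generator that calls wait() and yields whatever has been buffered. This is a sketch under stated assumptions rather than anything the library provides: StubChannel and StubMessage are hypothetical stand-ins, `accept` plays the role of a munge step that returns True on success, and `forwarded` stands in for publishing to another queue.

```python
from collections import deque
from itertools import islice


class StubMessage(object):
    """Hypothetical stand-in for a delivered message."""

    def __init__(self, body):
        self.body = body


class StubChannel(object):
    """Hypothetical stand-in for a channel; not amqplib itself."""

    def __init__(self, bodies):
        self.pending = [StubMessage(b) for b in bodies]
        self.callback = None

    def basic_consume(self, queue, callback):
        self.callback = callback
        return 'ctag-1'

    def wait(self):
        # Simulates one delivery arriving from the broker.
        self.callback(self.pending.pop(0))


def consume(chan, queue_name, accept):
    """Yield every message that accept() approves, in arrival order."""
    buffered = deque()
    chan.basic_consume(queue_name, callback=buffered.append)
    while True:
        chan.wait()                # the callback fills `buffered`
        while buffered:
            msg = buffered.popleft()
            if accept(msg):
                yield msg


chan = StubChannel([1, 2, 3, 4])
forwarded = []                     # stands in for publishing to another queue

# islice() stops the otherwise endless generator after two messages.
for msg in islice(consume(chan, 'fooQ', lambda m: m.body % 2 == 0), 2):
    forwarded.append(msg.body)

print(forwarded)
```

With this shape, the loop in Sender.run() keeps the 'for msg in p.consume(...)' form while the channel is still driven by wait().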

brian

Original comment by bkjo...@gmail.com on 24 Dec 2009 at 5:03

GoogleCodeExporter commented 9 years ago
Sorry, amqplib is not totally threadsafe.  That's a big omission from the docs,
fixed in Rev 2c694206543d.

You can however use it in a threaded program as long as separate threads keep
separate Connection objects (and related Channel objects).
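
A minimal sketch of that arrangement: each thread builds its own connection and channel inside its own target function, so nothing is shared. StubConnection is a hypothetical stand-in; in a real program the `connect` argument would be a function that creates a new amqplib Connection.

```python
import threading


class StubConnection(object):
    """Hypothetical stand-in for a connection; not amqplib itself."""

    def channel(self):
        return object()


def worker(connect, results, name):
    # Created inside the thread and never handed to another thread.
    conn = connect()
    chan = conn.channel()
    results[name] = (conn, chan)


results = {}
threads = [
    threading.Thread(target=worker, args=(StubConnection, results, name))
    for name in ('t1', 't2')
]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Each thread ended up with its own connection and channel objects.
print(results['t1'][0] is not results['t2'][0])
```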

I agree that consuming without a callback is pointless, but it's pretty much
equivalent to redirecting a program's output to /dev/null, and I think it is a
reasonable thing to do given no callback.

Original comment by barry.pe...@gmail.com on 28 Mar 2011 at 9:33