majek / puka

Puka - the opinionated RabbitMQ client
https://github.com/majek/puka
Other
182 stars 34 forks source link

PRECONDITION_FAILED - unknown delivery tag #28

Closed bslima closed 11 years ago

bslima commented 11 years ago

I'm trying to write a threaded rpc communication with puka and rabbitmq. But i'm keeping getting:

puka.spec_exceptions.PreconditionFailed: {'class_id': 60, 'method_id': 80, 'reply_code': 406, 'reply_text': 'PRECONDITION_FAILED - unknown delivery tag X'}, where X can be any msg number.

The error is always on the RPCServer, following my code:

            #!/usr/bin/env python
            from threading import Thread
            import puka
            import uuid
            import logging

            class RPCServer(object):
                def __init__(self):
                    self.client = None
                    self.consume = None
                    try:
                        self.client = puka.Client("amqp://localhost/")
                        promise = self.client.connect(callback=self.on_connected)
                        self.client.wait(promise,timeout=2)#Waits 2s
                    except  Exception as e:
                        print "Error connecting to server - ",e

                def close(self):
                    self.client.wait(self.client.close())

                def on_connected(self,promise,result):
                    try:
                        self.client.wait(self.client.queue_declare(queue='rpc_queue_test'))
                        self.consume = self.client.basic_consume(queue='rpc_queue_test',
                                                                 callback=self.on_request)
                    except Exception as e:
                        print "Error declaring queue - ",e

                def on_request(self,promise,request):
                    th=Thread(target=self.test, args = [request])
                    th.start()

                def test(self,request):
                    pr = self.client.basic_publish(exchange='',routing_key=request['headers']['reply_to'],
                                              headers={'correlation_id':request['headers']['correlation_id']},
                                              body=str(2))
                    self.client.wait(pr)
                    self.client.basic_ack(request)

                def run(self):
                    while True:
                        request = self.client.wait(self.consume)
                        print "Request received (%s)"%(request)

            print " [x] Awaiting RPC requests"
            rpc_server = RPCServer()

            th1 = Thread(target=rpc_server.run())
            th1.start()

Is there anyway to get this server done in a thread for every request ??

Thanks in advance.

majek commented 11 years ago

You've seen that, right? https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/python-puka/rpc_client.py https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/python-puka/rpc_server.py

http://www.rabbitmq.com/tutorials/tutorial-six-python.html

majek commented 11 years ago

That makes no sense:

                        promise = self.client.connect(callback=self.on_connected)
                        self.client.wait(promise,timeout=2)#Waits 2s

so you want a callback or you want to wait on a promise?

Also, you're using a single puka connection from multiple threads. That just won't work.

bslima commented 11 years ago

The problem is not on the wait and the callback, that works fine. So what is the difference between puka and pika ? For what i read puka allows me to share connection between threads.

majek commented 11 years ago

The problem is not on the wait and the callback, that works fine.

Yes, I know. It just doesn't make any sense. You may as well do "self.client.wait(promise); self.on_connected()" and it will be so much more readable.

So what is the difference between puka and pika ?

They are two separate projects.

For what i read puka allows me to share connection between threads.

Interesting, where did you read that?