majek / puka

Puka - the opinionated RabbitMQ client
https://github.com/majek/puka
Other
182 stars 34 forks source link

AttributeError on basic_cancel for the last message in queue #44

Closed okomestudio closed 11 years ago

okomestudio commented 11 years ago

I'm using the examples on synchronous send/receive in Basic Examples on this page: http://majek.github.io/puka/puka.html.

Call this receive.py:

import puka

client = puka.Client("amqp://localhost/")
promise = client.connect()
client.wait(promise)

promise = client.queue_declare(queue='test')
client.wait(promise)

consume_promise = client.basic_consume(queue='test')
for i in range(3):
    result = client.wait(consume_promise)
    print " [x] Received message %r" % (result,)

    client.basic_ack(result)

client.basic_cancel(consume_promise)
client.wait(consume_promise)

promise = client.close()
client.wait(promise)

and synchronous send (send.py):

import puka

client = puka.Client("amqp://localhost/")
promise = client.connect()
client.wait(promise)

promise = client.queue_declare(queue='test')
client.wait(promise)

promise = client.basic_publish(exchange='', routing_key='test',
                              body="Hello world!")
client.wait(promise)

promise = client.close()
client.wait(promise)

The synchronous receive appears to always fail with an AttributeError on waiting for basic_cancel in receive.py, when it does basic_ack on the very last message in the queue and then does basic_cancel on the same consume_promise:

$ python receive.py 
 [x] Received message {'body': 'Hello world!', 'exchange': '', 'consumer_tag': 'amq.ctag-V2fN9ND13qH4imH_uRQKRA', 'promise_number': 4, 'routing_key': 'test', 'headers': {'x-puka-delivery-tag': 1}, 'redelivered': False, 'delivery_tag': 1}
 [x] Received message {'body': 'Hello world!', 'exchange': '', 'consumer_tag': 'amq.ctag-V2fN9ND13qH4imH_uRQKRA', 'promise_number': 4, 'routing_key': 'test', 'headers': {'x-puka-delivery-tag': 1}, 'redelivered': False, 'delivery_tag': 2}
 [x] Received message {'body': 'Hello world!', 'exchange': '', 'consumer_tag': 'amq.ctag-V2fN9ND13qH4imH_uRQKRA', 'promise_number': 4, 'routing_key': 'test', 'headers': {'x-puka-delivery-tag': 1}, 'redelivered': False, 'delivery_tag': 3}
Traceback (most recent call last):
  File "receive.py", line 27, in <module>
    client.wait(consume_promise)
  File "/usr/local/lib/python2.7/dist-packages/puka/connection.py", line 222, in wait
    raise_errors=raise_errors)
  File "/usr/local/lib/python2.7/dist-packages/puka/promise.py", line 35, in run_callback
    return self._promises[number].run_callback(**kwargs)
  File "/usr/local/lib/python2.7/dist-packages/puka/promise.py", line 134, in run_callback
    if raise_errors and result.is_error:
AttributeError: 'NoneType' object has no attribute 'is_error'

This only happens on the very last message in the queue.

Is this an intended behavior? If it is, how should I cancel the consume more gracefully at the very end?

Thanks.

majek commented 11 years ago

Here's the bug:

client.basic_cancel(consume_promise)
client.wait(consume_promise)

It should be:

promise = client.basic_cancel(consume_promise)
client.wait(promise)
okomestudio commented 11 years ago

Oh, I see. That makes sense, haha... The code snippets are copied directly from Basic Examples, "Synchronously receive three messages" in http://majek.github.io/puka/puka.html. The document probably should be updated there. Thanks!

majek commented 11 years ago

Should be better now :) Thanks.