pika / pika

Pure Python RabbitMQ/AMQP 0-9-1 client library
https://pika.readthedocs.io
BSD 3-Clause "New" or "Revised" License
3.6k stars 843 forks source link

Pika failing to consume after a basic_nack #819

Closed beardedeagle closed 7 years ago

beardedeagle commented 7 years ago

Running into an issue where I am trying to ack a message immediately to pull it off the queue, do some work on it, then nack if there is an issue, it basically pulls the first two messages but then sits forever after nacking the second message. Is this expected behavior? Or am I doing something wrong? In the meantime I am accomplishing what I want by republishing the message with the updated message body, but I feel like that is unnecessary connections being opened and closed to the server.

- test.py
# -*- coding: utf-8 -*-
"""Test module."""
import json
import logging
import pika

logging.basicConfig(level=logging.INFO)
LOG = logging.getLogger(__name__)

RMQ = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1',
    heartbeat_interval=15,
    connection_attempts=3))
CHANNEL = RMQ.channel()
CHANNEL.queue_declare(queue='example', durable=True)

def callback(chan, method, prop, body):
    """Callback method that gets a message, does something, then nacks it."""
    LOG.info('[X] Got message')
    chan.basic_ack(delivery_tag=method.delivery_tag)
    LOG.info('Acked message')
    jbod = json.loads(body)
    LOG.info('Message body: %s', jbod)
    if jbod.get('test', False):
        LOG.info('`test` is true, setting to false')
        jbod['test'] = False
        LOG.info('Nack message and requeue with updated message')
        chan.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
        LOG.info('Successfully nacked message')
    else:
        LOG.info('`test` is false, printing success')
        print('success!')

CHANNEL.basic_qos(prefetch_count=1)
CHANNEL.basic_consume(callback, queue='example')
LOG.info('[*] Begin consuming messages')
CHANNEL.start_consuming()

- showing queues and queue content before running test.py
Listing queues ...
example 5
+-------------+----------+---------------+----------------+---------------+------------------+-------------+
| routing_key | exchange | message_count |    payload     | payload_bytes | payload_encoding | redelivered |
+-------------+----------+---------------+----------------+---------------+------------------+-------------+
| example     |          | 4             | {"test": true} | 14            | string           | True        |
+-------------+----------+---------------+----------------+---------------+------------------+-------------+

- output from running test.py
~ python test.py
INFO:pika.adapters.base_connection:Connecting to 127.0.0.1:5672
INFO:pika.adapters.blocking_connection:Created channel=1
INFO:__main__:[*] Begin consuming messages
INFO:__main__:[X] Got message
INFO:__main__:Acked message
INFO:__main__:Message body: {'test': True}
INFO:__main__:`test` is true, setting to false
INFO:__main__:Nack message and requeue with updated message
INFO:__main__:Successfully nacked message
INFO:__main__:[X] Got message
INFO:__main__:Acked message
INFO:__main__:Message body: {'test': True}
INFO:__main__:`test` is true, setting to false
INFO:__main__:Nack message and requeue with updated message
INFO:__main__:Successfully nacked message
INFO:pika.channel:<METHOD(['channel_number=1', 'frame_type=1', "method=<Channel.Close(['class_id=60', 'method_id=120', 'reply_code=406', 'reply_text=PRECONDITION_FAILED - unknown delivery tag 1'])>"])>
WARNING:pika.channel:Received remote Channel.Close (406): PRECONDITION_FAILED - unknown delivery tag 1
^CTraceback (most recent call last):
  File "test.py", line 39, in <module>
    CHANNEL.start_consuming()
  File "/Users/rlthompson/GitHub/ha-lbaas-api/venv/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 1681, in start_consuming
    self.connection.process_data_events(time_limit=None)
  File "/Users/rlthompson/GitHub/ha-lbaas-api/venv/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 647, in process_data_events
    self._flush_output(common_terminator)
  File "/Users/rlthompson/GitHub/ha-lbaas-api/venv/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 410, in _flush_output
    self._impl.ioloop.poll()
  File "/Users/rlthompson/GitHub/ha-lbaas-api/venv/lib/python3.6/site-packages/pika/adapters/select_connection.py", line 515, in poll
    self.get_next_deadline())
KeyboardInterrupt

- showing queues and queue content after running test.py
example 4
+-------------+----------+---------------+----------------+---------------+------------------+-------------+
| routing_key | exchange | message_count |    payload     | payload_bytes | payload_encoding | redelivered |
+-------------+----------+---------------+----------------+---------------+------------------+-------------+
| example     |          | 3             | {"test": true} | 14            | string           | True        |
+-------------+----------+---------------+----------------+---------------+------------------+-------------+

Versions:

distro: OS X 10.12.5
rabbitmq: stable 3.6.9
erlang: stable 19.3
python: 3.6.1
pika: 0.10.0
vitaly-krugl commented 7 years ago

IIRC, Acn and Nack should be mutually-exclusive, so you may be violating AMQP when you do both. Take a look at server-side logs after this happens. Also, please repost your question on the RabbitMQ-Users google group.