celery / kombu

Messaging library for Python.
http://kombu.readthedocs.org/
BSD 3-Clause "New" or "Revised" License
2.87k stars 928 forks source link

Consumer not gracefully recovering from rabbitmq restarts/issues #814

Open naemono opened 6 years ago

naemono commented 6 years ago

From what I'm seeing, a kombu consumer using amqp doesn't gracefully recover from a problem such as a RabbitMQ restart (or any other recoverable error) unless consumer.consume() is explicitly called before the drain_events calls.

Example showing issue

Consumer

#!/usr/bin/env python3
import kombu
import threading
from kombu import Exchange, Queue
from kombu import exceptions as kombu_exceptions

import logging

# Durable direct exchange named 'media', and the 'video' queue that uses it
# with routing key 'video'. Both are shared by the consumer code below.
media_exchange = Exchange('media', 'direct', durable=True)
video_queue = Queue('video', exchange=media_exchange, routing_key='video')

# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)

def process_media(body, message):
    """Consumer callback: print the message body, then acknowledge it.

    ``body`` is whatever payload was delivered; ``message`` must expose
    an ``ack()`` method, which is called after the body is printed.
    """
    print(body)
    message.ack()

def _drain(conn, timeout):
    try:
        conn.drain_events(timeout=timeout)
    except kombu_exceptions.TimeoutError:
        pass

def _drain_errback(exc, interval):
    """Errback handed to ``conn.ensure``: log the failure and the retry delay.

    ``exc`` is the error being reported and ``interval`` is the number of
    seconds quoted in the retry message.
    """
    # Error-level record with the active traceback attached.
    LOG.error('Draining error: %s', exc, exc_info=True)
    LOG.info('Retry triggering in %s seconds', interval)

class Consumer(object):
    """Reproduction consumer: drains ``video_queue`` over an amqp connection.

    The drain call is wrapped with ``conn.ensure`` and ``_drain_errback`` is
    registered as its errback. ``consumer.consume()`` is deliberately NOT
    called inside the loop; that is the behaviour this script demonstrates.
    """

    def __init__(self):
        """Set up names, the run flag, the connection and the exchange."""
        self._exchange_name = 'media'
        self._topic = 'media'
        # Loop guard for start(): the loop runs while this event is set.
        self._running = threading.Event()
        # Seconds passed as the timeout of every drain call.
        self._drain_events_timeout = 1
        self._conn = kombu.Connection(
            'amqp://devtools:devtools@localhost:5672/',
            transport='amqp',
            transport_options={},
        )
        self._exchange = kombu.Exchange(name=self._exchange_name)

    def _make_queue(self, routing_key, exchange, channel=None):
        """Return a non-durable, auto-delete queue for ``routing_key``.

        The queue is named ``<exchange name>_<routing_key>``.
        """
        return kombu.Queue(
            name="%s_%s" % (self._exchange_name, routing_key),
            routing_key=routing_key,
            durable=False,
            exchange=exchange,
            auto_delete=True,
            channel=channel,
        )

    def start(self):
        """Acquire a pooled connection and drain events until stopped.

        Sets the ``_running`` event before looping and always clears it on
        the way out, including when the loop ends with an exception.
        """
        pool = kombu.connections[self._conn]
        with pool.acquire(block=True) as conn, \
                conn.Consumer(video_queue,
                              callbacks=[process_media]) as consumer:
            safe_drain = conn.ensure(consumer, _drain,
                                     errback=_drain_errback)
            self._running.set()
            try:
                while self._running.is_set():
                    # uncomment the below line to make things recover...
                    # consumer.consume()
                    safe_drain(conn, self._drain_events_timeout)
            finally:
                self._running.clear()

def main():
    """Create a Consumer and run its start() loop."""
    Consumer().start()

# Start the consumer only when this file is executed as a script.
if __name__ == '__main__':
    main()

Producer

#!/usr/bin/env python3
import kombu
import threading
import time
from kombu import Exchange, Queue

import logging

# Durable direct exchange named 'media', and the 'video' queue that uses it
# with routing key 'video'. _produce() publishes to this exchange and passes
# the queue in its declare list.
media_exchange = Exchange('media', 'direct', durable=True)
video_queue = Queue('video', exchange=media_exchange, routing_key='video')

# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)

def _produce(producer):
    """Publish one fixed sample message through ``producer``.

    The message goes to ``media_exchange`` with routing key 'video', and
    ``video_queue`` is passed in the ``declare`` list of the publish call.
    """
    payload = {'name': '/tmp/lolcat1.avi', 'size': 1301013}
    producer.publish(
        payload,
        exchange=media_exchange,
        routing_key='video',
        declare=[video_queue],
    )

def _drain_errback(exc, interval):
    """Errback handed to ``conn.ensure`` around ``_produce``.

    The function name is inherited from the consumer script; in this script
    it reports failures of the publish call, so the log message says so
    instead of the copy-pasted "Draining error".

    ``exc`` is the error being reported and ``interval`` is the number of
    seconds quoted in the retry message.
    """
    LOG.exception('Publish error: %s', exc)
    LOG.info('Retry triggering in %s seconds', interval)

class Producer(object):
    """Reproduction producer: publishes a sample message every two seconds.

    The publish call is wrapped with ``conn.ensure`` and ``_drain_errback``
    is registered as its errback.
    """

    def __init__(self):
        """Set up names, the run flag, the connection and the exchange."""
        self._exchange_name = 'media'
        self._topic = 'media'
        # Loop guard for start(): the loop runs while this event is set.
        self._running = threading.Event()
        self._conn = kombu.Connection(
            'amqp://devtools:devtools@localhost:5672/',
            transport='amqp',
            transport_options={},
        )
        self._exchange = kombu.Exchange(name=self._exchange_name)

    def _make_queue(self, routing_key, exchange, channel=None):
        """Return a non-durable, auto-delete queue for ``routing_key``.

        The queue is named ``<exchange name>_<routing_key>``.
        """
        return kombu.Queue(
            name="%s_%s" % (self._exchange_name, routing_key),
            routing_key=routing_key,
            durable=False,
            exchange=exchange,
            auto_delete=True,
            channel=channel,
        )

    def start(self):
        """Acquire a pooled connection and publish in a loop until stopped.

        Sets the ``_running`` event before looping and always clears it on
        the way out, including when the loop ends with an exception.
        """
        pool = kombu.connections[self._conn]
        with pool.acquire(block=True) as conn, \
                conn.Producer(serializer='json') as producer:
            safe_produce = conn.ensure(producer, _produce,
                                       errback=_drain_errback)
            self._running.set()
            try:
                while self._running.is_set():
                    safe_produce(producer)
                    # Pause between messages.
                    time.sleep(2)
            finally:
                self._running.clear()

def main():
    """Create a Producer and run its start() loop."""
    Producer().start()

# Start the producer only when this file is executed as a script.
if __name__ == '__main__':
    main()

Run the two scripts above in parallel, then restart RabbitMQ. The queue fills up with the producer's messages, while the consumer throws no additional errors after recovery completes.

If you uncomment the consumer.consume() line in the consumer, things will automatically recover.

Is this expected behavior, or is this an underlying bug within kombu's Connection.ensure method where the reconnection actions happen?

dralley commented 6 years ago

I believe I have run into the same issue, or a similar one.

stuartspotlight commented 6 years ago

I think I'm getting a similar issue in both kombu 4.1.0 and 4.2.0, in that the connection appears to degrade and is then unable to recover: https://github.com/celery/kombu/issues/857. I haven't had any responses yet, though, so I'm not sure whether anyone has been able to identify the source. Have you tried downgrading to kombu 4.0.2? This Medium post indicates that it may prove effective in some instances: https://medium.com/squad-engineering/two-years-with-celery-in-production-bug-fix-edition-22238669601d

auvipy commented 5 years ago

Can anyone give an update on whether this still happens with kombu 4.6.x?