celery / kombu

Messaging library for Python.
http://kombu.readthedocs.org/
BSD 3-Clause "New" or "Revised" License

When the broker suddenly stops, calling a task is stuck forever (Connection.ensure()) #918

Open wwarne opened 5 years ago

wwarne commented 5 years ago

It's similar to #902 and https://github.com/celery/celery/issues/4296, but those issues are about the default_channel property, and my problem is not exactly in default_channel.

software -> celery:4.2.1 (windowlicker)
 kombu:4.2.1 py:3.6.5
billiard:3.5.0.4
py-amqp:2.3.2
# tried on windows and on linux

default_channel works fine: it uses broker_transport_options to determine max_retries and calls self.ensure_connection with it.

The problem appears only if you send a task to Celery, then turn off the broker, and then try to send a task again. The connection is probably still cached, so it passes the ensure_connection() check. But then Queue.on_task_call calls maybe_declare(self.binding(producer.channel), retry=True) (this call doesn't use broker_transport_options or retry_policy). maybe_declare calls _imaybe_declare, which calls entity.channel.connection.client.ensure.

Every function called from maybe_declare passes the **retry_policy argument along, but since we were called from Queue.on_task_call without any retry_policy, this dict is empty. As a result, ensure() (Connection.ensure) uses its defaults for max_retries, interval_start, interval_step, etc. The default is max_retries=None, which means infinite retries.
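To illustrate why max_retries=None never gives up, here is a simplified model of a retry loop. It is only a sketch and not kombu's actual implementation: the name retry_sketch, the default interval values and the exact attempt counting are assumptions made for illustration.

```python
import itertools


def retry_sketch(fun, max_retries=None, interval_start=1, interval_step=1,
                 interval_max=5, sleep=lambda seconds: None):
    """Simplified model of a retry loop (NOT kombu's real code).

    All names and default values here are hypothetical placeholders.
    `sleep` is injectable so the sketch can run without actually waiting.
    """
    interval = interval_start
    for attempt in itertools.count():
        try:
            return fun()
        except ConnectionError:
            # With max_retries=None this condition is never true,
            # so the loop keeps retrying for as long as the broker is down.
            if max_retries is not None and attempt >= max_retries:
                raise
            sleep(interval)
            interval = min(interval + interval_step, interval_max)


def flaky(state={'calls': 0}):
    """Fails twice, then succeeds; stands in for a broker that comes back."""
    state['calls'] += 1
    if state['calls'] < 3:
        raise ConnectionError('broker is down')
    return 'published'


print(retry_sketch(flaky, max_retries=3))
```

With a function that always raises and max_retries left at None, this loop would never return, which matches the hang described above.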

Steps to reproduce

tasks.py:

from celery import Celery

app = Celery('tasks', broker='amqp://worker:password@192.168.99.100:5672/myvhost')
app.conf.result_backend = 'rpc://'
app.conf.broker_transport_options = {
    'max_retries': 3,
    'interval_start': 0,
    'interval_step': 0.2,
    'interval_max': 0.5,
}

@app.task
def add(x, y):
    return x + y

Run RabbitMQ and open a Python console:

>>> from tasks import add
>>> add.delay(2, 2)
<AsyncResult: 32c15fee-0d34-4baf-9c53-058996844939>

Don't close the Python console. Stop RabbitMQ and try to send a new task from the same console:

>>> add.delay(2, 2)
(the call never returns; it retries in an infinite loop)

Here is the monkeypatch I ended up with to avoid this problem in my program. It ensures that Celery uses the settings from broker_transport_options when it tries to ensure a connection. I call it once before sending tasks to Celery.

def monkeypatch_kombu_ensure():
    """Make Connection.ensure() honor the retry settings in broker_transport_options."""
    import kombu
    assert kombu.__version__ == '4.2.1', 'Check if patching ensure() is still needed'
    from kombu import Connection
    original_ensure = Connection.ensure

    def patched_ensure(self, *args, **kwargs):
        # Copy any retry settings found in transport_options into the call,
        # overriding whatever the caller passed (or did not pass).
        transport_opts = self.transport_options
        if transport_opts:
            for key in ('max_retries', 'interval_start',
                        'interval_step', 'interval_max'):
                if key in transport_opts:
                    kwargs[key] = transport_opts[key]
        return original_ensure(self, *args, **kwargs)
    Connection.ensure = patched_ensure

I don't know how to write tests for this case or maybe there is a better way to fix this. Happy to hear your thoughts about it.
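One possible way to test the patching logic without a running broker is to apply it to a stand-in class and check which keyword arguments reach ensure(). This is only a sketch: FakeConnection, patch_ensure and RETRY_KEYS are hypothetical names made up for illustration, and a real test would need to target kombu's Connection instead.

```python
RETRY_KEYS = ('max_retries', 'interval_start', 'interval_step', 'interval_max')


class FakeConnection:
    """Hypothetical stand-in for kombu.Connection (not the real class)."""

    def __init__(self, transport_options=None):
        self.transport_options = transport_options or {}

    def ensure(self, obj, fun, **kwargs):
        # Return the keyword arguments so a test can inspect them.
        return kwargs


def patch_ensure(cls):
    """Wrap cls.ensure so retry settings from transport_options are passed in."""
    original_ensure = cls.ensure

    def patched_ensure(self, *args, **kwargs):
        for key in RETRY_KEYS:
            if key in self.transport_options:
                kwargs[key] = self.transport_options[key]
        return original_ensure(self, *args, **kwargs)

    cls.ensure = patched_ensure


def test_patched_ensure_uses_transport_options():
    patch_ensure(FakeConnection)
    conn = FakeConnection({'max_retries': 3, 'interval_step': 0.2})
    # The caller passes no retry settings, as Queue.on_task_call does.
    seen = conn.ensure(None, lambda: None)
    assert seen == {'max_retries': 3, 'interval_step': 0.2}


test_patched_ensure_uses_transport_options()
```

This only checks that the settings are forwarded. It does not prove that the real ensure() stops retrying, which would still need a test against kombu itself.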

thedrow commented 5 years ago

This is a regression from https://github.com/celery/kombu/commit/816e3dcf346decd4d458e2a088ac7a914b48dcce.