celery / py-amqp

amqplib fork

Infinite wait possible in Channel.queue_declare #293

Open rconradharris opened 4 years ago

rconradharris commented 4 years ago

I'm using Celery for messaging and recently noticed that my task.delay() calls were blocking forever. I traced the code from Celery into Kombu and eventually into py-amqp.

The blocking code turned out to be Connection.drain_events(), which is called from AbstractChannel.wait(). Although wait() accepts a timeout parameter, its caller Channel.queue_declare() doesn't pass one, so timeout defaults to None. With no timeout, if no data ever arrives on the socket, the call can block forever.
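To make the failure mode concrete, here is a small stdlib-only sketch (not py-amqp code) of what a missing timeout means at the socket level. A socketpair stands in for the broker connection, and the peer never sends anything, which is an assumption meant to mimic a stuck RabbitMQ server. With a finite timeout, recv() gives up; calling it with None would leave the socket in blocking mode and hang, so the demo never does that.

```python
import socket


def recv_with_timeout(timeout):
    """Read from a socket whose peer never sends anything.

    Passing timeout=None keeps the socket in blocking mode, so recv() would
    wait indefinitely; only call this with a finite timeout.
    """
    reader, writer = socket.socketpair()  # `writer` stays silent, like a stuck broker
    try:
        reader.settimeout(timeout)
        try:
            reader.recv(1024)
            return "received data"
        except socket.timeout:
            return "timed out"
    finally:
        reader.close()
        writer.close()


print(recv_with_timeout(0.2))  # prints "timed out" after roughly 0.2 s
```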

Even if queue_declare() did pass a timeout, that alone wouldn't be enough: the value would have to come from queue_declare()'s own callers, which would mean adding a timeout parameter to every ancestor call in the stack.
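The point about ancestor calls can be sketched with three hypothetical layers named after the call chain in this issue. These are NOT py-amqp's real signatures; they only illustrate that a timeout accepted at the top reaches the socket only if every intermediate layer forwards it. A socketpair again stands in for the broker connection (an assumption for the demo).

```python
import socket
import time


def read_frame(sock, deadline):
    """Bottom layer (hypothetical): the only place that touches the socket."""
    if deadline is None:
        sock.settimeout(None)  # blocking mode: recv() may wait forever
    else:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise socket.timeout("deadline already passed")
        sock.settimeout(remaining)  # recv() raises socket.timeout when this expires
    return sock.recv(1024)


def wait(sock, deadline=None):
    # Middle layer (hypothetical): if it dropped `deadline` here,
    # read_frame() would silently fall back to blocking forever.
    return read_frame(sock, deadline)


def queue_declare(sock, timeout=None):
    # Top layer (hypothetical): convert the relative timeout into one absolute
    # deadline so time spent in every layer counts against the same budget.
    deadline = None if timeout is None else time.monotonic() + timeout
    return wait(sock, deadline)


reader, writer = socket.socketpair()  # `writer` never sends, like a stuck broker
start = time.monotonic()
try:
    queue_declare(reader, timeout=0.3)
except socket.timeout:
    print("gave up after %.1f s" % (time.monotonic() - start))
finally:
    reader.close()
    writer.close()
```

Passing an absolute deadline down, rather than re-passing the same relative timeout at each level, is one design choice for this; it keeps the total wait bounded even if several reads happen along the way.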

Ultimately, the root cause of this whole issue was my RabbitMQ server getting into a wonky state, and restarting it fixed the core problem. However, the wonky state did highlight the possibility of an infinite wait, so I figured I'd report it. Unfortunately, I don't know enough about this code to suggest how to fix this, or whether it should be fixed.

Here are some of my notes on the issue:

This commit adds the timeout parameter to AbstractChannel.wait(): https://github.com/celery/py-amqp/commit/ef6f614b89f6c1f1507ec9842dfb361ac23a7849

My annotated call stack for this issue was:

```
celery.app.task.Task.delay()
    celery.app.task.Task.apply_async()
        celery.app.base.Celery.send_task()
            celery.app.amqp.AMQP.send_task_message()

                < ... kombu-land ...>

                kombu.messaging.Producer.publish()
                    kombu.messaging.Producer._publish()
                        kombu.common.maybe_declare()
                            kombu.common._maybe_declare()
                                kombu.entity.Queue.declare()
                                    kombu.entity.Queue._create_queue()
                                        kombu.entity.Queue.queue_declare()

                                < ... amqp-land ...>

                                            amqp.channel.Channel.queue_declare()
                                                amqp.abstract_channel.AbstractChannel.wait() <== timeout is not passed in, so defaults to None
                                                    amqp.connection.Connection.drain_events(timeout=None)
                                                        amqp.connection.Connection.blocking_read(timeout=None)
                                                            amqp.transport._AbstractTransport.having_timeout(timeout=None) <== no timeout
                                                                amqp.transport._AbstractTransport.read_frame()
                                                                    socket.recv() <== BLOCKS FOREVER
```
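The having_timeout(timeout=None) frame in the stack is where a timeout would finally be applied to the socket. The sketch below is a simplified, hypothetical stand-in for that kind of "temporarily set the socket timeout" helper, written as a free function rather than a transport method; it is an assumption about the general pattern, not py-amqp's actual source. It shows why None is a no-op: the socket is left in whatever mode it was in, so a blocking socket stays blocking.

```python
import socket
from contextlib import contextmanager


@contextmanager
def having_timeout(sock, timeout):
    """Hypothetical helper: apply `timeout` to `sock` for the duration of the block.

    timeout=None means "leave the socket alone", so a blocking socket stays
    blocking and a read inside the block can wait indefinitely.
    """
    if timeout is None:
        yield sock
        return
    previous = sock.gettimeout()
    sock.settimeout(timeout)
    try:
        yield sock
    finally:
        sock.settimeout(previous)  # restore the caller's original mode


reader, writer = socket.socketpair()  # `writer` never sends
try:
    with having_timeout(reader, 0.2) as s:
        try:
            s.recv(1024)
        except socket.timeout:
            print("read timed out")
    print(reader.gettimeout())  # None: back in blocking mode
finally:
    reader.close()
    writer.close()
```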

thedrow commented 4 years ago

Thank you very much for the report. We will try to address it as soon as possible.