Open matusvalo opened 5 years ago
Another improvement would be to ensure that once the connection is closed (that is, after the Connection.collect() method has been called) the drain_events loop ends immediately:
https://github.com/celery/py-amqp/blob/0e793de205e447d57214b32ea121ff2078c2819d/amqp/connection.py#L498-L501
@auvipy you can close this issue now...
thanks!!
Running 2.4.0, we are still getting the same exception:
'NoneType' object has no attribute 'drain_events'
From amqp/abstract_channel.py in wait at line 80
while not p.ready:
    self.connection.drain_events(timeout=timeout)
Is anyone else still getting it?
Any thoughts on what is causing it?
@AvnerCohen I was not able to create situation with your Exception. Could you provide some simple failing example?
:( I'm afraid not. We get it randomly in production; random workers crash.
@matusvalo What's interesting is that your suggested fix is basically to expose the "correct" exception:
amqp.exceptions.NotFound: Basic.publish: (404) NOT_FOUND - no exchange 'test_exc' in vhost '/'
In our case, we seem to be getting both of these errors:
Basic.consume: (404) NOT_FOUND - no queue
Unrecoverable error: NotFound(404, u"NOT_FOUND - no queue ' xxxx@xxx.celery.pidbox' in vhost '/'", (60, 20), u'Basic.consume')
From amqp/channel.py in _on_close at line 282:
raise error_for_code(
    reply_code, reply_text, (class_id, method_id), ChannelError,
)
The two errors happen in parallel, and random workers fail on them.
Maybe this is no longer an amqp issue but a core celery issue?
@AvnerCohen I was afraid that it is a random issue. I am not sure the problem is on the kombu/celery side, because the exception is coming from py-amqp. Even if there is an issue on the celery/kombu side, in the worst case py-amqp must be able to handle wrong usage correctly. Could you post the full tracebacks for the not-found errors and also for the attribute error? Also, how often do you get these errors?
At least I know about one problem on the py-amqp side - it does not fully follow the AMQP standard, because it keeps processing incoming messages in drain_events() even after the close() method is received from the broker... This should be fixed, but we need to be careful not to break something else :-)
@matusvalo thanks so much for the insights and time.
We are getting this during deploys of new code changes or any maintenance that involves a large-scale stop/start of workers (btw, this item could be relevant - https://github.com/celery/celery/issues/4618 - but it looks like there is no extra info there). So it very much depends on the number of deploys we make.
Here are the two raw stack traces:
AttributeError: 'NoneType' object has no attribute 'drain_events'
File "celery/worker/worker.py", line 205, in start
self.blueprint.start(self)
File "celery/bootsteps.py", line 119, in start
step.start(parent)
File "celery/bootsteps.py", line 369, in start
return self.obj.start()
File "celery/worker/consumer/consumer.py", line 318, in start
blueprint.start(self)
File "celery/bootsteps.py", line 119, in start
step.start(parent)
File "celery/worker/consumer/consumer.py", line 596, in start
c.loop(*c.loop_args())
File "celery/worker/loops.py", line 91, in asynloop
next(loop)
File "kombu/asynchronous/hub.py", line 354, in create_loop
cb(*cbargs)
File "kombu/transport/base.py", line 236, in on_readable
reader(loop)
File "kombu/transport/base.py", line 218, in _read
drain_events(timeout=0)
File "amqp/connection.py", line 500, in drain_events
while not self.blocking_read(timeout):
File "amqp/connection.py", line 506, in blocking_read
return self.on_inbound_frame(frame)
File "amqp/method_framing.py", line 79, in on_frame
callback(channel, msg.frame_method, msg.frame_args, msg)
File "amqp/connection.py", line 510, in on_inbound_method
method_sig, payload, content,
File "amqp/abstract_channel.py", line 126, in dispatch_method
listener(*args)
File "amqp/channel.py", line 1616, in _on_basic_deliver
fun(msg)
File "kombu/messaging.py", line 624, in _receive_callback
return on_m(message) if on_m else self.receive(decoded, message)
File "kombu/messaging.py", line 590, in receive
[callback(body, message) for callback in callbacks]
File "celery/worker/pidbox.py", line 51, in on_message
self.reset()
File "celery/worker/pidbox.py", line 66, in reset
self.stop(self.c)
File "celery/worker/pidbox.py", line 63, in stop
self.consumer = self._close_channel(c)
File "celery/worker/pidbox.py", line 71, in _close_channel
ignore_errors(c, self.node.channel.close)
File "kombu/common.py", line 298, in ignore_errors
return fun(*args, **kwargs)
File "amqp/channel.py", line 226, in close
wait=spec.Channel.CloseOk,
File "amqp/abstract_channel.py", line 60, in send_method
return self.wait(wait, returns_tuple=returns_tuple)
File "amqp/abstract_channel.py", line 80, in wait
self.connection.drain_events(timeout=timeout)
File "amqp/connection.py", line 500, in drain_events
while not self.blocking_read(timeout):
File "amqp/connection.py", line 506, in blocking_read
return self.on_inbound_frame(frame)
File "amqp/method_framing.py", line 79, in on_frame
callback(channel, msg.frame_method, msg.frame_args, msg)
File "amqp/connection.py", line 510, in on_inbound_method
method_sig, payload, content,
File "amqp/abstract_channel.py", line 126, in dispatch_method
listener(*args)
File "amqp/channel.py", line 1616, in _on_basic_deliver
fun(msg)
File "kombu/messaging.py", line 624, in _receive_callback
return on_m(message) if on_m else self.receive(decoded, message)
File "kombu/messaging.py", line 590, in receive
[callback(body, message) for callback in callbacks]
File "celery/worker/pidbox.py", line 51, in on_message
self.reset()
File "celery/worker/pidbox.py", line 67, in reset
self.start(self.c)
File "celery/worker/pidbox.py", line 54, in start
self.node.channel = c.connection.channel()
File "kombu/connection.py", line 266, in channel
chan = self.transport.create_channel(self.connection)
File "kombu/transport/pyamqp.py", line 100, in create_channel
return connection.channel()
File "amqp/connection.py", line 491, in channel
channel.open()
File "amqp/channel.py", line 437, in open
spec.Channel.Open, 's', ('',), wait=spec.Channel.OpenOk,
File "amqp/abstract_channel.py", line 60, in send_method
return self.wait(wait, returns_tuple=returns_tuple)
File "amqp/abstract_channel.py", line 80, in wait
self.connection.drain_events(timeout=timeout)
And:
NotFound: Basic.consume: (404) NOT_FOUND - no queue ' xxx@xxxx.celery.pidbox' in vhost '/'
File "celery/worker/worker.py", line 205, in start
self.blueprint.start(self)
File "celery/bootsteps.py", line 119, in start
step.start(parent)
File "celery/bootsteps.py", line 370, in start
return self.obj.start()
File "celery/worker/consumer/consumer.py", line 316, in start
blueprint.start(self)
File "celery/bootsteps.py", line 119, in start
step.start(parent)
File "celery/worker/pidbox.py", line 55, in start
self.consumer = self.node.listen(callback=self.on_message)
File "kombu/pidbox.py", line 91, in listen
consumer.consume()
File "kombu/messaging.py", line 477, in consume
self._basic_consume(T, no_ack=no_ack, nowait=False)
File "kombu/messaging.py", line 598, in _basic_consume
no_ack=no_ack, nowait=nowait)
File "kombu/entity.py", line 737, in consume
arguments=self.consumer_arguments)
File "amqp/channel.py", line 1572, in basic_consume
returns_tuple=True
File "amqp/abstract_channel.py", line 60, in send_method
return self.wait(wait, returns_tuple=returns_tuple)
File "amqp/abstract_channel.py", line 80, in wait
self.connection.drain_events(timeout=timeout)
File "amqp/connection.py", line 500, in drain_events
while not self.blocking_read(timeout):
File "amqp/connection.py", line 506, in blocking_read
return self.on_inbound_frame(frame)
File "amqp/method_framing.py", line 55, in on_frame
callback(channel, method_sig, buf, None)
File "amqp/connection.py", line 510, in on_inbound_method
method_sig, payload, content,
File "amqp/abstract_channel.py", line 126, in dispatch_method
listener(*args)
File "amqp/channel.py", line 282, in _on_close
reply_code, reply_text, (class_id, method_id), ChannelError,
@matusvalo Is there anything we can help with, or any additional information we can provide on that?
Thank you @AvnerCohen. I need to find some spare time to have a look at that... For now, no.
Anything new on this issue?
@thedrow for now no. I am not able to reproduce the issue. There are multiple issues which point to use of a closed connection/channel - e.g. another issue: https://github.com/celery/kombu/issues/1027
There are only two places where connection or channel is set to None:
* Connection.collect() is called when:
  * Connection.connect() fails - this is not the case in these issues
  * Close-OK is received from the broker. But this should be a reply to a Close message sent by the client.
  * Close is received from the broker, but this case raises an exception so it should not yield tracebacks like the ones we have
* Channel.collect() is called only when Close-OK is received. This is also after the client sends Close.
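To make the failure mode concrete, here is a minimal sketch of how a wait loop of the shape quoted earlier ends in the AttributeError once a collect() call has dropped the back-reference. ToyConnection and ToyChannel are hypothetical stand-ins written for this illustration, not py-amqp's classes, and the toy drain_events() tears the channel down unconditionally, which is an assumption made only to trigger the error.

```python
class ToyConnection:
    """Hypothetical stand-in for a connection; not py-amqp's class."""

    def __init__(self):
        self.channels = {}

    def drain_events(self, timeout=None):
        # A real client would read one frame from the socket here.
        # In this toy, every call tears down all channels (assumption).
        for channel in list(self.channels.values()):
            channel.collect()


class ToyChannel:
    """Hypothetical stand-in for a channel; not py-amqp's class."""

    def __init__(self, connection, channel_id):
        self.connection = connection
        self.channel_id = channel_id
        self.ready = False  # never becomes True: the awaited reply never arrives
        connection.channels[channel_id] = self

    def collect(self):
        # Teardown drops the back-reference to the connection.
        self.connection.channels.pop(self.channel_id, None)
        self.connection = None

    def wait(self, timeout=None):
        # Same shape as the loop quoted from abstract_channel.py.
        while not self.ready:
            self.connection.drain_events(timeout=timeout)


def reproduce():
    """Return the error message raised by waiting on a collected channel."""
    channel = ToyChannel(ToyConnection(), 1)
    try:
        channel.wait()
    except AttributeError as exc:
        return str(exc)
    return None


if __name__ == "__main__":
    print(reproduce())
```

The first pass through the loop collects the channel; the second pass dereferences self.connection, which is now None. Whatever the real trigger is, the loop has no check for a channel that was torn down while it was waiting.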
One possible explanation could be that multiple threads share one single connection...
One possibility came to my mind: From stacktrace:
File "celery/worker/pidbox.py", line 63, in stop
self.consumer = self._close_channel(c)
File "celery/worker/pidbox.py", line 71, in _close_channel
ignore_errors(c, self.node.channel.close)
File "kombu/common.py", line 298, in ignore_errors
return fun(*args, **kwargs)
File "amqp/channel.py", line 226, in close
wait=spec.Channel.CloseOk,
The other possibility is that when the client sends the Close method it waits for Close-OK, but in the meantime another message arrives whose handler itself starts waiting, which lets in yet another message that waits, and so on... In general the AMQP spec requires that after the close method is sent the client must process only Close-OK replies [1]:
After sending this method, any received methods except Close and Close-OK MUST be discarded. The response to receiving a Close after sending Close must be to send Close-Ok.
Unfortunately the py-amqp library does not conform to the spec in this case.
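As a rough illustration of the rule quoted above, the following sketch shows a dispatcher that discards everything except Close and Close-OK once Close has been sent. ClosingAwareChannel, its attributes and the method-name strings are all hypothetical names chosen for this example; this is not how py-amqp is structured, only one way the spec rule could be expressed.

```python
CLOSE = "Channel.Close"
CLOSE_OK = "Channel.CloseOk"


class ClosingAwareChannel:
    """Toy dispatcher (hypothetical, not py-amqp code)."""

    def __init__(self):
        self.close_sent = False
        self.is_open = True
        self.handled = []  # methods handed to application callbacks
        self.sent = []     # methods we would write to the socket

    def close(self):
        # Client-initiated close: send Close and remember that we did.
        self.sent.append(CLOSE)
        self.close_sent = True

    def dispatch(self, method):
        # After Close is sent, anything but Close/Close-OK is discarded.
        if self.close_sent and method not in (CLOSE, CLOSE_OK):
            return
        if method == CLOSE:
            # Broker-initiated (or crossing) Close: answer with Close-OK.
            self.sent.append(CLOSE_OK)
            self.is_open = False
        elif method == CLOSE_OK:
            self.is_open = False
        else:
            self.handled.append(method)


if __name__ == "__main__":
    ch = ClosingAwareChannel()
    ch.dispatch("Basic.Deliver")  # handled: channel is still open
    ch.close()
    ch.dispatch("Basic.Deliver")  # discarded: Close already sent
    ch.dispatch(CLOSE_OK)
    print(ch.handled, ch.is_open)
```

With a guard like this, a delivery arriving between Close and Close-OK never reaches a callback, so it cannot start a nested wait.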
Hmm so how do we fix that?
@thedrow see #280.
@AvnerCohen could you please check master branch if your problem still occurs?
@matusvalo On our end, we have seen this behavior when starting some 80 celery workers (with anywhere between 2 to 8 concurrency) in parallel. To work around the issue, we broke this down and are no longer doing that (at max we start 20 workers at the same time) and this seems to have solved the issue for us.
Hi everyone, I found this issue after looking for a solution for this problem.
Env: Celery 5.1.2 Rabbitmq: 3.9.4
Starting with RabbitMQ 3.8.15, this new feature was introduced: https://www.rabbitmq.com/consumers.html#acknowledgement-timeout
When the timeout is triggered, the 'NoneType' object has no attribute 'drain_events' error appears again, causing the celery worker to stop working
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/celery/worker/worker.py", line 203, in start
self.blueprint.start(self)
File "/usr/local/lib/python3.8/site-packages/celery/bootsteps.py", line 116, in start
step.start(parent)
File "/usr/local/lib/python3.8/site-packages/celery/bootsteps.py", line 365, in start
return self.obj.start()
File "/usr/local/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 326, in start
blueprint.start(self)
File "/usr/local/lib/python3.8/site-packages/celery/bootsteps.py", line 116, in start
step.start(parent)
File "/usr/local/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 618, in start
c.loop(*c.loop_args())
File "/usr/local/lib/python3.8/site-packages/celery/worker/loops.py", line 81, in asynloop
next(loop)
File "/usr/local/lib/python3.8/site-packages/kombu/asynchronous/hub.py", line 361, in create_loop
cb(*cbargs)
File "/usr/local/lib/python3.8/site-packages/kombu/transport/base.py", line 235, in on_readable
reader(loop)
File "/usr/local/lib/python3.8/site-packages/kombu/transport/base.py", line 217, in _read
drain_events(timeout=0)
File "/usr/local/lib/python3.8/site-packages/amqp/connection.py", line 523, in drain_events
while not self.blocking_read(timeout):
File "/usr/local/lib/python3.8/site-packages/amqp/connection.py", line 529, in blocking_read
return self.on_inbound_frame(frame)
File "/usr/local/lib/python3.8/site-packages/amqp/method_framing.py", line 53, in on_frame
callback(channel, method_sig, buf, None)
File "/usr/local/lib/python3.8/site-packages/amqp/connection.py", line 535, in on_inbound_method
return self.channels[channel_id].dispatch_method(
File "/usr/local/lib/python3.8/site-packages/amqp/abstract_channel.py", line 143, in dispatch_method
listener(*args)
File "/usr/local/lib/python3.8/site-packages/amqp/channel.py", line 277, in _on_close
raise error_for_code(
amqp.exceptions.PreconditionFailed: (0, 0): (406) PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Timeout value used: 1800000 ms. This timeout value can be configured, see consumers doc guide to learn more
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/bin/celery", line 8, in <module>
sys.exit(main())
File "/usr/local/lib/python3.8/site-packages/celery/__main__.py", line 15, in main
sys.exit(_main())
File "/usr/local/lib/python3.8/site-packages/celery/bin/celery.py", line 213, in main
return celery(auto_envvar_prefix="CELERY")
File "/usr/local/lib/python3.8/site-packages/click/core.py", line 829, in __call__
return self.main(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/click/core.py", line 782, in main
rv = self.invoke(ctx)
File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/usr/local/lib/python3.8/site-packages/click/core.py", line 610, in invoke
return callback(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/click/decorators.py", line 21, in new_func
return f(get_current_context(), *args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/celery/bin/base.py", line 133, in caller
return f(ctx, *args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/celery/bin/worker.py", line 346, in worker
worker.start()
File "/usr/local/lib/python3.8/site-packages/celery/worker/worker.py", line 208, in start
self.stop(exitcode=EX_FAILURE)
File "/usr/local/lib/python3.8/site-packages/celery/worker/worker.py", line 251, in stop
self._shutdown(warm=True)
File "/usr/local/lib/python3.8/site-packages/celery/worker/worker.py", line 266, in _shutdown
self.blueprint.stop(self, terminate=not warm)
File "/usr/local/lib/python3.8/site-packages/celery/bootsteps.py", line 174, in stop
self.on_stopped()
File "/usr/local/lib/python3.8/site-packages/celery/worker/worker.py", line 162, in on_stopped
self.consumer.shutdown()
File "/usr/local/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 370, in shutdown
self.blueprint.shutdown(self)
File "/usr/local/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 168, in shutdown
self.send_all(parent, 'shutdown')
File "/usr/local/lib/python3.8/site-packages/celery/bootsteps.py", line 148, in send_all
fun(parent, *args)
File "/usr/local/lib/python3.8/site-packages/celery/worker/consumer/connection.py", line 29, in shutdown
ignore_errors(connection, connection.close)
File "/usr/local/lib/python3.8/site-packages/kombu/common.py", line 325, in ignore_errors
return fun(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/kombu/connection.py", line 375, in release
self._close()
File "/usr/local/lib/python3.8/site-packages/kombu/connection.py", line 341, in _close
self._do_close_self()
File "/usr/local/lib/python3.8/site-packages/kombu/connection.py", line 331, in _do_close_self
self.maybe_close_channel(self._default_channel)
File "/usr/local/lib/python3.8/site-packages/kombu/connection.py", line 323, in maybe_close_channel
channel.close()
File "/usr/local/lib/python3.8/site-packages/amqp/channel.py", line 219, in close
return self.send_method(
File "/usr/local/lib/python3.8/site-packages/amqp/abstract_channel.py", line 66, in send_method
return self.wait(wait, returns_tuple=returns_tuple)
File "/usr/local/lib/python3.8/site-packages/amqp/abstract_channel.py", line 86, in wait
self.connection.drain_events(timeout=timeout)
File "/usr/local/lib/python3.8/site-packages/amqp/connection.py", line 523, in drain_events
while not self.blocking_read(timeout):
File "/usr/local/lib/python3.8/site-packages/amqp/connection.py", line 529, in blocking_read
return self.on_inbound_frame(frame)
File "/usr/local/lib/python3.8/site-packages/amqp/method_framing.py", line 53, in on_frame
callback(channel, method_sig, buf, None)
File "/usr/local/lib/python3.8/site-packages/amqp/connection.py", line 535, in on_inbound_method
return self.channels[channel_id].dispatch_method(
File "/usr/local/lib/python3.8/site-packages/amqp/abstract_channel.py", line 143, in dispatch_method
listener(*args)
File "/usr/local/lib/python3.8/site-packages/amqp/channel.py", line 276, in _on_close
self._do_revive()
File "/usr/local/lib/python3.8/site-packages/amqp/channel.py", line 161, in _do_revive
self.open()
File "/usr/local/lib/python3.8/site-packages/amqp/channel.py", line 432, in open
return self.send_method(
File "/usr/local/lib/python3.8/site-packages/amqp/abstract_channel.py", line 66, in send_method
return self.wait(wait, returns_tuple=returns_tuple)
File "/usr/local/lib/python3.8/site-packages/amqp/abstract_channel.py", line 86, in wait
self.connection.drain_events(timeout=timeout)
AttributeError: 'NoneType' object has no attribute 'drain_events'
Do you know how to prevent this? Celery should not break for timeout errors
For me, having no permissions on the exchange (or virtual host) ends up with the behavior above.
@matusvalo I'm able to reproduce the NotFound: Basic.consume: (404) NOT_FOUND - no queue issue with https://github.com/povilasb/celery-issues/ . Hope that helps :)
I've also created a py-amqp only example to reproduce the issue: https://github.com/povilasb/celery-issues/#py-amqp-example
What happens, I believe, is that RabbitMQ responds with a "Channel Close" message when we're trying to publish to a non-existing queue. Then [`Channel._on_close()` is called](https://github.com/celery/py-amqp/blob/98f6d364188215c2973693a79e461c7e9b54daef/amqp/channel.py#L142).
Upon reading the docs I tend to think that this may be expected from the AMQP library:
Certain scenarios are assumed to be recoverable ("soft") errors in the protocol. They render the channel closed but applications can open another one and try to recover or retry a number of times. Most common examples are:
* Consuming from a queue that does not exist will fail with a 404 NOT_FOUND error
* Publishing to an exchange that does not exist will fail with a 404 NOT_FOUND error
Anyway, I'm gonna stay away from this issue now since I guess this is more of a Kombu/Celery issue for not reopening a new channel. My issue may not be related to the original one as well.
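For what "open another channel and retry" from the quoted docs could look like on the application side, here is a small self-contained sketch. FakeConnection, FakeChannel, SoftChannelError and consume_with_retry are hypothetical names invented for this example; they only imitate a broker that closes the channel on a 404 and are not Kombu, Celery or py-amqp APIs.

```python
class SoftChannelError(Exception):
    """Hypothetical stand-in for a 404 NOT_FOUND channel error."""


class FakeChannel:
    """Toy channel that optionally fails on consume (assumption)."""

    def __init__(self, fail):
        self.fail = fail
        self.is_open = True

    def basic_consume(self, queue):
        if self.fail:
            # A soft error renders this channel closed.
            self.is_open = False
            raise SoftChannelError(f"NOT_FOUND - no queue '{queue}'")
        return f"consumer-for-{queue}"


class FakeConnection:
    """Toy connection whose first `failures` channels fail on consume."""

    def __init__(self, failures):
        self.failures = failures
        self.opened = 0

    def channel(self):
        self.opened += 1
        return FakeChannel(fail=self.opened <= self.failures)


def consume_with_retry(connection, queue, attempts=3):
    """Try to consume, opening a fresh channel for every attempt."""
    last_error = None
    for _ in range(attempts):
        channel = connection.channel()  # never reuse a closed channel
        try:
            return channel.basic_consume(queue)
        except SoftChannelError as exc:
            last_error = exc
    raise last_error


if __name__ == "__main__":
    conn = FakeConnection(failures=2)
    print(consume_with_retry(conn, "pidbox"), conn.opened)
```

The point of the sketch is only that the retry happens on a new channel; a real worker would also need to decide whether the missing queue should be declared before retrying.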
Steps to reproduce:
* test_exc exchange does not exist
Example code:
Executing the script ends up with the following stacktrace:
I was able to replicate this issue on RabbitMQ 3.7.8 and Master branch of py-amqp.
I have found out that connection was set to None in the following line: https://github.com/celery/py-amqp/blob/0e793de205e447d57214b32ea121ff2078c2819d/amqp/connection.py#L464
After inserting breakpoint I have found out the following tracebacks:
From the tracebacks it can be seen that:
* Client sends Close method to server
* Client waits for CloseOK. It starts a drain_events loop.
* Server sends Close method instead of CloseOk.
* Client sends Open method (part of _do_revive() method) and waits for OpenOK. It starts another (!) drain_events loop
* Server sends CloseOK method. The client clears the connection (sets self.channels = None)
* Server sends OpenOk method. Client receives some method but crashes since Connection.channels == None
In general the problem is that the client executes the Channel._do_revive() method even when the connection is closing: https://github.com/celery/py-amqp/blob/0e793de205e447d57214b32ea121ff2078c2819d/amqp/channel.py#L276-L280
A possible solution would be roughly like this:
* Mark Connection as closing when Connection.close() method is called:
* Skip the revive when the connection is closing, in Channel._do_revive():
After this fix the correct exception is raised:
Moreover, does it make sense to revive the connection just before raising an exception? https://github.com/celery/py-amqp/blob/0e793de205e447d57214b32ea121ff2078c2819d/amqp/channel.py#L276-L280
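The proposed fix can be sketched with a toy model. Everything here is hypothetical: ToyConnection, ToyChannel, the is_closing flag name and the guarded switch are assumptions made for illustration, not the actual py-amqp patch. The toy also collapses the timing, clearing the channel table directly in close() instead of waiting for CloseOK, so that the ordering problem shows up without a broker.

```python
class ChannelClosedByBroker(Exception):
    """Hypothetical stand-in for the 'correct' error, e.g. NotFound."""


class ToyConnection:
    def __init__(self):
        self.is_closing = False  # assumed flag name
        self.channels = {}

    def close(self):
        # Fix, part 1: remember that shutdown has started.
        self.is_closing = True
        # Simplification: the table is cleared right away here, whereas
        # the thread describes it happening once CloseOK is processed.
        self.channels = None


class ToyChannel:
    def __init__(self, connection, guarded):
        self.connection = connection
        self.guarded = guarded  # True = behave as with the proposed fix
        self.reopen_attempts = 0

    def _do_revive(self):
        # Fix, part 2: do not reopen a channel on a closing connection.
        if self.guarded and self.connection.is_closing:
            return
        self.reopen_attempts += 1
        # Reopening needs the channel table, which is already gone.
        self.connection.channels[1] = self

    def on_broker_close(self, reply_text):
        # Mirrors the order discussed above: revive first, then raise.
        self._do_revive()
        raise ChannelClosedByBroker(reply_text)


def outcome(guarded):
    """Return the name of the exception the application ends up seeing."""
    connection = ToyConnection()
    channel = ToyChannel(connection, guarded)
    connection.close()
    try:
        channel.on_broker_close("NOT_FOUND - no exchange 'test_exc'")
    except Exception as exc:
        return type(exc).__name__


if __name__ == "__main__":
    print(outcome(guarded=False), outcome(guarded=True))
```

Without the guard the revive touches state that teardown has already cleared, and an unrelated error masks the broker's reply; with the guard the broker's error is the one that surfaces.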