Closed jjrussell closed 9 years ago
Hmm, thanks for reporting this.
/cc @calj - any thoughts?
Working around this by setting auto_recovery to false on the channel, recovering the connection, and creating a new channel, queues, and consumers in the Connection.on_recovery callback.
Running into this as well.
Me too :/
So, reading through the code...
https://github.com/ruby-amqp/amqp/blob/d1978ceba61e5be3b83629db1a3e67fb0a04788f/lib/amqp/channel.rb#L313 https://github.com/ruby-amqp/amqp/blob/d1978ceba61e5be3b83629db1a3e67fb0a04788f/lib/amqp/queue.rb#L374
And corresponding patch in synchrony: https://github.com/igrigorik/em-synchrony/blob/master/lib/em-synchrony/amqp.rb#L165
Do we need to patch rebind at all?
The problem is that in the context of auto recovery, these methods get called in the root Fiber, hence the error. I tried it without rebind, and then without both rebind and bind, and it's still the same. I am gonna try to wrap my head around this call stack but I would appreciate some help :)
@igrigorik when you say "Do we need to patch rebind at all" do you mean we can take it out of the list of methods that the em-synchrony amqp code overrides and just let the base amqp code handle it? If so that seems reasonable. I'd be ok if recovery happened without any fibers but I'm not sure what the side effects of that would be. Is rebind used elsewhere where it would need to be wrapped for synchrony?
The rebind method is not really asynchronous (the callback is ignored): https://github.com/ruby-amqp/amqp/blob/d1978ceba61e5be3b83629db1a3e67fb0a04788f/lib/amqp/queue.rb#L366
You should try to just remove "rebind" from the list here: https://github.com/igrigorik/em-synchrony/blob/master/lib/em-synchrony/amqp.rb#L165
Yes, what @calj suggested. Just try removing rebind from the list of patched methods, and see if that "solves it".
I am sorry if I wasn't clear in my last comment, but I did already try to remove rebind and it didn't help. The same error is thrown by bind, which I then also removed, but to no avail either.
A working option is to keep the sync wrapper but force a secondary fiber:
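    module EventMachine::Synchrony::AMQP
      class Queue
        # Run the original (async) rebind inside a fresh fiber so the
        # sync callback never executes on the root fiber.
        def rebind
          Fiber.new do
            arebind &EM::Synchrony::AMQP.sync_cb(Fiber.current)
          end.resume
        end
      end
    end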
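For anyone wondering why forcing a secondary fiber helps, here is a minimal sketch using only Ruby's built-in Fiber, with no EventMachine or amqp involved. The `sync_call` helper and the `pending` array are hypothetical stand-ins for the synchrony `sync` wrapper and the reactor's callback queue; they are assumptions made for illustration, not the real em-synchrony code.

```ruby
require 'fiber' # provides Fiber.current on older Rubies; harmless on newer ones

# Hypothetical stand-in for a synchrony-style wrapper: register a callback
# that resumes the calling fiber, then suspend until that callback fires.
def sync_call(pending)
  fiber = Fiber.current
  pending << ->(result) { fiber.resume(result) }
  Fiber.yield # raises FiberError when there is no parent fiber to yield to
end

pending = [] # hypothetical queue of callbacks the "reactor" would fire later

# Case 1: called directly from the root fiber, as happens during auto recovery.
root_error_class = begin
  sync_call(pending)
  nil
rescue FiberError => e
  e.class
end
pending.clear # drop the callback registered by the failed call

# Case 2: the same call wrapped in its own fiber.
wrapped_result = nil
Fiber.new { wrapped_result = sync_call(pending) }.resume

# The fiber is now suspended; simulate the reactor delivering the reply
# from the root fiber, which resumes the suspended fiber.
pending.shift.call(:rebound)

puts "root fiber call raised: #{root_error_class}"
puts "wrapped call returned:  #{wrapped_result.inspect}"
```

The only difference between the two cases is where `Fiber.yield` runs: on the root fiber there is nothing to yield back to, while inside `Fiber.new { ... }.resume` control returns to the caller until the callback resumes the fiber.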
I'm using the synchrony AMQP connection with the channel's auto_recovery set to true. I can bring up the connection and receive messages successfully. When I bring down the message bus, the connection.on_connection_interrupted event handler is called correctly.
However, when I bring the broker back up and the queue attempts to auto_recover it calls queue.rebind which is monkey patched in em-synchrony/amqp.rb to call the superclass implementation inside EM::Synchrony::AMQP.sync.
Since the callback happens as a result of an EM receive_data call, which is not run on a Fiber, the rebind call happens on the root fiber. AMQP::sync then attempts to yield its fiber, and you get the "can't yield from root fiber" error. Below is the stack trace of the error that happens after I bring the broker back up and auto_recovery happens.
My question is: where is it possible to wrap this call in a Fiber? None of my code is actually called from the EM receive_data method, so I'm not sure what to wrap.
Thanks a lot.