ruby-amqp / amqp

EventMachine-based RabbitMQ client. Prefer Bunny: http://rubybunny.info. See documentation guides at http://ruby-amqp.github.io/amqp/.
http://ruby-amqp.github.io/amqp/
632 stars 143 forks source link

Reconnection doesn't always succeed (or perhaps restoring of queues etc.) #103

Closed johnae closed 13 years ago

johnae commented 13 years ago

It seems that reconnection isn't successful at all times. I've seen it reconnect sometimes and sometimes not (because I get no "ping" message - see below code). I'm on rabbitmq-server 2.5.1 on Mac OS X Lion (installed rabbitmq-server through brew). I'm using ruby-amqp 0.8.0rc14 with amq-client 0.8.1 (where TLS reconnection was fixed just recently). At first I thought this problem might have been a TLS-one but then I started testing without TLS and got the same result. Try below code:

require "amqp"
args = ['amqp://192.168.1.142:5672']
puts "=> Example of automatic AMQP channel and queues recovery"
puts
AMQP.start(*args) do |connection, open_ok|
  connection.on_error do |ch, connection_close|
    raise connection_close.reply_text
  end

  ch1 = AMQP::Channel.new(connection, auto_recovery: true)
  ch1.on_error do |ch, channel_close|
    raise channel_close.reply_text
  end

  if ch1.auto_recovering?
    puts "Channel #{ch1.id} IS auto-recovering"
  end

  connection.on_tcp_connection_loss do |conn, settings|
    puts "[network failure] Trying to reconnect..."
    conn.reconnect(false, 2)
  end

  ping_exchange = ch1.fanout('pingie.ping')
  ping_queue = ch1.queue("", exclusive: true, auto_delete: true).bind(ping_exchange)
  ping_queue.subscribe do |metadata, payload|
    puts "Got ping: #{payload.inspect}"
  end

  EM.add_periodic_timer(5) do
    puts "send ping"
    ping_exchange.publish("ping")
  end

  show_stopper = Proc.new {
    connection.disconnect { puts "Disconnected. Exiting..."; EventMachine.stop }
  }

  Signal.trap "TERM", show_stopper
  Signal.trap "INT",  show_stopper

end

then in terminal stop the rabbitmq-server, sleep 5, start the rabbitmq-server again. You should see no pings being received (perhaps not the first time - this is a "sometimes"-problem unfortunately). Try it a few times at least. Hopefully there's something wrong with my code above but I can't see what that would be (I've seen this problem in an app I'm writing as well).

Oh, by the way, the reconnection is probably successful but the restoring of queues etc. probably isn't. That's the feeling I get anyway.

michaelklishin commented 13 years ago

We have test examples that kill RabbitMQ, then reconnect, then send messages. The problem you are seeing is that after reconnection, publisher is not recovered fully. This is a known limitation and I was not going to implement it. Automatic recovery mode only covers consumers right now because publishers have to employ special techniques to detect connection failure early and buffer messages. Adding this to stock AMQP::Exchange will entail a lot of overhead.

michaelklishin commented 13 years ago

What is going on with exchanges must be this:

So I guess we need to guarantee it is closed but then you will get exceptions. Try using channel.fanout('pingie.ping'), channel caches should be updated after recovery.

This is the problem with OO languages and automatic recovery: it is easy to keep track of queues/bindings/exchanges/consumers and recover them, but very difficult to update all objects that point to the old connection. With Clojure the opposite is generally true.

johnae commented 13 years ago

I see, so it was my understanding of recovery then. Good to know there WAS something wrong with my code. Thanks for the help!

johnae commented 13 years ago

So am I supposed to do something like this then(for all exchanges I've declared):

connection.on_recovery do
  @some_exchange.redeclare
end
michaelklishin commented 13 years ago

@johnae the problem is with the object, it holds reference to the old connection. Exchange#redeclare will simply send exchange.declare AMQP method to the broker. It won't update object graph in any way.

johnae commented 13 years ago

Ok I'm starting to get it I think. Still I'm not entirely sure how to go about it. I'm thinking perhaps something more like this then?

def recover
  @ping_queue.bind(@channel.fanout(PING_EXCHANGE_NAME))
  @producer = Producer.new(@channel, @channel.default_exchange)
  @some_other_queue.bind(@channel, @channel.topic('whatevertopic'))
end

I've been playing around with it but couldn't get it working in my code though - am I on the right track to recreate classes and rebind queues like this? Sorry for continuing this discussion but I'm still wrapping my head around amqp...

michaelklishin commented 13 years ago

Yes, you are on the right track. I think you can use #auto_recover on many entities, this is what the automatic recovery mode uses. It is an interesting problem to tackle.

johnae commented 13 years ago

What seem to work is to simply recreate my whole object graph and not using channel.auto_recover - that's working for me anyway so right now I'm doing:

channel  = AMQP::Channel.new(connection)#, auto_recovery: true)
director = Director.new(channel, delegate: DirectorDelegate.new, persistent: false, interval: 15)

connection.on_recovery do
  director.stop
  channel  = AMQP::Channel.new(connection)#, auto_recovery: true)
  director = Director.new(channel, delegate: DirectorDelegate.new, persistent: false, interval: 15)
  GC.start
end

I guess that's one way of doing it though with a bit of overhead... hopefully it won't have to reconnect that often. As you can see I tried using auto_recover but that didn't do it for me. In some other simpler applications I've used it successfully though (they're just pushing data really - this one's doing more than that and keeps tabs on "agents" on many hosts).

michaelklishin commented 13 years ago

I agree that automatic recovery does not cover 100%. I tried to reflect this in the documentation. We will see if it can be improved but for consumers it works very well, judging from my personal experience. Producers side of things needs more work.