ruby-amqp / amqp

EventMachine-based RabbitMQ client. Prefer Bunny: http://rubybunny.info. See documentation guides at http://ruby-amqp.github.io/amqp/.

Multiple server-named queues after auto_recovery #204

Open tht opened 10 years ago

tht commented 10 years ago

I'm using amqp 1.5 on Linux, and the auto-recovery feature works great with named queues. With server-named queues, however, I'm running into problems.

Steps to reproduce: run the test script below, then restart RabbitMQ a few times.

On every RabbitMQ restart, the queue.on_recovery callback is called more and more often, and the number of queues on the server grows.

Closing the client removes all of the server-named queues from RabbitMQ.

I used this test script:

require 'rubygems'
require 'amqp'

config = { :host => 'localhost', :username => 'guest', :password => 'guest' }

EventMachine.run do
  # Pass the local `config` hash: `@config` is an unset instance variable (nil)
  # at the top level, so the settings above were never actually used.
  AMQP.connect(config) do |conn|
    puts "Initial connect."

    conn.on_recovery do
      puts "Connection restored"
    end

    # `connection` instead of `conn` avoids shadowing the outer variable
    conn.on_tcp_connection_loss do |connection, settings|
      puts "[network failure] Trying to reconnect..."
      connection.reconnect(false, 2)
    end

    # Connection is up: open a channel with automatic recovery enabled
    channel = AMQP::Channel.new(conn)
    channel.auto_recovery = true

    # An empty name lets the broker generate one (amq.gen-...)
    AMQP::Queue.new(channel, "", :auto_delete => true, :exclusive => true) do |queue, declare_ok|
      puts "#{queue.name} is ready to go. AMQP method: #{declare_ok.inspect}"

      queue.subscribe(:ack => false) do |header, data|
        puts "queue.subscribe called with message #{data}"
      end

      queue.on_recovery do |recovered_queue|
        puts "queue.on_recovery called for queue #{recovered_queue} named #{recovered_queue.name}"
      end
    end
  end
end

Output while running:

$ ruby reconnect_test.rb
Initial connect.
amq.gen-dXrBy45YS1hZD8ffThK_RA is ready to go. AMQP method: #<AMQ::Protocol::Queue::DeclareOk:0x7f6a0b4aff98 @queue="amq.gen-dXrBy45YS1hZD8ffThK_RA", @consumer_count=0, @message_count=0>

// RabbitMQ restarted here...

[network failure] Trying to reconnect...
[network failure] Trying to reconnect...
[network failure] Trying to reconnect...
[network failure] Trying to reconnect...
[network failure] Trying to reconnect...
[network failure] Trying to reconnect...
Connection restored
queue.on_recovery called for queue #<AMQP::Queue:0x7f9cd2c97428> named amq.gen-D7mIbnnR2jmyvnhKRb6r0g
queue.on_recovery called for queue #<AMQP::Queue:0x7f9cd2c97428> named amq.gen-bylghjGFgEPcLkoj8fIE8A

// RabbitMQ restarted here...

[network failure] Trying to reconnect...
[network failure] Trying to reconnect...
[network failure] Trying to reconnect...
[network failure] Trying to reconnect...
[network failure] Trying to reconnect...
[network failure] Trying to reconnect...
Connection restored
queue.on_recovery called for queue #<AMQP::Queue:0x7f9cd2c97428> named amq.gen-bylghjGFgEPcLkoj8fIE8A
queue.on_recovery called for queue #<AMQP::Queue:0x7f9cd2c97428> named amq.gen-bylghjGFgEPcLkoj8fIE8A
queue.on_recovery called for queue #<AMQP::Queue:0x7f9cd2c97428> named amq.gen-K4pZgJCMOXxzgeTVcZfywg
queue.on_recovery called for queue #<AMQP::Queue:0x7f9cd2c97428> named amq.gen-K4pZgJCMOXxzgeTVcZfywg

// RabbitMQ restarted here...

[network failure] Trying to reconnect...
[network failure] Trying to reconnect...
[network failure] Trying to reconnect...
[network failure] Trying to reconnect...
[network failure] Trying to reconnect...
[network failure] Trying to reconnect...
Connection restored
queue.on_recovery called for queue #<AMQP::Queue:0x7f9cd2c97428> named amq.gen-K4pZgJCMOXxzgeTVcZfywg
queue.on_recovery called for queue #<AMQP::Queue:0x7f9cd2c97428> named amq.gen-K4pZgJCMOXxzgeTVcZfywg
queue.on_recovery called for queue #<AMQP::Queue:0x7f9cd2c97428> named amq.gen-K4pZgJCMOXxzgeTVcZfywg
queue.on_recovery called for queue #<AMQP::Queue:0x7f9cd2c97428> named amq.gen-w3zp3WJlDS8fGLGYry3UEw
queue.on_recovery called for queue #<AMQP::Queue:0x7f9cd2c97428> named amq.gen-w3zp3WJlDS8fGLGYry3UEw
queue.on_recovery called for queue #<AMQP::Queue:0x7f9cd2c97428> named amq.gen-w3zp3WJlDS8fGLGYry3UEw

It looks to me like queue.on_recovery gets called for the old (now invalid) queue name as well as for the new one. In addition, there is one more queue on the server after every RabbitMQ restart. These may be two different issues, or I may just be doing something wrong...
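Until the root cause is fixed, an application could at least avoid reacting more than once to the same generated name. The following is a minimal sketch, assuming it is acceptable to act only once per queue name; `RecoveryGuard` is a hypothetical helper, not part of the amqp gem, and it only hides the duplicate callbacks. It does not prevent them or the extra queues. The names replayed here are taken from the log above.

```ruby
require 'set'

# Hypothetical helper (not part of the amqp gem): remembers which queue
# names an on_recovery handler has already processed.
class RecoveryGuard
  def initialize
    @seen = Set.new
  end

  # True only the first time a given name is passed in.
  # Set#add? returns nil when the element was already present.
  def first_time?(name)
    !@seen.add?(name).nil?
  end
end

guard = RecoveryGuard.new

# Names reported after the first restart
guard.first_time?("amq.gen-D7mIbnnR2jmyvnhKRb6r0g")
guard.first_time?("amq.gen-bylghjGFgEPcLkoj8fIE8A")

# Names reported after the second restart: only the new one is handled
handled = %w[
  amq.gen-bylghjGFgEPcLkoj8fIE8A
  amq.gen-bylghjGFgEPcLkoj8fIE8A
  amq.gen-K4pZgJCMOXxzgeTVcZfywg
  amq.gen-K4pZgJCMOXxzgeTVcZfywg
].select { |name| guard.first_time?(name) }

p handled # => ["amq.gen-K4pZgJCMOXxzgeTVcZfywg"]
```

Inside queue.on_recovery, the handler body could start with `next unless guard.first_time?(q.name)`, where `q` is the queue yielded to the block.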

Thanks, Thomas

michaelklishin commented 10 years ago

OK, thanks for the reproduction steps. Your hypothesis makes sense; I'll take a look when I have time.

tht commented 10 years ago

I think I know why queue.on_recovery gets called twice. Dumping the call stack inside the callback shows two distinct call paths.
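The traces below do not say how they were captured; one straightforward way is to call `Kernel#caller` inside the callback, which returns the current call stack as an Array of strings. A self-contained sketch of that technique, with hypothetical `session_recovery` and `queue_recovery` methods standing in for the two code paths in the gem:

```ruby
callbacks = [] # hypothetical stand-in for a queue's recovery callback list
traces = []

callbacks << proc do
  # Kernel#caller returns the stack as "file:line:in ..." strings,
  # innermost frame first (the proc's own frame is excluded).
  traces << caller
end

# Two different routes that both end up running the same callbacks
def session_recovery(callbacks)
  callbacks.each { |cb| cb.call }
end

def queue_recovery(callbacks)
  callbacks.each { |cb| cb.call }
end

session_recovery(callbacks)
queue_recovery(callbacks)

# Each trace names the method that triggered the callback
traces.each { |trace| puts trace.join("\n"), "---" }
```

In the test script, the equivalent is a `puts caller.join("\n")` line inside the queue.on_recovery block.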

The first call is from the session restore:

/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/callbacks.rb:51:in `call'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/callbacks.rb:51:in `exec_callback_yielding_self'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/callbacks.rb:51:in `each'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/callbacks.rb:51:in `exec_callback_yielding_self'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/queue.rb:1258:in `run_after_recovery_callbacks'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/channel.rb:1274:in `run_after_recovery_callbacks'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/channel.rb:1274:in `each'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/channel.rb:1274:in `run_after_recovery_callbacks'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/session.rb:836:in `run_after_recovery_callbacks'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/session.rb:836:in `each'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/session.rb:836:in `run_after_recovery_callbacks'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/session.rb:858:in `start_automatic_recovery'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/session.rb:531:in `call'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/session.rb:531:in `register_connection_callback'
/usr/lib64/ruby/gems/1.8/gems/eventmachine-1.0.3/lib/em/deferrable.rb:151:in `call'
/usr/lib64/ruby/gems/1.8/gems/eventmachine-1.0.3/lib/em/deferrable.rb:151:in `set_deferred_status'
/usr/lib64/ruby/gems/1.8/gems/eventmachine-1.0.3/lib/em/deferrable.rb:191:in `succeed'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/session.rb:682:in `connection_successful'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/session.rb:1040:in `handle_open_ok'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/session.rb:1136
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/session.rb:959:in `call'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/session.rb:959:in `receive_frameset'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/session.rb:933:in `receive_frame'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/session.rb:671:in `receive_data'
/usr/lib64/ruby/gems/1.8/gems/eventmachine-1.0.3/lib/eventmachine.rb:187:in `run_machine'
/usr/lib64/ruby/gems/1.8/gems/eventmachine-1.0.3/lib/eventmachine.rb:187:in `run'
reconnect_test.rb:6

The second comes from the queue's own recovery:

/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/callbacks.rb:51:in `call'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/callbacks.rb:51:in `exec_callback_yielding_self'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/callbacks.rb:51:in `each'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/callbacks.rb:51:in `exec_callback_yielding_self'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/queue.rb:1217:in `auto_recover'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/callbacks.rb:59:in `call'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/callbacks.rb:59:in `exec_callback_once_yielding_self'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/callbacks.rb:59:in `each'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/callbacks.rb:59:in `exec_callback_once_yielding_self'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/queue.rb:928:in `handle_declare_ok'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/queue.rb:1321
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/session.rb:959:in `call'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/session.rb:959:in `receive_frameset'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/session.rb:933:in `receive_frame'
/usr/lib64/ruby/gems/1.8/gems/amqp-1.5.0/lib/amqp/session.rb:671:in `receive_data'
/usr/lib64/ruby/gems/1.8/gems/eventmachine-1.0.3/lib/eventmachine.rb:187:in `run_machine'
/usr/lib64/ruby/gems/1.8/gems/eventmachine-1.0.3/lib/eventmachine.rb:187:in `run'
reconnect_test.rb:6

The first call (dispatched through the session object) is of little use for a queue, because the queue has not actually been recovered yet: it still carries its old, now invalid, name. The second call is the useful one.

The first call path goes through run_after_recovery_callbacks in channel.rb. Removing the @queues.each line there clearly improves the situation:

# @private
def run_after_recovery_callbacks
  self.exec_callback_yielding_self(:after_recovery)

  ## REMOVED ## @queues.each    { |name, q| q.run_after_recovery_callbacks }
  @exchanges.each { |name, e| e.run_after_recovery_callbacks }
end

With this change, queue.on_recovery is no longer called with the old queue name. I've also retested with named queues, and they still recover without problems.
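To make the reasoning above concrete, here is a heavily simplified, hypothetical model of the two call paths in plain Ruby, so it runs without a broker. `FakeQueue`, `FakeChannel` and `simulate_restart` are invented for illustration and are not the gem's code; only the ordering (session-level callbacks first, the declare-ok path afterwards) and the stale-name behaviour are taken from the traces and comments above.

```ruby
# Hypothetical model of the two dispatch paths; NOT the amqp gem's implementation.
class FakeQueue
  attr_reader :name

  def initialize(name)
    @name = name
    @recovery_callbacks = []
  end

  def on_recovery(&block)
    @recovery_callbacks << block
  end

  def run_after_recovery_callbacks
    @recovery_callbacks.each { |cb| cb.call(self) }
  end

  # Path 2: the broker answers the redeclaration with a freshly
  # generated name, and only then are the callbacks run.
  def handle_declare_ok(new_name)
    @name = new_name
    run_after_recovery_callbacks
  end
end

class FakeChannel
  # queues: Hash of name => FakeQueue, like the @queues hash in the patch
  def initialize(queues, notify_queues)
    @queues = queues
    @notify_queues = notify_queues
  end

  # Path 1: reached from the session right after the connection reopens.
  # notify_queues = true models the original line, false models the patch.
  def run_after_recovery_callbacks
    @queues.each { |_name, q| q.run_after_recovery_callbacks } if @notify_queues
  end
end

# Returns the queue names seen by on_recovery during one simulated restart.
def simulate_restart(notify_queues)
  seen = []
  queue = FakeQueue.new("amq.gen-old")
  queue.on_recovery { |q| seen << q.name }
  channel = FakeChannel.new({ queue.name => queue }, notify_queues)

  channel.run_after_recovery_callbacks   # session-level path fires first
  queue.handle_declare_ok("amq.gen-new") # then the declare-ok path
  seen
end

p simulate_restart(true)  # => ["amq.gen-old", "amq.gen-new"]
p simulate_restart(false) # => ["amq.gen-new"]
```

With the channel-level loop, the callback first sees the stale name; without it, only the call made after declare-ok remains.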

This does not solve the second problem (one additional server-named queue after every RabbitMQ restart). I'm still trying to figure out where that queue gets created.

tht commented 10 years ago

Sorry for the flood of messages. While digging deeper, I found something else that is slightly related to this issue.

In queue.rb:

def auto_recover
  self.exec_callback_yielding_self(:before_recovery)

  if self.server_named?
    old_name = @name.dup
    @name    = AMQ::Protocol::EMPTY_STRING

    #@channel.queues.delete(old_name)
  end

@channel.queues contains queue objects, not names, so calling delete with the old queue's name is useless: a name string never matches a queue object. It isn't needed anyway, since @channel.queues contains no duplicate entry.
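The point about deleting by name can be checked in plain Ruby. This is a minimal sketch, assuming (as stated above) that @channel.queues returns an Array of queue objects; `QueueStub` is a hypothetical stand-in for AMQP::Queue.

```ruby
# Hypothetical stand-in for AMQP::Queue; only #name matters here.
QueueStub = Struct.new(:name)

old_name = "amq.gen-old"
queues = [QueueStub.new(old_name)] # an Array of queue objects

# Array#delete removes elements that are == to the argument.
# A String is never == to a QueueStub, so nothing is removed.
result = queues.delete(old_name)
p result      # => nil
p queues.size # => 1

# Removing by name requires matching on the object's attribute instead
queues.delete_if { |q| q.name == old_name }
p queues.size # => 0
```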