cloudamqp / amqp-client.rb

Modern AMQP 0-9-1 Ruby client
https://cloudamqp.github.io/amqp-client.rb/
MIT License

RPC setup fails with higher concurrency #31

Closed gottlike closed 3 weeks ago

gottlike commented 3 weeks ago

Issue

I'm close to adopting LavinMQ as my replacement for REST, but I ran into a blocker. When doing concurrent requests, I get this error in the worker.rb process (see below):

#<Thread:0x00007f393670c738 /var/lib/gems/3.1.0/gems/amqp-client-1.1.7/lib/amqp/client/channel.rb:327 run> terminated with exception (report_on_exception is true):
worker.rb:95:in `rescue in send': WorkerClient.send > Connection closed (504) CHANNEL_ERROR - second 'channel.open' seen (20/10) (RuntimeError)
        from worker.rb:59:in `send'
        from worker.rb:31:in `block in start'
        from /var/lib/gems/3.1.0/gems/amqp-client-1.1.7/lib/amqp/client/channel.rb:555:in `consume_loop'
        from /var/lib/gems/3.1.0/gems/amqp-client-1.1.7/lib/amqp/client/channel.rb:327:in `block (2 levels) in basic_consume'
/var/lib/gems/3.1.0/gems/amqp-client-1.1.7/lib/amqp/client/connection.rb:180:in `rescue in write_bytes': Connection closed (504) CHANNEL_ERROR - second 'channel.open' seen (20/10) (AMQP::Client::Error::ConnectionClosed)
        from /var/lib/gems/3.1.0/gems/amqp-client-1.1.7/lib/amqp/client/connection.rb:175:in `write_bytes'
        from /var/lib/gems/3.1.0/gems/amqp-client-1.1.7/lib/amqp/client/channel.rb:541:in `write_bytes'
        from /var/lib/gems/3.1.0/gems/amqp-client-1.1.7/lib/amqp/client/channel.rb:49:in `open'
        from /var/lib/gems/3.1.0/gems/amqp-client-1.1.7/lib/amqp/client/connection.rb:102:in `channel'
        from worker.rb:64:in `send'
        from worker.rb:31:in `block in start'
        from /var/lib/gems/3.1.0/gems/amqp-client-1.1.7/lib/amqp/client/channel.rb:555:in `consume_loop'
        from /var/lib/gems/3.1.0/gems/amqp-client-1.1.7/lib/amqp/client/channel.rb:327:in `block (2 levels) in basic_consume'
/var/lib/gems/3.1.0/gems/amqp-client-1.1.7/lib/amqp/client/connection.rb:177:in `write': closed stream (IOError)
        from /var/lib/gems/3.1.0/gems/amqp-client-1.1.7/lib/amqp/client/connection.rb:177:in `block in write_bytes'
        from /var/lib/gems/3.1.0/gems/amqp-client-1.1.7/lib/amqp/client/connection.rb:176:in `synchronize'
        from /var/lib/gems/3.1.0/gems/amqp-client-1.1.7/lib/amqp/client/connection.rb:176:in `write_bytes'
        from /var/lib/gems/3.1.0/gems/amqp-client-1.1.7/lib/amqp/client/channel.rb:541:in `write_bytes'
        from /var/lib/gems/3.1.0/gems/amqp-client-1.1.7/lib/amqp/client/channel.rb:49:in `open'
        from /var/lib/gems/3.1.0/gems/amqp-client-1.1.7/lib/amqp/client/connection.rb:102:in `channel'
        from worker.rb:64:in `send'
        from worker.rb:31:in `block in start'
        from /var/lib/gems/3.1.0/gems/amqp-client-1.1.7/lib/amqp/client/channel.rb:555:in `consume_loop'
        from /var/lib/gems/3.1.0/gems/amqp-client-1.1.7/lib/amqp/client/channel.rb:327:in `block (2 levels) in basic_consume'

Reproduction

client.rb

# frozen_string_literal: true

$stdout.sync = true

require 'bundler/setup'
require 'amqp-client'
require 'json'
require 'parallel'
require 'zlib'

Signal.trap('INT') do
  puts "[#{Time.now.strftime('%Y-%m-%d %H:%M:%S')}] - Received SIGINT, shutting down"
  exit
end

Signal.trap('TERM') do
  puts "[#{Time.now.strftime('%Y-%m-%d %H:%M:%S')}] - Received SIGTERM, shutting down"
  exit
end

module Client
  @conn = AMQP::Client.new('amqp://guest:guest@localhost').connect

  def send(data:, queue:)
    mutex = Mutex.new
    cv = ConditionVariable.new
    response = nil

    ch = @conn.channel
    ch.basic_consume('amq.direct.reply-to', no_ack: true, worker_threads: 1) do |msg|
      msg.body = Zlib.gunzip(msg.body) if msg.properties.content_encoding == 'gzip'
      msg.body = JSON.parse(msg.body) if msg.properties.content_type == 'application/json'

      mutex.synchronize do
        response = msg.body
        cv.signal
      end
    end

    ch.basic_publish_confirm(
      Zlib.gzip(data),
      '',
      queue,
      reply_to: 'amq.direct.reply-to',
      content_type: 'application/json',
      content_encoding: 'gzip',
      persistent: true,
      mandatory: true,
      timestamp: Time.now.to_i
    )

    mutex.synchronize do
      cv.wait(mutex)
    end

    ch.close

    response.transform_keys(&:to_sym)
  rescue StandardError => e
    raise("#{name}.#{__method__} > #{e.message}")
  end

  class << self
    include Client
  end
end

start = Time.now

array = 150.times.collect { '{"msg":"Ping!"}' }
responses = Parallel.map(array, in_threads: 100) do |data|
  Client.send(data:, queue: 'test.queue')
end

puts "Finished all requests at rate of #{(responses.length / (Time.now - start)).round} rps"
puts responses.last.to_json

worker.rb

# frozen_string_literal: true

$stdout.sync = true

require 'bundler/setup'
require 'amqp-client'
require 'json'
require 'zlib'

Signal.trap('INT') do
  puts "[#{Time.now.strftime('%Y-%m-%d %H:%M:%S')}] - Received SIGINT, shutting down"
  exit
end

Signal.trap('TERM') do
  puts "[#{Time.now.strftime('%Y-%m-%d %H:%M:%S')}] - Received SIGTERM, shutting down"
  exit
end

module Worker
  @conn = AMQP::Client.new(ENV.fetch('MQ_URL', nil), connection_name: 'Worker').connect

  def start(threads: 0)
    ch = @conn.channel
    ch.queue_declare(ENV.fetch('MQ_QUEUE', nil))

    ch.basic_consume(ENV.fetch('MQ_QUEUE', nil), no_ack: false, worker_threads: threads) do |msg|
      msg.body = Zlib.gunzip(msg.body) if msg.properties.content_encoding == 'gzip'
      msg.body = JSON.parse(msg.body) if msg.properties.content_type == 'application/json'

      data = WorkerClient.send(data: msg.body, queue: 'test.rpc.queue').to_json

      ch.basic_publish(
        Zlib.gzip(data),
        '',
        msg.properties.reply_to,
        content_type: 'application/json',
        content_encoding: 'gzip',
        persistent: true,
        timestamp: Time.now.to_i
      )

      msg.ack
    end

    sleep
  rescue StandardError => e
    raise("#{name}.#{__method__} > #{e.message}")
  end

  class << self
    include Worker
  end
end

module WorkerClient
  @conn = AMQP::Client.new(ENV.fetch('MQ_URL', nil), connection_name: 'WorkerClient').connect

  def send(data:, queue:)
    mutex = Mutex.new
    cv = ConditionVariable.new
    response = nil

    ch = @conn.channel
    ch.basic_consume('amq.direct.reply-to', no_ack: true, worker_threads: 1) do |msg|
      msg.body = Zlib.gunzip(msg.body) if msg.properties.content_encoding == 'gzip'
      msg.body = JSON.parse(msg.body) if msg.properties.content_type == 'application/json'

      mutex.synchronize do
        response = msg.body
        cv.signal
      end
    end

    ch.basic_publish_confirm(
      Zlib.gzip(data.to_json),
      '',
      queue,
      reply_to: 'amq.direct.reply-to',
      content_type: 'application/json',
      content_encoding: 'gzip',
      persistent: true,
      mandatory: true,
      timestamp: Time.now.to_i
    )

    mutex.synchronize do
      cv.wait(mutex)
    end

    ch.close

    response.transform_keys(&:to_sym)
  rescue StandardError => e
    raise("#{name}.#{__method__} > #{e.message}")
  end

  class << self
    include WorkerClient
  end
end

Worker.start(threads: 100)

worker_rpc.rb

# frozen_string_literal: true

$stdout.sync = true

require 'bundler/setup'
require 'amqp-client'
require 'json'
require 'zlib'

Signal.trap('INT') do
  puts "[#{Time.now.strftime('%Y-%m-%d %H:%M:%S')}] - Received SIGINT, shutting down"
  exit
end

Signal.trap('TERM') do
  puts "[#{Time.now.strftime('%Y-%m-%d %H:%M:%S')}] - Received SIGTERM, shutting down"
  exit
end

module Worker
  @conn = AMQP::Client.new(ENV.fetch('MQ_URL', nil), connection_name: 'Worker').connect

  def start(threads: 0)
    ch = @conn.channel
    ch.queue_declare(ENV.fetch('MQ_QUEUE', nil))

    ch.basic_consume(ENV.fetch('MQ_QUEUE', nil), no_ack: false, worker_threads: threads) do |msg|
      msg.body = Zlib.gunzip(msg.body) if msg.properties.content_encoding == 'gzip'
      msg.body = JSON.parse(msg.body) if msg.properties.content_type == 'application/json'

      data = { msg: 'Pong!' }.to_json

      ch.basic_publish(
        Zlib.gzip(data),
        '',
        msg.properties.reply_to,
        content_type: 'application/json',
        content_encoding: 'gzip',
        persistent: true,
        timestamp: Time.now.to_i
      )

      msg.ack
    end

    sleep
  rescue StandardError => e
    raise("#{name}.#{__method__} > #{e.message}")
  end

  class << self
    include Worker
  end
end

Worker.start(threads: 100)

Steps to reproduce (start everything in new tabs):

  1. docker run --rm -p 5672:5672 -p 15672:15672 cloudamqp/lavinmq
  2. MQ_URL='amqp://guest:guest@localhost' MQ_QUEUE='test.queue' ruby worker.rb
  3. MQ_URL='amqp://guest:guest@localhost' MQ_QUEUE='test.rpc.queue' ruby worker_rpc.rb
  4. ruby client.rb

If you change the 150.times.collect { '{"msg":"Ping!"}' } to 2.times.collect { '{"msg":"Ping!"}' } it works.

Interestingly, this does not fail on my M1 MacBook, even with higher numbers. On a server with weaker shared CPUs, however, it fails starting at only 50 threads.

gottlike commented 3 weeks ago

Another detail: even when I don't get the channel errors, I see a few un-acked messages in the queue, which block the client.rb process indefinitely. This doesn't happen with e.g. 10 threads or fewer.
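
One possible contributor to the indefinite blocking (an assumption, not something confirmed in this thread) is that the scripts above call `cv.wait(mutex)` without checking a predicate. If the reply callback runs and calls `cv.signal` before the publishing thread reaches `cv.wait`, the signal is lost and the wait never returns. A minimal sketch of a guarded wait with a timeout follows; `ReplySlot` and its methods are hypothetical names used only for illustration and are not part of amqp-client.

```ruby
require 'timeout'

# Hypothetical helper: holds one reply and lets a thread wait for it safely.
class ReplySlot
  def initialize
    @mutex = Mutex.new
    @cv = ConditionVariable.new
    @value = nil
    @filled = false
  end

  # Called from the consumer callback when the reply arrives.
  def fulfill(value)
    @mutex.synchronize do
      @value = value
      @filled = true
      @cv.signal
    end
  end

  # Blocks until a reply is stored or the timeout (in seconds) expires.
  def wait(timeout: 5)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @mutex.synchronize do
      # Re-check the flag on every wakeup, so a reply that arrived before
      # we started waiting (or a spurious wakeup) is handled correctly.
      until @filled
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        raise Timeout::Error, 'no reply received' if remaining <= 0

        @cv.wait(@mutex, remaining)
      end
      @value
    end
  end
end

# The reply arrives before anyone waits; wait still returns immediately.
slot = ReplySlot.new
slot.fulfill({ 'msg' => 'Pong!' })
reply = slot.wait(timeout: 1)
puts reply['msg']
```

In the scripts above, the consumer block would call something like `slot.fulfill(msg.body)` and the sender would call `slot.wait` in place of the bare `cv.wait(mutex)`. The timeout also turns a silent hang into an error that can be logged.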

gottlike commented 3 weeks ago

FYI: There's also a related chat on the 84codes Slack: https://app.slack.com/client/T01K2BK0K6G/C01K2BK1PT6

spuun commented 3 weeks ago

The library isn't thread-safe. The problem here is that two threads try to open a channel at the same time and both end up using the same channel id.
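
The race described here is a check-then-act problem: each thread looks for a free channel id and then claims it, and without a lock two threads can pick the same id. The sketch below illustrates the general fix, which is to make "find a free id and reserve it" one atomic step under a mutex. `ChannelIdAllocator` is a hypothetical class written for this illustration; it is not amqp-client's actual implementation, and the `max` default is an arbitrary assumption.

```ruby
# Hypothetical illustration; not amqp-client's actual internals.
class ChannelIdAllocator
  def initialize(max: 2048)
    @max = max
    @in_use = {}
    @lock = Mutex.new
  end

  # Pick the lowest free id and reserve it while holding the lock,
  # so no other thread can observe the same id as free.
  def allocate
    @lock.synchronize do
      id = (1..@max).find { |candidate| !@in_use.key?(candidate) }
      raise 'no free channel id' if id.nil?

      @in_use[id] = true
      id
    end
  end

  # Return an id to the pool once its channel is closed.
  def release(id)
    @lock.synchronize { @in_use.delete(id) }
  end
end

allocator = ChannelIdAllocator.new
ids = Array.new(100) { Thread.new { allocator.allocate } }.map(&:value)
puts ids.uniq.length # prints 100
```

As a workaround in application code, wrapping the `@conn.channel` calls in the scripts above in a shared `Mutex` might avoid the collision in the same way, though that is untested here.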

dentarg commented 3 weeks ago

@spuun the README here says "fully thread-safe", so isn't this a bug that we should address, rather than "close as completed"?

gottlike commented 3 weeks ago

I agree. This should be reopened as a bug, especially since the RPC feature (which is advertised as a main feature on the LavinMQ homepage) is unusable without implementing special handling for this.

carlhoerberg commented 3 weeks ago

Please take a look at https://github.com/cloudamqp/amqp-client.rb/pull/30 which implements an RPC server and client directly in the lib.

spuun commented 3 weeks ago

I was too quick here.