cloudamqp / amqproxy

An intelligent AMQP proxy, with connection and channel pooling/reusing
https://www.cloudamqp.com
MIT License

Error when consuming (`Error writing to socket: Broken pipe (IO::Error)`) #137

Closed: dentarg closed this issue 10 months ago

dentarg commented 11 months ago

While working on a spec for #135 I noticed the following.

Running this code:

```crystal
require "spec"
require "amqp-client"

describe do
  it "publish and consume works" do
    amqp_url = ENV.fetch("AMQP_URL", "amqp://127.0.0.1:5672")
    queue_name = "amqproxy-test-queue"
    message_text = "message from specs"
    num_received_messages = 0
    num_messages_to_publish = 5

    num_messages_to_publish.times do
      AMQP::Client.start(amqp_url) do |conn|
        channel = conn.channel
        queue = channel.queue(queue_name)
        queue.publish_confirm(message_text)
        puts "published"
      end
      sleep 0.1
    end

    AMQP::Client.start(amqp_url) do |conn|
      channel = conn.channel
      channel.basic_consume(queue_name, tag: "AMQProxy specs") do |msg|
        body = msg.body_io.to_s
        puts "consumed: #{msg.delivery_tag} (#{body})"

        if body == message_text
          channel.basic_ack(msg.delivery_tag)
          num_received_messages += 1
        end
      end
    end

    num_received_messages.should eq num_messages_to_publish
  end
end
```
```
$ cat shard.yml
name: amqp-client-test
version: 0.1.0

dependencies:
  amqp-client:
    git: https://github.com/cloudamqp/amqp-client.cr.git
    version: 1.0.11
```

Against AMQProxy, it reproduces with v0.8.12 (v0.8.13 hangs after the first publish):

```
$ AMQP_URL=amqp://127.0.0.1:5673 CRYSTAL_OPTS="--link-flags=-Wl,-ld_classic" crystal pub-sub.cr
published
published
published
published
published
consumed: 1 (message from specs)
consumed: 2 (message from specs)
consumed: 3 (message from specs)
Unhandled exception in spawn(name: AMQPconsumer#AMQProxy specs #0): Error writing to socket: Broken pipe (IO::Error)
  from /usr/local/Cellar/crystal/1.9.2/share/crystal/src/io/evented.cr:82:13 in 'unbuffered_write'
  from /usr/local/Cellar/crystal/1.9.2/share/crystal/src/io/buffered.cr:239:5 in 'flush'
  from lib/amqp-client/src/amqp-client/connection.cr:144:9 in 'write'
  from lib/amqp-client/src/amqp-client/channel.cr:605:7 in 'write'
  from lib/amqp-client/src/amqp-client/channel.cr:44:7 in 'close'
  from lib/amqp-client/src/amqp-client/channel.cr:384:11 in 'consume'
  from lib/amqp-client/src/amqp-client/channel.cr:360:9 in '->'
  from /usr/local/Cellar/crystal/1.9.2/share/crystal/src/fiber.cr:146:11 in 'run'
  from /usr/local/Cellar/crystal/1.9.2/share/crystal/src/fiber.cr:98:34 in '->'
^C

Aborted!
Finished in 3.92 seconds
0 examples, 0 failures, 0 errors, 0 pending
```

Against RabbitMQ directly, it works as expected:

```
$ AMQP_URL=amqp://127.0.0.1:5672 CRYSTAL_OPTS="--link-flags=-Wl,-ld_classic" crystal pub-sub.cr
published
published
published
published
published
consumed: 1 (message from specs)
consumed: 2 (message from specs)
consumed: 3 (message from specs)
consumed: 4 (message from specs)
consumed: 5 (message from specs)
.

Finished in 552.38 milliseconds
1 examples, 0 failures, 0 errors, 0 pending
```
dentarg commented 11 months ago

This started happening with v0.8.2. The spec above ran fine with earlier versions.

dentarg commented 11 months ago

I can't reproduce the issue in Ruby (with the Bunny client):

```ruby
#!/usr/bin/env ruby

$stdout.sync = true
$stderr.sync = true

begin
  require "bunny"
rescue LoadError
  require "bundler/inline"
  gemfile do
    source "https://rubygems.org"
    gem "bunny"
  end
end

puts "Bunny version: #{Bunny::VERSION}"

def connect_and_publish(url:, queue_name:, message:)
  connection = Bunny.new(url, {})
  connection.start
  channel = connection.create_channel
  channel.confirm_select # enable publisher confirms
  queue = channel.queue(queue_name, durable: true)
  queue.publish(%(time="#{Time.now}" message="#{message}"), :key => queue.name)
  channel.wait_for_confirms
  connection.close

  puts "Message published with confirm, connection closed (#{message})"
end

def connect_and_consume(url:, queue_name:)
  puts "Starting connection for consume loop"
  connection = Bunny.new(url, {})
  connection.start
  channel = connection.create_channel
  queue = channel.queue(queue_name, durable: true)
  # manual_ack: true so that the explicit basic_ack below is valid
  queue.subscribe(consumer_tag: "foo bar", manual_ack: true, block: false) do |delivery_info, _properties, body|
    channel.basic_ack(delivery_info.delivery_tag.to_i)
    puts "Consumed message: #{body}"
  end
  puts "Sleeping..."
  sleep 1
end

url = ENV.fetch("AMQP_URL", "amqp://127.0.0.1:5672")
queue_name = "pub-sub-ruby-test-queue"
message = "hello from ruby pub-sub"

5.times do
  connect_and_publish(url:, queue_name:, message:)
end

connect_and_consume(url:, queue_name:)
```
```
$ AMQP_URL=amqp://127.0.0.1:5673 ruby pub-sub.rb
Bunny version: 2.22.0
Message published with confirm, connection closed (hello from ruby pub-sub)
Message published with confirm, connection closed (hello from ruby pub-sub)
Message published with confirm, connection closed (hello from ruby pub-sub)
Message published with confirm, connection closed (hello from ruby pub-sub)
Message published with confirm, connection closed (hello from ruby pub-sub)
Starting connection for consume loop
Sleeping...
Consumed message: time="2023-10-17 21:29:02 +0200" message="hello from ruby pub-sub"
Consumed message: time="2023-10-17 21:29:02 +0200" message="hello from ruby pub-sub"
Consumed message: time="2023-10-17 21:29:02 +0200" message="hello from ruby pub-sub"
Consumed message: time="2023-10-17 21:29:02 +0200" message="hello from ruby pub-sub"
Consumed message: time="2023-10-17 21:29:02 +0200" message="hello from ruby pub-sub"
```
dentarg commented 11 months ago

I saw the Crystal example work once 🤯, against the same amqproxy v0.8.12 instance that I had run the Ruby code above against:

```
$ AMQP_URL=amqp://127.0.0.1:5673 CRYSTAL_OPTS="--link-flags=-Wl,-ld_classic" crystal pub-sub.cr
AMQP::Client::VERSION: 1.0.12
published
published
published
published
published
consumed: 1 (message from specs)
consumed: 2 (message from specs)
consumed: 3 (message from specs)
consumed: 4 (message from specs)
consumed: 5 (message from specs)
.

Finished in 545.47 milliseconds
1 examples, 0 failures, 0 errors, 0 pending
```
dentarg commented 11 months ago

Related to https://github.com/crystal-lang/crystal/issues/9065?

carlhoerberg commented 10 months ago

`basic_consume` isn't blocking, so you're disconnecting as soon as you've subscribed, which causes a race condition. Is that intended?
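The stack trace in the report ends with the consumer fiber calling `close`, which writes to the socket and fails with "Broken pipe". A plausible reading is that the write happened after the other end of the connection had already gone away. The following minimal Ruby sketch shows that error class in isolation. It uses a local `UNIXSocket.pair` as a stand-in for the client/AMQProxy TCP connection, and `write_after_peer_closed` is a hypothetical helper, not part of Bunny or amqp-client. After the peer end is closed, a write raises `Errno::EPIPE`, the same errno that Crystal reports as `Broken pipe (IO::Error)`.

```ruby
require "socket"

# Hypothetical helper: writes to one end of a local socket pair after the
# other end has been closed. UNIXSocket.pair stands in for the real TCP
# connection between the AMQP client and the proxy.
def write_after_peer_closed
  client, peer = UNIXSocket.pair
  peer.close                 # the other side has already gone away
  client.write("late frame") # e.g. a frame from a consumer that is still running
  :written
rescue Errno::EPIPE
  :broken_pipe               # EPIPE, reported by the OS as "Broken pipe"
ensure
  client&.close
end

puts write_after_peer_closed
```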

carlhoerberg commented 10 months ago

In the Ruby code you're sleeping 1 s after starting to consume, but not in the Crystal code.
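The race described in the two comments above can be simulated without a broker. This sketch is in Ruby to match the Bunny script. `FakeConnection`, `consume_then_close` and `consume_until_done` are hypothetical names. The class only mimics a non-blocking `basic_consume`: deliveries arrive on a background thread and stop once the connection is closed. It does not use Bunny or amqp-client. Closing right after subscribing, as the Crystal spec does when its `AMQP::Client.start` block ends, drops deliveries. Blocking on an explicit completion signal with a timeout receives them all.

```ruby
require "timeout"

# Hypothetical stand-in for a broker connection; not a Bunny or amqp-client API.
class FakeConnection
  def initialize(messages)
    @messages = messages
    @open = true
    @lock = Mutex.new
  end

  def open?
    @lock.synchronize { @open }
  end

  def close
    @lock.synchronize { @open = false }
  end

  # Returns immediately, like basic_consume: deliveries arrive on a
  # background thread and stop once the connection has been closed.
  def subscribe(&on_message)
    Thread.new do
      @messages.each do |msg|
        sleep 0.01 # simulated per-delivery latency
        break unless open?
        on_message.call(msg)
      end
    end
  end
end

# Mirrors the original Crystal spec: the connection closes right after subscribing.
def consume_then_close(messages)
  received = Queue.new
  conn = FakeConnection.new(messages)
  deliveries = conn.subscribe { |msg| received << msg }
  conn.close
  deliveries.join
  received.size
end

# Waits until every expected message has arrived (or the timeout fires),
# and only then closes the connection.
def consume_until_done(messages, timeout: 2)
  received = Queue.new
  done = Queue.new
  conn = FakeConnection.new(messages)
  deliveries = conn.subscribe do |msg|
    received << msg
    done << :all_received if received.size == messages.size
  end
  begin
    Timeout.timeout(timeout) { done.pop }
  ensure
    conn.close
  end
  deliveries.join
  received.size
end

messages = Array.new(5) { |i| "message #{i + 1}" }
puts consume_then_close(messages)
puts consume_until_done(messages)
```

In practice the first call returns fewer than five messages and the second returns all five. Waiting on an explicit signal instead of a fixed `sleep 1` avoids guessing how long deliveries take. The timeout still makes the check fail fast if messages never arrive.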

dentarg commented 10 months ago

Thanks, that was not intended and explains the issues.