cloudamqp / amqp-client.rb

Modern AMQP 0-9-1 Ruby client
https://cloudamqp.github.io/amqp-client.rb/
MIT License

Error on failed mandatory publish #28

Closed. gottlike closed this issue 1 month ago

gottlike commented 1 month ago

When using the mandatory flag to catch unroutable messages, I'm getting an "undefined local variable or method" error:

amqp = AMQP::Client.new('amqp://guest:guest@localhost').start
amqp.publish('something', 'amq.topic', 'not.routed', mandatory: true, persistent: true)
#<Thread:0x0000000104cf7ce0 /Users/erik/.rbenv/versions/3.3.1/lib/ruby/gems/3.3.0/gems/amqp-client-1.1.6/lib/amqp/client.rb:54 run> terminated with exception (report_on_exception is true):
/Users/erik/.rbenv/versions/3.3.1/lib/ruby/gems/3.3.0/gems/amqp-client-1.1.6/lib/amqp/client/channel.rb:525:in `next_message_finished!': undefined local variable or method `msg' for an instance of AMQP::Client::Connection::Channel (NameError)

              warn "AMQP-Client message returned: #{msg.inspect}"
                                                    ^^^
        from /Users/erik/.rbenv/versions/3.3.1/lib/ruby/gems/3.3.0/gems/amqp-client-1.1.6/lib/amqp/client/channel.rb:508:in `body_delivered'
        from /Users/erik/.rbenv/versions/3.3.1/lib/ruby/gems/3.3.0/gems/amqp-client-1.1.6/lib/amqp/client/connection.rb:396:in `parse_frame'
        from /Users/erik/.rbenv/versions/3.3.1/lib/ruby/gems/3.3.0/gems/amqp-client-1.1.6/lib/amqp/client/connection.rb:198:in `block in read_loop'
        from <internal:kernel>:187:in `loop'
        from /Users/erik/.rbenv/versions/3.3.1/lib/ruby/gems/3.3.0/gems/amqp-client-1.1.6/lib/amqp/client/connection.rb:185:in `read_loop'
        from /Users/erik/.rbenv/versions/3.3.1/lib/ruby/gems/3.3.0/gems/amqp-client-1.1.6/lib/amqp/client.rb:70:in `block (2 levels) in start'
        from <internal:kernel>:187:in `loop'
        from /Users/erik/.rbenv/versions/3.3.1/lib/ruby/gems/3.3.0/gems/amqp-client-1.1.6/lib/amqp/client.rb:56:in `block in start'
/Users/erik/.rbenv/versions/3.3.1/lib/ruby/gems/3.3.0/gems/amqp-client-1.1.6/lib/amqp/client/channel.rb:525:in `next_message_finished!': undefined local variable or method `msg' for an instance of AMQP::Client::Connection::Channel (NameError)

              warn "AMQP-Client message returned: #{msg.inspect}"
                                                    ^^^
        from /Users/erik/.rbenv/versions/3.3.1/lib/ruby/gems/3.3.0/gems/amqp-client-1.1.6/lib/amqp/client/channel.rb:508:in `body_delivered'
        from /Users/erik/.rbenv/versions/3.3.1/lib/ruby/gems/3.3.0/gems/amqp-client-1.1.6/lib/amqp/client/connection.rb:396:in `parse_frame'
        from /Users/erik/.rbenv/versions/3.3.1/lib/ruby/gems/3.3.0/gems/amqp-client-1.1.6/lib/amqp/client/connection.rb:198:in `block in read_loop'
        from <internal:kernel>:187:in `loop'
        from /Users/erik/.rbenv/versions/3.3.1/lib/ruby/gems/3.3.0/gems/amqp-client-1.1.6/lib/amqp/client/connection.rb:185:in `read_loop'
        from /Users/erik/.rbenv/versions/3.3.1/lib/ruby/gems/3.3.0/gems/amqp-client-1.1.6/lib/amqp/client.rb:70:in `block (2 levels) in start'
        from <internal:kernel>:187:in `loop'
        from /Users/erik/.rbenv/versions/3.3.1/lib/ruby/gems/3.3.0/gems/amqp-client-1.1.6/lib/amqp/client.rb:56:in `block in start'
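The trace points at a `warn` line inside `next_message_finished!` that interpolates `msg`, a name that is not defined in that method. Ruby only resolves such a name when the line actually runs, which would explain why the error shows up only once a message is returned. A minimal stand-alone sketch of that failure mode follows; `ToyChannel`, its `returned:` keyword, and `next_msg` are hypothetical stand-ins, not the library's code.

```ruby
# Hypothetical stand-in for a channel class; NOT the amqp-client source.
class ToyChannel
  def next_message_finished!(returned:)
    next_msg = { body: 'something', routing_key: 'not.routed' }
    if returned
      # `msg` was never assigned in this method, so Ruby treats it as a
      # method call and raises NameError when this line is executed.
      warn "message returned: #{msg.inspect}"
    end
    next_msg
  end
end

ch = ToyChannel.new
ch.next_message_finished!(returned: false) # fine, the broken line never runs

begin
  ch.next_message_finished!(returned: true)
rescue NameError => e
  puts "#{e.class}: undefined name #{e.name.inspect}"
end
```

Because the branch is only taken for returned messages, a publish that is routed normally would never hit it.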
gottlike commented 1 month ago

Also, your Docker example has the host and container paths of the volume mapping swapped.

Should be:

docker run --rm -it -p 5672:5672 -p 15672:15672 -v /tmp/amqp:/var/lib/lavinmq cloudamqp/lavinmq

instead of

docker run --rm -it -p 5672:5672 -p 15672:15672 -v /var/lib/lavinmq:/tmp/amqp cloudamqp/lavinmq
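For reference, Docker's `-v` flag takes the host path first and the container path second, separated by a colon. The small sketch below splits such a spec to make the difference between the two commands explicit; `parse_volume` is a hypothetical helper and assumes POSIX-style paths that contain no colons.

```ruby
# Hypothetical helper: split a `-v host:container[:options]` spec.
# Assumes POSIX-style paths without colons in them.
def parse_volume(spec)
  host, container, options = spec.split(':', 3)
  { host: host, container: container, options: options }
end

good = parse_volume('/tmp/amqp:/var/lib/lavinmq')
bad  = parse_volume('/var/lib/lavinmq:/tmp/amqp')

puts "corrected: host=#{good[:host]} container=#{good[:container]}"
puts "original:  host=#{bad[:host]} container=#{bad[:container]}"
```

In the corrected command the data directory inside the container, `/var/lib/lavinmq`, is backed by `/tmp/amqp` on the host.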

gottlike commented 1 month ago

Also, I'm not able to use direct reply-to RPC:

Consumer:

amqp = AMQP::Client.new('amqp://guest:guest@localhost').start

amqp.subscribe('amq.direct.reply-to', no_ack: true) do |msg|
  pp msg

  amqp.publish('Pong!', '', msg.properties.reply_to)
  msg.ack(msg.delivery_tag)
end

sleep

at_exit do
  amqp.stop
  puts 'WORKER - Connection closed'
end

Publisher:

amqp = AMQP::Client.new('amqp://guest:guest@localhost').start

amqp.subscribe('amq.direct.reply-to', no_ack: true) do |msg|
  pp msg
  exit
end

amqp.publish('Ping!', '', '', reply_to: 'amq.direct.reply-to')

sleep

at_exit do
  amqp.stop
  puts 'PUBLISHER - Connection closed'
end
/Users/erik/.rbenv/versions/3.3.1/lib/ruby/gems/3.3.0/gems/amqp-client-1.1.6/lib/amqp/client/channel.rb:421:in `block in wait_for_confirms': Channel[1] closed (406) PRECONDITION_FAILED - Direct reply consumer does not exist (0/0) (AMQP::Client::Error::ChannelClosed)
        from /Users/erik/.rbenv/versions/3.3.1/lib/ruby/gems/3.3.0/gems/amqp-client-1.1.6/lib/amqp/client/channel.rb:418:in `synchronize'
        from /Users/erik/.rbenv/versions/3.3.1/lib/ruby/gems/3.3.0/gems/amqp-client-1.1.6/lib/amqp/client/channel.rb:418:in `wait_for_confirms'
        from /Users/erik/.rbenv/versions/3.3.1/lib/ruby/gems/3.3.0/gems/amqp-client-1.1.6/lib/amqp/client/channel.rb:303:in `basic_publish_confirm'
        from /Users/erik/.rbenv/versions/3.3.1/lib/ruby/gems/3.3.0/gems/amqp-client-1.1.6/lib/amqp/client.rb:145:in `block in publish'
        from /Users/erik/.rbenv/versions/3.3.1/lib/ruby/gems/3.3.0/gems/amqp-client-1.1.6/lib/amqp/client.rb:290:in `with_connection'
        from /Users/erik/.rbenv/versions/3.3.1/lib/ruby/gems/3.3.0/gems/amqp-client-1.1.6/lib/amqp/client.rb:143:in `publish'
        from pub.rb:11:in `<main>'
carlhoerberg commented 1 month ago

Thank you for your reports! I will look into them as soon as I can.

carlhoerberg commented 1 month ago

Also your Docker example switched the host and container mapping for the volume around, here:

Thank you, fixed.

carlhoerberg commented 1 month ago

Regarding your RPC example, I think your code is wrong. The "consumer" or "server" should subscribe to a real queue that the publisher/client publishes to. Only the client (the publisher in your case) should subscribe to "amq.direct.reply-to".

But there might still be a problem: the RPC client has to publish on the same channel as it consumes, and the "high level client" does all publishes on a separate channel from its subscriptions. You can access the normal amqp-client with:

conn = AMQP::Client.new("amqp://guest:guest@localhost").connect

instead of .start, where you have direct control over channels.
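The layout described above can be sketched as two small helpers that take a channel as an argument. This is only a sketch under assumptions: `rpc_request`, `rpc_serve`, and `REPLY_TO` are hypothetical names, the request queue is assumed to exist already, and both consumers use `no_ack: true` to keep the example short. The channel calls are the ones that appear elsewhere in this thread (`basic_consume`, `basic_publish_confirm`).

```ruby
REPLY_TO = 'amq.direct.reply-to'

# Client side: start the reply consumer first, then publish the request
# on the SAME channel `ch`, naming the pseudo-queue in reply_to.
def rpc_request(ch, request_queue, body, &on_reply)
  ch.basic_consume(REPLY_TO, no_ack: true, &on_reply)
  ch.basic_publish_confirm(body, '', request_queue,
                           reply_to: REPLY_TO, mandatory: true)
end

# Server side: consume from a real queue and answer to whatever address
# the request carried in its reply_to property.
def rpc_serve(ch, request_queue)
  ch.basic_consume(request_queue, no_ack: true) do |msg|
    ch.basic_publish_confirm('Pong!', '', msg.properties.reply_to,
                             mandatory: false)
  end
end
```

With a real connection, each helper would be called inside a `conn.with_channel do |ch| ... end` block followed by `sleep`, so the channel stays open while replies arrive.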

carlhoerberg commented 1 month ago

You can use the main branch locally by putting

gem "amqp-client", github: "cloudamqp/amqp-client.rb"

in your Gemfile and running bundle update.

gottlike commented 1 month ago

Thank you for your quick response and fixes!

I tried getting this to work with your suggestions, but am currently stuck again:

Server:

worker_queue = 'test'
conn = AMQP::Client.new('amqp://guest:guest@localhost').connect
conn.with_channel do |ch|
  ch.basic_consume(worker_queue) do |msg|
    pp msg
    ch.basic_ack(msg.delivery_tag)
    ch.basic_publish_confirm('Pong!', '', msg.properties.reply_to, mandatory: false)
  end

  sleep
end

at_exit do
  conn.close
  puts 'WORKER - Connection closed'
end

Client:

worker_queue = 'test'
conn = AMQP::Client.new('amqp://guest:guest@localhost').connect
conn.with_channel do |ch|
  ch.basic_consume('amq.direct.reply-to', no_ack: true) do |msg|
    pp msg
  end

  ch.basic_publish_confirm('Ping!', '', worker_queue, reply_to: 'amq.direct.reply-to', mandatory: true)

  sleep
end

at_exit do
  conn.close
  puts 'PUBLISHER - Connection closed'
end
#<Thread:0x0000000104277428 /Users/erik/.rbenv/versions/3.3.1/lib/ruby/gems/3.3.0/bundler/gems/amqp-client.rb-4626b254dcee/lib/amqp/client/channel.rb:327 run> terminated with exception (report_on_exception is true):
/Users/erik/.rbenv/versions/3.3.1/lib/ruby/gems/3.3.0/bundler/gems/amqp-client.rb-4626b254dcee/lib/amqp/client/channel.rb:421:in `block in wait_for_confirms': Channel[1] closed (406) PRECONDITION_FAILED - unknown delivery tag 1 (60/80) (AMQP::Client::Error::ChannelClosed)
        from /Users/erik/.rbenv/versions/3.3.1/lib/ruby/gems/3.3.0/bundler/gems/amqp-client.rb-4626b254dcee/lib/amqp/client/channel.rb:418:in `synchronize'
        from /Users/erik/.rbenv/versions/3.3.1/lib/ruby/gems/3.3.0/bundler/gems/amqp-client.rb-4626b254dcee/lib/amqp/client/channel.rb:418:in `wait_for_confirms'
        from /Users/erik/.rbenv/versions/3.3.1/lib/ruby/gems/3.3.0/bundler/gems/amqp-client.rb-4626b254dcee/lib/amqp/client/channel.rb:303:in `basic_publish_confirm'
        from worker.rb:15:in `block (2 levels) in <main>'
        from /Users/erik/.rbenv/versions/3.3.1/lib/ruby/gems/3.3.0/bundler/gems/amqp-client.rb-4626b254dcee/lib/amqp/client/channel.rb:555:in `consume_loop'
        from /Users/erik/.rbenv/versions/3.3.1/lib/ruby/gems/3.3.0/bundler/gems/amqp-client.rb-4626b254dcee/lib/amqp/client/channel.rb:327:in `block (2 levels) in basic_consume'
^Cworker.rb:18:in `sleep': Interrupt
        from worker.rb:18:in `block in <main>'
        from /Users/erik/.rbenv/versions/3.3.1/lib/ruby/gems/3.3.0/bundler/gems/amqp-client.rb-4626b254dcee/lib/amqp/client/connection.rb:111:in `with_channel'
        from worker.rb:11:in `<main>'

Unfortunately the docs are not great, so I can't quite figure it out myself.

@carlhoerberg Can you point me in the right direction, or give a working example?

gottlike commented 1 month ago

I found the mistake:

Instead of:

ch.basic_ack(msg.delivery_tag)
ch.basic_publish_confirm('Pong!', '', msg.properties.reply_to, mandatory: false)

I need to do this:

ch.basic_publish_confirm('Pong!', '', msg.properties.reply_to, mandatory: false)
ch.basic_ack(msg.delivery_tag)
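The thread does not explain why the original order failed. As general AMQP background, and not a confirmed diagnosis: the `(60/80)` in the error identifies `basic.ack` (class 60, method 80), so it was the ack that the broker rejected, and the closed channel then surfaced inside `wait_for_confirms`. A broker only accepts an ack for a delivery tag it is still tracking on that channel, and deliveries to a consumer started with `no_ack: true` are not tracked. The toy model below shows that rule in isolation; `ToyBrokerChannel` and its methods are hypothetical, and no broker or library is involved. Which acknowledgement mode `basic_consume` uses when `no_ack` is not passed is not stated in the thread.

```ruby
# Toy model of per-channel delivery-tag tracking; NOT the library or a broker.
class ToyBrokerChannel
  class ChannelClosed < StandardError; end

  def initialize
    @next_tag = 0
    @unacked = {}
  end

  # Simulate one delivery and return its delivery tag. Only deliveries in
  # manual-ack mode (no_ack: false) are remembered as outstanding.
  def deliver(no_ack:)
    @next_tag += 1
    @unacked[@next_tag] = true unless no_ack
    @next_tag
  end

  # Accept the ack only if the tag is still outstanding.
  def basic_ack(tag)
    return true if @unacked.delete(tag)

    raise ChannelClosed, "PRECONDITION_FAILED - unknown delivery tag #{tag}"
  end
end

manual = ToyBrokerChannel.new
manual.basic_ack(manual.deliver(no_ack: false)) # accepted

auto = ToyBrokerChannel.new
begin
  auto.basic_ack(auto.deliver(no_ack: true))
rescue ToyBrokerChannel::ChannelClosed => e
  puts e.message # prints "PRECONDITION_FAILED - unknown delivery tag 1"
end
```

If that rule is what applied here, publishing the reply before the ack would let the publish be confirmed before the channel is closed, rather than removing the underlying error.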