cloudamqp / amqp-client.cr

An AMQP 0-9-1 client for Crystal
https://cloudamqp.github.io/amqp-client.cr/
MIT License

Event based loop #7

Closed bararchy closed 4 years ago

bararchy commented 4 years ago

Looking at #5, I still can't figure out how to build an event-based loop around consuming incoming messages.

It seems that the subscribe function doesn't actually loop to receive new messages; it just blocks until one message arrives and then ends.

Any suggestions? Maybe I'm using the wrong methods?

carlhoerberg commented 4 years ago

Yes, it does loop. Show me your code.

bararchy commented 4 years ago

Sure, so the basic logic looks like this:

AMQP::Client.start("amqp://my_url") do |client|
  client.channel do |channel|
    request_queue = channel.queue("requests")
    response_queue = channel.queue("responses")
    request_queue.subscribe(no_ack: true, block: true) do |msg|
      spawn { message_handler(message: msg, queue: response_queue) }
    end
  end
end

And I have another one that calls it with:

AMQP::Client.start("amqp://my_url") do |client|
  client.channel do |channel|
    request_queue = channel.queue("requests")
    response_queue = channel.queue("responses")

    message = "my message"
    request_queue.publish(message)

    response_queue.subscribe(no_ack: true, block: true) do |msg|
      puts "Got back message!"
    end
  end
end

But both are just stuck. The weird thing is that if I add a sleep of a few seconds it seems to work, almost as if the block produces a tight loop that never yields the Fiber.

bararchy commented 4 years ago

Btw, looking at the code, queue subscribe just calls https://github.com/cloudamqp/amqp-client.cr/blob/master/src/amqp-client/channel.cr#L255-L266, which consumes once. There is no loop logic in any of those, unless I'm missing something.

carlhoerberg commented 4 years ago

A general Crystal thing, don't do:

spawn { message_handler(message: msg, queue: response_queue) }

instead do:

spawn message_handler(message: msg, queue: response_queue)

It has to do with variable binding. See https://crystal-lang.org/reference/guides/concurrency.html#spawning-a-call
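To illustrate the variable-binding point, here is a minimal sketch comparing the two forms. The method names `with_block` and `with_call` are made up for this example, and the results in the comments assume Crystal's default single-threaded scheduler. The difference shows up when the captured variable is reassigned before the fiber runs, as with a `while` counter; a block parameter that is fresh on every call may not hit it.

```crystal
# Block form: the fiber closes over the variable `i` itself.
def with_block : Array(Int32)
  seen = Channel(Int32).new(3)
  i = 0
  while i < 3
    spawn { seen.send(i) } # reads `i` only when the fiber finally runs
    i += 1
  end
  # The fibers first run here, when the main fiber blocks on receive.
  Array.new(3) { seen.receive }
end

# Call form: the spawn macro evaluates the arguments immediately
# and hands the fiber its own copy.
def with_call : Array(Int32)
  seen = Channel(Int32).new(3)
  i = 0
  while i < 3
    spawn seen.send(i) # `i` is captured by value right now
    i += 1
  end
  Array.new(3) { seen.receive }
end

p with_block # [3, 3, 3]: every fiber sees the final value of `i`
p with_call  # [0, 1, 2]: each fiber got the value at spawn time
```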

carlhoerberg commented 4 years ago

I just verified

AMQP::Client.start("amqp://guest:guest@localhost") do |c|
  c.channel do |ch|
    q = ch.queue("my-queue")

    q.publish "msg 1"
    q.publish "msg 2"

    q.subscribe(no_ack: true, block: true) do |msg|
      puts msg.body_io
    end
  end
end

and it outputs

msg 1
msg 2

carlhoerberg commented 4 years ago

basic_consume is not a loop, and that's on purpose. The block you see will only return when the consumer is cancelled. It's this code that calls the block that you've provided: https://github.com/cloudamqp/amqp-client.cr/blob/master/src/amqp-client/channel.cr#L144 and that happens when the client receives a new message from the server, which is handled here: https://github.com/cloudamqp/amqp-client.cr/blob/master/src/amqp-client/channel.cr#L101. Channels are sent frames from the connection loop here: https://github.com/cloudamqp/amqp-client.cr/blob/master/src/amqp-client/connection.cr#L95
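The shape of that design can be sketched with a toy model. This is not the amqp-client.cr implementation: `ToyChannel`, `consume`, `deliver`, `cancel` and the `wait` parameter are all hypothetical names used only to show how a consume call can register a handler once, with no loop of its own, while a separate fiber (standing in for the connection's read loop) invokes it for every delivery.

```crystal
# Toy stand-in for a channel. NOT the real amqp-client.cr code.
class ToyChannel
  def initialize
    @consumers = {} of String => Proc(String, Nil)
    @cancelled = Channel(Nil).new(1)
  end

  # Registers the handler once. With wait: true the caller is parked
  # until the consumer is cancelled; there is no receive loop here.
  def consume(tag : String, wait : Bool = false, &handler : String ->)
    @consumers[tag] = handler
    @cancelled.receive if wait
  end

  # Called by the read loop for each incoming delivery.
  def deliver(tag : String, body : String)
    @consumers[tag]?.try &.call(body)
  end

  # Removes the handler and releases a caller parked in `consume`.
  def cancel(tag : String)
    @consumers.delete(tag)
    @cancelled.send(nil)
  end
end

ch = ToyChannel.new
received = [] of String

# Stands in for the connection's read loop running in its own fiber.
spawn do
  ch.deliver("c1", "msg 1")
  ch.deliver("c1", "msg 2")
  ch.cancel("c1")
end

# Returns only after the consumer has been cancelled.
ch.consume("c1", wait: true) { |body| received << body }
p received
```

In this model the handler runs in the read-loop fiber, so anything that keeps that fiber from being scheduled also stops deliveries.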

carlhoerberg commented 4 years ago

My best guess is that you have another fiber somewhere that blocks Crystal's whole event loop so that no other fiber can run.
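A small sketch of that failure mode, assuming the default single-threaded runtime. It is a made-up illustration, not the code from this issue: a fiber that spins without ever yielding keeps every other fiber from running, while a `sleep` hands control back to the scheduler.

```crystal
ticks = 0

# Background fiber that wants to run every 10 ms.
spawn do
  loop do
    ticks += 1
    sleep 10.milliseconds
  end
end

# Busy-wait for 100 ms without yielding: the scheduler never gets
# a chance to switch to the background fiber.
deadline = Time.monotonic + 100.milliseconds
while Time.monotonic < deadline
  # spin
end
ticks_while_spinning = ticks

# Sleeping yields to the scheduler, so the background fiber runs now.
sleep 100.milliseconds

puts "ticks during busy loop: #{ticks_while_spinning}"
puts "ticks after sleeping:   #{ticks}"
```

This would be consistent with the observation earlier in the thread that adding a sleep makes things work, since sleeping gives other fibers a turn.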

bararchy commented 4 years ago

Managed to get it to work by breaking out the block parts. Thanks for the help @carlhoerberg