mateodelnorte / servicebus

Simple service bus for sending events between processes using amqp.
MIT License
420 stars 66 forks source link

Leveraging RabbitMQ Delayed Message Plugin #102

Open allenhartwig opened 7 years ago

allenhartwig commented 7 years ago

I can't seem to figure out how to pass it the proper options to get the Delayed Message Plugin to work properly when using servicebus.

My current attempt is as follows:

Consumer:

  const options = {
      ack: true,
      queueOptions: {
        exchangeOptions: {
          type: 'x-delayed-message',
          'x-delayed-type': 'direct'
        }
      }
    };
    bus.listen(eventName, options, handlerFunc);

Producer:

const options = {
    ack: true,
    headers: {
      'x-delay': 1000
    }
  }
bus.send(eventName, message, options);

The message sends and is received by the listener immediately (~100ms).

I'm wondering if I am not providing the information correctly on the headers on the send or with exchangeOptions on the listen. Or perhaps servicebus just doesn't support propagating these settings into the RabbitMQ queue/message properly.

Any help is appreciated.

mateodelnorte commented 7 years ago

That looks like it should work. servicebus should be passing any of those options on to amqp.node which allows for a number of properties on the message, including a headers object: http://www.squaremobius.net/amqp.node/channel_api.html#channel_publish.

I've got a unit test for exactly this: https://github.com/mateodelnorte/servicebus/blob/master/test/properties.js#L23-L33

allenhartwig commented 7 years ago

Yeah, I had a feeling the headers were ok, as thats pretty well documented in your tests.

Applying this was less clear:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);

It looks like exchangeOptions were passed in to the queue (at least on subscribe, it was less clear on listen when tracing your code), but I wasn't sure how to best set the args specified here for the exchange option.

mateodelnorte commented 7 years ago

can you not just pass those options into the bus() function?

const bus = require('servicebus').bus(options)
allenhartwig commented 7 years ago

I'm not seeing how exchangeOptions is leveraged here: https://github.com/mateodelnorte/servicebus/blob/master/bus/rabbitmq/bus.js#L21

The options object is never added as a whole to this for future reference, nor is the exchangeOptions prop directly referenced.

allenhartwig commented 7 years ago

I tested it though and still did not get a delay applied: const bus = servicebus.bus({url: rabbitmqUrl, exchangeOptions: {type: 'x-delayed-message'}});

allenhartwig commented 7 years ago

I found this gist on how to setup ampqlib appropriately to utilize the plugin: https://gist.github.com/mfressdorf/f46fdf266f35d8c525aea16719f837ac

I'm not sure that servicebus will propagate all of these values to ch.assertExchange

allenhartwig commented 7 years ago

Never mind. It looks like its addressed here: https://github.com/mateodelnorte/servicebus/blob/cb0729f1b9ba5a64fda0a4846a2b8e29275a6b3b/bus/rabbitmq/pubsubqueue.js#L45

As long as exchangeOptions is available it should work. This leads me back to my origin concern of: https://github.com/mateodelnorte/servicebus/issues/102#issuecomment-314528720

allenhartwig commented 7 years ago

This PR ensures that the exchangeOptions and exchangeName are applied when the channel is setup: https://github.com/mateodelnorte/servicebus/pull/103

I still am having issues getting the delay to work properly, however. I have been able to get it to work properly with amqplib directly, and after analyzing the RabbitMQ setup, it appears the issue revolves around queue bindings. When using servicebus, there are no bindings created.

mateodelnorte commented 7 years ago

yep. you're right. must have been an oversight of switching from amqplib to node-amqp (done because amqplib opens a channel for every consumer, whereas I wanted one for send and one for receive).

Care to make a PR?