arobson / rabbot

Deprecated: Please see https://github.com/Foo-Foo-MQ/foo-foo-mq
MIT License

Direct reply-to Support #74

Closed brandonpsmith closed 6 years ago

brandonpsmith commented 7 years ago

There seems to be some code for direct reply-to, but it's not working. Here's part of the problem: if you set replyQueue = "rabbitmq", it never sets the replyQueue to "amq.rabbitmq.reply-to", because the generic truthy check on the name matches first and the "rabbitmq" branch is never reached. See the snippets below from "topology.js". Here's the other problem: you have to consume and publish on the same channel for direct reply-to to work. I tested this by overriding the channel that gets returned for publish. I returned the channel that is created for the "amq.rabbitmq.reply-to" queue, and it published with no errors. If you do not publish and consume on the same channel, you get the RabbitMQ error "operation basic.publish caused a channel exception precondition_failed: fast reply consumer does not exist".

Bad

if ( _.has( options, "replyQueue" ) ) {
  replyQueueName = options.replyQueue.name || options.replyQueue;
  if ( replyQueueName === false ) {
    this.replyQueue = { name: false };
  } else if ( replyQueueName ) {
    this.replyQueue = userReplyTo;
  } else if ( replyQueueName === "rabbitmq" ) {
    this.replyQueue = rabbitReplyTo;
  }
} else {
  this.replyQueue = autoReplyTo;
}

Good

if ( _.has( options, "replyQueue" ) ) {
  replyQueueName = options.replyQueue.name || options.replyQueue;
  if ( replyQueueName === false ) {
    this.replyQueue = { name: false };
  } else if ( replyQueueName === "rabbitmq" ) {
    this.replyQueue = rabbitReplyTo;
  } else if ( replyQueueName ) {
    this.replyQueue = userReplyTo;
  }
} else {
  this.replyQueue = autoReplyTo;
}
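
The difference between the two snippets is only the order of the branches. A minimal standalone sketch of that ordering, assuming string placeholders in place of the rabbitReplyTo, userReplyTo, and autoReplyTo objects (the function names below are hypothetical and are not part of rabbot):

```javascript
'use strict';

// Hypothetical stand-in for the "Bad" ordering: the truthy check runs first,
// so the name "rabbitmq" is treated like any other user-supplied queue name.
function resolveBadOrder(options) {
  if (!Object.prototype.hasOwnProperty.call(options, 'replyQueue')) return 'auto';
  const name = options.replyQueue.name || options.replyQueue;
  if (name === false) return 'disabled';
  if (name) return 'user';
  if (name === 'rabbitmq') return 'rabbit'; // unreachable: "rabbitmq" is truthy
  return undefined;
}

// Hypothetical stand-in for the "Good" ordering: the specific check runs
// before the generic truthy check.
function resolveGoodOrder(options) {
  if (!Object.prototype.hasOwnProperty.call(options, 'replyQueue')) return 'auto';
  const name = options.replyQueue.name || options.replyQueue;
  if (name === false) return 'disabled';
  if (name === 'rabbitmq') return 'rabbit';
  if (name) return 'user';
  return undefined;
}

console.log(resolveBadOrder({ replyQueue: 'rabbitmq' }));
console.log(resolveGoodOrder({ replyQueue: 'rabbitmq' }));
```

With the bad ordering, "rabbitmq" falls into the user-defined branch; with the good ordering it reaches the direct reply-to branch.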
brandonpsmith commented 7 years ago

Here is an example of direct reply-to using amqplib. Notice that the reply consumer and the publisher are on the same channel. If you move either the reply consumer or the publisher to a different channel, you'll get the error mentioned above. See the RabbitMQ site https://www.rabbitmq.com/direct-reply-to.html for details. Note that it says nothing about consuming and publishing on the same channel.

'use strict'

const amqp = require('amqplib');

amqp.connect('amqp://localhost').then((conn) => {
  console.log('connected');

  conn.createConfirmChannel().then((ch1) => {
    conn.createChannel().then((ch2) => {
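      ch2.assertExchange('exchange.ex', 'direct', { durable: false, autoDelete: true })
      // pass functions to then(); passing the promise itself does not chain the calls
      .then(() => ch2.assertQueue('queue.q', { durable: false, autoDelete: true }))
      // bind with an empty pattern to match the empty routing key used by publish below
      .then(() => ch2.bindQueue('queue.q', 'exchange.ex', ''))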
      .then(() => {
        let count = 0;

        ch1.consume('amq.rabbitmq.reply-to', (msg) => {
          console.log(`Got Reply: ${msg.content.toString()}`)
        }, { noAck: true }); //must consume with noAck = true

        ch2.consume('queue.q', (msg) => {
          console.log(`Received: ${msg.content.toString()}`);
          ch1.sendToQueue(msg.properties.replyTo, msg.content, {}, (err) => {
            if (err) return console.error(`Reply failed: ${err.message}`);
            console.log(`Reply: ${msg.content.toString()}`);
          });
        }, { noAck: true });

        setInterval(() => {
          const content = (count++).toString();
          ch1.publish('exchange.ex', '', Buffer.from(content), { replyTo: 'amq.rabbitmq.reply-to' }, (err) => {
            if (err) return console.error(`Send failed: ${err.message}`);
            console.log(`Sent: ${content}`);
          });
        }, 1000);
      });
    });
  });
});
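
The same-channel rule from the example above can be isolated in a small helper. This is only a sketch: requestOnce is a hypothetical name, it assumes one outstanding request at a time (no correlation IDs), and it takes the channel as a parameter so it can be exercised with a real amqplib channel or a stub. It relies only on the amqplib channel methods consume, publish, and cancel.

```javascript
'use strict';

const REPLY_QUEUE = 'amq.rabbitmq.reply-to';

// Hypothetical helper: sends one request and resolves with the first reply.
// The same channel object is used for both the reply consumer and the
// publish, which is the constraint described in this issue.
async function requestOnce(channel, exchange, routingKey, content) {
  let deliver;
  const reply = new Promise((resolve) => { deliver = resolve; });

  // Register the reply consumer first; direct reply-to requires noAck.
  const { consumerTag } = await channel.consume(REPLY_QUEUE, deliver, { noAck: true });

  // Publish on the channel that holds the reply consumer.
  channel.publish(exchange, routingKey, Buffer.from(content), { replyTo: REPLY_QUEUE });

  const msg = await reply;
  await channel.cancel(consumerTag); // stop consuming once the reply arrives
  return msg.content.toString();
}
```

Against a live broker, this would be called with a channel from conn.createChannel(), for example requestOnce(ch, 'exchange.ex', '', 'ping'), with a responder consuming from the bound queue as in the example above.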
arobson commented 6 years ago

@brandonpsmith good catch - I hadn't tested this thoroughly in the past and there was a pretty obvious error in the way I had implemented the check for this.

I hope to get the next version pushed this weekend. There's an integration test using this now that demonstrates the fix. Thanks for finding it.