amqp-node / amqplib

AMQP 0-9-1 library and client for Node.JS
https://amqp-node.github.io/amqplib/
Other
3.69k stars 474 forks source link

How to know if message is published ? #751

Closed SabinShree closed 9 months ago

SabinShree commented 9 months ago

I have tested some of the methods and callback in direct exchange type and it is not returning the correct status.

const amqplib = require('amqplib');
const { v4: uuidv4 } = require('uuid');

const args = process.argv.slice(2);

if (args.length == 0) {
    console.log("Usage: rpc_client.js num");
    process.exit(1);
}

const num = args;
const uuid = uuidv4();

async function publishMessage(channel, exchange, routingKey, message) {
    channel.publish(exchange, 'random_route_key', Buffer.from(message), {
        persistent: true,
        mandatory: true,
    });
    try {
        await channel.waitForConfirms();
    } catch (error) {
        console.error(`Error publishing message: ${error}`);
    }
}

const startPublisher = async () => {
    const connection = await amqplib.connect('amqp://localhost');
    const channel = await connection.createConfirmChannel();
    const exchange = 'direct_exchange_1';
    const routingKey = 'routing_key';

    await channel.assertExchange(exchange, 'direct', { durable: true });
    const queueName = "request_queue";
    await channel.assertQueue(queueName, { durable: true, autoDelete: false, });
    const responseQueue = await channel.assertQueue('', { exclusive: true, durable: true, autoDelete: false });

    console.log(' [x] Requesting fib(%s)', num.join(' '));

    await publishMessage(channel, exchange, routingKey, num);

    channel.consume(responseQueue.queue, (msg) => {
        if (msg.properties.correlationId == uuid) {
            console.log(' [.] Got %s', msg.content.toString());
        }
    }, { noAck: true });

};

startPublisher();

This is one code I have tested but if the routing key doesn't exist it does not return an error. I have also used callback in the publish method and in every publish whether it is routed or not it returns an error as null.

 channel.publish(exchange, 'random_route_key', Buffer.from(message), {
        persistent: true,
        mandatory: true,
    }, (err, ok )=> {console.log(err, ok)});
cressie176 commented 9 months ago

Hi @SabinShree,

Publishing to a random routing key won't return an error, however you should receive a 'return' channel event.

https://amqp-node.github.io/amqplib/channel_api.html#channel_events

If a message is published with the mandatory flag (it’s an option to Channel#publish in this API), it may be returned to the sending channel if it cannot be routed. Whenever this happens, the channel will emit return with a message object (as described in #consume) as an argument.

SabinShree commented 9 months ago

How can i know that if the message is published to a 'direct exchange' type queue? I have tried some with waitForConfirms() and callback in publish() none of them throw error in case of consumer is down . but in 'rabbitmq management' graph i am seeing the message is being dropped by 'unroutable (return)'. The event 'return' you mentioned worked in case of message is returned but is there any event which will get triggered in case of message get successfully published ?

cressie176 commented 9 months ago

Hi @SabinShree,

Both the publisher callback and waitForConfirms examples you shared previously will only yield once amqplib has received an acknowledgement from the broker that the message has been safely received (which is not the same as being routeable). Since you have no idea how long this will take, and it is possible for the acknowledgement to go astray my preferred approach is to also set a timeout using Promise.race or similar. Be careful though - if the acknowledgement arrives after the timeout expired then the callback / waitForConfirms will still yield, so you have write your code to tolerate this.

function publish(channel, exchange, routingKey, content) {
  const message = Buffer.from(content);
  const options =  { persistent: true, mandatory: true };

  const timeoutP = new Promise((resolve, reject) => {
    setTimeout(() => reject(new Error('Publish timeout')), 5000);
  });

  const publishP = new Promise((resolve, reject) => {
    channel.publish(exchange, routingKey, message, options, (err) => {
      if (err) return reject(err);
      resolve();
    })
  });

  return Promise.race([timeoutP, publishP]);
}    
// The above function can be improved by cancelling the timeout and 
// resolving the timeoutP when the publish promise resolves or rejects

The second step is to listen for the return event. This tells you that your message was unrouteable. Once again there is no guarantee how long it will take to receive the return message from the broker. However, it will arrive before the publisher acknowledgement (assuming it wasn't lost due to network error). The BIG problem with using the return event is that if you publish messages to the channel concurrently, there is no easy way of knowing which publish promise to reject. i.e.

function publish(channel, exchange, routingKey, content) {
  const message = Buffer.from(content);
  const options =  { persistent: true, mandatory: true };

  const timeoutP = new Promise((resolve, reject) => {
    setTimeout(() => reject(new Error('Publish timeout')), 5000);
  });

  const publishP = new Promise((resolve, reject) => {
      channel.once('return', (returnedMessage) => {
         // BUG returnedMessage might be a different message!
         reject(new Error('Message was unrouteable'));
      });
      channel.publish(exchange, routingKey, message, options, (err) => {
        if (err) return reject(err);
        resolve();
      })
  });

  return Promise.race([timeoutP, publishP]);
} 

So instead you have to do something like...

function publish(channel, exchange, routingKey, content) {
  const message = Buffer.from(content);
  // Note the new messageId option
  const options =  { messageId: uuid(), persistent: true, mandatory: true };

  const timeoutP = new Promise((resolve, reject) => {
    setTimeout(() => reject(new Error('Publish timeout')), 5000);
  });

  const publishP = new Promise((resolve, reject) => {
      function returnedMessageHandler(returnedMessage) {
         if (returnedMessage.properties.messageId !== options.messageId) return;
         channel.removeListener('return', returnedMessageHandler);
         reject(new Error('Message was unrouteable'));
      };

      // This will register a listener per publish, using some memory
      // and a potential storm of event handlers when any message is returned!!!
      // Also remember to disable max listeners where you create the channel, i.e.
      // channel.setMaxListeners(0);
      channel.on('return', returnedMessageHandler);

      channel.publish(exchange, routingKey, message, options, (err) => {
         channel.removeListener('return', returnedMessageHandler);
        if (err) return reject(err);
        resolve();
      })
  })
}

An alternative to registering a listener per publish is to us a single listener and maintain a map of messageIds => returnedMessageHandler functions

const returnedMessageHandlers = new Map();

function init(channel) {
  channel.on('return', (returnedMessage) => {
    const messageId = returnedMessage.properties.messageId
    const returnedMessageHandler = returnedMessageHandlers.get(messageId);
    returnedMessageHandler();
  })
}

function publish(channel, exchange, routingKey, content) {
  const message = Buffer.from(content);
  const options =  { messageId: uuid(), persistent: true, mandatory: true };

  const timeoutP = new Promise((resolve, reject) => {
    setTimeout(() => reject(new Error('Publish timeout')), 5000);
  });

  const publishP = new Promise((resolve, reject) => {
      function returnedMessageHandler(returnedMessage) {
         returnedMessageHandlers.delete(options.messageId);
         reject(new Error('Message was unrouteable'));
      };

      returnedMessageHandlers.set(options.messageId, returnedMessageHandler);
      channel.publish(exchange, routingKey, message, options, (err) => {
        returnedMessageHandlers.delete(options.messageId);
        if (err) return reject(err);
        resolve();
      })
  })
}

Disclaimer - I haven't tried running the above code, but hopefully it's sufficient for you to understand and work with

cressie176 commented 9 months ago

Just realised the above code will cause a memory leak if the timeout fires, but the publish acknowledgement never arrives and the message is never returned. To avoid this you should remove / delete the returnedMessageHandler on timeout.

cressie176 commented 9 months ago

@SabinShree OK to close?

SabinShree commented 9 months ago

Yes thank you. I have used racing condition in channel.on(return) to know if the message is published or returned.