amqp-node / amqplib

AMQP 0-9-1 library and client for Node.JS
https://amqp-node.github.io/amqplib/

Fanout delivery acknowledgement on channel 1 timed out #733

Closed vahidsabet closed 1 year ago

vahidsabet commented 1 year ago

I run RabbitMQ with Node.js on Docker.

Here is my code for consume:

    const connection = await amqp.connect(amqpUrl, "heartbeat=30");
        const channel = await connection.createChannel();
        process.once('SIGINT', async () => {
            logger.log('got sigint, closing connection ');
            await channel.close();
            await connection.close();
            process.exit(0);
    });

    await channel.assertExchange(logsEx,"fanout", {durable: true});
    const tellogQueue = await channel.assertQueue("tellog");
    const dblogQueue = await channel.assertQueue("dblog");
    const assetQueue = await channel.assertQueue("getasset");

    channel.bindQueue(tellogQueue.queue,logsEx);
    channel.bindQueue(dblogQueue.queue,logsEx);
    channel.bindQueue(assetQueue.queue,logsEx);

    channel.prefetch(1);

    await channel.consume(tellogQueue.queue, async (msg) => {
        let mo = JSON.parse(msg.content.toString());

        await axios.post('https://api.telegram.org/bot/sendMessage', {
          chat_id: chid,
          text: "tellogQueue consumed ",
          parse_mode:'HTML'
        })
        .then(async response => {
            await channel.ack(msg);
        })
        .catch(err => {
            console.error(err);
        })
    }, { noAck: false });

    await channel.consume(dblogQueue.queue, async (msg) => {
        let mo = JSON.parse(msg.content.toString());
        console.log('assetQueue consumed ' + mo.data);
        channel.ack(msg);
    }, { noAck: false });

    await channel.consume(assetQueue.queue, async (msg) => {
        let mo = JSON.parse(msg.content.toString());
        console.log('assetQueue consumed ' + mo.data);
        channel.ack(msg);
    }, { noAck: false });

I got this error:

"Channel closed by server: 406 (PRECONDITION-FAILED) with message "PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Timeout value used: 1800000 ms. This timeout value can be configured, see consumers doc guide to learn more"

Here is my problem:

cressie176 commented 1 year ago

Hi @vahidsabet,

There are a few individual errors/oversights in your code sample which I'll highlight first...

  1. If you are using a connection url, specify the heartbeat as a query parameter, e.g. amqp://localhost:5672?heartbeat=30 not as a second string parameter
  2. await each channel.bindQueue call
  3. await the channel.prefetch call
  4. register an error listener on the connection
  5. register an error listener on the channel
  6. handle a connection error or channel error - the easiest option is to restart the application. Alternatively you may need to re-establish both and consume again.
  7. handle a null message in each consumer (the broker sends this under certain conditions such as queue deletion or broker shutdown)
  8. handle a poison message (e.g. one which isn't JSON)
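Item 1 can be sketched as below. This is a minimal illustration, not part of amqplib: `withHeartbeat` is a hypothetical helper name, and it relies on the WHATWG `URL` class that Node.js provides globally. Setting the parameter through `searchParams` avoids producing a malformed URL when the connection string already carries a query string, which plain string concatenation with "?heartbeat=30" would do.

```javascript
// Hypothetical helper (not an amqplib API): add or override the heartbeat
// query parameter on an AMQP connection URL.
function withHeartbeat(amqpUrl, seconds) {
  const url = new URL(amqpUrl); // WHATWG URL, global in Node.js
  url.searchParams.set('heartbeat', String(seconds));
  return url.toString();
}

// Usage, assuming amqplib has been required as `amqp`:
// const connection = await amqp.connect(withHeartbeat(amqpUrl, 30));
```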

The PRECONDITION_FAILED error is occurring because one of your consumers is not acknowledging a message within 1,800 seconds (30 minutes, the 1800000 ms shown in the error message). This is not a fault with amqplib, however. It could be because an error is occurring, preventing channel.ack from being reached. Using Wireshark would confirm whether the channel.ack is being sent.
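For reference, 1800000 ms is 30 minutes. In the tellog consumer above, a rejected axios.post is only logged in the catch handler, so that delivery is never acked or nacked and stays outstanding until the broker gives up. One way to make sure every delivery is settled is a small wrapper like the sketch below. `settling` is a hypothetical name, and the default of not requeueing is an assumption; whether a failed message should be requeued, dropped, or dead-lettered depends on your requirements.

```javascript
// Hypothetical wrapper (not an amqplib API): settle every delivery exactly once.
// `channel` is expected to expose amqplib's ack(msg) and nack(msg, allUpTo, requeue).
function settling(channel, handler, { requeue = false, log = console.error } = {}) {
  return async (msg) => {
    if (msg === null) return; // consumer was cancelled; nothing to settle
    try {
      await handler(msg);
      channel.ack(msg);
    } catch (err) {
      log(err);
      // Reject only this message. requeue=false is an assumption made here
      // so that a permanently failing message is not redelivered forever.
      channel.nack(msg, false, requeue);
    }
  };
}

// Usage, with a hypothetical sendToTelegram(msg) function doing the HTTP call:
// await channel.consume(tellogQueue.queue, settling(channel, sendToTelegram), { noAck: false });
```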

Regarding your questions

It depends on whether a single container using a shared connection can handle the message throughput. Running each consumer in its own container would be the most scalable (provided you do not exceed the number of connections the broker can handle), but could also be unnecessary.

The way you are doing it is OK, provided you fix the issues above.

vahidsabet commented 1 year ago

Thanks, dear @cressie176. I appreciate your response; you are clearly an expert on message brokers. I've addressed the issues you mentioned as follows:

  const mbConnect = async () => {

      let conn = await amqp.connect(amqpUrl+"?heartbeat=30")
      let channel = await conn.createChannel()

      conn.on("error", function(err) {
        if (err.message !== "Connection closing") {
          console.error('new catch Err' + err.message)
        }
      });

      conn.on("close", function() {
        console.error('@avsvahid', "AMQP reconnecting")
        return setTimeout(mbConnect , 10000);
      })

      channel.on("error", function(err) {
         console.error('AMQP channel error' + err.message)
         return setTimeout(mbConnect , 10000);
      })

      channel.on("close", function() {
        // err is not defined in this handler, so it must not be referenced
        console.error('AMQP channel closed')
      })

      return channel
  }

    const channel = await mbConnect();
    process.once('SIGINT', async () => {
        logger.log('got sigint, closing connection ');
        await channel.close();
        await connection.close();
        process.exit(0);
});

await channel.assertExchange(logsEx,"fanout", {durable: true});
const tellogQueue = await channel.assertQueue("tellog");
const dblogQueue = await channel.assertQueue("dblog");
const assetQueue = await channel.assertQueue("getasset");

await  channel.bindQueue(tellogQueue.queue,logsEx);
await  channel.bindQueue(dblogQueue.queue,logsEx);
await channel.bindQueue(assetQueue.queue,logsEx);

await channel.prefetch(1);

await channel.consume(tellogQueue.queue, async (msg) => {
    let mo = JSON.parse(msg.content.toString());

    await axios.post('https://api.telegram.org/bot/sendMessage', {
      chat_id: chid,
      text: "tellogQueue consumed ",
      parse_mode:'HTML'
    })
    .then(async response => {
        await channel.ack(msg);
    })
    .catch(err => {
        console.error(err);
    })
}, { noAck: false });

await channel.consume(dblogQueue.queue, async (msg) => {
    let mo = JSON.parse(msg.content.toString());
    console.log('dblogQueue consumed ' + mo.data);
    channel.ack(msg);
}, { noAck: false });

await channel.consume(assetQueue.queue, async (msg) => {
    let mo = JSON.parse(msg.content.toString());
    console.log('assetQueue consumed ' + mo.data);
    channel.ack(msg);
}, { noAck: false });

I didn't actually get the point of your latest comments:

It's my server on Docker. Is it OK for my scenario? Transactions arrive on the queues and are consumed for different purposes, and I must be sure that every message reaches its destination and is acked. Is fanout better, or topic? Sometimes I get Error: connect ETIMEDOUT AxiosError [AggregateError]


cressie176 commented 1 year ago

handle a null message in each consumer (the broker sends this under certain conditions such as queue deletion or broker shutdown)

If the broker cancels a consumer amqplib will invoke the consumer callback with a null message. You should therefore check for this at the start of your consumer function.

handle a poison message (e.g. one which isn't JSON)

If you receive a bad or "poison" message which causes your consumer function to throw an error, the channel will be closed and the message automatically rolled back. If your application restarts/resubscribes, the message will be redelivered, causing the error to be rethrown indefinitely. There are a couple of ways to avoid this, one is to use a quorum queue with a delivery limit. Another is to catch any errors caused by an invalid message and nack them. In your example, someone might publish a message which isn't JSON, causing the JSON.parse line to fail.
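The catch-and-nack option might look like the sketch below. `parseBody` and `makeDblogConsumer` are hypothetical names, and treating any non-object body as invalid is an assumption made for this example. The nack is sent with requeue set to false so the bad message is not redelivered; if the queue has a dead-letter exchange configured, the broker routes it there instead of discarding it.

```javascript
// Hypothetical helper: return the parsed JSON object, or undefined when the
// body is not valid JSON or is not an object (e.g. the literal `null`).
function parseBody(msg) {
  try {
    const value = JSON.parse(msg.content.toString());
    return value !== null && typeof value === 'object' ? value : undefined;
  } catch (err) {
    return undefined;
  }
}

// Builds a consumer callback for the dblog queue from the thread.
function makeDblogConsumer(channel) {
  return (msg) => {
    if (msg === null) return; // consumer cancelled by the broker
    const mo = parseBody(msg);
    if (mo === undefined) {
      // Poison message: nack(message, allUpTo, requeue) without requeueing.
      channel.nack(msg, false, false);
      return;
    }
    console.log('dblogQueue consumed ' + mo.data);
    channel.ack(msg);
  };
}

// Usage:
// await channel.consume(dblogQueue.queue, makeDblogConsumer(channel), { noAck: false });
```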

connect ETIMEDOUT AxiosError [AggregateError]

This sounds like it's the HTTP request which is failing intermittently. It's most likely to do with the server or the network, and nothing to do with RabbitMQ or amqplib. You could reduce the chance of it happening by retrying the request.
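A retry can be as simple as the sketch below. `retry` is a hypothetical helper, and the attempt count and linearly growing delay are arbitrary assumptions. Keep the total retry time well below the broker's acknowledgement timeout, since the message stays unacknowledged while the retries run.

```javascript
// Hypothetical helper: call an async function up to `attempts` times,
// waiting delayMs, 2 * delayMs, ... between failed attempts.
async function retry(fn, { attempts = 3, delayMs = 500 } = {}) {
  let lastError;
  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      if (attempt < attempts) {
        await new Promise((resolve) => setTimeout(resolve, delayMs * attempt));
      }
    }
  }
  throw lastError; // every attempt failed
}

// Usage, with the axios call from the thread (payload stands for the request body):
// await retry(() => axios.post('https://api.telegram.org/bot/sendMessage', payload));
```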

Fanout is better or topic?

I tend to prefer topics because they are more flexible. I believe fanouts are faster though.

vahidsabet commented 1 year ago

Dear @cressie176, Is my mbConnect function safe enough to reconnect on any error?

Could you please handle the cancel case in my script:

await channel.consume(dblogQueue.queue, async (msg) => {
    //Handle cancel and nack the message
    let mo = JSON.parse(msg.content.toString());
    console.log('dblogQueue consumed ' + mo.data);
    channel.ack(msg);
}, { noAck: false });

"connect ETIMEDOUT AxiosError [AggregateError]" issue was about using "node:alpine" image

In my scenario, the consumers must always run, and delivery of the messages to the endpoints must be guaranteed.

cressie176 commented 1 year ago

Is my mbConnect function safe enough to reconnect on any error?

You're going in the right direction but unfortunately it ends up being a little more complicated. It's possible to get multiple events for the same scenario. For example, if you force close a connection from the RabbitMQ admin UI, you get both a channel close event and a connection close event. Similarly you can get multiple error events from the same connection. Therefore it is best to remove all your event handlers whenever an event is received. You will also need to re-consume once you've created the new channel.
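One way to make sure only the first event triggers a reconnect is sketched below. `onFirstFailure` is a hypothetical helper, not an amqplib API. Leaving a no-op 'error' listener attached after cleanup is an assumption of this sketch: a Node.js EventEmitter throws if 'error' is emitted with no listener, so a late error on the old connection would otherwise crash the process.

```javascript
// Hypothetical helper: run `callback` once for the first 'error' or 'close'
// event emitted by any of the given emitters (e.g. connection and channel).
function onFirstFailure(emitters, callback) {
  const events = ['error', 'close'];
  let fired = false;
  const handler = (err) => {
    if (fired) return; // ignore the duplicate events described above
    fired = true;
    for (const emitter of emitters) {
      for (const event of events) emitter.removeListener(event, handler);
      // Assumption: swallow late errors from the abandoned connection/channel.
      emitter.on('error', () => {});
    }
    callback(err);
  };
  for (const emitter of emitters) {
    for (const event of events) emitter.on(event, handler);
  }
}

// Usage, where start() is a hypothetical function that connects, creates the
// channel, asserts the exchange and queues, and registers all consumers again:
// onFirstFailure([conn, channel], () => setTimeout(start, 10000));
```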

could you please handle cancel in my script:

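await channel.consume(dblogQueue.queue, async (msg) => {
    if (msg === null) {
        // the consumer was cancelled by the broker: reconsume
        return;
    }
    // ...parse, handle and ack the message as before
}, { noAck: false });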

You will need to do this for all consumers, not just the dblog queue.

vahidsabet commented 1 year ago

It is resolved, dear @cressie176, thanks to your guidance. Thank you very much.