jwalton / node-amqp-connection-manager

Auto-reconnect and round robin support for amqplib.

Removing auto ack seems to work wrong/unexpected #324

Closed: eugenebuhor closed this issue 1 year ago

eugenebuhor commented 1 year ago

Environment:

Precondition

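const amqp = require('amqp-connection-manager');

// Broker URL (assumed here; its value is not shown in the original report)
const AMQPUri = process.env.AMQP_URI || 'amqp://localhost';
const ExchangeName = 'ExchangeName';
const NetworkEventsQueue = 'NetworkEventsQueue';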

class AMQPEvents {
    constructor() {
        (async () => {
            const connectionManager = amqp.connect([AMQPUri]);

            await connectionManager.connect();

            this.channelWrapper = connectionManager.createChannel({
                json: true,
                setup: async function(channel) {
                    const queueOptions = {
                        exclusive: false,
                        durable: true,
                        autoDelete: false,
                        arguments: {
                            'x-queue-type': 'quorum'
                        }
                    };

                    await channel.assertQueue(NetworkEventsQueue, queueOptions);

                    // Bind queue to the (pre-existing topic) exchange; '*.*' matches
                    // routing keys of exactly two dot-separated words ('#' matches all)
                    await channel.bindQueue(
                        NetworkEventsQueue,
                        ExchangeName,
                        '*.*'
                    );
                }
            });

            await this.channelWrapper.consume(NetworkEventsQueue, /* several options described below */);
        })().catch(console.error);
    }
}

Description: We have a Node process that stays connected to a RabbitMQ queue and consumes from it. We need to disable automatic acknowledgement of messages and acknowledge them manually. Specifically, a message should be acknowledged only if it was handled without errors. If handling fails, we do not acknowledge the message, so that it stays in the queue; instead we escalate the error so it can be fixed manually and the process restarted. That way, when we reconnect to the queue, we expect the message to still be there.

Here's the consume call in which we disable auto-ack and acknowledge messages manually. The problem with this approach is that when we reconnect to the queue, the failed message is no longer there.

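await this.channelWrapper.consume(
    NetworkEventsQueue,
    async (msg) => {
        try {
            const event = JSON.parse(msg.content.toString());
            await handleEvents(event); // application-specific handler

            // manually call .ack, only after successful handling
            this.channelWrapper.ack(msg);
        } catch (error) {
            // handle errors here (the message is left unacknowledged)
        }
    },
    // disable auto-ack: every message must be acknowledged explicitly
    { noAck: false }
);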
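For comparison, here is a minimal sketch of the "leave it unacknowledged and escalate" policy written against plain amqplib rather than the channel wrapper. It assumes `channel` is an amqplib channel and that `handleEvents` is the application's async handler (hypothetical here). Because RabbitMQ requeues unacknowledged deliveries when the channel or connection they arrived on closes, the sketch closes the channel on failure and then rethrows so the caller can escalate.

```javascript
// Sketch, assuming `channel` is a plain amqplib channel and `handleEvents`
// is the application's (hypothetical here) async event handler.
async function handleOrRelease(channel, msg, handleEvents) {
  try {
    const event = JSON.parse(msg.content.toString());
    await handleEvents(event);
    // Success: tell the broker it may discard the message.
    channel.ack(msg);
  } catch (error) {
    // Failure: do not ack. Closing the channel makes RabbitMQ requeue every
    // delivery on it that is still unacknowledged, including this one.
    await channel.close();
    // Escalate so the caller can alert and stop the process.
    throw error;
  }
}
```

It could be wired in as `channel.consume(queue, (msg) => { if (msg) handleOrRelease(channel, msg, handleEvents).catch(onFatal); }, { noAck: false })`, where `onFatal` is a hypothetical escalation hook. Note that with the connection manager, a closed channel would be reopened and re-consumed automatically, which is why this sketch uses amqplib directly.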

Here's another approach we tried: nacking failed messages manually to put them back in the queue. However, the message does not wait in the queue until we reconnect; it is delivered to the handler again immediately, over and over, because we nack it with requeue every time.

await this.channelWrapper.consume(
    NetworkEventsQueue,
    async (msg) => {
        try {
            const event = JSON.parse(msg.content.toString());
            await handleEvents(event);

            // manually call .ack
            this.channelWrapper.ack(msg);
        } catch (error) {
            // manually call .nack: allUpTo = false, requeue = true
            this.channelWrapper.nack(msg, false, true);
        }
    },
    // disable auto-ack
    { noAck: false }
);

Question:

Do you know what is going wrong, or is there a problem with this approach?

luddd3 commented 1 year ago

Messages which are unacknowledged are released back to the queue according to these rules: https://www.rabbitmq.com/confirms.html#automatic-requeueing
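One way to check whether a message really came back after a reconnect is to log the `redelivered` flag that amqplib exposes on each delivery (`msg.fields.redelivered`); the broker sets it when it delivers a message that was delivered before and then requeued. A minimal sketch with a hypothetical `describeDelivery` helper:

```javascript
// Hypothetical helper: summarise an amqplib delivery so that requeued
// messages are easy to spot in logs. `msg.fields` holds the basic.deliver data.
function describeDelivery(msg) {
  const { deliveryTag, redelivered, routingKey } = msg.fields;
  const kind = redelivered ? 'REDELIVERED' : 'new';
  return `${kind} delivery #${deliveryTag} (routing key ${routingKey})`;
}
```

Delivery tags are scoped to a channel, so they restart after a reconnect; the `redelivered` flag is the more useful signal here.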

In the second approach, you probably have to stop consuming before nacking the message if you want to prevent it from being immediately consumed again.
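That suggestion can be sketched with plain amqplib: cancel the consumer first so the broker stops pushing deliveries to it, then nack with `requeue = true` so the failed message goes back to the queue without being handed straight back to this consumer. The helper name `consumeUntilFailure` and the `onFailure` callback are hypothetical. The consumer tag is read from `msg.fields.consumerTag`, so the sketch does not depend on when `consume()` resolves.

```javascript
// Sketch, assuming `channel` is a plain amqplib channel and `handler` is a
// hypothetical async function that throws when a message cannot be processed.
async function consumeUntilFailure(channel, queue, handler, onFailure) {
  return channel.consume(
    queue,
    async (msg) => {
      if (msg === null) return; // the broker cancelled this consumer
      try {
        await handler(msg);
        channel.ack(msg);
      } catch (error) {
        // 1. Stop receiving new deliveries on this consumer.
        await channel.cancel(msg.fields.consumerTag);
        // 2. Requeue the failed message (allUpTo = false, requeue = true).
        //    This consumer no longer receives deliveries, so it waits in the queue.
        channel.nack(msg, false, true);
        // 3. Escalate, e.g. alert and restart the process.
        onFailure(error);
      }
    },
    { noAck: false } // manual acknowledgement mode
  );
}
```

Other consumers of the same queue (for example, a second process) would still receive the requeued message; the sketch only stops this consumer from looping on it.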

eugenebuhor commented 1 year ago

Thanks @luddd3. However, as you can see in the first approach, the message is acked manually only inside the try block, so I expect a failed message to be returned to the queue when I reconnect. But I don't see the message. Is there anything I'm missing, or could you check whether there's an issue in the library layer?

luddd3 commented 1 year ago

Note that it takes some time for RabbitMQ to detect a failed connection.
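RabbitMQ typically notices a silently dropped TCP connection through AMQP heartbeats (or TCP timeouts), and until it does, the old consumer still holds the unacknowledged message. amqplib reads the heartbeat timeout, in seconds, from the `heartbeat` query parameter of the connection URL. The `withHeartbeat` helper below is a hypothetical sketch that sets it; the value you pass is a tuning choice, not a recommendation from this thread.

```javascript
// Hypothetical helper: add or replace the `heartbeat` query parameter
// (in seconds) that amqplib reads from an AMQP connection URL.
function withHeartbeat(amqpUri, seconds) {
  const url = new URL(amqpUri);
  url.searchParams.set('heartbeat', String(seconds));
  return url.toString();
}
```

A shorter heartbeat shortens the window in which a dead consumer keeps messages unacknowledged, at the cost of more heartbeat traffic and a higher chance of false positives on a busy or slow link.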

We have no reason yet to believe that there is something wrong with how node-amqp-connection-manager handles ack/nack.

Can you try your logic with just amqplib and verify that the message is released back into the queue when you lose the connection? Or maybe provide a code example that reproduces your issue.
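A minimal repro along those lines, using plain amqplib, might look like the sketch below. It consumes one message without acking it, closes the connection, then opens a fresh connection and asks the broker how many messages are ready with `checkQueue`. The function name and the injectable `connect` parameter are hypothetical (the parameter exists only so the sketch can be exercised without a broker), and it assumes the queue already exists and holds at least one message; otherwise it waits forever.

```javascript
// Hypothetical repro: does RabbitMQ requeue an unacked message when the
// consuming connection goes away? amqplib is loaded lazily by default.
async function reproRequeueOnDisconnect(
  url,
  queue,
  connect = (u) => require('amqplib').connect(u)
) {
  // 1. Consume exactly one message and deliberately never ack it.
  const conn1 = await connect(url);
  const ch1 = await conn1.createChannel();
  await ch1.prefetch(1); // at most one unacked delivery on this channel
  let resolveFirst;
  const firstMessage = new Promise((resolve) => { resolveFirst = resolve; });
  await ch1.consume(queue, (msg) => { if (msg !== null) resolveFirst(msg); }, { noAck: false });
  const msg = await firstMessage;

  // 2. Drop the connection; its unacked delivery should be requeued.
  await conn1.close();

  // 3. Reconnect without consuming and read the number of ready messages.
  const conn2 = await connect(url);
  const ch2 = await conn2.createChannel();
  const { messageCount } = await ch2.checkQueue(queue);
  await conn2.close();
  return { content: msg.content.toString(), messageCount };
}
```

If requeueing works and no other consumer is attached, `messageCount` after step 3 should include the unacked message. A graceful `close()` is not the same as a crashed process or a lost network link; killing the process at step 2 instead would exercise the heartbeat-based detection mentioned above.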