postwait / node-amqp

[UNMAINTAINED] node-amqp is an AMQP client for nodejs
MIT License

Help: Queue with ack = true and prefetchCount >1. Multiple handlers #461

Open FrostBy opened 7 years ago

FrostBy commented 7 years ago

Hi! Can anybody help me with the following issue?

1) I subscribe to a queue with ack = true and prefetchCount > 1.
2) All messages are passed to a balancer, which creates worker instances that process the messages.
3) All messages from the subscription are pushed to an array, and when a worker is ready for a message, I hand it one from this array.
4) Once the message is processed, the worker acknowledges it and tries to take another message from the array.
5) If the array is empty, the worker calls queue.shift().
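
The steps above can be sketched without a broker. This is only an in-memory illustration: `Balancer`, `makeHandle`, and the `acked` array are hypothetical names that are not part of node-amqp, and the handle merely stands in for the fourth argument passed to the subscribe callback.

```javascript
// Hypothetical in-memory sketch of the balancing flow; not node-amqp code.
class Balancer {
  constructor(workerCount) {
    this.idle = [];      // workers waiting for a message
    this.pending = [];   // messages waiting for a free worker
    for (let i = 0; i < workerCount; i++) {
      this.idle.push({ id: i, job: null });
    }
  }

  // Called once per delivered message.
  // Returns the worker that took it, or null if the message was queued.
  onMessage(body, handle) {
    const worker = this.idle.shift();
    if (worker) {
      worker.job = { body, handle };
      return worker;
    }
    this.pending.push({ body, handle });
    return null;
  }

  // Called when a worker has finished its current message.
  finish(worker) {
    worker.job.handle.acknowledge();           // exactly one ack per message
    worker.job = this.pending.shift() || null; // take the next queued message, if any
    if (!worker.job) {
      this.idle.push(worker);                  // nothing queued: worker becomes idle
    }
  }
}

// Stand-in for the per-message acknowledge handle (assumption for illustration).
const acked = [];
const makeHandle = (tag) => ({ acknowledge: () => acked.push(tag) });

const balancer = new Balancer(2);
const w1 = balancer.onMessage('a', makeHandle(1));
const w2 = balancer.onMessage('b', makeHandle(2));
const w3 = balancer.onMessage('c', makeHandle(3)); // both workers busy: queued

balancer.finish(w1); // acks 1, w1 picks up 'c'
balancer.finish(w2); // acks 2, w2 becomes idle
balancer.finish(w1); // acks 3, w1 becomes idle
```

In this sketch every message is acknowledged once, through its own handle, and an idle worker simply waits for the next delivery instead of asking the queue for more.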

So, if I call queue.shift() after messageObject.acknowledge(), I get:

Error: PRECONDITION_FAILED - unknown delivery tag 1

Question: how can I acknowledge messages and then ask the queue for another batch of messages?
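
One possible reading of the error, stated here as an assumption rather than confirmed node-amqp behaviour, is that queue.shift() itself acknowledges the most recent message, so calling it after messageObject.acknowledge() would acknowledge the same delivery tag twice. The sketch below only mimics the broker-side rule that a delivery tag can be acknowledged once; `MockChannel` is a hypothetical stand-in, not a real API.

```javascript
// Hypothetical stand-in for a broker channel; not the node-amqp API.
class MockChannel {
  constructor() {
    this.unacked = new Set(); // delivery tags still awaiting acknowledgement
    this.nextTag = 1;
  }

  // Simulates delivering one message and returns its delivery tag.
  deliver() {
    const tag = this.nextTag++;
    this.unacked.add(tag);
    return tag;
  }

  // A tag can be acknowledged once; a second ack is an unknown tag.
  ack(tag) {
    if (!this.unacked.delete(tag)) {
      throw new Error(`PRECONDITION_FAILED - unknown delivery tag ${tag}`);
    }
  }
}

const channel = new MockChannel();
const tag = channel.deliver();
channel.ack(tag); // first acknowledgement succeeds

let secondAckError = null;
try {
  channel.ack(tag); // second acknowledgement of the same tag fails
} catch (err) {
  secondAckError = err.message;
}
console.log(secondAckError);
```

If that reading is right, acknowledging each message once and relying on prefetchCount to deliver the next messages would avoid the duplicate acknowledgement.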

Here are some excerpts of my logic:

    /**
     *
     * @param connection {Connection}
     * @param queue {Queue}
     */
    runQueue(connection, queue) {
        this._connection = connection;
        this._queue = queue;
        queue.subscribe({
            ack:           true,
            prefetchCount: this._availableBots.length
        }, (message, headers, deliveryInfo, ack) => {
            message = JSON.parse(message.data.toString());
            this.balance(message, ack);
        });
    }
    /**
     *
     * @param message {string}
     * @param messageObject {Message}
     */
    balance(message, messageObject) {
        let selectedBot = null;

        for (const bot of this._bots) {
            if (!bot.busy) {
                selectedBot = bot;
                break;
            }
        }
        if (selectedBot) {
            this.run(message, messageObject, selectedBot)
        }
        else {
            this._tasks.push([message, messageObject]);
            if (this._availableBots.length) {
                this.createBot();
            }
        }
    }
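
    // Separate excerpt: handler that runs when a bot becomes free.
    bot.on('free', (messageObject) => {
        console.log('freeBot');
        bot.busy = false;
        if (messageObject) {
            messageObject.acknowledge();
        }
        if (this._tasks.length) {
            let task = this._tasks.shift();
            this.run(task[0], task[1], bot);
        }
        else {
            // This call, made after acknowledge(), is where the error above appears.
            this._queue.shift(true);
        }
    });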