postwait / node-amqp

[UNMAINTAINED] node-amqp is an AMQP client for nodejs
MIT License

Stream errors when using queue.shift() and prefetchCount #356

Open matomesc opened 10 years ago

matomesc commented 10 years ago

Versions used: node-amqp@0.2.0, node.js@0.10.25 and RabbitMQ@3.3.4.

I have a consumer that is supposed to pull 5 messages off a queue, process and ack them. It looks like:


var queue = {
    subOptions: { ack: true, prefetchCount: 5 },
    /* other options */
};

connection.queue(queue.name, queue.options, function (q) {
    q.bind(queue.bindOptions.exchange, queue.bindOptions.key);
    q.subscribe(queue.subOptions, function (message, headers, deliveryInfo, ack) {
        work(message, function (err, result) {
            if (err) {
                log.error(err);
            }
            return q.shift();
        });
    });
});

When I start running the consumer, I get a bunch of the following errors from q.shift():

Error: write after end
at writeAfterEnd (_stream_writable.js:130:12)
at Socket.Writable.write (_stream_writable.js:178:5)
at Socket.write (net.js:613:40)
at Connection.self.(anonymous function) [as write] (/tmp/node_modules/amqp/lib/connection.js:607:27)
at Connection._sendMethod (/tmp/node_modules/amqp/lib/connection.js:782:8)
at Message.acknowledge (/tmp/node_modules/amqp/lib/message.js:52:25)
at Queue.shift (/tmp/node_modules/amqp/lib/queue.js:203:25)
at /tmp/test.js:50:14
at /tmp/test.js:182:10
at /tmp/test.js:128:12

I'm also seeing the following errors at random, with various delivery tags (5, 7, 9, 21, 40) and identical stacks:

Error: PRECONDITION_FAILED - unknown delivery tag 21
at Queue._onMethod (/tmp/node_modules/amqp/lib/queue.js:482:15)
at Queue.Channel._onChannelMethod (/tmp/node_modules/amqp/lib/channel.js:85:12)
at Connection._onMethod (/tmp/node_modules/amqp/lib/connection.js:428:28)
at AMQPParser.self.parser.onMethod (/tmp/node_modules/amqp/lib/connection.js:133:12)
at AMQPParser._parseMethodFrame (/tmp/node_modules/amqp/lib/parser.js:360:10)
at frameEnd (/tmp/node_modules/amqp/lib/parser.js:93:16)
at frame (/tmp/node_modules/amqp/lib/parser.js:78:14)
at AMQPParser.header [as parse] (/tmp/node_modules/amqp/lib/parser.js:64:14)
at AMQPParser.execute (/tmp/node_modules/amqp/lib/parser.js:137:21)
at Connection.<anonymous> (/tmp/node_modules/amqp/lib/connection.js:171:21)

Any idea about what's going on here?

amackera commented 10 years ago

I'm not sure if this is the issue, but .shift() on a queue acks the last received message. Maybe the prefetch count is interfering with that, so the last received message gets acknowledged 5 times? An unknown delivery tag tends to indicate that you've acknowledged a message more than once.
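
To make that hypothesis concrete, here is a toy model of "shift() acks the last received message" when five messages are already in flight. It is only a sketch: `makeToyQueue`, `deliver`, and `pending` are hypothetical names invented for illustration, and none of this is node-amqp's actual code or a real broker.

```javascript
// Toy model, NOT node-amqp internals: it only mimics the behaviour described
// above, where shift() acks whichever message was received most recently.
function makeToyQueue() {
    var unacked = {};   // delivery tags the pretend broker still expects an ack for
    var errors = [];    // errors the pretend broker would report
    var lastTag = null; // the only message shift() knows about

    return {
        // Pretend the broker pushed a message with this delivery tag.
        deliver: function (tag) {
            unacked[tag] = true;
            lastTag = tag;
        },
        // Ack the last received message, whatever the caller was working on.
        shift: function () {
            if (lastTag === null) {
                return;
            }
            if (!unacked[lastTag]) {
                errors.push('PRECONDITION_FAILED - unknown delivery tag ' + lastTag);
                return;
            }
            delete unacked[lastTag];
        },
        errors: errors,
        pending: function () {
            return Object.keys(unacked).map(Number);
        }
    };
}

var toy = makeToyQueue();

// With prefetchCount: 5, five messages can arrive before any work() finishes.
[1, 2, 3, 4, 5].forEach(function (tag) {
    toy.deliver(tag);
});

// Each of the five work() callbacks then calls shift() once.
for (var i = 0; i < 5; i++) {
    toy.shift();
}

console.log(toy.errors);    // repeated acks of tag 5 show up as errors
console.log(toy.pending()); // tags 1 to 4 were never acked
```

In this model only the newest message is ever acknowledged. The other four shift() calls try to ack it again, while the earlier messages stay unacknowledged.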

matomesc commented 10 years ago

Instead of q.shift(), I just called ack.acknowledge(), which solved the problem. What exactly is the point of .shift()?
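
For anyone landing here later, the change described above might look roughly like the following. This is a sketch based on the original snippet rather than the exact code that was run. The wrapper name `subscribeWithExplicitAck` is hypothetical, and `q`, `work`, and `log` are assumed to be the same objects as in the first post.

```javascript
// Hypothetical wrapper around the subscribe call from the original post.
// The only behavioural change is ack.acknowledge() in place of q.shift().
function subscribeWithExplicitAck(q, subOptions, work, log) {
    q.subscribe(subOptions, function (message, headers, deliveryInfo, ack) {
        work(message, function (err, result) {
            if (err) {
                log.error(err);
            }
            // Acknowledge the message this callback was invoked for,
            // not whichever message happened to arrive last.
            ack.acknowledge();
        });
    });
}
```

Inside the connection.queue callback it would be called as subscribeWithExplicitAck(q, queue.subOptions, work, log). Because each callback closes over its own ack object, up to prefetchCount messages can be in flight and finish in any order without one ack standing in for another.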

algesten commented 8 years ago

q.shift() is a terrible idea. Combined with prefetchCount, it's a bad, bad combo. It should be deprecated, shout warnings on use, and be removed pronto.