postwait / node-amqp

[UNMAINTAINED] node-amqp is an AMQP client for nodejs
MIT License

Persistent messages being lost #119

Open ali-bugdayci opened 12 years ago

ali-bugdayci commented 12 years ago

Hi,

I am using durable queues and persistent messages ("delivery_mode" = 2). I check via:

"sudo rabbitmqctl list_queues"

to confirm that the messages survive a RabbitMQ restart. However, when I subscribe and receive the first message from the queue, the remaining messages are lost.

It works fine when the publisher publishes first and the subscriber subscribes afterwards (say, after 3 messages) and receives them one by one. I am implementing a long-polling server, so I subscribe with {ack: true} and call queue.shift after unsubscribing.

Here is my code:

connection.queue(queue_name, {durable: true}, function (q) {
    console.log("Queue connected");
    // Catch all messages
    q.bind('#');
    var ctag;

    // Receive messages
    q.subscribe({ack: true}, function (message) {
        console.log("incoming message");
        console.log(message.data);
        response.writeHead(200, { "Content-Type": "text/plain" });
        response.end(message.data);

        // Serve one message per request: unsubscribe, then ack it
        console.log("unsubscribe");
        q.unsubscribe(ctag)
         .addCallback(function () { q.shift(); });
    })
    .addCallback(function (ok) { ctag = ok.consumerTag; });
});

My publisher is in Ruby:

AMQP.start do |connection|
  AMQP::Channel.new(connection) do |channel|
    queue = channel.queue(to_imei, {:auto_delete => true, :durable => true})

    channel.default_exchange.publish json, :routing_key => queue.name, :persistent => true
  end
end

My Configuration:

$ node -v
v0.8.5

$ npm list
amqp@0.1.3 /home/bor/sunucu/node_sunucu/node_modules/amqp

$ rabbitmqctl status
{running_applications,
    [{rabbitmq_management,"RabbitMQ Management Console","2.8.4"},
     {xmerl,"XML parser","1.2.10"},
     {rabbitmq_management_agent,"RabbitMQ Management Agent","2.8.4"},
     {amqp_client,"RabbitMQ AMQP Client","2.8.4"},
     {rabbit,"RabbitMQ","2.8.4"},
     {os_mon,"CPO CXC 138 46","2.2.7"},
     {sasl,"SASL CXC 138 11","2.1.10"},
     {rabbitmq_mochiweb,"RabbitMQ Mochiweb Embedding","2.8.4"},
     {webmachine,"webmachine","1.7.0-rmq2.8.4-hg"},
     {mochiweb,"MochiMedia Web Server","1.3-rmq2.8.4-git"},
     {inets,"INETS CXC 138 49","5.7.1"},
     {mnesia,"MNESIA CXC 138 12","4.5"},
     {stdlib,"ERTS CXC 138 10","1.17.5"},
     {kernel,"ERTS CXC 138 10","2.14.5"}]},
{os,{unix,linux}},

postwait commented 12 years ago

You can't use {ack: true} to do what you want.

ali-bugdayci commented 12 years ago

Hi,

Thanks for the quick reply. First of all, I don't see why I can't use it. After a restart, doesn't RabbitMQ come back with the queued messages as they were before it crashed? If it doesn't, then I believe this should be reported to the RabbitMQ team so that ack works with persistent messages too.

I also tested without {ack: true}. I receive all the messages, but afterwards I get the following exception on the node-amqp side:

events.js:66
        throw arguments[1]; // Unhandled 'error' event
        ^
Error: PRECONDITION_FAILED - unknown delivery tag 4
    at Queue._onMethod (/home/bor/sunucu/node_sunucu/node_modules/amqp/amqp.js:1720:15)
    at Queue.Channel._onChannelMethod (/home/bor/sunucu/node_sunucu/node_modules/amqp/amqp.js:1365:14)
    at Connection._onMethod (/home/bor/sunucu/node_sunucu/node_modules/amqp/amqp.js:922:28)
    at AMQPParser.self.addListener.parser.onMethod (/home/bor/sunucu/node_sunucu/node_modules/amqp/amqp.js:797:12)
    at AMQPParser._parseMethodFrame (/home/bor/sunucu/node_sunucu/node_modules/amqp/amqp.js:442:10)
    at frameEnd (/home/bor/sunucu/node_sunucu/node_modules/amqp/amqp.js:187:16)
    at frame (/home/bor/sunucu/node_sunucu/node_modules/amqp/amqp.js:172:14)
    at AMQPParser.header as parse
    at AMQPParser.execute (/home/bor/sunucu/node_sunucu/node_modules/amqp/amqp.js:231:21)
    at Connection. (/home/bor/sunucu/node_sunucu/node_modules/amqp/amqp.js:837:12)
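
For context, RabbitMQ raises this error when a client acknowledges a delivery tag that the broker is not tracking on that channel. Deliveries made without {ack: true} are treated as acknowledged on delivery, so a later q.shift() would plausibly trigger it. That link is an assumption, since the modified subscriber is not shown. The toy model below only illustrates the bookkeeping; ToyChannel and tryAck are invented names, not node-amqp or RabbitMQ code.

```javascript
// Toy model of how a broker tracks delivery tags on one channel.
// ToyChannel is a hypothetical name used only for illustration.
class ToyChannel {
  constructor() {
    this.nextTag = 1;
    this.unacked = new Set(); // delivery tags still awaiting an ack
  }

  // Deliver one message; only ack-mode deliveries are remembered.
  deliver(ackMode) {
    const tag = this.nextTag++;
    if (ackMode) this.unacked.add(tag);
    return tag;
  }

  // Acknowledge a delivery; unknown tags are an error.
  ack(tag) {
    if (!this.unacked.has(tag)) {
      throw new Error('PRECONDITION_FAILED - unknown delivery tag ' + tag);
    }
    this.unacked.delete(tag);
  }
}

// Deliver a single message in the given mode, then try to ack it.
function tryAck(ackMode) {
  const channel = new ToyChannel();
  const tag = channel.deliver(ackMode);
  try {
    channel.ack(tag);
    return 'ok';
  } catch (err) {
    return err.message;
  }
}

console.log(tryAck(true));  // ok
console.log(tryAck(false)); // PRECONDITION_FAILED - unknown delivery tag 1
```

In this model, acking is only valid for messages that were delivered in ack mode, which matches the broker-side rule described above.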

ali-bugdayci commented 12 years ago

I found out that persistent messages don't work well with the default {autoDelete: true} option.

Correct me if I am wrong, but I believe they are two different things: autoDelete removes the queue when there are no messages in it, whereas if there is any persistent message then the queue will not be auto-deleted.

For the time being I am setting the queue's autoDelete to false, but would this lead to too many resources being consumed?

Thanks
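
For reference, AMQP 0-9-1 defines auto-delete in terms of consumers, not messages: an auto-delete queue is removed once its last consumer cancels or disconnects, whether or not messages are still queued. That would explain why unsubscribing after the first message drops the rest. The sketch below is a toy in-memory model of that rule, not RabbitMQ or node-amqp code; ToyBroker, its methods, and the queue name 'device-1' are invented for illustration, and delivery itself is left out to keep it short.

```javascript
// Toy in-memory model of AMQP 0-9-1 auto-delete semantics.
// ToyBroker and its methods are hypothetical names, not a real client API.
class ToyBroker {
  constructor() {
    this.queues = new Map(); // queue name -> { autoDelete, messages, consumers }
  }

  declareQueue(name, { autoDelete }) {
    if (!this.queues.has(name)) {
      this.queues.set(name, { autoDelete, messages: [], consumers: new Set() });
    }
  }

  publish(name, body) {
    const q = this.queues.get(name);
    if (q) q.messages.push(body); // messages for a missing queue are dropped
  }

  subscribe(name, consumerTag) {
    this.queues.get(name).consumers.add(consumerTag);
  }

  unsubscribe(name, consumerTag) {
    const q = this.queues.get(name);
    q.consumers.delete(consumerTag);
    // Auto-delete fires when the last consumer goes away,
    // regardless of how many messages are still queued.
    if (q.autoDelete && q.consumers.size === 0) {
      this.queues.delete(name);
    }
  }

  depth(name) {
    const q = this.queues.get(name);
    return q ? q.messages.length : null; // null means the queue no longer exists
  }
}

// Publish three messages, subscribe once, unsubscribe, and report what is left.
function remainingAfterUnsubscribe(autoDelete) {
  const broker = new ToyBroker();
  broker.declareQueue('device-1', { autoDelete });
  ['a', 'b', 'c'].forEach((m) => broker.publish('device-1', m));
  broker.subscribe('device-1', 'ctag-1');
  broker.unsubscribe('device-1', 'ctag-1');
  return broker.depth('device-1');
}

console.log(remainingAfterUnsubscribe(true));  // null: queue and messages are gone
console.log(remainingAfterUnsubscribe(false)); // 3: messages are still waiting
```

Durability and persistence only govern what survives a broker restart; they do not protect a queue from auto-delete, which is why setting autoDelete to false changes the outcome here.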