LeanKit-Labs / wascally

Abstractions to simplify working with wascally wabbitMQ
MIT License

Unacked messages piling up in response queue #39

Closed neverfox closed 9 years ago

neverfox commented 9 years ago

I have a producer/dispatcher/consumer topology that involves one case of using request instead of publish. Since adding that, the related response queue has been filling up with unacked messages, even though I'm calling reply on the consumer side and res.ack() in the promise then of the request. What could be going on here? I even tried explicitly acking the message in the consumer, with no luck.

// topology.js
'use strict';

module.exports = function (rabbit, subscribeTo) {
    return rabbit.configure({
        // arguments used to establish a connection to a broker
        connection: {
            user:   'guest',
            pass:   'guest',
            server: ['127.0.0.1'],
            port:   5672,
            vhost:  '%2f'
        },

        // define the exchanges
        exchanges: [
            {
                name: 'tasks-x',
                type: 'fanout',
                autoDelete: true
            },
            {
                name: 'dispatch-x',
                type: 'direct',
                autoDelete: true
            },
            {
                name: 'requests-x',
                type: 'fanout',
                autoDelete: true
            }
        ],

        // setup the queues, only subscribing to the one this service
        // will consume messages from
        queues: [
            {
                name: 'tasks-q',
                autoDelete: true,
                limit: 1,
                subscribe: subscribeTo === 'tasks'
            },
            {
                name: 'requests-q',
                autoDelete: true,
                limit: 1,
                subscribe: subscribeTo === 'requests'
            },
            {
                name: 'p1-q',
                autoDelete: true,
                limit: 1,
                subscribe: subscribeTo === 'p1'
            },
            {
                name: 'p2-q',
                autoDelete: true,
                limit: 1,
                subscribe: subscribeTo === 'p2'
            }
        ],

        // binds exchanges and queues to one another
        bindings: [
            {
                exchange: 'tasks-x',
                target:   'tasks-q',
                keys:     []
            },
            {
                exchange: 'requests-x',
                target:   'requests-q',
                keys:     []
            },
            {
                exchange: 'dispatch-x',
                target:   'p1-q',
                keys:     ['p1']
            },
            {
                exchange: 'dispatch-x',
                target:   'p2-q',
                keys:     ['p2']
            }
        ]
    });
};
// producer.js
'use strict';

var rabbit = require('wascally');

var i = 0;

require('./topology')(rabbit).then(function () {
    setInterval(function () {
        console.log('Publishing task');
        rabbit.publish('tasks-x', 'task', {
            task: ++i
        });
    }, 3000);
});
// dispatcher.js
'use strict';

var Kefir  = require('kefir');
var rabbit = require('wascally');

var taskEmitter = Kefir.emitter();
var reqEmitter  = Kefir.emitter();
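// zip pairs the nth task with the nth job request, so a task is only
// dispatched once a worker has asked for more work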
var pairs = Kefir.zip([taskEmitter, reqEmitter]);

pairs.onValue(function (pair) {
    var task = pair[0];
    var req  = pair[1];
    console.log(task.body, req.body);
    rabbit.request('dispatch-x', {
        type: 'process',
        body: task.body,
        routingKey: 'p1'
    })
        .then(function (res) {
            console.log(res.body);
            res.ack();
            task.ack();
            req.ack();
        });
});

rabbit.handle('task', function (task) {
    console.log('Dispatch received', JSON.stringify(task.body));
    taskEmitter.emit(task);
});

rabbit.handle('request', function (req) {
    console.log('Dispatch received', JSON.stringify(req.body));
    reqEmitter.emit(req);
});

require('./topology')(rabbit).then(function () {
    rabbit.startSubscription('requests-q');
    rabbit.startSubscription('tasks-q');
});
// consumer.js
'use strict';

var rabbit = require('wascally');

var i = 0;

function requestJob () {
    rabbit.publish('requests-x', 'request', {
        req: ++i
    });
}

rabbit.handle('process', function (msg) {
    console.log('Received:', JSON.stringify(msg.body));
    msg.reply({ success: true });
    requestJob();
});

require('./topology')(rabbit, 'p1').then(function () {
    requestJob();
});
neverfox commented 9 years ago

I can confirm that this problem is also a result of the bug from #38. It works perfectly when reset to the recommended commit.

arobson commented 9 years ago

@neverfox - Thanks for trying that out. I'm stumped. I've spent a good bit of time slowly importing the new approach into the working commit, trying to find what caused the defect, and nothing yet.

neverfox commented 9 years ago

I tried taking a look myself but, you're right, it's a lot. I wouldn't know where to begin.

arobson commented 9 years ago

I've narrowed it down to something in the new exchangeFsm and amqp/exchange modules. Basically, I checked out the original exchange module and have been gradually back-porting changes to it. I've been surprised by what it hasn't turned out to be so far. I'm sure it's going to be something really face-palm-worthy when I find it.

neverfox commented 9 years ago

Please keep me posted!

arobson commented 9 years ago

@neverfox - here are the lines that shall live on in infamy. They were introduced so that when wascally has failed to connect to all nodes multiple times, it rejects all outstanding publish promises (previously, those promises could just hang indefinitely if you never got a connection).

In hindsight, this was a bad approach. I'm honestly surprised that the alternative I put in place doesn't appear to cause any performance degradation or memory leak. I suppose time will tell, but I profiled heap allocations and CPU usage, and compared heap snapshots taken between re-running subscriber instances.
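To make the caller-visible difference concrete, here is a minimal sketch (assuming publish returns a promise, which the publisher code above doesn't rely on): with the rejection behavior, a publish attempted while the broker is unreachable can fail fast instead of hanging forever.

// sketch only: handle a rejected publish promise instead of letting it hang
rabbit.publish('tasks-x', 'task', { task: 1 })
    .then(function () {
        console.log('Task accepted by the broker');
    }, function (err) {
        // before the change, this promise could stay pending indefinitely
        // if a connection was never established
        console.error('Publish failed:', err);
    });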

Thanks again for finding this and being patient while I worked through it!

neverfox commented 9 years ago

:+1:

neverfox commented 9 years ago

Unfortunately, I'm getting unacked responses again. msg.reply in the consumer just doesn't seem to be acking the message.

arobson commented 9 years ago

@neverfox - just so I'm clear: is it the request messages that aren't getting ack'd, or the replies? msg.reply should ack the original request message; the response has to be ack'd explicitly.
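In code form, the division of responsibility looks roughly like this (a sketch using the same handle/request calls as the examples in this thread):

// receiving side: reply() sends the response and acks the incoming request,
// so no explicit msg.ack() is needed here
rabbit.handle('status', function (msg) {
    msg.reply({ ok: true });
});

// requesting side: the reply message itself still has to be acked explicitly
// once the request promise resolves
rabbit.request('status-x', {
    type: 'status',
    body: { text: 'ping' }
})
    .then(function (reply) {
        console.log(reply.body);
        reply.ack();
    });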

neverfox commented 9 years ago

Oops, that's what I meant. In the consumer I call msg.reply, and then in the dispatcher, after the promise resolves, I call res.ack(), so I thought I had it covered. It must be res.ack() that isn't doing what I expected, then.

arobson commented 9 years ago

I will try to repro this and get back to you. Just for reference: are you on 0.2.2? Which versions of Node and RabbitMQ, and which OS?

neverfox commented 9 years ago

0.2.2, RabbitMQ 3.4.4, Arch Linux x64, and io.js 1.3.0

arobson commented 9 years ago

If you have time, would you clone the repo and then run the unit tests? I've never tested this on Arch Linux or io.js. (I would be surprised if either broke it, though.)

neverfox commented 9 years ago

Got through most of it and failed on the Integration Tests:

Error: Failed to create exchange 'wascally-ex.consistent-hash' on connection 'default' with 'Channel ended, no reply will be forthcoming'

arobson commented 9 years ago

Ah, I need to document this better in the readme: you'll need to turn on the consistent hash exchange plugin. If you run

rabbitmq-plugins enable rabbitmq_consistent_hash_exchange

and then restart the server, the integration tests should get past that issue. Thanks for trying the tests.

neverfox commented 9 years ago

No problem. Unfortunately, I'm on a deadline so I won't have time to sort this out right now. I'll just stick with publishing for now, since I don't really need a reply. Thanks for looking into it though. Let me know how the above code works for you.

neverfox commented 9 years ago

OK, so I created a very simple example that leaves a reply unacked, despite the fact that reply.ack() is called in request.js. RabbitMQ 3.4.4, Erlang 17.4, Node.js 0.12.0 or io.js 1.4.2, wascally 0.2.3. All tests pass.

Hopefully, I'm just doing something wrong.

// reply.js
var rabbit = require('./lib/wascally');

rabbit.connection.then(function () {
    rabbit.handle('status', function (msg) {
        console.log('Received message', msg.body);
        msg.reply({ text: 'Not much.' });
    });
    rabbit.startSubscription('status-q');
});
// request.js
var rabbit = require('./lib/wascally');

rabbit.connection
    .then(function () {
        return rabbit.request('status-x', {
            type: 'status',
            body: { text: 'Wassup?' }
        })
            .then(function (reply) {
                console.log('Got reply', reply.body);
                reply.ack();
            });
    });
// lib/wascally.js
var rabbit = module.exports = require('wascally');

// expose the topology-configuration promise so callers can wait for setup
rabbit.connection = require('../amqp/topology')(rabbit);
// topology.js
'use strict';

module.exports = function (rabbit) {
    return rabbit.configure({
        // arguments used to establish a connection to a broker
        connection: {
            user:   process.env.AMQP_USER,
            pass:   process.env.AMQP_PASS,
            server: [process.env.AMQP_SERVER],
            port:   5672,
            vhost:  process.env.AMQP_VHOST
        },

        // define the exchanges
        exchanges: [
            {
                name:       'status-x',
                type:       'fanout',
                autoDelete: false,
                durable:    true,
                persistent: true
            }
        ],

        // setup the queues, only subscribing to the one this service
        // will consume messages from
        queues: [
            {
                name: 'status-q',
                autoDelete: false,
                durable:    true
            }
        ],

        // binds exchanges and queues to one another
        bindings: [
            {
                exchange: 'status-x',
                target:   'status-q',
                keys:     []
            }
        ]
    });
};
arobson commented 9 years ago

@neverfox - I believe this is a similar issue to the one reported in #57. I believe I have a fix which should hit NPM shortly (version 0.2.4). I am sorry it's taken me so long to figure this one out and get a fix. Thank you for providing example code and background information.