postwait / node-amqp

[UNMAINTAINED] node-amqp is an AMQP client for nodejs
MIT License
1.69k stars 357 forks source link

Channel orphaned with Connection idle Timeout set on server #352

Open occasl opened 10 years ago

occasl commented 10 years ago

Our admins have a 60 minute idle connection timeout, which causes an additional queue to be bound and orphan the previous channels for my queue that were created previously. My logs look as follows and notice how an additional queue is bound every hour (next time will be 3, then 4 and so on):

[app] 2014-08-07T16:15:25.000Z: 2014-08-07T16:15:25.174Z - debug: ConsumerTag: node-amqp-145-0.9590792271774262
[app] 2014-08-07T16:15:24.000Z: 2014-08-07T16:15:24.751Z - debug: AMQP Queue bound successfully.
[app] 2014-08-07T16:15:24.000Z: 2014-08-07T16:15:24.731Z - debug: AMQP Queue bound successfully.
[app] 2014-08-07T16:15:24.000Z: 2014-08-07T16:15:24.344Z - debug: AMQP Queue is subscribing...
[app] 2014-08-07T16:15:24.000Z: 2014-08-07T16:15:24.344Z - debug: AMQP Queue is binding...
[app] 2014-08-07T16:15:23.000Z: 2014-08-07T16:15:23.831Z - debug: AMQP Queue is initializing...
[app] 2014-08-07T15:13:36.000Z: 2014-08-07T15:13:36.933Z - debug: ConsumerTag: node-amqp-145-0.6444592161569744
[app] 2014-08-07T15:13:36.000Z: 2014-08-07T15:13:36.658Z - debug: AMQP Queue bound successfully.
[app] 2014-08-07T15:13:36.000Z: 2014-08-07T15:13:36.341Z - debug: AMQP Queue is subscribing...
[app] 2014-08-07T15:13:36.000Z: 2014-08-07T15:13:36.341Z - debug: AMQP Queue is binding...
[app] 2014-08-07T15:13:36.000Z: 2014-08-07T15:13:36.067Z - debug: AMQP Queue is initializing...

Here's how I have my connection and queue configured (notice the events queueUnbindOk and basicCancel aren't being called on idle timeout:

 // Establish connection to AMQP
    var conn = amqp.createConnection({url: amqp_url, clientProperties: { applicationName: "ma-services", capabilities: { consumer_cancel_notify: true }}});

    conn.on('ready', function () {
        logger.debug('AMQP Queue is initializing...');
        var ctag;
        var queue = conn.queue('ma.services.gapns', {'durable': true, 'autoDelete': false}, function (queue) {
            try {
                logger.debug('AMQP Queue is binding...');
                queue.bind('ma.integration.exchange', 'gapns');
                logger.debug('AMQP Queue is subscribing...');
                queue.subscribe( function (msg) {
                    // do stuff

                }).addCallback(function(ok) {
                    logger.debug("ConsumerTag: " + ok.consumerTag);
                    ctag = ok.consumerTag;
                });
            }
            catch (e) {
                logger.error("Exception occurred while processing push notifications: " + e);
            }
        });
        queue.on('queueBindOk', function () {
            logger.debug('AMQP Queue bound successfully.');
        });
        queue.on('basicCancel', function() {
            // this never gets called
            logger.debug('The channel has been canceled (likely server timeout).');
        });
        queue.on('queueUnbindOk', function () {
            // Unsubscribe from queue to prevent orphan -- never gets called
            logger.debug('Unsubscribing consumertag: ' + ctag);
            queue.unsubscribe(ctag);
            logger.debug('AMQP Queue unbound successfully.');
        });
    });