postwait / node-amqp

[UNMAINTAINED] node-amqp is an AMQP client for nodejs
MIT License
1.69k stars 357 forks source link

How to swallow ECONNRESET error correctly? #384

Open richzw opened 9 years ago

richzw commented 9 years ago

In my app, the Sender which wrapped with amqp as following

Sender.prototype.createConnection_ = function () {
    var deferred = Q.defer();
    this.con_ = amqp.createConnection( this.connectOpt_, this.implementOpt_ );
    deferred.resolve( this.con_ );

    return deferred.promise;
}

Sender.prototype.connectionReady_ = function() {
    var deferred = Q.defer(),
      self = this;

    self.con_.on('ready', function() {
        console.log('connection is ok now');
        deferred.resolve(self.con_);

    });
    return deferred.promise;      
}

Sender.prototype.handleConnectionError_ = function() {
    var self = this;

    self.con_.on('error', function(err) {
        console.log('Sender: Connection error' + err);
        if (err.code === 'ECONNRESET')
            console.log('Get ECONNRESET error');
    });

    return Q(self);
}

Sender.prototype.createExchange_ = function() {
    var deferred = Q.defer(),
      self = this;

    this.con_.exchange( this.exchangeName_, this.exchangeOpt_, function( ex ) {
        console.log('exchange is ok now');
        self.ex_ = ex;   
        deferred.resolve(self.ex_);   
    });

    return deferred.promise;      
}

Sender.prototype.exchangeReady_ = function() {
    var deferred = Q.defer(),
      self = this;

    this.ex_.on('open', function() {
        console.log('Sender: exchange opened');
        deferred.resolve(this.ex_);
    });

    return deferred.promise;
}

Sender.prototype.handleMessageReturn_ = function() {
    var self = this;

    self.ex_.on('basic-return', function( msg ) {
        console.log('Message return ' + msg);
    });

    return Q(self);
}

Sender.prototype.handleExchangeError_ = function() {
    var self = this;

    self.ex_.on('error', function(err) {
        console.log('Sender: Exchange Error' + err);
    }) ;
    return Q(self);
}

Sender.prototype.getExchange_ = function() {
    var deferred = Q.defer();

    if ( this.ex_ === null ) {
        deferred.reject( new Error('Sender: Message Connection Failed Now...') );
    } else {
        deferred.resolve( this.ex_ );
    }

    return deferred.promise;
}

Sender.prototype.publish_ = function( ex, key, msg ) {
    var self = this,
      deferred = Q.defer();

    console.log('publish message here...');
    ex.publish( key, msg, this.publishOpt_, function(flag, err) {
        if (flag) {
            deferred.reject(new Error('Sender: publish message ' + err) );
            self.cb_( key, err );
        } else {
            console.log('Sender: delivered message processed');
            deferred.resolve();
        }
    });

    return deferred.promise;    
}

/**
 *
 * @public
 */
Sender.prototype.deliverMessage = function( key, message ) {
    var self = this;

    return self.getExchange_()
            .then( self.publish_.call( self, self.ex_, key, message ) )
            .catch( function(err) {
                console.info(err);
                self.cb_( key, err );
            });
}

Sender.prototype.connect_ = function() {
    var self = this;

    return self.createConnection_()
            .then( self.connectionReady_.bind(self) )
            .then( self.handleConnectionError_.bind(self) )
            .then( self.createExchange_.bind(self) )
            .then( self.exchangeReady_.bind(self) )
            .then( self.handleMessageReturn_.bind(self) )
            .catch( function(err) {
                console.info(err);
            });
}

Now here is my test case. Send message every 3 seconds, and start or stop RabbitMQ service randomly. Now the ECONNRESET error comes up, and the log shows

Sender: Connection errorError: CONNECTION_FORCED - broker forced connection clos ure with reason 'shutdown'

It seems that this error can be caught by con_.on('error', however, there is still exception here. How to do that?