cujojs / when

A solid, fast Promises/A+ and when() implementation, plus other async goodies.

Question: How to do this control flow #426

Closed richzw closed 9 years ago

richzw commented 9 years ago

I am learning to write JavaScript with when. Here is my case: the deliverMessage function in Sender tries to connect to amqp. If it succeeds, it should call publish_; otherwise, it should call reconnect_ to reconnect to amqp after 3 seconds. The code is as follows:

Sender.prototype.reconnect_ = function( err ) {
    console.error('MessageBus disconnected, attempting to reconnect' + err);
    this.createFakeChannel_();
    return setTimeout( this.deliverMessage.bind(this), 3000);
};

Sender.prototype.deliverMessage = function() {
    when(amqp.connect( this.addr_ ))
        .with( this )
        .then( this.createChannel_ )
        .then( this.createExchange_ )
        .then( this.handleUnrouteableMessages_ )
        .then( this.handleDisconnections_ )
        .catch( this.reconnect_ )
        .done( this.publish_ ); //? publish_ is invoked in all case?
};

In practice, publish_ is called whether the connection succeeds or fails. Could anyone tell me which API I should use in my case?

briancavalier commented 9 years ago

Hi @richzw, good question. Implementing a retry pattern is fairly simple with promises and recursion. The key is ensuring that the promise chain is unbroken. In your example, the call to setTimeout severs the promise chain by initiating a new asynchronous call to deliverMessage that is outside the promise chain. reconnect_ also returns the result of setTimeout immediately. That result is really just a timer id (typically an int or an opaque object), in other words, just a plain value.

Returning a plain value from a catch handler is the signal that you've successfully handled the error. IOW, promise.catch works like regular, synchronous catch: if you don't explicitly propagate the error, it is considered to be handled. That's why in your example, this.publish_ is called whether there is an error or not.
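
To make the point above concrete, here is a minimal sketch of the two outcomes of a catch handler. It uses the native Promise rather than when, only so that it runs without dependencies; when follows the same Promises/A+ handler rules. failingConnect is a hypothetical stand-in for amqp.connect, and 42 stands in for the timer id returned by setTimeout.

```javascript
// Hypothetical stand-in for a connection attempt that always fails.
function failingConnect() {
    return Promise.reject(new Error('connection refused'));
}

// The handler returns a plain value (like a setTimeout id), so the error
// counts as handled and the next step in the chain still runs.
const swallowed = failingConnect()
    .catch(function (err) { return 42; })
    .then(function (value) { return 'publish called with ' + value; });

// The handler rethrows, so the rejection propagates and the fulfillment
// handler of the next step is skipped.
const propagated = failingConnect()
    .catch(function (err) { throw err; })
    .then(
        function () { return 'publish called'; },
        function (err) { return 'publish skipped: ' + err.message; }
    );
```

In the first chain the "publish" step runs even though the connection failed, which mirrors what happens with reconnect_ in the original code.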

Here's a rough draft of how you might implement retry using promises and recursion. I separated the connect/reconnect logic from the publish logic, which makes it easier to recurse only the reconnect. I also added a configurable number of retry attempts, since that's a common use case, and easy to do via recursion.

Note the use of promise.delay instead of setTimeout as well, to preserve the promise chain. It wasn't clear to me what createFakeChannel_ was doing, so I left that out.

Hopefully this will give you a good direction, and you can take the idea and apply it:

Sender.prototype.deliverMessage = function() {
    return this
        .tryConnect_(3, 3000) // could easily be this._retries, this._retryDelay
        .with(this)
        .done(this.publish_)
}

Sender.prototype.tryConnect_ = function(attempts, retryDelay) {
    return when(amqp.connect( this.addr_ ))
        .with( this )
        .then( this.createChannel_ )
        .then( this.createExchange_ )
        .then( this.handleUnrouteableMessages_ )
        .then( this.handleDisconnections_ )
        .catch(function(e) {
            return this.waitAndRetryConnect_(attempts - 1, retryDelay, e)
        })
}

Sender.prototype.waitAndRetryConnect_ = function(attempts, retryDelay, err) {
    if(attempts === 0) {
        return when.reject(new Error('max reconnect attempts exceeded, connection failed'));
    }

    console.error('MessageBus disconnected, attempting to reconnect' + err);
    return when(attempts)
        .delay(retryDelay)
        .then(this.tryConnect_)
}

Hope that helps!

sompylasar commented 9 years ago

@briancavalier Thanks for this reply, it's useful for me, too. Putting it into the cookbook or tutorial would be very helpful. BTW, looks like the passed retryDelay gets lost on the line .then(this.tryConnect_).

briancavalier commented 9 years ago

@sompylasar Cool, glad that was useful!

> Putting it into the cookbook or tutorial would be very helpful.

Good idea. I agree that a general retry pattern, and possibly variations on it, would be useful to add. I'll create an issue for that. Thanks for the suggestion.

> looks like the passed retryDelay gets lost on the line .then(this.tryConnect_)

Oops :) Yes, it does. One more lambda needed ... oh well, that's what I get for shooting from the hip.
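
For readers following along, here is one way the extra lambda might look. This is a sketch under assumptions, not the exact fix intended above: it uses the native Promise and plain functions so that it runs standalone, connect is a hypothetical function returning a promise for a connection, and delay is a hypothetical stand-in for when's promise.delay. The point is that the lambda forwards both attempts and retryDelay to the recursive call instead of passing the function reference alone.

```javascript
// Hypothetical stand-in for when's promise.delay.
function delay(ms) {
    return new Promise(function (resolve) { setTimeout(resolve, ms); });
}

// connect is assumed to be any function that returns a promise.
function tryConnect(connect, attempts, retryDelay) {
    return connect().catch(function (err) {
        return waitAndRetryConnect(connect, attempts - 1, retryDelay, err);
    });
}

function waitAndRetryConnect(connect, attempts, retryDelay, err) {
    if (attempts <= 0) {
        return Promise.reject(new Error('max reconnect attempts exceeded, connection failed'));
    }

    console.error('MessageBus disconnected, attempting to reconnect: ' + err);
    return delay(retryDelay).then(function () {
        // The lambda passes every argument along, so retryDelay is not lost.
        return tryConnect(connect, attempts, retryDelay);
    });
}
```

With the method-based version from earlier in the thread, the lambda would also need the Sender instance as its this, for example by adding .with(this) before the final then.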

richzw commented 9 years ago

@briancavalier Cool, thank you for your answer.

But I have another question. In the previous publish_ function, the channel is closed after the message is published to amqp.

Sender.prototype.publish_ = function( key, msg ) {
    var self = this;

    this.ch_.publish( this.ex_, key, msg, { deliveryMode: 2, mandatory: true }, function() {
        return self.ch_.close();
    });
};

Now I want to establish one permanent connection to amqp when the app starts, so tryConnect_ is invoked in the Sender constructor.

var Sender = function( amqpAddr, retryDelay ) {
    /**
     * amqp address
     * @type {!string}
     * @private
     */
    this.addr_ = amqpAddr || 'amqp://localhost';
    this.tryDelay_ = retryDelay;
    this.tryConnect_(retryDelay);
};

Sender.prototype.publish_ = function( key, msg ) {
    this.ch_.publish( this.ex_, key, msg, { deliveryMode: 2, mandatory: true });
    // do NOT close the channel after the message is published
};

Thus deliverMessage is called whenever there are messages to send. In deliverMessage, I first need to check whether the channel is OK. If it is, the message should be published directly. Otherwise, tryConnect_ should be called, and publish_ invoked once the connection is established. I tried the following code, but it failed.

Sender.prototype.deliverMessage = function ( key, msg ) {
    return when( this.ch_ )
        .with( this )
        .then(function() {
            return this.publish_( key, msg );
        }, function() {
            return this.tryConnect_( this.tryDelay_ );
        });
};

How can I do that with the when API?

briancavalier commented 9 years ago

@richzw I would probably have a method that returns a promise for a connection. That method can hide the details of whether it is returning an existing connection or it had to create a new one, or even had to try to reconnect N times. For example, something along the lines of:

Sender.prototype.deliverMessage = function ( key, msg ) {
    return this.getConnection_()
        .with(this)
        .then(function(channel) {
            return this.publish_(channel, key, msg);
        });
};
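
As a rough illustration of what such a getConnection_ might look like, the sketch below caches the promise for the channel and reuses it on later calls. Everything here is an assumption made for the example: it uses the native Promise so it runs standalone, the Sender constructor takes a hypothetical connect function that returns a promise for a channel, and channel.publish(key, msg) is a simplified, hypothetical signature.

```javascript
// Hypothetical Sender: connect is an injected function that returns a
// promise for a channel (it could wrap the retry logic discussed earlier).
function Sender(connect) {
    this.connect_ = connect;
    this.channelPromise_ = null;
}

Sender.prototype.getConnection_ = function () {
    var self = this;
    if (!this.channelPromise_) {
        this.channelPromise_ = this.connect_().catch(function (err) {
            // Forget the failed attempt so the next call tries to connect again.
            self.channelPromise_ = null;
            throw err;
        });
    }
    // Callers cannot tell whether the connection is new or reused.
    return this.channelPromise_;
};

Sender.prototype.deliverMessage = function (key, msg) {
    return this.getConnection_().then(function (channel) {
        return channel.publish(key, msg);
    });
};
```

Caching the promise rather than the channel means that calls made while a connection is still being established share the same attempt.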

Another option would be to move the retry loop up one level, so that each retry consists of 2 steps: returning a promise for a connection, followed by publishing the message. Perhaps something like:

Sender.prototype.deliverMessage = function ( key, msg ) {
    return this.retryDeliverMessage(key, msg, 3);
};

Sender.prototype.retryDeliverMessage = function ( key, msg, tries ) {
   // implement recursive promise-based retry loop as previously discussed, eg
   // if tries === 0 fail immediately
   // try to get a connection, then try to send
   // on failure, retry with tries - 1
   // on success, return message send result
};
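
The outline above could be filled in roughly as follows. This is only a sketch under assumptions: it uses the native Promise and a standalone function so that it runs without dependencies, and getConnection and publish are hypothetical functions that return a promise for a channel and the result of sending, respectively. It retries immediately; a delay between tries could be added in the catch handler.

```javascript
// getConnection and publish are hypothetical, injected for illustration.
function retryDeliverMessage(getConnection, publish, key, msg, tries) {
    // If no tries are left, fail immediately.
    if (tries <= 0) {
        return Promise.reject(new Error('message delivery failed: no tries left'));
    }

    // Try to get a connection, then try to send.
    return getConnection()
        .then(function (channel) {
            return publish(channel, key, msg);
        })
        .catch(function () {
            // On failure of either step, retry with one fewer try.
            return retryDeliverMessage(getConnection, publish, key, msg, tries - 1);
        });
    // On success, the returned promise fulfills with the send result.
}
```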

Which approach is "correct" really depends on your application needs. Hopefully those give you some ideas.

briancavalier commented 9 years ago

Ping @richzw. Have you found a solution that works for you?

richzw commented 9 years ago

@briancavalier Got it, thanks

briancavalier commented 9 years ago

Cool, glad you solved it.