RichardKnop / machinery

Machinery is an asynchronous task queue/job queue based on distributed message passing.
Mozilla Public License 2.0

Retry Recommendations & Considerations #698

Open cameron-dunn-sublime opened 3 years ago

cameron-dunn-sublime commented 3 years ago

Hi, I am considering my options for implementing a backing-off retry for our tasks in Machinery.

I am familiar with the existing retry feature which was implemented in https://github.com/RichardKnop/machinery/issues/225. My understanding is that when a task returns a tasks.ErrRetryTaskLater the following will occur:

This approach may have some drawbacks:

For these reasons, I was considering submitting a PR for an alternative approach and I figured I'd ask for any guidance first. I am trying to think of this generically, but I am most interested in my own use cases which involve SQS.

With SQS a message visibility timeout is available. In addition to changing the message visibility from a receive request (which machinery supports), you may change it with a call to ChangeMessageVisibility. A relatively common pattern is to block a message from re-processing by changing the timeout upon failure. This is discussed here and I am familiar with this approach from various projects. This approach means send/receive metrics are not affected and the message body is not resent. A receive count is also available, which allows the retry time to back off.
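
For context, this is roughly what that broker-side call looks like with the AWS SDK for Go (v1). A minimal sketch, assuming a helper along these lines; the function name and parameters are illustrative, not machinery code:

import (
    "time"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/sqs"
)

// delayRedelivery hides a received-but-failed message for "delay" by extending
// its visibility timeout, so SQS redelivers it later without resending the body.
func delayRedelivery(client *sqs.SQS, queueURL, receiptHandle string, delay time.Duration) error {
    _, err := client.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{
        QueueUrl:          aws.String(queueURL),
        ReceiptHandle:     aws.String(receiptHandle),
        VisibilityTimeout: aws.Int64(int64(delay / time.Second)),
    })
    return err
}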

Since there's no way to pass Signature details in workers, I believe the two approaches I could take to implement this would be:

  1. Create a new error type, similar to ErrRetryTaskLater but with a different name to suggest the change in behavior. This error type would not be caught in the task processor layer; instead, each broker would have to watch for it and handle it as appropriate (I'm not sure whether other brokers have abilities similar to SQS's).
    • The task still does not have the ability to see the RetryCount. I could allow the error to include a func(retryCount int) time.Duration to alleviate this (a rough sketch follows this list).
  2. On SQSConfig add a field like DelayMessageRetry func(err error, retryCount int) (time.Duration)
    • If given, this would be used to decide what to set the visibility delay to when there's an error. I think it would be best to allow the message to still be deleted, perhaps signaled by a negative return value or I could add a bool.
    • This approach feels more "tacked on" but it also avoids any questions around what behavior should be for other brokers.
    • I think this would be harder to use than the first approach because we must maintain an additional mapping between error types and how they affect retry duration (vs. deciding this where the error occurs). This could become complicated if there are several tasks with different retry behaviors.
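
To make the first approach concrete, here is a rough sketch (the type name ErrDelayMessageRetry and its fields are hypothetical, not existing machinery API):

import "time"

// ErrDelayMessageRetry is a hypothetical error a task could return to ask the
// broker to delay redelivery of the message (e.g. via an SQS visibility timeout
// change) instead of re-publishing the message body.
type ErrDelayMessageRetry struct {
    Err error
    // RetryIn lets the error decide the delay from the broker-provided
    // receive count, since the task itself cannot see the retry count.
    RetryIn func(retryCount int) time.Duration
}

func (e ErrDelayMessageRetry) Error() string { return e.Err.Error() }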

Would you be interested in a PR for either approach, or do you have any thoughts?

chen-zhuohan commented 3 years ago

I favor the first approach. Retry logic generally has to deal with backoff and randomization (jitter) of the retry time. I think the retry delay could be an attribute of the error, with machinery providing some helper functions to generate this value while still allowing users to customize it.

The code I would recommend looks something like this:

import (
    "math"
    "math/rand"
    "time"
)

type WaitStrategy func(errs ...error) time.Duration

// DefaultWaitStrategyOption will wait sleepBase * 2^(len(errs) - 1) +/- rand(0, sleepBase).
// sleepBase must be positive (rand.Int63n panics on a non-positive argument).
func DefaultWaitStrategyOption(sleepBase time.Duration) WaitStrategy {
    return func(errs ...error) time.Duration {
        count := len(errs)
        if count <= 0 {
            return time.Duration(0)
        }
        backoff := sleepBase * time.Duration(math.Pow(2, float64(count-1)))
        jitter := time.Duration(rand.Int63n(int64(2*sleepBase)) - int64(sleepBase))
        return backoff + jitter
    }
}
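
As a usage sketch (assuming the strategy above is in scope; the error values are just placeholders for previous failed attempts):

wait := DefaultWaitStrategyOption(500 * time.Millisecond)

// Three recorded failures => roughly 500ms * 2^2 = 2s, plus or minus up to 500ms of jitter.
errs := []error{errors.New("attempt 1"), errors.New("attempt 2"), errors.New("attempt 3")}
delay := wait(errs...)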

To implement delayed retry, the most direct and simple way is to rely on delayed tasks. If some brokers provide advanced features that support delayed retry, I'd like to see those features used.

Further, delayed retry could become a function of the broker interface, with each broker implementing it using either its advanced features or delayed tasks.
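
For example, one shape this could take (DelayedRetryBroker and DelayRetry are only illustrative names, not an existing machinery interface):

import (
    "time"

    "github.com/RichardKnop/machinery/v1/tasks"
)

// DelayedRetryBroker sketches an optional broker capability: brokers with a
// native delay mechanism (e.g. SQS visibility timeouts) implement it directly,
// while others fall back to re-publishing the task with an ETA.
type DelayedRetryBroker interface {
    DelayRetry(signature *tasks.Signature, delay time.Duration) error
}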

I also have an issue, #699, about recording each error that occurs during task retries; we can think about it together.

Hope this helps.