rholder / guava-retrying

This is a small extension to Google's Guava library to allow for the creation of configurable retrying strategies for an arbitrary function call, such as something that talks to a remote service with flaky uptime.
Apache License 2.0
1.43k stars 275 forks source link

Any interest in a Future's retryer? #65

Open manasdk opened 8 years ago

manasdk commented 8 years ago

I have use for a Retryer that retries methods that returns futures. If there is any interest I would be happy to work on contributing it back to this project.

package com.github.rholder.retry;

import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.WaitStrategies;
import com.github.rholder.retry.WaitStrategy;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.StopStrategy;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Retries calls that returns a {@link ListenableFuture} by itself returning a {@link ListenableFuture}.
 */
public class FutureRetryer<I, O> {
    private static final Logger logger = LoggerFactory.getLogger(FutureRetryer.class);

    private final Function<I, ListenableFuture<O>> wrappedCall;
    private final I input;
    private final WaitStrategy waitStrategy;
    private final StopStrategy stopStrategy;
    private final Predicate<Attempt<Object>> retryableExceptionPredicate;
    private final ScheduledExecutorService scheduledExecutorService;
    private final SettableFuture<O> responseFuture;
    private final AtomicInteger attemptCount = new AtomicInteger(0);
    private final long startTime;

    public FutureRetryer(Function<I, ListenableFuture<O>> wrappedCall, I input, WaitStrategy waitStrategy,
                         StopStrategy stopStrategy, Predicate<Attempt<Object>> rejectionPredicate,
                         ScheduledExecutorService scheduledExecutorService) {
        this.wrappedCall = wrappedCall;
        this.input = input;
        this.waitStrategy = waitStrategy;
        this.stopStrategy = stopStrategy;
        this.retryableExceptionPredicate = rejectionPredicate;
        this.scheduledExecutorService = scheduledExecutorService;
        this.responseFuture = SettableFuture.create();
        this.startTime = System.nanoTime();
    }

    public ListenableFuture<O> performAction() {
        performActionImpl();
        return this.responseFuture;
    }

    public int getAttemptCount() {
        return this.attemptCount.get();
    }

    private void performActionImpl() {
        ListenableFuture<O> callResponseFuture = wrappedCall.apply(this.input);
        Futures.addCallback(callResponseFuture, new FutureCallback<O>() {
            @Override
            public void onSuccess(final O result) {
                handleSuccessfulResponse(result);
            }

            @Override
            public void onFailure(final Throwable throwable) {
                handleFailureResponse(throwable);
            }
        });
    }

    /**
     * Handles successful response by writing to the responseFuture.
     */
    private void handleSuccessfulResponse(O result) {
        this.responseFuture.set(result);
    }

    /**
     * Handles failure response by retrying if the failure is retryable and the all attempts have not been
     * exhausted. In case a retry is not possible the last exception is set on the responseFuture.
     */
    private void handleFailureResponse(Throwable throwable) {
        int currentAttemptCount = this.attemptCount.get();
        ExceptionAttempt attempt = new ExceptionAttempt(throwable, currentAttemptCount,
            System.nanoTime() - this.startTime);
        // If the retryable exception predicate does not allow the last attempt then set the exception on the
        // response future and end all further retries.
        if (!this.retryableExceptionPredicate.apply((Attempt<Object>) attempt)) {
            this.responseFuture.setException(throwable);
            return;
        }
        // check if the no of retries is exhausted
        if (stopStrategy.shouldStop(attempt)) {
            this.responseFuture.setException(throwable);
            return;
        }
        // increment after it is known that a retry should happen
        this.attemptCount.incrementAndGet();

        // schedule retry based after some delay
        long delayTime = this.waitStrategy.computeSleepTime(attempt);

        // schedule for delayed execution.
        this.scheduledExecutorService.schedule(() -> this.performActionImpl(), delayTime, TimeUnit.MILLISECONDS);
    }

    public static class Builder<IB, OB> {

        private static final int DEFAULT_STOP_ATTEMPT = 10;
        private static final int DEFAULT_WAIT_MULTIPLIER = 300;
        private static final int DEFAULT_WAIT_MAX = 60;
        private static final TimeUnit DEFAULT_WAIT_MAX_UNIT = TimeUnit.SECONDS;

        private Function<IB, ListenableFuture<OB>> wrappedCall;
        private IB input;
        private WaitStrategy waitStrategy;
        private StopStrategy stopStrategy;
        private Predicate<Attempt<Object>> rejectionPredicate = Predicates.alwaysFalse();
        private ScheduledExecutorService scheduledExecutorService;

        public Builder<IB, OB> setWrappedCall(Function<IB, ListenableFuture<OB>> wrappedCall) {
            this.wrappedCall = wrappedCall;
            return this;
        }

        public Builder<IB, OB> setInput(IB input) {
            this.input = input;
            return this;
        }

        public Builder<IB, OB> setWaitStrategy(WaitStrategy waitStrategy) {
            this.waitStrategy = waitStrategy;
            return this;
        }

        public Builder<IB, OB> setStopStrategy(StopStrategy stopStrategy) {
            this.stopStrategy = stopStrategy;
            return this;
        }

        public Builder<IB, OB> retryIfExceptionOfType(Class<? extends Throwable> exceptionClass) {
            this.rejectionPredicate = (Predicate<Attempt<Object>>) Predicates.or(this.rejectionPredicate,
                new ExceptionClassPredicate(exceptionClass));
            return this;
        }

        public Builder<IB, OB> setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
            this.scheduledExecutorService = scheduledExecutorService;
            return this;
        }

        public FutureRetryer<IB, OB> build() {
            Preconditions.checkNotNull(this.wrappedCall, "Wrapped call is required.");
            Preconditions.checkNotNull(this.scheduledExecutorService, "Scheduled executor service is required.");

            if (this.stopStrategy == null) {
                logger.warn("No StopStrategy provided, using default strategy.");
                this.stopStrategy = StopStrategies.stopAfterAttempt(DEFAULT_STOP_ATTEMPT);
            }

            if (this.waitStrategy == null) {
                logger.warn("No WaitStrategy provided, using default strategy.");
                this.waitStrategy = WaitStrategies.exponentialWait(DEFAULT_WAIT_MULTIPLIER, DEFAULT_WAIT_MAX,
                    DEFAULT_WAIT_MAX_UNIT);
            }

            return new FutureRetryer<>(
                this.wrappedCall,
                this.input,
                this.waitStrategy,
                this.stopStrategy,
                this.rejectionPredicate,
                this.scheduledExecutorService
            );
        }
    }

    /**
     * Attempt impl to be used with an Exception.
     */
    static final class ExceptionAttempt implements Attempt<Object> {

        private final ExecutionException e;
        private final long attemptNumber;
        private final long delaySinceFirstAttempt;

        ExceptionAttempt(Throwable cause, long attemptNumber, long delaySinceFirstAttempt) {
            this.e = new ExecutionException(cause);
            this.attemptNumber = attemptNumber;
            this.delaySinceFirstAttempt = delaySinceFirstAttempt;
        }

        @Override
        public Object get() throws ExecutionException {
            throw this.e;
        }

        @Override
        public boolean hasResult() {
            return false;
        }

        @Override
        public boolean hasException() {
            return true;
        }

        @Override
        public Object getResult() throws IllegalStateException {
            throw new IllegalStateException("The attempt resulted in an exception, not in a result");
        }

        @Override
        public Throwable getExceptionCause() throws IllegalStateException {
            return this.e.getCause();
        }

        @Override
        public long getAttemptNumber() {
            return attemptNumber;
        }

        @Override
        public long getDelaySinceFirstAttempt() {
            return delaySinceFirstAttempt;
        }
    }

    static final class ExceptionClassPredicate implements Predicate<Attempt<Object>> {

        private Class<? extends Throwable> exceptionClass;

        ExceptionClassPredicate(Class<? extends Throwable> exceptionClass) {
            this.exceptionClass = exceptionClass;
        }

        @Override
        public boolean apply(Attempt<Object> attempt) {
            if (!attempt.hasException()) {
                return false;
            }
            return exceptionClass.isAssignableFrom(attempt.getExceptionCause().getClass());
        }
    }
}

I also have it in a gist https://gist.github.com/manasdk/ea816f45b26ff4a74f99e2c99116b0b5. If there is interest I can create a PR and send it out for review. I also wouldn't mind if someone took over the code and wrote it in a cleaner way that fits better with this project.

Dpetters commented 5 years ago

+1

jerrypeng7773 commented 3 years ago

+1