failsafe-lib / failsafe

Fault tolerance and resilience patterns for the JVM
https://failsafe.dev
Apache License 2.0

Perform async recovery action before reattempt #246

Open mateusz-stefanski opened 4 years ago

mateusz-stefanski commented 4 years ago

This issue is similar to #227, but it concerns CompletionStage support. Currently there is no clean way to perform an asynchronous recovery action before a reattempt. Here is an example of the desired behaviour in pure Java:

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

public class ReconnectBeforeReattemptExample {

    private final AtomicBoolean isConnectionEstablished = new AtomicBoolean(false);
    private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();

    public static void main(String[] args) {
        ReconnectBeforeReattemptExample example = new ReconnectBeforeReattemptExample();
        example.fetchDataWithReattempt()
                .whenComplete((s, throwable) -> {
                    System.out.println("fetched data = " + s);
                    // Shut down the executor so the JVM can exit once the chain completes.
                    example.scheduledExecutor.shutdown();
                });
    }

    private CompletionStage<String> fetchDataWithReattempt() {
        return fetchData().handle((s, throwable) -> {
            if (throwable instanceof IllegalStateException) {
                // Connection error: reconnect first, and reattempt only after the reconnect completes.
                return reconnect().thenCompose(aVoid -> fetchData());
            } else if (throwable == null) {
                // The first attempt succeeded.
                return CompletableFuture.completedFuture(s);
            } else {
                // Any other failure is propagated unchanged.
                CompletableFuture<String> cf = new CompletableFuture<>();
                cf.completeExceptionally(throwable);
                return cf;
            }
        }).thenCompose(Function.identity()); // flatten CompletionStage<CompletionStage<String>>
    }

    private CompletionStage<String> fetchData() {
        System.out.println("data fetch started...");
        CompletableFuture<String> cf = new CompletableFuture<>();
        scheduledExecutor.schedule(() -> {
            if (isConnectionEstablished.get()) {
                System.out.println("data fetch succeeded");
                cf.complete("some fetched data");
            } else {
                System.out.println("data fetch failed due to connection error");
                cf.completeExceptionally(new IllegalStateException("Connection error"));
            }
        }, 2, TimeUnit.SECONDS);
        return cf;
    }

    private CompletionStage<Void> reconnect() {
        System.out.println("reconnecting started...");
        CompletableFuture<Void> cf = new CompletableFuture<Void>();
        scheduledExecutor.schedule(() -> {
            isConnectionEstablished.set(true);
            System.out.println("reconnecting succeeded");
            cf.complete(null);
        }, 2, TimeUnit.SECONDS);
        return cf;
    }

}

Program output:

data fetch started...
data fetch failed due to connection error
reconnecting started...
reconnecting succeeded
data fetch started...
data fetch succeeded
fetched data = some fetched data
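The same handle/thenCompose pattern can be generalized to a bounded number of attempts. The sketch below is plain Java (Java 9+ is assumed for `CompletableFuture.failedFuture`) and does not use Failsafe; `RetryWithRecovery`, `withRecovery` and its parameters are hypothetical names chosen for illustration. Recovery runs only for failures accepted by the predicate, the next attempt is chained onto the recovery future, and a failed recovery ends the chain with that failure.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.function.Predicate;
import java.util.function.Supplier;

public final class RetryWithRecovery {

    private RetryWithRecovery() {
    }

    /**
     * Runs operation. If it fails with an error accepted by needsRecovery and retries
     * remain, runs recovery and reattempts only after recovery has completed successfully.
     * A failed recovery, an unhandled error, or exhausted retries complete the returned
     * future exceptionally.
     */
    public static <T> CompletableFuture<T> withRecovery(
            Supplier<? extends CompletionStage<T>> operation,
            Predicate<Throwable> needsRecovery,
            Supplier<? extends CompletionStage<Void>> recovery,
            int maxRetries) {
        return operation.get().toCompletableFuture()
                .handle((result, failure) -> {
                    if (failure == null) {
                        return CompletableFuture.<T>completedFuture(result);
                    }
                    Throwable cause = unwrap(failure);
                    if (maxRetries > 0 && needsRecovery.test(cause)) {
                        // If recovery fails, thenCompose skips the reattempt and the
                        // recovery failure becomes the overall failure.
                        return recovery.get().toCompletableFuture()
                                .thenCompose(ignored ->
                                        withRecovery(operation, needsRecovery, recovery, maxRetries - 1));
                    }
                    return CompletableFuture.<T>failedFuture(cause);
                })
                .thenCompose(stage -> stage);
    }

    // Dependent stages wrap failures in CompletionException; test the real cause instead.
    private static Throwable unwrap(Throwable failure) {
        return failure instanceof CompletionException && failure.getCause() != null
                ? failure.getCause()
                : failure;
    }
}
```

With this helper, `fetchDataWithReattempt()` could be written as `RetryWithRecovery.withRecovery(this::fetchData, e -> e instanceof IllegalStateException, this::reconnect, 1)`, although retry and recovery logic still live in one hand-written helper.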

However, it would be much cleaner if I could do:

    ...
    private CompletionStage<String> fetchDataWithReattempt() {
        RecoveryPolicy<Object> recoveryPolicy = Recovery
                .ofStageAsync(this::reconnect)
                .handle(IllegalStateException.class);
        RetryPolicy<Object> retryPolicy = new RetryPolicy<>().withMaxRetries(1);
        return Failsafe
                .with(retryPolicy, recoveryPolicy)
                .with(scheduledExecutor)
                .getStageAsync(this::fetchData);
    }
    ...

If I missed something and this feature is already supported, please let me know how to use it.

jhalterman commented 4 years ago

One option I mentioned in #227 is to use onRetry:

RetryPolicy<Object> rp = new RetryPolicy<>().onRetry(e -> reconnect());
Failsafe.with(rp).get(this::mainOperation);

This may or may not be desirable, since failures that happen in event listeners are swallowed, but I'd love to know whether this works for you and, if not, why (perhaps we can improve it).

That said, your RecoveryPolicy idea is very similar to a Fallback; the main difference is that a Fallback does not propagate the failure but replaces it with something else. You can still propagate the failure from the Fallback so that the RetryPolicy is triggered:

private CompletionStage<String> fetchDataWithReattempt() {
  Fallback<Object> fallback = Fallback
    .ofStageAsync(e -> reconnect(e.getLastFailure()))
    .handle(IllegalStateException.class);
  RetryPolicy<Object> retryPolicy = new RetryPolicy<>().withMaxRetries(1);
  return Failsafe
    .with(retryPolicy, fallback)
    .getStageAsync(this::fetchData);
}

CompletableFuture<Object> reconnect(Throwable failure) {
  // ... reconnect
  return CompletableFuture.failedFuture(failure); // propagate original failure
}

One open question is what should happen if the reconnect itself fails. In that case you probably want to return an exception that your RetryPolicy does not handle; otherwise, propagate the original exception.
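The body of `reconnect(Throwable)` above is elided. One way to fill it in that also answers the open question is sketched below in plain Java: if the reconnect succeeds, complete with the original failure so the RetryPolicy sees an exception it handles; if the reconnect fails, complete with a different exception type that the RetryPolicy does not handle. `ReconnectThenPropagate`, `reconnectAndPropagate` and `ReconnectFailedException` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

public final class ReconnectThenPropagate {

    private ReconnectThenPropagate() {
    }

    /** Hypothetical marker exception for a failed reconnect; a retry policy that only
     *  handles IllegalStateException will not retry on it. */
    public static final class ReconnectFailedException extends RuntimeException {
        public ReconnectFailedException(Throwable cause) {
            super("Reconnect failed", cause);
        }
    }

    /**
     * Runs reconnect and always completes exceptionally: with originalFailure if the
     * reconnect succeeded (so a retry can follow), or with ReconnectFailedException
     * if the reconnect failed (so retries stop).
     */
    public static <R> CompletableFuture<R> reconnectAndPropagate(
            Throwable originalFailure, Supplier<? extends CompletionStage<Void>> reconnect) {
        CompletableFuture<R> result = new CompletableFuture<>();
        reconnect.get().whenComplete((ignored, reconnectFailure) -> {
            if (reconnectFailure == null) {
                result.completeExceptionally(originalFailure);
            } else {
                result.completeExceptionally(new ReconnectFailedException(reconnectFailure));
            }
        });
        return result;
    }
}
```

Assuming the Failsafe 2.x `Fallback.ofStageAsync` signature used in the snippet above, it could be wired in as `Fallback.ofStageAsync(e -> ReconnectThenPropagate.reconnectAndPropagate(e.getLastFailure(), this::reconnect))`, where `this::reconnect` is the no-argument `reconnect()` from the original issue.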

I'd be open to adding support for this directly. Perhaps Fallback could still be used for this rather than adding a new policy (like RecoveryPolicy), but hopefully that's a decent solution for now?

jhalterman commented 4 years ago

I'm interested in input on what a solution/API should look like that allows arbitrary logic to be executed on a failure (as with onRetry) without necessarily returning an alternative result (as a Fallback does). Some options:

1. Add support for failure propagation to Fallback:

Fallback.of(this::reconnect).withPropagateFailures(true);

This would allow a Fallback to perform some logic, such as a reconnect, while still propagating the original failure.

2. Create a new policy:

Recovery<Object> recovery = Recovery.of(this::reconnect);
Failsafe.with(retryPolicy, recovery)...

3. Handle errors in event listeners:

RetryPolicy<Object> rp = new RetryPolicy<>().onRetry(this::reconnect);
Failsafe.with(rp)...

Currently, failures in event listeners are not handled; they are quietly ignored. I'm not sure whether that's relevant to your use case, but for a typical reconnect use case I suspect it's important not to ignore errors. #198 contains a discussion of errors in event listeners, but it hasn't reached a conclusion yet.
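To illustrate why ignoring listener errors matters for reconnects, here is a deliberately simplified, synchronous model of a retry loop whose onRetry-style listener failures are swallowed. It is not Failsafe's implementation; `SwallowingListenerModel` and `retry` are hypothetical. If the reconnect inside the listener throws, the loop still reattempts against a dead connection, and the caller only ever sees the original connection error.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

public final class SwallowingListenerModel {

    private SwallowingListenerModel() {
    }

    /**
     * Simplified retry loop (not Failsafe's code): onRetry runs before each reattempt,
     * and any exception it throws is ignored.
     */
    public static <T> T retry(Supplier<T> operation, Consumer<RuntimeException> onRetry, int maxRetries) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        RuntimeException lastFailure = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            if (attempt > 0) {
                try {
                    onRetry.accept(lastFailure);
                } catch (RuntimeException listenerFailure) {
                    // Swallowed: the reattempt still happens and the caller never
                    // learns that the reconnect failed.
                }
            }
            try {
                return operation.get();
            } catch (RuntimeException e) {
                lastFailure = e;
            }
        }
        // Only the last operation failure is reported.
        throw lastFailure;
    }
}
```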


Any other ideas I missed? Which approach seems most natural to you?

mateusz-stefanski commented 4 years ago

> One option I mentioned in #227 is to use onRetry:
>
> RetryPolicy<Object> rp = new RetryPolicy<>().onRetry(e -> reconnect());
> Failsafe.with(rp).get(this::mainOperation);
>
> This may or may not be desirable, since failures that happen in event listeners are swallowed, but I'd love to know whether this works for you and, if not, why (perhaps we can improve it).

Unfortunately, it doesn't work in my case, for two reasons. First, reconnect is asynchronous and must complete before the next attempt is triggered. Second, if fetchData fails due to a connection error, no further attempt should be triggered until reconnect completes successfully. If reconnect also fails, that failure should be propagated.
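The first point can be seen in a small plain-Java sketch (no Failsafe involved; the class and method names are hypothetical, and Java 9+ is assumed for `CompletableFuture.failedFuture`). `fireAndForget()` starts `reconnect()` the way a listener would and drops the returned future, so the single reattempt runs before the connection is back; `composed()` chains the reattempt onto the reconnect future, so it waits, and a failed reconnect would skip the reattempt entirely.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public final class FireAndForgetVsComposed {

    private final AtomicBoolean connected = new AtomicBoolean(false);
    // Daemon thread so the JVM can exit without an explicit shutdown.
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    });

    public CompletableFuture<String> fetchData() {
        return connected.get()
                ? CompletableFuture.completedFuture("some fetched data")
                : CompletableFuture.failedFuture(new IllegalStateException("Connection error"));
    }

    // Reconnects after a short delay, like the reconnect() in the issue.
    public CompletableFuture<Void> reconnect() {
        CompletableFuture<Void> done = new CompletableFuture<>();
        scheduler.schedule(() -> {
            connected.set(true);
            done.complete(null);
        }, 100, TimeUnit.MILLISECONDS);
        return done;
    }

    /** Listener-style: reconnect() is started but its future is dropped. */
    public CompletableFuture<String> fireAndForget() {
        return fetchData().handle((data, failure) -> {
            if (failure == null) {
                return CompletableFuture.completedFuture(data);
            }
            reconnect();        // returned future ignored, as with an event listener
            return fetchData(); // reattempt starts before the connection is back
        }).thenCompose(stage -> stage);
    }

    /** Composed: the reattempt is chained onto the reconnect future. */
    public CompletableFuture<String> composed() {
        return fetchData().handle((data, failure) -> {
            if (failure == null) {
                return CompletableFuture.completedFuture(data);
            }
            return reconnect().thenCompose(ignored -> fetchData());
        }).thenCompose(stage -> stage);
    }
}
```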

> I'd be open to adding support for this directly. Perhaps Fallback could still be used for this rather than adding a new policy (like RecoveryPolicy), but hopefully that's a decent solution for now?

For my part, a Fallback with failure propagation would do the trick.