failsafe-lib / failsafe

Fault tolerance and resilience patterns for the JVM
https://failsafe.dev
Apache License 2.0

Incorrect timeout behaviour given overflowing thread pools #257

Closed timothybasanov closed 3 years ago

timothybasanov commented 4 years ago

I encountered multiple different issues regarding timeouts not firing when the main thread pool and/or its queues are full or busy for a long time.

In most cases the issues cause timeouts to fire late or never fire at all. This affected real running code as soon as one of the code paths hit a hanging RPC request: the hung requests quickly filled up the thread pools and prevented anything else from being executed, grinding the app to a halt.

Here are some reproducible scenarios:

package net.jodah.failsafe;

import net.jodah.failsafe.util.concurrent.Scheduler;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.JUnitCore;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Several related failures preventing timeouts caused by an overflowing thread pool or its queue. */
public class ThreadingTest {

    public static void main(String[] args) {
        JUnitCore.main("net.jodah.failsafe.ThreadingTest");
    }

    /**
     * While the thread pool is full, no timeout-based cancellations are executed, as they stay in the queue.
     * We schedule two tasks, each of which schedules a timeout; both timeouts stay stuck in the queue until
     * one of the tasks finishes.
     */
    @Test(timeout = 60_000)
    public void test01_NoTimeout_FullThreadPool() throws Exception {
        ExecutorService executorService = new ThreadPoolExecutor(
                2, 2, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(),
                r -> new Thread(r, "test01_NoTimeout_FullThreadPool@" + r.hashCode()));
        FailsafeExecutor<?> failsafe = Failsafe.with(Timeout.of(Duration.ofSeconds(1)).withCancel(true))
                .with(executorService);
        CompletableFuture<?> firstTask = failsafe.runAsync(ctx -> Thread.sleep(Long.MAX_VALUE)); // never finishes
        CompletableFuture<?> secondTask = failsafe.runAsync(ctx -> Thread.sleep(TimeUnit.SECONDS.toMillis(2)));
        try {
            secondTask.join(); // both tasks keep the pool busy, so cancellation cannot be executed for either
            Assert.fail(); // as second task was never cancelled it finishes successfully
        } catch (CompletionException e) {
            Assert.assertTrue(e.getCause() instanceof TimeoutExceededException);
        }
        Thread.sleep(TimeUnit.SECONDS.toMillis(1)); // second task finished, now the first task is cancelled
        Assert.assertTrue(firstTask.isDone()); // first task should have been cancelled by now
    }

    /**
     * When the main thread pool and its queue are full, no timeout-based cancellation can be scheduled for
     * execution. Contrary to the previous test, even after one of the tasks has finished, the other one does
     * not get cancelled.
     */
    @Test(timeout = 60_000)
    public void test02_NoTimeout_FullQueue() throws Exception {
        ExecutorService executorService = new ThreadPoolExecutor(
                2, 2, 1, TimeUnit.MINUTES, new SynchronousQueue<>(),
                r -> new Thread(r, "test02_NoTimeout_FullQueue@" + r.hashCode()));
        FailsafeExecutor<?> failsafe = Failsafe.with(Timeout.of(Duration.ofSeconds(1)).withCancel(true))
                .with(executorService);
        CompletableFuture<?> firstTask = failsafe.runAsync(ctx -> Thread.sleep(Long.MAX_VALUE)); // never finishes
        CompletableFuture<?> secondTask = failsafe.runAsync(ctx -> Thread.sleep(TimeUnit.SECONDS.toMillis(2)));
        try {
            secondTask.join(); // we fail to schedule timeout cancellations, so no task will ever be interrupted
            Assert.fail(); // as second task was never cancelled it finishes successfully
        } catch (CompletionException e) {
            Assert.assertTrue(e.getCause() instanceof TimeoutExceededException);
        }
        Thread.sleep(TimeUnit.SECONDS.toMillis(1)); // give it some time to pick up, just in case
        Assert.assertTrue(firstTask.isDone()); // we failed to schedule cancellation, so first task would never cancel
    }

    /**
     * Naive fix that executes cancellation on the scheduled thread pool instead of delegating it back to the
     * main thread pool. This is the only test that passes out of the box.
     */
    @Test(timeout = 60_000)
    public void test03_NaiveFix_ExecuteOnScheduledThreadPool() throws Exception {
        ExecutorService executorService = new ThreadPoolExecutor(
                1, 1, 1, TimeUnit.MINUTES, new SynchronousQueue<>(),
                r -> new Thread(r, "test03_NaiveFix_ExecuteOnScheduledThreadPool"));
        Scheduler delegatingScheduler = Scheduler.of(executorService);
        ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
                r -> new Thread(r, "test03_NaiveFix_ExecuteOnScheduledThreadPool-scheduled@" + r.hashCode()));
        FailsafeExecutor<?> failsafe = Failsafe.with(Timeout.of(Duration.ofSeconds(1)).withCancel(true))
                .with((callable, delay, unit) -> {
                    if (delay == 0) {
                        return delegatingScheduler.schedule(callable, delay, unit);
                    } else {
                        // Direct execution to never fail to execute TimeoutExecutor.timeoutFuture
                        return scheduledExecutorService.schedule(callable, delay, unit);
                    }
                });
        CompletableFuture<?> firstTask = failsafe.runAsync(ctx -> {
            System.out.println("Thread: " + Thread.currentThread().getName());
            Thread.sleep(Long.MAX_VALUE);
        });
        try {
            firstTask.join();
            Assert.fail();
        } catch (CompletionException e) {
            Assert.assertTrue(e.getCause() instanceof TimeoutExceededException);
        }
    }

    /**
     * Naive fix does not handle retry policy well. Retry policy executes directly, avoiding the scheduler.
     * As a result business logic code is executed on the scheduled thread pool. And it blocks timeouts once again.
     */
    @Test(timeout = 60_000)
    public void test04_NaiveFix_IncorrectRetryPolicyHandling() throws Exception {
        ExecutorService executorService = new ThreadPoolExecutor(
                1, 1, 1, TimeUnit.MINUTES, new SynchronousQueue<>(),
                r -> new Thread(r, "test04_NaiveFix_IncorrectRetryPolicyHandling"));
        Scheduler delegatingScheduler = Scheduler.of(executorService);
        ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
                r -> new Thread(r, "test04_NaiveFix_IncorrectRetryPolicyHandling-scheduled@" + r.hashCode()));
        FailsafeExecutor<?> failsafe = Failsafe.with(new RetryPolicy<>(),
                                                     Timeout.of(Duration.ofSeconds(1)).withCancel(true))
                .with((callable, delay, unit) -> {
                    if (delay == 0) {
                        return delegatingScheduler.schedule(callable, delay, unit);
                    } else {
                        // Direct execution to never fail to execute TimeoutExecutor.timeoutFuture
                        return scheduledExecutorService.schedule(callable, delay, unit);
                    }
                });
        CompletableFuture<?> firstTask = failsafe.runAsync(ctx -> {
            System.out.println("Thread: " + Thread.currentThread().getName());
            Thread.sleep(Long.MAX_VALUE);
        });
        try {
            firstTask.join();
            Assert.fail();
        } catch (CompletionException e) {
            Assert.assertTrue(e.getCause() instanceof TimeoutExceededException);
        }
    }
}

There was no simple way to fix the retry policy behaviour, so I added a custom "async" policy that always defers execution to a thread pool. It only works together with the simple delegating scheduler introduced above in the naive fix (called SimpleDelegatingScheduler in the Javadoc below).

Unfortunately, my lack of understanding of the Failsafe API prevented me from creating an elegant solution. It fixed some, but not all, of the issues. At this point I think somebody with more understanding of Failsafe's internal plumbing should take over:

import com.google.common.base.MoreObjects;
import com.google.common.base.Throwables;
import net.jodah.failsafe.AbstractExecution;
import net.jodah.failsafe.ExecutionResult;
import net.jodah.failsafe.FailsafeFuture;
import net.jodah.failsafe.Policy;
import net.jodah.failsafe.PolicyExecutor;
import net.jodah.failsafe.util.concurrent.Scheduler;

import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodHandles.Lookup;
import java.lang.invoke.MethodType;
import java.lang.invoke.VarHandle;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import static com.google.common.base.Preconditions.checkNotNull;

/**
 * Policy that moves business logic execution onto a different thread pool. It takes care of threading issues
 * for several different use cases:
 * <ul>
 *     <li>{@code SimpleDelegatingScheduler}: In most cases
 *     it executes {@link Policy}s on the calling thread, {@code net.jodah.failsafe.TimeoutExecutor}
 *     on the {@link java.util.concurrent.ScheduledExecutorService} and main business logic on the
 *     {@link ExecutorService}, and this is a good thing. Unfortunately,
 *     {@code RetryPolicy} causes retried business logic to be executed on the
 *     {@link java.util.concurrent.ScheduledExecutorService}, which does not have enough threads for this.</li>
 *     <li>{@link net.jodah.failsafe.internal.util.DelegatingScheduler}: Contrary to the previous case, it always
 *     executes all logic (including {@code net.jodah.failsafe.TimeoutExecutor}) on an {@link ExecutorService}.
 *     Unfortunately, this means that when the thread pool is full it is not possible to time out tasks that
 *     are already being executed. Moreover, the {@link java.util.concurrent.Callable} that would cancel an
 *     in-flight execution cannot be scheduled, so it breaks {@code net.jodah.failsafe.TimeoutExecutor}.</li>
 *     <li>Failsafe before version 2.3.4 had a different threading model: all tasks (including
 *     {@code net.jodah.failsafe.TimeoutExecutor}) never executed on the calling thread. So
 *     {@code net.jodah.failsafe.TimeoutExecutor} starts the timeout countdown only once the queue on the
 *     {@link ExecutorService} has already been cleared, not from the time when the task was scheduled.</li>
 * </ul>
 * <p/>
 * The code in this class may frighten you. That is intentional: we tried and failed to make it simpler.
 * Because we execute our task asynchronously, for it to be cancellable from
 * {@code net.jodah.failsafe.TimeoutExecutor} we need <em>at least</em> to override
 * {@code net.jodah.failsafe.FailsafeFuture#delegate}.
 * {@link net.jodah.failsafe.Failsafe} is very protective of its internals and all the helper functions are
 * hidden, e.g. {@code net.jodah.failsafe.FailsafeFuture#inject} is package-private.
 * All of this means we're set up for major surgery here, and it will not be pretty.
 */
public class AsyncExecutionPolicy<R> implements Policy<R> {

    /** Follow the convention of using raw types here. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    @Override
    public PolicyExecutor toExecutor(AbstractExecution execution) {
        return new AsyncExecutor(this, execution);
    }

    @Override
    public String toString() {
        return MoreObjects.toStringHelper(this)
                .toString();
    }

    static class AsyncExecutor<R> extends PolicyExecutor<AsyncExecutionPolicy<R>> {

        static final FailsafeReflectionHelper reflection = new FailsafeReflectionHelper();

        protected AsyncExecutor(AsyncExecutionPolicy<R> policy, AbstractExecution execution) {
            super(policy, execution);
        }

        /**
         * Works somewhat similarly to {@code net.jodah.failsafe.TimeoutExecutor}. We return a promise that is
         * completed when the async execution finishes (either successfully or not). To support cancellation we
         * have to update {@link FailsafeFuture}'s internal state and restore it afterwards. This way, when
         * {@code net.jodah.failsafe.TimeoutExecutor#supplyAsync} calls
         * {@code net.jodah.failsafe.FailsafeFuture#cancelDelegates(boolean, boolean)}, it cancels the
         * business logic code and not the main thread.
         */
        @Override
        protected Supplier<CompletableFuture<ExecutionResult>> supplyAsync(
                Supplier<CompletableFuture<ExecutionResult>> supplier, Scheduler scheduler,
                FailsafeFuture<Object> future) {
            return () -> {
                CompletableFuture<ExecutionResult> promise = new CompletableFuture<>();

                try {
                    // Coordinates a result between this thread and the execution thread
                    synchronized (future) {
                        if (future.isDone() || execution.isFirstAttempt()) {
                            // When we're already done no need to schedule anything.
                            // On the first call net.jodah.failsafe.Functions.getPromiseAsync does async for us.
                            // TODO(tbasanov): File a ticket against Failsafe, this is confusing behaviour
                            final CompletableFuture<ExecutionResult> result = supplier.get();
                            return result;
                        }

                        Future<?> executionFuture = scheduler.schedule((Callable<?>) () -> {
                            // Execute postExecute listeners on the business logic thread, we're already
                            // running, we can as well finish without additional rescheduling
                            supplier.get().whenComplete((result, error) -> {
                                if (error != null) {
                                    promise.completeExceptionally(error);
                                } else {
                                    promise.complete(result);
                                }
                                // Don't call postExecuteAsync() as we're not a PolicyListeners-compatible policy
                            });
                            return null;
                        }, 0, TimeUnit.NANOSECONDS);
                        // When the timeout expires, we want cancellation to affect the business logic,
                        // not the current thread.
                        reflection.setFailsafeFutureDelegate(future, executionFuture);
                    }
                    return promise;
                } catch (Throwable t) {
                    // Hard scheduling failure
                    promise.completeExceptionally(t);
                    return promise;
                }
            };
        }
    }

    /** Reflection logic split into a separate class for readability. */
    static class FailsafeReflectionHelper {

        private final VarHandle failsafeFutureDelegate;
        private final MethodHandle failsafeFutureInjectTimeout;

        public FailsafeReflectionHelper() {
            try {
                Lookup failsafeFutureLookup =
                        MethodHandles.privateLookupIn(FailsafeFuture.class, MethodHandles.lookup());
                failsafeFutureDelegate = failsafeFutureLookup
                        .findVarHandle(FailsafeFuture.class, "delegate", Future.class);
                failsafeFutureInjectTimeout = failsafeFutureLookup
                        .findVirtual(FailsafeFuture.class, "injectTimeout",
                                     MethodType.methodType(void.class, Future.class));
            } catch (Exception e) {
                // This can not happen unless method signature has changed
                throw new IllegalStateException("Incompatible version of Failsafe was discovered", e);
            }
        }

        /**
         * Changes the main delegate in {@link FailsafeFuture} to the one passed and moves the previous
         * one to the list of {@code net.jodah.failsafe.FailsafeFuture#timeoutDelegates}. This allows
         * cancellation logic to cancel even the current future if necessary. Note that we don't expect
         * the current future to fail as it either does not contain any business logic
         * (i.e. {@link net.jodah.failsafe.Failsafe} only) that was not yet executed. Should be equivalent to:
         * <pre>
         * synchronized(future) {
         *     Future previousDelegate = future.delegate;
         *     future.delegate = executionFuture;
         *     future.injectTimeout(previousDelegate);
         * }
         * </pre>
         */
        private void setFailsafeFutureDelegate(FailsafeFuture<?> future, Future<?> executionFuture) {
            synchronized (checkNotNull(future, "future")) {
                try {
                    // Note we can not use inject(Future) as it resets timeoutDelegates
                    Future<?> previousDelegate = (Future<?>) failsafeFutureDelegate.get(future);
                    failsafeFutureDelegate.set(future, executionFuture);
                    if (previousDelegate != null) {
                        failsafeFutureInjectTimeout.invoke(future, previousDelegate);
                    }
                } catch (Throwable t) {
                    Throwables.throwIfUnchecked(t);
                    // This can not happen unless method signature has changed
                    throw new IllegalStateException("Incompatible version of Failsafe was discovered", t);
                }
            }
        }

    }
}

jhalterman commented 4 years ago

Indeed, Failsafe uses the Scheduler/ThreadPool you supply via .with(Scheduler) for all async executions, including scheduled timeouts. In this case, if the thread pool is full, then not only do executions block but so do Timeout tasks. You could try to ensure that the thread pool is never fully utilized, but that may be a bit of a hack depending on your use case.
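
The starvation described above can be reproduced without Failsafe at all. The following is a minimal JDK-only sketch, not Failsafe code: the class and method names are made up for illustration, and a plain `AtomicBoolean` flag stands in for the timeout's cancellation task. Two blocking tasks occupy both workers of a fixed pool, so the stand-in timeout task submitted to the same pool stays queued.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical demo class; it does not use or mirror any Failsafe API. */
public class SharedPoolStarvation {

    /** Returns true if the stand-in timeout task ran while both workers were still busy. */
    public static boolean timeoutRanWhileBusy() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean timeoutRan = new AtomicBoolean(false);
        try {
            // Two long-running "application" tasks occupy both worker threads.
            for (int i = 0; i < 2; i++) {
                pool.submit(() -> { release.await(); return null; });
            }
            // Stand-in for a timeout cancellation, submitted to the same pool.
            Future<?> timeoutTask = pool.submit(() -> timeoutRan.set(true));
            try {
                timeoutTask.get(500, TimeUnit.MILLISECONDS);
            } catch (TimeoutException stillQueued) {
                // Expected: the task is stuck in the queue behind the busy workers.
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException(e);
            }
            return timeoutRan.get();
        } finally {
            release.countDown(); // unblock the workers so the pool can drain
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("timeout ran while pool was busy: " + timeoutRanWhileBusy());
    }
}
```

The stand-in task only runs once one of the blocking tasks is released, which matches the behaviour seen in the first test of the report.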

The simplest solution would be to allow policies such as Timeout to have a separately configured Scheduler if desired:

Timeout.of(Duration.ofSeconds(1)).withCancel(true).withScheduler(threadpool);

Thoughts?

timothybasanov commented 4 years ago

That would work. And it would resolve the most immediate need today of making timeouts work even when the app is slow.

Some additional thoughts on this:

This does feel like a bit of a hack. Not sure if the retry/fallback policies also need a fix or not, they schedule things. It would also require a bit of a hack on my side, as I'm using my own policies that allow changing delay/retry/thresholds at runtime. But it's a workable solution for sure. A "proper" fix may take some time, as it may change how threading works within Failsafe, and it may even require fully defining Failsafe's behavior in the event of resource constraints.

I think the ideal solution would be to allow moving all Failsafe logic onto a separate thread pool. My application logic is slow and heavy, so I want a big thread pool. Failsafe logic is fast and non-blocking, so it could just as well run on top of the common fork-join pool. This way, no matter what kind of application logic I throw at it, it would never break timeouts, retries, or any other Failsafe policies.
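
The separation suggested here can be sketched with plain JDK executors. This is only an illustration of the idea, not how Failsafe is implemented: the class name, pool sizes, and the 200 ms deadline are all assumptions. The worker pool is saturated by a single blocking task, yet the cancellation still fires because it is scheduled on its own small pool.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical demo class; pool sizes and delays are arbitrary illustration values. */
public class DedicatedTimeoutPool {

    /** Returns true if the blocking task was cancelled by the timeout despite a saturated worker pool. */
    public static boolean cancelledByTimeout() {
        ExecutorService workers = Executors.newFixedThreadPool(1);
        ScheduledExecutorService timeouts = Executors.newSingleThreadScheduledExecutor();
        try {
            // Slow "application logic" takes the only worker thread.
            Future<?> task = workers.submit(() -> { Thread.sleep(60_000); return null; });
            // The cancellation runs on a separate pool, so it cannot be starved by the workers.
            timeouts.schedule(() -> { task.cancel(true); }, 200, TimeUnit.MILLISECONDS);
            try {
                task.get(5, TimeUnit.SECONDS);
                return false; // the task finished on its own, which is not expected here
            } catch (CancellationException cancelled) {
                return true;
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                throw new IllegalStateException(e);
            }
        } finally {
            workers.shutdownNow();
            timeouts.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("cancelled by timeout: " + cancelledByTimeout());
    }
}
```

The trade-off is one extra thread for the timeout pool; the work it performs is short and non-blocking, which is why a small or shared pool is enough.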

Ideally I'd want to execute even policy listeners on the application thread pool. This may be hard to implement given that all listeners currently run fully synchronously. To make things worse, this desire clashes with the fact that I want to be able to execute policy listeners even if my main thread pool is busy. An example of such a listener would be one that adds debug information to the response when something starts to fail (before ultimately sending it back via a fallback policy), even when the main thread pool is fully stuck in a busy wait and ignoring cancellations.

One more related note on thread pools: ideally Failsafe should create all callables that would execute application logic on threads from the application logic thread pool (even if the actual scheduling may not happen later on). This makes it much easier to make tracing/context passing work, as I don't have to think about Failsafe's internal implementation at all.

I think the cheapest way with full backwards compatibility is to always schedule application logic via schedule(callable, 0) and schedule all Failsafe logic via schedule(callable, !=0). It's very close to what Failsafe already does, with a few exceptions. Going further, it may even be worth changing the Scheduler interface to have two or three methods: Future scheduleApplicationLogic(callable), ScheduledFuture scheduleFailsafeCode(callable, delay), Future scheduleFailsafeCode(callable). It is convenient to have a separate method for 0-delay scheduling, as then I can return a Future instead. A non-scheduled future is much easier to create if I already have complicated custom thread pools that cannot support scheduling.
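
As a rough sketch of what such a split interface might look like: the method names below come from the suggestion above, but the interface itself is hypothetical and is not part of Failsafe. The generic signatures, the `TwoPoolScheduler` implementation, and the thread names are assumptions made for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class SplitSchedulerSketch {

    /** Hypothetical interface following the proposal; NOT an existing Failsafe type. */
    interface SplitScheduler {
        <T> Future<T> scheduleApplicationLogic(Callable<T> callable);
        <T> ScheduledFuture<T> scheduleFailsafeCode(Callable<T> callable, long delay, TimeUnit unit);
        <T> Future<T> scheduleFailsafeCode(Callable<T> callable);
    }

    /** Routes application logic and internal code to two different pools. */
    static final class TwoPoolScheduler implements SplitScheduler {
        private final ExecutorService applicationPool;
        private final ScheduledExecutorService internalPool;

        TwoPoolScheduler(ExecutorService applicationPool, ScheduledExecutorService internalPool) {
            this.applicationPool = applicationPool;
            this.internalPool = internalPool;
        }

        @Override
        public <T> Future<T> scheduleApplicationLogic(Callable<T> callable) {
            return applicationPool.submit(callable); // plain Future, no scheduling support needed
        }

        @Override
        public <T> ScheduledFuture<T> scheduleFailsafeCode(Callable<T> callable, long delay, TimeUnit unit) {
            return internalPool.schedule(callable, delay, unit);
        }

        @Override
        public <T> Future<T> scheduleFailsafeCode(Callable<T> callable) {
            return internalPool.submit(callable);
        }
    }

    /** Returns the names of the threads that ran each of the three kinds of task. */
    public static String[] runOnBothPools() {
        ExecutorService app = Executors.newFixedThreadPool(1, r -> new Thread(r, "application"));
        ScheduledExecutorService internal =
                Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "internal"));
        try {
            SplitScheduler scheduler = new TwoPoolScheduler(app, internal);
            String appThread = scheduler.scheduleApplicationLogic(
                    () -> Thread.currentThread().getName()).get();
            String delayedThread = scheduler.scheduleFailsafeCode(
                    () -> Thread.currentThread().getName(), 10, TimeUnit.MILLISECONDS).get();
            String immediateThread = scheduler.scheduleFailsafeCode(
                    () -> Thread.currentThread().getName()).get();
            return new String[] {appThread, delayedThread, immediateThread};
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            app.shutdown();
            internal.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(String.join(", ", runOnBothPools()));
    }
}
```

With this shape, the application pool only needs to be an `ExecutorService`, which fits the point about custom thread pools that cannot support scheduling.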

Sorry about such an unstructured response. I do not have any definitive answers and even some of my requirements are mutually exclusive. But I hope that some of these ideas would give you some insight into my difficulties.

This is not a contribution.

Tembrel commented 4 years ago

As with issue 263, I think we should limit our expectations of what the Timeout policy can achieve.

In this issue and in #260, the problem boils down to contention between application code and Failsafe internals for a limited number of available threads. But threads are potentially scarce system resources, and we don't expect Failsafe to handle the depletion of other system resources, like heap memory, gracefully. Using separate thread pools gives the illusion of having independent sandboxes, but ultimately there are limits imposed by the JVM, by other processes on the machine running the JVM, and potentially by other machines running on the same virtualized hardware. This issue and #260 demonstrate how things can break down under thread scarcity, but they aren't direct indictments of Timeout.

That said, I think it would be good to explore @timothybasanov's idea of moving some Failsafe internals, particularly the Timeout internals, to the common FJP. Failing that, I think @jhalterman's idea above, of allowing Timeouts to optionally be configured with a different thread pool, is reasonable.

"I think the cheapest way with full backwards compatibility is to always schedule application logic via schedule(callable, 0) and schedule all Failsafe logic via schedule(callable, !=0)."

I'm not sure about this. There's a performance penalty associated with a non-zero delay. More importantly, it could subtly change timing that users were (perhaps unreasonably) relying on. Better to hash the larger issue out carefully, waiting for a major version change, and not risk a lot of complaints from users whose code suddenly stops working.

jhalterman commented 3 years ago

"I encountered multiple different issues regarding timeouts not firing when the main thread pool and/or its queues are full or busy for a long time."

The Timeout aspect of this issue was resolved in e41381b11912d8ec09e49bc3a067f50b5285faf3, where Timeouts now use the internal Scheduler (backed by the common ForkJoinPool, when possible).
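
The referenced commit is not shown in this thread, so the following is only a JDK-level sketch of the general idea, not Failsafe's actual implementation. It assumes Java 9 or later for `CompletableFuture.delayedExecutor`, and the class name, pool size, and delays are illustration values. The timeout is fired from CompletableFuture's default async executor (the common ForkJoinPool when it supports parallelism), so it does not depend on the saturated user pool.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical demo class; requires Java 9+ for CompletableFuture.delayedExecutor. */
public class CommonPoolTimeout {

    /** Returns true if the result failed with a TimeoutException while the user pool was fully busy. */
    public static boolean timedOutDespiteBusyPool() {
        ExecutorService userPool = Executors.newFixedThreadPool(1);
        try {
            CompletableFuture<String> result = new CompletableFuture<>();
            // User-supplied logic occupies the only thread of the user pool.
            Future<?> work = userPool.submit(() -> {
                Thread.sleep(60_000);
                result.complete("done");
                return null;
            });
            // The delayed task runs on CompletableFuture's default async executor,
            // not on userPool, so a full userPool cannot delay it.
            Executor delayed = CompletableFuture.delayedExecutor(200, TimeUnit.MILLISECONDS);
            delayed.execute(() -> {
                if (result.completeExceptionally(new TimeoutException("timed out"))) {
                    work.cancel(true); // interrupt the in-flight work
                }
            });
            try {
                result.get(5, TimeUnit.SECONDS);
                return false; // completed normally, which is not expected here
            } catch (ExecutionException e) {
                return e.getCause() instanceof TimeoutException;
            } catch (InterruptedException | TimeoutException e) {
                throw new IllegalStateException(e);
            }
        } finally {
            userPool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("timed out despite busy pool: " + timedOutDespiteBusyPool());
    }
}
```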

"Not sure if the retry/fallback policies also need a fix or not, they schedule things"

Those are a bit different, since actual user-supplied executable logic may be run within a RetryPolicy or Fallback's scheduled thread, whereas a Timeout doesn't do much; it's purely internal. That's a good reason for moving Timeouts to an internal thread and leaving Fallbacks and retries on the user-supplied scheduler, if any.

I believe this is resolved now so I'm closing. Feel free to reopen if not.