reactor / reactor-pool

Possible pool resource release bug #124

Closed andreisilviudragnea closed 7 months ago

andreisilviudragnea commented 3 years ago

There are times when the resources are never released back to the pool. I do not know the cause yet, but I was able to reproduce the problem in a separate project here.

Expected Behavior

The resources should be released to the pool.

Actual Behavior

The resources never get released to the pool.

Steps to Reproduce

package com.example;

import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.pool.InstrumentedPool;
import reactor.pool.PoolBuilder;

public class ReactorPoolBugTest {

    private static final int COUNT = 10_000;

    private final InstrumentedPool<String> stringReactivePool = PoolBuilder
            .from(Mono.just("value").delayElement(Duration.ofMillis(2)))
            .maxPendingAcquireUnbounded()
            .sizeBetween(0, 3)
            .buildPool();

    @Test
    public void reactorPoolBug() throws InterruptedException {
        ExecutorService loggerThreads = Executors.newFixedThreadPool(
                1,
                r -> {
                    Thread t = Executors.defaultThreadFactory().newThread(r);
                    t.setDaemon(true);
                    return t;
                }
        );

        loggerThreads.submit(new PoolMetricsLogger(stringReactivePool.metrics()));

        ExecutorService executorService = Executors.newFixedThreadPool(
                16,
                r -> {
                    Thread t = Executors.defaultThreadFactory().newThread(r);
                    t.setDaemon(true);
                    return t;
                }
        );

        CountDownLatch cdl = new CountDownLatch(COUNT);
        for (int i = 0; i < COUNT; i++) {
            executorService.submit(new FlatMapErrorTask(cdl));
        }

        cdl.await();
    }

    private static final class PoolMetricsLogger implements Runnable {

        private final InstrumentedPool.PoolMetrics poolMetrics;

        private PoolMetricsLogger(InstrumentedPool.PoolMetrics poolMetrics) {
            this.poolMetrics = poolMetrics;
        }

        public void run() {
            while (true) {
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                System.err.printf(
                        "[POOL Metrics] Acquired = %d Pending = %d Idle = %d%n",
                        poolMetrics.acquiredSize(),
                        poolMetrics.pendingAcquireSize(),
                        poolMetrics.idleSize()
                );
            }
        }
    }

    private final class FlatMapErrorTask implements Runnable {

        private final CountDownLatch cdl;

        public FlatMapErrorTask(CountDownLatch cdl) {
            this.cdl = cdl;
        }

        public void run() {
            Flux<Void> flux = Flux
                    .range(0, 10)
                    .flatMap(i -> stringReactivePool
                            .withPoolable(value -> Mono
                                    .just(value)
                                    .delayElement(Duration.ofMillis(10))
                                    .then()
                            )
                            .switchIfEmpty(Mono.error(new RuntimeException("Empty")))
                    )
                    .doOnComplete(() -> cdl.countDown())
                    .doOnError(error -> cdl.countDown());

            try {
                flux.blockLast();
            } catch (Exception e) {
                System.err.println(e);
            }

            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Possible Solution

The test should always finish, but most of the time it never does, because all the pool resources remain acquired. For reasons I do not understand, the test fails far less often if you remove the Thread.sleep() at the end of the task.
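
While the leak is being investigated, bounding the wait on the latch makes a run fail instead of hang. The following is a minimal plain-Java sketch and not part of the original reproducer: `BoundedAwaitSketch` and `awaitAll` are hypothetical names, and the timeout is arbitrary.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class BoundedAwaitSketch {

    // Waits for the latch, but gives up after the timeout instead of blocking forever.
    // Returns true only if every task counted down in time.
    static boolean awaitAll(CountDownLatch latch, long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report the wait as unfinished.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        CountDownLatch done = new CountDownLatch(2);
        done.countDown(); // only one of the two simulated tasks finishes

        boolean finished = awaitAll(done, 100, TimeUnit.MILLISECONDS);
        // Prints: finished=false remaining=1
        System.out.println("finished=" + finished + " remaining=" + done.getCount());
    }
}
```

In the reproducer above, the same idea would replace the unbounded `cdl.await()` with a timed wait and fail the test when it returns false.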

Your Environment

simonbasle commented 3 years ago

Just in case this is a bug with the withPoolable construct, have you been able to reproduce it with the acquire API?

andreisilviudragnea commented 3 years ago

@simonbasle I tried reproducing the bug only on the pool side here and only on the Flux.usingWhen() operator side here, but I failed.

I am not very familiar with the inner workings of either the reactor-core or the reactor-pool project, so I may not be using the .doOnCancel() operator correctly in the first isolated example above.

andreisilviudragnea commented 3 years ago

@simonbasle I have opened a MonoUsingWhen / FluxUsingWhen issue https://github.com/reactor/reactor-core/issues/2661

andreisilviudragnea commented 3 years ago

@simonbasle After investigating the reactor-pool implementation further: should drainLoop() also be called when AbstractPool.cancelAcquire() is called?

andreisilviudragnea commented 3 years ago

It looks like activating background eviction makes the test fail less often, but the test for the bug still fails sometimes.

Edit: background eviction seems useless.

andreisilviudragnea commented 3 years ago

I managed to isolate the reactor-pool bug here. Not even background eviction is helpful.

agorbachenko commented 8 months ago

Hi there! According to @mp911de and @chemicL this issue is the same as https://github.com/r2dbc/r2dbc-pool/issues/198. That issue is another reproducer of this critical bug. Maybe this helps to spot the bug and fix it.

chemicL commented 8 months ago

@andreisilviudragnea thanks for the report. I'm sorry to see it didn't get much attention. With some other reports around r2dbc, jOOQ, and reactor-core I stumbled upon the same problem and discovered your report. So I'll use this issue to try to make some progress on resolving this.

Here's a highly reproducible example that I created to capture what I observed in the r2dbc-pool issue mentioned by @agorbachenko (best to run it in a loop / repeat mode in the IDE): https://github.com/reactor/reactor-pool/compare/fix-124-deliver-cancel-race?expand=1.

As a result of the race between cancel and deliver, the connection is delivered to an already cancelled Subscriber. In the happy case an onNextDropped hook picks it up, which is what happens in the test; it prints the following log:

Feb 27 2024 11:39:40.743 [parallel-4] DEBUG r.c.p.Operators [Loggers.java:254] - onNextDropped: PooledRef{poolable=PoolableTest{id=1, used=0/5}, lifeTime=0ms, idleTime=0ms, acquireCount=0} 
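
The interleaving described above can be replayed deterministically in a toy model. This is a plain-Java sketch under stated assumptions, not reactor-pool's or reactor-core's implementation: `ToyPool`, `ToyRef`, and `ToySubscriber` are hypothetical stand-ins, and the race is simulated by calling `cancel()` before `deliver()` on a single thread.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class DeliverCancelRaceSketch {

    /** Toy pool (hypothetical): only tracks how many resources are handed out. */
    static final class ToyPool {
        final AtomicInteger acquired = new AtomicInteger();

        ToyRef allocate() {
            acquired.incrementAndGet();
            return new ToyRef(this);
        }
    }

    /** Toy counterpart of a pooled reference (hypothetical). */
    static final class ToyRef {
        private final ToyPool pool;

        ToyRef(ToyPool pool) {
            this.pool = pool;
        }

        void release() {
            pool.acquired.decrementAndGet();
        }
    }

    /** Toy subscriber: once cancelled, delivered items go to the drop handler. */
    static final class ToySubscriber {
        private final AtomicBoolean cancelled = new AtomicBoolean();
        private final Consumer<ToyRef> onNext;
        private final Consumer<ToyRef> onDropped;

        ToySubscriber(Consumer<ToyRef> onNext, Consumer<ToyRef> onDropped) {
            this.onNext = onNext;
            this.onDropped = onDropped;
        }

        void cancel() {
            cancelled.set(true);
        }

        void deliver(ToyRef ref) {
            if (cancelled.get()) {
                onDropped.accept(ref);
            } else {
                onNext.accept(ref);
            }
        }
    }

    // Replays the losing interleaving: allocate, cancel, then deliver.
    // Returns how many resources are still marked as acquired afterwards.
    static int leakedAfterRace(boolean releaseOnDrop) {
        ToyPool pool = new ToyPool();
        Consumer<ToyRef> dropHandler;
        if (releaseOnDrop) {
            dropHandler = ToyRef::release; // the drop handler returns the resource
        } else {
            dropHandler = ref -> { };      // nobody handles the dropped resource
        }
        ToySubscriber subscriber = new ToySubscriber(ToyRef::release, dropHandler);

        ToyRef ref = pool.allocate(); // the pool commits a resource to this subscriber
        subscriber.cancel();          // cancellation wins the race
        subscriber.deliver(ref);      // delivery reaches an already cancelled subscriber
        return pool.acquired.get();
    }

    public static void main(String[] args) {
        System.out.println("leaked without drop handler: " + leakedAfterRace(false)); // 1
        System.out.println("leaked with drop handler: " + leakedAfterRace(true));     // 0
    }
}
```

In this model the resource stays acquired unless something on the consumer side reacts to the dropped item, which mirrors the role the onNextDropped hook plays in the log above.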

I'll chat with the team about possible solutions.

chemicL commented 8 months ago

@andreisilviudragnea I have some news and comments. I do believe it is time to close this particular issue. Here are the details:

  1. The behavior in the example from my comment above is expected, given the asynchronous nature of cancellation. The resolution is for downstream consumers of the API to handle dropped items, either with a Hook or with a proper combination of operators. The reason is that, within reactor-pool's API, once the connection has been emitted downstream, it is the consumer's responsibility to return it to the pool. In my contrived example, the consumer is the LambdaMonoSubscriber, which receives the connection. The constructs used by reactor-pool are not involved; this happens on the user's side.
  2. The example that you provided, boiled down to the one from your comment of March 23rd, 2021, requires a discard hook. Unfortunately, the way certain operators work requires handling cancellation at different levels. Sometimes items are dropped, as in my example. Sometimes items are discarded, which is the case for filter, for instance. In this particular scenario it is the outer flatMap, the one that acquires the connection, that gets cancelled (due to the emitted error) and needs to handle the items to which it has not yet applied the mapper. Below please find the modified example with my comments and changes.
    • <1> is a change to avoid using the term "forever": it coordinates all the asynchronous resource releases instead of relying on time. In my case the test takes over 19 minutes to finish.
    • <2> is about all the drops, discards and cancellations.
    • <3> rests on an assumption that is probably not true, namely that the cleanup can be asynchronous, so it allows some time before checking the state of the pool resources.
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

import org.junit.jupiter.api.Test;
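import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.pool.InstrumentedPool;
import reactor.pool.PoolBuilder;
import reactor.pool.PooledRef;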

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

public class ReactorPoolIsolatedBugTest {

    private static final int COUNT = 10_000;

    // <1> Added a sink to ensure all releases are consumed in time.
    private Sinks.Many<Mono<Void>> asyncReleaseSink =
            Sinks.many().unicast().onBackpressureBuffer();

    private final InstrumentedPool<String> stringReactivePool = PoolBuilder
            .from(Mono.just("value").delayElement(Duration.ofMillis(2)))
            .maxPendingAcquireUnbounded()
            .sizeBetween(0, 3)
            .buildPool();

    @Test
    public void reactorPoolBug() throws InterruptedException {
        // <2> This actually never gets triggered. Will explain soon.
        Hooks.onNextDropped((Consumer<Object>) dropped -> {
            if (dropped instanceof PooledRef) {
                System.out.println("GOT DROPPED REF");
                Sinks.EmitResult result =
                        asyncReleaseSink.tryEmitNext(((PooledRef<?>) dropped).release());
                if (result.isFailure()) {
                    System.out.println("Failed to release dropped con");
                }
            }
        });

        ExecutorService executorService = Executors.newFixedThreadPool(
                16,
                r -> {
                    Thread t = Executors.defaultThreadFactory().newThread(r);
                    t.setDaemon(true);
                    return t;
                }
        );

        // <1> Make sure all cancellations return connection to the pool before
        // validation happens.
        ExecutorService asyncReleasePool = Executors.newSingleThreadExecutor();
        CountDownLatch asyncReleaseDone = new CountDownLatch(1);
        asyncReleasePool.submit(
                () -> asyncReleaseSink.asFlux()
                                      .concatMap(Function.identity())
                                      .subscribe()
        );

        CountDownLatch cdl = new CountDownLatch(COUNT);
        for (int i = 0; i < COUNT; i++) {
            executorService.submit(new FlatMapErrorTask(cdl));
        }

        cdl.await();

        System.out.println("All tasks finished. Waiting for connection release.");

        // <1> Follow up all async releases with a latch release to validate the state.
        Sinks.EmitResult result =
                asyncReleaseSink.tryEmitNext(Mono.fromRunnable(asyncReleaseDone::countDown));
        assertThat(result).matches(Sinks.EmitResult::isSuccess);

        asyncReleaseDone.await();

        // <3> In case there was any asynchronous eviction in place
        await().alias("acquiredSize").atMost(10, TimeUnit.SECONDS)
                  .untilAsserted(() -> assertThat(stringReactivePool.metrics().acquiredSize()).isEqualTo(0));
        await().alias("idleSize").atMost(10, TimeUnit.SECONDS)
                  .untilAsserted(() -> assertThat(stringReactivePool.metrics().idleSize()).isEqualTo(3));
        await().alias("allocatedSize").atMost(10, TimeUnit.SECONDS)
                  .untilAsserted(() -> assertThat(stringReactivePool.metrics().allocatedSize()).isEqualTo(3));
    }

    private final class FlatMapErrorTask implements Runnable {

        private final CountDownLatch cdl;

        public FlatMapErrorTask(CountDownLatch cdl) {
            this.cdl = cdl;
        }

        public void run() {
            Flux<Void> flux = Flux
                    .range(0, 10)
                    .flatMap(i -> stringReactivePool
                            .acquire()
                            .delayElement(Duration.ofMillis(100))
                            // <2> It can happen that the flatMap over the range is
                            // cancelled. However, the lambda in the flatMap that follows
                            // is not actually exercised. The value is discarded
                            // after cancellation, without a chance to be released. So
                            // we use the discard hook:
                            .doOnDiscard(PooledRef.class, s -> {
                                System.out.println("Discarded after acquire, pushing release to sink");
                                Sinks.EmitResult result =
                                        asyncReleaseSink.tryEmitNext(s.release());
                                if (result.isFailure()) {
                                    System.out.println("Failed to emit async release");
                                    System.exit(1);
                                }
                            })
                            .flatMap(pooledRef -> Mono
                                    .just(pooledRef.poolable())
                                    .delayElement(Duration.ofMillis(10))
                                    // <2> Might never be triggered.
                                    .then(pooledRef.release())
                                    // <2> Might never be triggered.
                                    .onErrorResume(error -> pooledRef.release())
                                    // <2> Might never be triggered.
                                    .doOnCancel(() -> {
                                        System.out.println("Canceled inner, pushing " +
                                                "release to sink");
                                        Sinks.EmitResult result =
                                                asyncReleaseSink.tryEmitNext(pooledRef.release());
                                        if (result.isFailure()) {
                                            System.out.println("Failed to emit async release");
                                            System.exit(1);
                                        }
                                    })
                            )
                            .switchIfEmpty(Mono.error(new RuntimeException("Empty")))
                    )
                    .doOnComplete(cdl::countDown)
                    .doOnError(error -> cdl.countDown());

            try {
                flux.blockLast();
            } catch (Exception e) {
                System.err.println(e);
            }

            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

@andreisilviudragnea With these explanations, my understanding is that the pool has no bug. However, using the various operators requires care when dealing with connections, so that cancellations, discards, drops, and termination are each handled gracefully at the respective level of the operator chain.
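
Because the example above can reach a release from several paths (normal completion, error, cancellation, discard), one option is to route them all through a guard that runs the release at most once. This is a plain-Java sketch of that idea only: `ReleaseOnce` is a hypothetical helper, not part of reactor-pool, and nothing here says how `PooledRef.release()` itself behaves when called repeatedly.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class ReleaseOnceSketch {

    /** Hypothetical guard: runs the wrapped release action at most once. */
    static final class ReleaseOnce implements Runnable {
        private final AtomicBoolean done = new AtomicBoolean();
        private final Runnable release;

        ReleaseOnce(Runnable release) {
            this.release = release;
        }

        @Override
        public void run() {
            // Only the first caller flips the flag and performs the release.
            if (done.compareAndSet(false, true)) {
                release.run();
            }
        }
    }

    // Simulates several cleanup paths firing for the same resource and
    // returns how many times the underlying release actually ran.
    static int releasesAfter(int cleanupPaths) {
        AtomicInteger releases = new AtomicInteger();
        ReleaseOnce guard = new ReleaseOnce(releases::incrementAndGet);
        for (int i = 0; i < cleanupPaths; i++) {
            guard.run();
        }
        return releases.get();
    }

    public static void main(String[] args) {
        // Three paths (for example complete, cancel, discard) trigger cleanup.
        System.out.println("actual releases: " + releasesAfter(3)); // 1
    }
}
```

The compare-and-set keeps the guard safe when the cleanup paths run on different threads, which is the situation the cancellation handling above has to cope with.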

I look forward to your verification of the above evaluation. Thanks in advance!

chemicL commented 7 months ago

@andreisilviudragnea friendly ping :)

chemicL commented 7 months ago

Ok, I'll close the issue. Please feel free to re-open if you disagree with my assessment.