reactor / reactor-core

Non-Blocking Reactive Foundation for the JVM
http://projectreactor.io
Apache License 2.0

NoSuchElementException in FlatMapMain.drainLoop under high load #2318

Closed mlex closed 4 years ago

mlex commented 4 years ago

We are using a combination of flatMap, publishOn and Flux.fromStream in our processing pipeline and see regular NoSuchElementExceptions that we couldn't really explain. The processing pipeline roughly looks like this:

hotSource
  .flatMap(data -> Flux.fromStream(deserializeDataToJavaStream(data)))
  .publishOn(Schedulers.parallel())
  .subscribe();

The exception is not very frequent (once or twice per day), but it appears regularly (we've seen it for months now). See the example below for a full stacktrace.

To give some context: the application is under high load (millions of messages processed per second) and on some hosts cpu usage regularly reaches 100%. So a theoretical concurrency problem is likely to surface in this environment.

Expected Behavior

No NoSuchElementException is thrown.

Actual Behavior

NoSuchElementException is thrown because iterator.next() is called although the iterator is already empty.

Steps to Reproduce

With a small trick we were able to reproduce the problem in a unit test:

  @Test
  public void reproduceNoSuchElementException() {
    final ExecutorService executor = Executors.newSingleThreadExecutor();
    // The source emits on the executor thread, so onNext can race with downstream request calls
    Flux.<Long>create(sink ->
        sink.onRequest(requested ->
            executor.submit(() ->
                LongStream.range(0, requested).forEach(sink::next))))
        .flatMap(
            x -> Flux.fromStream(IntStream.range(0, 64).boxed()),
            8, 4)
        .publishOn(Schedulers.parallel())
        .take(Duration.ofSeconds(10))
        .blockLast();
  }

If nothing else is changed, this test runs fine. However, if I introduce a Thread.yield() in https://github.com/reactor/reactor-core/blob/v3.3.2.RELEASE/reactor-core/src/main/java/reactor/core/publisher/FluxFlatMap.java#L952 (between queue=f and done=true) then suddenly the test fails with the following exception:

java.util.NoSuchElementException
    at java.util.Spliterators$1Adapter.next(Spliterators.java:688)
    at reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(FluxIterable.java:230)
    at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:201)
    at reactor.core.publisher.FluxFlatMap$FlatMapInner.request(FluxFlatMap.java:994)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:710)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:569)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.request(FluxFlatMap.java:339)
    at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.request(FluxContextStart.java:132)
    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:405)
    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:484)
    at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
    at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
    at java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Except for some more intermediate reactor operators, this is exactly the stacktrace that we are also seeing in our production environment.

Possible Solution

I don't have a solution, but at least an explanation. It is generally possible that FluxFlatMap.FlatMapMain.onNext and FluxFlatMap.FlatMapMain.request are called concurrently (the first because the upstream Publisher emits new elements, the second because the downstream subscriber requests new elements).

This means that thread-1 can execute FluxFlatMap.FlatMapInner.onSubscribe while thread-2 executes FluxFlatMap.FlatMapMain.drainLoop.

As a result, thread-2 can see an inner with inner.queue != null but inner.done == false: because of operator fusion, inner.queue has already been assigned in FlatMapInner.onSubscribe, but inner.done has not been assigned yet because thread-1 has not executed that line.

In this case, the drainLoop can call both FluxIterable.IterableSubscription.poll and FluxIterable.IterableSubscription.request (which shouldn't be possible, because the first should only be called when fusion is enabled and the second only when fusion is disabled). This can lead to the observed NoSuchElementException.
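To make the interleaving described above concrete, here is a minimal single-threaded sketch that replays it step by step. It does not use Reactor: IterableSource and Inner are hypothetical stand-ins that only model the two access paths (poll for the fused path, request for the non-fused path) and the two separate field writes. It illustrates the suspected mechanism under those assumptions and is not the actual FluxFlatMap code.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class FusionRaceSketch {

    /** Hypothetical stand-in for a SYNC-fusable, iterator-backed source. */
    static final class IterableSource {
        private final Iterator<Integer> it;

        IterableSource(List<Integer> items) {
            this.it = items.iterator();
        }

        /** Fused path: the consumer pulls; returns null once exhausted. */
        Integer poll() {
            return it.hasNext() ? it.next() : null;
        }

        /** Non-fused path: pushes up to n items, assuming nobody else touched the iterator. */
        void request(long n) {
            for (long i = 0; i < n; i++) {
                it.next(); // throws NoSuchElementException if poll() already drained the iterator
                if (!it.hasNext()) {
                    return;
                }
            }
        }
    }

    /** Hypothetical stand-in for the inner subscriber: two separate, non-atomic field writes. */
    static final class Inner {
        IterableSource queue;
        boolean done;
    }

    /** Replays the interleaving on one thread; returns true if the exception occurs. */
    static boolean interleavedDrainThrows() {
        Inner inner = new Inner();
        IterableSource source = new IterableSource(Arrays.asList(1, 2, 3));

        inner.queue = source;                 // thread-1: first half of onSubscribe

        // thread-2: the drain loop runs before thread-1 has set done
        while (inner.queue.poll() != null) {
            // items would be handed downstream here
        }
        boolean doneSeenByDrain = inner.done; // still false at this point

        inner.done = true;                    // thread-1: second half of onSubscribe, too late

        try {
            if (!doneSeenByDrain) {
                inner.queue.request(4);       // replenish request on an already drained source
            }
            return false;
        } catch (NoSuchElementException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("NoSuchElementException reproduced: " + interleavedDrainThrows());
    }
}
```

In the sketch, the exception only appears because the drain reads done before the second write lands, which matches the effect of the Thread.yield() placed between the two assignments.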

Your Environment

simonbasle commented 4 years ago

without the artificial trick of the yield, can you confirm if it reproduces with the latest reactor version? you're using 3.3.2.RELEASE which is almost 7 months old! the latest is 3.3.9.RELEASE in this line, and I suspect there might be a change in 3.3.5.RELEASE that may positively impact this issue.

mlex commented 4 years ago

Thank you for the quick response. We'll try out the latest 3.3.9.RELEASE.

mlex commented 4 years ago

With the yield-trick, the problem is still reproducible with 3.3.9.RELEASE. I couldn't find any change between 3.3.2 and 3.3.9 that would have an effect on the observed concurrency issue. Can you give me a hint?

It will take some time to test & deploy & observe the version change in production.

mlex commented 4 years ago

I managed to reproduce the problem without the yield-trick, using the latest 3.3.9.RELEASE. As said, our production systems sometimes reach 100% CPU usage. On a local MacBook with 12 cores the following test also causes 100% CPU usage and eventually provokes the same error:

  @Test
  public void test() {
    Flux.fromStream(IntStream.range(0, 32).boxed())
        .flatMap(number -> {
          final ExecutorService executor = Executors.newSingleThreadExecutor();
          return Flux.<Long>create(sink ->
              sink.onRequest(requested ->
                  executor.submit(() ->
                      LongStream.range(0, requested).forEach(sink::next))))
              .flatMap(
                  x -> Flux.fromStream(IntStream.range(0, 64).boxed()),
                  8, 4)
              .publishOn(Schedulers.newParallel("foo-" + number, 2))
              .take(Duration.ofSeconds(60))
              .last();
        })
        .blockLast();
  }

Would it be possible to wrap done and queue into a wrapper object so that assignment in FlatMapInner.onSubscribe can be done atomically? Of course this causes overhead when checking done and queue in the drainLoop :-(
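A rough sketch of what such a wrapper could look like, assuming an immutable holder published through a single AtomicReference. InnerState, Inner and onSubscribeSyncFused are hypothetical names for illustration only; none of them are Reactor types or methods, and this is not the fix that was adopted.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;

public class AtomicInnerStateSketch {

    /** Immutable pair of the two fields (hypothetical, not a Reactor type). */
    static final class InnerState<T> {
        final Queue<T> queue;
        final boolean done;

        InnerState(Queue<T> queue, boolean done) {
            this.queue = queue;
            this.done = done;
        }
    }

    /** Hypothetical inner subscriber that publishes both values with one write. */
    static final class Inner<T> {
        private final AtomicReference<InnerState<T>> state =
                new AtomicReference<>(new InnerState<T>(null, false));

        /** Called when SYNC fusion was negotiated: queue and done become visible together. */
        void onSubscribeSyncFused(Queue<T> fusedQueue) {
            state.set(new InnerState<>(fusedQueue, true));
        }

        /** The drain loop reads one snapshot instead of two independent fields. */
        InnerState<T> snapshot() {
            return state.get();
        }
    }

    /** In this model a reader never sees a queue without done, or done without a queue. */
    static boolean consistent(InnerState<?> s) {
        return (s.queue != null) == s.done;
    }

    public static void main(String[] args) {
        Inner<Integer> inner = new Inner<>();
        System.out.println("before: consistent=" + consistent(inner.snapshot()));
        inner.onSubscribeSyncFused(new ArrayDeque<>());
        System.out.println("after:  consistent=" + consistent(inner.snapshot()));
    }
}
```

The cost mentioned above shows up as one allocation per state change plus an extra dereference on every read in the drain loop.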

simonbasle commented 4 years ago

thanks for coming up with that second repro case. I further investigated this morning and now with your last comment it definitely looks like a bug in FlatMapInner handling of SYNC fusion.

actually, it shouldn't accommodate request calls when in fusionMode == SYNC. I'll run a few more tests to validate this fixes the issue.

mlex commented 4 years ago

As I understood the code, the if (!inner.done) { in https://github.com/reactor/reactor-core/blob/master/reactor-core/src/main/java/reactor/core/publisher/FluxFlatMap.java#L727 normally makes sure that inner.request() is not called when in fusionMode==SYNC.

Does it make sense to change this check to if (!inner.done && inner.sourceMode != Fuseable.SYNC) {?

simonbasle commented 4 years ago

that check can be done inside the inner's request method. that's the current fix I'm leaning towards. the remaining issue I have is how to perform a relevant enough test without adding an extra whole minute to the tests duration.
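As an illustration of that idea, the following sketch puts the guard inside a stand-in inner's request method. It assumes a locally defined Subscription interface and local NONE/SYNC constants named after Fuseable; Inner and CountingSubscription are hypothetical, and the actual change in reactor-core may look different.

```java
public class GuardedRequestSketch {

    // Local constants named after Fuseable.NONE / Fuseable.SYNC; values are local to this sketch
    static final int NONE = 0;
    static final int SYNC = 1;

    /** Minimal local stand-in for a Reactive Streams subscription. */
    interface Subscription {
        void request(long n);
    }

    /** Records how much was requested, similar in spirit to asserting on aqs.requested. */
    static final class CountingSubscription implements Subscription {
        long requested;

        @Override
        public void request(long n) {
            requested += n;
        }
    }

    /** Hypothetical inner subscriber with the guard placed in request itself. */
    static final class Inner {
        final Subscription upstream;
        volatile int sourceMode;

        Inner(Subscription upstream, int sourceMode) {
            this.upstream = upstream;
            this.sourceMode = sourceMode;
        }

        void request(long n) {
            // In SYNC mode items are pulled via poll(), so a request must never reach the source
            if (sourceMode != SYNC) {
                upstream.request(n);
            }
        }
    }

    public static void main(String[] args) {
        CountingSubscription fused = new CountingSubscription();
        new Inner(fused, SYNC).request(4);

        CountingSubscription plain = new CountingSubscription();
        new Inner(plain, NONE).request(4);

        System.out.println("SYNC requested=" + fused.requested + ", NONE requested=" + plain.requested);
    }
}
```

With the guard in the callee, the drain loop's existing !inner.done check can stay as it is, and a stale read of done no longer results in a request reaching a fused source.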

simonbasle commented 4 years ago

I came up with a slightly different reproducer code, which I'll archive here for reference (as it was long-running with too much GC pressure as well):

    @Rule
    public AutoDisposingRule afterTest = new AutoDisposingRule();

    @Test
    @Ignore("causes too much GC pressure and OutOfMemoryError")
    public void flatmapInnerShouldntRequestInFusionModeSync_gcHeavy() {
        /*
        See original reproduction case in https://github.com/reactor/reactor-core/issues/2318
        This simplified test attempts to trigger the bad state of the inner subscription a bit more directly,
        and allows asserting said state rather than detecting a consequence / symptom (NoSuchElementException).
        As such it runs (a bit) faster but may reproduce a bit less reliably.
        */
        final Scheduler scheduler = afterTest.autoDispose(Schedulers.newParallel("flatmapInnerShouldntRequestInFusionModeSync", 2));
        final List<AssertQueueSubscription<Integer>> innerQueueSubscriptions = Collections.synchronizedList(new ArrayList<>());
        final List<Integer> innerContent = Arrays.asList(1, 2, 3, 4, 5);

        Flux.fromStream(IntStream.range(0, 32).boxed())
            .flatMap(number -> {
                final ExecutorService executor = Executors.newSingleThreadExecutor();
                return Flux.<Long>create(sink -> sink.onRequest(requested -> executor.submit(() -> LongStream.range(0, requested).forEach(sink::next))))
                        .flatMap(x -> {
                            AssertQueueSubscription<Integer> assertQueueSubscription = new AssertQueueSubscription<Integer>() {
                                @Override
                                public int requestFusion(int requestedMode) {
                                    if (requestedMode != Fuseable.NONE) {
                                        return Fuseable.SYNC;
                                    }
                                    return Fuseable.NONE;
                                }
                            };
                            assertQueueSubscription.addAll(innerContent);

                            innerQueueSubscriptions.add(assertQueueSubscription);
                            return subscriber -> subscriber.onSubscribe(assertQueueSubscription);
                        }, 8, 4)
                        .publishOn(scheduler)
                        .take(Duration.ofSeconds(5));
            })
            .blockLast();

        //queuesubscription cannot be represented as string by assertj so we avoid allSatisfy, instead we manually loop
        innerQueueSubscriptions.forEach(aqs -> assertThat(aqs.requested)
                .withFailMessage("Inner publisher shouldn't be requested by FlatMapInner, got <request(%s)>", aqs.requested)
                .isZero());
    }

mlex commented 4 years ago

Thank you for the quick fix!