reactor / reactor-core

Non-Blocking Reactive Foundation for the JVM
http://projectreactor.io
Apache License 2.0
4.93k stars 1.19k forks source link

`FluxConcatArray` subscribes to the next source despite being cancelled #2778

Closed nathankooij closed 2 years ago

nathankooij commented 3 years ago

Expected Behavior

When FluxConcatArray gets cancelled, any remaining sources shouldn't be subscribed to as any downstream subscriber will likely drop elements since it cancelled. This was (seemingly) the behavior in 3.4.8.

Actual Behavior

FluxConcatArray subscribes to the next source despite being cancelled by the downstream subscriber already.

Steps to Reproduce

The following test succeeds on my machine with Reactor 3.4.9 (which demonstrates the erroneous behavior). With Reactor 3.4.8 this test fails as the Mono#error isn't subscribed to.

  @Test
  void test() {
    Flux.concat(Mono.fromSupplier(() -> 5), Mono.error(IllegalStateException::new))
        .next()
        .as(StepVerifier::create)
        .expectNext(5)
        .expectComplete()
        .verifyThenAssertThat()
        .hasDroppedErrors(1)
        .hasDroppedErrorOfType(IllegalStateException.class);
  }

Possible Solution

The issue was introduced in 3.4.9, by the work done in https://github.com/reactor/reactor-core/pull/2742. Adding hide() to the #fromSupplier "fails" the test, which might indicate that this is a particular issue with fusion (or the eager behavior of #fromSupplier). From my debugging I concluded the following:

As such, I think a possible solution could be to check whether cancelled == true before draining any other sources, like how it's checked for the other branches in this loop.

Your Environment

nathankooij commented 3 years ago

To add some more context: we noticed this issue after upgrading Reactor in a WebFlux application, which started to log exceptions for each request, while fulfilling the request correctly otherwise. In particular this line caused the issue, as Spring attempted to extract a path variable twice, when only the first route can handle the request (and did!), so there's no point in checking the second route.

simonbasle commented 3 years ago

a better test (that fails in the current version) would probably be: (edited)

    @Test
    void stopSubscribingAdditionalSourcesWhenDownstreamCancelled() {
        AtomicBoolean concatIsCancelled = new AtomicBoolean();
        AtomicBoolean part2Subscribed = new AtomicBoolean();
        Flux.concat(
                Mono.just(1),
                Mono.just(2).doFirst(() -> part2Subscribed.set(true))
            )
            .doOnCancel(() -> concatIsCancelled.set(true))
            .next()
            .as(StepVerifier::create)
            .expectNext(1)
            .expectComplete()
            .verifyThenAssertThat()
            .hasNotDroppedElements();

        assertThat(concatIsCancelled).as("concat was cancelled").isTrue();
        assertThat(part2Subscribed).as("part2 was never subscribed").isFalse();
    }
simonbasle commented 3 years ago

except damn, it passes on main 🤦

nathankooij commented 3 years ago

Thanks for looking into this! I agree in hindsight that it would have made more sense to write a test that would fail with the current status quo, rather than the other way around. The fix in the PR make sense. 👍