reactor / reactor-core

Non-Blocking Reactive Foundation for the JVM
http://projectreactor.io
Apache License 2.0

switchOnFirst with single returned item cancels input #3936

Closed: yawkat closed this issue 1 day ago

yawkat commented 1 week ago

Expected Behavior

When the switchOnFirst function returns a Publisher with a single item that is the input flux itself, not derived through operators, the input flux should remain usable without restriction.

Actual Behavior

The input flux is cancelled when the outer flux is cancelled (e.g. by Mono.from), making it unusable.

Steps to Reproduce

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    public class Repro {
        public static void main(String[] args) {
            Flux<String> input = Flux.just("foo", "bar");
            Mono<Flux<String>> nested = Mono.from(input.switchOnFirst((first, all) -> Mono.just(all)));
            Flux<String> item = nested.block();
            System.out.println(item); // prints SwitchOnFirstMain as expected
            System.out.println(item.collectList().block()); // never terminates as the item has been cancelled
        }
    }

Possible Solution

In the above example, Mono.from receives the first onNext and immediately cancels the FluxSwitchOnFirst. However, cancelling FluxSwitchOnFirst also cancels the SwitchOnFirstMain, which it should not. Cancellation should only be sent to the publisher returned by the switchOnFirst lambda (in this case the Mono.just).
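A minimal sketch of the first step described above, assuming reactor-core 3.4 or later on the classpath: Mono.from keeps only the first onNext and cancels its source right away. The class and method names are hypothetical, and doOnCancel is used only to observe the cancel signal reaching the source.

```java
import java.util.concurrent.atomic.AtomicBoolean;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class MonoFromCancelsSource {

    // Returns the single item Mono.from keeps; `cancelled` records whether the
    // source Flux received a cancel signal.
    static String firstViaMonoFrom(AtomicBoolean cancelled) {
        Flux<String> source = Flux.just("foo", "bar")
                .doOnCancel(() -> cancelled.set(true)); // observe cancellation of the source
        return Mono.from(source).block();
    }

    public static void main(String[] args) {
        AtomicBoolean cancelled = new AtomicBoolean(false);
        String first = firstViaMonoFrom(cancelled);
        System.out.println(first + " cancelled=" + cancelled.get()); // foo cancelled=true
    }
}
```

Everything here runs synchronously on the calling thread, so the flag is already set when block() returns.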

Your Environment

chemicL commented 2 days ago

Hey @yawkat!

Thanks for the detailed report. The problem is that you are attempting to use the internal product of switchOnFirst outside of its scope.

Once you've obtained the Flux<String> item from block(), the nested chain has completed. Subscribing to its inner representation outside the scope of the lambdas you define is not allowed. As stated in the javadoc:

The whole source (including the first signal) is passed as second argument to the BiFunction and it is very strongly advised to always build upon with operators (see below).

Instead of deriving anything, your code captures this intermediate result and terminates the chain with it. That's not supported, although the javadoc could perhaps be more specific here.
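For contrast, a sketch of the usage the javadoc recommends: the returned Publisher is built from `all` with operators inside the lambda, so nothing escapes the chain. The upper-casing rule and the class name are hypothetical, chosen only for illustration, and reactor-core is assumed to be on the classpath.

```java
import java.util.List;

import reactor.core.publisher.Flux;

class DerivedSwitchOnFirst {

    // Peeks at the first signal, then builds the result on top of `all`
    // (which still contains the first item) using operators.
    static Flux<String> upperCaseIfFirstIsFoo(Flux<String> input) {
        return input.switchOnFirst((first, all) -> {
            if (first.hasValue() && "foo".equals(first.get())) {
                return all.map(String::toUpperCase);
            }
            return all; // pass everything through unchanged
        });
    }

    public static void main(String[] args) {
        List<String> result = upperCaseIfFirstIsFoo(Flux.just("foo", "bar")).collectList().block();
        System.out.println(result); // [FOO, BAR]
    }
}
```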

There's also the following note:

It is advised to return a Publisher derived from the original Flux in all cases

But the javadoc follows it with a different risk, one you are not actually running into because your chain completes. Completion is exactly when the source gets cancelled!

I propose two follow-up tasks:

  1. Propagate onError to the late subscriber when it subscribes to a cancelled intermediate result (I believe this is a bug on our part).
  2. Improve the Javadoc. Not sure yet about the right wording.

yawkat commented 2 days ago

@chemicL I understand from reading the code why it breaks the way it does, but I disagree that the javadoc forbids this behavior. The note about deriving the result of the lambda from the given publisher only says "strongly advised"; it doesn't say it's necessary. The javadoc also makes it appear that the reason is that you might leave a dangling flux, which the code above does not do.

Also, I just tried this with io.projectreactor:reactor-core:3.3.0.RELEASE and this issue does not happen, so this is technically a regression.

chemicL commented 2 days ago

Thanks, that's interesting; I wasn't aware. 3.3.0.RELEASE was released over 5 years ago and is no longer covered by OSS support. We currently support 3.6 and 3.7. I just checked, and 3.6.0 behaves the same way in this particular scenario. There have been some major changes implemented since 3.3.0.RELEASE:

With a whole rewrite happening in 3.4.6:

And from my POV, the regression was introduced in the following fix for another issue: https://github.com/reactor/reactor-core/pull/2794/files#diff-ca833c86dffdc3cb95fd2ead3626c836e9951f7b01a38163215ad483479916a3L654-R660

All in all, I think the time to report regressions between 3.3 and 3.4 is behind us. @yawkat, let's focus on the problem you are trying to solve. Can you be more specific? Perhaps we can work out a way to achieve your goals with the current behaviour.

yawkat commented 2 days ago

In Micronaut HTTP, given a controller like Publisher<byte[]> controller() { ... }, we want special handling for the case where the Publisher fails immediately, while we can still send a full error response. That is different from an error that happens after the first byte[] has been sent, when we can no longer change the response status or headers. So I tried to use switchOnFirst for that special error handling.
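A rough sketch of that idea, not Micronaut's implementation and using Strings in place of byte[] chunks: switchOnFirst inspects the first signal, and only when it is onError (nothing emitted yet) is the failure turned into a replacement response. Both branches stay derived from `all`. The helper name and the error-response string are hypothetical, and reactor-core is assumed to be on the classpath.

```java
import java.util.List;

import reactor.core.publisher.Flux;

class EarlyErrorHandling {

    // Hypothetical helper: an error that arrives before any item can still be
    // replaced by a full error response; later errors are passed through as-is.
    static Flux<String> withEarlyErrorHandling(Flux<String> body) {
        return body.switchOnFirst((first, all) -> {
            if (first.isOnError()) {
                // Nothing has been emitted yet, so substitute a replacement response.
                return all.onErrorResume(e -> Flux.just("error response: " + e.getMessage()));
            }
            // A first item (or completion) was seen: later errors propagate unchanged.
            return all;
        });
    }

    public static void main(String[] args) {
        List<String> early = withEarlyErrorHandling(Flux.error(new IllegalStateException("boom")))
                .collectList().block();
        System.out.println(early); // [error response: boom]

        List<String> late = withEarlyErrorHandling(
                        Flux.just("chunk1").concatWith(Flux.error(new IllegalStateException("late"))))
                .onErrorReturn("late error passed through")
                .collectList().block();
        System.out.println(late); // [chunk1, late error passed through]
    }
}
```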

I don't need a workaround anymore; I implemented my own processor for it: https://github.com/micronaut-projects/micronaut-core/blob/126058cf5fd16e7ac565ec0e70840799f46a52a1/core-reactive/src/main/java/io/micronaut/core/async/subscriber/LazySendingSubscriber.java

Just thought this might still be worth fixing.

chemicL commented 2 days ago

Thanks for the additional context. With that in mind, here are my observations about your initial example, inlined as comments:

        Flux<String> input = Flux.just("foo", "bar");

        // Mono.from causes the execution to cancel the provided Publisher once the first item is emitted
        Mono<Flux<String>> nested = Mono.from(input.switchOnFirst((first, all) -> Mono.just(all)));

        // block() waits for the Mono to complete -> once the first item is retrieved, the source is cancelled
        // meaning that SwitchOnFirstMain is cancelled
        Flux<String> item = nested.block();
        System.out.println(item); // prints SwitchOnFirstMain as expected

        // collectList().block() subscribes to a cancelled SwitchOnFirstMain,
        // it is a bug that it doesn't terminate but waits forever.
        System.out.println(item.collectList().block()); // never terminates as the item has been cancelled

So, essentially, the problem is not that SwitchOnFirstMain is unusable outside the scope of the lambda, as I initially thought.

Let's apply a first mitigation so that the cancellation does not happen, switching from Mono<Flux<String>> to Flux<Flux<String>>:

        Flux<Flux<String>> nested = Flux.from(input.switchOnFirst((first, all) -> Mono.just(all)));
        // now we use blockLast to wait for the last item of the Flux
        Flux<String> item = nested.blockLast();

Now, calling collectList().block() results in the following:

Exception in thread "main" java.util.concurrent.CancellationException: FluxSwitchOnFirst has already been cancelled
    at reactor.core.publisher.FluxSwitchOnFirst$AbstractSwitchOnFirstMain.subscribe(FluxSwitchOnFirst.java:724)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4576)
    at reactor.core.publisher.Mono.block(Mono.java:1778)
    at reactor.core.publisher.Flux.main(Flux.java:138)
    Suppressed: java.lang.Exception: #block terminated with an error

And that is expected, as I mentioned before, unless you use the switchOnFirst overload that accepts a boolean cancelSourceOnComplete:

        input.switchOnFirst((first, all) -> Mono.just(all), false)

This will successfully print:

[foo, bar]
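A small sketch of what the flag changes, assuming reactor-core 3.4 or later: with cancelSourceOnComplete set to true, completion of the returned Mono.just(all) cancels the original source; with false the source is left open, so the captured Flux can still be drained afterwards. The class and method names are hypothetical, and doOnCancel is only there to observe the cancel.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class CancelSourceOnCompleteDemo {

    // Reports whether the original source received a cancel once the outer Flux completed.
    static boolean sourceCancelled(boolean cancelSourceOnComplete) {
        AtomicBoolean cancelled = new AtomicBoolean(false);
        Flux<String> input = Flux.just("foo", "bar").doOnCancel(() -> cancelled.set(true));
        input.switchOnFirst((first, all) -> Mono.just(all), cancelSourceOnComplete).blockLast();
        return cancelled.get();
    }

    // The scenario from this thread: capture `all`, let the outer Flux complete, drain it later.
    static List<String> drainCapturedFlux() {
        Flux<String> item = Flux.just("foo", "bar")
                .switchOnFirst((first, all) -> Mono.just(all), false)
                .blockLast();
        return item.collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(sourceCancelled(true));  // true
        System.out.println(sourceCancelled(false)); // false
        System.out.println(drainCapturedFlux());    // [foo, bar]
    }
}
```

Leaving the source open is the dangling case the javadoc warns about, so the false variant seems appropriate only when something is guaranteed to subscribe to the captured Flux.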

As an alternative to switching from Mono to Flux, consider using .single():

        Flux<String> input = Flux.just("foo", "bar");
        Mono<Flux<String>> nested =
                input.switchOnFirst((first, all) -> Mono.just(all), false).single();
        Flux<String> item = nested.block();
        System.out.println(item);
        System.out.println(item.collectList().block());

chemicL commented 1 day ago

I pushed the fix for the hanging behaviour. With the above explanation, I think the issue can be closed. @yawkat, I hope this helps. Please don't hesitate to open a new issue if something else looks fishy.

yawkat commented 1 day ago

Thanks!