reactor / reactor-core

Non-Blocking Reactive Foundation for the JVM
http://projectreactor.io
Apache License 2.0

Fusion of Mono.single() over most fuseable sources causes ClassCastException #2663

Status: Closed (GeorgiPetkov closed 3 years ago)

GeorgiPetkov commented 3 years ago

Calling Mono#single between two Mono#map calls fails with a ClassCastException at subscription time.

Expected Behavior

no ClassCastException failure during subscription (worked on 3.4.3)

Actual Behavior

ClassCastException failure during subscription

Steps to Reproduce

The following test fails with ClassCastException (instead of NoSuchElementException). The specific expected outcome does not matter: if you replace Mono.empty() with Mono.just("test"), the test still fails with ClassCastException (instead of passing). Note that each of the three operators is necessary to reproduce the issue.

@Test
void reproCase() {
    Mono.empty()
        .map(x -> "test1")
        .single()
        .map(x -> "test2")
        .block();
}

Test output (version 3.4.4):

java.lang.ClassCastException: class reactor.core.publisher.MonoSingle$SingleSubscriber cannot be cast to class reactor.core.Fuseable$QueueSubscription (reactor.core.publisher.MonoSingle$SingleSubscriber and reactor.core.Fuseable$QueueSubscription are in unnamed module of loader 'app')

    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:95)
    at reactor.core.publisher.MonoSingle$SingleSubscriber.onSubscribe(MonoSingle.java:115)
    at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:92)
    at reactor.core.publisher.Operators.complete(Operators.java:135)
    at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4099)
    at reactor.core.publisher.Mono.block(Mono.java:1702)
    at x.x.x.MapSingleMapTest.reproCase

Expected output (version 3.4.3):

java.util.NoSuchElementException: Source was empty
    at reactor.core.publisher.MonoSingle$SingleSubscriber.onComplete(MonoSingle.java:179)
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    at reactor.core.publisher.Operators.complete(Operators.java:136)
    at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4046)
    at reactor.core.publisher.Mono.block(Mono.java:1702)
    at x.x.x.MapSingleMapTest.reproCase
    ...

Your Environment

Java 15, Project Reactor v3.4.4 (works on 3.4.3)
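
For readers unfamiliar with this kind of failure, the 3.4.4 stack trace shows a downstream subscriber casting the Subscription it receives to Fuseable$QueueSubscription, while the object it actually received (a MonoSingle$SingleSubscriber) does not implement that interface. Below is a minimal, self-contained sketch of that pattern. It is a simplified model, not Reactor's code: every type name in it (FusionCastSketch, PlainSubscription, ScalarQueueSubscription, FuseableMapSubscriber) is hypothetical and chosen for illustration only.

```java
public class FusionCastSketch {

    // Stand-in for a plain subscription handle (hypothetical, simplified).
    interface Subscription {
        void request(long n);
    }

    // Stand-in for a fusion-capable subscription (hypothetical, simplified).
    interface QueueSubscription<T> extends Subscription {
        T poll();
    }

    // A subscription that does NOT support fusion.
    static final class PlainSubscription implements Subscription {
        @Override
        public void request(long n) {
            // no-op in this sketch
        }
    }

    // A subscription that does support fusion: it holds one value to poll.
    static final class ScalarQueueSubscription<T> implements QueueSubscription<T> {
        private T value;

        ScalarQueueSubscription(T value) {
            this.value = value;
        }

        @Override
        public void request(long n) {
            // no-op in this sketch
        }

        @Override
        public T poll() {
            T v = value;
            value = null;
            return v;
        }
    }

    // A downstream subscriber that assumes its upstream is fuseable
    // and therefore casts without checking.
    static final class FuseableMapSubscriber<T> {
        QueueSubscription<T> upstream;

        void onSubscribe(Subscription s) {
            @SuppressWarnings("unchecked")
            QueueSubscription<T> qs = (QueueSubscription<T>) s; // fails for a plain subscription
            this.upstream = qs;
        }
    }

    // Reports whether subscribing succeeded or hit the unchecked cast.
    static String subscribeWith(Subscription s) {
        try {
            new FuseableMapSubscriber<String>().onSubscribe(s);
            return "ok";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        System.out.println(subscribeWith(new ScalarQueueSubscription<>("test")));
        System.out.println(subscribeWith(new PlainSubscription()));
    }
}
```

In this model the cast only succeeds when the upstream really hands over a queue-style subscription, which matches the mismatch reported in the ClassCastException message above.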

simonbasle commented 3 years ago

has-workaround: since this is a fusion issue, disabling fusion by hiding the source will temporarily fix the problem:

Mono.empty()
    .map(x -> "test1")
    .single()
    .hide() // THIS IS THE WORKAROUND
    .map(x -> "test2")
    .block();
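
Why hiding the source helps can be sketched with a small model. As far as I know, Mono#map picks its fused variant when the upstream is an instance of the Fuseable marker interface, and hide() wraps the upstream in an operator that does not carry that marker. The sketch below illustrates that idea only; it is not Reactor's implementation, and all names in it (HideSketch, Source, FuseableSource, Hidden, chooseMap) are hypothetical.

```java
public class HideSketch {

    // Stand-in for a publisher (hypothetical, simplified).
    interface Source {
        String name();
    }

    // Stand-in for a marker interface signalling "I support fusion".
    interface Fuseable {
    }

    // A source that advertises fusion support.
    static final class FuseableSource implements Source, Fuseable {
        @Override
        public String name() {
            return "single";
        }
    }

    // A wrapper that delegates to the source but does not carry the marker,
    // so downstream operators can no longer detect fusion support.
    static final class Hidden implements Source {
        private final Source delegate;

        Hidden(Source delegate) {
            this.delegate = delegate;
        }

        @Override
        public String name() {
            return delegate.name();
        }
    }

    static Source hide(Source source) {
        return new Hidden(source);
    }

    // Assembly-time decision: which map variant would be chosen for this upstream?
    static String chooseMap(Source upstream) {
        return (upstream instanceof Fuseable) ? "fused map" : "plain map";
    }

    public static void main(String[] args) {
        Source single = new FuseableSource();
        System.out.println(chooseMap(single));       // fused map
        System.out.println(chooseMap(hide(single))); // plain map
    }
}
```

The trade-off is that hiding gives up whatever optimization fusion would have provided at that point in the chain, which is why it is described as a temporary fix.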

simonbasle commented 3 years ago

As a potential workaround to be applied to an entire codebase, one can use the hook demonstrated below:

    // see https://github.com/reactor/reactor-core/issues/2663
    @Test
    void smokeTestFusionMonoSingleMonoWithGh2663Workaround() {

        //HOOK START
        //TODO remove the workaround part when #2663 is fixed
        Hooks.onEachOperator("workaroundGh2663", p -> {
            if ("MonoSingleMono".equals(p.getClass().getSimpleName())
                    || "MonoSingleCallable".equals(p.getClass().getSimpleName())) {
                // it should be ok to use the Mono API here as we selectively
                // wrap two types of publishers, so it won't loop and reapply
                // this hook to hide()
                return ((Mono) p).hide();
            }
            return p;
        });

        //HOOK ENDS, DEMONSTRATION BELOW

        Mono.just(1)
            .map(Function.identity())
            .single()
            .filter(v -> true)
            .block();

        Mono.fromCallable(() -> 1)
            .single()
            .filter(v -> true)
            .block();
    }