reactor / reactor-core

Non-Blocking Reactive Foundation for the JVM
http://projectreactor.io
Apache License 2.0
4.93k stars 1.19k forks source link

Simplify `MonoIgnoreThen` implementation to avoid drainLoop undeterministic threading #2561

Closed nathankooij closed 3 years ago

nathankooij commented 3 years ago

Expected Behavior

When MonoIgnoreThen is used in conjunction with PublishOn I would expect MonoIgnoreThen#last to run on the scheduler used by PublishOn. In other words, in the construct x.publishOn(someScheduler).then(y) I would expect y to subscribe/run on the scheduler someScheduler, and not on the scheduler used by x. (Also see example below.) I would expect this as the documentation of then(...) leads me to believe it transforms the completion signal to the publisher y. Since the completion signal is published on someScheduler as per publishOn, I would therefore also expect y to be subscribed to on someScheduler. Perhaps this is an incorrect assumption, but it would make it very hard to reason about scheduling in these cases (as currently either can happen) so some clarification would be at least appreciated then. :)

Actual Behavior

Depending on how fast the thread that executes MonoIgnoreThen.ThenIgnoreMain#drain is, it is possible that x is drained completely (on a different thread) before the loop in drain reaches the end of its first iteration. When that happens, drain will continue to drain on the original thread of x and subscribe to y on that same thread.

Steps to Reproduce

Depending on your machine this might or might not work (we observed this issue mainly in our CI environment, not locally), but it demonstrates the (IMO) expected behavior, although it fails for me when run from the Reactor repository. Adding delayElement with some arbitrary delay to the initial getCurrentThreadName makes the test pass for me, but this difference seems unintuitive to me.

    @Test
    public void race() {
        Mono<String> getCurrentThreadName = Mono.fromSupplier(() -> Thread.currentThread().getName());
        StepVerifier.create(
                getCurrentThreadName
                        .publishOn(Schedulers.newSingle("non-test-thread"))
                        .then(getCurrentThreadName))
                .assertNext(threadName -> assertThat(threadName).startsWith("non-test-thread"))
                .verifyComplete();
    }

This fails with:

Expecting:
 <"Test worker">
to start with:
 <"non-test-thread">

java.lang.AssertionError: 
Expecting:
 <"Test worker">
to start with:
 <"non-test-thread">

Possible Solution

I realize I can workaround this issue by using a flatMap instead, but I would have to introduce some unused variable when then would nicely convey my intention. It also makes it unintuitive when it would be "safe" to use then as right now I cannot predict which scheduler will be used. Perhaps the initial drain started upon subscription should always short-circuit?

Your Environment

Reactor version:

3.4.2 (running the test on `master` in this repo)

Java version:

openjdk version "11.0.2" 2019-01-15
OpenJDK Runtime Environment 18.9 (build 11.0.2+9)
OpenJDK 64-Bit Server VM 18.9 (build 11.0.2+9, mixed mode)

OS and version:

Darwin MacBook-Pro.local 20.1.0 Darwin Kernel Version 20.1.0: Sat Oct 31 00:07:11 PDT 2020; root:xnu-7195.50.7~2/RELEASE_X86_64 x86_64
simonbasle commented 3 years ago

That might be a bit counterintuitive but publishOn can be fused. Since both fromCallable and then are also fuseable, the value might be emitted on the initial subscription thread...

While you might be tempted to debate this fusion support, it certainly has deep roots in the codebase which makes any change in that regard difficult at best.

So the best avenue is probably to prevent fusion altogether between first fromCallable and publishOn by adding a .hide() in between.

simonbasle commented 3 years ago

mmh scratch that, hide() doesn't seem to help. I'd recommend to use subscribeOn inside then() rather than publishOn above it, as a more foolproof way of ensuring that second Mono runs on the desired thread.

nathankooij commented 3 years ago

Thanks for the answer! Indeed, I see now that my example is affected by fusion as hide() does make the test pass at least (for me). This was my attempt at a simplified version of actual code which exhibited similar behavior, but where fusion isn't as obvious. I do not mean to debate fusion support, just trying to understand if there exists a race condition, or whether this can be considered a race condition in the first place. It doesn't seem to break the reactive contracts I think. Perhaps a different explanation helps with what I'm trying to get at.

I can still replicate this "condition" in the test demonstrated above (also after adding hides) by placing a breakpoint on https://github.com/reactor/reactor-core/blob/master/reactor-core/src/main/java/reactor/core/publisher/MonoIgnoreThen.java#L176 that suspends only the calling thread. I realize this is perhaps a bit unorthodox, but we observed this execution in test code where the calling thread is super slow. As we've entered drain and already subscribed to ignore (on a different thread), that subscription can conclude in the background while we are suspended (and after the complete signal of that subcription another drain will be triggered). Subsequently, we can unsuspend the thread and it will continue the drain loop and subscribe to the second Mono/call directly, but unaffected by the publishOn. This is the part I'm wondering about, is it intended that the initial drain loop (triggered here) can subscribe to the 2nd Mono even though we will also enter the drain loop via the completion signal of the inner Mono? After all, the only reason why the initial drain can continue is because the 2nd drain enables the conditions. If the 2nd drain had been allowed to run (with the 1st one stopped), the "race condition" and this "thread problem" would not be possible. Hope this clarifies my question a bit.

On a side note, I can see how with subscribeOn inside of then we could ensure the second Mono runs on the desired thread, and maybe that's something we should do. Although in our case it would lead to a slightly worse API (IMO), but good to keep that option in mind. :) (I can provide more context on this use case as well, if you want.)

simonbasle commented 3 years ago

yeah the implementation of MonoIgnoreThen is probably overcomplicated by the drain loop pattern, from which the operator doesn't really benefit as it can lead to this execution model inconsistency.

I think we'll look into simplifying the implementation so that it sticks to the logical flow better (ie. trigger the next "ignore" subscription in the onComplete of the previous one).

nathankooij commented 3 years ago

Ack, thanks for the quick answers. Looking forward to what an alternative implementation would look like. Let me know if I can test anything. Should we close the issue?

simonbasle commented 3 years ago

@nathankooij let's use that issue to track the implementation change.