Open jmmerz opened 2 months ago
Update: I've discovered that the problem appears related specifically to using a Mono.fromCallable()
as the final producer for the Flux.mergeSequential()
. If I instead use a Mono.fromRunnable()
the code works as expected.
I've updated the description and workarounds in the original ticket accordingly.
Problem Description
I'm upgrading an application to take Reactor from 3.4.x to the latest (3.6.5) and have run into this problem:
If I invoke Flux.mergeSequential(Publisher<? extends I>... sources) like:
where
producer1
,producer2
, ...,producerN
have code that prepares an operation and then awaits a signal, andmonoFinal
is aMono.fromCallable()
containing code that sends the signal to allow the code run by release the first N Producers to continue and complete.It seems the requirements to trigger the unexpected behavior are:
Mono.fromCallable()
. I've found that if I use aMono.fromRunnable()
instead, the contained Runnable is executed, while in the former case, the Callable is not executed.Background (my specific use case for the above)
I have some code structured as described above where
producer1
,producer2
, ...,producerN
send commands to a Redis client that is configured not to flush commands over the wire immediately, andmonoFinal
is aMono.fromCallable()
that tells the client to flush the commands so that they are sent over the wire as a batch after the last Mono runs.When invoked like this,
producer1
,producer2
, ...,producerN
are all subscribed by the merged Flux, and their code waits for the signal sent bymonoFinal
, butmonoFinal
is not subscribed - at least its Callable is never executed.I've developed a simplified test case not requiring external services/libraries to demonstrate what's happening (link below in Steps to Reproduce).
Expected Behavior
The merged Flux should subscribe to all Producers eagerly and their emitted values should be produced in order.
Actual Behavior
The first N Producers are subscribed, but the last Mono appears never to be subscribed (or at least it's Callable is never never executed).
Steps to Reproduce
I've created a small project with a test case to Github. If it is preferable to deliver test code in another way, please let me know, I'm happy to oblige. See the project in test-reactor-flux-mergeSequential:
monoFinal
. (Mono.log() works as well.)Possible Solution
I don't have a solution, but I've found a workaround (explained below) in case it helps.
Workaround 1
If I redefine
monoFinal
usingMono.fromRunnable()
in place of the existingMono.fromCallable()
, things work as expected.Workaround 2
(This workaround seems less good, but I'm leaving it documented in case it helps narrow down the cause.)
If I invoke Mono.doOnSubscribe(Consumer<? super Subscription> onSubscribe) on
monoFinal
even with a No-Op Consumer, it causes things to work.The same thing happens with Mono.log() which I suspect is due to adding signal handling for
onSubscribe
as well. Other error handlers (onCancel
,onError
,onTerminate
) have no effect.Your Environment
netty
, ...):java -version
):uname -a
):