reactor / reactor-core

Non-Blocking Reactive Foundation for the JVM
http://projectreactor.io
Apache License 2.0

Flux.collectList() returns an empty list instead of emitting an error #3917

Open ben-cpng opened 1 week ago

ben-cpng commented 1 week ago

For a Flux created by zipping two Monos, MonoA and MonoB, followed by collectList():

Flux.zip(MonoA, MonoB).collectList()

If MonoB emits an error, the collectList() operator should emit that error instead of returning an empty list.

Expected Behavior

Flux.collectList() should emit the error emitted by MonoB.

Actual Behavior

Sometimes Flux.collectList() returns an empty list instead of emitting the error.

Steps to Reproduce

Here is a unit test that reproduces the issue:

@Test
public void bug_Test() {
    final var numItem = 1000000;
    var fluxToTest = Flux.range(1, numItem)
            .flatMap(ignored -> {
              var mono1 = Mono.empty()
                      .publishOn(PARALLEL_SCHEDULER) // the test passes if this is not published on the parallel scheduler
                      .map(Optional::of)
                      .defaultIfEmpty(Optional.empty());

              var mono2 = Mono.error(new NullPointerException())
                      .map(Optional::of)
                      .defaultIfEmpty(Optional.empty());

              return Flux.zip(mono1, mono2).collectList().onErrorResume(e -> Mono.empty());
            })
            // upstream should only ever error (swallowed above), so this block should never run
            .flatMap(evt ->
                Mono.error(new RuntimeException("Unexpected empty list return by collectList of size %s".formatted(evt.size())))
            );
    StepVerifier.create(fluxToTest).expectNextCount(0).verifyComplete();
}

The test above always fails because an empty list reaches the last flatMap() block. The expected behavior is that the onErrorResume() operator replaces every error signal with Mono.empty(), so no data is processed by the last flatMap() block.

Possible Solution

Your Environment

chemicL commented 1 week ago

Hey, thanks for the report. I am wondering though - aren't you explicitly swallowing the error with .onErrorResume(e -> Mono.empty()) that follows after the .collectList() operator?

ben-cpng commented 1 week ago

I expected the collectList() operator to always emit the error, so that the onErrorResume() call always returns an empty Mono.

I don't expect the last flatMap(evt -> ...) block to be triggered. However, I found that collectList() occasionally returns an empty list, which triggers that last flatMap call. That is not the expected behavior.

chemicL commented 1 week ago

[EDIT] I misinterpreted the sequence of events in my initial evaluation. The below is not really what's going on since the delay is applied to a successful termination of a Mono that emits an item, not to the one that terminates with an error.

[ORIGINAL] Hmmm, I think your test is rather convoluted and makes it difficult to infer what's actually going on. Let me try to explain:

  1. The fact you needed to publish the error on the parallel scheduler because otherwise the test would pass indicates there is a timing issue in your code.
  2. Because you delay the error, the Flux.zip operator terminates once mono1 completes with just the onComplete() signal and cancels mono2.
  3. Due to the above, the collectList has no chance of observing an error, it just gets the completion signal from the zip operator.
  4. As the specification of collectList states, a completion signal from upstream gets transformed into an empty list downstream signal.

With the above explanation, I'm closing the report as invalid. Hope this explanation helps you resolve the issue you are facing.

ben-cpng commented 1 week ago

Thanks for the update. Regarding point 2: could you please explain why the Flux.zip operator terminates once mono1 completes? In the code, mono1 should emit a value, Optional.empty(). Shouldn't the Flux.zip operator wait for a signal from mono2 (either data or error)? Why should it cancel mono2 in this case? Thanks!

chemicL commented 1 week ago

My apologies, I think I have confused something there. Your questions are very helpful and I do think you might have in fact discovered a bug that's a race condition uncovered by these events:

  1. First Mono completes with a genuine signal on Thread1
  2. Second Mono errors on Thread2.

It appears that both threads drain the results at the same time and thereby break the contract. I will investigate further.

I will also update my previous comment to reflect reality (the edit history can be viewed), as I wrongly assumed that the error gets delayed. It is actually the genuine signal that is delayed, and that is what triggers this condition.

chemicL commented 1 week ago

Just for the record: the exclusivity of the drain looks correct, so the two threads are not draining at the same time. My observation came from the natural concurrency in Flux.flatMap: two different Flux.zip instances were draining at the same time, but they were completely unrelated.

The actual issue is that both threads attempt to complete the FluxZip operator and the problematic sequence seems to be the following:

  1. T1: mono1 delivers an actual signal, starts the drain procedure. It doesn't notice any error so continues looking at other results.
  2. T2: mono2 delivers the error, the error is set in the zip operator, but it fails to gain exclusive access to the drain procedure as T1 is executing it.
  3. T1: mono1 code path delivers the successful signal, while not noticing the error that was stored.
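
To make the sequence above easier to follow, here is a minimal, JDK-only sketch of it. It is a heavily simplified model and not the actual FluxZip code: the names `Coordinator`, `onSourceDone`, `onSourceError` and the `betweenCheckAndComplete` hook are all hypothetical, and the hook is only there to replay T2's timing deterministically on a single thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of a zip-like coordinator; NOT Reactor's FluxZip.
final class ZipDrainRaceSketch {

    static final class Coordinator {
        final AtomicInteger wip = new AtomicInteger();                    // guards exclusive access to drain()
        final AtomicReference<Throwable> error = new AtomicReference<>(); // error stored by a failing source
        final List<String> downstream = new ArrayList<>();                // terminal signals seen downstream
        Runnable betweenCheckAndComplete = () -> {};                      // hook that replays T2's timing

        void onSourceDone() {
            drain();
        }

        void onSourceError(Throwable t) {
            error.compareAndSet(null, t);
            drain();
        }

        void drain() {
            if (wip.getAndIncrement() != 0) {
                return; // someone else is already draining, so back off
            }
            if (error.get() != null) {
                downstream.add("onError");
                return;
            }
            betweenCheckAndComplete.run(); // window in which the other thread stores its error
            // Flaw being illustrated: the stored error is not read again before completing.
            downstream.add("onComplete");
        }
    }

    static List<String> simulate() {
        Coordinator c = new Coordinator();
        // Step 2 (T2): the error arrives while T1 is inside drain(), so T2 cannot enter it.
        c.betweenCheckAndComplete = () -> c.onSourceError(new NullPointerException());
        // Steps 1 and 3 (T1): enters drain(), sees no error, then completes.
        c.onSourceDone();
        return c.downstream;
    }

    public static void main(String[] args) {
        System.out.println(simulate()); // prints [onComplete]
    }
}
```

In this model the downstream only ever sees a completion, which a collectList()-style consumer would turn into an empty list, while the stored error is never delivered.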

There should be some handling that would drop one of the signals in case the other Thread is already handling termination.
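
One generic way to get that kind of handling is a terminal-state guard: whichever terminal signal wins a compare-and-set is delivered, and the other one is dropped. The sketch below is only an illustration of that pattern under my own assumptions. `GuardedDownstream` and its methods are hypothetical names, not Reactor APIs, and this is not necessarily how the operator should or will be fixed.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration only; the class and method names are not Reactor APIs.
final class TerminalGuardSketch {

    static final class GuardedDownstream {
        private final AtomicBoolean terminated = new AtomicBoolean();
        final List<String> delivered = new CopyOnWriteArrayList<>(); // signals passed downstream
        final List<String> dropped = new CopyOnWriteArrayList<>();   // signals that lost the race

        void complete() {
            terminate("onComplete");
        }

        void error(Throwable t) {
            terminate("onError(" + t.getClass().getSimpleName() + ")");
        }

        private void terminate(String signal) {
            // Only the first caller flips the flag; any later terminal signal is dropped.
            if (terminated.compareAndSet(false, true)) {
                delivered.add(signal);
            } else {
                dropped.add(signal);
            }
        }
    }

    static GuardedDownstream demo() {
        GuardedDownstream d = new GuardedDownstream();
        d.error(new NullPointerException()); // the error path terminates first
        d.complete();                        // the late completion is dropped
        return d;
    }

    public static void main(String[] args) {
        GuardedDownstream d = demo();
        // prints delivered=[onError(NullPointerException)] dropped=[onComplete]
        System.out.println("delivered=" + d.delivered + " dropped=" + d.dropped);
    }
}
```

Note that a guard like this only guarantees a single terminal signal. Which signal wins still depends on timing, so the drain logic would additionally need to observe the stored error before it decides to complete.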