reactor / reactor-core

Non-Blocking Reactive Foundation for the JVM
http://projectreactor.io
Apache License 2.0

#publish does not propagate onComplete to late Subscribers #2897

Open jGleitz opened 2 years ago

jGleitz commented 2 years ago

Expected Behavior

When a #published source completes while no one is subscribed, at least the first Subscriber that subscribes afterwards should receive onComplete immediately.

Actual Behavior

The subscriber receives no signal at all and effectively hangs.

Steps to Reproduce

    @Test
    void subscribeAfterComplete() {
        var sink = Sinks.many().unicast().<Integer>onBackpressureBuffer();
        var published = sink.asFlux().log("source").publish().autoConnect().log("published");

        sink.tryEmitNext(1).orThrow();
        StepVerifier.create(published)
            .expectNext(1)
            .thenCancel()
            .verify(Duration.ofSeconds(1));

        sink.tryEmitComplete().orThrow();
        StepVerifier.create(published)
            .expectComplete()
            .verify(Duration.ofSeconds(1));
    }

Output:

    2022-01-20 18:00:05.376  INFO   --- [Pool-1-worker-3] published : onSubscribe(FluxPublish.PublishInner)
    2022-01-20 18:00:05.386  INFO   --- [Pool-1-worker-3] published : request(unbounded)
    2022-01-20 18:00:05.388  INFO   --- [Pool-1-worker-3] source    : | onSubscribe([Fuseable] UnicastProcessor)
    2022-01-20 18:00:05.392  INFO   --- [Pool-1-worker-3] source    : | request(256)
    2022-01-20 18:00:05.392  INFO   --- [Pool-1-worker-3] source    : | onNext(1)
    2022-01-20 18:00:05.392  INFO   --- [Pool-1-worker-3] published : onNext(1)
    2022-01-20 18:00:05.393  INFO   --- [Pool-1-worker-3] published : cancel()
    2022-01-20 18:00:05.394  INFO   --- [Pool-1-worker-3] source    : | request(1)
    2022-01-20 18:00:05.395  INFO   --- [Pool-1-worker-3] source    : | onComplete()
    2022-01-20 18:00:05.396  INFO   --- [Pool-1-worker-3] published : onSubscribe(FluxPublish.PublishInner)
    2022-01-20 18:00:05.397  INFO   --- [Pool-1-worker-3] published : request(unbounded)

    java.lang.AssertionError: VerifySubscriber timed out on reactor.core.publisher.FluxPeek$PeekSubscriber@58f5f822

Workaround

Apply #materialize before #publish and #dematerialize after it, so that the completion travels through #publish as a regular next signal:

    var published = sink.asFlux().log("source").materialize().publish().autoConnect().<Integer>dematerialize().log("published");

jGleitz commented 2 years ago

To elaborate on my expectation:

I am using #publish with a source that honours backpressure. I am relying on the documented property of #publish that

if any Subscriber is missing demand (requested = 0), multicast will pause pushing/pulling.

I take that to mean that if there are no Subscribers at all (and, hence, no request), pushing and pulling will also pause. This works as expected for next signals, but not for completion. I think it is because of the prefetch: prefetched next signals are buffered and replayed, but completion signals are not.

My expectation is that at least the first subscriber that subscribes after the source completed sees the completion. I would find it even more useful if all later subscribers got the completion replayed, but I guess one could also make the argument that this would be inconsistent with how #publish handles next signals.

jGleitz commented 2 years ago

Here is another example that shows why I find the current behaviour unexpected: The completion signal is being buffered while there are no subscribers, as long as there are still outstanding next signals.

We can see this in this succeeding test case:

    @Test
    void subscribeAfterNextAndComplete() {
        var sink = Sinks.many().unicast().<Integer>onBackpressureBuffer();
        var published = sink.asFlux().log("source").publish().autoConnect().log("published");

        sink.tryEmitNext(1).orThrow();
        StepVerifier.create(published)
            .expectNext(1)
            .thenCancel()
            .verify(Duration.ofSeconds(1));

        sink.tryEmitNext(2).orThrow();
        sink.tryEmitComplete().orThrow();
        StepVerifier.create(published)
            .expectNext(2)
            .expectComplete()
            .verify(Duration.ofSeconds(1));
    }

In other words: #publish makes sure that no signals are lost while there are no Subscribers, except if the only outstanding signal is a completion. In that case it is dropped.
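
The semantics I would expect can be modelled outside Reactor with a small toy class. This is only a sketch of the expectation: `PausingMulticast`, `Recorder` and `lateSubscriberSignals` are hypothetical names, the model ignores demand, prefetch and threading, and it is not how FluxPublish is implemented. It also replays the completion to every late subscriber, which goes beyond the minimum asked for above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model of the EXPECTED semantics only; this is not FluxPublish.
final class PausingMulticast<T> {

    /** Hypothetical subscriber that just records the signals it receives. */
    static final class Recorder {
        final List<String> signals = new ArrayList<>();
    }

    private final Queue<T> pending = new ArrayDeque<>();
    private final List<Recorder> subscribers = new ArrayList<>();
    private boolean completed;

    void emitNext(T value) {
        pending.add(value);
        drain();
    }

    void emitComplete() {
        completed = true;
        drain();
    }

    Recorder subscribe() {
        Recorder recorder = new Recorder();
        subscribers.add(recorder);
        drain();
        return recorder;
    }

    void cancel(Recorder recorder) {
        subscribers.remove(recorder);
    }

    private void drain() {
        if (subscribers.isEmpty()) {
            return; // nobody is listening: keep values AND completion pending
        }
        T value;
        while ((value = pending.poll()) != null) {
            for (Recorder r : subscribers) {
                r.signals.add("onNext(" + value + ")");
            }
        }
        if (completed) {
            // All pending values were delivered above, so completion may follow.
            for (Recorder r : subscribers) {
                r.signals.add("onComplete");
            }
            subscribers.clear();
        }
    }

    /** Mirrors the two test cases: returns what the late subscriber sees. */
    static List<String> lateSubscriberSignals(Integer... emittedWhileUnsubscribed) {
        PausingMulticast<Integer> multicast = new PausingMulticast<>();
        multicast.emitNext(1);
        Recorder first = multicast.subscribe(); // receives onNext(1)
        multicast.cancel(first);
        for (Integer v : emittedWhileUnsubscribed) {
            multicast.emitNext(v);
        }
        multicast.emitComplete();               // no subscriber at this point
        return multicast.subscribe().signals;   // the late subscriber
    }
}
```

In this model `lateSubscriberSignals()` corresponds to the failing test (the only outstanding signal is the completion) and `lateSubscriberSignals(2)` to the succeeding one; both paths go through the same `drain()` logic, which is the consistency I am asking for.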

simonbasle commented 2 years ago

This indeed sounds like something that needs fixing. Thanks for adding that other test case, we'll look into it 👍

OlegDokuka commented 1 year ago

@jGleitz as of now, there is a workaround: replace publish with replay(history=0).

RonBarkan commented 7 months ago

In the duplicate issue #3739, the published flux gladly accepts the second (late) subscriber and then ignores it, so the pipeline hangs. This behavior makes no sense to me. It is also not documented anywhere, AFAICT.

To state the obvious: causing a pipeline to hang, possibly in a subtle way, should be avoided.

IMHO, the late subscriber should simply receive a completion signal instead of hanging.