reactor / reactor-core

Non-Blocking Reactive Foundation for the JVM
http://projectreactor.io
Apache License 2.0
4.87k stars 1.18k forks source link

Auto-cancelled Sink still accepts emissions #3715

Open kaqqao opened 4 months ago

kaqqao commented 4 months ago

A sink created via Sinks.many().multicast().onBackpressureBuffer() will keep accepting emissions even after it's cancelled by autoCancel (which is implicitly enabled, and unfortunately not mentioned in the method's Javadoc), until the internal buffer is filled. Javadoc on the overload onBackpressureBuffer(int bufferSize, boolean autoCancel) mentions:

@param autoCancel should the sink fully shutdowns (not publishing anymore) when the last subscriber cancels

Expected Behavior

After the sink is shut down (cancelled), sink.tryEmitNext(...) should fail.

Actual Behavior

sink.tryEmitNext(...) succeeds and keeps filling the buffer.

Steps to Reproduce

@Test
public void cancelledSinkStillAcceptsEmissions() {
    Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
    Disposable subscription1 = sink.asFlux().subscribe(s -> System.out.println("1: " + s));
    assertEquals(1, sink.currentSubscriberCount());
    sink.tryEmitNext("Test1");
    subscription1.dispose();
    assertEquals(0, sink.currentSubscriberCount()); //autoCancel kicks in, subscriptions no longer possible
    Disposable subscription2 = sink.asFlux().subscribe(s -> System.out.println("2: " + s));
    assertTrue(subscription2.isDisposed()); //auto-cancelled
    assertEquals(0, sink.currentSubscriberCount());
    assertTrue(sink.tryEmitNext("Test2").isFailure(), "Emissions on a cancelled sink should fail");
}

In this state, no subscriber will ever be able to subscribe to the sink and consume "Test2" or any future emissions, yet emissions are accepted and buffered.

Possible Solution

Add

if (isCancelled()) {
    return EmitResult.FAIL_CANCELLED;
}

to the beginning of tryEmitNext, tryEmitError and tryEmitComplete in SinkManyEmitterProcessor.java;

Your Environment

qnnn commented 4 months ago

Hi, I'm lucky to come across this issue. I'm currently learning Reactor and have also encountered this problem, which has left me confused. Additionally, I have a question related to the EmitResult code, and perhaps it can be addressed together in this issue.

Is it necessary to use a new response code to represent "warm up" and distinguish whether subscribers exist when using tryEmitNext?

Issue #2338 mentions: do we really need a separate fail code (vs reusing FAIL_OVERFLOW)

The solution related to #2338 is to introduce the FAIL_ZERO_SUBSCRIBER status code. However, triggering the FAIL_ZERO_SUBSCRIBER response code requires meeting the following two conditions:

  1. The queue cache is full or there is no queue cache
  2. There are no subscribers

Perhaps adjusting or extending the meaning of the EmitResult code can help us better understand the state of Sinks.Many when using tryEmitNext. I'm not sure if the following points are reasonable:

  1. The FAIL_ZERO_SUBSCRIBER response code only indicates that there are no subscribers and does not consider whether the queue cache is full.

  2. When the queue overflows, only the FAIL_OVERFLOW status code is triggered (or a new extended status code, such as "FAIL_OVERFLOW_WITHOUT_SUBSCRIBERS," which means that there are no subscribers when FAIL_OVERFLOW occurs).

  3. A new status code can be used to represent "warm up," as mentioned in the Javadoc of Sinks.MulticastSpec#onBackpressureBuffer:

    "Without Subscriber: warm up. Remembers up to Queues.SMALL_BUFFER_SIZE elements pushed via Sinks.Many.tryEmitNext(Object) before the first Subscriber is registered."

image-20240214215905746

When the "warm up" response code appears, it can indicate the possibility of FAIL_OVERFLOW risk.

chemicL commented 4 months ago

@kaqqao thanks for the report! I reviewed the other Sinks implementations and do believe your expectations are backed by the fact that SinkManyUnicast and SinkManyUnicastNoBackpressure also reject emissions in case of cancellation. I'll have a look at the PR you opened as well and provide feedback.

chemicL commented 4 months ago

@qnnn glad to hear you're learning Reactor 🚀 Hope the experience is pleasant so far.

Regarding the suggestion you made: re 1. FAIL_ZERO_SUBSCRIBER would indicate failure, while the scenario you describe in the buffering cases means the items are buffered and succeed. re 2. the FAIL_ZERO_SUBSCRIBER in its current meaning is exactly that as far as I can tell. re 3. the idea is not bad per se, however it's not applicable for two reasons: 1) it would be a breaking change of behaviour; 2) I don't see what value it would provide. The idea for Sinks is essentially to decouple the producer from the consumer so both can make progress. The failure events tell the producer that it needs to worry and deal with backpressure, while the warmup phase is actually a successful scenario until the buffer fills up and requires no indication that nobody is yet receiving.

If you feel this is falling short in some scenarios and there are situations that changing this behaviour would enable new flows, please open a new issue with ideas about the migration path if there's a behaviour change proposed. Let's not mix two concerns in the same issue. Thanks in advance and good luck on your learning journey!

qnnn commented 4 months ago

@chemicL I really appreciate your patient response, and it has been very helpful to me. I apologize for bringing it up in this issue. Best wishes!

kimec commented 4 months ago

Nice to see this discussion finally started

chemicL commented 2 months ago

This issue is still a valid one and is open for contributions. The attempt in #3725 had some concerns and was closed due to inactivity so there's a chance to build upon that feedback.

bajibalu commented 2 months ago

Hi @chemicL I am also learning reactor. I am interested to work on this issue. I saw your concern in the other PR. I am not sure whether it is possible/feasible. But is it a good idea to somehow freeze the queue once the sink is canceled so that it does not accept any more items?

chemicL commented 2 months ago

Hey, @bajibalu! Happy to hear you're learning reactor and are interested in contributing 🎉

The queues we have do not have a concept of freezing/closing. However, there are other mechanisms to prevent inserting an item. On the other hand, it's also not an issue if the item is injected as long as somthing later notices that a cancellation has happened and takes care of clearing the queue.

After looking at the code one more time I think it is the case already that an item inserted after cancellation should be removed from the queue due to the drain operation that follows. Unfortunately, I couldn't get it to work because somehow the WIP (work-in-progress) marker is left in an unclean state. Debug the below code to see what's happening:

    SinkManyEmitterProcessor<Integer> processor = new SinkManyEmitterProcessor<>(true, 1);
    processor.tryEmitNext(1);
    processor.asFlux().doOnNext(i -> System.out.println("Received " + i)).subscribe().dispose();

    processor.tryEmitNext(2);

    System.out.println(processor.queue.size());

If nothing was emitted before, then no queue is created:

    SinkManyEmitterProcessor<Integer> processor = new SinkManyEmitterProcessor<>(true, 1);

    processor.asFlux().subscribe().dispose();

    processor.tryEmitNext(1);

    System.out.println(processor.queue == null);

Solving this issue would be the first step to a comprehensive solution.

One potential issue is that in case of a emit/cancel race we wouldn't communicate the cancellation to the caller if cancellation happens after checks for cancellation but before the item is inserted into the queue. But perhaps that's not a big deal as long as something discards the item from the queue (so fixing the above nuance is necessary).

What I would love to see in a PR is:

@bajibalu in case you'd like to work on this, just leave a note here so if anyone else interested in this wants to take over we can re-assign in case you don't find the time. Thanks!

bajibalu commented 2 months ago

Hi @chemicL I pushed a PR to fix this issue. Please take a look at this PR and let me know if this works.