reactive-streams / reactive-streams-jvm

Reactive Streams Specification for the JVM
http://www.reactive-streams.org/
MIT No Attribution

Example RangePublisher.java should call onComplete #458

Closed olotenko closed 5 years ago

olotenko commented 5 years ago

https://github.com/reactive-streams/reactive-streams-jvm/blob/7021f18/examples/src/main/java/org/reactivestreams/example/unicast/RangePublisher.java#L176-L178

should call downstream.onComplete(). Otherwise Subscriber will never know when it is ok to release its resources: the spec requires Subscriber to not assume anything upon a mere call to cancel(), and requires Publisher to indicate terminal state after all intended onNext have been called.
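For concreteness, a minimal sketch of the shape of the requested change (illustrative names only; this is not the actual RangePublisher source, which is only linked above):

// inside the example Subscription's emission loop (illustrative names)
while (index < end && emitted != requested) {
    if (cancelled) {
        downstream.onComplete(); // proposed: signal the terminal state
        return;                  // current code just stops silently here
    }
    downstream.onNext(index++);
    emitted++;
}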

akarnokd commented 5 years ago

should call downstream.onComplete()

No. Rule §1.8 requires the producer to stop producing signals when cancellation is requested.

Otherwise Subscriber will never know when it is ok to release its resource

The Subscriber does know as it is in the position to issue cancel in the first place.

Do you want to implement some Publisher that has to deal with item lifecycle and/or resource utilization?

olotenko commented 5 years ago

No, not a Publisher but a Subscriber. One Subscriber I am looking at has to deal with item lifecycle. Not knowing when the Publisher is done is problematic.

Since there is no happens-before specified between cancel and any onNext, there is no way to specify how many onNext or onComplete may occur, so the rule 1.8 cannot disallow onComplete from happening.

One part of the spec requires onComplete or onError to always be issued when no more onNext will be issued, as the indicator of the Producer reaching quiescence; one motivation being so that Subscriber can release the resources.

| Term | Definition |
| --- | --- |
| Terminal state | For a Publisher: when onComplete or onError has been signalled. For a Subscriber: when an onComplete or onError has been received. |

So, if onComplete is not received, Subscriber remains in non-terminal state. I see no good reason to allow not telling Subscriber that Producer is done in this case, except established precedent.

rkuhn commented 5 years ago

When cancelling, the Subscriber is shutting down, i.e. it can transition into its terminal state. Further signals from the Publisher may arrive, but there is no expectation that these will be serviced. There is deliberately no synchronization regarding cancellation since this upstream signal is sent after the downstream has stopped caring.

akarnokd commented 5 years ago

Why do you want to establish this type of signal pattern? There are techniques to serialize async cancellation with ongoing onNext servicing so that a thread unsafe resource can be released. For example, trampolining or queue-drain with or without thread confinement, actors, event loops, etc.

olotenko commented 5 years ago

No, the question is the other way around: why would you want to avoid notifying a terminal state in a process that knows when it terminated? :)

I can live with underspecified behaviour. Just don't want to do things that aren't necessary.

olotenko commented 5 years ago

Since there is no requirement (and no mechanism) to serialize calls to cancel and onNext, the caller of cancel cannot rely on some future onNext or onComplete detecting the cancellation - which requires the cleanup to happen in cancel. But since there is no serialization with onNext, it means cancel must essentially be mutually exclusive with (parts of) onNext.

Basically, the Subscriber is required to have an extra lock to serialize onNext and cancel, if Publisher is not required to always call onComplete when it's done.
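A sketch of what that extra lock looks like in practice (a hypothetical Subscriber, meant only to illustrate the cost, not a recommended design):

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

// Hypothetical async Subscriber that must protect a resource against a racing cancel().
final class GuardedSubscriber<T> implements Subscriber<T> {
    private final Object lock = new Object();
    private final AutoCloseable resource;   // whatever needs releasing
    private Subscription subscription;
    private boolean cancelled;

    GuardedSubscriber(AutoCloseable resource) { this.resource = resource; }

    @Override public void onSubscribe(Subscription s) {
        subscription = s;
        s.request(Long.MAX_VALUE);
    }

    @Override public void onNext(T item) {
        synchronized (lock) {           // every onNext pays this cost,
            if (cancelled) return;      // even if cancel() is never invoked
            // ... use `resource` to process `item` ...
        }
    }

    // invoked from another thread when the consumer loses interest
    void cancelAndRelease() throws Exception {
        synchronized (lock) {
            cancelled = true;
            if (subscription != null) subscription.cancel();
            resource.close();           // safe only because onNext holds the same lock
        }
    }

    @Override public void onError(Throwable t) { /* release the resource here too */ }
    @Override public void onComplete()         { /* release the resource here too */ }
}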

viktorklang commented 5 years ago

When the Subscriber calls cancel() it has already decided it needs no further information—so it can begin its cleanup. Any onNext/onError/onComplete received after can (and must!) safely be disregarded.

olotenko commented 5 years ago

You cannot demand a must here. For one, the propagation of cancel is subject to "delays" to Subscriber no less than it is subject to "delays" to Publisher.

"Beginning" a cleanup is not a problem. The problem is it is required to be mutually exclusive with onNext. There is no statement about this in the spec, but it is a consequence. onNext may be both unaware of the cleanup having "begun" by cancel, and still need the resource that is being cleaned up.

Such mutual exclusion is easy and cheap to achieve in a synchronous Subscriber that calls cancel from inside onNext - program order achieves the necessary exclusion. It becomes more costly to achieve in an asynchronous Subscriber. Now onNext should always execute some synchronization routine to ensure it is mutually exclusive with cancel, even if cancel will never be called.

All because invocation of onComplete is not guaranteed by the Publisher.


I am after a reason to not require the Publisher to signal onComplete. Existence of workarounds is not a reason. A reason would be something like impossibility of this in some valid scenarios.

viktorklang commented 5 years ago

@olotenko I'm not sure I understand the problem. If the Subscriber is async then there already needs to exist a happens-before relationship between signalling onNext and processing that onNext. onComplete/onError is only there to signal End-of-data and terminal stream failure. Note that onNext can only come in after cancel() if the Subscriber cancels while there is still outstanding, unmet demand that it has signalled.

Could you outline more in detail what you perceive the problem to be?

akarnokd commented 5 years ago

Rule §3.1

Subscription.request and Subscription.cancel MUST only be called inside of its Subscriber context.

💡 The intent of this rule is to establish that a Subscription represents the unique relationship between a Subscriber and a Publisher [see 2.12]. The Subscriber is in control over when elements are requested and when more elements are no longer needed.

The key phrase here is "The Subscriber is in control". If it needs to coordinate signals, then it has to do it on its own. The Publisher is responsible for sending items as requested and stopping when the Subscriber says so. The Publisher has no understanding or expectation of what the Subscriber wants to do with those items, or when, and is limited to the protocol described by the spec.

I don't understand why there is an issue here years after the spec has been finalized and adopted by many implementations without such problems.

Now onNext should always execute some synchronization routine

Yes. Whenever asynchrony is involved in a Subscriber, one must account for the serialization requirement of RS as well as whatever external relationships have to be maintained concurrently with what happens through the RS protocol.

If you are in control of both the Publisher and the Subscriber, the spec allows onNext signals to be sent right after a cancellation has been issued (§2.8). Although the wording is limited to onNext, I'm sure some implementations also need the terminal events to still happen (i.e., cancelling just after the last item and before an onComplete that would have arrived next).

I have extensive experience with implementing exotic Publishers and Subscribers that includes working out correct and performant serialization and coordination algorithms. If you have something concrete you need help with implementing, I'd be glad to help.

olotenko commented 5 years ago

@viktorklang The happens-befores between onNext and whatever the Subscriber hands off to are the Subscriber's problem. I don't even see why it needs to be part of the spec. However, those are edges going one way, not both ways - the asynchronous part of the Subscriber synchronizes-with the synchronous part sitting in onNext. Having a cancel that may dispose of resources then requires the edges to go the other way, too - from cancel back to onNext. This is an unnecessary constraint, because most of the time it is not needed, and there are legitimate edges that are allowed to exist even in the current spec, such as an onComplete.

The distinction of after makes no sense without a happens-before. There is no happens-before in the spec which the Publisher must heed - there is no onNext that the spec identifies as happening after cancel. The Subscriber cannot detect when Publisher produces onNext after observing a cancel.


A motivating scenario

A Subscriber is responsible for putting buffers back into a pool, and then returning the whole pool once all the buffers have been collected. When can you return the whole pool?

onComplete is a definite point in time. If there is a cancel, then what? Try to detect the absence of any future legitimate onNext!

I posit that this problem should not exist, because Publisher is well aware when it is not going to send any more onNext.
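A sketch of that scenario (Buffer and BufferPool are hypothetical stand-ins, not part of any real API):

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

interface Buffer {}                                            // hypothetical
interface BufferPool { void release(Buffer b); void close(); } // hypothetical

final class PooledBufferSubscriber implements Subscriber<Buffer> {
    private final BufferPool pool;

    PooledBufferSubscriber(BufferPool pool) { this.pool = pool; }

    @Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); }

    @Override public void onNext(Buffer buffer) {
        try {
            // ... consume the buffer ...
        } finally {
            pool.release(buffer);     // per-item lifecycle is straightforward
        }
    }

    @Override public void onComplete()         { pool.close(); } // a definite point in time
    @Override public void onError(Throwable t) { pool.close(); }

    // After cancel() there is no guaranteed signal, so there is no definite point
    // at which pool.close() is known to be safe: onNexts already in flight may
    // still borrow buffers from the pool.
}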

olotenko commented 5 years ago

@akarnokd What is a "context"? :)

I don't understand why there is an issue here years after the spec has been finalized and adopted by many implementations without such problems.

They haven't thought of a motivating scenario and I am late to the party, that's all :)

viktorklang commented 5 years ago

@olotenko after makes sense locally within the Subscriber, since it is in control of its own execution.

akarnokd commented 5 years ago

A motivating scenario

Why is the Subscriber issuing a cancel and where does it originate?

The philosophy behind having only 7 methods is that the less constraints and interactions each component has to deal with the more efficient implementations there could be.

Requiring cancel to trigger an onComplete adds an extra call which is unnecessary in most cases. It would require logic in every Subscriber in the chain to defend against calling onComplete downstream multiple times in case there is a cancel propagating up, hitting each Subscriber.

When can you return the whole pool back?

When there are no more uses for it. Reference counting (possibly combined with some state machine) could work here.
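One possible shape of that suggestion, as a sketch (it assumes every borrower participates in the count, which is exactly what the reply below questions):

import java.util.concurrent.atomic.AtomicInteger;

// Sketch of a reference-counted pool: it closes itself once its own reference
// and every outstanding buffer have been released.
final class RefCountedPool {
    private final AtomicInteger outstanding = new AtomicInteger(1); // 1 = the pool's own reference

    void onBorrow() { outstanding.incrementAndGet(); }

    void onRelease() {
        if (outstanding.decrementAndGet() == 0) {
            closePool();                // last reference returned
        }
    }

    // called when no further borrows should be counted (e.g. on a terminal signal)
    void dropOwnReference() { onRelease(); }

    private void closePool() { /* return the whole pool to wherever it came from */ }
}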

Try to detect the absence of any future legitimate onNext!

The Subscriber can know how many future onNext to expect, as it issued the requests and can track how many items have been received. In general, there is no way for a Subscriber to know when onNexts will arrive, only that it will receive at most as many as requested, and a terminal signal if there won't be any more.

Perhaps the most relatable operator for this case is timeout. Each onNext item starts a timer that may fire before a subsequent onNext happens. This operator now has to cancel the source and issue signals, for example onError(TimeoutException). Since onNext and the timer are very likely firing on different threads, the Subscriber has to coordinate the two and make sure the downstream still receives onNexts and onError in a serialized manner. A straightforward state machine can block out one or the other party from issuing signals.
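A minimal, lock-based sketch of such a state machine (real operators typically use lock-free queue-drain, but this shows the "block out one or the other party" idea):

import java.util.concurrent.TimeoutException;
import org.reactivestreams.Subscriber;

// Either the per-item timer or the upstream wins; the downstream sees signals serially.
final class TimeoutGate<T> {
    private final Subscriber<? super T> downstream;
    private boolean terminated;

    TimeoutGate(Subscriber<? super T> downstream) { this.downstream = downstream; }

    synchronized void item(T value) {        // called on the upstream thread
        if (!terminated) downstream.onNext(value);
        // a real operator would also (re)start the per-item timer here
    }

    synchronized void timeout() {            // called on the timer thread
        if (!terminated) {
            terminated = true;
            // a real operator also cancels the upstream Subscription here
            downstream.onError(new TimeoutException());
        }
    }

    synchronized void complete() {           // upstream terminal signal
        if (!terminated) {
            terminated = true;
            downstream.onComplete();
        }
    }
}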

olotenko commented 5 years ago

@akarnokd

I am not sure why it matters what causes the cancel. It is, in any case, separate from the resource-management problem that the Subscriber has.

Subscriber still has to allow for conforming Publisher to push any number of onNext and onComplete (because perhaps the Publisher has run out of data to push just at the time cancel has been requested) after the cancel.

Reference counting does not work without Publisher promising to not borrow more buffers from the pool for any outstanding onNext.

The Subscriber cannot track how many future onNext are still to be expected, because the amount requested may be Long.MAX_VALUE.

I take the discussions of the ways to try and solve the problem as the agreement that the spec does not allow the Subscriber to know for sure, and requires some indirect methods that may not work in all the cases.

I appreciate the power of precedent, and that even if this change were adopted, I would still have to deal with old implementations that won't issue onComplete, but can't one dream of a better future? :)

akarnokd commented 5 years ago

This indicates you need both the Publisher and Subscriber to be resource aware, which is not the aim of the RS specification and also wouldn't work between heterogeneous libraries unaware of those resources.

I experimented with resource-aware flows a couple of years ago but it is quite tedious and there was no practical demand for such extensions.

For example, a resource-aware iterable-wrapper must deal with the rest of the unconsumed items. An async consumer has to work out when to clean up on receiving a cancel at any time, and also keep cleaning up for late items.

olotenko commented 5 years ago

the less constraints and interactions each component has to deal with the more efficient implementations there could be

Precisely why there should be two edges into clean-up (onError and onComplete) instead of three (+ cancel), and why you shouldn't need to introduce more concurrent interactions with onNext. :)

I'll take a look at that resource-aware.

viktorklang commented 5 years ago

I'm still not sure I understand the problem. The Subscriber cannot assume that the Publisher doesn't have other Subscribers. :(

olotenko commented 5 years ago

@viktorklang that's a separate issue. The issue at hand is that the spec allows (and judging by the discussion - encourages) the Publisher to introduce non-termination condition, which then the Subscriber needs to fight. You can see the code @akarnokd references to appreciate the complexity of the problem this oversight introduces.

rkuhn commented 5 years ago

@olotenko You are posing two questions, let’s clearly separate them:

  1. Is it possible to pass items across a Publisher–Subscriber boundary that require explicit lifecycle management?
  2. Why does the Publisher not confirm cancel with onComplete?

For the first question, it is crucial to consider the scope and purpose of Reactive Streams: it is a set of interfaces for interoperability of different stream processing libraries within the same JVM. As such, the notion of an open world is ingrained in these interfaces: they are meant to compose larger systems from small pieces that may be built on different foundations. As @akarnokd hinted at, building a resource-aware streaming framework is very challenging, and I posit that applying such a framework is only possible in a closed-world scenario where you are in control of the whole streaming pipeline—as a counter-example, consider that someone might easily slip a .filter() combinator between your Publisher and Subscriber, dropping buffers without adding them back to the pool. Therefore, Reactive Streams are not the right tool for this job. Passing objects across asynchronous boundaries is only practical and controllable when automatic lifecycle management (like GC or reference counting) is in place—this is also what @viktorklang means when saying that a streaming combinator may well pass references to the items to multiple subscribers in general.

The second question is different (as shown above such a confirmation mechanism does not solve the first problem). Here, one guiding design principle of Reactive Streams is that it does not handle confirmations at all, neither upstream nor downstream. We explicitly discussed whether request() can be used to confirm reception but decided against it due to too much uncertainty—these semantics are not expressed in the types and are easily broken, while also not being strong enough to be useful. Requiring cancel to be confirmed by onComplete falls into the same category, since the Subscriber then cannot decide whether the stream was finite or whether its own cancellation came first. The question of synchronization between the Subscriber’s usage of cancel and its reception of onNext is a red herring since such synchronization is always necessary, regardless of the proposed onComplete signalling: if cleaning up Subscriber resources (not items!) requires mutual exclusion then that exclusion is needed in any case, as onNext may arrive truly in parallel to the code calling cancel on a different physical CPU core—recall that Reactive Streams is about asynchronous boundaries.

olotenko commented 5 years ago

I disagree with the second part of the statement. The "boundaries" between cancel and whatever follows in Subscriber are: a) not defined; b) not necessary (would not be necessary), since Publisher already establishes the necessary mutual exclusion between a series of onNext and onComplete.

Non-termination is a fundamental problem.

See, the chain of onNext and onComplete is a CPS-transformed loop:

try {
   while(it.hasNext()) { // publisher-specific condition
     subscriber.onNext(it.next()); // push
     if (cancelled) {
        break; // proposed
        // return; // actual!
     }
   }
   subscriber.onComplete(); // done
} catch(Throwable e) {
   subscriber.onError(e);
}

From the point of view of what the API promises, asynchrony here is immaterial. In the end, Publisher is required to establish happens-befores (so, it does make a difference to the implementor), so from the Subscriber's point of view asynchronous invocations are indistinguishable from the above loop being executed from the same Thread (the API's promise).

However, return vs break is a fundamental difference. From the Subscriber's point of view, the current behaviour is indistinguishable from non-termination. Resource management is only a motivating example: @akarnokd's example wouldn't need to solve concurrency problems if cancel resulted in onComplete.

Not being able to distinguish natural end of stream from reaction to cancellation is not a fundamental problem - you can signal that distinction using a different terminating condition, even onError with some indicator. Just like Thread is allowed to be unparked and interrupted at the same time, the observer can tell which one the Thread reacted to.

Not being able to tell that the Publisher is not going to signal any more is a fundamental problem. The solution is trivial: just signal the terminal condition when the Publisher decides it won't be calling onNext any more because of that cancellation. The absence of that guarantee requires solutions of the kind @akarnokd posted. The relative complexity of the two options is staggering.

viktorklang commented 5 years ago

@olotenko onComplete signals End-of-Stream, which is when there are no more elements—which is not true in the case where the Subscriber just isn't interested in any more elements. As soon as a Subscriber calls cancel it is no longer interested in more elements and can therefore do what it needs to do in terms of cleanup. If you require richer behavior then you can of course special-treat known implementations of Subscriber in your own Publishers, as long as "unknown" Publishers and Subscribers are treated in a spec-compliant way.

olotenko commented 5 years ago

@viktorklang I don't buy the "which is not true" argument. Claiming that would require a use case where a Subscriber that has invoked cancel behaves differently when the end-of-stream is natural versus induced by cancel.

This issue is a grim reminder that async processing is a dual of sync processing. We waited for so long for Closeable in Java, and now this spec goes backwards on that concept. It's ok to have two ways of "closing" the Subscriber. It's not ok to have a third way with no "close".

Obviously, everyone has to deal with Publisher non-termination allowed by the spec. So this issue will probably remain a perennial bug no one will dare to touch.

viktorklang commented 5 years ago

@olotenko There is nothing which prevents the Subscriber from invoking its own onComplete after issuing the cancel signal, assuming that it itself disambiguates. Otherwise there is no way to distinguish a true EOF from a cancel-induced one.

olotenko commented 5 years ago

@viktorklang It is not about invoking onComplete. It is about a guarantee that no more onNext will be in the future. A guarantee is a fundamental property of a protocol.

Of course, there is no way to distinguish between a true EOF and a cancel-induced one, but that is a problem only when the Subscriber is unaware that a cancel has been invoked. However, everyone so far has made suggestions on how to work around this quirk (particularly visible in the resource-management use case) only under the assumption that the Subscriber is aware of a cancel.

A Subscriber calling cancel means "I don't care how long the rest of the stream is; I have seen enough". In this setting it does not matter whether EOF is "true" or induced. That is why I am asking to demonstrate a use-case where Subscriber would care whether the EOF is "true" or induced, if it can tell the EOF happened after a cancel.

The importance of knowing that the Publisher is done is the same as properly implementing a Closeable or try...finally in sync processing.

I thank everyone for offers of help with implementing a workaround for this design quirk, but this issue is really about the protocol quirk.

viktorklang commented 5 years ago

@olotenko If you want to create this "guarantee" yourself, it's not a lot of work to wrap your Subscriber in another Subscriber which will drop all signals that are received after cancel is invoked. Changing the protocol to send onComplete after seeing cancel still means that elements could arrive inbetween, and the Subscriber would still need to ignore those.
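A sketch of such a wrapper (a plain volatile flag here for brevity; production operators usually fold this into an atomic state machine), which drops any signal observed after the flag is set:

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

// Wraps a Subscriber and suppresses signals observed after cancel() has been issued.
final class CancelGateSubscriber<T> implements Subscriber<T>, Subscription {
    private final Subscriber<? super T> actual;
    private volatile boolean cancelled;
    private Subscription upstream;

    CancelGateSubscriber(Subscriber<? super T> actual) { this.actual = actual; }

    @Override public void onSubscribe(Subscription s) { upstream = s; actual.onSubscribe(this); }

    @Override public void onNext(T item)       { if (!cancelled) actual.onNext(item); }
    @Override public void onError(Throwable t) { if (!cancelled) actual.onError(t); }
    @Override public void onComplete()         { if (!cancelled) actual.onComplete(); }

    @Override public void request(long n) { upstream.request(n); }

    @Override public void cancel() {
        cancelled = true;     // signals racing with this flag may still slip through
        upstream.cancel();
        // the wrapped Subscriber could be handed a synthetic completion here
        // if it wants to treat its own cancellation as end-of-stream
    }
}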

olotenko commented 5 years ago

@viktorklang It is amazing how quickly people can churn out "workarounds" for fundamental issues. It's not that I cannot come up with something like that. Only this workaround does not work for the resource-management use case - because the protocol does not implement the async dual of Closeable.

I am tired of explaining. No one is interested. I'll just close this issue. Reopen if you think it is worthwhile getting to the bottom of how to get the async Closeable right - define a new signal, define a special error type in onError, or just onComplete will be fine.

akarnokd commented 5 years ago

I'm surprised by how often people come into established projects and try to change them just because they have an idea or use case that doesn't fit into the established scheme, then get upset because the project leads refuse to change things around in ways that would break the existing ecosystem.

If the Reactive Streams protocol doesn't fit your requirements, write your own protocol, create your own ecosystem and create support libraries yourself.

viktorklang commented 5 years ago

@olotenko This protocol is not intended to be used for async resource management, as every component (Publisher, Processor, Subscriber) handles its own resources—especially since you can have fan-out and fan-in behavior. If onComplete were issued in response to observing a cancellation, it would still entail dealing with onNext signals received before that onComplete comes in, and the Subscriber cannot assume that it is the sole Subscriber to the given Publisher, so it would only be able to relinquish resources it owns itself, which it is free to do as soon as it sends the cancellation signal.

olotenko commented 5 years ago

@akarnokd I apologise unreservedly.

I have no issue with whatever decision the project leads make about this - particularly because this is not my project, and because I am "late to the party", whether my suggestions are good or bad. I have closed the issue because the discussion is not about non-termination.

viktorklang commented 5 years ago

@olotenko No worries, Oleksandr. I hope the reasoning in my previous message outlines why the proposed change wouldn't fix the problem you perceive.

Cheers!

olotenko commented 5 years ago

@viktorklang I think you are mistaken on resource management. In a garbage-collected language it is easy to overlook the delegation of ownership of the message, because it is not a concern most of the time - when everyone drops a reference, GC collects it. But if you consider managed memory, as is the case with buffers that require pooling and releasing, then it is clear the Publisher cannot assume ownership of the buffer after passing it to any Subscriber - because there is no protocol indicating when the Subscriber(s) no longer need it. Or, looking at it differently, once the message has been pushed to onNext, the Publisher can no longer mutate it.

So it seems clear enough Publisher can never be responsible for managing the lifecycle of the messages passed to any Subscriber(s).

Also, when I say "fundamental problem", I don't mean the protocol is massively broken. I only mean that the problem of resource management cannot become decidable by bolting-on some predicates on the existing signals. I am not in a position to require anything of the protocol, I am only saying that resource management is not decidable now, and is decidable if Publisher is required to signal terminal state of a given subscription upon cancel.

viktorklang commented 5 years ago

@olotenko Since a Publisher can have many Subscribers at the same time, and can decide to do both fan-out/broadcast and routing, the Subscriber cannot assume that it owns the elements that are passed to it.