Open bsideup opened 4 years ago
A) It would lead to weird diagnostics in the cases where a Subscriber is accidentally reused—which Subscription sent the error? B) Logically: If no connection is ever opened, how is it to be closed?
A) since the error comes straight from subscribe call, at least it is still clear which Publisher produced it
B) subscribe kinda opens the connection to Publisher, "half-open", if I may.
Another scenario that came to my mind:
Subscriber allocates memory in onSubscribe
Publisher always returns an error
But, since onSubscribe is mandatory, the subscriber will allocate and immediately terminate the buffer because onError came right after onSubscribe. One can work around it on first request, but that will add unnecessary logic to the request handling.
A) No, there's an asynchronous boundary between sending and receiving a signal. B) No, before onSubscribe is called, the Subscriber has no knowledge of a Publisher. As the spec says: "A Subscription is shared by exactly one Publisher and one Subscriber for the purpose of mediating the data exchange between this pair."
Another scenario that came to my mind: Subscriber allocates memory in onSubscribe Publisher always returns an error
The same is true for any Publisher, which could send an error after onSubscribe. If the Subscriber has a very expensive resource which it does not want to needlessly acquire, it can defer allocating it until it receives the first element. (which means that onSubscribe has been received, subscription.request(n) has been issued, and then it receives its first element). There is no time-guarantee between request(n) and getting the first element, so allocating/acquiring an expensive resource should in that case be deferred for as long as possible—unless the acquisition itself takes a bunch of time, in which case the Subscriber might opt for betting on acquiring it early—a tradeoff.
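The deferral described above can be sketched in Java against java.util.concurrent.Flow. This is only an illustration, not code from the thread: the class name LazyBufferSubscriber and its accessor methods are hypothetical, an ArrayList stands in for the "expensive resource", and the sketch assumes signals arrive serially.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical Subscriber that defers its (supposedly expensive) buffer
// until the first element actually arrives.
final class LazyBufferSubscriber<T> implements Flow.Subscriber<T> {
    private List<T> buffer;      // stays null until the first onNext
    private Throwable failure;   // remembers a terminal error, if any

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        // No allocation here: only signal demand.
        subscription.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(T item) {
        if (buffer == null) {
            buffer = new ArrayList<>(); // stand-in for the expensive resource
        }
        buffer.add(item);
    }

    @Override
    public void onError(Throwable t) {
        failure = t; // nothing to release if no element ever arrived
    }

    @Override
    public void onComplete() {
        // Nothing to do in this sketch.
    }

    boolean allocated() { return buffer != null; }

    List<T> items() { return buffer == null ? List.of() : List.copyOf(buffer); }

    Throwable failure() { return failure; }
}
```

With this shape, an onSubscribe that is immediately followed by onError never touches the buffer, at the cost of a null check on every onNext.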
"A) It would lead to weird diagnostics in the cases where a Subscriber is accidentally reused—which Subscription sent the error?"
Instead of focusing on exceptional corner cases - which as in this case already broke the spec (subscriber instance not allowed to be reused for by-me-not-understood reasons) - we should do what makes sense for the most common scenario(s).
Calling onSubscribe "just for the sake of it" isn't very fair and kind. Subscriber thinks he has a legit subscription going on when he hasn't, and as the code example already showed, 99% of the subscriptions passed to the subscriber will be dummies, but shouldn't be. Because subscriber is allowed to cancel (!) the "dummy" in which case the publisher is no longer allowed to signal the error and the error is instead forever lost.
The only specification-approved way to implement the "immediately signal error" case is tedious and long, most likely not followed by many implementations at all:
class CanOnlyBeCancelledDummy implements Flow.Subscription { ... }

// in Publisher.subscribe(Subscriber)
Exception shitHappened = ...
CanOnlyBeCancelledDummy temp = new CanOnlyBeCancelledDummy();
subscriber.onSubscribe(temp);
if (!temp.isCancelled()) {
    subscriber.onError(shitHappened);
} else {
    // Can only log this guy and hope that there's an administrator reading through 10 000 000 lines of log statements every day
}
On top of that - as has already been discussed - we effectively force the subscriber implementation to do error-prone lazy allocation of his buffer; undoing the whole purpose of the onSubscribe method to begin with - I reckon.
The requirement to force an onSubscribe call before onError is obviously not very great for several reasons. So why was it added? This is not clear to me. Later it was stated:
"A) No, there's an asynchronous boundary between sending and receiving a signal."
The specification does not mandate synchronous or asynchronous and we all probably agree that most operators or "item processing done by one or many chained subscribers" ever written will most likely not be asynchronous. Only the top source is likely to perhaps be asynchronous. But, even for an asynchronous subscription, how does that matter? The way I see it, can only matter for asynchronous subscribers that were wrongfully re-used but for some reason we want to make sure this particular guy can somehow (not clear how exactly) post-resolve his particular situation at the expense of every single spec-abiding user out there.
This is super simple. 1) It appears the specification is a bit overzealous and shouldn't really be concerned with the individual life-cycle of objects, for example if they are re-used or not. 2) An alternative without all the "dos and don'ts": onSubscribe is called for successful subscriptions, onError is called as soon as there is an error, whether the onSubscribe started or not (edit: onError should be more precisely specified, #495).
Any other complications of such a simple rule should be explained. Currently, the only declared intent of "initialization logic" (§1.9) is completely counterproductive. Anyone with the knowledge and experience of what we have discussed so far must do the initialization lazily on the first item delivery because he can't trust the subscription is a valid one.
It was later stated:
"B) No, before onSubscribe is called, the Subscriber has no knowledge of a Publisher. As the spec says: 'A Subscription is shared by exactly one Publisher and one Subscriber for the purpose of mediating the data exchange between this pair.'"
I still do not understand. Why do we assume the subscriber has to have knowledge of the subscription before receiving an error - especially given the already mandated requirement that the subscriber is unique and not shared? And hey, you said it yourself, the subscription has the purpose of mediating the data exchange. Well, if there's no data exchange because an error happened immediately then there is no data exchange! But from what I hear the subscription seems to have some weird role to play in error handling as well. Very confusing.
In summary: It is safe to say that it is not obvious why onSubscribe always has to be signalled first. If anything, it complicates matters, like tremendously, and even has an effect opposite of the one and only declared "intention" (vaguely stated - for what purpose would "initialization logic" be if not receiving items?).
So, smells like needed clarification or rework of the rule framework. I vote for the latter. Really, I don't expect anything to be that important we have to force everyone to write complex "dummy" code and counter-intuitive lazy initialization.
Edit: Look at the example in javadoc of Flow. They call onError immediately without onSubscribe first. I'm just going to have to ignore this rule for now in the library I am writing as it seems to be counter-intuitive and document accordingly. I see several rules that I need to break anyways. One I will report soon.
99% of the subscriptions passed to the subscriber will be dummies, but shouldn't be
@martinanderssondotcom but isn't an immediate error on subscription the 1% case? So the spec DOES deal with the generalization of cases...
Note also that Subscription DOESN'T have to refrain from sending onError to its Subscriber if it has been cancel().
And I think you're looking at things the wrong way: neither the Subscriber nor the Publisher has any knowledge of what the other side does. One cannot assume the Subscriber doesn't do something meaningful pertaining to onError in its onSubscribe (eg. instantiate an object that allows to collect metrics). Any Publisher can be connected to any Subscriber. They have to be caring for the generalized case.
The spec really allows for the most general case, I think.
Superficially, rule [2.12] seems to make things a little bit complicated in @bsideup's second case. But as he knows, in Reactor we mitigate that a bit because, as per 2.5, we immediately call cancel() on the extraneous Subscription in case of a second invocation of onSubscribe. So we bluntly call onSubscribe+onError on reactor subscribers, relying on that fact.
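To make the mitigation concrete, here is a minimal Java sketch of a Subscriber that cancels an extraneous Subscription, in the spirit of rule 2.5. It is not Reactor's actual code: the class name SingleSubscriptionSubscriber is hypothetical, and the sketch assumes signals are delivered serially (no thread-safety).

```java
import java.util.concurrent.Flow;

// Hypothetical Subscriber that keeps only its first Subscription and
// cancels any later one it is handed.
final class SingleSubscriptionSubscriber<T> implements Flow.Subscriber<T> {
    private Flow.Subscription active;
    private Throwable failure;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        if (active != null) {
            subscription.cancel(); // extraneous Subscription: cancel and ignore it
            return;
        }
        active = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(T item) {
        active.request(1); // keep pulling one element at a time
    }

    @Override
    public void onError(Throwable t) {
        failure = t; // still delivered, whichever onSubscribe preceded it
    }

    @Override
    public void onComplete() {
        // Nothing to do in this sketch.
    }

    Throwable failure() { return failure; }
}
```

A Publisher that calls onSubscribe a second time with a throwaway Subscription and then onError sees that Subscription cancelled, while the error still reaches the Subscriber.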
The other solution would be to somehow track that onSubscribe was already called. It should be feasible, but makes code mutualization a bit harder. In the second example, maybe the doSubscribe should do the try/catch and thus directly call onError in the catch part. Or maybe it should return a boolean indicating whether or not onSubscribe was invoked.
Note that if onSubscribe itself throws (which breaks rule 2.13), we don't have many more options since we're in an undefined state and anything could happen in calling onError by then.
Note also that Subscription DOESN'T have to refrain from sending onError to its Subscriber if it has been cancel().
I agree with you. But a synchronous direct call by the same thread "should" do "the right thing". All other rules and intent declarations that say the subscriber will stop receiving signals "eventually" after cancellation - all of them - have asynchronicity in mind. This example is synchronous. So again, the only "right thing" to do is to accept that the subscription could have been cancelled and then not call onError.
One cannot assume the Subscriber doesn't do something meaningful pertaining to onError in its onSubscribe (eg. instantiate an object that allows to collect metrics).
I believe this has no relevance. The implementations can only assume what we specify, whether that be A, B or whatever else. You're saying the subscriber could do something meaningful in that first onSubscribe call even if the publisher has no other intention than to reject the subscriber. Sure, I agree with you.
But, we could also specify B. Then, given your example, onError would have to instantiate the meter if it's null. But this is actually derailing a little bit from the topic. The one and only declared intent was for "initialization logic" which we must assume has been made lazy in the first place because it is costly and something we want to do only in preparation for actually receiving items, such as allocating buffers. "Instantiating objects" is probably best left for the constructor of said Subscriber, in full compliance with the language and established convention. We should really have no reason at all to "provide a second constructor" just for the fun of it.
Look, we have to make a choice here between two things, 1) Leave it as it is; "hey, always call onSubscribe first even for direct failures". But, this is bad for many reasons as earlier iterated and kind of counter-intuitive; really makes no sense from a publisher's point of view who knows perfectly well that he will immediately fail the subscription. Or 2) "only call onSubscribe on successful subscription so that the client can do his lazy initialization logic" which leads to zero surprises for the subscriber and is going to be overall far more kind to code complexity, CPU and memory.
This ticket simply looks at number 1, is confused by it and asks - and I quote: "what was the motivation to require onSubscribe before terminal onError/onComplete"?
Now it is your turn. Why?
If this can not be motivated and instead we have all these problems, then it's time for a change.
Before an onSubscribe has been invoked, there is no relationship between a Publisher and a Subscriber—and as such there would be nothing to onError and nothing to onComplete. Having an established sequence of signals makes it much more tractable to reason about the statespace.
There is nothing which prevents you from having your own signalling for your own subtypes, as long as the standard is adhered to when plugging an unknown Publisher with an unknown Subscriber.
When proposing a change, I think it is prudent to list benefits, risks, and cost.
@martinanderssondotcom Please consider the first sentence on http://reactive-streams.org: «Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure.» Concerns that arise in synchronous settings cannot drive changes to the spec since the intended audience is asynchronous. I am aware that similar interfaces have been used in synchronous settings, but that does not detract from my point.
As to the reasoning, there were many points not all of which I recall, here is a selection:
Publisher.subscribe() is not necessarily (or even typically) called by the Subscriber but by a third party. As such, the Subscriber in general does not know about the state change in the Publisher that triggers all that follows.
Currently the protocol is onSubscribe onNext* (onError | onComplete). This would become (onSubscribe onNext* (onError | onComplete)) | onError, which is strictly more complex. This complexity affects the ability to reason about the protocol as well as the effort of correctly implementing the protocol.
onSubscribe onError needs to be handled correctly by all conforming Subscribers in any case.

If you still disagree then you may obviously decide to not write a conforming Subscriber or Publisher. This may be a good option, especially if you don’t have the problem of passing streams over asynchronous boundaries; there are more efficient protocols for the purely synchronous case.
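The two signal sequences contrasted in the list above can be written down as regular expressions, which makes the added alternative easy to see. This is a sketch under stated assumptions, not part of the spec or its TCK: the one-letter encoding (S = onSubscribe, N = onNext, E = onError, C = onComplete) and the class name SignalGrammar are invented for illustration, and only terminated sequences are considered.

```java
import java.util.regex.Pattern;

// Signals are encoded as single letters (an assumption made for this sketch):
// S = onSubscribe, N = onNext, E = onError, C = onComplete.
final class SignalGrammar {
    // Current protocol: onSubscribe onNext* (onError | onComplete)
    static final Pattern CURRENT = Pattern.compile("SN*(E|C)");

    // Discussed variant: (onSubscribe onNext* (onError | onComplete)) | onError
    static final Pattern PROPOSED = Pattern.compile("(SN*(E|C))|E");

    // True if the whole sequence matches the current protocol.
    static boolean conformsToCurrent(String signals) {
        return CURRENT.matcher(signals).matches();
    }

    // True if the whole sequence matches the discussed variant.
    static boolean conformsToProposed(String signals) {
        return PROPOSED.matcher(signals).matches();
    }
}
```

Under the current grammar a lone "E" is rejected, while "SE" is accepted by both, which is the point made in the last item: a conforming Subscriber has to cope with onSubscribe followed directly by onError either way.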
Lastly, I’d like to remark that you are coming to a mature and well-established project demanding a change because things don’t make sense to you. No matter what the project is, this attitude will make it unlikely that your proposed change will be accepted.
@viktorklang I very much accept and concur with your reasoning. There's just one small problem here. The error; publisher rejects the subscriber or let's call it the "subscription request" - this shouldn't have been passed to the subscriber but immediately thrown to the calling thread (see #495).
There's one solution to both of these problems the way I see it: 1) Specify that onSubscribe is only called when a subscription has been accepted. If a subscription is not accepted, then the exception is thrown right away. Super simple, and in total agreement with all the other asynchronous APIs that I know of. 2) Specify what onError is, which I believe should only be for Publisher errors after subscription was accepted, i.e., at a point when the subscriber expects actual item delivery but oooops, for whatever reason, he got an exception instead.
@rkuhn I do not buy that asynchronicity has anything to do with it. The first sentence you quoted should actually be reworked and is misleading if you ask me.
I am not saying we should make onError special. onError shouldn't be called for rejected subscriptions, that's the point. The caller should crash instead. onError/onComplete would both still only be called as a consequence of a successful subscription request and only after onSubscribe.
@martinanderssondotcom I am not saying we should make onError special. onError shouldn't be called for rejected subscriptions, that's the point. The caller should crash instead. onError/onComplete would both still only be called as a consequence of a successful subscription request and only after onSubscribe.
What would be achieved by changing this, what problem are you trying to solve?
@martinanderssondotcom Also, your proposal would not be backwards compatible, and that means that it is pretty much guaranteed not to happen.
The first sentence you quoted should actually be reworked and is misleading if you ask me.
Please take a step back and consider this: the Reactive Streams initiative was created more than half a decade ago around this particular sentence, and now you ask that it be “reworked”. This does not seem to be the basis for a fruitful discussion.
There are technical points in your post that would need elaboration, but I will refrain from doing that for now.
@rkuhn The spec doesn't require asynchronicity and asynchronicity explains things such as a void method return type but doesn't explain why an IllegalArgumentException object out of immediate method argument validation is created and then passed downstream to an oblivious subscriber instead of crashing the call immediately. This is simply a poor design and totally breaks away from convention - with surprising results, if you ask me. I could be totally wrong of course =) Please correct me where you think I am wrong. So far, I do believe this whole "asynchronicity" argument is like blowing smoke. It's derailing and skewing the topic. I really fail to see how it is relevant. Help, please? (but perhaps, this is better suited for #495)
@viktorklang The problem I am personally trying to solve - related to this ticket - is the fact that I have to provide a "dummy" subscription to a subscriber that I intend to reject, so makes "little sense" and I also know the subscriber might now proceed to make lots of "initialization logic" despite me knowing I have no plans at all to deliver him any items. Further, if he cancels the subscription then I have to "do the right thing" and not pass him the error. So there's even a chance this error is lost! Although none of this is a "show stopper" for my particular code today, it's just .. you know, not very good either.
I understand it breaks backwards compatibility, so we should add it as a 2.0 milestone =)
Currently, one MUST call onSubscribe before calling onError/onComplete, even if there was no request yet (e.g. in subscribe()).
Example:
In this example, DummySubscription needs to be passed, although it is completely useless and ignored by the publisher (does not listen to requests/cancellation).
Another example would be:
We already called onSubscribe, but catch {} does not know about it (nor can it check, without allocating a fake subscriber), so the only option would be to always add try {} catch {} after onSubscribe, which discourages code reuse.
But, in fact, in both cases of onError, onSubscribe is there just to satisfy the rule and nothing else.
While I understand that it is probably too late to change the spec (since some implementations most probably rely on onSubscribe and prepare some stuff in it), I am simply curious what was the motivation to require onSubscribe before terminal onError/onComplete?
Thanks!
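For readers who want something concrete, the first situation described in this post (a Publisher that fails every subscription yet still has to hand over a Subscription) might look roughly like the following in Java. This is a hypothetical reconstruction under stated assumptions, not the code originally posted: only the name DummySubscription comes from the text, while AlwaysFailingPublisher and the method bodies are made up for illustration.

```java
import java.util.concurrent.Flow;

// Hypothetical Publisher that rejects every Subscriber. To honour the
// onSubscribe-before-onError ordering it first hands over a Subscription
// that does nothing.
final class AlwaysFailingPublisher<T> implements Flow.Publisher<T> {

    // Placeholder Subscription (body assumed): ignores demand and cancellation.
    private static final class DummySubscription implements Flow.Subscription {
        @Override
        public void request(long n) { }

        @Override
        public void cancel() { }
    }

    private final Throwable error;

    AlwaysFailingPublisher(Throwable error) {
        this.error = error;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        subscriber.onSubscribe(new DummySubscription()); // only there to satisfy the ordering rule
        subscriber.onError(error);                       // the signal the Publisher actually wants to send
    }
}
```

The Subscriber receives onSubscribe and then onError, and nothing it does with the DummySubscription has any effect on the Publisher.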