eclipse / microprofile-reactive-streams-operators

Microprofile project
Apache License 2.0

Clarify the behaviour of imported/exported org reactive stream elements #105

Closed: hutchig closed this issue 5 years ago

hutchig commented 5 years ago

There are a number of places in the specification where org.reactivestreams (and, presumably in future, j.u.c.Flow) elements are imported or exported. Users can't really tell how these objects should behave (for imports) or will behave (for exports) in some important respects.

For exported objects, we do not specify any aspect of the behaviour of the exported implementations, so all the user has to work with is the ORS (org.reactivestreams) specification.

For example, consider a Publisher obtained from:

   Publisher<Character> myPublisher = ReactiveStreams.of('a', 'b', 'c').buildRs();

How would this Publisher react to a second subscribe(subscriber) call?

Without further specification, a reasonable interpretation of the underlying ORS spec might suggest the equivalent of WITH_FANOUT (with the meaning taken from https://doc.akka.io/japi/akka/current/akka/stream/javadsl/AsPublisher.html) as the default, so the Publisher could be expected to 'work' correctly for both subscribers.

Furthermore, with two subscribers and no further specification, one would expect a backpressure policy of calling onNext() on those Subscribers whose outstanding request(n) count is > 0, sending enough request(n) calls upstream to allow this, and buffering for those Subscribers whose request(n) count is 0. What, then, is the size of the buffer that absorbs the difference between the subscribers' total request(n) counts? It could either be implementation specific or be added to the buildRs() signature, as RxJava operators do.
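A minimal sketch of the bookkeeping this policy implies. The class `FanOutDemand` and its methods are hypothetical (not part of the specification or any library), and it tracks only counts, not real elements: the stage asks upstream for as much as the most eager subscriber wants, but never lets the fastest subscriber get more than `capacity` elements ahead of the slowest, which is exactly where the buffer-size question bites.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical bookkeeping for a fan-out stage: one upstream, several downstream
 * subscribers, and a shared buffer of at most `capacity` elements for laggards.
 * Assumes finite request(n) values (Long.MAX_VALUE would overflow the sums).
 */
final class FanOutDemand {
    private final long capacity;                            // max elements held for lagging subscribers
    private final List<Long> demand = new ArrayList<>();    // outstanding request(n) per subscriber
    private final List<Long> delivered = new ArrayList<>(); // onNext calls made per subscriber
    private long received;                                  // elements received from upstream
    private long requestedUpstream;                         // total request(n) sent upstream

    FanOutDemand(long capacity) { this.capacity = capacity; }

    /** Registers a subscriber at the current position (it does not see earlier elements). */
    int addSubscriber() {
        demand.add(0L);
        delivered.add(received);
        return demand.size() - 1;
    }

    /** A downstream request(n): deliver from the buffer immediately where possible. */
    void request(int subscriber, long n) {
        demand.set(subscriber, demand.get(subscriber) + n);
        drain();
    }

    /** An upstream onNext: buffer the element, then deliver to subscribers with demand. */
    void onUpstreamElement() {
        received++;
        drain();
    }

    /** How much to request upstream now, capped so the buffer never exceeds `capacity`. */
    long upstreamRequest() {
        if (demand.isEmpty()) return 0;
        long slowest = Long.MAX_VALUE;
        long wanted = 0;
        for (int i = 0; i < demand.size(); i++) {
            slowest = Math.min(slowest, delivered.get(i));
            wanted = Math.max(wanted, delivered.get(i) + demand.get(i));
        }
        long target = Math.min(wanted, slowest + capacity);
        long n = Math.max(0, target - requestedUpstream);
        requestedUpstream += n;
        return n;
    }

    long delivered(int subscriber) { return delivered.get(subscriber); }

    private void drain() {
        for (int i = 0; i < demand.size(); i++) {
            long buffered = received - delivered.get(i);   // elements waiting for subscriber i
            long n = Math.min(buffered, demand.get(i));
            delivered.set(i, delivered.get(i) + n);        // i.e. n onNext calls
            demand.set(i, demand.get(i) - n);
        }
    }
}
```

Here `capacity` is a constructor argument, which corresponds to the "add it to buildRs()" option; hard-coding it would correspond to the implementation-specific option.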

Similarly, for imported objects we do not specify any requirements on the implementation of the ORS/Flow interface methods, or on the state of the objects. For example, what happens if one 'imports' a Publisher that already has a Subscriber subscribed?

The other 'exit doors' (to external types not defined in this spec) are analogous, but they appear to be covered adequately (I will append a list below and check each one).

For example, the specified behaviour of what is returned by the CompletionSubscriber::getCompletion() method, when a CompletionSubscriber is created from a Subscriber that is 'imported' from a user, is covered adequately by the javadoc of CompletionSubscriber::getCompletion(), which states: "This should be redeemed by the subscriber either when it cancels, or when it receives an

hutchig commented 5 years ago

I would like to do a strawman PR for this, as I have spent so many hours discussing what is in the spec without actually writing any of the text. Would that be OK, @jroper @cescoffier?

cescoffier commented 5 years ago

+1


(Reply sent by email to Gordon Hutchison's comment of 7 Nov 2018, 13:19.)



jroper commented 5 years ago

From an implementation perspective, you'll find different behaviours. Consider:

   Publisher<Integer> pub = ReactiveStreams.of(1, 2, 3).buildRs();

An RxJava implementation of this will, I believe, likely support multiple subscriptions: each subscriber will receive [1, 2, 3] no matter when it subscribes, and multiple subscribers will have no impact on each other. In fact, I believe that in general, for RxJava at least, multiple subscribers to the same publisher never affect each other, and fan out is never done (except by publishers explicitly created for the purpose of fan out). For RxJava, a publisher is either cold, meaning it holds no resources and can be subscribed to over and over (as in the above case, or, in a more complex case, a publisher that opens a new TCP connection each time you subscribe), or hot, meaning it usually represents an active TCP connection, and any subsequent subscriptions to it will fail.
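To make the RxJava-style 'cold' behaviour concrete, here is a minimal sketch (not RxJava code, and not something the specification requires) using `java.util.concurrent.Flow`, which has the same four interfaces as org.reactivestreams. `ColdListPublisher` is a hypothetical name: the publisher keeps no per-run state, so every subscriber gets its own Subscription, its own position, and its own [1, 2, 3].

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;

/** A "cold" publisher: each subscribe() gets an independent replay of the list. */
final class ColdListPublisher<T> implements Flow.Publisher<T> {
    private final List<T> items;

    ColdListPublisher(List<T> items) { this.items = List.copyOf(items); }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        if (subscriber == null) throw new NullPointerException("subscriber"); // RS rule 1.9
        subscriber.onSubscribe(new ListSubscription<>(items, subscriber));
    }

    private static final class ListSubscription<E> implements Flow.Subscription {
        private final List<E> items;
        private final Flow.Subscriber<? super E> subscriber;
        private final AtomicLong demand = new AtomicLong(); // requested but not yet emitted
        private volatile boolean done;                       // cancelled or completed
        private int index;                                   // this subscriber's own position

        ListSubscription(List<E> items, Flow.Subscriber<? super E> subscriber) {
            this.items = items;
            this.subscriber = subscriber;
        }

        @Override
        public void request(long n) {
            if (done) return;
            if (n <= 0) { // RS rule 3.9
                done = true;
                subscriber.onError(new IllegalArgumentException("request must be positive"));
                return;
            }
            // Add demand (saturating). Only the caller that moved it away from 0 runs the
            // loop, so a request() made from inside onNext() does not recurse (RS rule 3.3).
            long previous = demand.getAndAccumulate(n, (a, b) -> a + b < 0 ? Long.MAX_VALUE : a + b);
            if (previous != 0) return;
            long emitted = 0;
            while (true) {
                long requested = demand.get();
                while (emitted != requested && index < items.size()) {
                    if (done) return;
                    subscriber.onNext(items.get(index++));
                    emitted++;
                }
                if (done) return;
                if (index == items.size()) {
                    done = true;
                    subscriber.onComplete();
                    return;
                }
                if (demand.addAndGet(-emitted) == 0) return; // no new demand arrived meanwhile
                emitted = 0;
            }
        }

        @Override
        public void cancel() { done = true; }
    }

    /** Usage helper: subscribe, request one element at a time, return what was received. */
    static <T> List<T> collect(Flow.Publisher<T> publisher) {
        List<T> out = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<T>() {
            private Flow.Subscription subscription;
            @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1); }
            @Override public void onNext(T item) { out.add(item); subscription.request(1); }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
        return out; // already complete: this publisher emits synchronously
    }
}
```

Calling `collect` twice on the same instance yields [1, 2, 3] both times, because nothing about the first run is stored in the publisher itself.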

Akka's approach here is different. Akka's core API is inherently lifted (a Source, Flow or Sink is a reusable blueprint until it is materialized), so Akka Sources/Flows/Sinks are designed never to represent a hot resource. There is a mismatch between that design and real-world use cases, many of which require them to, for example Akka HTTP, where hot HTTP request body streams are represented as a Source. From Akka's perspective, moving from Akka's lifted API to Reactive Streams is the action that makes a stream hot, so any returned publisher is hot (and corresponds to a running actor underneath), even if that publisher just publishes an in-memory list of elements. Because all of Akka Streams' publishers are hot, Akka offers two modes for them: either they are single use, where additional subscribers fail, or they are fan out, where additional subscribers get any elements that haven't yet been published, and back pressure from one subscriber back pressures all of them. Note that Akka Streams always uses a buffer in its stages (16 elements long by default), so the maximum that any subscriber can get ahead of the other subscribers is 16 elements before it starts being throttled.
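Akka's API is not shown here, but by analogy the JDK's `java.util.concurrent.SubmissionPublisher` behaves much like the fan-out mode described above: it is hot and multicast, a subscriber that joins late only receives items submitted after it subscribed, each subscriber gets its own bounded buffer (`Flow.defaultBufferSize()`, i.e. 256, by default, rather than Akka's 16), and `submit()` blocks while any subscriber's buffer is full, so the slowest subscriber throttles the others. A small demonstration; `HotFanOutDemo` is an illustrative name only.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

/** A late subscriber to a hot, multicast publisher misses the earlier items. */
final class HotFanOutDemo {
    static final class Recorder implements Flow.Subscriber<Integer> {
        final List<Integer> items = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(Integer item) { items.add(item); }
        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }

        /** Waits for the terminal signal (delivery is asynchronous), then returns the items. */
        List<Integer> await() {
            try {
                if (!done.await(5, TimeUnit.SECONDS)) throw new IllegalStateException("timed out");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return List.copyOf(items);
        }
    }

    /** Returns [items seen by an early subscriber, items seen by a late one]. */
    static List<List<Integer>> run() {
        Recorder early = new Recorder();
        Recorder late = new Recorder();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(early);
            publisher.submit(1);
            publisher.submit(2);
            publisher.subscribe(late); // joins after 1 and 2 have been published
            publisher.submit(3);
        } // close(): onComplete is signalled after each subscriber's buffered items
        return List.of(early.await(), late.await());
    }
}
```

The early subscriber sees [1, 2, 3] and the late one sees only [3], which is the "elements that haven't yet been published" behaviour rather than RxJava-style replay.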

The above are two very different approaches, and I think there are others as well. I don't think either approach is right or wrong; both have their advantages and disadvantages. The question is whether we should specify the behaviour to be one or the other, or leave it unspecified. The danger of leaving it unspecified is that people may write code that relies on the particular behaviour of one implementation without realising that it's unspecified, and thus end up with a non-portable app. If we do specify it, my preference would be to say that the publishers are all single shot and will fail if a new subscriber comes along. The reason is that if we take RxJava's approach, different publishers will behave differently depending on what stages are in the graph, and that's going to be very convoluted to specify. If we require fan out, that puts an implementation burden on implementations that don't already support it. If we require a second subscription to fail, that's quite easy to implement even if the implementation doesn't already support it.

@hutchig I'm completely fine with you doing the PR for this.

jroper commented 5 years ago

Also, when it comes to wrapping subscribers/publishers/processors: one thing Akka Streams does that we haven't done here is that, rather than wrapping subscribers and processors directly, it wraps Supplier<Subscriber> and Supplier<Processor>. The reason is that subscribers are single shot by design (the RS specification says it's illegal to invoke onSubscribe twice on a Subscriber), but since Akka Streams' lifted API is cold by design, you can't wrap a hot resource in a cold graph. So instead it wraps suppliers of subscribers and processors, which means that each time the graph is materialized, a new subscriber or processor is created. At least that's the theory; in practice that often isn't possible. You often have a subscriber that's been passed in by some other API, and you wrap it in a supplier that returns that same subscriber each time. This is again an example of the mismatch between the design and real-world use cases.
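To illustrate why Akka wraps suppliers, here is a sketch with a hypothetical `SinkBlueprint` (not MicroProfile or Akka API) that is materialized twice while the first run is still active. Wrapping one instance means the second run hands an already-subscribed subscriber a new subscription, which the subscriber must cancel under RS rule 2.5; wrapping a supplier gives each run a fresh subscriber. `java.util.concurrent.Flow` stands in for the org.reactivestreams interfaces.

```java
import java.util.concurrent.Flow;
import java.util.function.Supplier;

/** Hypothetical reusable sink: wraps a Supplier so each materialization may get a new subscriber. */
final class SinkBlueprint<T> {
    private final Supplier<? extends Flow.Subscriber<? super T>> factory;

    SinkBlueprint(Supplier<? extends Flow.Subscriber<? super T>> factory) { this.factory = factory; }

    /** One run of the graph: ask the factory for a subscriber and give it a fresh subscription. */
    TrackingSubscription materialize() {
        TrackingSubscription subscription = new TrackingSubscription();
        factory.get().onSubscribe(subscription);
        return subscription;
    }

    /** Stand-in for a running graph's subscription; it only records cancellation. */
    static final class TrackingSubscription implements Flow.Subscription {
        volatile boolean cancelled;
        @Override public void request(long n) { }
        @Override public void cancel() { cancelled = true; }
    }

    /** A subscriber honouring RS rule 2.5: cancel a new subscription while one is active. */
    static final class StrictSubscriber<E> implements Flow.Subscriber<E> {
        private Flow.Subscription active;
        @Override public void onSubscribe(Flow.Subscription s) {
            if (active != null) { s.cancel(); return; }
            active = s;
            s.request(Long.MAX_VALUE);
        }
        @Override public void onNext(E item) { }
        @Override public void onError(Throwable t) { active = null; }
        @Override public void onComplete() { active = null; }
    }
}
```

Rule 2.5 talks about an 'active' Subscription, so once a run has terminated, the shared instance in this sketch would accept a new one; the conflict only arises when runs overlap.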

So, should we do the same as Akka Streams? I'm leaning towards no. In many real-world use cases you're forced to wrap a single subscriber because you can't create a new one each time (for example, if you're given a subscriber that represents the output for an HTTP response, the server can't resubmit the request on the client's behalf, so you can't create a new subscriber each time), so the extra complexity in the API would just be a hat tip towards an unachievable ideal. My thought is that when you do want to do something like this, we could introduce a lazy operator that takes a creator for a builder, e.g. SubscriberBuilder ReactiveStreams.lazySubscriberBuilder(Supplier<SubscriberBuilder>). This could be used for more than just subscribers: it could also be used for aggregations that depend on external state (though Collector is really a supplier of aggregations itself, so it's not as useful for subscribers, but for publishers and processors it will be).

hutchig commented 5 years ago

So, in this comment I'll consider exports of RS elements (and James' first comment above), and imports in the next one...

For exports, we need to reconcile the spec's need to cover real-world use cases with its being implementable using real-world libraries like RxJava/Akka/Reactor. The very simple use case that brought me to this problem was: how does the user build a 'tree' rather than a serial pipeline?

James' option above, "my preference would be to say that the publishers are all single shot, and will fail if a new subscriber comes along", can support this use case indirectly: once users have a hot, single-use RS Publisher (cf. .runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT))), they can subscribe their own RS Processor to it, and that Processor can have whatever semantics they want on the downstream side. So o-o< rather than o<.

One could argue: "well, maybe we could do this for them, even on top of an underlying library that can't (have a Publisher with two Subscribers)", but then there are other policy decisions to make too, and, as is so often said, 'it depends on the situation'. Perhaps the specification should place only minimal constraints on the implementation:

  1. Single use Publisher that fails on a second Subscriber is ALLOWED by spec, [This used to say "Single use/Subscriber is ALLOWED by spec"]
  2. A Publisher is ALLOWED to back pressure upstream when its subscriber back pressures (but may buffer up to an implementation-dependent amount); this is less of an issue for single-use Publishers.
  3. A Publisher's Graph is ALLOWED to subscribe internally either when its outlet is subscribed to OR when it is emitted by buildRs(). (Does it make sense not to resolve the Graph on buildRs?)
  4. Discuss that if a user wishes for a specific set of behaviours, the most portable thing is to create an RS Publisher that is 100% their own code, behaves exactly as they wish, and is subscribed to the synthetic one that is exported.
  5. If we do specify that a second subscription is allowed to fail, should we specify the nature of the failure if it does? Akka, for example, has "Additional subscription attempts will be rejected with an IllegalStateException." I do not know of any reason not to use this.
  6. For an exported Processor, are the internal Subscriptions within the Graph that resolves to the Processor made on build, or when it is externally Subscribed to? I propose that we specify that they may be made on build/export but COULD occur on the subsequent first external Subscription.
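A sketch of the o-o< shape (and of item 4 above): a user-owned Processor takes the single subscription to the exported Publisher and re-publishes to as many subscribers as it likes, with semantics the user picked. Here those semantics are borrowed from `java.util.concurrent.SubmissionPublisher`, following the `TransformProcessor` example in its Javadoc; `BroadcastProcessor` itself is hypothetical, and a second SubmissionPublisher merely stands in for the exported Publisher.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

/** User-owned fan-out Processor: one upstream subscription, many downstream subscribers. */
final class BroadcastProcessor<T> extends SubmissionPublisher<T> implements Flow.Processor<T, T> {
    private Flow.Subscription upstream;

    @Override public void onSubscribe(Flow.Subscription subscription) {
        (upstream = subscription).request(1); // pull one element at a time from upstream
    }

    @Override public void onNext(T item) {
        submit(item);         // blocks while any downstream buffer is full (back pressure)
        upstream.request(1);
    }

    @Override public void onError(Throwable error) { closeExceptionally(error); }

    @Override public void onComplete() { close(); }

    /** Usage: two downstream subscribers share one subscription to the exported Publisher. */
    static List<List<Integer>> demo() {
        Sink a = new Sink();
        Sink b = new Sink();
        BroadcastProcessor<Integer> processor = new BroadcastProcessor<>();
        processor.subscribe(a);
        processor.subscribe(b);
        // Stand-in for the exported Publisher; the processor is its only subscriber.
        try (SubmissionPublisher<Integer> exported = new SubmissionPublisher<>()) {
            exported.subscribe(processor);
            for (int i = 1; i <= 3; i++) exported.submit(i);
        }
        return List.of(a.await(), b.await());
    }

    static final class Sink implements Flow.Subscriber<Integer> {
        private final List<Integer> items = new CopyOnWriteArrayList<>();
        private final CountDownLatch done = new CountDownLatch(1);
        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(Integer item) { items.add(item); }
        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }

        /** Waits for the terminal signal (delivery is asynchronous), then returns the items. */
        List<Integer> await() {
            try {
                if (!done.await(5, TimeUnit.SECONDS)) throw new IllegalStateException("timed out");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return List.copyOf(items);
        }
    }
}
```

The exported Publisher only ever sees one subscriber, so this works whether the implementation's publishers are single shot or not.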
jroper commented 5 years ago

When it comes to the tree use case, I don't think multiple subscribers to the publisher is the mechanism we want to offer for that, since it would allow only very limited ways of doing things. If we do decide to support trees (and I'm not sure that we should, since even a minimal set of use cases would require a very large API surface area; this is one of those things where I think it's better to leave it to users to select a third-party library that does what they want for an advanced use case), then I think the tree should be specified and wired by the builder API, not by the publisher itself.

For reference, Akka Streams documentation on graphs is here:

https://doc.akka.io/docs/akka/2.5/stream/stream-graphs.html

This is a fairly comprehensive API that allows you to do almost anything. You can see the broad range of fan-out and fan-in operators (broadcast, balance, unzip, four different types of merge with options such as whether a particular inlet is preferred or prioritised, concat and zip), and each of those has many options. Akka Streams also supports cycles. But if you scroll down through the code examples, you can see that a lot of code is needed to support all this, and while some of it could be simplified if fewer features were offered, I'm not convinced that it could be much simpler.

I do think if we're going to support fan out, we also need to support fan in, and that it all needs to be declared in the builders, not by converting to Reactive Streams and back.

Just some comments on the above:

  1. Single use/Subscriber is ALLOWED by spec,

This is not just allowed, it's a constraint set by Reactive Streams: Subscribers are single use, and if you attempt to use them multiple times, their behaviour is well defined by section 2.5 of the Reactive Streams spec (they must immediately cancel the new subscription).

  3. Publisher's Graph is ALLOWED to internally subscribe when outlet subscribed to OR when emitted by buildRs). (Does it make sense to not resolve the Graph on buildRs?).

I think this is an implementation detail, though I would say that implementations should try to fail early on any errors in the graph itself (e.g. unsupported stages) where possible.

  4. Discuss that if a user wishes a specific set of behaviours the most portable thing is that they create a RS Publisher that is 100% their own code and which behaves exactly as they wish and Subscribe it to the synthetic one that is exported.

I wouldn't word it that way. I'd say that they should use a third-party library that matches the semantics they need to create the publisher they want.

  5. If we do specify that a second subscription is allowed to fail - should we specify the nature of the failure if it does? Akka has for example "Additional subscription attempts will be rejected with an IllegalStateException." I do not know of any reason not to use this?

I wouldn't specify the type of exception, since sometimes there may be a particular reason that a second subscriber is not allowed. This would particularly be the case for publishers from implementations that sometimes support multiple subscribers and sometimes don't, and we would want to let implementations convey that reason. Nevertheless, something to keep in mind is section 1.9 of the Reactive Streams spec, which states that, no matter what, Publisher.subscribe is not allowed to throw exceptions unless the subscriber is null. So a second subscriber can only be rejected by first calling onSubscribe on it and then invoking onError with the particular error.
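Both points (fail on a second subscriber, and reject only via onSubscribe then onError) can be layered over any publisher, which also bears out the earlier remark that a fail-on-second-subscriber policy is easy to implement. A minimal sketch using `java.util.concurrent.Flow` in place of the org.reactivestreams interfaces; `SingleSubscriberPublisher` and `SignalRecorder` are hypothetical names, and IllegalStateException is only this sketch's choice, in line with leaving the exception type unspecified.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

/** Passes the first subscriber through to the delegate and rejects all later ones. */
final class SingleSubscriberPublisher<T> implements Flow.Publisher<T> {
    private final Flow.Publisher<T> delegate;
    private final AtomicBoolean taken = new AtomicBoolean();

    SingleSubscriberPublisher(Flow.Publisher<T> delegate) { this.delegate = delegate; }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        if (subscriber == null) {
            throw new NullPointerException("subscriber"); // the one exception rule 1.9 permits
        }
        if (taken.compareAndSet(false, true)) {
            delegate.subscribe(subscriber); // first subscriber: pass straight through
            return;
        }
        // Rule 1.9: onSubscribe must come first, then the rejection is signalled via onError.
        subscriber.onSubscribe(NoOpSubscription.INSTANCE);
        subscriber.onError(new IllegalStateException("only a single subscriber is supported"));
    }

    /** Subscription handed to rejected subscribers; nothing is ever emitted on it. */
    private enum NoOpSubscription implements Flow.Subscription {
        INSTANCE;
        @Override public void request(long n) { }
        @Override public void cancel() { }
    }
}

/** Records signal names, to show what a rejected subscriber observes. */
final class SignalRecorder<T> implements Flow.Subscriber<T> {
    final List<String> signals = new CopyOnWriteArrayList<>();
    @Override public void onSubscribe(Flow.Subscription s) { signals.add("onSubscribe"); }
    @Override public void onNext(T item) { signals.add("onNext"); }
    @Override public void onError(Throwable t) { signals.add("onError:" + t.getClass().getSimpleName()); }
    @Override public void onComplete() { signals.add("onComplete"); }
}
```

A second subscriber records exactly `onSubscribe` followed by `onError:IllegalStateException`, and `subscribe(null)` throws NullPointerException.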

  6. For a Processor exported, are internal Subscriptions within a Graph that resolves to the Processor done on build or when it is externally Subscribed to? I propose that we specify it may be done on build/export but COULD occur on subsequent first external Subscription.

It's a good question. The asynchronous nature of Reactive Streams, that is, the fact that both onSubscribe and subscribe could be invoked by different threads at different times or concurrently, means that the question of "when" it's done is much more vague. I would say neither implementations, nor end user code, should make any assumptions about when, in what order, and from what thread the subscribes are done. The Reactive Streams spec doesn't say this explicitly about processors, but does imply it.

There is actually something about this already in the spec:

https://github.com/eclipse/microprofile-reactive-streams/blob/master/spec/src/main/asciidoc/implementation.asciidoc#memory-visibility

When Reactive Streams interfaces, that is, Publisher, Subscriber or Processor, are wrapped in a stage, implementations may place an asynchronous boundary between that stage and its neighbors if they choose. Whether implementations do or don’t place an asynchronous boundary there, they must conform to the Reactive Streams specification with regards to memory visibility.

This doesn't directly answer your question, but it does give a hint - what it's implying is that all callbacks on wrapped Reactive Streams interfaces may be dispatched by a different thread to the rest of the graph, and though not explicitly stated, it's intended that this includes onSubscribe and subscribe.

Regardless of exact timing, though, there is another aspect, which is probably closer to what you're getting at: is an implementation expected to wait until it gets a downstream subscribe or upstream onSubscribe before doing any internal subscribes or onSubscribes? I don't think this should be specified strictly, though it may be worth saying that, where possible, subscribing should be eager. One reason is that it allows eager cleanup of resources in the middle of the graph in case the graph is terminated from one direction before the other direction has established a subscription. It's also generally easier and more performant to implement eager establishment of everything, since you don't need to keep track of what's been established where; you just proceed as far as possible immediately.

hutchig commented 5 years ago

Apologies: in my text above, by "Single use/Subscriber is ALLOWED by spec" I was trying to convey the equivalent of "A single-use Publisher that only accepts a single Subscriber is ALLOWED by the spec", not that Subscribers are single use. I understand the rationale for Subscribers being single use since, for example, a Subscriber could not tell which Publisher the onComplete() etc. calls came from. On reading the RS spec, though, I see that clause 2.5 uses the term "active Subscription", which would allow for serial reuse. So even my choice of the term 'single use' is not great.

hutchig commented 5 years ago

Having thought about this, I now understand that the number of possible options for an emitted Publisher's or Producer's behaviour is large. Even if one takes just one aspect of what is sometimes simplified to hot/cold (not just in this spec but in the implementations out there), for example subscription, and whether a Publisher subscribes when it is subscribed to, it quickly expands as one thinks about it.

For example, a Publisher in some libraries may initialize/subscribe itself either:

  1. when asked to do so manually (e.g. connect()), or
  2. automatically, with some number of subscribers acting as the trigger, where that trigger number can be 0 (i.e. during buildRs()/materialization), 1 (during the first subscribe) or n (for example ConnectableFlux.refCount()).

And is the subscribe()/onSubscribe() chain occurring 'depth first', where a Publisher does not call onSubscribe on its Subscribers until it has received its own onSubscribe?

It goes on. What if, after the Publisher has subscribed, the number of active Subscribers falls below this 'trigger' number (or to 0 if it subscribes only when being subscribed to)? Is the Publisher then obliged to cancel its subscription? What should it do with an onNext() that arrives between its receiving the downstream cancel() that takes the number of Subscribers below the trigger level and its calling a future (new) Subscriber's onSubscribe? If there is more than one subscriber, is one of them a 'primary' subscriber that any catch-up buffer trails behind, or can this role move among any non-backpressuring subscribers?
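To show that each of these questions forces a concrete choice, here is one possible answer as a hypothetical model (`RefCountPolicy` is not any library's class): connect upstream when the subscriber count reaches a trigger n >= 1, stay connected while the count is merely below the trigger, and cancel upstream only when it reaches zero. This is similar to what Reactor documents for `ConnectableFlux.refCount(int)`, but another library could reasonably choose differently, and the model deliberately says nothing about an onNext() arriving in the gap.

```java
/**
 * Hypothetical model of ONE connection policy: connect at `trigger` subscribers,
 * disconnect only when the count drops to zero. Other policies are equally valid.
 */
final class RefCountPolicy {
    private final int trigger;
    private int subscribers;
    private boolean connected;
    private int connections; // how many times upstream has been subscribed to

    RefCountPolicy(int trigger) {
        if (trigger < 1) throw new IllegalArgumentException("trigger must be >= 1");
        this.trigger = trigger;
    }

    void subscriberAdded() {
        subscribers++;
        if (!connected && subscribers >= trigger) {
            connected = true;   // a real publisher would subscribe upstream here
            connections++;
        }
    }

    void subscriberCancelled() {
        if (subscribers == 0) throw new IllegalStateException("no subscribers");
        subscribers--;
        if (connected && subscribers == 0) {
            connected = false;  // a real publisher would cancel its upstream subscription here
        }
    }

    boolean isConnected() { return connected; }
    int connections() { return connections; }
}
```

With a trigger of 2, the first subscriber alone does not connect, the second does, dropping back to one subscriber stays connected under this policy, and dropping to zero disconnects, so a later pair of subscribers causes a second connection.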

Would a Publisher ever limit the number of Subscribers? For example, Akka can limit the Subscribers to 1, and (only vaguely connected, I know) RSocket can use 'leases' to limit the number of Subscriber connections a publisher has to support to N.

The net of all this is that I think it is not appropriate to specify one particular set of behaviours for most of these aspects, but it is appropriate to spell out to users that these edge behaviours are implementation specific and that they should not make assumptions about what they will get with exported Producers etc.

One cannot assume (as I initially did) that there is some 'vanilla' or 'common sense' behaviour outside of what is specified. I will try to cover what is safe and can be relied on (including that a subscribe may 'appear' to work but just result in an onError, as James mentions). I will create some draft spec text that tries to inform users of this. For advanced usage patterns, I will try to use the terminology of appropriate library API use rather than referring to the user's own code.

hutchig commented 5 years ago

Having read James' thoughts on running a Graph twice: I think this specification is introducing Reactive Streams to quite a lot of people who do not have a long background with Reactor/Akka/RxJava. Given that there isn't much in the API or the spec that points towards the issues with running a Graph, or using a built Subscriber, more than once, I think it is important that the PR for this issue adds some text discussing this, even though it is not the job of the spec to be an RS tutorial.

The 'lazy operator' is really using 'lazy' to mean that it will be re-run when the Graph is re-run. For Subscribers, 'as late as possible' means on each run/materialization. However, if we use this term for publishers and processors, they could be even lazier, with the mechanism activated when that node in the Graph gets its first subscribe in that run. As an aside, I notice that one thing we don't yet have is a set of terminology that we all share. I have learnt (I think) what James means by 'fan out' (over and above having more than one Subscriber), and the implications some people take from 'hot' in terms of whether a Publisher can have more than one subscriber. There does not seem to be a separate term for whether a stream of onNext()s can be, or is, joined part way through (though 'replay' and 'resume' from Reactor are related). This seems a slightly separate but related issue to whether a Publisher will start Subscribers at a notional 'start' element, or how much processing it does when it is created versus on the first subscribe; and do all of these get squashed into 'hot'/'cold'? Indeed, the RxJava/Reactor/Akka communities seem to share a lot of meanings, but not 100%.

I think we can have a separate issue to introduce/discuss something like SubscriberBuilder ReactiveStreams.lazySubscriberBuilder(Supplier<SubscriberBuilder>) for a release beyond 1.0?

@jroper I have to admit I did not understand the reference to Collector in "(though, Collector is really a supplier of aggregations itself, so it's not as useful for subscribers, but for publishers and processors, it will be)".

hutchig commented 5 years ago

Apologies also for not having done the PR this week. I wanted to discuss the above on today's call before creating it.