reactive-streams / reactive-streams-jvm

Reactive Streams Specification for the JVM
http://www.reactive-streams.org/
MIT No Attribution

Clarification of “Asynchronous Boundary” #46

Closed: rkuhn closed this issue 10 years ago

rkuhn commented 10 years ago

Introduction

From the very beginning we agreed upon the following basic problem as the prime motivator for the Reactive Streams project: “How do we pass a stream of data items across an asynchronous boundary without requiring lossiness, unbounded buffering or blocking?”

The first two items come from the fact that without the ability to express back pressure the recipient has to either buffer or drop items (since it is impossible to guarantee that the recipient will always be faster than the sender—given that the scenario involves an asynchronous boundary).

The last item comes from the desire to keep both parties of the exchange responsive to external inputs during the whole process. Imagine you want to implement a stream processing component that does conflation (e.g. taking in stock ticks and handing out the most up-to-date prices to potentially slower consumers). If signaling the consumers would block, then the ticks arriving in the meantime could not be handled, and conflation could not be implemented.

These are to my understanding the points that we all agree upon already. We probably are in agreement about the following, but rather than assume I would like to confirm this understanding.

Problem Statement

Asynchronous boundaries imply a certain cost—as witnessed by this project—and therefore should only be applied when needed. The crucial question is what the reasons are that are driving the need for asynchrony, and the answer boils down to variations of a common theme: asynchrony is needed for decoupling two components.

Components that interact synchronously cannot meaningfully be decoupled: consider two processes where one asks the other for something and synchronously expects the answer back. If the second process does not provide an answer, then the first process is broken (it might hang indefinitely or fail surprisingly). In systems that are built with responsiveness in mind, there is only one scenario in which the answer remains outstanding: the second process has failed unexpectedly. Synchronous interaction then leads to cascading failures across component boundaries, which means that the components are not properly encapsulated and isolated from each other.

Decoupling two components does not only mean stopping failures from propagating in this direct fashion; it is also crucial to separate their resource usage. The trivial example is running different parts of an application on different machines, allocating specific resources to them. This introduces asynchrony by virtue of requiring network interaction, and we all agree that papering over that by simply blocking until the network comes back with a reply is unacceptable (including for the reason given in the previous paragraph). Staying in this picture, we have component A and component B for which there are distinct resource allocations (e.g. separate machines), and now our mission statement reads “how do we pass a stream of data items from A to B without requiring A to block or B to drop or buffer unboundedly?”.

This very same problem applies whether A and B are living inside the same JVM or not, since we can run them on different thread pools which we could (using native libraries) even pin to different CPU sockets, purposefully giving them fairly well separated resources (I would love to be able to separate memory allocations as well, but that is not needed to make this point). The reason why we do this is to encapsulate and isolate the two components such that misbehavior in A does not lead to failure in B and the other way around. In other words, decoupling A and B means removing their ability to interfere with each other down to the minimum—the actual data items passed across the boundary between them.
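
As a simplistic illustration of such resource separation (plain java.util.concurrent executors standing in for the components' resource allocations; all names hypothetical):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    // Sketch: A and B each own a single-thread executor, so work submitted
    // by one component can never consume the other's thread.
    public class SeparateResources {
        public static void main(String[] args) throws InterruptedException {
            ExecutorService resourcesOfA = Executors.newSingleThreadExecutor();
            ExecutorService resourcesOfB = Executors.newSingleThreadExecutor();

            resourcesOfA.execute(() -> {
                int item = 42; // produced strictly on A's resources
                // Only the data item crosses the boundary; everything B does
                // with it runs on B's resources.
                resourcesOfB.execute(() -> System.out.println("B consumed " + item));
            });

            resourcesOfA.shutdown();
            resourcesOfA.awaitTermination(1, TimeUnit.SECONDS);
            resourcesOfB.shutdown();
        }
    }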

Conclusion

With this argument we are in a position to clarify that the purpose of an asynchronous boundary in the context of Reactive Streams is that the sender and the recipient of the data items that flow across this boundary are clearly separated in the resources that they use, which is a necessary prerequisite for decoupling. This is an important goal for complex systems, because only through decoupling can we keep the maintenance burden manageable.

Consequences

It is my understanding that the purpose of the Reactive Streams project is to govern the signaling of data and demand between decoupled components, and we agreed on several occasions that it deliberately does not concern itself with how the streams are processed within these components. Therefore it is imperative that we are very clear about this in the specification: all processing of the data items must occur inside the components, which means on the resources that are allocated to either of them.

This translates into the requirement that the specified methods (in particular onNext, onError, onComplete and request as the main data carriers) must be treated exclusively as signals to be sent across the asynchronous boundary, and no processing can happen while they are on the call stack.
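
To make “signals only” concrete, here is a minimal sketch (using the org.reactivestreams interfaces; all helper names hypothetical) of a Subscriber that treats each method purely as a signal and does all processing on its own executor:

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.Executor;

    import org.reactivestreams.Subscriber;
    import org.reactivestreams.Subscription;

    // Sketch: onNext/onError/onComplete only record the signal and schedule
    // processing on this component's own resources; no work runs on the
    // caller's stack.
    class SignalOnlySubscriber<T> implements Subscriber<T> {
        private final Queue<T> inbox = new ConcurrentLinkedQueue<>();
        private final Executor myResources;        // allocated to this component
        private volatile Subscription subscription;

        SignalOnlySubscriber(Executor myResources) { this.myResources = myResources; }

        public void onSubscribe(Subscription s) {
            subscription = s;
            s.request(1);
        }

        public void onNext(T element) {
            inbox.add(element);                    // constant-time signal hand-off
            myResources.execute(this::drain);      // actual processing happens over there
        }

        public void onError(Throwable t) { myResources.execute(() -> handleError(t)); }
        public void onComplete()         { myResources.execute(this::handleComplete); }

        private void drain() {
            T element = inbox.poll();
            if (element != null) {
                process(element);                  // arbitrary work, on our own thread(s)
                subscription.request(1);           // signal demand for the next element
            }
        }

        private void process(T element)       { /* component-specific */ }
        private void handleError(Throwable t) { /* component-specific */ }
        private void handleComplete()         { /* component-specific */ }
    }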

Another consequence of the reasoning above is that if we want to solve the problem of decoupling components then the resulting specification will have to be limited to this scope. If we start adding behavior which is orthogonal to this goal, but opens up the specification in a way that breaks down when requiring isolation of sender and recipient, then we will not succeed. We should avoid the trap of trying to solve everything.

This means that the intention of making it easy to implement one side of the boundary (as has been suggested for the Subscriber end) cannot really work, since the requirement of decoupling sender and receiver implies that even a functionally simple transformation needs to have access to an asynchronous execution facility—I say functionally simple because for example a filter expression that is applied to items in onNext can consume arbitrary resources, and only excluding IO as a special form of blocking is not good enough.

Closing Remarks

This basic problem statement is the core upon which everything else is built, so we need to be very clear about it and have a complete common understanding in order to proceed. I expect that we will continue to have discussions about the precise consequences, but if we do not agree on what it is that we want to be building, then these discussions will not lead to consensus.

benjchristensen commented 10 years ago

This translates into the requirement that the specified methods (in particular onNext, onError, onComplete and request as the main data carriers) must be treated exclusively as signals to be sent across the asynchronous boundary, and no processing can happen while they are on the call stack.

The wording here is hard for me to fully understand so I want to clarify.

Here are two (poorly described) use cases:

Use Case 1

A == thread retrieving data (IO)
B == thread waiting to receive network response
C == thread to write to user (IO)

There are 2 async boundaries here, with Reactive Streams being used between A->B and B->C (and possibly A to its source and C to its destination).

Type T from A needs to be transformed to type S before it gets delivered to the end user.

A, B or C could do the synchronous transform that doesn't block the thread (no wait/sleep/park etc).

Use Case 2

Event-loop based system (like Node.js, Netty, Vert.x etc).

In this case A, B and C can all be on the same thread or event-loop. They are async operations (non-blocking IO with callbacks, composition operators, whatever).

The transformation from T -> S is a synchronous, non-blocking computation. It could be done on any of the stacks, either A, B or C.


When B receives data from A, are you saying that it must not do the synchronous transform inside the onNext from A but instead wait until after it has scheduled itself (moved across the async boundary) and then B can then do whatever it wishes before sending data to C?

benjchristensen commented 10 years ago

This means that the intention of making it easy to implement one side of the boundary (as has been suggested for the Subscriber end) cannot really work

I don't understand this. The word "easy" is very subjective, so what is this trying to say?

And what "suggestion" are you referring to with "as has been suggested for the Subscriber end"?

smaldini commented 10 years ago

Why can't the asynchronous boundary happen at the Subscription level, which also holds its capacity information? That would leave the Subscriber free from any constraint to implement execution logic. I am investigating this part as I find it unfortunate to share this pain with external Subscribers, e.g.:

Publisher<Integer> source =
    Streams.defer([1,2,3,4,5], env, "ringBuffer")

Publisher<Integer> mapped =
    source.map{integer -> integer}

//do consume the upstream mapped publisher, which propagates the request upwards.
mapped.subscribe(new Subscriber<Integer>(){
    public void onSubscribe(Subscription sub){
        sub.request 1000 //will only fetch up to 5 elements + 1 complete
    }
    public void onNext(Integer data){
    }
    //....
})

Also, I find having the Subscriber own async code surprising, since even the current TCK (0.3) uses a simple and convenient interface to verify the Publisher (and/or Processor).

Of course with Reactor you can do this to have non blocking facility:

//do consume the upstream mapped publisher, which propagates the request upwards.
publisher.subscribe(new Action<Integer,Void>(){
    protected void doSubscribe(Subscription sub){
        sub.request 1000 //will only fetch up to 5 elements + 1 complete
    }
    protected void doNext(Integer data){
    }
    //....
})
Coneko commented 10 years ago

When B receives data from A, are you saying that it must not do the synchronous transform inside the onNext from A but instead wait until after it has scheduled itself (moved across the async boundary) and then B can then do whatever it wishes before sending data to C?

That's how I interpreted the original post as well.

And what "suggestion" are you referring to with "as has been suggested for the Subscriber end"?

The suggestion of having the publisher's request method schedule subsequent calls to onNext asynchronously to avoid stack overflows, so that the subscriber doesn't have to worry about it and can call request synchronously from onNext.
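
For concreteness, a sketch (org.reactivestreams interfaces, otherwise hypothetical) of the hazard that suggestion avoids: when request() delivers onNext() synchronously and onNext() re-requests synchronously, the two methods recurse into each other:

    import org.reactivestreams.Subscriber;
    import org.reactivestreams.Subscription;

    // Sketch of the hazard: both sides are synchronous, so a long stream
    // overflows the stack one frame pair per element.
    class InlineRangeSubscription implements Subscription {
        private final Subscriber<? super Long> subscriber;
        private final long end;
        private long next = 0;

        InlineRangeSubscription(Subscriber<? super Long> subscriber, long end) {
            this.subscriber = subscriber;
            this.end = end;
        }

        public void request(long n) {
            for (long i = 0; i < n && next < end; i++) {
                // If the subscriber calls request(1) from inside onNext, we
                // re-enter this method before the loop advances: the stack
                // deepens with every element delivered.
                subscriber.onNext(next++);
            }
        }

        public void cancel() { next = end; }
    }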

@rkuhn Is saying asynchronous boundaries are for more than just avoiding stack overflows, so asynchronous scheduling should be done on both sides of the boundaries.

rkuhn commented 10 years ago

@Coneko you are right, this is about more than just avoiding stack overflows.

@benjchristensen The suggestion that Subscriber “should be dead simple to implement” was one that you first offered, and it is reiterated in #41 and here by @smaldini. @jrudolph has already argued in the linked discussion that this will not be attainable due to the asynchrony requirement.

To your use-cases question: yes, your summary at the end is what I mean—each thread has its given function, which means that the associated code runs on that thread. If an implementation chooses not to separate the asynchronous components that are using its own execution resources (the event-loop examples you give) then that is okay. What is not okay is to inflict this on others with which this code inter-operates: an Actor-based Akka producer must be able to send elements to any arbitrary consumer without having to worry that its thread pool will now execute that foreign processing pipeline (and vice versa).

@smaldini Please refer to my argument above as to why invocations in both directions must be asynchronous—merely being non-blocking is not sufficient to achieve decoupling and isolation of different components. Reactive Streams are not meant to provide the most efficient way of passing streams around locally, there will be better ways than this (as you are aware of and using in Reactor). The important problem that we are solving here is fundamentally about asynchronous boundaries, synchronous optimizations are invalid since they violate the core quality that we want to achieve with this abstraction.

As a bit of explanatory background: of course we are working on synchronously fusing the stream processing steps within actors in order to avoid the message passing overhead where possible. We will offer the Reactive Streams API only between these fused pieces of pipeline, not within them, and I assume that RxJava will do the same (last time I spoke with Ben that was what he told me). This means that synchronous execution concerns are outside of the scope of this project.

benjchristensen commented 10 years ago

Am I correct in understanding there are 2 cases being addressed here?

1) blocking

A consumer should never block the producer thread (sleep/park/etc) such as by doing IO.

2) starvation

A consumer should not be capable of starving the producer thread by performing arbitrarily long computations on it.

benjchristensen commented 10 years ago

If we force this interface definition to require consumers to always schedule work asynchronously it seems this limits usage of this interface to purely interop, or implementations that choose every stage to be async.

I suggest a different approach, similar to what we agreed upon for multicast/unicast, where we leave it to the Producer to decide. This would mean we actively choose for the standard not to have an opinion, document that fact, and leave it to Producer implementations to do what is right for their needs. The fewer opinions this interface and standard have, the better, as that allows the greatest flexibility and the broadest implementations. Also, we cannot build an interface that assumes all producers and consumers are well behaved. It won't happen. A TCK, even a thorough one, will not help in this regard. Most implementations will never pass through the TCK, and even if they do, the user code that gets run by those implementations definitely will not be. Thus, even if we have the requirement for a Consumer to be async, a Producer will need to code defensively and likely schedule onNext delivery on a separate thread anyway if it is sensitive to starvation.

I still think the rule should simply be: a Consumer should never block the Producer thread.

Beyond this, in an ideal world a Consumer would not perform long-running computations on the Producer thread, but as stated above this is not something that can ever be trusted or enforced. Therefore, if a Producer is concerned with resource starvation, it should decouple itself. This also fits well with the fact that different Producer/Consumer relationships have very different needs. If a Producer or Consumer needs greater isolation than this it can easily choose to provide it in its own implementation.

This follows the multicast/unicast pattern because a Producer can choose to schedule delivery of events on a separate thread. This also fits the model of keeping the complexity in the Producer and keeping Consumer implementations as simple as possible.

For example, a "cold" Producer with a unicast relationship to a single Consumer would be okay with map/filter type computations being done on it. Consider a file being streamed. In this case it is fine for a consumer to process (non-blocking of course) the data and transform/filter it on the producer thread as that producer has no other consumers to serve. There is no reason to hop thread boundaries here. If for some reason the producer didn't want its thread used, then it could schedule delivery on a separate thread.

A "hot" Producer such as streaming stock prices would likely want to ensure its thread is always free to consume incoming stock prices and would therefore want to ensure all onNext deliveries are scheduled on a separate thread (likely via something like Disruptor). Even if the contract stated that Consumer.onNext was async, a defensively coded Producer would probably want/need to do this anyways to be robust if being consumed by unknown 3rd party consumers.

My goal is for a consumer to be as simple as either of these:

// async handling and synchronous request to the async `Subscription.request`
onNext(t) {
   queue.add(t)                  // record the signal
   request(1)                    // signal demand synchronously
   processItemsAsynchronously()  // drain the queue on the consumer's own resources
}
// synchronous handling and synchronous request to the async `Subscription.request`
onNext(t) {
   nextConsumer.onNext(transformer.call(t))  // transform and forward inline
   request(1)                                // signal demand synchronously
}

I suggest we let the Producer contain the scheduling complexity and choice so the Consumer can be implemented obviously and simply.

rkuhn commented 10 years ago

If we force this interface definition to require consumers to always schedule work asynchronously it seems this limits usage of this interface to purely interop, or implementations that choose every stage to be async.

What we are discussing in #45 is to scope this effort to the exchange of streams between different applications and technology stacks over remote transports (TCP, websockets, etc.). In this light it would be inconsistent to choose a different goal for one of the language bindings (namely on the JVM): what we want to solve is how to pass data across an asynchronous boundary. Synchronous stream transformations are of course an interesting topic, but it is a different one, and I believe that we should not mix up these concerns. A specification that solves the asynchronous case will obviously not solve the synchronous one and vice versa. A specification that leaves this question open solves neither of them!

Your initial statement I quote above is mostly right in that Reactive Streams are about interoperation of different components, which may or may not be implemented using different technologies. Users may choose to segregate parts of an RxJava stream pipeline into separate asynchronous execution contexts, which means that it is about foreign as well as “domestic” interop, and it is not opinionated as to how many synchronous steps make up one asynchronous compartment.

Just to be clear: I do not suggest that RxJava (as an example) should use Publisher and Subscriber throughout, in fact I would expect that Observer and Observable stay as they are right now and that only specific Subjects will implement the asynchronous behavior and interfaces. In the same vein Akka Streams introduce the Flow abstraction for modeling possibly synchronous parts of a pipeline and only produce a Publisher or Subscriber when explicitly asked to.

Thus, even if we have the requirement for a Consumer to be async, a Producer will need to code defensively and likely schedule onNext delivery on a separate thread anyways if it is sensitive to starvation.

This must be avoided at all costs; if this happens then we have failed. The specification that we are writing is foremost a statement of intent. If users combine different implementations which abide by the specification then everything will work as intended. If someone implements the interfaces in violation of the specified semantics then they get to keep the pieces.

A standard is only as strong as the compliance of its vetted implementations. And a standard’s strength is otherwise given by how much it restricts the implementation choices. If we want to solve a specific problem (as I set out to define in this Issue) then we need to concentrate on that and solve it as precisely as we can, without overstepping the scope. This allows for wide applicability: a screw whose threading is extremely well specified can be combined with any nut that satisfies the same spec, and the pair can hold together a wide range of things. A screw and nut whose threading is ill-specified will not hold together anything.

For example, a "cold" Producer with a unicast relationship to a single Consumer would be okay with map/filter type computations being done on it.

If I construct a Producer of a certain sequence, then I need to make sure that I give it enough resources to produce its items. When I hand it to you, then you can use it because of this resource allocation; but if you burden this producer with more work that I could not have planned for, then it will exhaust my resources and introduce failure. Therefore it does not matter whether “hot” or “cold”, resource separation is the same problem in both cases.

The suggestion that “[the Publisher] could schedule delivery on a separate thread” does not work because it would still be the Publisher that needs to allocate that Thread, the Publisher cannot schedule something to run on the Subscriber’s thread (pool), only the Subscriber can do that.

Finally, I would suggest that we schedule a hangout to hash this out, it seems that written communication is not the most efficient fashion to convey this discussion.

smaldini commented 10 years ago

We could use our HipChat too, there is now voice support. I agree with both of you in fact, but I am more keen on holding this queue in the Subscription because it is just working fine. I tried scheduling in the Producer and in the Consumer, and currently this is still in the Consumer for Reactor, but it feels poor because only the Subscription is aware of the current capacity and we can't pass this information to the Subscriber without breaking interop (subclassing). Capacity is a great tool for batching scheduling, e.g. optimizing message passing. A flow will always be Producer -> Subscription -> Consumer/Producer -> Subscription -> Consumer ... It seems to be a natural explicit boundary to me unless I am missing something.

rkuhn commented 10 years ago

The Subscriber is at all times aware of how many items are outstanding, because it is the Subscriber that informs the Subscription about this (the Subscriber can count how many it requested and how many it received). Hence I do not fully understand what your comment about the queue means.

When handing data off over an asynchronous boundary in a non-blocking fashion, there is only one queue which is needed, and that is on the receiving side. The sender just pushes data into it, and the receiver empties it at its own pace. The queue can be of size one, which means that the receiver only ever requests 1 element; if it requests more than that it must be prepared to buffer N elements since the sender might enqueue them before the receiver gets any CPU time.
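
A sketch of that receiving-side queue (org.reactivestreams interfaces, otherwise hypothetical): the Subscriber requests only as much as it can buffer and re-requests as it drains:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    import org.reactivestreams.Subscriber;
    import org.reactivestreams.Subscription;

    // Sketch: the only queue lives on the receiving side. The Subscriber never
    // has more outstanding demand than free buffer slots, so the enqueue in
    // onNext can never fail even if the sender runs ahead of the receiver.
    class QueueingSubscriber<T> implements Subscriber<T> {
        private final BlockingQueue<T> buffer;
        private volatile Subscription subscription;

        QueueingSubscriber(int capacity) {
            this.buffer = new ArrayBlockingQueue<>(capacity);
        }

        public void onSubscribe(Subscription s) {
            subscription = s;
            s.request(buffer.remainingCapacity()); // request only what we can buffer
        }

        public void onNext(T element) {
            buffer.add(element);                   // bounded by the demand we signaled
        }

        // Called by the receiver at its own pace, on its own resources.
        T poll() {
            T element = buffer.poll();
            if (element != null) {
                subscription.request(1);           // one slot freed, one more allowed
            }
            return element;                        // null: nothing available yet
        }

        public void onError(Throwable t) { /* component-specific */ }
        public void onComplete()         { /* component-specific */ }
    }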


I think I just realized how we are talking past each other: in your mind the Subscription holds the queue, the Publisher puts things into the queue and the Subscriber synchronously pulls them out of it, right? This means that the details of how asynchrony works are completely unspecified (left to the implementation between Publisher and Subscription) while Subscription and Subscriber mutually recurse into each other to drain the queue (which will lead to stack overflows; users will always find ways to make them happen ;-) ask Doug!).

If my assessment is right, then we are basically talking about the same thing, but we assigned names and drew boundaries in different places. This specification should not be about the mostly synchronous part of reading from a queue, in fact I think it should not prescribe anything like a queue directly. The most pressing problem is to describe the asynchronous part of how components interact, and that is what the fully async-geared interfaces of Subscription and Subscriber are made for. If it helps, move the queue that you have in mind from the Subscription to the Subscriber, then we should be talking about the same thing. How you call these things within Reactor and how you present them to your users is fully independent of the interop standard—that at least is my goal.

smaldini commented 10 years ago

Thanks Roland, I'll give it another spin. The reason the logic is at the Subscription level is to leave the user free to implement the Subscriber as suggested. But yeah, a RingBuffer implementation is basically an event loop (or multiple ones) draining the buffer when signaled to do so with a hi-lo index value (defined by the current capacity + requested items).

But yeah, I think we are talking about the same thing as you outlined it, just with the boundary in a more neutral place. I don't mind using the Subscriber though; however, the current TCK is therefore not entirely correct, but that is a separate issue.

I still think hangout or hipchat would be great as I am pretty sure we converge to something.

rkuhn commented 10 years ago

The TCK will obviously have to implement whatever we determine here (and in the other ongoing discussions), so I would not worry about that too much at the moment. Most important is that we are on the same page, so a chat would be good; let’s wait until SF wakes up.

Coneko commented 10 years ago

Be sure to post a summary of the discussion here as well.

jbrisbin commented 10 years ago

@rkuhn here are my thoughts on this topic:

High availability and predictable performance depend on a number of factors. Thread scheduling is just one part of a much larger picture. I feel like trying to promote adherence to a specific way to implement thread scheduling isn't practical in a specification designed to be as flexible as possible so as to incorporate a wide variety of implementations. Context switching is a somewhat artificial method of resource segregation in the JVM anyway, because the underlying OS is ultimately responsible for task scheduling, and one would have to use a native library on a limited number of platforms to have more influence than "normal". IMO it's not practical for a specification to stipulate conditions that simply can't (or won't) be met by some implementations.

What is not okay is to inflict this on others with which this code inter-operates: an Actor-based Akka producer must be able to send elements to any arbitrary consumer without having to worry that its thread pool will now execute that foreign processing pipeline (and vice versa).

While I understand perfectly the reasoning here, I really feel it is the responsibility of the implementation of Reactive Streams to determine the specific scheduling characteristics of its internals and describing this implementation should not be a part of the specification. It is simply impossible to enforce this requirement in all situations. As I mentioned, context switching isn't a guarantee against resource starvation anyway and it is extremely expensive at scale. By "scale" I mean millions of events per second and by "expensive" I mean too costly to be bothered with implementing the spec if the cost to performance is that high.

IMO it's impractical for the specification to demand this kind of thread scheduling for all implementations when we don't even know for certain whether potential implementations would be able (or even willing) to implement this properly. It might be impossible for a mobile device with extremely limited resources to conform, as one example. If the specification is so limiting that it only fits a rather thin class of applications, and its performance is capped by this requirement (as it inevitably would be, especially on systems that are extremely sensitive to scheduling overhead like JavaScript VMs), I think people would simply abandon the effort to use Reactive Streams at all.

I think the interoperability and the separation of concerns achieved with Publisher, Subscriber, and Subscription will still greatly benefit JVM, JavaScript, mobile, and other problem domains we haven't considered yet. But it seems overly burdensome to those potential implementations to stipulate a particular kind of thread scheduling in a specification knowing ahead of time that some of them will not be able to implement it and thus reap the other substantial benefits of using Reactive Streams.

This project holds tremendous potential value to a wide range of use cases--but only if the specification is flexible enough to accommodate implementations that, for whatever reasons particular to their domain, choose not to implement a scheduling pattern as described in the original issue. We can still achieve the original goal of providing a set of rules for async interoperation and backpressure and we can still provide significant value to potential users without requiring adherence to a certain low-level pattern of task scheduling.

benjchristensen commented 10 years ago

it is the responsibility of the implementation of Reactive Streams to determine the specific scheduling characteristics of its internals and describing this implementation should not be a part of the specification

I strongly agree with this. The Producer implementation should be what controls scheduling and should never assume anything of the Consumer. The Consumer is a mostly passive player in this since this is a push based stream. The only thing the Consumer must do is signal how many elements it can receive. The Producer defines everything else including whether there are any elements to send, when they will be sent and the resource doing the pushing whether that be a native thread, green thread, thread pool, event loop, actor, trampoline or some other thing I'm not thinking of.

Regarding resource isolation, it is not within the control of interface specifications to control how many CPU cores it will be run on – it may be a virtualized single core where resource isolation isn't even possible. A single task in a tight infinite loop can take down the entire application regardless of whether it was asynchronously scheduled if there is a single CPU core. Multi-core systems increase the chances of isolation but do not guarantee it, nor do they protect against infinite loops or heavy computation by tasks, and context switching to different CPU cores is a very expensive requirement and rarely desirable.

This project holds tremendous potential value to a wide range of use cases--but only if the specification is flexible enough to accommodate implementations that, for whatever reasons particular to their domain, choose not to implement a scheduling pattern as described in the original issue

I think this is a key point. If the goal is widespread adoption then the spec must be simple to implement, simple to use and more importantly simple to understand.

Just to be clear: I do not suggest that RxJava (as an example) should use Publisher and Subscriber throughout, in fact I would expect that Observer and Observable stay as they are right now and that only specific Subjects will implement the asynchronous behavior and interfaces. In the same vein Akka Streams introduce the Flow abstraction for modeling possibly synchronous parts of a pipeline and only produce a Publisher or Subscriber when explicitly asked to.

This greatly diminishes the value and purpose of this project in my opinion. If this is only for the "async edges" then this interface will not become an integral part of libraries and will be a non-critical addon. Interop between libraries is not a massive problem needing a solution, popular libraries achieve these pretty quickly via contrib modules, extensions, plugins etc that bridge them. The primary thing trying to be solved here is backpressure in a push based stream that has async behaviors somewhere within the stream. This does not require the interface to only allow use when the Consumer schedules every event asynchronously.

Streams are typically going to be composed of computations, and if the "Reactive Streams" Producer/Consumer are not allowed to be composed flexibly then libraries will end up creating their own interfaces with the exact same or very similar signatures, but different names and identities. Why? Because the key aspect of the interface is the Subscription.request that needs to be composed through the stream pipeline. Thus, each library will end up building their own modeled after this and then the only thing the "Reactive Stream" interfaces can do is be layered on top, possibly for interop reasons. Once libraries have the concept though, the types themselves are not super important as it takes little effort to bridge two libraries with compatible concepts (onNext etc + request(n)).

If an implementation wants to extend the base types to stricter versions that's fine. But if the base types are made strict and dictate scheduling then nothing can ever extend them and reduce the requirements.

In addition, when using composition the types are quickly lost. The very first composition would hide the ReactiveStream.Producer with the Library.CustomProducer. For example: ReactiveStream.Producer.map() would not be able to return ReactiveStream.Producer so would now be Library.CustomProducer. This is similar to why in #19 we decided to not have separate types for multicast and unicast. The types get lost immediately anyways when composing streams and computations.

A single Producer/Consumer pair should be able to work over asynchronous and synchronous boundaries without stipulating how scheduling is to be performed. This allows the broadest adoption and enables all types of implementations without restricting performance or choice.

I suggest the contract in regards to async behavior be non-opinionated by design. A Producer should be capable of being consumed by a completely synchronous Consumer on one extreme or one that schedules the work on a thread pinned to its own CPU core at the other extreme. In either case the Consumer communicates how many elements it can receive and the Producer is free to use whatever resources it desires to send the events. The only rules a Consumer needs are "don't block" and "signal demand".
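
A minimal sketch (org.reactivestreams interfaces, otherwise hypothetical) of a Consumer at the fully synchronous extreme, obeying just those two rules:

    import org.reactivestreams.Subscriber;
    import org.reactivestreams.Subscription;

    // Sketch: it never blocks the producer's thread and it signals demand;
    // everything else (which thread runs this, batching, scheduling) is left
    // to the Producer and the implementation.
    class SimpleSyncSubscriber implements Subscriber<Integer> {
        private Subscription subscription;

        public void onSubscribe(Subscription s) {
            subscription = s;
            s.request(1);                      // rule 2: signal demand
        }

        public void onNext(Integer value) {
            System.out.println(value * 2);     // rule 1: quick, non-blocking work only
            subscription.request(1);
        }

        public void onError(Throwable t) { t.printStackTrace(); }
        public void onComplete() {}
    }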

rkuhn commented 10 years ago

We have a fundamental disagreement which the two of you might want to summarize as “does Reactive Streams define asynchronous boundaries, or does it merely accommodate them?” (I expressly disagree that the second even holds true as proposed.)

We all signed off on the following:

Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure on the JVM.

The goals we define must conform to this mission statement.


@jbrisbin I cannot reconcile the contradicting statements within your last paragraph; may I ask you to elaborate? Of particular interest would be the question of how a Publisher is supposed to dispatch a stream to multiple subscribers in a manageable fashion without one subscriber having to suffer from the slowness of another? How would that not involve other threads? How can a Publisher schedule to dispatch a Subscriber on anything but its own thread (pool)? How does it help reduce thread hopping if, due to the unclear contract, Publishers have to defensively spawn async tasks for calling onNext?

In general, the details you are talking about only apply to JVM implementations; it is clear that “asynchronous” on Node.js has a different implementation, and even there it is useful to dispatch the processing of the Subscriber separately. And asynchronous does not necessarily mean “context switch”, even on the JVM.

@benjchristensen Your library argument is a red herring: it is your choice to represent async Observables as Publisher, and it will be an essential part of the library.


In general I would like to recall that the crucial consequence I arrived at in the initial post is this:

This translates into the requirement that the specified methods […] must be treated exclusively as signals to be sent across the asynchronous boundary, and no processing can happen while they are on the call stack.

This says nothing about particular thread scheduling patterns. All technologies mentioned in this discussion have supporting facilities which enable implementations to comply efficiently.

tmontgomery commented 10 years ago

I think it is quite easy to let certain real world (or what we seem to think are real world at the time) constraints permeate the wording here. This can lead to much more coupling than is really desired by anyone.

cough JMS! cough

For the most part, I think it makes sense to think more in terms of protocol here.... Yes, yes, I'm a protocol guy. But I think it plays in here.

Specifications work best, IMO, when they talk about reactivity to input and per element state. This holds regardless of stack, threading, scheduling, storage, etc., which, IMO, are legitimate implementation concerns, but very, very limiting specification concerns.

So, if I think about how I would specify the interactions like a protocol (which, BTW, is an inherently asynchronous binary contract), then being artificially limited by assumptions of implementations with respect to scheduling and threading makes very little sense. Or at least it does to me.

@rkuhn there are actually solutions to not having to have a Publisher schedule on its own thread (pool). As well as other questions you ask. Most involve one or more intermediaries, but really why would that necessarily matter to the external user spec? Or should it? I don't think it really should or needs to. It might be a restriction for a given implementation, though, which is different. And if the spec is well written, then it should leave some room for innovation and growth of implementations.

I actually think a specification becomes much stronger when the assumptions of runtime, language, implementation, etc. all melt away into reactivity and per element state.

@benjchristensen

I suggest the contract in regards to async behavior be non-opinionated by design. A Producer should be capable of being consumed by a completely synchronous Consumer on one extreme or one that schedules the work on a thread pinned to its own CPU core at the other extreme. In either case the Consumer communicates how many elements it can receive and the Producer is free to use whatever resources it desires to send the events. The only rules a Consumer needs are "don't block" and "signal demand".

I agree with this. I also agree on the reasoning Ben has put forth. I think the spec gets stronger with this.

Think about wording.

If a spec says MUST NOT then no implementation can ever do it. If a spec says MUST then every implementation has to do it. This is limiting, but is sometimes needed for security, correctness, etc. But if the reason for the MUST or MUST NOT is a limitation in the minds of the specifiers based on current understanding of how to implement it, then that seems very dangerous. Maybe not really bad, but can be dangerous....

It might be good to adopt the IETF wording from RFC 2119. The terms are very good for this type of compromise.

With those terms, how do the more contentious points change?

sirthias commented 10 years ago

there are actually solutions to not having to have a Publisher schedule on its own thread (pool)

This is interesting. What solution in particular do you have in mind, @tmontgomery?

jbrisbin commented 10 years ago

I just want to echo @tmontgomery and @benjchristensen a little in responding to a few of @rkuhn's concerns specifically:

We have a fundamental disagreement which the two of you might want to summarize as “does Reactive Streams define asynchronous boundaries, or does it merely accommodate them?”

I would prefer to state it something like: "Reactive Streams describes a standard for pushing data across an asynchronous boundary". We can, of course, define what an asynchronous boundary looks like, but as you point out, what a JVM developer sees as asynchronous may be quite different from what a browser-based JavaScript developer might. It may not be the best term to use for purposes of accurately describing the situation but, for lack of a better one, we should probably just stick with "asynchronous" without going into more technical detail about what is meant by that.

The goals we define must conform to this mission statement.

I completely agree; and we may actually be talking at cross-purposes here. I don't think I've suggested anything that is in violation of this mission statement.

how a Publisher is supposed to dispatch a stream to multiple subscribers in a manageable fashion without one subscriber having to suffer from the slowness of another?

It was my understanding that we're defining how data crosses an asynchronous boundary and not how that crossing affects different components nor describing a system of resource "protection". Isolating a misbehaving task is certainly an issue to be dealt with in any asynchronous system, but I don't see how it is imperative that the specification address this issue when it's more important to describe the relationship between the components and not necessarily what all the side effects of that relationship are, nor how one should go about addressing those side effects. I don't see how we can prescribe resource protection by simply varying the number of items sent across the boundary anyway. Items of equal size do not necessarily have equal cost to process. Since the cost of handling a single item is not part of this interaction, I can't see a successful outcome by trying to prescribe the ways in which parties on either side of the asynchronous boundary should "behave".

How would that not involve other threads? How can a Publisher schedule to dispatch a Subscriber on anything but its own thread (pool)? How does it help reduce thread hopping that due to the unclear contract Publishers have to defensively spawn async tasks for calling onNext?

I go back to my original argument that assuming there are "threads" at all is assuming too much (JavaScript VM event loop being the canonical example). We simply can't know how these signals will be interpreted in the systems in which Reactive Streams might be implemented. "Asynchronous" and "non-blocking" are the key phrases in our mission statement that hold the real value. We can address the problem we set out to without defining in the specification the mechanisms used to respond to those signals.

@benjchristensen Your library argument is a red herring: it is your choice to represent async Observables as Publisher, and it will be an essential part of the library.

I disagree. Reactor is also leveraging Reactive Streams interfaces directly, not just on the edges to facilitate library interoperability. We have asynchronous boundaries all over inside Reactor and what we've seen so far is that Reactive Streams can help us there. I don't see value in discouraging implementors from using Reactive Streams as an integral part of their implementation library. Implementors should be free to use as much or as little of Reactive Streams as they wish without the specification getting in their way or precluding them from using it in the manner that provides them and their users the most value.

This says nothing about particular thread scheduling patterns.

Precisely the point. Nor should it.

All technologies mentioned in this discussion have supporting facilities which enable implementations to comply efficiently.

As long as we don't specify the details of task scheduling I agree this is true. But if we prescribe task scheduling/resource isolation in the specification, then we are limiting the applicability of Reactive Streams unnecessarily.

I really worry that if we can't come to a compromise on this fairly soon, it will be discouraging to implementors that will see this effort as of interesting, but essentially marginal benefit. It could easily be a big tent under which a wide variety of implementations handle data in basically the same way if they simply follow the specification guidelines in spirit, if not the API exactly. There is still benefit in doing this without the lower level line in the sand declaring, as @tmontgomery says "MUST" and "MUST NOT" as it relates to task scheduling.

tmontgomery commented 10 years ago

@sirthias Specifically, many organizations have built extensive multi-layered persistent queuing solutions so that sources don't have to block when sending. This is a common pattern for JMS usage at enterprise scale. Essentially, (and this is a heinous oversimplification), try to enqueue and if it would block, then try to enqueue onto worker broker 1, if that doesn't work try to enqueue onto worker broker 2... Then consumers drain queues in specific order and often enforce ordering as well. It's ugly, but it can be made to work.... under conditions. Variants of this exist for backing durable topics with queues to avoid slow receivers slowing down sources.
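
A heinously oversimplified sketch of that cascade in Java (all names hypothetical, assuming bounded, thread-safe queues):

    import java.util.List;
    import java.util.Queue;

    // Sketch of the cascade: try the primary queue, then spill to worker
    // brokers in order, so the source never blocks on a full queue.
    class SpillingSender<T> {
        private final List<Queue<T>> queues; // primary first, then worker brokers

        SpillingSender(List<Queue<T>> queues) { this.queues = queues; }

        boolean send(T item) {
            for (Queue<T> q : queues) {
                if (q.offer(item)) {         // non-blocking attempt
                    return true;             // enqueued somewhere downstream
                }
            }
            return false;                    // every tier full: drop, alert, or re-plan capacity
        }
    }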

These are really a mess. But it can be done if you don't want to flow control the source.

And even the lowly, "let it blow memory and I'll capacity plan" placebo approach.

More elegant solutions exist in the various forms of work stealing. Just flip it on its head, i.e. instead of pulling, push work off.

benjchristensen commented 10 years ago

Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure on the JVM.

@rkuhn I agree with this statement, just not the interpretation that every consumer must schedule every notification asynchronously.

Take this example:

nioSelectorThreadOrigin map(f) filter(p) consumeTo(toNioSelectorOutput)

It has an async origin and an async destination. Let's assume that both origin and destination are selector event loops. The Subscription.request(n) must be chained from the destination to the origin. Now each implementation can choose how to do this.

I'll use the pipe | character to signal async boundaries (queue and schedule) and R# to represent resources (possibly threads).

nioSelectorThreadOrigin | map(f) | filter(p) | consumeTo(toNioSelectorOutput)
-------------- R1 ----  | - R2 - | -- R3 --- | ---------- R4 ----------------

In this example each of the 3 consumers (map, filter and consumeTo) asynchronously schedules the work. It could be on the same event loop (trampoline), separate threads, whatever.

nioSelectorThreadOrigin map(f) filter(p) | consumeTo(toNioSelectorOutput)
------------------- R1 ----------------- | ---------- R2 ----------------

Here it is only the final step that asynchronously schedules, by adding work to the NioSelectorOutput event loop. The map and filter steps are synchronously performed on the origin thread.

Or another implementation could fuse the operations to the final consumer:

nioSelectorThreadOrigin | map(f) filter(p) consumeTo(toNioSelectorOutput)
--------- R1 ---------- | ------------------ R2 -------------------------

All of these variants are "asynchronous streams". They all have their place and each has different tradeoffs including performance and implementation complexity.
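
For illustration, a sketch of the second variant in Java (helper names hypothetical): map and filter run inline on the origin's thread, and only the final step hops to the output loop:

    import java.util.concurrent.Executor;
    import java.util.function.Consumer;
    import java.util.function.Function;
    import java.util.function.Predicate;

    // Sketch of the second variant: map(f) and filter(p) run synchronously on
    // the origin thread (R1); only the final step crosses the async boundary
    // by scheduling onto the output event loop (R2).
    class FusedStage<T, S> {
        private final Function<T, S> f;        // map
        private final Predicate<S> p;          // filter
        private final Executor outputLoop;     // R2, e.g. the NIO selector's executor
        private final Consumer<S> consumeTo;   // runs on R2

        FusedStage(Function<T, S> f, Predicate<S> p,
                   Executor outputLoop, Consumer<S> consumeTo) {
            this.f = f; this.p = p;
            this.outputLoop = outputLoop; this.consumeTo = consumeTo;
        }

        // Called on the origin thread (R1) for each element.
        void onNext(T element) {
            S mapped = f.apply(element);                            // synchronous, on R1
            if (p.test(mapped)) {
                outputLoop.execute(() -> consumeTo.accept(mapped)); // the single hop to R2
            }
        }
    }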

This is how I have always interpreted "asynchronous stream processing" and why I don't see a conflict with not requiring async scheduling by a Consumer but instead see it as the flexibility required for different implementations to do what is right for their needs while still allowing async stream processing with backpressure.

benjchristensen commented 10 years ago

@tmontgomery Thanks for your input and insights. I like the idea of using IETF Wording from RFC 2119 and will draft up some specifications using that wording and post here for discussion.

jbrisbin commented 10 years ago

I should also add that we have a production application that is processing billions of events per day without ever stopping to do a full GC, and the only reason we can do that effectively is by consolidating as much async processing as possible onto as few threads as possible. In many cases, we're using the RingBuffer so you only have a single, shared thread. It's essentially an event loop. Our async processing pipeline looks very much like @benjchristensen's first example, with the exception that our R1-R4 is actually just R1.

The point of this is that we are able to maintain extremely high volumes of data by judiciously managing our resources and one of the ways we must do that is to eliminate, as far as is possible, context switches and thread scheduling of any kind. If the Reactive Streams specification demands that we introduce expensive resource management between each operation, then I'll be honest with you: we won't use it. If, however, it leaves resource management up to us to decide, then we will be whole-hearted champions and fast friends. :)

benjchristensen commented 10 years ago

Here is an attempt at distilling the contract of the 3 types into this language. These are modified and augmented from #41

The key line from below evolving from this thread of discussion is: "A Subscriber MAY behave synchronously or asynchronously but SHOULD NOT synchronously perform heavy computations."

I know this line is not good enough, but it's a single sentence we can now debate :-)


Subscriber

Publisher

Subscription

smaldini commented 10 years ago

@benjchristensen @rkuhn @jbrisbin I can see the argument for having a different execution context per subscriber and I also see the benefits of some freedom of implementation in terms of scheduling. So I am not in a position to disagree with anyone, and even though the current implementation I work on with Reactor passes the current TCK, I won't have any issues modifying it as necessary.

rkuhn commented 10 years ago

We are making progress:

The implication of SHOULD NOT in that crucial sentence you quote is that different implementations will choose to push the burden of compliance to the end-users to varying degrees, but I am beginning to see that that may make sense: if someone wants to use a sharp knife they should be free to, even if the knife’s spec cannot keep them from cutting an artery. And some implementations may choose to implement stronger guarantees.

As this is the central point of the Reactive Streams spec, I would like to have @mariusaeriksen and @normanmaurer or @purplefox chime in before drawing the final conclusion. It would be great if we could settle this and #50 early next week so we can progress towards a 0.4 release which contains the discussed changes (and fix the website as well).


One technical point towards Ben’s concrete proposal: requiring (as in MUST) a certain thread scheduling pattern for Subscription.request() is just as limiting as the other cases we discussed. I propose to change it to “MUST assume that onNext() invokes request() synchronously and MUST NOT allow unbounded recursion; it SHOULD NOT invoke onNext() synchronously”. We will use this to implement a Publisher of strict collections of size one (for example) without any asynchronous facilities.
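
For illustration, a sketch (org.reactivestreams interfaces, otherwise hypothetical) of such a single-element Publisher that tolerates request() being invoked synchronously from onNext() without unbounded recursion:

    import org.reactivestreams.Publisher;
    import org.reactivestreams.Subscriber;
    import org.reactivestreams.Subscription;

    // Sketch: a strict single-element Publisher with no asynchronous
    // facilities. Reentrant request() calls from inside onNext() are
    // flattened by an "emitting" flag, so the stack stays bounded.
    class SingleElementPublisher<T> implements Publisher<T> {
        private final T value;

        SingleElementPublisher(T value) { this.value = value; }

        public void subscribe(Subscriber<? super T> subscriber) {
            subscriber.onSubscribe(new Subscription() {
                private boolean emitting = false;
                private boolean done = false;

                public void request(long n) {
                    if (emitting) return;   // reentrant call from onNext: absorb it
                    emitting = true;
                    if (!done && n > 0) {
                        done = true;
                        subscriber.onNext(value);   // may call request() again, harmlessly
                        subscriber.onComplete();
                    }
                    emitting = false;
                }

                public void cancel() { done = true; }
            });
        }
    }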

alexandru commented 10 years ago

On Fri, May 16, 2014 at 9:55 AM, Roland Kuhn <notifications@github.com> wrote:

“MUST assume that onNext() invokes request() synchronously and MUST NOT allow unbounded recursion; it SHOULD NOT invoke onNext() synchronously”

I was also one that raised concerns about the requirement for onNext() to invoke request() or cancel() asynchronously and this is exactly what I wanted.

Cheers,

Alexandru Nedelcu https://bionicspirit.com

benjchristensen commented 10 years ago

And some implementations may choose to implement stronger guarantees.

Yes, this is the flexibility to implementations I hope for.

pattern for Subscription.request() is equally limiting than the other cases we discussed.

I like the changes you propose. They achieve the same thing while leaving the implementation to choose how to do it. I think it would end up something like this then?

... and replace these lines ...

rkuhn commented 10 years ago

If you add the same qualification as for onNext (i.e. “request SHOULD NOT synchronously perform heavy computations”), then I think we have a winner!

tmontgomery commented 10 years ago

Awesome! I like how this is shaping up!

sirthias commented 10 years ago

@tmontgomery Ok, thanks for the explanation!

benjchristensen commented 10 years ago

I added this line:

I modified these lines:

Here is the full contract again with the changes made:


Definition of binding words in CAPITALS from https://www.ietf.org/rfc/rfc2119.txt

Subscriber

Publisher

Subscription


If we agree with these specifications (the grammar and wording can definitely be improved later) I can start a new pull request to replace #41 that is updated with this text. Do we have agreement?

mariusae commented 10 years ago

I’m chiming in a bit late here. Sorry about that; it’s been a rough couple of weeks. Let me first respond to the discussion, and then to @rkuhn’s specific request later.

@benjchristensen said:

I suggest the contract in regards to async behavior be non-opinionated by design. A Producer should be capable of being consumed by a completely synchronous Consumer on one extreme or one that schedules the work on a thread pinned to its own CPU core at the other extreme. In either case the Consumer communicates how many elements it can receive and the Producer is free to use whatever resources it desires to send the events. The only rules a Consumer needs are "don't block" and "signal demand".

I agree with this, too. I think that, if we wanted it the other way, we’d phrase the entire API in terms of queues, whose production and consumption must be managed entirely by the user.

I think the confusion here comes from the fact that we’re conflating asynchronous execution with asynchronous messaging semantics. This is the same reason I suggested earlier that, if we instead phrased the operations in terms of, say, Futures, we’d have separated these concerns cleanly.

For better or worse, they are conflated in this design. This simplifies some aspects on the one hand, but causes semantic ambiguities on the other; however, in my mind at least, the interpretation outlined above by @benjchristensen is the most understandable one; it is both practical and useful and has a high chance of being implemented correctly.

benjchristensen commented 10 years ago

We may want to define "heavy computations" since it appears in several of the points and is up for interpretation.

It's difficult to use exact timing as that changes based on the execution environment and over time with advancing hardware.

I think of it this way:

Heavy
Not Heavy

Anyone have ideas on how to define this? It will be abstract and subjective ... kind of a "you know it when you see it" ... but it would be good to have some kind of definition.

mariusae commented 10 years ago

@rkuhn consider yourself chimed; I agree with the phrasing.

mariusae commented 10 years ago

@benjchristensen I tend to define it in terms of suspension and pre-emption. E.g. if a piece of code is allowed to run without the OS suspending the process in any way (because it issued a blocking operation or because it was pre-empted), then it’s not “heavy”.

benjchristensen commented 10 years ago

I like that definition.

jrudolph commented 10 years ago

Wrt the question of "heavy" vs. "non-heavy":

1) What about user-defined operations? E.g. should a map operation that runs a user-defined function always be considered heavy?

2) If the subscriber is a step in a processing pipeline (i.e. it is also a publisher), should calling onNext on the downstream side be considered heavy? If transitive onNext calls were allowed, an arbitrarily large processing pipeline could be executed just by calling onNext at its beginning. Even if every single step were non-heavy (by your definition), the sum of all those steps could be heavy.

alexandru commented 10 years ago

I think the term "heavy" as used here is very confusing, like take this line from @benjchristensen ...

anything keeping a CPU busy for > 1second (arguably anything > single-digit milliseconds)

On top of the JVM, because of 1:1 multithreading, anything keeping the CPU busy for > 1 second is a win, since efficient usage of resources is one important point of asynchronous computations on such a platform. On top of JavaScript or Node.js this may be relevant, because there you rely on cooperative yielding for slicing pieces of work, and in the browser it can, for example, make the interface unresponsive. But scheduling things that take more than 1 second should be a decision that the user makes; after all, the effects are usually visible. And for example on top of Rx, even if a pipeline is synchronous, you can always make the subscriber asynchronous by means of subscribeOn and observeOn.

When I'm thinking of heavy I'm thinking of this

Taking into account the in-browser liveness issue, the common theme is anything that prevents the overall progress of the system.

mariusae commented 10 years ago

@alexandru that is true, but we need to recognize the trade-off between latency and throughput. The "heavy" clauses are inserted wherever control is yielded to the user. If the user code is unexpectedly "heavy", the underlying system may not be able to achieve its latency goals in a simple and efficient manner.

It may be worth distinguishing between terminating and nonterminating "heavy" code, though -- the former causes performance bugs; the latter can cause correctness bugs (e.g. deadlocks).

rkuhn commented 10 years ago

We have agreement on the proposed specification (counting in @benjchristensen, @jbrisbin, @tmontgomery, @mariusaeriksen and myself), so we can go ahead and fix the discrepancy between code and spec.

On the topic of “heaviness” I propose that we leave the wording as is for the current change and discuss further (and subsequently fix) in issue #54.

benjchristensen commented 10 years ago

@rkuhn I agree with proceeding with wording as is and using #54 to clarify the "heaviness" part.

benjchristensen commented 10 years ago

I think we can close this now as the specification has been merged.