reactive-streams / reactive-streams-dotnet

Reactive Streams for .NET
MIT No Attribution

APIs are synchronous #19

Open sergeybykov opened 8 years ago

sergeybykov commented 8 years ago

The main issue I see with the APIs is that they are synchronous - methods return void. This makes it pretty much incompatible with IO operations, distributed computing, and limits the applicability to in memory operations only. Unless you put queues between every producer and consumer, I don't understand how you can achieve "asynchronously passing elements between components" with synchronous APIs.

I understand that you ported a solution that was defined for the environments with poor support for async operations. But .NET happened to have the powerful async/await constructs that enable truly asynchronous reactive programming.

For comparison, in Orleans Streams similar APIs are completely async - http://dotnet.github.io/orleans/Orleans-Streams/. In fact, the whole programming model of Orleans is based on async methods. This enables building of efficient high throughput applications that can make IO and remote calls without blocking threads, so that the latency of those calls doesn't impact throughput.

Please correct me if I misunderstood something from reading just the readme.

Horusiath commented 8 years ago

@sergeybykov Thanks to the void return type, you don't need to wait for the underlying operation to complete. It's non-blocking. From within, you are free to interop with the TPL (as in, e.g., Akka's SelectAsync). Another nice thing about this approach is that it's universal across all currently used versions of .NET, making it available for things like Unity, for example.

Async/await is necessary to keep things non-blocking only in a request/response communication model, which Reactive Streams is not.
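To illustrate the "void as a signal" point, here is a minimal sketch using Java 9's `java.util.concurrent.Flow` (the JDK's copy of the Reactive Streams interfaces) rather than the .NET types; the class and method names are the editor's own, not from this repo:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class Signals {
    /** Publishes items asynchronously; returns what the subscriber observed. */
    static List<Integer> deliver(List<Integer> items) {
        List<Integer> seen = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try (SubmissionPublisher<Integer> pub = new SubmissionPublisher<>()) {
            pub.subscribe(new Flow.Subscriber<Integer>() {
                public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
                public void onNext(Integer item) { seen.add(item); } // void return: a signal, not a reply
                public void onError(Throwable t) { done.countDown(); }
                public void onComplete() { done.countDown(); }
            });
            // submit() hands the element to the publisher's executor and returns
            // immediately; the caller never waits for onNext to run.
            items.forEach(pub::submit);
        } // close() signals onComplete downstream
        try { done.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(deliver(List.of(1, 2, 3)));
    }
}
```

The caller's side is entirely fire-and-forget here, yet the elements still arrive in order on another thread: the void-returning methods are message sends, not blocking calls.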

akarnokd commented 8 years ago

The problem with async-await is that you individually await elements, setting up a modest amount of infrastructure each time, whereas in RS everything expects there is going to be a next value just around the corner, and the infrastructure cost is usually just an enqueue/dequeue per item. (I did an evaluation in the context of Java two months ago.)
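The contrast can be sketched as follows (editor's illustration, not akarnokd's benchmark; both styles compute the same result, but the first allocates a future per element while the second pays only an enqueue/dequeue per element on a single shared queue):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;

public class HandoffStyles {
    // Style 1: await each element; every item sets up and completes its own future.
    static int sumViaFutures(int n) {
        int sum = 0;
        for (int i = 0; i < n; i++) {
            CompletableFuture<Integer> f = CompletableFuture.supplyAsync(() -> 1); // per-item infrastructure
            sum += f.join();
        }
        return sum;
    }

    // Style 2: one queue for the whole stream; each item costs one enqueue + one dequeue.
    static int sumViaQueue(int n) {
        BlockingQueue<Integer> q = new ArrayBlockingQueue<>(64);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) q.put(1);
            } catch (InterruptedException ignored) { }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < n; i++) sum += q.take();
            producer.join();
        } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(sumViaFutures(1000) + " " + sumViaQueue(1000));
    }
}
```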

sergeybykov commented 8 years ago

@Horusiath

Thanks to the void return type, you don't need to wait for the underlying operation to complete.

I don't understand the failure semantics. Unless it blocks to wait to confirm that the item went through fine, it's fire-and-forget. Then you'd need a side channel for acks and failure notifications. What gives?

Async/await is necessary to keep things non-blocking only in a request/response communication model, which Reactive Streams is not.

I don't think this is correct. It is also to avoid blocking for acks, which are not responses. In many cases they are merely confirmations that an event got successfully added to a persistent queue.

@akarnokd You can batch and amortize the cost of the infrastructure where it matters. What I don't understand is the failure semantics with fire-and-forget APIs. Async/await makes it very explicit.

Horusiath commented 8 years ago

Failures and acknowledgements can be sent upstream by creating a bidirectional stream (example). Why isn't it the default? Because it limits possibilities: by composing streams, you can decide if/when/where/how you want to use them. Depending on your protocol, you may decide to ACK every message, or maybe apply microbatching, or add a more meaningful response than a simple acknowledgement (a partial failure, for example). Once you decide to set the baseline of an API to ack every single message, you're losing those options.
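One way the microbatching option can be pictured (editor's sketch using Java's `java.util.concurrent.Flow` instead of the .NET types; `BatchAckProcessor` and its names are invented, and real implementations need cancellation and error handling this sketch omits) is a processor that consumes data elements and emits one ack per batch rather than one per message:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class BatchAcks {
    /** Emits one "ack" element for every `batch` items consumed (simplified; not fully spec-compliant). */
    static class BatchAckProcessor<T> extends SubmissionPublisher<String>
            implements Flow.Processor<T, String> {
        final int batch; int seen; Flow.Subscription upstream;
        BatchAckProcessor(int batch) { this.batch = batch; }
        public void onSubscribe(Flow.Subscription s) { upstream = s; s.request(batch); }
        public void onNext(T item) {
            if (++seen % batch == 0) {
                submit("ack:" + seen);   // one ack per batch, not per message
                upstream.request(batch); // pull the next batch
            }
        }
        public void onError(Throwable t) { closeExceptionally(t); }
        public void onComplete() { close(); }
    }

    static List<String> acksFor(int items, int batch) {
        List<String> acks = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        BatchAckProcessor<Integer> proc = new BatchAckProcessor<>(batch);
        proc.subscribe(new Flow.Subscriber<String>() {
            public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            public void onNext(String ack) { acks.add(ack); }
            public void onError(Throwable t) { done.countDown(); }
            public void onComplete() { done.countDown(); }
        });
        try (SubmissionPublisher<Integer> source = new SubmissionPublisher<>()) {
            source.subscribe(proc);
            for (int i = 1; i <= items; i++) source.submit(i);
        }
        try { done.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        return acks;
    }

    public static void main(String[] args) {
        System.out.println(acksFor(6, 3));
    }
}
```

The batching policy lives entirely in how the streams are composed; an API that acked every message at the baseline could not express this.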

It is also to avoid blocking for acks, which are not responses

I think we have a conflict of definitions here ;) I'm talking about acks as response signals sent back upstream. Also, in scenarios where message order matters, it doesn't avoid blocking, as you have to wait for a message to be processed to the end of the stream before starting to process the next one.

viktorklang commented 8 years ago

Hi Sergey,

Apologies for the late response, I've been swamped.

Please correct me if I misunderstood something from reading just the readme.

Thanks for asking these questions, I hope I can answer them clearly.

The main issue I see with the APIs is that they are synchronous - methods return void.

I can see why you'd think that, so let's try to illuminate a bit!

«The purpose of Reactive Streams is to provide a standard for asynchronous stream processing with non-blocking backpressure.»

Fortunately(!) Reactive Streams is not an API (Application Programming Interface) but an SPI (Service Provider Interface); this also explains why it is minimal, doesn't provide combinators, etc.

The goal was to find the smallest possible, correct, high-performance, and reasonable contract and back that up with a spec and a TCK to guide Service Providers (think Library Developers) to implement it in conformance.

The methods are synchronous in the sense that they execute on the current thread until completion, but they are asynchronous in the sense that they are signals, i.e. equivalent to message sends.

This makes it pretty much incompatible with IO operations, distributed computing, and limits the applicability to in memory operations only.

Au contraire! Akka Streams (an implementation of the Reactive Streams SPI) has a lot of support for IO, even piggybacking on TCP flow control, and would most definitely work with things like SCTP, HTTP/2, etc.

For a network protocol compatible with Reactive Streams, see: https://github.com/ReactiveSocket/reactivesocket

Unless you put queues between every producer and consumer, I don't understand how you can achieve "asynchronously passing elements between components" with synchronous APIs.

I'm not sure how, or why, you'd want to have a completely queue-less asynchronous system—I mean, even a single event loop would need to have a run-queue.
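The run-queue remark can be made concrete with a toy event loop (editor's sketch; `EventLoop`, `post`, and `demo` are invented names). Even here, "asynchronous" posting is nothing more than an enqueue onto a queue that one thread drains:

```java
import java.util.concurrent.LinkedBlockingQueue;

public class EventLoop {
    private final LinkedBlockingQueue<Runnable> runQueue = new LinkedBlockingQueue<>();
    private boolean running = true; // only touched on the loop thread

    void post(Runnable task) { runQueue.add(task); } // "asynchronous": returns immediately
    void stop() { post(() -> running = false); }     // even stop is just a queued message

    void run() { // drain the run-queue on a single thread
        while (running) {
            try { runQueue.take().run(); } catch (InterruptedException e) { break; }
        }
    }

    /** Posts two tasks and a stop message, runs the loop, returns what executed. */
    static String demo() {
        EventLoop loop = new EventLoop();
        StringBuilder log = new StringBuilder();
        loop.post(() -> log.append("a"));
        loop.post(() -> log.append("b"));
        loop.stop();
        loop.run();
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```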

I understand that you ported a solution that was defined for the environments with poor support for async operations. But .NET happened to have the powerful async/await constructs that enable truly asynchronous reactive programming.

You can most definitely do async operations rather nicely on the JVM: https://github.com/scala/async

We specifically avoided using Futures/Tasks/Promises because they perform poorly. We've benchmarked implementations of the RS spec on the JVM at > 200 Mops/s between threads; in other words, it's possible to do it completely allocation-free.

For comparison, in Orleans Streams similar APIs are completely async - http://dotnet.github.io/orleans/Orleans-Streams/. In fact, the whole programming model of Orleans is based on async methods. This enables building of efficient high throughput applications that can make IO and remote calls without blocking threads, so that the latency of those calls doesn't impact throughput.

The impossibility of comparing SPIs to APIs aside, there's nothing about RS that necessitates synchrony in processing. Akka Streams (and Akka HTTP) are completely asynchronous, allow for network IO, allow concurrency (processing stages concurrently), etc.

I hope that helped! :)

Cheers, √

sergeybykov commented 8 years ago

@Horusiath I'm not sure I understood the BiDi example correctly, but it seems like in the ping-pong case the pipeline executes synchronously, so that the pong message is produced on the same thread the ping is delivered on. This means to me that delivery of new events is blocked until the pong is sent. Did I misunderstand that?

Depending on your protocol, you may decide to ACK every message, or maybe apply microbatching, or add a more meaningful response than a simple acknowledgement (a partial failure, for example). Once you decide to set the baseline of an API to ack every single message, you're losing those options.

Returning a promise from a method doesn't preclude batching or other manipulations of message delivery. Promises give you the opportunity to await every call, but they don't force you to do that. It's also easy to implement fire-and-forget by returning an already resolved promise (with no allocation). IMO it's a more flexible and powerful foundation than one-way messaging that requires acks or responses to be sent out of band, with additional work to correlate them with requests.
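The "already resolved promise with no allocation" trick looks like this (editor's sketch in Java; .NET gets this for free via the cached `Task.CompletedTask` singleton, and the JVM analogue is to reuse one completed `CompletableFuture`; `send` and its outbox are hypothetical):

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

public class FireAndForget {
    // One shared, already-completed future: returning it costs no allocation per
    // call, and awaiting it never blocks.
    static final CompletableFuture<Void> DONE = CompletableFuture.completedFuture(null);

    /** Hypothetical fire-and-forget send: enqueue locally, report success immediately. */
    static CompletableFuture<Void> send(String msg, Queue<String> outbox) {
        outbox.add(msg);
        return DONE; // callers may await it, batch on it, or ignore it entirely
    }

    public static void main(String[] args) {
        Queue<String> outbox = new ArrayDeque<>();
        CompletableFuture<Void> ack = send("hello", outbox);
        System.out.println(ack.isDone() + " " + outbox.size());
    }
}
```

The same method signature can later be swapped to a genuinely asynchronous implementation (returning a future completed by the persistent queue) without changing any caller.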

Hi Viktor, Thank you for the detailed answer.

I'm looking at https://github.com/ReactiveSocket/reactivesocket/blob/master/tck/rxclient.q. If I understand it correctly, SimpleRequestClient performs the job of correlating requests and responses by relying on the order of responses. Is my understanding correct?

The mental picture I'm trying to apply here is as follows. Receiver of a message needs to make a remote call, e.g. to a web service, combine that response with some of its own data to compose its own response, and then send it back to the caller. A fairly typical case for a cloud service. If the receiver is invoked as a void method, it would either have to block the thread until it is done with processing of the message or to put the message into a queue for further processing on a different thread.

In both cases responses have to be sent out of band and correlated on the caller's side. In the blocking case, failures can be indicated and automated via exceptions. In the queue case, failures have to be sent out of band and correlated.

Promises nicely wrap this boilerplate coordination in a simple abstraction.

I get your argument about performance for local in-proc execution.

We specifically avoided using Futures/Tasks/Promises because they perform poorly. We've benchmarked implementations of the RS spec on the JVM at > 200 Mops/s between threads; in other words, it's possible to do it completely allocation-free.

But I think it's much less relevant for distributed cases.

At the end of the day, if you guys prefer the API/SPI like this, who am I to object. :-) I only opened this issue because I was asked to provide my humble opinion.

viktorklang commented 8 years ago

Hi @sergeybykov,

Apologies for the extremely slow response, it seems like this is operating on eventual consistency :)

The mental picture I'm trying to apply here is as follows. Receiver of a message needs to make a remote call, e.g. to a web service, combine that response with some of its own data to compose its own response, and then send it back to the caller. A fairly typical case for a cloud service. If the receiver is invoked as a void method, it would either have to block the thread until it is done with processing of the message or to put the message into a queue for further processing on a different thread.

I'm not sure I understand; could you provide an example? If the thing which performs the remote call is a Processor from SomeRequest to SomeResponse, then it would receive demand from its downstream, and when that happens it would issue demand to its upstream; when it gets an element it would make the remote call, and when the remote call comes back it would push the response downstream.
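That flow can be sketched as follows (editor's illustration, loosely in the spirit of Akka's SelectAsync/mapAsync, using Java's `java.util.concurrent.Flow` instead of the .NET types; all names are invented, the "remote call" is simulated with `supplyAsync`, and it is simplified to one call in flight with no cancellation or call-failure handling). No thread ever blocks on the call: the response is pushed downstream from the call's own completion callback, which then requests the next element:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class AsyncCallStage {
    /** One-in-flight async stage (simplified; not fully RS spec-compliant). */
    static class RemoteCallProcessor<A, B> extends SubmissionPublisher<B>
            implements Flow.Processor<A, B> {
        final Function<A, CompletableFuture<B>> call;
        final AtomicInteger pending = new AtomicInteger();
        volatile boolean upstreamDone;
        Flow.Subscription upstream;

        RemoteCallProcessor(Function<A, CompletableFuture<B>> call) { this.call = call; }

        public void onSubscribe(Flow.Subscription s) { upstream = s; s.request(1); }

        public void onNext(A request) {
            pending.incrementAndGet();
            call.apply(request).thenAccept(response -> { // runs when the "remote call" completes
                submit(response);                        // push the result downstream
                if (pending.decrementAndGet() == 0 && upstreamDone) close();
                else upstream.request(1);                // then ask upstream for the next element
            });
        }

        public void onError(Throwable t) { closeExceptionally(t); }
        public void onComplete() { upstreamDone = true; if (pending.get() == 0) close(); }
    }

    static List<String> process(List<Integer> requests) {
        List<String> responses = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        RemoteCallProcessor<Integer, String> stage = new RemoteCallProcessor<>(
                n -> CompletableFuture.supplyAsync(() -> "resp:" + n)); // stand-in for a web-service call
        stage.subscribe(new Flow.Subscriber<String>() {
            public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            public void onNext(String s) { responses.add(s); }
            public void onError(Throwable t) { done.countDown(); }
            public void onComplete() { done.countDown(); }
        });
        try (SubmissionPublisher<Integer> source = new SubmissionPublisher<>()) {
            source.subscribe(stage);
            requests.forEach(source::submit);
        }
        try { done.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        return responses;
    }

    public static void main(String[] args) {
        System.out.println(process(List.of(1, 2, 3)));
    }
}
```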

In both cases responses have to be sent out of band and correlated on the caller's side. In the blocking case, failures can be indicated and automated via exceptions. In the queue case, failures have to be sent out of band and correlated.

Promises nicely wrap this boilerplate coordination in a simple abstraction.

You can most definitely implement Publishers / Subscribers / Processors internally using Promises (when needed). (You don't have to convince me of the utility of Promises ;))

But I think it's much less relevant for distributed cases.

Avoiding allocations whenever possible means that it is possible to run more "streams" in parallel (less GC overhead) and also run them at higher speeds. (We've benched RS at > 200M handoffs per second.)

At the end of the day, if you guys prefer the API/SPI like this, who am I to object. :-) I only opened this issue because I was asked to provide my humble opinion.

Thank you :)