ReactiveX / RxCpp

Reactive Extensions for C++
Apache License 2.0

[Feature request] Backpressure support #379

Open per-gron opened 7 years ago

per-gron commented 7 years ago

Background

I'm playing around trying to combine RxCpp and gRPC++. I want to use gRPC and Rx seems like an abstraction that is a nice fit for gRPC, given its support for unidirectional or bidirectional streams of values (and especially since the gRPC++ API is painfully low-level).

So far, I think the results look promising although it needs more work to be usable.

The biggest issue that I can foresee is RxCpp's lack of backpressure. gRPC supports backpressure by having a fixed-size send buffer and only reading when you ask for it. Right now, in my Rx<->gRPC wrapper I work around Rx's lack of backpressure support with an unbounded send buffer, but that really is not going to scale. I'm pretty sure that servers I write with it will run out of memory and crash under load.

In https://github.com/Reactive-Extensions/RxCpp/issues/177 you mention that contributions are welcome, and that they should start with a discussion and prototyping, so here I am, trying to open the discussion.

(Right now, I can't commit to actually finishing adding backpressure support because it seems like a big thing and I don't know how big it is yet, but hopefully this discussion can help even if I don't finish it.)

How other Rx libraries handle backpressure.

As I'm sure you are aware, backpressure is a pretty complicated animal in Rx. Here is some info on how others do it:

RxJS: Interesting discussion, backpressure docs. The documentation (see the bottom of the last link) says that RxJS is not finished with its backpressure, so it might not be a good thing to take its model.

RxJava 1.x: Documentation. RxJava 1.x retrofitted backpressure support to its Observable interface but that didn't work out very well for them because it led to cryptic MissingBackpressureException errors that were difficult to understand for beginners and easy to get by mistake even for people experienced with RxJava.

RxJava 2.x: Documentation. RxJava 2.x addresses 1.x's issues by introducing a separate backpressure aware Flowable concept, distinct from Observable.

Rx.NET: From what I can tell, Rx.NET does not have a coherent story around backpressure. Stack Overflow question, really interesting GitHub issue (still open). An interesting point made in that issue is that perhaps Rx should not be used if backpressure is needed, which I will elaborate on below.

Reactive Streams: Reactive Streams is "an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure." Seems relevant too. What RxJava does looks similar to/compatible with this.

Should Rx libraries handle backpressure?

In the Rx.NET issue where they discuss backpressure, some people argue that if your Observable can react to backpressure, then it is not a true "push" source, and you should consequently use IAsyncEnumerable (which is an async "pull" source) if you need or are able to meaningfully use backpressure.

Using a dedicated pull style enumeration interface does seem like it would be a good idea for my gRPC++ use case, but C++ doesn't have anything like C#'s IAsyncEnumerable so I can't use the advice to use it instead of Rx.

To me it seems like if I made an IAsyncEnumerable inspired interface in C++, I would want basically the same functionality as RxCpp offers: All of the operators would be useful and most of the interface fits really well.

So, should RxCpp handle backpressure? The hypothesis I'm going to go for here is: Yes, but instead of making existing operators "backpressure aware", RxCpp should add support for a new distinct type of stream: The pull stream. The main feature of pull streams is that they don't yield values unless you ask for them. Because of this, they are automatically backpressure-compatible.

What I propose here is effectively to change RxCpp from a library that deals with push streams to a library that deals with both push streams and pull streams, providing a common set of operators on them and means to convert between them.

Concrete changes to RxCpp

enumerable

Add a new class that is the pull stream version of the push stream observable class: enumerable. I think enumerable is a nice name: instead of passively observing the stream you actively enumerate it. It also matches C#'s IAsyncEnumerable name, and as a bonus has as many letters as observable so the names will align perfectly (@punchfox would be proud).

I think enumerable here is the same thing as Publisher in Reactive Streams, so perhaps that is a better name. On the other hand, I think the symmetry between enumerable vs observable is nicer than publisher vs observable.

Creating an enumerable is similar to creating an observable:

// Make an enumerable that just counts up from 0 to 99.
auto e = enumerable<>::create<int>([](subscriber<int> s) {
  // This is not a push stream; we are not allowed to push things to the
  // subscriber yet.

  // Return a function that is invoked whenever the subscriber wants more
  // elements from the stream (like request() in RxJava).
  return [s{std::move(s)},val=0](size_t count) mutable {
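    for (size_t i = 0; i < count; i++, val++) {
      if (val < 100) {
        // val is advanced by the loop increment, so it must not be
        // incremented here as well (that would skip every other value).
        s.on_next(val);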
      } else if (val == 100) {
        s.on_completed();
      } else {
        break;
      }
    }
  };
});

subscription

Enumerating an enumerable requires some means to request it to emit things (which eventually turns into a call to the request callback like the lambda above). RxJava and Reactive Streams do this on Subscription, and I think that makes sense.

Perhaps the most natural thing, given that enumerable is a separate type from observable, is to have a separate subscription type as well, but I'm not sure it's worth the additional code and concepts. Instead, I propose that subscribing to both observables and enumerables results in subscriptions of the same type. To support enumerables, a new method is added to subscription: request(int count), with the same semantics as in Reactive Streams.

When subscribing to observables, subscriptions behave as if a call to request with the magic unbounded value (Long.MAX_VALUE in Java, std::numeric_limits<size_t>::max() here) is made as part of setting up the subscription. This makes request effectively a no-op for observable subscriptions: requesting more than all of the elements is the same as requesting all of the elements.
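To make the "unbounded" semantics concrete, here is a minimal sketch of demand bookkeeping that saturates at the magic value. The demand_counter class and its member names are made up for illustration and are not part of RxCpp.

```cpp
#include <cassert>
#include <cstddef>
#include <limits>

// Hypothetical helper (not RxCpp API): tracks how many elements the
// consumer has asked for but not yet received.
class demand_counter {
 public:
  static constexpr std::size_t unbounded =
      std::numeric_limits<std::size_t>::max();

  // Adds `count` to the outstanding demand, saturating at `unbounded`
  // instead of wrapping around.
  void request(std::size_t count) {
    if (count >= unbounded - outstanding_) {
      outstanding_ = unbounded;
    } else {
      outstanding_ += count;
    }
  }

  // Consumes one unit of demand if there is any. Unbounded demand is never
  // decremented, so later requests on top of it are no-ops.
  bool try_consume() {
    if (outstanding_ == 0) return false;
    if (outstanding_ != unbounded) --outstanding_;
    return true;
  }

  std::size_t outstanding() const { return outstanding_; }

 private:
  std::size_t outstanding_ = 0;
};
```

With this, an observable subscription would start out with request(demand_counter::unbounded), and any further request call leaves the counter unchanged.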

Continuing the example from above:

auto s = e.subscribe(
    [](int v) { printf("OnNext: %d\n", v); },
    [] { printf("OnCompleted\n"); });
s.request(50);
printf("Half-way through now...\n");
s.request(1000);

The difference from the case where e is an observable such as observable<>::range(0, 99) is that subscribing does not in itself cause the enumerable to start emitting values; that only happens when they are requested.

observable vs enumerable

One way to implement the observable and enumerable types is to have completely separate implementations for them. To me, that sounds like a bad idea because observable and enumerable should be almost entirely identical; the differences between push and pull lie mostly in the operators, event sources and subscribers.

Another option is to have a shared base class, perhaps called stream, which has most of the actual code, and let observable and enumerable be subclasses of it. I'm not super fond of this option either because I don't like implementation inheritance.

My favorite option I've thought of so far is to have a main implementation class (again perhaps called stream), which is like the current observable but that takes another template parameter, an enum class (mode?) that has values push and pull. Then enumerable and observable could be defined as

template<class T, class SourceOperator>
using observable = stream<mode::push, T, SourceOperator>;

template<class T, class SourceOperator>
using enumerable = stream<mode::pull, T, SourceOperator>;

A possible benefit of this approach is that operator implementations could easily have either the same or different implementations for push vs pull using template specialization.

I think that with this approach, stream can do almost the exact same thing for push vs pull modes. The callback sent to observable<>::create can be allowed to return a "request" callback (as in the example above) in both push and pull mode. In push mode, stream can be made to call that callback with the infinite value instantly. In pull mode, stream can require that such a "request" callback is returned.

I think that if stream<>::create allows returning a "request" callback in both modes like that, then it will often be possible to write operators with one implementation that work in both push and pull modes.
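Here is a rough, self-contained sketch of the mode-parameterized idea. Everything in it (simple_subscriber, simple_subscription, request_fn, count_up_to, the _sketch aliases) is a hypothetical stand-in, and it drops the SourceOperator parameter and error handling for brevity. It assumes synchronous, single-threaded sources.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <limits>
#include <utility>

// All names below are hypothetical; none of this is existing RxCpp API.
enum class mode { push, pull };

template <class T>
struct simple_subscriber {
  std::function<void(const T&)> on_next;
  std::function<void()> on_completed;
};

// Callback through which a consumer asks for up to `count` more elements.
using request_fn = std::function<void(std::size_t)>;

class simple_subscription {
 public:
  explicit simple_subscription(request_fn r) : request_(std::move(r)) {
    assert(request_ && "the create callback must return a request callback");
  }
  void request(std::size_t count) { request_(count); }

 private:
  request_fn request_;
};

template <mode M, class T>
class stream {
 public:
  // The factory receives the subscriber and returns the "request" callback.
  using factory = std::function<request_fn(simple_subscriber<T>)>;

  explicit stream(factory f) : factory_(std::move(f)) {}

  simple_subscription subscribe(simple_subscriber<T> s) const {
    simple_subscription sub(factory_(std::move(s)));
    if (M == mode::push) {
      // Push mode: behave as if everything had been requested up front.
      sub.request(std::numeric_limits<std::size_t>::max());
    }
    return sub;
  }

 private:
  factory factory_;
};

template <class T>
using observable_sketch = stream<mode::push, T>;
template <class T>
using enumerable_sketch = stream<mode::pull, T>;

// One pull-style source implementation that serves both modes.
template <mode M>
stream<M, int> count_up_to(int limit) {
  return stream<M, int>([limit](simple_subscriber<int> s) -> request_fn {
    int next = 0;
    bool done = false;
    return [s, limit, next, done](std::size_t count) mutable {
      while (count > 0 && !done) {
        if (next < limit) {
          s.on_next(next++);
          --count;
        } else {
          done = true;
          s.on_completed();
        }
      }
    };
  });
}
```

Subscribing to count_up_to<mode::push>(3) emits 0, 1, 2 and completes immediately, while count_up_to<mode::pull>(3) emits nothing until request is called on the returned subscription.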

Operators

Ideally, I think the changes should be made in a way so that existing operators continue working, but only for observables. In order to support enumerable, operators will need to either add a separate implementation or have a templated implementation that operates on stream with a templated push/pull parameter.

This way, things remain reasonably backwards compatible while still making it reasonably easy to create operators that work on only push or pull or both of the modes.

I'm not 100% sure exactly what this needs to look like.

subscriber

It is a little bit awkward that subscriber exposes typedefs and methods with "observer" in their names. Perhaps they should be renamed, or new ones should be added that use the word "stream" and/or "enumerable".

Add new stream_overflow_error exception class

When an operator on an enumerable receives more elements than it has asked for and it can't handle it, it can fail the stream with a stream_overflow_error exception. stream_overflow_error only happens if an upstream stream pushes more elements than it has been asked to push, which is a bug in the source.

In RxJava 1.x, which doesn't keep track of push vs pull sources in the type system, incompatible sources and operators can be mixed in a way that is only found later on when the system is pushed to its limits. Because enumerable and observable are separate types, that type of bug would be a compile-time error in RxCpp.

stream_overflow_error should inherit from logic_error because it should never happen, not even under load.
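A minimal sketch of what the exception and the check that raises it could look like. The demand_guard helper is hypothetical, and it throws directly for brevity; a real operator would more likely forward the error to the subscriber's on_error.

```cpp
#include <cassert>
#include <cstddef>
#include <limits>
#include <stdexcept>

// Proposed exception type: a logic_error, because it signals a bug in the
// upstream source rather than a condition that can occur under load.
class stream_overflow_error : public std::logic_error {
 public:
  stream_overflow_error()
      : std::logic_error("upstream emitted more elements than requested") {}
};

// Hypothetical guard an operator could consult before handling an element.
class demand_guard {
 public:
  // Records that `count` more elements were requested from upstream,
  // saturating instead of wrapping around.
  void requested(std::size_t count) {
    const std::size_t max = std::numeric_limits<std::size_t>::max();
    outstanding_ = (count > max - outstanding_) ? max : outstanding_ + count;
  }

  // Records one received element; throws if it was never asked for.
  void received_one() {
    if (outstanding_ == 0) throw stream_overflow_error();
    --outstanding_;
  }

 private:
  std::size_t outstanding_ = 0;
};
```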

Add operators to convert to and from observable and enumerable

The operator that converts enumerable to observable could be called firehose or to_observable. When subscribed to, it subscribes to the enumerable and immediately asks it to enumerate all of its elements with a call to request().

The operators that convert observable to enumerable could be called (names need improving; they are inspired by RxJava)

Differences in operator implementation for push vs pull

To get a feeling for what implementing operators for enumerable is actually like, and how (and how much) it is different from observable, let's look at some examples.

empty

My understanding of the Reactive Streams spec, which I suppose makes sense to follow as much as possible, is that terminating a stream by completing it or failing with an error counts as an event with regards to request. So if empty is an enumerable, it must wait for a request with a non-zero value before it completes, unlike an observable, which can complete immediately.
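Under that reading, a pull-mode empty is little more than a one-shot flag. A sketch, with make_pull_empty and request_fn as made-up names:

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>

// Callback through which a consumer asks for up to `count` more elements.
using request_fn = std::function<void(std::size_t)>;

// Hypothetical pull-mode `empty`: completion is held back until the first
// request for a non-zero number of elements, and is delivered only once.
request_fn make_pull_empty(std::function<void()> on_completed) {
  bool completed = false;
  return [on_completed = std::move(on_completed),
          completed](std::size_t count) mutable {
    if (count == 0 || completed) return;
    completed = true;
    on_completed();
  };
}
```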

iterate, range

When iterate and range create an observable, they instantly start emitting values. In the enumerable case, they would have to wait until values are requested and stop when they have produced the allowed number of values.

The "observable vs enumerable" section proposes an approach where both observables and enumerables can be provided with a "request" callback, and observable simply immediately calls it with int_max. With this, iterate and range could be implemented in a pull style and automatically also support push. This would probably result in a little bit (does it matter?) of runtime overhead but less code than two separate implementations.

delay, delayWithSelector

I don't think these make sense for enumerable.

findIndex, find, just, elementAt, first, last, average, max, min, reduce, sum, every, some, includes

These operators take a stream and emit only one value. In a pull setting, I think it makes sense to ask for an unbounded number of inputs and simply buffer that one value until it is requested. So these would be implemented in a push-style and then converted to pull via buffering.
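As an illustration, here is a sketch of sum done this way: it asks upstream for everything at subscribe time and holds the single result until downstream requests it. All names (pull_source, pull_sum, sum_state) are hypothetical, the upstream is assumed to be synchronous, and for simplicity the value and the completion are delivered together on the first non-zero request.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <limits>
#include <memory>
#include <utility>

template <class T>
struct simple_subscriber {
  std::function<void(const T&)> on_next;
  std::function<void()> on_completed;
};
using request_fn = std::function<void(std::size_t)>;
// A pull source: given a subscriber, returns its "request" callback.
template <class T>
using pull_source = std::function<request_fn(simple_subscriber<T>)>;

struct sum_state {
  simple_subscriber<int> downstream;
  int total = 0;
  bool upstream_done = false;
  bool wanted = false;   // downstream has requested at least one element
  bool emitted = false;

  // Emits the buffered result once both sides are ready.
  void try_emit() {
    if (!upstream_done || !wanted || emitted) return;
    emitted = true;
    downstream.on_next(total);
    downstream.on_completed();
  }
};

pull_source<int> pull_sum(pull_source<int> upstream) {
  return [upstream](simple_subscriber<int> downstream) -> request_fn {
    auto st = std::make_shared<sum_state>();
    st->downstream = std::move(downstream);

    simple_subscriber<int> inner;
    inner.on_next = [st](const int& v) { st->total += v; };
    inner.on_completed = [st] {
      st->upstream_done = true;
      st->try_emit();
    };

    // Push-style consumption: ask upstream for everything right away.
    request_fn upstream_request = upstream(inner);
    upstream_request(std::numeric_limits<std::size_t>::max());

    // The upstream callback is captured only to keep it alive.
    return [st, upstream_request](std::size_t count) {
      if (count == 0) return;
      st->wanted = true;
      st->try_emit();
    };
  };
}
```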

zip

zip is one of the operators that really benefit from a pull-style interface. In pull mode, it needs a buffer proportional to the number of input streams and the number of requested elements, which is probably constant in a pull situation.

Again, like iterate, a pull implementation of zip could be used for push as well by simply starting with requesting an unbounded number of values.
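A sketch of a two-input pull zip along these lines, assuming synchronous, non-reentrant sources. The names pull_zip, zip_state and from_values are invented for the example. Each downstream request(n) is forwarded as request(n) to both inputs, so each queue holds at most the number of outstanding requested elements.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

template <class T>
struct simple_subscriber {
  std::function<void(const T&)> on_next;
  std::function<void()> on_completed;
};
using request_fn = std::function<void(std::size_t)>;
template <class T>
using pull_source = std::function<request_fn(simple_subscriber<T>)>;
using int_pair = std::pair<int, int>;

// Example pull source that emits the given values, then completes.
pull_source<int> from_values(std::vector<int> values) {
  return [values](simple_subscriber<int> s) -> request_fn {
    std::size_t pos = 0;
    bool done = false;
    return [values, s, pos, done](std::size_t count) mutable {
      while (count > 0 && !done) {
        if (pos < values.size()) {
          s.on_next(values[pos++]);
          --count;
        } else {
          done = true;
          s.on_completed();
        }
      }
    };
  };
}

struct zip_state {
  simple_subscriber<int_pair> downstream;
  std::deque<int> left, right;
  bool left_done = false, right_done = false, completed = false;

  // Emits every pair that is ready; completes once an exhausted input has
  // nothing left to pair up.
  void drain() {
    while (!completed && !left.empty() && !right.empty()) {
      int_pair p(left.front(), right.front());
      left.pop_front();
      right.pop_front();
      downstream.on_next(p);
    }
    if (!completed && ((left_done && left.empty()) ||
                       (right_done && right.empty()))) {
      completed = true;
      downstream.on_completed();
    }
  }
};

pull_source<int_pair> pull_zip(pull_source<int> a, pull_source<int> b) {
  return [a, b](simple_subscriber<int_pair> downstream) -> request_fn {
    auto st = std::make_shared<zip_state>();
    st->downstream = std::move(downstream);
    simple_subscriber<int> sa, sb;
    sa.on_next = [st](const int& v) {
      if (!st->completed) { st->left.push_back(v); st->drain(); }
    };
    sa.on_completed = [st] { st->left_done = true; st->drain(); };
    sb.on_next = [st](const int& v) {
      if (!st->completed) { st->right.push_back(v); st->drain(); }
    };
    sb.on_completed = [st] { st->right_done = true; st->drain(); };
    request_fn ra = a(sa), rb = b(sb);
    // Forward downstream demand to both inputs.
    return [ra, rb](std::size_t count) { ra(count); rb(count); };
  };
}
```

Calling the returned request callback with the unbounded value turns this into the push behaviour, at the cost of buffering one input in full.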

Open questions


What do you think? Does this sound reasonable? Insane? Incoherent to the point of making it impossible to read?

kirkshoop commented 7 years ago

This is a great start. I have some thoughts here, but it will take some more time to write them down.

While I am working this out.

Take a look at range-v3 as the pull model modified to match the for await proposal: begin() returns a future<iterator> and iterator::operator++ returns a future<iterator>. This has some issues, particularly for the operators that merge multiple inputs, but is more naturally C++. Some prototype algorithms in this model are implemented here

Also think about using two observables. one for sending data and the other for sending back the request count. Two independent observables may avoid the impact on each algorithm.

Thanks for starting the discussion!

per-gron commented 7 years ago

@kirkshoop thanks for responding! I'm looking forward to reading your thoughts on this.

I have used the "co" Javascript library a lot, which exploits Promises and generators to effectively make it possible to write C#-style async/await style code. I really like it. To me it feels like a truly next-level approach to work with futures/promises.

From what I heard, when RxJava people attempted an approach that created a Future for each value, they ran into performance issues. I'm not sure that would happen for C++ though.

Not sure I fully understand your point about two observables. It seems to me like that approach would work only for the algorithms that are near-trivial to adapt to this anyway.

per-gron commented 7 years ago

@kirkshoop I did not want to wait any longer for rxcpp to add backpressure support, so I made my own library that has it: https://github.com/per-gron/shuriken/tree/master/src/rs It's based on Reactive Streams. It might have things in it that are interesting for RxCpp.

kirkshoop commented 7 years ago

@per-gron, this is great! It is so nice to use C++14. My C++14 rxcpp prototype also resulted in a much cleaner implementation.

I would love to have you contribute more to this discussion as you run into success and failure with rs. Knowing what works and what does not and what usage patterns to choose and to avoid, will impact future back-pressure support in rxcpp.

I have followed the development of the RxJava ReactiveStream and RxJS Pausable back-pressure implementations. I have never been satisfied with the impact on the system caused by the design choices. Both work by abstracting out the step function from the generator and then doing math at each operator to count inputs and outputs. This feels the same as the RxJava and Rx.Net implementations pushing Concurrency into every operator. I want to avoid reimplementing complex policies in each operator.

The primary motivator in most conversations is unbounded queueing in an expression. However, the current implementations represent that as a signal passing a count from consumer to producer. I would prefer a more direct solution to the problem.

So given that

I would like a solution that abstracted queueing, similar to the concurrency abstraction that the rx schedulers provide. In fact, since schedulers are already queues, I would like to make them the queueing abstraction as well. I just haven't figured out how to implement zip queueing in terms of a scheduler yet.

In any case, I would like the policies and math to be implemented once and shared, affecting only the few points that queue, and even there requiring only very simple logic.

The Disposable/Subscription/Lifetime defines a scope within an expression. Queue control needs a scope for the policy. Adding a queue control mechanism to the Subscription might work. I do not know the shape for a mechanism that would satisfy me yet.

per-gron commented 7 years ago

@kirkshoop yes, C++14 is nice for sure. I stole a few ideas from rxcppv3 that made things a lot nicer. Thanks! :-)

You are completely right that the way Reactive Streams works is that it pushes complexity to operator implementations. I find that a lot nicer than the only other option that I know of which is to push the complexity to application code. Some operators are very easy to do, for example map and reduce. Other operators are a bit in between (filter does a little bit of math to keep track of back-pressure). And then there are operators like zip and merge where backpressure becomes the main thing of the whole implementation.
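To give a feel for the "little bit of math" in filter, here is a sketch written for this thread rather than taken from rs. It assumes a synchronous upstream, and pull_filter, filter_state and pull_source are made-up names. Every dropped element is demand the downstream is still owed, so the operator re-requests that many from upstream.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <utility>

template <class T>
struct simple_subscriber {
  std::function<void(const T&)> on_next;
  std::function<void()> on_completed;
};
using request_fn = std::function<void(std::size_t)>;
template <class T>
using pull_source = std::function<request_fn(simple_subscriber<T>)>;

struct filter_state {
  std::size_t dropped = 0;  // elements rejected during the current request
  bool done = false;        // upstream has completed
};

pull_source<int> pull_filter(pull_source<int> upstream,
                             std::function<bool(int)> keep) {
  return [upstream, keep](simple_subscriber<int> downstream) -> request_fn {
    auto st = std::make_shared<filter_state>();

    simple_subscriber<int> inner;
    inner.on_next = [st, keep, downstream](const int& v) {
      if (keep(v)) {
        downstream.on_next(v);
      } else {
        ++st->dropped;
      }
    };
    inner.on_completed = [st, downstream] {
      st->done = true;
      downstream.on_completed();
    };

    request_fn upstream_request = upstream(inner);
    return [st, upstream_request](std::size_t count) {
      // Keep asking until `count` elements have passed the filter or the
      // upstream is exhausted. With a synchronous upstream, everything it
      // is going to emit for a request has arrived when the call returns.
      while (count > 0 && !st->done) {
        st->dropped = 0;
        upstream_request(count);
        count = st->dropped;  // re-request what was filtered out
      }
    };
  };
}
```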

My use case for rs is to be a nicer interface for the horribly low level async C++ API of gRPC. Because I want to write scalable backend services, the stance that rs is taking is that unbounded queues are simply not used, ever. Because of that, and because in backend services you can control your system to simply not have any push/non-backpressured sources, talking about reusable abstractions that control queueing behavior (drop, pause, ring) is irrelevant for rs (<- if you read only one part of this little rant, this is the most interesting I think). If at some point in the future rs will have such abstractions, it can be an only occasionally used set of helper functions rather than a central part of the design.

I think rs could possibly be improved by finding better abstractions for back-pressure handling, but it doesn't seem like a very easy thing to do. Different operators do quite different things to handle backpressure.

In the use case I have there is very little runtime penalty of requesting one element at a time, but in a system where Request calls result in a network call, I can see that it might become relevant to have logic that deals with buffers and watermarks and stuff like that. I think some of the more mature Reactive Streams libraries do. That is something that would definitely be helped by some reusable abstraction. But that seems quite doable as a separate operator.
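A sketch of what such watermark logic might look like if Request calls were expensive. The watermark_requester class and its parameters are hypothetical and not taken from rs or any Reactive Streams library: request a full window up front, then top it back up in one call whenever the number of in-flight elements falls to the low watermark.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>

// Hypothetical helper: batches upstream requests between two watermarks.
class watermark_requester {
 public:
  watermark_requester(std::size_t high, std::size_t low,
                      std::function<void(std::size_t)> upstream_request)
      : high_(high), low_(low), request_(std::move(upstream_request)) {
    assert(low_ < high_);
  }

  // Issues the initial request that fills the window.
  void start() {
    in_flight_ = high_;
    request_(high_);
  }

  // Call once per element received from upstream.
  void on_element() {
    assert(in_flight_ > 0);
    --in_flight_;
    if (in_flight_ <= low_) {
      const std::size_t refill = high_ - in_flight_;
      // Updated before the call so that an upstream that responds
      // synchronously sees consistent bookkeeping.
      in_flight_ = high_;
      request_(refill);
    }
  }

  std::size_t in_flight() const { return in_flight_; }

 private:
  std::size_t high_;
  std::size_t low_;
  std::function<void(std::size_t)> request_;
  std::size_t in_flight_ = 0;
};
```

With high = 8 and low = 2, six elements are consumed per upstream Request call instead of one.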

per-gron commented 7 years ago

@kirkshoop I've kept hacking a bit on rs. One thing that I've been able to do is to avoid shared_ptr altogether, and make it so that some operators do no heap allocation at all, for example map.

kirkshoop commented 7 years ago

@per-gron, nice! Yes, I am working on a Single prototype that avoids the allocations as well. I changed the contract a bit to achieve this.

In doing so it occurred to me that, with back-pressure, subscribe no longer invokes the work; request invokes the work. subscribe just constructs the subscription instance, deferring the work until the call to request.

per-gron commented 7 years ago

@kirkshoop Fyi this is me seeing if there is any interest in Reactive Streams from the gRPC community: https://github.com/grpc/proposal/pull/33

akarnokd commented 7 years ago

RxJava 2 is Reactive Streams compliant and there is an influence path (among other paths) of RxJava 0.x -> Reactive-Streams -> RxJava 2 as the concept and implementation details were fleshed out. A separate backpressure enabled type is preferable to avoid "upsetting" existing users.

Note also that RxJava 2 redesigned the traditional non-backpressured Observable type by adding an inversion of control for cancellation: the onSubscribe(Disposable) allows synchronous cancellation without the need to go async. I was unable to convince the Rx.NET guys about this change and the benefits of a streaming-backpressured type (instead of a one-by-one async enumerable) despite a proof that such a thing is possible.

I'm not a C++ developer and since I'm too much used to automatic memory management in Java/C#, I don't know how a Reactive-Streams supporting type could be implemented (i.e., how do you make sure a Subscription is properly freed?).

Bittuw commented 4 years ago

I will not offer anything new, but what about Spring Reactor? Having looked at its sources, and given that new C++ features like ranges and coroutines will appear this year, I think it is possible to port Spring Reactor directly to C++ with some limitations. Or just draw on that library's experience with backpressure (in my opinion, Spring Reactor is the best reactive/parallel library).