ReactiveX / RxJava

RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
Apache License 2.0
47.88k stars 7.6k forks source link

Backpressure #1000

Closed benjchristensen closed 10 years ago

benjchristensen commented 10 years ago

Thus far Rx has left backpressure solutions "an exercise for the user". It can be done via manual feedback loops and operators like throttle, sample, window, etc.

The drawback of this is that it requires someone understanding the implication of every operator and when it might be async (buffer/queue) and requires the effort to create a feedback loop and hook it up correctly. Some use cases will probably always require this type of manual effort, but it's worth exploring whether we can solve the general use cases within Rx.

Most Rx operators are synchronous in nature meaning the producing thread does not return until the work is completed. The map operator is a good example. It does computation work to transform from T -> R and nothing else.

There are some operators though that are async in nature and involve unbounded buffers. These include observeOn, zip and merge (as of 0.17.1 which means flatMap is affected). It also happens if an Observer is writing the data out to non-blocking IO (such as a Netty channel).

In all of these example, if the producer is fast, such as an in-memory Iterable, loading a file, or a firehose of events over a network, buffer-bloat (and eventually OutOfMemory) can easily occur.

After some experimentation and talking with various individuals, teams and companies about this topic, a possible solution is the use of co-routines from the producing Observable that a Subscriber can request data from in batches.

Prototype Code

A prototype of this is being experimented with at https://github.com/benjchristensen/RxBackpressure though it is not yet complete (as of this writing).

The general idea is that an Observable.OnSubscribe implementation could register a producer co-routine with the Subscriber and then only push data down when the Subscriber has said how much it can receive. It becomes a conditional push-model. The co-routine can then be "parked" by the producer by finishing work and returning (releasing the thread) and then "unparked" or resumed by the Subscriber when it wants more. In this way no threads are blocked and only the amount of data the Subscriber can handle is sent. The same model can work across threads or across the network.

If the Observable chain has no async operators then it will be executed with an "infinite" request and behave exactly as it does today without any parking.

If an async operator is in the chain then it will "request" a batch size equaling it's internal buffer (say 128, 512, 1024 etc).

A simple producer of an Iterable would look like this (ignoring debates over naming and API):

public final class OnSubscribeFromIterable<T> implements OnSubscribe<T> {

    final Iterable<? extends T> is;

    public OnSubscribeFromIterable(Iterable<? extends T> iterable) {
        this.is = iterable;
    }

    @Override
    public void call(final Subscriber<? super T> o) {
        // state is outside the co-routine below
        final Iterator<? extends T> iter = is.iterator();
        // define the co-routine
        Action1<Request> func = new Action1<Request>() {
            @Override
            public void call(final Request r) {
                if (!r.countDown()) {
                    return;
                }

                while (iter.hasNext()) {
                    final T value = iter.next();
                    o.onNext(value);

                    if (!r.countDown()) {
                        // we have delivered all that was requested
                        // so "park" be returning and releasing this thread
                        return;
                    }
                }
                o.onCompleted();
            }

        };

        // register co-routine with Subscriber
        o.setProducer(func);
    }
}

The observeOn operator is async and thus will request batches as needed. The relevant code looks like this:

private void pollQueue() {
    do {
        Object v = queue.poll();
        if (v != null) {
            if (v instanceof Sentinel) {
                if (v == NULL_SENTINEL) {
                    observer.onNext(null);
                } else if (v == COMPLETE_SENTINEL) {
                    observer.onCompleted();
                } else if (v instanceof ErrorSentinel) {
                    observer.onError(((ErrorSentinel) v).e);
                }
            } else {
                observer.onNext((T) v);
            }
        }
        requested--;
    } while (counter.decrementAndGet() > 0 && !observer.isUnsubscribed());
    if (requested == 0) {
        requested += SIZE;
        // request more (starting the co-routine up again)
        request(SIZE);
    }
})

The subscribeOn operator decorates the co-routine so scheduling is retained and is started on the desired thread each time rather than the consuming thread doing the work.

public void setProducer(final Action1<rx.Subscriber.Request> producer) {
    subscriber.setProducer(new Action1<Request>() {

        @Override
        public void call(final rx.Subscriber.Request r) {
            inner.schedule(new Action1<Inner>() {

                @Override
                public void call(Inner inner) {
                    producer.call(r);
                }

            });

        }

    });

}

The take and skip operators compose the batch sizes to adjust accordingly to what is being taken or skipped:

The Subscriber manages the life-cycle of when the co-routine is run: https://github.com/benjchristensen/RxBackpressure/blob/master/rx-bp-prototype/src/main/java/rx/Subscriber.java#L97

For example, if the final Subscriber in the chain is hit and no async operator was involved it immediately invokes the co-routine with "-1" request size so it is infinite.

Example Use Cases

Use case are being written here: https://github.com/benjchristensen/RxBackpressure/tree/master/rx-bp-examples/src/test/java/rx/examples

Handling Observables Without Co-routines

Implementing an Observable that supports backpressure is more complicated and not all will support it. In those cases the async operators will onError stating that an Observable is not respecting the backpressure and suggest resolutions (linking to documentation probably).

The two routes at that time are: 1) fix the Observable to support backpressure, or 2) use one of several backpressure operators that will be added such as:

An earlier prototype (not public) had zip functioning but the current code does not. We know how it is done and this model works with it but have not yet spent the time to re-implement it. We need to.

The biggest outstanding item is making merge work (and thus flatMap). This is the major hurdle of any backpressure solution working.

We (myself, @abersnaze, @headinthebox and others) have whiteboarded it and believe we have a solution but my schedule has not allowed me to code it. I have held off on writing this post as I wanted to code it up first, but since I still haven't had time I wanted to get this information public instead of holding it up.

The planned design for merge is that each Observable being merged would have it's own buffer (of say 128 items) and then the consumer would request n items which would come in round-robin fashion from all of the merged buffers. Each individual Observable would exert it's own backpressure upstream. Slow Observables may never fill their buffer while fast ones will.

We will not attempt to limit the horizontal growth (number of Observables being merged) but will limit the vertical growth (size of buffer for each Observable).

The challenge with this is making sure performance and fairness are balanced.

Next Steps

1) Finish prototype implementatation of merge and zip 2) Finish examples across threads and netwwork. 3) Bikeshed and debate APIs and naming conventions 4) Prove or disprove performance/functionality tradeoff. 5) Test long enough to convince ourselves we're not going to cause deadlocks/livelocks/hangs somewhere (co-routines not running).

I look forward to your help making this happen.

zsxwing commented 10 years ago

Does the source Observable need to guarantee to call setProducer? If not, looks setProducer can not be triggered automatically.

kirkshoop commented 10 years ago

The main issues that I have run into while trying to implement back-pressure involving continuations in rxcpp are; synchronization and additional complexity everywhere (it is not localized). Combined these really pessimize performance.

I think that there is a way to localize the complexity behind three interfaces. Two already exist; scheduler and connectable_observable. The new interface is resumable_observable which just adds pause() and resume() to observable. Edit: resumable_observable is not required of course, it is just a renaming of connectable_observable. The new name for connect() is resume() and the new name for unsubscribe() is pause()

The usage depends on the type of source.

Hot Observables

The pause() implementations that are meaningful for Hot Observables are; unsubscribe or buffer. These can be realized with some operators.

Cold Observables

The Hot back-pressure policies are available by applying publish() to turn the observable Hot, then use one of the Hot operators that return a resumable_observable.

The additional policy, which only applies to Cold observables, is to stop the production of the next value until resumed. This does require a continuation. Fortunately, Cold Observables already create a continuation per value. Cold Observables take a scheduler and call schedule in some form for each value. The continuation that is needed to support resumable_observable is passed to schedule already!

To exploit this a new regulator type wraps an existing scheduler and has methods get_scheduler(), pause() and resume(). The regulator scheduler keeps its own queue and schedules itself on the wrapped scheduler to pop the next item and run it.

A new operator, that takes the regulator and an observable, returns a resumable_observable. pause() will stop scheduling the regulator queue and resume() will start scheduling the regulator queue.

Join Operators

These would change to accept resumable_observables.

Cold Example

regulator<current_thread> r;
auto counter = range<int>(0, 100000000, 1, r.get_scheduler());
counter.skip(1).resume(r)
    .zip(counter.resume(r), [](int lhs, int rhs){return std::make_pair(lhs, rhs);})
    .subscribe([](std::pair<int, int> p){std::cout << p << std::endl;});

Hot Example

button_downs.resume_with_subscribe()
    .zip(button_ups.resume_with_subscribe(), 
        [](button_down lhs, button_up rhs){return std::make_pair(lhs, rhs);})
    .subscribe([](std::pair<button_down, button_up> p){std::cout << p << std::endl;});
benjchristensen commented 10 years ago

Does the source Observable need to guarantee to call setProducer?

No. It can continue to act just like right now. Supporting backpressure is optional. While discussing with @abersnaze and @headinthebox we contemplated using a minimum buffer size throughout the codebase (like 32, 64, or 128) so if an Observable is going to emit less than that it doesn't need to worry about back pressure at all.

If not, looks setProducer can not be triggered automatically.

What do you mean by this? If it's not set, there's nothing to trigger and the source Observable is responsible for emitting via onNext as normal.

benjchristensen commented 10 years ago

@kirkshoop Thank you for getting involved in the conversation!

I apologize that I don't fully understand the proposed implementation but I'll provide comments based on my interpretation.

We initially implemented the pause() solution and moved on from it after discussion with other teams (such as at Typesafe) who have used the pause() solution before and found issues with it. We instead inverted "pause/resume" to the batch continuation model.

The pause solution works very well within a single process where pausing can be fairly deterministic by sharing an atomic/volatile reference across thread boundaries. It does not work well across network boundaries though which is often the source of data requiring back pressure. It is far too non-deterministic to asynchronously request a source to "pause" and therefore still requires unbounded buffers or dropping data.

Also, anything that is modeled after ConnectableObservable is immediately not composable and requires awkward coding just like publish() and connect() requires.

Cold Observables take a scheduler and call schedule in some form for each value.

This is very inefficient and exactly what we have sought to avoid in RxJava, particularly as of version 0.17 where the Subscription is injected into the source so that a while loop can happen synchronously and still unsubscribe. Scheduling each notification is not something we do very often at all.

Also, this does not apply well to network requests, which are the source for most cold observables. Generally they are happening on a separate thread, or an event loop and the notifications are not rescheduled again on a separate thread. Nor do we use CurrentThreadScheduler which was significantly diminished in importance as of RxJava 0.17 (due to the inefficiency of it in normal cases). Thus, cold observables generally do not have a thread or event loop to "pause". Forcing them to do so to achieve back pressure would force everyone to pay a performance penalty even in the best case where back pressure is not needed.

synchronization and additional complexity everywhere (it is not localized). Combined these really pessimize performance.

What extra synchronization is required? The proposed solution (using continuations) does not change the threading model or introduce any extra use of schedulers/concurrency.

I am concerned with performance impact for cases that don't require back pressure, but thus far because of the batch request approach which then allows async onNext notifications, it does not seem to be a performance problem (still too early to perform all performance testing to know for sure though). The case where performance will be impacted are the very ones where buffer-bloat is a problem and it is purposefully slowing it down to the rate of consumption. The one area where performance is an interesting challenge (and will be with continuations as well) is non-blocking serialization: https://github.com/Netflix/RxJava/issues/998

kirkshoop commented 10 years ago

Okay I am caught up now. I was designing for a different system. :)

I think that we agree that we are happy with the existing Rx behavior for Hot Observables and that this back-pressure discussion is for Cold Observables.

It is far too non-deterministic to asynchronously request a source to "pause" and therefore still requires unbounded buffers or dropping data.

I disagree with this. The buffer can be bounded declaratively on the source operator. In fact, I argue that a declarative chunk-size and buffer-size, that is passed to Request in your design, provides two benefits.

  1. The appropriate chunk-size is dependent on the source, not the subscriber or operators. Modifications of the request value in Take and Skip would not affect the throughput of the source because the Request will always schedule chunks to keep the local buffer full, so they are not needed.
  2. Providing declarative sizes allows the Request to create overlapping calls for subsequent chunks (when the buffer has room for a chunk). I have a application where the latency for a request is 13sec but the data in that request only contains the data needed for the next 10sec. If the requests are not overlapped then the pipeline stalls horribly.

The buffer-size ensures that no data is dropped and that the size is bounded. A good minimum for the buffer-size is 1.5 * chunk-size.

I still think that pause() and resume() suffice as the control surface. They would not control the scheduling directly, they would synchronously control the buffer draining, which would indirectly trigger more chunks as the buffer emptied. The source should be paused synchronously in on_next when the declared input buffer for the subscriber is full. The source should be resumed when the buffer has space. This ensures that the buffers in the whole expression are bounded.

However, the buffer draining could be controlled with the current request(count) control surface as well. I still think it is important to change the Request impl to support overlapped chunk requests with declared sizes for the chunk and buffer.

I think that in addition to chunk-size and buffer-size, there should be two scheduler arguments. One for the overlapped chunk requests and the other for the buffer draining.

This is very inefficient and exactly what we have sought to avoid in RxJava

Yes, this is very inefficient. :) It added complexity to the scheduler in rxcpp to control the perf issue while still allowing current_thread to be fully supported. I have looked at the code for RxJava's approach now and I have a background mental task deciding whether to similarly limit support for current_thread in rxcpp. Thanks!

What extra synchronization is required?

The Request design does not have the same issues that I was running into with both the resumeWith + is_resumed and the 'allocateMarble + completeMarble' designs. Although the later would have had some interesting debugger applications.

benjchristensen commented 10 years ago

The source should be resumed when the buffer has space.

Agreed, but I don't understand how we would have resume() and request(count) in the same system. To me resume() and request(count) are competing implementations for the same thing – restarting work after it being paused.

It seems you are referring to having batch sizes when you say "the buffer can be bounded declaratively on the source operator" but then suggest we can work with just pause/resume and not request(count), so I'm not following.

What we're trying to achieve is that when 'resume` is invoked (whatever it is called) that is passes along how much space is in the buffer for the upstream to send.

Are you suggesting that only the IO boundary needs the request(count) functionality and everything operating internally in the same process then just use pause/resume to drain from the IO Observable buffer?

I think that in addition to chunk-size and buffer-size, there should be two scheduler arguments. One for the overlapped chunk requests and the other for the buffer draining.

We attempted this in an early prototype and it became ridiculously complex because the co-routine state now needed to be thread-safe and handle concurrent execution. I'm open to it as I agree it would be good, but it's not clear yet how to make it simple to implement and I haven't spent enough time to try and determine alternative implementations.

To understand what I mean, take a look at the OnSubscribeFromIterable example at the beginning of this issue, it has this line in it:

final Iterator<? extends T> iter = is.iterator();

If we allowed overlapped requests in this example the producer could no longer use a simple iterator.

The buffer can be bounded declaratively on the source operator.

Since I'm obviously not quite understanding, can you elaborate more on what you mean by this and how it interacts with resume/pause/request(count) please?

kirkshoop commented 10 years ago

I don't understand how we would have resume() and request(count) in the same system. To me resume() and request(count) are competing implementations for the same thing – restarting work after it being paused.

I am suggesting that a cold observable that supports back pressure has both a producer and a consumer inside.

The producer generates requests for a chunk-size of items and the consumer stores a buffer-size of items.

Subscriber::setProducer(void(Request, Subscriber){}) and int Request::get_chunk_size() are the surface of the producer.

pause(), resume() and optionally request(count) are the surface of the consumer. The internal consumer is exposed as the Observable that can be subscribed to.

resume, pause and request(count) all control the draining of the buffer in the source operator.

What we're trying to achieve is that when 'resume` is invoked (whatever it is called) that is passes along how much space is in the buffer for the upstream to send.

Conceptually request(count) is just resume() -> count * on_next -> pause(). Once the source has separated the producer from the consumer what benefit is achieved by passing the count up?

benjchristensen commented 10 years ago

Thanks for explaining. I need to think about this more and trace it through the composition of merge, zip, skip, take, observeOn, subscribeOn to understand. It won't happen quickly unfortunately as I'm wrapping up other things and traveling to London in a couple days for a conference.

My initial reaction though is that we shouldn't have both pause/resume and request(count).

kirkshoop commented 10 years ago

traveling to London in a couple days for a conference

Me too - will you be at React?

My initial reaction though is that we shouldn't have both pause / resume and request(count) .

I agree.

Complexity of source operator implementation

I was thinking more about the operator implementations in this design and it changed the design a bit.

range should not be overlapped or buffered. iterator should not be overlapped and should be buffered (I am thinking of a generator that takes time in getNext).

Thus, I think that a declarative max-concurrency parameter to the producer is also required. iterator and range would set it to 1 and neither would have to constrain the chunk or buffer sizes.

To simplify the implementation of the operators supporting overlapped calls:

Add a function Subscriber::setStateSelector<T>(T(T){}) and change Subscriber::setProducer<T>(void(T, Subscriber){})

The producer function calls

state = stateSelector(state); 
bufferStart = bufferNext;
bufferNext = bufferStart + chunk_size;
produce(state, new BufferSubscriber(buffer, bufferStart, bufferNext);

The producer will not overlap calls to stateSelector and the BufferSubscriber will control the synchronization of the buffer access for produced values.

The produce and stateSelector functions supplied by the source operator can choose to share the same state across overlapped calls and deal with the sync themselves or stateSelector can choose to return an independent state value so that produce does not synchronize at all.

benjchristensen commented 10 years ago

Me too - will you be at React?

Yes I will be. If you're there we'll definitely need to meet.

alexandru commented 10 years ago

IMHO, both pause/resume and request(count) is trying to shove a square peg into a round hole. I've been thinking about an alternative design inspired by alternative abstractions for handling streams:

  trait Observer[-T] {
    def onNext(elem: T): Future[Unit]
    def onError(ex: Throwable): Unit
    def onComplete(): Unit
  }

So basically Observer.onNext can tell the Observable about when it is ready for more by means of its return type, a Future. For the Observable itself, nothing much changes, i.e...

UPDATE: because I pulled this out of my arse, haven't thought about the implications - but the above doesn't solve the problem, still thinking about it.

benjchristensen commented 10 years ago

Hi @alexandru, thanks for getting involved.

In early prototypes I tried making 'onNext' return 'Future' and it became unwieldy very quickly. However, it still feels like it may be a valid approach. I too am considering this more.

As a side note, one practical concern is memory impact of allocating a future for every onNext.

alexandru commented 10 years ago

@benjchristensen you're right, it would be more heavy and I believe this would go against the design goals of Rx.NET / RxJava, as you'd have to make the whole abstraction to be entirely async. I personally prefer it that way, for the use-cases that are of concern to me, but it would make it impractical for certain use-cases for which Rx was designed for (the usual pick your poison applies).

I'm exploring it for a bit, asking for feedback on the scala-user mailing list, so if you can tolerate Scala code and my strong opinions, here are some details I've thought about: https://groups.google.com/forum/#!topic/scala-user/ckgrXz_4F_A

alexandru commented 10 years ago

@benjchristensen in case you're interested, I have a proof of concept. flatMap was a little tricky to implement, but I think I got it:

https://github.com/alexandru/monifu/blob/e2c756a3fd5018c87159d9ae5a3f6ff1487aefc3/monifu-rx/src/shared/scala/monifu/rx/async/Observable.scala#L135 (link update Apr 11)

The interesting difference is that the obvious flatMap implementation behaves like you'd expect from Stream or Iterable, as in, as observables are emitted by our source, new subscriptions only happen after the previous ones are done, so everything is ordered and for example this producer would generate elements once per second:

Observable.fromTraversable(0 until 1000)
  .filter(x => x % 5 == 0)
  .flatMap(x => Observable.interval(1.second).take(5).map(y => x + y))

To implement flatMap with the behavior from RxJava, I would need to do buffering, whereas RxJava needs to do buffering for the above behavior.

benjchristensen commented 10 years ago

Thanks @alexandru for this. I'm just catching up after traveling so I probably won't have time today to review this but I definitely will.

I also had the chance to talk with the Akka team (Roland, Viktor, Jonas) and @kirkshoop from Microsoft working on RxCPP and got some interesting implementation ideas, so there are several things I need to prototype and report on here when I get through my backlog.

alexandru commented 10 years ago

@benjchristensen no rush, it's good to evaluate all options and I'd be glad if I could give you some ideas to make RxJava better. As I was saying, I'm not seeing you implementing my solution, because for it to work well it means to turn it into something that's maybe not Rx.net anymore - and maybe you got better ideas already.

Btw, another idea I had as part of my toy project is to provide 2 implementations for Observers and Observables - one synchronous and one asynchronous, much like how Scala separates between immutable and mutable collections, since sometimes you can't have a one size fits all.

The asynchronous observer would be:

trait Observer[-T] {
  def onNext(elem: T): Future[Ack]
  def onError(ex: Throwable): Future[Unit]
  def onCompleted(): Future[Unit]
}

The synchronous observer would be:

trait Observer[-T] {
  def onNext(elem: T): Ack
  def onError(ex: Throwable): Unit
  def onCompleted(): Unit
}

Where Ack is basically an acknowledgment flag with which the Observer can send messages to upstream:

sealed trait Ack
object Ack {
  case object Continue extends Ack
  case object Stop extends Ack
}

So you return Continue if you want to receive the next element, or Stop to inform the source that you want the stream to stop (as an alternative to calling onCompleted and relying on auto detaching). In case of the asynchronous Observer, it's a Future[Ack] and the rule is ... until the Future isn't completed, then the source doesn't send. And because I added it in the synchronous version - the interesting thing is that this Ack message makes jumping threads harder - e.g. there's no room for subscribeOn or listenOn anymore.

But that's OK, because for async stuff, you've got the async version that I think will work much better. To be honest, I was a little afraid that it won't work before implementing flatMap, but if flatMap is possible, then everything is :-)

Btw, I'm not sure how familiar you are with Scala's Future/Promise, but their design is really sweet and it's the reason for why my implementation was doable in a short amount of time. In case you'll look at my code and don't understand something, I'll be happy to reply.

benjchristensen commented 10 years ago

The work on this issue for supporting back pressure relates to interoperability work being done as part of http://www.reactive-streams.org which is "an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure on the JVM."

benjchristensen commented 10 years ago

Discussion from https://github.com/Netflix/RxJava/issues/1001#issuecomment-44448887:

However, backpressure is a huge undertaking; most of us need to learn a new paradigm and way of coding, and it will certainly affect all operators. Even if I had the time, I couldn't make it under a month, not to mention, in a battle-ready fashion.

In what way is this considered a new way of coding? I also don't see it affecting all operators, only the ones that are async (like zip, merge, observeOn).

akarnokd commented 10 years ago

There are other complicated cases:

headinthebox commented 10 years ago

Agree with @akarnokd and @benjchristensen; adding back pressure leads to a rather different API with fewer operators, so I think it is better to consider it as a separate effort.

benjchristensen commented 10 years ago

Progress on developing this solution can be seen at https://github.com/Netflix/RxJava/pull/1382

benjchristensen commented 10 years ago

Progress on this can now be seen in 0.20.0-RC1 being released today.

benjchristensen commented 10 years ago

Release notes for 0.20.0-RC1: https://github.com/Netflix/RxJava/releases/tag/0.20.0-RC1

DavidMGross commented 10 years ago

I'm trying to sketch out some wiki documentation for this as well. But I'm not entirely clear as to what's going on, so the page now is in a sort of "thinking aloud" state. I'd appreciate some feedback.

https://github.com/Netflix/RxJava/wiki/Backpressure

In particular, under what circumstances do creators of new observables and operators need to implement Producers and what other steps do they need to take to support backpressure (or to indicate that they don't support it)? Who calls a Subscriber's setProducer(p)/onSetProducer(p) methods and why?

On Tue, Jul 15, 2014 at 10:30 AM, Ben Christensen notifications@github.com wrote:

Release notes for 0.20.0-RC1: https://github.com/Netflix/RxJava/releases/tag/0.20.0-RC1

— Reply to this email directly or view it on GitHub https://github.com/Netflix/RxJava/issues/1000#issuecomment-49065540.

David M. Gross PLP Consulting

benjchristensen commented 10 years ago

@DavidMGross I'll need to spend more time on the documentation in the next week or two with you.

benjchristensen commented 10 years ago

0.20.0-RC2 successfully passed a 24 hour production canary on the Netflix API.

Can anyone using Android please test 0.20.0-RC2 and report back on issues if any? In particular we need to make sure merge/flatMap work okay as they have fast paths that conditionally use sun.misc.Unsafe that won't work on Android if the conditions are not correctly using alternate data structures for Android.

Any other issues related to 0.20 changes that need to be solved before releasing 0.20?

benjchristensen commented 10 years ago

@DavidMGross I have opened https://github.com/Netflix/RxJava/issues/1541 to track and discuss documentation.

benjchristensen commented 10 years ago

Marking this done. There are other open tasks to complete some operators, but the foundation of "reactive pull" backpressure is in place and functioning with major operators like merge, zip and observeOn.

codetinkerhack commented 10 years ago

@benjchristensen I was thinking one of possible solutions already exist and implemented in TCP flow control. Amount of data sent by producer is contolled by window size that is established dynamically based on the how fast consumer acknowledges the receipt and network condition (if anything not being acknowledged). So throttling is done dynamically based on the behavior of the consumer. Having said that considering that we do not need to wait for individual acknowledgement (as in TCP we considering unreliable network environment) all we need is to implement a counter that would simply add packet IDs when packets sent to consumer and reduce when packet acknowledged (so one accumulator instead of per individual packet Future). So if the number growing too fast meaning consumer is not able to consume fast enough. If the number stays near 0 - consumer either consuming too fast and we could send a bit more or we found equilibrium - rate when consumer is consuming at comfortable rate. By right both producers and consumer should be aware of this rate or we end up in situation when we have data growing in a stream buffer. This may sounds like a big bull... considering i am writing after consuming a considerable amount of alcohol on Sat night... Apologies in advance if I am interfering. :)

abersnaze commented 10 years ago

We definitely thought about TCP sliding windowing when coming up with what back pressure for Rx would look like. I'm not sure we need an actual ID for each onNext(T t) because we don't have to worry about reordering the packets. The equivalent of the ACK is the onNext(T t) returning. I've thought about encapsulating more of the flow control logic inside the RxRingBuffer so that it could dynamically resize the ring (aka sliding window).

In addition to the speed of the producer and consumer the algorithm should also optimize for the producer latency (the time it takes from consumer calling request(long n) and the first time the producer calls onNext(T t)). It probably makes sense to measure producer latency in terms of consumer processing speed. So if the consumer can process 10 onNext(T t)'s in the time it takes the producer to respond then producer latency would be 10. When the ring buffer gets to less than 10 items to consume then the request(n) should be sent. This way the ring buffer probably won't be completely empty, causing the consumer to wait.

benjchristensen commented 10 years ago

@codetinkerhack Thanks for getting involved and your thoughts on this.

You can read more about some of the design discussions that inspired this model on the ReactiveStreams project, including this issue: https://github.com/reactive-streams/reactive-streams/issues/62 TCP windowing has been evaluated (as per the comment from @abersnaze who has been involved in this design, and in the ReactiveStreams project).

Think of the Producer.request(n) model as "async Iterables" that allow pulling data. That is what happens when a consumer is slow and the Observable emits a Producer. This is different than TCP requirements on top of potentially unordered, lossy packets.

A sophisticated consumer could dynamically size the requests, but so far in practice I have not seen a need for this, but I imagine a use case will arise at some point. Currently, the ability for n to be arbitrary and dynamic is really only beneficial for 2 things that I'm aware of:

1) an arbitrary size > 1 for batching requests to something that may be costly over an async boundary. 2) allowing a requested value of n to be divided across many sources such as on the concat operator.

Whether or not performance gains can be achieved via dynamically changing the requested amount is still left to be proven. As for over network boundaries, TCP will not be replaced by this, but the Producer.request values will allow application level decisions to batch requests for data.

ldaley commented 10 years ago

The link from http://www.reactive-streams.org/ leads me here. Are there adapters for the org.reactivestreams.* types as part of this work?

benjchristensen commented 10 years ago

The adapter has not been created yet. I've been meaning to but haven't gotten to it yet.

The intent is for there to be an rxjava-reactive-streams module inside rxjava-contrib.

polytypic commented 9 years ago

Hello! In the past couple of months I've put some time into developing a library for reactive/responsive programming based on a pull model called choice streams. I believe that you might find it interesting.

Here is a little bit of prose on the idea:

https://github.com/Hopac/Hopac/blob/master/Docs/Inverting.md

I must apologise that the above document is somewhat out of date with respect to my improved understanding of Rx and contains a couple of incorrect assertions.

And here is the current reference documentation on choice streams:

http://hopac.github.io/Hopac/Hopac.html#def:type%20Hopac.Stream.Stream

Summary: It turns out that it is quite possible to do responsive and reactive programming with a pull model. All you need is lightweight threads, lazy futures/promises and non-deterministic choice. Even operators like groupBy and operations like delay (I call it shift) can be implemented for choice streams. Many operations appear to be significantly simpler to implement on pull -based streams than on push -based streams. The main feature lacking from choice streams is the subscription protocol, which means that operations like Using cannot be implemented with comparable semantics.

Please note that choice streams are still work-in-progress and, compared to Rx, there are a few combinators missing that I haven't yet had the time to implement, although I'm confident there will be no problems. I'm already using choice streams in an actual application for reactive/responsive programming and they have worked without problems.

benjchristensen commented 9 years ago

Hi @VesaKarvonen The Reactive Streams and Rx model supports both push and pull to achieve the backpressure. Adopting a completely pull model defeats the point of Rx as that would eliminate all of the data sources that are "hot" that fire events without regard for when we pull from them. A pull-based stream is an Iterable, even if it is an AsyncIterable.

RxJava has created push/pull implementations of publish/groupBy/merge/etc that all change from push to pull when the downstream can not keep up with push. This may be of interest to you based on your comments in the Inverting document about groupBy and merge. This has all occurred since Erik mentioned the "don't know how to do groupBy" comment. Indeed groupBy was tricky to get right with backpressure, but it works with push and pull and moves between them as necessary.

Your document also references closing over locks ... that does not happen in RxJava. It did in very early versions, and it may still do so in Rx.Net, but a lock is never held while emitting in RxJava.

The Reactive Streams Spec is a good document with over a year of collaboration amongst several companies that is applicable to this topic: https://github.com/reactive-streams/reactive-streams#goals-design-and-scope

polytypic commented 9 years ago

Hi @benjchristensen,

Hi @VesaKarvonen The Reactive Streams and Rx model supports both push and pull to achieve the backpressure.

I assume the "Reactive Streams" model refers to the specification in

https://github.com/reactive-streams/reactive-streams

What do you mean by the "Rx" model?

If, by Rx, you refer essentially to the interfaces IObservable and IObserver of .Net, then I don't see how they could support the same push-pull model as Reactive Streams.

Note: My point here is just to distinguish between the different models. In the rest of this reply, I will use "Reactive Streams" to refer to the referenced specification and "Rx" to refer to the .Net interfaces.

Adopting a completely pull model defeats the point of Rx as that would eliminate all of the data sources that are "hot" that fire events without regard for when we pull from them. A pull-based stream is an Iterable, even if it is an AsyncIterable.

Here the definition of "completely pull model" is significant. Choice streams do support generating a stream based on push events. You probably understand this already, but let me elaborate on this.

An ordinary singly linked list could be implemented using a pair of types that roughly looks like

class List<X> {
  volatile Cons<X> list;
}

class Cons<X> {
  X       head;
  List<X> tail;
}

in a Java style syntax. Suppose one has a method OnClick that is called in response to external events (or is "pushed to") such as button clicks. One could then generate a list clicks

List<Button> clicks = new List<X>(); // clicks.list == null

in that method by writing

void OnClick(Button theButton) {
  List<Button> newClicks = new List<Button>();
  Cons<Button> newCons = new Cons<Button>();

  newCons.head = theButton;
  newCons.tail = newClicks;

  clicks.list = newClicks;
}

One can then observe the list being generated from another thread by following the list references:

List<X> cursor = clicks;

while (true) {
  while ( cursor.list == null ) { /* wait */ }
  System.out.println( cursor.list.head.toString() );
  cursor = cursor.list.tail;
}

Of course, the above sketch is an absolutely silly way to do things, but I hope that this example is simple enough to be understood.

Now, the choice stream type is only a small change from the above List<X> type and would look roughly like

class Stream<X> inherit Promise<Cons<X>> { } // Read: inherit = typedef

class Cons<X> {
  X         head;
  Stream<X> tail;
}

in a Java style syntax. The Promise<X> type supports a number of operations, including the ability to create an empty "unfulfilled" promise and an operation to "fulfill" such a promise:

class Promise<X> {
  Promise();  // Create a new unfulfilled promise
  // ...
  void fulfill(X value);
  // ...
}

Using these types, you can similarly generate a stream:

Stream<Button> clicks = new Stream<Button>();

void OnClick(Button theButton) {
  Stream<Button> newClicks = new Stream<Button>();
  Cons<Button> newCons = new Cons<Button>();

  newCons.head = theButton;
  newCons.tail = newClicks;

  clicks.fulfill(newCons);
  clicks = newClicks;
}

I hope this helps to understand the very simple model underlying choice streams and one way to generate such streams.

Basically, choice streams are just linked lists of promises. A promise can be defined in a variety of ways (most of which I haven't talked about here), including as the result of a non-deterministic choice of two or more promises.

What is interesting, IMO, is that such a simple model, measurably one to two orders of magnitude simpler (roughly 500 lines in choice streams vs 30k lines in RxJava or 50k lines in .Net Rx), is able to support very similar styles of programming as Rx. In my opinion, the comment in

https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/internal/operators/OperatorMerge.java#L75

is quite telling. By comparison, many of the basic operations on choice streams are next to trivial to define. Here is merge

let rec mergeSwap ls rs =
  ls >>= function Nil -> rs
                | Cons (l, ls) -> cons l (merge rs ls)
and merge ls rs = mergeSwap ls rs <|>* mergeSwap rs ls

Also, compared to Rx, in particular, choice streams allow forms of composition that simply are not available in Rx. For example, the map operation (or Select) can be defined via unfold (Job = lightweight thread/async monad):

let mapJob f xs =
  unfoldJob
   (fun xs ->
      xs >>= function Nil -> Job.result None
                    | Cons (x, xs) ->
                      f x |>> fun y -> Some (y, xs))
   xs

Note that the above is also an asynchronous version of map as opposed to the synchronous version of Select in .Net Rx. It does not seem possible to implement Select in Rx in the above style in the original .Net Rx model. I believe the comment made by @headinthebox above about "fewer operators" is a realization of this property in conjuction with the Reactive Streams model.

I believe the Reactive Streams model supports similar forms of composition as choice streams, but at a huge increase in complexity.

RxJava has created push/pull implementations of publish/groupBy/merge/etc that all change from push to pull when the downstream can not keep up with push. This may be of interest to you [...]

Indeed! That is an interesting ability. In the choice stream model, as in the Rx model, there is no built-in mechanism for switching between push and pull models. The Rx model is fundamentally push and the choice stream model is fundamentally pull.

[...] based on your comments in the Inverting document about groupBy and merge. This has all occurred since Erik mentioned the "don't know how to do groupBy" comment. Indeed groupBy was tricky to get right with backpressure, but it works with push and pull and moves between them as necessary.

That is nice!

Your document also references closing over locks ... that does not happen in RxJava. It did in very early versions, and it may still do so in Rx.Net, but a lock is never held while emitting in RxJava.

That is also a cool feature. There is, however, a fundamental difference here between Rx/Reactive Streams and choice streams. In Rx/Reactive Streams the OnNext calls are synchronous and a subscriber must return quickly from OnNext or there will be trouble. In choice streams the act of producing a new element is fully asynchronous. Any number of consumers of a choice stream can be immediately dispatched when a new element is produced and, from the point of view of the stream producer and other consumers, it doesn't matter if some consumer is slower than others or even fails catastrophically (raising an exception).

At any rate, thanks for the reply, I believe this discussion helped me to better understand some further differences between the different models.

benjchristensen commented 9 years ago

If, by Rx, you refer essentially to the interfaces IObservable and IObserver of .Net, then I don't see how they could support the same push-pull model as Reactive Streams.

RxJava has advanced beyond Rx.Net and supports the Reactive Stream style backpressure.

benjchristensen commented 9 years ago

Basically, choice streams are just linked lists of promises.

I have played with approaches like this and found the object allocation overhead to be far too restrictive on the JVM. Perhaps with value types it would be more viable, but right now I don't see how this would be done without measurable impact on throughput.

How do you address that?

How does your model differ from CSP or Actors? The fact that every stream is a queue (linked list) suggests that it is very similar.

polytypic commented 9 years ago

I have played with approaches like this and found the object allocation overhead to be far too restrictive on the JVM. Perhaps with value types it would be more viable, but right now I don't see how this would be done without measurable impact on throughput.

How do you address that?

At the present, choice streams are a library built on top of a concurrent programming library and there are only some minimal manual optimizations done on the choice stream implementation. The concurrent programming library, Hopac, on which choice streams are written in is, however, aggressively optimized.

I haven't yet done comprehensive benchmarking or performance testing (and subsequent optimization) on choice streams. A couple of very simple tests suggested that choice streams are competitive with .Net Rx in terms of performance, but I really wouldn't draw any conclusions from those yet. More comprehensive benchmarks and performance tests are needed.

Theoretically, it should be possible to implement choice streams so that they are nearly as efficient as regular singly linked lazy lists: single allocation to add an element to a choice stream, zero or one allocations to request an element from a stream. This requires either a very good optimizing compiler (e.g. MLton) or manual specialization of the stream representation, ultimately fusing the Cons and Promise types together.

How does your model differ from CSP or Actors? The fact that every stream is a queue (linked list) suggests that it is very similar.

The design of the Hopac library is heavily influenced by Concurrent ML. CML can be seen as being based on pi-calculus with non-deterministic choice. More recent languages/libraries like Go and Clojure core.async said to be based on CSP can basically be seen as providing a subset of CML—they lack the built-in notion of first-class events (called alternatives in Hopac) and negative acknowledgments.

A choice stream can basically be seen as a kind of multicast communication mechanism allowing one-to-many and many-to-many messaging, with the property that all receivers (Rx subscribers, choice stream consumers) get all the messages (after the point they start receiving). In Reppy's Concurrent ML book there is an implementation of multicast channels that can be seen as a particular use case of choice streams.

It should be straighforward to encode choice streams in terms of pi-calculus, CSP or join-calculus, possibly even with more limited actor models. However, I don't really think of them as being related. Choice streams are about broadcasting messages with ordering and delivery guarantees to any number of processes, which is not a basic mode of communication for those process calculi.

benjchristensen commented 9 years ago

Thanks for the further information. I'll think about what you've shared.

exceptionplayer commented 6 years ago

excellent