NiteshKant opened 9 years ago
@smaldini and I were discussing backpressure with `merge` the other day and I agreed to share some details of what I've learned while implementing the beast in RxJava.

A concern brought up was the default behavior of RxJava `merge` to allow unbounded horizontal growth. This is not accidental and is permitted by default to prevent deadlock. Here are the use cases and options we have considered:
I think of buffers in a streaming system in two directions, vertical and horizontal.
The vertical buffering is the pub/sub relationship and the one we generally think of. For example:
```java
Observable.range(0, 10000).observeOn(Schedulers.io()).subscribe(observer);
```
This goes from "top" to "bottom" and has a "vertical" buffer inside `observeOn`.
All "vertical" buffers in RxJava are bounded. This is the Reactive Streams backpressure semantics. No producer can overwhelm a consumer.
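To make the "no producer can overwhelm a consumer" point concrete, here is a minimal plain-Java sketch (not RxJava internals; an `ArrayBlockingQueue` stands in for the bounded buffer inside `observeOn`, and the capacity of 16 is an arbitrary choice for illustration). The producer blocks whenever it gets 16 items ahead of the consumer:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class VerticalBufferSketch {
    public static void main(String[] args) throws InterruptedException {
        // Hypothetical stand-in for the bounded "vertical" buffer inside observeOn.
        final int CAPACITY = 16;
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(CAPACITY);

        Thread producer = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                try {
                    buffer.put(i); // blocks when the buffer is full: backpressure
                } catch (InterruptedException e) {
                    return;
                }
            }
        });
        producer.start();

        int consumed = 0;
        for (int i = 0; i < 10000; i++) {
            buffer.take(); // the consumer drains at its own pace
            consumed++;
        }
        producer.join();
        System.out.println("consumed=" + consumed);
    }
}
```

The real operator uses `request(n)` bookkeeping rather than a blocking queue, but the bounding behavior is the same: the producer can never be more than a fixed amount ahead.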
Horizontal buffering comes when composing multiple streams. For example:
```java
Observable<Integer> s1 = Observable.range(0, 10000);
Observable<Integer> s2 = Observable.range(0, 10000);
Observable.merge(s1, s2).observeOn(Schedulers.io()).subscribe(observer);
```
In this case we have bounded vertical buffering on both `s1` and `s2`, and 2 streams buffered "horizontally".
If I artificially bound the horizontal buffering a system can deadlock. Here is an example showing how:
```java
Observable<Integer> s1 = Observable.never(); // simulate a very latent or empty stream that never completes
Observable<Integer> s2 = Observable.never();
...
Observable<Integer> s10 = Observable.just(1);
Observable.merge(s1, s2, ..., s10).take(1).observeOn(Schedulers.io()).subscribe(observer);
```
If `merge` limits the number of concurrent subscriptions to 8 (such as the number of cores), then this will deadlock: only `s1` through `s8` are subscribed, none of them ever emits or completes, so no subscription slot ever frees up, and `s10`, the only stream with data, is never subscribed. The `take(1)` never receives an item and the pipeline silently hangs.
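A plain-Java simulation of that subscription bookkeeping makes the failure mode easy to see (this is a hypothetical sketch of a bounded merge's behavior, not RxJava's actual operator; `firstItemReachable` is an invented helper):

```java
public class MergeDeadlockSketch {
    // Simulates which sources a hypothetical bounded merge would subscribe to.
    // true = the source eventually emits (like just(1));
    // false = it never emits or completes (like never()).
    static boolean firstItemReachable(boolean[] sourceEmits, int maxConcurrent) {
        // A bounded merge subscribes to the first maxConcurrent sources and only
        // moves on to the next source when a subscribed one completes. Sources
        // that never complete hold their slots forever.
        int limit = Math.min(maxConcurrent, sourceEmits.length);
        for (int i = 0; i < limit; i++) {
            if (sourceEmits[i]) return true; // take(1) would get its item
        }
        return false; // every subscribed source is never(): silent deadlock
    }

    public static void main(String[] args) {
        // s1..s9 behave like Observable.never(), s10 like Observable.just(1)
        boolean[] sources = new boolean[10];
        sources[9] = true;

        System.out.println("limit 8: item reachable = " + firstItemReachable(sources, 8));
        System.out.println("limit 10: item reachable = " + firstItemReachable(sources, 10));
    }
}
```

With the limit at 8 the item is unreachable; with the limit at 10 (subscribe to everything, as RxJava does by default) it flows.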
This behaves the same if I structure it as an `Observable<Observable<T>>`, as under the covers it all becomes an `Observable<Observable<T>>` with `request(n)` semantics:
```java
Observable<Integer> s1 = Observable.never(); // simulate a very latent or empty stream that never completes
Observable<Integer> s2 = Observable.never();
...
Observable<Integer> s10 = Observable.just(1);
Observable<Observable<?>> os = Observable.from(s1, s2, ..., s10);
Observable.merge(os).take(1).observeOn(Schedulers.io()).subscribe(observer);
```
Even though we can use `request(n)` semantics on this `Observable<Observable<?>>`, it is not appropriate to do so for the `merge` case unless we know for sure that all of the inner streams are emitting data equally and completing.
A similar issue arises with long-lived streams: fairness and starvation.
```java
Observable s1 = Observable.interval(1, TimeUnit.SECONDS).map(i -> a());
Observable s2 = Observable.interval(1, TimeUnit.SECONDS).map(i -> b());
...
Observable s10 = Observable.interval(1, TimeUnit.SECONDS).map(i -> j());
Observable.merge(s1, s2, ..., s10).observeOn(Schedulers.io()).subscribe(observer);
```
If concurrency is limited to 8, we will receive data from functions `a()` through `h()` but never from `i()` or `j()`, since these are infinite streams and the first 8 never complete.
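The starvation case can be sketched the same way (again a hypothetical plain-Java model of a bounded merge, not the real operator; `subscribedSources` is an invented helper): infinite streams never release their subscription slots, so the sources past the concurrency limit are never subscribed at all.

```java
import java.util.LinkedHashSet;
import java.util.Set;

public class MergeStarvationSketch {
    // Simulates which of n infinite interval streams ever get a subscription
    // when a hypothetical bounded merge allows only maxConcurrent at once.
    static Set<Integer> subscribedSources(int n, int maxConcurrent) {
        Set<Integer> subscribed = new LinkedHashSet<>();
        // Infinite streams never complete, so the initial maxConcurrent
        // subscriptions hold their slots forever and nothing else is started.
        for (int i = 0; i < Math.min(n, maxConcurrent); i++) {
            subscribed.add(i + 1); // 1-based: source 1 maps to a(), source 9 to i(), etc.
        }
        return subscribed;
    }

    public static void main(String[] args) {
        Set<Integer> active = subscribedSources(10, 8);
        System.out.println("subscribed: " + active);
        System.out.println("starved 9: " + !active.contains(9));   // i() never runs
        System.out.println("starved 10: " + !active.contains(10)); // j() never runs
    }
}
```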
For these 2 reasons we cannot artificially limit the horizontal buffering. If a developer merges `n` streams, we must subscribe to all `n` streams.
There are however use cases where this results in horizontal buffer bloat, primarily when trying to do computationally driven parallel processing such as this:
```java
Observable.range(0, 50000000)
          .window(500)
          .flatMap(work -> {
              return work.observeOn(Schedulers.computation()).map(item -> {
                  // simulate computational work
                  try {
                      Thread.sleep(1);
                  } catch (Exception e) {
                  }
                  return item + " processed " + Thread.currentThread();
              });
          }).toBlocking().forEach(System.out::println);
```
In this example the horizontal buffer bloats: it subscribes to thousands of windowed Observables concurrently and starts queueing everything up on the `computation` Scheduler. Not at all what is wanted. It will run through the 50,000,000 items very quickly, turning them into 100,000 windows, all of which are enqueued. An infinite stream would be far worse.
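The window count is just arithmetic, worth spelling out since it is the number of inner Observables competing for the Scheduler:

```java
public class WindowCountSketch {
    public static void main(String[] args) {
        long items = 50_000_000L;
        int windowSize = 500;
        // window(500) cuts the range into fixed-size windows:
        long windows = items / windowSize;
        System.out.println("windows = " + windows);
    }
}
```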
In this case we leave it to the developer to specify that they want to limit concurrency using `mergeMaxConcurrent`, or `flatMap(o, maxConcurrent)` by adding a parameter like this:
```java
Observable.range(0, 50000000)
          .window(500)
          .flatMap(work -> {
              return work.observeOn(Schedulers.computation()).map(item -> {
                  // simulate computational work
                  try {
                      Thread.sleep(1);
                  } catch (Exception e) {
                  }
                  return item + " processed " + Thread.currentThread();
              });
          }, 8).toBlocking().forEach(System.out::println);
```
This will now only process 8 windowed Observables at a time and correctly emit backpressure upstream, so the 50,000,000 items are emitted only as downstream processes them and requests the next chunk.
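A rough plain-Java analogue of what `maxConcurrent = 8` does (a hypothetical sketch, not RxJava's operator): a `Semaphore` with 8 permits gates how many windows are in flight, and the "upstream" loop only produces the next window when a slot frees up.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedConcurrencySketch {
    public static void main(String[] args) throws InterruptedException {
        final int MAX_CONCURRENT = 8;
        final int WINDOWS = 100;

        Semaphore slots = new Semaphore(MAX_CONCURRENT); // like flatMap(..., 8)
        ExecutorService pool = Executors.newFixedThreadPool(16);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxObserved = new AtomicInteger();

        for (int w = 0; w < WINDOWS; w++) {
            slots.acquire(); // "upstream" emits the next window only when a slot frees
            pool.submit(() -> {
                int now = inFlight.incrementAndGet();
                maxObserved.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(1); // simulate computational work
                } catch (InterruptedException ignored) {
                } finally {
                    inFlight.decrementAndGet();
                    slots.release(); // completing a window requests the next one
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(30, TimeUnit.SECONDS);
        System.out.println("within limit = " + (maxObserved.get() <= MAX_CONCURRENT));
    }
}
```

The real operator does this with `request(n)` signals rather than blocking acquisition, but the effect on upstream is the same: production is paced by completion.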
The upside is that backpressure all composes here and does what is wanted. The downside is that the backpressure only really composes vertically, not horizontally. The horizontal backpressure needs to be decided upon by the developer.
I am not aware of an approach to automatically solve horizontal backpressure in stream composition use cases without risking deadlock or starvation. We determined that deadlock (especially the silent async variety) and starvation are far more damaging and difficult to debug than the rare cases when manual concurrency limits are needed, and thus we have chosen what we consider the safest default behavior.
The context above is shared because I see IO writes the same way. We have to subscribe to all of them and allow buffering per `Publisher`. Backpressure will still exist vertically, but horizontal backpressure likely needs to be handled by the developer in the edge cases where it is needed. This is because a `Publisher` given to a `write` does not necessarily need to emit and complete, and thus deadlock and starvation are both possible.
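To illustrate what "buffering per Publisher" could look like, here is a hypothetical plain-Java sketch (the `PendingWrite` class and capacity of 8 are invented for illustration; a real implementation would drive this with `request(n)` on each `Publisher`). Every write source is accepted, i.e. no horizontal limit, but each one gets only a bounded vertical buffer:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class WriteBufferSketch {
    // Hypothetical per-Publisher write buffer: every source handed to write()
    // gets its own bounded queue (vertical backpressure), and the connection
    // accepts all of them (no artificial horizontal limit).
    static final int PER_SOURCE_CAPACITY = 8;

    static class PendingWrite {
        final BlockingQueue<String> buffer = new ArrayBlockingQueue<>(PER_SOURCE_CAPACITY);

        boolean offer(String item) {
            // A real implementation would request(n) more from the Publisher
            // only as buffer space frees up; here we just report fullness.
            return buffer.offer(item);
        }
    }

    public static void main(String[] args) {
        List<PendingWrite> writes = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            writes.add(new PendingWrite()); // all 10 write sources are accepted
        }
        // A fast source can only get PER_SOURCE_CAPACITY items ahead:
        PendingWrite fast = writes.get(0);
        int accepted = 0;
        for (int i = 0; i < 100; i++) {
            if (fast.offer("item-" + i)) accepted++;
        }
        System.out.println("sources accepted = " + writes.size());
        System.out.println("items buffered before backpressure = " + accepted);
    }
}
```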
This issue intends to discuss how backpressure would be implemented on writes to a `Connection`.
Key considerations
There have been some discussions around the same topic in issue #21, specifically in the comments here and here.