Closed benlesh closed 9 years ago
My opinion about back pressure is that you either have a truly reactive system, where the producer is fully in charge and the onus is on the consumer to keep up; or a truly interactive system, where the consumer is fully in charge and the onus is on the producer to keep up. This is one of the cases where having different types of collections makes sense (just like we have different collections like lists, maps, sets, ... with different performance characteristics and different operations). @benjchristensen has a different opinion on that.
Any system that has hidden unbounded buffers is a recipe for disaster. RxJava was rightfully dismissed outright by many because it would consume an entire stream into unbounded buffers in zip, merge and observeOn. Adding backpressure to RxJava was painful (to figure out and retrofit) but has been worth it.
Just because some use cases should be "completely reactive" (such as mouse events), it does not mean that signals are not needed for backpressure. Even with mouse events, if the consumer is slow, an unbounded buffer in observeOn, zip or merge should not be accumulating while the consumer silently falls behind and the application leaks memory. It should be an error and blow up if the developer has not put in the necessary flow control.
In other words, an application should not silently drop data or buffer it without bound. It should always be explicit to do either of those things.
So even for the mouse event use case, the backpressure mechanism becomes a signal for flow control to kick in (conditionally drop, buffer, debounce, throttle, etc only when backpressure happens) or errors out and tells the developer to add flow control either conditionally (i.e. onBackpressure*) or permanently (temporal operators like debounce, sample, throttle, etc).
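The "flow control kicks in only when backpressure happens" idea can be sketched without any Rx machinery at all. Below is a minimal, illustrative bounded buffer (these names are assumptions for illustration, not RxJS operators): values flow through untouched while the consumer keeps up, and the strategy (drop explicitly, or blow up loudly) only applies once the bound is hit.

```javascript
// Illustrative sketch, not an RxJS API: a bounded buffer whose strategy
// only matters once backpressure actually occurs.
class BoundedBuffer {
  constructor(capacity, strategy) {
    this.capacity = capacity;
    this.strategy = strategy; // 'drop' | 'error'
    this.items = [];
    this.dropped = 0;
  }
  push(item) {
    if (this.items.length < this.capacity) {
      this.items.push(item); // fast path: consumer is keeping up, value just flows
      return true;
    }
    if (this.strategy === 'drop') {
      this.dropped++; // lossy, but by explicit choice
      return false;
    }
    // default: refuse to hide the problem behind an unbounded buffer
    throw new Error('MissingBackpressure: consumer is too slow');
  }
  pull() {
    return this.items.shift();
  }
}

const buf = new BoundedBuffer(2, 'drop');
[1, 2, 3, 4].forEach(v => buf.push(v)); // 3 and 4 are dropped, visibly
```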
If the "reactive collection" doesn't have unbounded buffers, then I'm fine with it existing separately, but that means it can't have any async operators like merge, observeOn, or zip – and we know that those are necessary for application use cases. Since we need async operators, and I think we should never have unbounded buffering, that means the "reactive collection" should support backpressure signals.
The signals can be ignored. And infinite can be requested if backpressure is not wanted. But the API and semantics should support it so it composes through and the library can exist without silent unbounded buffering, memory leaks, or dropping of data.
I suggest that the collaboration for Reactive Streams hardened the API, interaction model, and contracts quite well.
It would look something like this in Javascript:
interface Observable<T> {
  subscribe(o: Observer<T>): void; // or could return Subscription
  lift(operator): Observable<R>;
}

interface Subscription {
  unsubscribe(): void;
  isUnsubscribed(): boolean;
  request(n: number): void; // optional if backpressure is added
}

interface Observer<T> {
  onSubscribe(s: Subscription): void;
  onNext(t: T): void;
  onError(e: any): void;
  onComplete(): void;
}
The consumer then can either invoke subscription.request(Number.MAX_VALUE) if no backpressure is needed (synchronous firehose) or request up to the number of elements that can be enqueued: subscription.request(n). It keeps invoking request for more as it drains its internal queue and permits the producing Observable to keep emitting more.
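That request(n) handshake can be sketched concretely (toy implementation against the interface shapes above; none of this is final RxJS API): the producer only emits while it has outstanding credit, and the consumer tops up credit as it drains.

```javascript
// Sketch of the request(n) credit handshake. Names are illustrative.
function range(count) {
  return {
    subscribe(observer) {
      let requested = 0, emitted = 0, draining = false;
      const subscription = {
        request(n) {
          requested += n;
          if (draining) return; // re-entrant request during emit: just add credit
          draining = true;
          while (requested > 0 && emitted < count) {
            requested--;
            observer.onNext(emitted++); // emit only against outstanding credit
          }
          draining = false;
          if (emitted === count) observer.onComplete();
        }
      };
      observer.onSubscribe(subscription);
    }
  };
}

// A consumer that requests an initial batch, then asks for more as it drains.
const received = [];
range(10).subscribe({
  onSubscribe(s) { this.sub = s; s.request(3); },
  onNext(v) {
    received.push(v);
    if (received.length === 3) this.sub.request(7); // queue drained: request more
  },
  onComplete() { received.push('done'); }
});
```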
If the Observable is hot, or can't otherwise be "paused", then it either blows up (i.e. BackpressureException) or another operator composes in a flow control strategy (drop, buffer, throttle, sample, debounce, etc).
zip in particular was naively implemented, and should be a synchronization primitive like CyclicBarrier.
People have been using observeOn for ages in the UI space with no problems (https://msdn.microsoft.com/en-us/library/system.threading.synchronizationcontext.post(v=vs.110).aspx or https://docs.oracle.com/javase/8/javafx/api/javafx/application/Platform.html), but if you do not want buffering you can use https://msdn.microsoft.com/en-us/library/system.threading.synchronizationcontext.send(v=vs.110).aspx.
If you are doing a merge of an infinite number of infinite streams, that is as bad as doing unbounded recursion, or allocating an unbounded number of objects. You make sure that this does not happen.
Also note that any time you use, say, a http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Semaphore.html or a http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html, you are using a hidden unbounded buffer.
Saying "unbounded buffers are verboten" is as fundamentalist as Haskell people saying all side-effects are bad, or the Akka folks saying everything is an actor with strict asynchronous boundaries, or Java insisting on checked exceptions. Programming is tastefully composing side effects; to write real stuff you have to break the rules, so you never say never, or always say always.
Also note that anytime you use say a Semaphore or a Executor you are using a hidden unbounded buffer.
I know. That's why RxJava never holds a lock when emitting, and never uses synchronized or Semaphore to serialize multiple streams, but uses explicit, bounded queues. This is why merge is an async operator that has buffers. It never blocks a thread, whereas Semaphores do – which is queueing.
Same issue with Executor as you mention. Again, this is why we never expose Scheduler directly as a raw API in Observable sequences. The scheduling into the underlying unbounded queues of Executor is always governed by the backpressure of the Observable sequence, so they are bounded.
if you do not want buffering you can use https://msdn.microsoft.com/en-us/library/system.threading.synchronizationcontext.send(v=vs.110).aspx.
Using synchronization and call-stack blocking to avoid observeOn (or other buffers) is not an option when event-loops are involved, so they are not a solution. Event loops are almost always involved, particularly in Javascript. Thus, synchronous approaches to backpressure are not an option.
fundamentalist
This is not a fundamentalist approach. In fact, I'd argue that you are taking the fundamentalist approach claiming "reactive" requires unbounded buffers. I'm arguing against the "purity" of pure-push, and allowing the type to support moving between push and pull depending on the speed of the producer/consumer relationship. It is embracing the messiness of our applications and abstractions.
I do not say that "unbounded buffers are verboten", I say that hidden unbounded buffers are verboten. If someone wants to say observable.onBackpressureBuffer().subscribe(o) and explicitly put an unbounded buffer in the mix, then so be it.
People have been using observeOn for ages in the UI space with no problems
It is not correct to say "no problems". People work around them because they haven't been given tools to do otherwise. Naively consume a Websocket stream, for example, using RxJS in a browser. I myself have had these problems. Ben Lesh linked to someone else having similar issues in another environment where RxJS will be used: Node.
We can state that an Observable is ONLY reactive, without the ability to support backpressure and flip into pull mode. But if that's the case, then the type is itself being fundamentalist and pushing people to find other solutions, which they will have to.
@headinthebox
the producer is fully in charge and the onus is on the consumer to keep up
@benjchristensen
In other words, an application should not silently drop or do unbounded buffering data. It should always be explicit to do either of those things.
I don't think you guys fundamentally disagree, but as you can see from the quotes, we just need to find a way to determine consumer behavior when the producer is too fast. Implicit ("hidden") behavior has gotchas for developers. Explicit definition of lossy or lossless consumption should be how the consumer handles its onus.
@benjchristensen could you give us a few use-cases for backpressure we'll run into at Netflix? (since that's part of the reason I'm proposing this) and I know that you and @jhusain disagree on the need for backpressure in RxJS.
we just need to find a way how to determine consumer behavior when producer is too fast
Agreed, and I suggest that this is exactly what @headinthebox and I spent 9+ months with many in the RxJava community figuring out with "backpressure" which behaves as an upstream signal from the consumer to the producer. Hence my advocating for it here. It's something we have implemented and proven in production. It's not just a theory.
Here is more information on it:
@benjchristensen can we get some use cases specifically to help make sure we're making good decisions?
I mean, it seems like one could do something like this primitive example to deal with backpressure to some degree:
var requestor = new BehaviorSubject(true);

source.buffer(requestor)
  .flatMap(buffer => Observable.from(buffer)
    .finally(() => requestor.next(true)))
  .subscribe(x => console.log(x));
I'm not for or against the idea (I created the issue, afterall), but I want it considered up front.
@blesh yes, I can do that a little later tonight
@headinthebox going back to your point about having multiple collection types, I'd like to explore that idea with you. Let's say we had something like this:
interface Flowable<T> {
  subscribe(o: FlowObserver<T>): void; // or could return Subscription
  lift(operator): Flowable<R>;
}

interface FlowableSubscription {
  unsubscribe(): void;
  isUnsubscribed(): boolean;
  request(n: number): void; // optional if backpressure is added
}

interface FlowObserver<T> {
  onSubscribe(s: FlowableSubscription): void;
  onNext(t: T): void;
  onError(e: any): void;
  onComplete(): void;
}
Then Observable as the original, simpler type:
interface Observable<T> {
  subscribe(o: Observer<T>): void; // or could return Subscription
  lift(operator): Observable<R>;
}

interface Subscription {
  unsubscribe(): void;
  isUnsubscribed(): boolean;
  add(s: Subscription): void;
  remove(s: Subscription): void;
}

interface Observer<T> extends Subscription {
  onNext(t: T): void;
  onError(e: any): void;
  onComplete(): void;
}
Then on Flowable we would NOT have temporal flow control operators like sample, throttle, debounce etc. On Observable we would have the temporal flow control.

A Flowable can easily convert into an Observable that ends up fire hosing. An Observable would convert into a Flowable with a strategy given to it such as drop, buffer, debounce, sample, etc.
Is this what you're thinking instead of having one type support both use cases?
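To make the Observable-to-Flowable direction concrete, here is a hedged sketch (all names hypothetical, not proposed API) of wrapping a push-only source into a pull-capable one. The strategy only matters when the consumer has no outstanding requests and the small bounded buffer fills:

```javascript
// Hypothetical sketch: convert a push-only source into a pull-capable one
// with an explicit strategy. None of these names are real RxJS APIs.
function toFlowable(pushSource, strategy, capacity = 16) {
  const queue = [];
  let requested = 0;
  let downstream = null;
  pushSource.onPush = value => {
    if (requested > 0) { requested--; downstream(value); return; } // consumer keeps up: pure push
    if (queue.length < capacity) { queue.push(value); return; }    // brief burst: bounded buffer
    if (strategy === 'drop') return;                               // lossy by explicit choice
    throw new Error('backpressure: bounded buffer overflowed');
  };
  return {
    subscribe(onNext) { downstream = onNext; },
    request(n) { // pull side: drain buffered values against new credit
      requested += n;
      while (requested > 0 && queue.length > 0) { requested--; downstream(queue.shift()); }
    }
  };
}

// Usage with a fake push source that has no request awareness of its own.
const source = {};
const out = [];
const flowable = toFlowable(source, 'drop', 2);
flowable.subscribe(v => out.push(v));
[1, 2, 3, 4, 5].forEach(v => source.onPush(v)); // 1,2 buffered; 3,4,5 dropped
flowable.request(10);                           // drains the bounded buffer
source.onPush(6);                               // credit remains: flows straight through
```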
Use cases for backpressure:
range
This is a dumping of thoughts, some are more important than others, and all definitely don't apply to all environments (desktop browser, mobile browser, mobile app, Node.js server), but they are representative of the type of stuff we will deal with as we embrace more stream oriented programming and IO.
Okay, it doesn't seem like the use cases you're mentioning are at all edge cases. In fact, some seem fairly common at least in server-land where you want to maximize data throughput.
So I think what we need to weigh here is:
I'm not too worried about cost in terms of "size" of the library, because there are plenty of ways to get around that.
@blesh people are pretty satisfied with the current backpressure mechanisms with pausableBuffered and controlled, given that they were hard problems but they've been fixed.
Let me state upfront that I am not against backpressure, not at all. All the use cases above make total sense.
What I am against is the way backpressure is implemented in RxJava/Reactive Streams where every operator has to know about it. That is clowny and inefficient.
Just like the selector function of map or the predicate of filter doesn't know anything about backpressure, neither should map and filter have anything to do with backpressure. They should just let the values flow through them as fast as they can.
To achieve backpressure you wire up things like a classical feedback system https://en.wikipedia.org/wiki/PID_controller. There the plant/process, consisting of Rx operators, just transforms input to output streams. It knows absolutely nothing about backpressure. That is all done by the feedback loop, where at one point you measure the value of a setpoint (which has no "knowledge" of the plant/process) and use that to throttle the input source, which does know how to handle backpressure (and thus probably has an interface like Flowable above).
Note that in the current ReactiveStream model, you still need such a feedback loop + a controller to know how much to request. If you ever request > 1, you run the risk that the consumer gets overwhelmed.
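For what it's worth, the controller part of that feedback-loop idea can be illustrated in a few lines (a toy proportional controller, not taken from any Rx library): it compares the consumer's observed queue depth against a setpoint and computes how much to request next, clamped so the stream always makes progress.

```javascript
// Toy proportional controller for a request-size feedback loop.
// setpoint: desired queue depth; queueDepth: measured depth at the consumer.
// gain, min, max are illustrative tuning knobs, not library parameters.
function nextRequest(setpoint, queueDepth, gain = 0.5, min = 1, max = 128) {
  const error = setpoint - queueDepth;     // positive: consumer can afford more
  const n = Math.round(gain * error);
  return Math.max(min, Math.min(max, n));  // clamp so we always request something
}
```

A controller like this sits outside the operators: the operators just transform values, and only the source is told how fast to go.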
I find myself agreeing with the sentiment of both @headinthebox and @benjchristensen here. I can see that Ben is worried about the numerous unbounded buffers that exist in Rx. I can see Erik's point that to introduce the concept of backpressure to most operators in Rx is bloated and distracting.
While I am unfamiliar with the PID controller design, I think this argument echoes what we have found with the limits of where Rx is suitable. Rx is great for composing many sequences of events. Rx is not great at workflow. Rx is not great at parallel processing.
The concern for hidden unbounded buffers seems to be more suitably addressed by introducing explicit buffers; however, I don't think these belong in the Rx query. I believe that in your Rx flow, you compose the event streams as you need to, and then at points where unbounded buffers can occur, you introduce an explicit queue. On the other side of the queue may be another Rx query; it may also have some logic to notify other systems that the queue is too full. My point is that this should be solved as a pattern, with education, not as a bloating of the Rx API.
Keep Rx a single direction event query API.
They should just let the values flow through them as fast as they can.
@headinthebox That is how it's implemented in RxJava. Note that map (https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/internal/operators/OperatorMap.java) does not know anything about backpressure. Filter has to know a little (https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/internal/operators/OperatorFilter.java#L57) because it affects the stream (dropping data), but it is trivial. Data flows through them "as fast as they can".
To achieve backpressure you wire up things like a classical feedback system
I understand this argument, as we've discussed it during the RxJava backpressure design last year. However, it does not work well in practice because hidden unbounded queues in merge, observeOn, and zip require internal knowledge of operator implementations to know where and how to wire up the feedback loops. And the feedback loops must be manually wired up, so it's not something most people can successfully do.
The end result is people choose (generally rightfully) to not use Rx. This was a massive push-back I received for server-side use of Rx in Java, and the same will apply to the Node.js ecosystem (which I am now involved in building for).
the current ReactiveStream model, you still need such a feedback loop
The feedback loop is handled automatically within the operator implementations, generally by the internal bounded queue depth. And this only applies to async operators that have queues, such as merge, zip, observeOn, replay, etc. It automatically slows the producer to the rate of the consumer because all queues are bounded and it requests data to fill the bounded queues, while remaining async, unlike an Iterable, and batched, unlike an AsyncIterable.
@mattpodwysocki The pause approach was prototyped in RxJava first, but it does not work over network boundaries. By the time the pause signal reaches the async producer, all buffers can already be either saturated or overwhelmed. The controlled model is out-of-band and does not compose. It does meet the need to request data at a controlled rate, but for the same reasons I describe above, the feedback loop needs knowledge of all hidden unbounded buffers so it can inject itself correctly, and is thus error prone, difficult to compose and very manual to make work.

The ReactiveStreams/RxJava model has been proven to compose and work well in large (distributed Netflix API and stream processing) and small (Android) systems, enabling it to be built without hidden unbounded buffers, and working over async subscription boundaries, such as networks. Pausable does not meet those needs, and controlled does not compose.
@LeeCampbell thanks for getting involved!
Keep Rx a single direction event query API.
This effectively means RxJava v1 should not exist as designed and that most of the use cases I currently use Rx for should stop using Rx. Nor will I be able to use Rx for the stream-oriented IO I'm working on between Java and Node.js. The reason is that Rx without backpressure support can't easily compose with the needed "push/pull" semantics, so I'll end up instead using RxJava/ReactiveStream APIs and exposing those, and then for composition reasons, adding operators to that library and end up with a competitor to Rx whose only difference is that I support the upstream request capabilities.
@LeeCampbell My original vision (which has not changed) for Rx was as a single direction event query API (it used to be called "LINQ to Events") where the producer determines the speed. For (async) pull, where the consumer determines the speed, we shipped IAsyncEnumerable at the same time, plus push-pull adapters to convert between the various kinds of streams.
@benjchristensen I was just using map and filter as generic examples for operators, like foo and bar. And see one of my older entries for comments on zip (if you want to combine streams as opposed to synchronize, you should use combineLatest), and observeOn; note that flatMap technically does not have internal queues or buffers.
Adding backpressure to RxJava has yielded a very long bug tail that is still dragging on. The implementation of merge with backpressure is nearly 800 lines of very intricate code; the implementation of merge without backpressure is about 30 trivial LOC. The other operators are similarly more complex; even innocent operators like take and single (need to) deal with backpressure.
By factoring out back pressure to a special backpressure-sensitive source that can be sped up or slowed down, controlled by a feedback loop, you factor out all that complexity into a single place. And, if you don't use it, you don't pay.
If we were to merge the Subscription and Observer types into one type, with lift, would it not be possible to subclass Observable into a BackpressureObservable (or the like), use lift to add a request() method to Subscription, then override operators such as zip and the like on the BackpressureObservable to get exactly what @benjchristensen is looking for, with no cost? @trxcllnt has already demonstrated that overriding lift can allow one to compose types through.
A 30 LOC implementation in Java would have to assume use of synchronized or mutex locks, which is wrong as it blocks event loops. It would also have to ignore many other performance improvements like ScalarSynchronousObservable optimizations that have nothing to do with backpressure – but add LOC.
I will stop arguing this point. But without composable backpressure in RxJS I will not be able to use it for the Node.js apps I am building so will end up building a competitor.
Doesn't node.js already have a stream API https://nodejs.org/docs/latest/api/stream.html?
I would also like back pressure control to compose through. @benjchristensen setting aside operator complexity, is it possible to quantify the runtime cost when it isn't used?
@blesh it doesn't seem like something you can solve with lift since it affects operator behavior. If I understand correctly, you'd have to override every operator on a BackpressureObservable to respect back pressure semantics.
you'd have to override every operator on a BackpressureObservable to respect back pressure semantics.
That's exactly right. I was just trying to think of a compromise.
At this point, I'm generally for an implementation with a solid backpressure story, as long as the impact on performance isn't grotesque. I think there are plenty of things we can do to make sure the queuing internally is highly performant, such as leveraging libraries like @petkaantonov's deque, which we should honestly be investigating regardless.
Right now we have unbounded arrays in operators like zip, and there are definitely plenty of real-world use cases where that becomes problematic. Also, I'm not a big fan of Iterator of Promise, or Promises in general, so I find the idea that I'd have to use something like that for backpressure control across network boundaries really gross.
I want to set up a separate issue to discuss joining the Subscription and Observer as a single type (with separate interfaces), as discussed above. I think that's a related issue that needs to be fleshed out.
Adding backpressure to RxJava has yielded in a very long bug tail that is still dragging on
Yes, it was painful retrofitting it into existing code. We had 50+ operators all implemented over 18+ months without it and have had to spend the long effort to go back and rewrite them. We also had to make complicated concessions to design to add backpressure without breaking the existing APIs.
Starting from scratch is a very different thing, especially after having done it once.
Doesn't node.js already have a stream API
Yes it does. But from what I can tell it targets byte streams, like File IO, not Objects. Nor is it as flexible as Rx, from what I can tell looking at its API.
is it possible to quantify the runtime cost when it isn't used
In JS I can't give useful information unless we start building and compare. In Java we of course are concerned with performance as well, since we execute 10s of billions of onNext a day in Netflix production through RxJava. We have seen backpressure as an incremental impact on perf for raw throughput, as there is more bookkeeping.

However, it has also allowed us to do optimizations that were unavailable before. For example, we always use pooled ring buffers now, instead of requiring unbounded buffers that can grow. So our object allocation and GC behavior is actually better. This took effort, and was important to achieve good performance, but was an option we didn't even have previously since we had to support unbounded growth.
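For readers unfamiliar with the structure being described, here is a simplified sketch of a fixed-capacity ring buffer (illustrative only, not RxJava's actual queue implementation): it allocates once, masks indices instead of growing, and reports fullness so the caller can apply backpressure rather than buffer without bound.

```javascript
// Simplified fixed-capacity ring buffer. Capacity must be a power of two
// so wrap-around can be done with a bit mask instead of modulo.
class RingBuffer {
  constructor(capacityPowerOfTwo) {
    this.mask = capacityPowerOfTwo - 1;
    this.buffer = new Array(capacityPowerOfTwo); // allocated once, never grows
    this.head = 0; // next slot to read
    this.tail = 0; // next slot to write
  }
  offer(v) {
    if (this.tail - this.head > this.mask) return false; // full: apply backpressure
    this.buffer[this.tail++ & this.mask] = v;
    return true;
  }
  poll() {
    if (this.head === this.tail) return undefined; // empty
    return this.buffer[this.head++ & this.mask];
  }
}
```

Because the buffer never grows, a slow consumer surfaces immediately as a failed offer instead of as silent memory growth.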
I suggest two approaches to move forward:

1) Two separate types living alongside each other, such as Flowable and Observable.

2) We implement Observable with and without backpressure and see if there are performance or complexity reasons to have 2 separate types, or if we can just keep them combined.
A note on AsyncEnumerable/AsyncIterable. I'm aware of this type and know that functionally it fits many of my use cases, but it is too inefficient. It allocates a Future/Promise per element. It's the same reason we didn't implement backpressure on an Observable using Future<Void> onNext(T t). Theoretically it all hangs together. In practice, it is far too inefficient due to allocations and GC.
In JS I can't give useful information unless we start building and compare.
Nobody can, really. It'll obviously have some impact, but it's hard to say what. I'd like to make some decisions about #75 before implementing this, because I think it could drastically change the implementation. I could, of course, be totally wrong about that.
Here are some concrete numbers of throughput for merge in varying scenarios (the scenarios matter a lot) for RxJava on my laptop. The code for the tests is here: https://github.com/ReactiveX/RxJava/blob/1.x/src/perf/java/rx/operators/OperatorMergePerf.java

These results show ops/second. I annotated the first few to show how many onNext/second each translates into.
Benchmark (size) Mode Samples Score Score error Units
r.o.OperatorMergePerf.merge1SyncStreamOfN 1 thrpt 5 4567774.035 148897.669 ops/s // 4.5million onNext/second
r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 68620.728 2404.157 ops/s // 68million onNext/second
r.o.OperatorMergePerf.merge1SyncStreamOfN 1000000 thrpt 5 84.931 3.133 ops/s // 84million onNext/second
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN 1 thrpt 5 104864.661 1911.610 ops/s // 104k onNext/second
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN 1000 thrpt 5 5.592 0.215 ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOf1 1 thrpt 5 4254142.048 103396.131 ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOf1 100 thrpt 5 526155.607 10843.846 ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOf1 1000 thrpt 5 59030.208 3284.554 ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOfN 1 thrpt 5 4662177.459 190521.858 ops/s
r.o.OperatorMergePerf.mergeNSyncStreamsOfN 1000 thrpt 5 72.060 1.741 ops/s
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN 1 thrpt 5 84378.798 988.059 ops/s
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN 1000 thrpt 5 3529.251 154.939 ops/s
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1 thrpt 5 4495344.037 99403.589 ops/s
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1000 thrpt 5 45401.568 7270.911 ops/s
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1000000 thrpt 5 45.958 1.373 ops/s
Note of course that these async cases are demonstrating synchronization across multiple threads, which would not exist in Javascript.
This effectively means RxJava v1 should not exist as designed
Ok. ;-)
@blesh @trxcllnt and I spent some time at a whiteboard ... since JS is a single-threaded environment, we likely can have very good fast-path options for performance, even if we have a single Observable type that caters to both.
The options I see right now after that conversation are:

1) Observable without backpressure - purely reactive
2) SomeCleverName - push/pull with backpressure signals

Converting between them would look something like this:

observable.toCleverName(Strategy.DROP)
observable.toCleverName(Strategy.BUFFER)
observable.toCleverName(Strategy.THROTTLE, 5_SECONDS) // pseudo code obviously
cleverName.toObservable() // no strategies needed since it will just subscribe with Number.MAX_VALUE
This has an Observer like this:
interface Observer<T> {
  onSubscribe(s: Subscription): void;
  onNext(t: T): void;
  onError(e: any): void;
  onComplete(): void;
}
This is a cleaner type model, but means a firehose Observable has a touch more code than in option 3 below:
Observable.create(o => {
  var s = new Subscription();
  o.onSubscribe(s);
  var i = 0;
  while (!s.isUnsubscribed()) {
    o.onNext(i++);
  }
});
The backpressure capable case would look like this:
Observable.create(o => {
  var requested = 0;
  var i = 0;
  var s = new Subscription(n => {
    if ((requested += n) === n) { // requested was 0, so (re)start the emit loop
      while (requested > 0 && !s.isUnsubscribed()) {
        requested--;
        o.onNext(i++);
      }
    }
  });
  o.onSubscribe(s);
});
interface Observer<T> extends Subscription {
  setProducer(s: Producer): void;
  onNext(t: T): void;
  onError(e: any): void;
  onComplete(): void;
}
This is more complicated than option 2 as it involves both Subscription and Producer types, but means the firehose Observable case can completely ignore creating a Subscription and Producer, like this:
Observable.create(s => {
  var i = 0;
  while (!s.isUnsubscribed()) {
    s.onNext(i++);
  }
});
The backpressure capable case would look like this:
Observable.create(s => {
  var requested = 0;
  var i = 0;
  s.setProducer(new Producer(n => {
    if ((requested += n) === n) { // requested was 0, so (re)start the emit loop
      while (requested > 0 && !s.isUnsubscribed()) {
        requested--;
        s.onNext(i++);
      }
    }
  }));
});
I suggest we start building option 2 and let performance numbers prove out whether we should end up with a single type or two types, and then pursue the bikeshedding on naming the thing.
I agree with @blesh that if we do option 1 that we should have both types in the same RxJS library so the community, mind-share, collaboration, interoperability, etc all coexist. They will however be modularized so the artifacts can be published and depended upon independently.
Anyone have a recommendation for a better way forward?
@benjchristensen, per our in-person discussion with @trxcllnt, I'd like people to better understand the implications backpressure has on groupBy and merge in particular.

For example, we can fast-path merge if the consumer requests Infinity, but if you have a zip after the merge, even if you request Infinity, you're going to create a slightly slower path in the merge, because it will be under backpressure control everywhere above zip (to prevent unbounded buffers in zip).
I'm not saying this is good or bad, I'm saying it's the nature of real backpressure control that actually composes.
Also, some "Netflixy" context:
@benjchristensen's team has a very real need for an Observable type with backpressure. As such, this type WILL exist. And it can exist in one of three ways:
@trxcllnt and @jhusain have need for an Observable type that has very short function call stacks and is very, very fast. This is because they need an Observable that will operate in non-JIT'ed environments on very weak hardware (That cheap smart TV you bought three years ago, for example).
... so this decision will be met with no small amount of debate here at Netflix. I'm really hoping the larger community of GitHub comes in handy with thoughtful comments and concerns on both sides.
Speaking more with @jhusain, I'm fine with RxJS not including backpressure for now. I will use alternate APIs (i.e. Reactive Streams) at the network protocol layer that expose backpressure over the network and then bridge into RxJS which will request 'infinite'.
The primary use cases right now in JS are tackling small enough datasets that they can be swallowed into buffers. Focusing on a highly performant RxJS is a higher priority than supporting backpressure at this time, even if it is separated into a different, yet-unnamed type.
JS being primarily used to build user interfaces (and 90+% on the average browser if you consider the whole community), backpressure should not be a priority. If a push/pull sequence with backpressure is needed on Node.js, that isn't a use case for Rx. It's pretty "cheap" to create yet another JS library (much cheaper/quicker than to create a Java library, for instance); I don't see why we should force Rx to solve all use cases.
+1
Since @benjchristensen was the primary advocate for this feature, I'm closing this issue for now. If at a later time we decide to add an additional type to support this, we can revisit.
Doesn't node.js already have a stream API

Yes it does. But from what I can tell it targets byte streams, like File IO, not Objects. Nor is it as flexible as Rx, from what I can tell looking at its API.
@benjchristensen Just for documentation completeness, Node.js's stream API does support objects as well as byte and string streams. Just set objectMode: true when creating a stream. https://nodejs.org/api/stream.html#stream_object_mode
I do agree that the Node.js stream API is not nearly as flexible as Rx.
It's a real shame RxJS 5.0 doesn't yet support backpressure. It precludes its use on the backend, or on the front-end when you're producing data.
Simple example I wanted to use RxJS for and can't because of this: interactive CSV parse from a DOM File reference into batched HTTP uploads. Can't overwhelm the uploader during slow uploads or we'll ENOMEM.

Reactive programming without back-pressure is try { } catch(e) { /* meh, let's hope this doesn't happen */ } style development.
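For what it's worth, the pull semantics this use case needs can be approximated today with async/await, outside of RxJS: only read the next CSV chunk after the previous batch upload resolves. In this sketch, readChunk and upload are hypothetical stand-ins for the File reader and the HTTP call:

```javascript
// Pull-based batching sketch: a slow upload naturally pauses reading,
// so nothing accumulates in an unbounded buffer. readChunk resolves to
// the next row, or null at end of input; upload sends one batch.
async function uploadInBatches(readChunk, upload, batchSize) {
  let batch = [];
  let chunk;
  while ((chunk = await readChunk()) !== null) { // pull: nothing is read ahead
    batch.push(chunk);
    if (batch.length === batchSize) {
      await upload(batch); // backpressure: reading waits on the consumer
      batch = [];
    }
  }
  if (batch.length > 0) await upload(batch); // flush the final partial batch
}
```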
@timruffles You can handle back pressure in RxJS by using a BehaviorSubject and building an Observable chain that subscribes to itself.
Something like this might do:
```js
// RxJS 5: requiring the full bundle patches all operators onto Observable
const { BehaviorSubject, Observable } = require('rxjs');

// this behavior subject is basically your "give me the next batch" mechanism.
// in this example, we're going to make 5 async requests back to back before requesting more.
const BATCH_SIZE = 5;
const requests = new BehaviorSubject(BATCH_SIZE); // start by requesting five items

// for every request, pump out a stream of events that represent how many you have left to fulfill
requests.flatMap((count) => Observable.range(0, count).map(n => count - n - 1))
  // then concatMap that into an observable of what you want to control with backpressure.
  // you might have some parameterization here you need to handle; this example is simplified.
  // handle side effects with a `do` block
  .concatMap(() => getSomeObservableOfDataHere().do(stuffWithIt), (remaining) => remaining)
  // narrow it down to when there are no more left to request,
  // and pump another batch request into the BehaviorSubject
  .filter(remaining => remaining === 0)
  .mapTo(BATCH_SIZE)
  .subscribe(requests);
```
@timruffles keep in mind, you might want to code in delays or pauses for whatever other processes you might need to run. When you do that, you need to do it within that `concatMap`, most likely. In some cases you might want to do it just before the `subscribe` at the end. It all depends on what you're going for.
So you can do back pressure, it's just more explicit. Which has pros and cons, of course.
I hope this helps.
@timruffles I'd enjoy eventually adding RxJava-style compositional back-pressure support to RxJS, but I doubt I could get @blesh to merge the PR ;-). So instead here's another example of something sort of backpressure-y that sounds similar to your use case. Recomputing the thresholds and limits on each new request would probably get it closer to something you could use.
Is there any mechanism for adding metadata to, or classifying, the hidden queue that forms behind a slow consumer? If there was a way to tag a consumer's queue, then a producer could specify rules which state "when a consumer downstream of me (or elsewhere in the same production) that is tagged with 'SLOW' has over n items in its queue, then I will be paused".
I'm thinking of using RxJS to control the virtual machine for a game-like system of mine. I can't predict the maximum number of graphics objects that a rendering frame will need to draw, because the user is in charge of that in the little programs they write to run on the virtual machine. My renderer is double buffered, and I'd like the virtual machine to become paused while there are more than 1 undrawn frames in the rendering queue waiting to be drawn. I sortof kindof do this pausing manually today when this condition arises, but I'm not happy with it.
Perhaps there are more performant ways to do the scheduling I want to do than to use RxJS, but I'm interested in at least orchestrating the big state transitions in the system with RxJS. Having that kind of organization would definitely help me get a better handle on the asynchrony and frequent yielding that I need to do that is all over my system.
Perhaps simply being able to tap into the statistics of a queue leading up to a consumer would be enough for me to implement something like this flow control myself. Please pardon my ignorance, I have some experience tinkering with RxJS and perhaps what I am describing is antithetical to the design or how the internals actually work.
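RxJS has no built-in queue tagging, but the "pause while more than N frames are undrawn" idea can be approximated in plain JS with a credit counter sitting between producer and consumer. A minimal sketch (`FrameGate` and `maxPending` are invented names, not an RxJS API):

```javascript
// A hypothetical credit-based gate: the renderer grants one credit per frame
// it finishes drawing; the producer awaits a credit before emitting, so at
// most `maxPending` frames are ever queued up waiting to be drawn.
class FrameGate {
  constructor(maxPending) {
    this.credits = maxPending;
    this.waiters = [];
  }

  // Producer side: resolves immediately if a credit is free,
  // otherwise waits until the renderer releases one.
  acquire() {
    if (this.credits > 0) {
      this.credits -= 1;
      return Promise.resolve();
    }
    return new Promise(resolve => this.waiters.push(resolve));
  }

  // Consumer side: hand the credit to the oldest waiter, or bank it.
  release() {
    const next = this.waiters.shift();
    if (next) next();
    else this.credits += 1;
  }
}

// Usage: allow at most 1 undrawn frame in flight.
const gate = new FrameGate(1);
const order = [];
(async () => {
  await gate.acquire();           // credit available: proceeds at once
  order.push('frame 1 produced');
  const pending = gate.acquire(); // no credit: producer is now paused
  order.push('producer paused');
  gate.release();                 // renderer finished drawing frame 1
  await pending;                  // producer resumes
  order.push('frame 2 produced');
  console.log(order);
})();
```

The same request/release handshake is essentially what RxJava's `request(n)` protocol does compositionally; here it is just made explicit at the application level.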
I spotted this project under the ReactiveX organisation: ReactiveX/IxJS.
Is IxJS the proposed solution for "pull sequences"? I.e instead of an observable, use an iterable/async iterable?
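For context on the pull model the question refers to: with an async iterable the producer only runs when the consumer asks for the next value, so a slow consumer naturally paces a fast producer. A bare-language sketch (no IxJS, just standard async generators):

```javascript
// Pull-based sequence: each loop iteration is an explicit request for
// the next value, which is backpressure by construction.
async function* numbers() {
  for (let i = 1; i <= 3; i++) {
    yield i; // suspended here until the consumer pulls again
  }
}

const out = [];
(async () => {
  for await (const n of numbers()) {
    out.push(n); // the consumer could await slow work here between pulls
  }
  console.log(out); // [1, 2, 3]
})();
```

IxJS's operators wrap this same iteration protocol, which is presumably why it is positioned as the home for backpressured pull sequences.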
This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs.
This issue on StackOverflow illustrates what I think is a problem with backpressure in current RxJS.
While I think this guy could solve his problem with an Iterator of Promise, I think it would be better if RxJS could support this with fewer object allocations.
cc/ @benjchristensen