Reactive-Extensions / rx-node

RxJS Bindings for Node.js and io.js

Support Node Readable stream pause / backpressure #3

Open hekike opened 9 years ago

hekike commented 9 years ago

This issue is moved from: https://github.com/Reactive-Extensions/RxJS/issues/508

http://nodejs.org/api/stream.html#stream_readable_pause

I tried to request only two items from a Readable Node stream with Rx and I observed the following behaviour:

Rx

var source = Rx.Node.fromStream(dbUserStream).controlled();
source.subscribe(..);
source.request(2);

or

Rx.Node.fromStream(dbUserStream)
  .take(2)
  .subscribe(..);

I think Rx should support backpressure here and pause the Readable stream. This would also be great for take.

In Highland it works in the following way:

_(dbUserStream)
  .take(2)
  .toArray(function (users) { ... });

Did I miss something?
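For reference, the behaviour being asked for can be sketched with plain Node stream events, without Rx. This is only an illustration under stated assumptions: `takeAndPause` is a hypothetical helper name, not an rx-node or Highland API, and it assumes the source emits `data` and `end` events and exposes `pause()` like a Node Readable.

```javascript
'use strict';

// Hypothetical helper (not part of rx-node): collect the first `count`
// items from a Readable-like source, then pause it so no further items
// are pulled from the underlying resource.
function takeAndPause(stream, count, done) {
  var items = [];

  function finish() {
    // Detach both listeners so `done` is called at most once.
    stream.removeListener('data', onData);
    stream.removeListener('end', finish);
    done(items);
  }

  function onData(item) {
    items.push(item);
    if (items.length >= count) {
      // Apply backpressure before handing the items over.
      stream.pause();
      finish();
    }
  }

  stream.on('data', onData);
  // If the source ends early, report whatever was collected.
  stream.on('end', finish);
}
```

With a real stream this would be called as `takeAndPause(dbUserStream, 2, function (users) { ... })`. Whether the stream stops reading from its source immediately after `pause()` depends on the stream implementation and its internal buffering.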

paulpdaniels commented 9 years ago

Since downstream operators won't have control of this, I think it should be built into the operator, so either:


var pauser = Rx.Observable.just(true).delay(400);

// Pass in an observable and pause the stream on truthy values and resume on falsy (or vice versa)
Rx.Node.fromReadableStream(stream, pauser);

or take the backpressure/connect approach


var pausableObservable = Rx.Node.fromReadableStream(stream);

//It is now paused
var disposable = pausableObservable.pause();

//It is now unpaused
disposable.dispose();

Personally I lean toward the first, since the second might have confusing semantics for some people, and in general it forces yet another break in the chain. But I don't generally use this method, so I would leave it up to people like @hekike.
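To make the semantics of the second approach concrete, here is a minimal sketch of a pause that is undone by disposing. `pauseUntilDisposed` is a hypothetical name used only for illustration; it is not an existing rx-node function, and it assumes the argument exposes `pause()` and `resume()` like a Node Readable.

```javascript
'use strict';

// Hypothetical helper: pause the stream now and return a disposable
// whose dispose() resumes it. Calling dispose() more than once is a no-op.
function pauseUntilDisposed(stream) {
  var disposed = false;
  stream.pause();
  return {
    dispose: function () {
      if (disposed) { return; }
      disposed = true;
      stream.resume();
    }
  };
}
```

One thing the sketch makes visible is the possible confusion mentioned above: "disposing" normally tears something down, whereas here it restarts the flow of data.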

Zalastax commented 9 years ago

I had to implement this in my own code today and solved it by adding:

pauser.distinctUntilChanged().subscribe(function (b) {
  if (b) {
    stream.resume();
  } else {
    stream.pause();
  }
});

in fromStream. This uses the Rx.Node.fromReadableStream(stream, pauser) syntax.
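The same wiring can be sketched as a standalone function, so it can be tried without patching fromStream. This is an assumption-laden illustration: `bindPauser` is a hypothetical name, and `pauser` is assumed to be any object with a `subscribe(onNext)` method, such as an Rx Observable. The duplicate check is done by hand on truthiness, which is slightly coarser than `distinctUntilChanged()`, since that compares the emitted values themselves.

```javascript
'use strict';

// Hypothetical helper (not part of rx-node): resume the stream on truthy
// pauser values and pause it on falsy ones, ignoring repeated states.
function bindPauser(stream, pauser) {
  var hasLast = false;
  var last;

  // Return the subscription so the caller can dispose of it
  // once the stream has ended.
  return pauser.subscribe(function (value) {
    var flowing = Boolean(value);
    if (hasLast && flowing === last) {
      return; // same state as before, nothing to do
    }
    hasLast = true;
    last = flowing;

    if (flowing) {
      stream.resume();
    } else {
      stream.pause();
    }
  });
}
```

Note that this follows the polarity of the snippet above (truthy resumes, falsy pauses), which is the "vice versa" of the comment in the original proposal.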