Reactive-Extensions / rx-node

RxJS Bindings for Node.js and io.js
Other
219 stars 27 forks source link

toReadableStream, subscribeCallback #11

Open eschwartz opened 9 years ago

eschwartz commented 9 years ago

I put together a method for converting an Observable to a readable stream. It looks something like this:

function toReadableStream(observable) {
    var stream = new Readable({
        objectMode: true
    });

    stream._read = function() {
        observable.subscribe(
            this.push.bind(this),
            this.emit.bind(this, 'error'),
            this.push.bind(this, null)
        )
    };

    return stream;
}

I have another one called subscribeCallback, which "subscribes" an observable to a node-style callback:

function subscribeCallback(observable, callback) {
    return observable.
        // Only take the last item, to ensure we aren't calling `callback`
        // multiple times.
        last().
        subscribe(
            function(x) { callback(null, x); },
            function(err) { callback(err); }
        );
};

My use case is that I'm working with a library where we're trying to keep all of our interfaces node-standard (callbacks or streams), but I'd still like to use RxJS for implementations. So I have a lot of functions that look like this

function fetchAThing(callback) {
  // Bridge another node-style callback function to an Observable
  var getter = Observable.fromNodeCallback(getSomething);

  // Do my observable stuff
  var obs = getter.
    map(...).
    filter(...);

  // Bridge back to node-style callbacks
  subscribeCallback(obs, callback);
}

function fetchABunchOfThings() {
  var obs = RxNode.fromReadbleStream(getAStreamOfThings());

  // Do my observable stuff
  var obs2 = obs.
    map(...).
    filter(...);

  // Bridge back to a node stream
  return toReadableStream(obs2);
}

Is this something that you would see as in the scope of the RxNode library? If so, I can look at submitting a pull-request.