pull-stream / stream-to-pull-stream

convert a node stream (classic or new) into a pull-stream
MIT License

Problems combining streams and pull-streams #1

Closed bnoon closed 11 years ago

bnoon commented 11 years ago

I would like to write some pull-stream Throughs, but I am having a hard time attaching them to old and new streams.

The following works with no pull-streams:

var fs = require('fs')
  , zlib = require('zlib')
  , toPull = require('stream-to-pull-stream')
  , Pull = require('pull-stream');

var p1, p2, p3, ps1, ps2, ps3, total=0;

p1 = fs.createReadStream('data.gz');
p2 = zlib.createGunzip();
p3 = fs.createWriteStream('out');

p1.pipe(p2).pipe(p3);

I had expected that I could pipe the ReadStream into the Gunzip and convert the result to a pull-stream, but the following doesn't work:

toPull(p3)(toPull(p1.pipe(p2)));

The following gives an EventEmitter leak warning and doesn't finish (when the file is small, it does finish under new streams):

toPull(p3)(toPull(p2)(toPull(p1)));

Removing the final conversion back to a stream (using a pull-stream drain instead) doesn't change the results:

Pull.drain(
  function (d) { total += d.length; },
  function () { console.log('total: ' + total); }
)(toPull(p1.pipe(p2)));

Are pull-streams intended to be used like this?

This is with node 0.8.22 and 0.10.3.

dominictarr commented 11 years ago

Hi! Thanks for trying pull-streams!

This is fixed in 1.1.3.

Also, it's best to use pull streams like this:

toPull(p1).pipe(toPull(p2)).pipe(toPull(p3))

Since each pull stream returns its read function, you can also use it like this:

toPull(p3)(toPull(p2)(toPull(p1)))

But it's a lot more confusing... I had to try several times to get this right, and I wrote the module.
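
For reference, here is the shape that makes that composition work. This is just an illustrative sketch of the protocol, not code from this module: a source is a function read(abort, cb), a through wraps a read and returns a new read, and a sink is a function that takes a read and starts pulling.

function values(arr) { // source: hands out one array item per read, then ends
  var i = 0;
  return function read(abort, cb) {
    if (abort || i >= arr.length) return cb(abort || true);
    cb(null, arr[i++]);
  };
}

function double(read) { // through: transforms each item as it passes through
  return function (abort, cb) {
    read(abort, function (end, data) {
      cb(end, end ? null : data * 2);
    });
  };
}

function logAll(read) { // sink: keeps pulling until the source signals end
  read(null, function next(end, data) {
    if (end) return;
    console.log(data);
    read(null, next);
  });
}

logAll(double(values([1, 2, 3]))); // sink(through(source))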

bnoon commented 11 years ago

Thanks for looking into this.

Your fix got the toPull(p1).pipe(toPull(p2)).pipe(toPull(p3)) case working, but I still can't compose a pipeline like toPull(p1.pipe(p2)).pipe(toPull(p3)). No output is written to the final sink.

Using a pull-stream sink (like Pull.drain):

Pull.drain(
  function (d) { total += d.length; },
  function () { console.log('total: ' + total); }
)(toPull(p1.pipe(p2)));

raises the following:

/Users/noon/nenvs/ps/node_modules/stream-to-pull-stream/index.js:35
    read(null, function (end, data) {
    ^
TypeError: object is not a function

Wrapping each stream with toPull will run, but the final callback isn't called:

toPull(p1).pipe(toPull(p2)).pipe(Pull.drain(
  function (d) { total += d.length; },
  function () { console.log('total: ' + total); }
));

I have ended up using the streamz module from @ZJONSSON to handle the case of multiple sources piping into one sink.

dominictarr commented 11 years ago

Composing multiple streams into one pull stream only works with pull streams. If you use regular streams (new streams or classic streams), pipe returns the last stream.

I think the problem with toPull(p1.pipe(p2)).pipe(toPull(p3)) is that toPull gets p2, which is a transform stream: node's pipe returns its destination, so toPull only ever sees p2. It can't tell that p1 is already piped into it, so it's left waiting for a source.
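
You can see why directly, since node's pipe returns its destination stream:

var dest = p1.pipe(p2);
console.log(dest === p2); // true -- toPull(p1.pipe(p2)) is effectively toPull(p2)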

;(toPull(p1).pipe(toPull(p2))).pipe(toPull(p3)) should work.

What are you trying to do?

If you want to pipe multiple sources into one sink, you have to use a special stream that does that (like, with physical pipes, you must have a T section). For the sake of simplicity, pull-streams can only be piped one to one.
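
Such a T section could be sketched like this. This is hypothetical code, not part of this module, and it drains the sources one after another rather than interleaving them:

function concatSources(sources) { // sources: an array of read functions
  var i = 0;
  return function read(abort, cb) {
    if (i >= sources.length) return cb(abort || true);
    sources[i](abort, function (end, data) {
      if (end === true && !abort) { i++; return read(null, cb); } // move to next source
      cb(end, data); // pass through data, errors, or the abort
    });
  };
}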

bnoon commented 11 years ago

Thanks for clarifying things. I realize I would need a special 'merge' pipe; I just wanted to try pull-streams given the good stuff you are putting together with leveldb.

I need to take several data feeds, break them into individual products, merge them, and remove any duplicates. Diagnostics are collected to spot failing feeds.

Using streamz, here is a test program:

var fs = require('fs')
var zlib = require('zlib')
var splitter = require('./libz/split_products')
var dedup = require('./libz/dedup_products')
var add_source = require('./libz/add_source')

var dd = dedup()
dd.on('stats', function (d) {
  console.log('stats: ',d);
});
dd.pipe(zlib.createGzip()).pipe(fs.createWriteStream('out.gz'));

fs.createReadStream('local.raw.gz').pipe(zlib.createGunzip())
  .pipe(splitter()).pipe(add_source('nrcc')).pipe(dd);
fs.createReadStream('13012512.raw')
  .pipe(splitter()).pipe(add_source('srcc')).pipe(dd);
fs.createReadStream('13012512.raw.gz').pipe(zlib.createGunzip())
  .pipe(splitter()).pipe(add_source('hprcc')).pipe(dd);

The add_source through looks like:

var StreamZ = require("streamz");

// tag each chunk with the name of the feed it came from
module.exports = function (src) {
  return new StreamZ(function (chunk) {
    this.push({src: src, data: chunk});
  });
};

The dedup_products.js is:

var StreamZ = require("streamz")
var crypto = require('crypto')

module.exports = function () {
  var hashes = {},
    dedup = new StreamZ(function (prod, done) {
      // hash the product body; only pass through the first copy seen
      var h = crypto.createHash('sha1').update(prod.data).digest('hex');
      if (!hashes[h]) {
        hashes[h] = true;
        this.push(prod.data);
      }
      done();
    });
  dedup.on('end', function () {
    this.emit('stats', 'TODO');
  });
  return dedup;
}
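
For comparison, the same dedup written as a pull-stream through might look something like this (just a sketch against the read(abort, cb) protocol, untested):

var crypto = require('crypto');

function dedupThrough() {
  var hashes = {};
  return function (read) {
    return function next(abort, cb) {
      read(abort, function (end, prod) {
        if (end) return cb(end);
        var h = crypto.createHash('sha1').update(prod.data).digest('hex');
        if (hashes[h]) return next(null, cb); // duplicate: skip it and pull again
        hashes[h] = true;
        cb(null, prod.data);
      });
    };
  };
}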

In production, this will be pulling from http / sftp / local files and pushing to S3.

Thanks for your input.