dominictarr / through

simple way to create a ReadableWritable stream that works

Once .end() is called, it can never be called again even if autoDestroy is false. #18

Open cowboy opened 11 years ago

cowboy commented 11 years ago

Per these 2 lines, once .end() is called it's impossible to ever effectively use .end() again, because the local variable ended stays set.

Should ended only be set here if autoDestroy === true?

I mean, what's the point of keeping a stream around if you can no longer fully utilize it?

cowboy commented 11 years ago

Here's a fairly simple example.

This doesn't work like I'd expect:

var es = require('event-stream');

var myThrough = es.through(null, function() {
  this.queue('(DONE)\n');
});
myThrough.autoDestroy = false;

myThrough.pipe(process.stdout);

var noise1 = es.through();
noise1.pipe(myThrough);
noise1.write('This is test #1.\n');
noise1.end('Done #1!\n');

var noise2 = es.through();
noise2.pipe(myThrough);
noise2.write('This is test #2.\n');
noise2.end('Done #2!\n');

console.log('All done.');

That's because, even though noise2 is piped to myThrough, the stream no longer works once noise1.end() is called. It outputs this:

This is test #1.
Done #1!
(DONE)
All done.

By overwriting myThrough.end like this, calling noise1.end() doesn't "break" myThrough.

var es = require('event-stream');

var myThrough = es.through();

myThrough.end = function(data) {
  if (arguments.length) { this.queue(data); }
  this.queue('(DONE)\n');
  this.writable = this.readable = true; // unsure if this is necessary
  return this;
};

myThrough.pipe(process.stdout);

var noise1 = es.through();
noise1.pipe(myThrough);
noise1.write('This is test #1.\n');
noise1.end('Done #1!\n');

var noise2 = es.through();
noise2.pipe(myThrough);
noise2.write('This is test #2.\n');
noise2.end('Done #2!\n');

console.log('All done.');

Which outputs this:

This is test #1.
Done #1!
(DONE)
This is test #2.
Done #2!
(DONE)
All done.

dominictarr commented 11 years ago

node@0.4 supported multisink pipes like this, but that support was removed in 0.6, as it's a pretty rare edge case.

If you want to have a stream that can do that, you should make it a new module. In 0.4 it kept a count of the pipes that are currently flowing in, decrementing it when streams end, and only calling end() when the last stream ends.
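The counting approach described above might look roughly like the sketch below. It is only an illustration: it uses Node's core PassThrough instead of through, the createJoin helper is a hypothetical name, and it assumes every source is added before any of them ends.

```javascript
const { PassThrough } = require('stream');

// Hypothetical helper (not part of through or event-stream): lets several
// sources feed one destination and ends it only after the last source ends.
function createJoin(dest) {
  let active = 0; // number of sources that are still flowing in

  return function add(source) {
    active += 1;
    // { end: false } stops pipe() from calling dest.end() when this source ends.
    source.pipe(dest, { end: false });
    source.once('end', function () {
      active -= 1;
      if (active === 0) dest.end(); // the last source has finished
    });
    return source;
  };
}

// Usage: both sources reach the destination, which ends exactly once.
const out = new PassThrough();
out.pipe(process.stdout);

const add = createJoin(out);
const noise1 = add(new PassThrough());
const noise2 = add(new PassThrough());

noise1.write('This is test #1.\n');
noise1.end('Done #1!\n');
noise2.write('This is test #2.\n');
noise2.end('Done #2!\n');
```

If the count reaches zero before a later source is added, the destination is already ended, so a real module would need a policy for late joiners.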

cowboy commented 11 years ago

In my use-case, the myThrough pipe is going to be writing output (generated internally), but can also optionally read input to be muxed into its output. As such, the myThrough pipe should only close if ended explicitly. Does that make any sense?

dominictarr commented 11 years ago

Well, there is probably the odd case where you might want to do that. Overwriting the end method like you have done here is probably the way I'd do it too.