caolan / highland

High-level streams library for Node.js and the browser
https://caolan.github.io/highland
Apache License 2.0
3.43k stars 146 forks

keepAlive option #266

Open cphoover opened 9 years ago

cphoover commented 9 years ago

Is there a way to keep a consuming stream alive even if one of its feeders sends an end-of-stream marker?... In my use case I'm dealing with multiple child process stdout streams that will end when the process dies.

For example:

var _ = require("highland");

var mainStr = _().map(function(dat){
   return "[prefix] - " + dat;
});
mainStr.pipe(process.stdout);

var str1 = _();
str1.pipe(mainStr);
var str2 = _();
str2.pipe(mainStr);

str1.write("here is a message 1");
str2.write("here is a message 2");
str1.write(_.nil);
str2.write("this message will cause an error to be thrown");

// results in error thrown: 
// Error: Can not write to stream after nil

Is there a way to keep the stream alive but still clean up after itself when feeders die? For example, a keepAlive method would be useful:

var mainStr = _().keepAlive().map(function(dat){
   return "[prefix] - " + dat;
});
vqvu commented 9 years ago

var _ = require("highland");

var str1 = _();
var str2 = _();

_([str1, str2]).merge()
    .map(function(dat){
       return "[prefix] - " + dat;
    }).pipe(process.stdout);

str1.write("here is a message 1");
str2.write("here is a message 2");
str1.write(_.nil);
str2.write("this message no longer causes an error");

You can also use parallel or sequence depending on your needs.

cphoover commented 9 years ago

Hmmm, the problem is that your example assumes I have all of the streams ahead of time... I have to merge readable streams into the main stream asynchronously.

A better example would be:

    var _ = require("highland");
    var exec = require("child_process").exec;
    var cmd = "somecommand";

    var mainStr = _().keepAlive().map(function(dat){
       return "[prefix] - " + dat;
    });

    mainStr.pipe(process.stdout);

    this.interval = setInterval(function(){
        // need to merge exec(cmd).stdout into mainStr
        exec(cmd).stdout.pipe(mainStr);
    }, 10 * 1000);
vqvu commented 9 years ago

The problem with your proposed keepAlive is that there's no way to know when all sources are done. Sources are merged asynchronously, so how do you know how many sources there are?

Can you do it like this?

var mainStr = _(function (push, next) {
    setInterval(function () {
        push(null, _(exec(cmd).stdout));
        if (done()) { // Figure out somehow
            push(null, _.nil);
        }
    }, 10 * 1000);
}).merge().map(...);
cphoover commented 9 years ago

vqvu... In my use case I want it to stay alive forever; I'm creating a daemon. My assumption is that when each stream ends, it will be automatically cleaned up (i.e., references will be removed so it can be gc'd). I haven't tested that, though.

vqvu commented 9 years ago

Just don't push nil if you want it to stay alive forever. The gc should happen correctly, I think. If it doesn't, then that's a bug in merge.

cphoover commented 9 years ago

thanks for the tips.