caolan / highland

High-level streams library for Node.js and the browser
https://caolan.github.io/highland
Apache License 2.0

Working with async streams #54

Open jwalton opened 10 years ago

jwalton commented 10 years ago

I'm thinking this is mainly a need to document the end event somewhere here, but:

Here's a very simple async stream that generates [0,1,2,3,4,5,6,7,8,9,10]:

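var highland = require("highland");

var val = 0;
function generator(push, next) {
    process.nextTick(function() {
        push(null, val++);
        if(val > 10) {
            // End the stream; nothing more should be requested after nil.
            push(null, highland.nil);
        } else {
            next();
        }
    });
}
var stream = highland(generator);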

Now I want to write a consumer which counts the number of elements in this stream:

var count = 0;
stream
    .stopOnError(function(err) {console.log("Bleugh", err.stack);})
    .each(function(val) {count++;});

Ok... Now, how do I know when it's safe to read the count? How do I know when the stream is done? This seems to work, so I'm guessing this is how I'm supposed to do it:

var count = 0;
stream
    .on("end", function() {console.log("Count", count);})
    .stopOnError(function(err) {console.log("Bleugh", err.stack);})
    .each(function(val) {count++;});

But since this is undocumented, it seems like something I shouldn't rely on? Is there some method that I missed somewhere? It would be slick if there was a function for this, like say then(), and if each() returned this, then you could:

var count = 0;
stream
    .each(function(val) {count++;})
    .then(function() {console.log("Count", count);})
    .stopOnError(function(err) {console.log("Bleugh", err.stack);});

Which reads nicely... Although would then() get called even if we stopped on an error?

Or am I using your library completely wrong? :P

caolan commented 10 years ago

@jwalton I think a then() or done() makes sense to add. In this case of course I would probably do a reduce to count, then use .apply(function (result) { ... }) or something after it, but I can see the use of a method which simply gets called after its source has ended.
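To make the reduce idea concrete, here is a minimal sketch of the pattern in plain JavaScript. It deliberately does not call Highland: `reduceGenerator` and the local `nil` marker are hypothetical stand-ins written for this illustration, and they only mimic the (push, next) generator contract used in the example above. The point is that a reduce hands over its result exactly once, when the source ends, so the consumer never needs a separate "end" hook.

```javascript
// Toy stand-in for the (push, next) generator contract; NOT Highland's implementation.
var nil = {}; // hypothetical end-of-stream marker, playing the role of highland.nil

// Drives a (push, next) generator and folds its values into a single result.
// done(err, result) is called once: on the first error, or when nil is pushed.
function reduceGenerator(generator, memo, iterator, done) {
    var finished = false;
    function push(err, value) {
        if (finished) { return; }
        if (err) {
            finished = true;
            done(err);
        } else if (value === nil) {
            finished = true;
            done(null, memo);
        } else {
            memo = iterator(memo, value);
        }
    }
    function next() {
        if (!finished) { generator(push, next); }
    }
    next();
}

// Same async source as in the question: emits 0 through 10, then ends.
var val = 0;
function generator(push, next) {
    process.nextTick(function () {
        push(null, val++);
        if (val > 10) {
            push(null, nil);
        } else {
            next();
        }
    });
}

// Count the elements; the promise settles only after the source has ended.
var result = new Promise(function (resolve, reject) {
    reduceGenerator(generator, 0, function (count) { return count + 1; },
        function (err, count) {
            if (err) { reject(err); } else { resolve(count); }
        });
});

result.then(function (count) { console.log("Count", count); });
```

Because the count travels with the completion callback, there is no shared counter to read "at the right time", and an error ends the fold without reporting a partial count.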

Perhaps .each() should return a new stream which only emits a nil value on stream end?

makara commented 10 years ago

I think @jwalton's use case is more of a reduce, not a map or each.

On the other hand, each() can be handy for transferring one or more arbitrary streams into an endless stream. A use case would be transferring something from the backend to the frontend through a websocket stream. Hence a side question: how would you transfer an arbitrary stream into an endless stream other than with an each()?

justincy commented 10 years ago

+1 for done() and making each() chainable. The lack of both was an unwelcome surprise when I first tried highland.

jwalton commented 10 years ago

Yeah, my trivial example is obviously better served by a reduce. :P

My real-life example involves one service pulling records out of MongoDB and serving them via a REST API, and a second service pulling those records over REST and stuffing them into a Cassandra DB for reporting/visualization purposes. I'm writing a client for this REST API which is going to return a Highland stream, since there are going to be a lot of records, and this nicely hides all the mucky details of pulling multiple records over REST from the consumer of the API. But the consumer needs to know when the stream has run out of things to push to Cassandra.

Counting from one to ten seemed like an easier example to write up for this bug, though. :D

makara commented 10 years ago

Hmm.. to me it's still a reduce.. Also I don't see why you wouldn't rely on .on('end'). But I agree the documentation should mention the events somewhere, including .on('drain') etc. It can be useful. Some aliases for the events could also be a good idea.

ghost commented 10 years ago

I came here with a similar request, now off to see if I can better express my process as a reduce :)

ghost commented 10 years ago

Hmm... yes, using a reduce is better...

Prestaul commented 10 years ago

:+1: for adding documentation of events