dominictarr / through

simple way to create a ReadableWritable stream that works
Other
670 stars 64 forks source link

`end` emitted while stream on pause #24

Closed alexbeletsky closed 10 years ago

alexbeletsky commented 10 years ago

I've met a problem, not able to find workaround.. and even not sure is it directly related to through, but will appreciate assistance. I stream mongo (using mongojs) collection and want to process all items there, one by one, which involves some async operations.

Simplified model of what I got,

var calc = through(function (user) {
    console.log('calc');

    var stream = this;
    stream.pause();

    process.nextTick(function () {
        console.log('tick');
        stream.resume();
    });
});

var stream = db.users
    .find({email: {$exists: true}});

stream
    .pipe(calc);

stream.on('end', function () { console.log('end') });

The collection contains only 2 documents, so if I run the code, I can see

› node source/scorer.js
calc
tick
calc
end    # why?
tick

Something I don't expect is end emitted while stream supposed to remain in paused state.

dominictarr commented 10 years ago

My guess is that this is a bug in mongo stream - buffering end is an edgecase that not every one implements correctly. if you listen on calc.on('end', ...) it will buffer the end event, so one simple solution will be to just drop in a empty through stream after the mongo stream (by default through acts as a stream that does nothing but respect back pressure)

alexbeletsky commented 10 years ago

Changed to,

    calc.on('end', callback);

it gives expected output. Thanks a lot!

mafintosh commented 10 years ago

@alexanderbeletsky your example is a little bit confusing since you use stream twice to donate different streams. just because you pause the stream being piped to doesn't mean that the stream being piped from won't emit end

mafintosh commented 10 years ago

in other words - your output is not a bug in general