dominictarr / through

simple way to create a ReadableWritable stream that works

Concurrency in this.queue() #31

Open vsukhomlinov opened 9 years ago

vsukhomlinov commented 9 years ago

Multiple queue() calls within the es.through() handler get sent downstream in the wrong order:

var es = require('event-stream');

var streamLine = es.through(function (line) {
    this.queue("*");
    this.queue("|"+line+'\n');
});

process.stdin.on('readable', function() {
    var streamLine1 = es.through(function (line) {
        this.queue('*');
        this.queue("|"+line+'\n');
    });

    process.stdin.pipe(es.split()).pipe(streamLine).pipe(process.stdout);
})

I'm testing it like this:

$ printf "a\nb\n" | node pipeTest.js
*|a
*|b
**|
|

An interesting note is that piping through streamLine1 (the stream created inside the closure) instead of streamLine renders everything properly.

dominictarr commented 9 years ago

can you post the "correct" output that you are expecting?

vsukhomlinov commented 9 years ago

I'd expect all queue() calls in the handler to be executed sequentially within the function, like this:

$ printf "a\nb\n" | node pipeTest.js
*|a
*|b
*|
*|

dominictarr commented 9 years ago

oh okay, I see what the problem is now. you are using the 'readable' event the wrong way; that is part of the private streams2 api. probably this is causing you to pipe the same stream twice, since the handler calls pipe() again every time the event fires.

instead, just do process.stdin.pipe(es.split()).pipe(streamLine).pipe(process.stdout) once, outside of any 'readable' handler.
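
To illustrate the suggestion above, here is a minimal sketch of a pipeline that is wired exactly once. It is not the event-stream code from this thread: to stay self-contained it swaps es.through for Node's built-in stream.Transform and swaps stdin plus es.split() for an in-memory array of lines. The names makeStreamLine and run are hypothetical helpers introduced only for this example.

```javascript
const { Readable, Transform } = require('stream');

// Stand-in for es.through(...) (assumption: built-in Transform used instead).
// For each incoming line it emits "*" and then "|<line>\n", in that order.
function makeStreamLine() {
  return new Transform({
    objectMode: true,
    transform(line, _encoding, callback) {
      this.push('*');
      this.push('|' + line + '\n');
      callback();
    },
  });
}

// Hypothetical helper: builds the pipeline once and resolves with
// everything that came out of the end of it.
function run(lines) {
  return new Promise((resolve, reject) => {
    let out = '';
    Readable.from(lines)        // stand-in for process.stdin.pipe(es.split())
      .pipe(makeStreamLine())   // pipe() is called a single time, at setup
      .on('data', (chunk) => { out += chunk; })
      .on('end', () => resolve(out))
      .on('error', reject);
  });
}

// Usage: each "*" stays directly in front of its own "|<line>".
run(['a', 'b']).then((out) => process.stdout.write(out));
// prints:
// *|a
// *|b
```

With event-stream itself, the equivalent is the one-line pipe chain from the comment above, placed at the top level of the script so that it runs once rather than on every 'readable' event.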