dominictarr / through

simple way to create a ReadableWritable stream that works

no data piped through async streams, stream finishes in wrong order #16

Closed timoxley closed 11 years ago

timoxley commented 11 years ago

I have some through streams piped to each other. One of them pushes data asynchronously, but the second through stream never sees it, and the final concat callback seems to fire first.

I think I might need to pause or something. Any tips?

from = require('from')
through = require('through')
concat = require('concat-stream')

from([1,2,3,4,5])
.pipe(through(function(data) {
  setTimeout(function(){
    console.log('pushing data', data)
    this.push(data)
  }.bind(this), 300)
})).pipe(through(function(data) {
  console.log('pushing data second time', data);
  this.push(data)
})).pipe(concat(function(result) {
  console.log("voila", result)
}))

output:

voila undefined
pushing data 1
pushing data 2
pushing data 3
pushing data 4
pushing data 5

dominictarr commented 11 years ago

The problem is that from dumps all of its data through, followed by the end event, while the timeouts are still waiting. By the time they fire, the stream has already ended.

The solution is to tell from to slow down:

from = require('from')
through = require('through')
concat = require('concat-stream')

var n = 0
from([1,2,3,4,5])
.pipe(through(function(data) {
  this.pause()
  n ++
  setTimeout(function(){
    console.log('pushing data', data)
    this.push(data)
    if(!--n) this.resume()
  }.bind(this), 300)
})).pipe(through(function(data) {
  console.log('pushing data second time', data);
  this.push(data)
})).pipe(concat(function(result) {
  console.log("voila", result)
}))

dominictarr commented 11 years ago

That way, from won't emit end until the last data chunk has been handled.
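The ordering problem and the pause fix can be reproduced without any dependencies. The sketch below is a toy model, not the real from or through code: `simulatePipeline`, `flow`, `write`, and `push` are hypothetical names, the `timers` array stands in for `setTimeout` so the run is deterministic, and the source is assumed to hold back both data and end while paused.

```javascript
// Toy model of the pipeline; none of this is the real from/through code.
function simulatePipeline(usePause) {
  const log = []
  const timers = []        // stands in for pending setTimeout callbacks
  const items = [1, 2, 3]
  let paused = false
  let ended = false
  let index = 0

  // Downstream stage: anything that arrives after 'end' is lost.
  function push(data) {
    log.push(ended ? 'dropped ' + data : 'data ' + data)
  }

  // Async stage: defers the push, optionally pausing the source first.
  function write(data) {
    if (usePause) paused = true
    timers.push(function () {
      push(data)
      if (usePause) {
        paused = false
        flow()
      }
    })
  }

  // Assumed well-behaved source: emits nothing, not even 'end', while paused.
  function flow() {
    while (!paused && index < items.length) write(items[index++])
    if (!paused && index === items.length && !ended) {
      ended = true
      log.push('end')
    }
  }

  flow()
  // Fire the pending "timeouts" in the order they were scheduled.
  while (timers.length) timers.shift()()
  return log
}

console.log(simulatePipeline(false)) // end arrives before any data
console.log(simulatePipeline(true))  // end arrives after the last chunk
```

Without the pause, the model logs `end` first and every deferred chunk is dropped, which matches the "voila undefined" output above. With the pause, each chunk is delivered before the source is allowed to continue, so `end` comes last.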

timoxley commented 11 years ago

Great, the pause/resume thing worked.

The code seems to work without the n counter (i.e. pausing and resuming once per chunk). Does that make it more fragile, e.g. more sensitive to timing?

e.g.

from = require('from')
through = require('through')
concat = require('concat-stream')

from([1,2,3,4,5])
.pipe(through(function(data) {
  this.pause()
  setTimeout(function(){
    console.log('pushing data', data)
    this.push(data)
    this.resume()
  }.bind(this), 300)
})).pipe(through(function(data) {
  console.log('pushing data second time', data);
  this.push(data)
})).pipe(concat(function(result) {
  console.log("voila", result)
}))

dominictarr commented 11 years ago

The counter handles the case where the source isn't well behaved: if the source emits several data events despite the pause, the counter makes sure they have all been handled before resuming.

timoxley commented 11 years ago

I'm not quite sure what you mean by "well behaved"?

dominictarr commented 11 years ago

In this case, well behaved means not emitting data when paused.
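To see why the counter matters, here is a dependency-free sketch of a source that is not well behaved in that sense. It is a toy model under stated assumptions, not through's actual internals: `simulateBurst` and its helpers are hypothetical names, the `timers` array stands in for `setTimeout`, and the stage is assumed to hold back a pending end until it is resumed.

```javascript
// Toy model; the names and buffering rules here are assumptions for illustration.
function simulateBurst(useCounter) {
  const log = []
  const timers = []        // stands in for pending setTimeout callbacks
  let paused = false
  let endPending = false
  let ended = false
  let n = 0                // chunks written but not yet pushed

  // Downstream stage: anything that arrives after 'end' is lost.
  function push(data) {
    log.push(ended ? 'dropped ' + data : 'data ' + data)
  }

  // Resuming releases an 'end' that was held back while paused.
  function resume() {
    paused = false
    if (endPending) {
      endPending = false
      ended = true
      log.push('end')
    }
  }

  // Async stage: pause, defer the push, then resume (always, or only at n === 0).
  function write(data) {
    paused = true
    n++
    timers.push(function () {
      push(data)
      n--
      if (!useCounter || n === 0) resume()
    })
  }

  function end() {
    if (paused) endPending = true
    else {
      ended = true
      log.push('end')
    }
  }

  // Badly behaved source: keeps writing although the stage paused after chunk 1.
  for (const item of [1, 2, 3]) write(item)
  end()

  while (timers.length) timers.shift()()
  return log
}

console.log(simulateBurst(false)) // resumes after the first chunk, rest is dropped
console.log(simulateBurst(true))  // resumes only once all chunks are pushed
```

In this model the counter-free version resumes as soon as the first timeout fires, which lets `end` through while two chunks are still pending. With the counter, `resume()` runs only when `n` is back to zero, so `end` follows the last chunk.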