dominictarr / through

simple way to create a ReadableWritable stream that works

Pausing internally gets overridden by downstream streams #11

Open ForbesLindesay opened 11 years ago

ForbesLindesay commented 11 years ago

e.g.

source.pipe(through(function (chunk) {
  var self = this;
  console.log('start');
  self.pause(); // hold the stream while the async work runs
  setTimeout(function () {
    console.log('end');
    self.resume();
  }, 1000);
}))
.pipe(process.stdout);

If you stream four chunks from source, this prints:

start
start
start
start
end
end
end
end

rather than alternating start/end pairs, because resume calls coming from the process.stdout stream override the pauses made within the through stream.
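
A minimal sketch of one way this can happen, assuming classic (pre-0.10) pipe wiring in which a destination's 'drain' event calls resume() on its source. The names makeMiddle and wireDrain are hypothetical stand-ins, not part of through or Node, and the model tracks only a paused flag:

```javascript
const { EventEmitter } = require('events');

// Hypothetical stand-in for a through-style stream: tracks only a paused flag.
function makeMiddle() {
  const middle = new EventEmitter();
  middle.paused = false;
  middle.pause = function () { middle.paused = true; };
  middle.resume = function () { middle.paused = false; };
  return middle;
}

// Simplified model of classic pipe wiring (an assumption, not the real pipe):
// when the destination drains, it resumes its source.
function wireDrain(src, dest) {
  dest.on('drain', function () { src.resume(); });
}

const middle = makeMiddle();
const dest = new EventEmitter(); // stands in for process.stdout
wireDrain(middle, dest);

middle.pause();      // the handler pauses to do its async work
dest.emit('drain');  // downstream signals it can take more data
console.log(middle.paused); // the internal pause has been undone
```

In this model the stream has a single paused flag, so it cannot tell a pause requested by its own handler apart from one requested for backpressure.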

ForbesLindesay commented 11 years ago

Can be worked around with code like:

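function throttle(data, end) {
  var inProgress = false; // true while a handler has paused the stream
  var finished = false;   // true if the input ended during a pause
  var res = through(function (chunk) {
    // Give the handler a wrapper whose pause/resume track inProgress.
    var self = Object.create(this);
    self.pause = function () {
      inProgress = true;
      res.pause();
    };
    self.resume = function () {
      inProgress = false;
      if (finished) end.call(self); // run the deferred end handler
      else res.resume();
    };
    data.call(self, chunk);
  }, function () {
    // Defer the end handler until the pending handler resumes.
    if (inProgress) return finished = true;
    else return end.call(this);
  });
  // Ignore resume calls from downstream while a handler is in progress.
  var resume = res.resume.bind(res);
  res.resume = function () {
    if (!inProgress) return resume();
  };
  return res;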
}

dominictarr commented 11 years ago

cool, that may be desired in some use cases, depending on how the dest emits 'resume'.

You should publish this as an npm module!

ForbesLindesay commented 11 years ago

Will do, was just thinking perhaps this should be part of how the core of through works?

dominictarr commented 11 years ago

what version of node are you using?

ForbesLindesay commented 11 years ago

0.8, I'm looking to update to 0.10 soon, but I need to find time to re-test my apps.

dominictarr commented 11 years ago

Ah, so it must be that stdout is emitting resume too often. If stdout says it can handle data, then that is its business though.

Pause is just for stopping the flow if the dest has more than it can handle. If you are trying to do something else, like just block output... That should be a different module.

It might be better to add another method hardPause which totally prevents output...
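
A rough sketch of what such a hardPause could look like, assuming a second flag that downstream resume() calls cannot clear. This is not through's API: gatedStream and hardResume are hypothetical names, and the stream here is a bare EventEmitter with a buffer rather than a real through stream.

```javascript
const { EventEmitter } = require('events');

// Hypothetical minimal stream with a buffer and two independent pause flags.
function gatedStream() {
  const s = new EventEmitter();
  const buffer = [];
  let softPaused = false; // backpressure; downstream may clear it via resume()
  let hardPaused = false; // set by the handler; cleared only by hardResume()

  // Emit buffered chunks only while neither flag is set.
  function flush() {
    while (buffer.length && !softPaused && !hardPaused) {
      s.emit('data', buffer.shift());
    }
  }

  s.queue = function (chunk) { buffer.push(chunk); flush(); };
  s.pause = function () { softPaused = true; };
  s.resume = function () { softPaused = false; flush(); };
  s.hardPause = function () { hardPaused = true; };
  s.hardResume = function () { hardPaused = false; flush(); };
  return s;
}

const out = [];
const s = gatedStream();
s.on('data', function (chunk) { out.push(chunk); });

s.hardPause();
s.queue('a');
s.resume();                     // a downstream resume has no effect here
const beforeCount = out.length; // nothing has been emitted yet
s.hardResume();                 // the buffered chunk is released now
```

Keeping the two flags separate leaves pause/resume free for backpressure while the handler alone controls when output is blocked.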