caolan / highland

High-level streams library for Node.js and the browser
https://caolan.github.io/highland
Apache License 2.0

Concatenated Node 0.8 streams don't get proper backpressure #87

Open pjeby opened 10 years ago

pjeby commented 10 years ago

If you use _.concat to concatenate two event-driven Node streams (e.g. a pair of gulp.src() streams), you can lose items unless you explicitly .pipe() the streams to new Highland Streams. That is, the following will not always output everything matched by both patterns:

_.concat(gulp.src(pattern1),
         gulp.src(pattern2))
.toArray(console.log)

But this will:

_.concat(gulp.src(pattern1).pipe(_()),
         gulp.src(pattern2).pipe(_()))
.toArray(console.log)

The problem appears to be caused by Highland's lazy initialization of converted Node streams, causing some items to be dropped. In particular, the stream being concatenated isn't listened to until the entire first stream has been consumed. (Hence, the workaround of explicitly piping.)
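The underlying failure mode can be reproduced without Highland or gulp. The following is a minimal sketch, assuming a push-style source that emits 'data' whether or not anyone is listening (as Node 0.8 streams did). The names source and received are illustrative only and are not part of any library.

```javascript
const { EventEmitter } = require('events');

// Stand-in for an old-style, event-driven stream: it pushes items out
// regardless of whether a consumer has subscribed yet.
const source = new EventEmitter();
const received = [];

source.emit('data', 'a');                    // no listener yet: silently dropped
source.on('data', (x) => received.push(x));  // the "lazy" consumer subscribes late
source.emit('data', 'b');                    // delivered

console.log(received); // [ 'b' ]
```

Anything emitted before the late subscription is simply gone, which is the same shape as the second concatenated stream not being listened to until the first has finished.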

greelgorke commented 10 years ago

This seems to be a general problem. The following case has the same problem; I'm using EventStream here:

_(['ab', 'c\nd', 'fg']).through(es.split()).through(es.join('-')).each(h.log)
// -
// dfg

abc, the first value emitted by es.split(), is dropped. If I remove the second through, everything is logged.

caolan commented 10 years ago

The solution might be to wrap all arguments to functions which accept node streams in the Highland stream constructor. Can you add a test case which demonstrates this problem?

If anyone wants to go ahead and make this change, then I'll be happy to merge it :)

pjeby commented 10 years ago

Wrapping them in the constructor was the first thing I tried for the concat issue. It didn't help, because the constructor doesn't do the pipe(). It happens later, lazily.

AFAICT, there either needs to be an option you can pass to the Stream constructor to eagerly pipe, or else eager piping needs to become the default.

Is there ever a reason why the Stream constructor should not immediately pipe() a wrapped Node stream?

caolan commented 10 years ago

Yes, because pipe() will cause a paused node stream to resume.

@pjeby could you put together a pull request with a test that demonstrates how you'd like to use the _.concat() API and shows it failing? I'd like to help out with this one.

pjeby commented 10 years ago

That's tricky. I know the problem happens with gulp.src(), but not much else. Basically, the problem happens if the things you're passing to concat are already unpaused, because they are themselves a pipeline. The simplest case I have so far is:

  eager = function(stream) {
    return stream.pipe(through());
  };

  _.concat(eager(streamify([3, 4])), streamify([1, 2])).toArray(console.log);

If the eager() call is removed, then it outputs [1, 2, 3, 4]. As-is, it outputs nothing.

So, basically, if you wrap an already-unpaused Node stream, you're gonna have a hard time. I don't know if there's any way to tell in advance that you have an unpaused stream, though, as my Stream-fu is lacking. ;-)

pjeby commented 10 years ago

Ok, I've posted PR #89 for this. I also added code to make the test pass, because checking in code with a failing test just seems so wrong. ;-)

Although it's true that piping will resume a paused Node stream, it's also the case that Highland will immediately exert backpressure, so I'm not sure what trouble could actually be caused. In order for there to be a problem, you'd need an opened node stream that was also somehow "lazy" and wasn't going to really open the file until you tried to read from it or something.
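For reference, this is roughly what "exerting backpressure" looks like at the Node level. It is a sketch using Node's own stream.Writable, not Highland's internals; the slow consumer and its highWaterMark of 1 are assumptions chosen to make the signal visible on the first write.

```javascript
const { Writable } = require('stream');

// A deliberately slow consumer: it holds on to each write callback
// instead of calling it, so the chunk counts as "still in flight".
const pending = [];
const slow = new Writable({
  objectMode: true,
  highWaterMark: 1,
  write(chunk, encoding, callback) {
    pending.push(callback);
  },
});

// write() returns false once the internal buffer reaches highWaterMark.
// A well-behaved producer (or pipe()) pauses the source on that signal.
const accepted = slow.write('first');

console.log(accepted); // false
```

A source piped into such a consumer is resumed by pipe(), but is asked to stop again as soon as the first chunk arrives.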

I'm not aware of any node streams that work that way, though: AFAIK they all do the opening when you open them, and begin producing data (or at least buffering it) unless paused. (That being said, my Stream-fu is yet weak, so perhaps I can learn something new here.)

I have also noticed that there are certain signs one can use to detect a stream that has definitely been piped to, or that is definitely in a state where it's already flowing. These are just heuristics, though: i.e., looking for a drain event listener or a _readableState.flowing. If either of these signs is present, then the stream to be wrapped should definitely be piped eagerly, or its data is going to fall on the floor.

Despite the possible heuristics, however, I think that eager piping is probably the way to go in general, or else with time we'll see lots more issues like this one, for some new example of a stream that needs different heuristics for you to know that it's already flowing.

greelgorke commented 10 years ago

The way Node.js streams work depends on the version of Node you are using. Before 0.10 they were just pushing events out at you; if you attached a 'data' listener too late, you were screwed. With 0.10 they start in paused mode, but switch to this 0.8-style mode as soon as you listen for 'data'. With 0.11 things get better: there are no longer two modes, and streams start paused by default.

On the topic of eagerness: I'm not that sure about this idea. Highland streams are lazily evaluated, thanks to the pull-based reading. Even Node.js streams are pull-based by default now (with 'readable' + .read() in paused mode), and that's a good idea.

pjeby commented 10 years ago

Okay, so is there any reliable way to tell which of those various states a readable stream is in? Because it sounds like that's what's needed.

Can we even reliably tell which protocol a stream is following, let alone what state it's in?

pjeby commented 10 years ago

Okay, some more experimenting shows that the problem is indeed specific to 0.8 streams, or 0.10 streams which have had a 'data' event registered, putting them into flowing mode.

Tentatively, it looks like if a stream has a read() method but not a data event registered, it can be lazily piped without losing any data. But if there is no read() method, or there's a data event registered, then the stream is already flowing and must be eagerly piped to avoid losing data.
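The tentative rule above could be written down as a small predicate. This is only a sketch of the heuristic as described, not the code from the pull request; mustPipeEagerly is a hypothetical name, and it assumes the stream is an EventEmitter so that listenerCount() is available.

```javascript
const { EventEmitter } = require('events');
const { Readable } = require('stream');

// Hypothetical helper: true when the stream is (or may already be) flowing,
// i.e. it has no read() method or someone has subscribed to 'data'.
function mustPipeEagerly(stream) {
  const hasRead = typeof stream.read === 'function';
  const hasDataListener = stream.listenerCount('data') > 0;
  return !hasRead || hasDataListener;
}

const fresh = new Readable({ read() {} });      // has read(), nobody listening
const listened = new Readable({ read() {} });
listened.on('data', () => {});                  // switched into flowing mode
const oldStyle = new EventEmitter();            // 0.8-style: no read() at all

console.log(mustPipeEagerly(fresh));    // false
console.log(mustPipeEagerly(listened)); // true
console.log(mustPipeEagerly(oldStyle)); // true
```

Note that pipe() itself registers a 'data' listener on the source, so a stream that has already been piped somewhere also comes out as "eager" under this check.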

Does this make sense? Should I change the pull request accordingly?

greelgorke commented 10 years ago

You can still pause them. In 0.10 it's reliable. I wouldn't care much about 0.8 streams, since they are pretty much obsolete.

caolan commented 10 years ago

@pjeby I've merged your PR, since as you say it immediately exerts back-pressure, and I also think few node streams are expecting to be used in a strictly lazy fashion like Highland.

That said, if we can reliably detect a situation where a stream is paused we should do our best not to resume it. My only concern is that another listener might bind to the stream and then we'd lose data (although node 0.10.x streams don't really work with multiple consumers).

greelgorke commented 10 years ago

They do, but only via 'data' pushing, since you can attach many listeners to the events. They don't work in pull mode, because stream.read() removes items from the buffer. AFAIK, 0.8 streams don't have read() anyway.
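The difference between the two consumption styles can be seen in a few lines. This is a sketch against current Node stream and events modules (not 0.10 itself); the variable names are illustrative.

```javascript
const { EventEmitter } = require('events');
const { Readable } = require('stream');

// Push style: every 'data' listener sees every item.
const pushed = new EventEmitter();
const seenByA = [];
const seenByB = [];
pushed.on('data', (x) => seenByA.push(x));
pushed.on('data', (x) => seenByB.push(x));
pushed.emit('data', 'item');

// Pull style: read() removes the item from the internal buffer,
// so a second consumer calling read() gets nothing.
const pulled = new Readable({ objectMode: true, read() {} });
pulled.push('item');
const first = pulled.read();
const second = pulled.read();

console.log(seenByA, seenByB); // [ 'item' ] [ 'item' ]
console.log(first, second);    // item null
```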

caolan commented 10 years ago

@greelgorke yes, that's what I mean: using them in non-flowing mode means you can't really have multiple consumers. Currently we're having to weigh the case for strictly lazy evaluation of streams against the possibility of losing data. Since I don't expect many Node streams to be very strict about lazy evaluation using pause/resume (e.g., not opening a file before the first data is read), I currently think the pipe() solution makes the most sense, so that we don't lose data. I'm very interested in more input here though, as we should really aim to play nicely with node 0.10.x streams as a priority.

greelgorke commented 10 years ago

Node.js .pipe() is event-driven with .pause()/.resume(), in 0.10 as well as in 0.11. We should stick with it too.

I'll fiddle a bit with that. We probably have a solution in the code already, but just don't use it everywhere.

pjeby commented 10 years ago

Right. Basically there are only two possible cases where eager piping can actually wake up something that wasn't already moving:

  1. You have a new stream that hasn't been piped anywhere, hasn't been listened to yet
  2. You have some kind of special lazy stream that doesn't even open the underlying resource until you ask for it

AFAIK, Node's built-in streams never do number 2, and number 1 is hard to detect reliably. And as you say, you can still lose data in case number 1 if somebody attaches a pipe or data listener after it's been passed to Highland.

It kind of comes down to policy, as to whether Highland should prioritize laziness or not silently dropping data on the floor, in a situation where you can't reliably do both. Whichever way the policy goes, having a clear policy makes it easier for people to reason about its behavior.

E.g., if the policy is "use stream.pipe(_()) if you want to keep your data", then you know to always do that. If the policy is "Node streams aren't reliably lazy, so Highland doesn't wrap them that way", then you know to write a short function like the one my PR removed. (Or there could be a _.lazyPipe(nodestream) API to do it, that works the same way, with the caution that it'll drop your data if something else pipes or subscribes to it before Highland gets around to reading it.)

Either way, guessing what needs to be done is probably a bad idea. As the Zen of Python says, explicit is better than implicit, and in the face of ambiguity, refuse the temptation to guess. ;-)

caolan commented 10 years ago

I'd be tempted to go with "Node streams aren't reliably lazy, so Highland doesn't wrap them that way," - since I don't think many people use Node streams to schedule side-effects in the same way Highland encourages you to do.

pjeby commented 10 years ago

Works for me.

I took a quick look at @greelgorke's through example, but it doesn't seem to have the same cause even if it produces a similar symptom. The through method was already eagerly piping even before the change in PR #89.

It looks to me like the actual issue there is the way .pipe() immediately does a .resume() on its consumer. @greelgorke's through example works fine if the s.resume() call is deferred via setImmediate() or some other mechanism, so that information isn't pushed into the target before there's a chance to connect it to the next item in line.

Basically, the issue is that if you resume a pipe synchronously, then your first item is always going to fall on the floor, unless the thing you're piping to is already chained to everything it needs to be chained to. (Which isn't going to happen, because of the order in which chains are built: you'd have to build them right-to-left in order for this to not be a problem.)

In general, calls to Highland stream methods should not result in callbacks being fired before the method has a chance to return. But in this case, that's what's happening: .pipe() resumes the stream before it even returns to the .through() call, let alone the rest of the program. So the 'abc' is dropped because at that point, it hasn't been hooked up to the second .through() call.

It seems the heart of this, at least in the current instance, is that .resume() should not resume synchronously, but instead defer its actual operation to the next pass of the event loop. That way, you cannot have a situation where you write code that looks like it goes from point A to point B, but in actuality a bunch of events fire off in the middle and do god knows what.
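The effect of deferring can be illustrated with a toy model. This is a sketch under stated assumptions, not Highland's code: makeSource and connect are hypothetical helpers, where connect() mimics a pipe() that calls resume() before returning to the caller. In this toy the synchronous variant loses every item rather than just the first.

```javascript
const { EventEmitter } = require('events');

// Hypothetical push source: resume() emits all items, either right away
// or on the next pass of the event loop.
function makeSource(items, defer) {
  const src = new EventEmitter();
  src.resume = () => {
    const flush = () => items.forEach((x) => src.emit('data', x));
    if (defer) setImmediate(flush);
    else flush();
  };
  return src;
}

// Mimics pipe(): resumes before the caller has had a chance to chain on.
function connect(src) {
  src.resume();
  return src;
}

const syncSeen = [];
connect(makeSource(['abc', 'dfg'], false)).on('data', (x) => syncSeen.push(x));

const deferredSeen = [];
connect(makeSource(['abc', 'dfg'], true)).on('data', (x) => deferredSeen.push(x));

setImmediate(() => {
  console.log(syncSeen);     // []
  console.log(deferredSeen); // [ 'abc', 'dfg' ]
});
```

With the deferred resume, the chain is fully built left-to-right before any item moves, so nothing is emitted into a half-connected pipeline.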

(This is one reason why people are down on jQuery promises vs. Promises/A+, because jQuery promises can fire off callbacks while you're still setting them up. In general, callbacks shouldn't fire while you're calling into an API, but be deferred until the stack unwinds, or else you end up with problems similar to multithreading, where code can seem to run "at the same time" as your code, messing with stuff you don't realize it's messing with.)

I don't know if there are any other parts of Highland besides .resume() that need this, but .resume() is definitely an important one. I tried making .pipe() defer its call to .resume(), and that makes @greelgorke's example work in a test. Making .resume() always defer itself breaks some other tests, however, because they expect the resuming to happen synchronously.

I think the correct fix is probably to change those tests to check for .resume()'s effect asynchronously, the way the tests that do .resume() with Node streams do. That is, they use a setTimeout call to wait before checking that the .resume() has done what they're testing for.

To put it another way, I don't think that part of .resume()'s contract is a guarantee that everything it's supposed to do, will happen by the time it returns. It already breaks that contract in the event you re-enter it, so making it fully re-entrant (i.e., by deferring its actions to the next event loop), will not change that; it's just that the tests are being too eager in assuming it's finished.

If you like, I can take a look at what would be required to change .resume() and the tests. Or just submit a PR for the .pipe()-only fix, until we can figure out what to do with .resume().

(I think making resume always async will also simplify the code a bit, since there won't be any need to keep track of whether resume is recursing or how many times it was called -- it'll simply be called async each time.)

caolan commented 10 years ago

BTW, the tests currently fail for paused Node streams concatenated in node 0.11.x - it appears they get resumed but in 0.10.x they don't.

pjeby commented 10 years ago

The test is checking the wrong thing: the stream is resumed, yes, but it's also immediately paused again. That's why the toArray() still outputs the right thing. Node 0.11 streams can be safely resumed and paused in this fashion, without spewing stuff on the floor.

In Node 0.10, pause and resume are advisory, and once a stream is in "flowing" mode, it stays that way. In 0.11, pause and resume actually pause and resume the flowing mode.
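In current Node releases the flowing state is exposed publicly as readableFlowing, which makes this behavior easy to observe. The sketch below assumes a modern Node version (the property did not exist in 0.10 or 0.11, where one had to peek at _readableState.flowing) and uses a PassThrough purely as a convenient readable stream.

```javascript
const { PassThrough } = require('stream');

const s = new PassThrough();
const states = [];

states.push(s.readableFlowing); // null: no consumption mechanism chosen yet
s.on('data', () => {});         // attaching a 'data' listener starts flowing mode
states.push(s.readableFlowing);
s.pause();                      // pause() really leaves flowing mode
states.push(s.readableFlowing);
s.resume();                     // and resume() re-enters it
states.push(s.readableFlowing);

console.log(states); // [ null, true, false, true ]
```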

Adding some console.logs, what I see that failing test doing on 0.11 is:

In other words, it actually works correctly. The test is overspecified by insisting that resume() shouldn't be called, and by expecting the flowing flag to be false before the current event loop pass completes. (It does go to false once pause() gets called.)

This is what I mean about the tests in general overspecifying what should happen in the current event loop pass. It looks to me that Node 0.11 is actually doing it right in terms of its event timing now.