caolan / highland

High-level streams library for Node.js and the browser
https://caolan.github.io/highland
Apache License 2.0

back-propagation of _.nil #172

Open jeromew opened 10 years ago

jeromew commented 10 years ago

Hello,

I have been trying the following scenario :

var resource = open()
var s = _(function(push, next) {
    ..use resource to push elements and _.nil when resource is empty ..
})
.consume(function(err, x, push, next) {
    if (err === null) {
      push(err);
      next();
    }
    else if (x === _.nil) {
      resource.close();
      push(null, x);
    }
    else {
      push(null,x);
    }
})

I open a resource, consume from it, and wait for _.nil to close the resource.

This works well if I consume all the tokens with

s.resume();

Now if I do

s.take(10).resume();

The resource is never closed, because take seems to send _.nil downstream without informing upstream that it will not be pulled from again.

You might say that I could close the resource by catching the _.nil after take(10) but this is not what I want to do because I do not want the downstream code to have a reference to the resource.

Would it make sense to have back-propagation of _.nil or a mechanism to inform upstream streams that they will not be pulled from again ?

In node.js streams, when you do

s1.pipe(s2)

when s2 sends a 'close' event, s1 automatically unpipes s2 for example and you have a way to detect that nothing will be pulled again.

https://github.com/joyent/node/blob/master/lib/_stream_readable.js#L568
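
To make the request concrete, here is a minimal sketch of back-propagation in plain JavaScript. It is not Highland code: `END`, `source`, `take`, `toArray` and the `cancel()` method are hypothetical names invented for this illustration. Each stage exposes `cancel()` next to `pull()`, and `take` calls it on its upstream once it has all the values it needs, so the resource is closed without downstream holding a reference to it.

```javascript
// Hypothetical toy pull-stream, only to illustrate the idea.
const END = Symbol('end'); // stands in for _.nil

// A source that owns a resource and knows how to release it.
function source(resource) {
  let closed = false;
  const close = () => {
    if (!closed) {
      closed = true;
      resource.close();
    }
  };
  return {
    // Return the next value, or END once the resource is exhausted.
    pull() {
      if (closed || resource.items.length === 0) {
        close();
        return END;
      }
      return resource.items.shift();
    },
    // Called by downstream to say "I will never pull again".
    cancel: close,
  };
}

// take(n) passes on at most n values, then cancels its upstream.
function take(n, upstream) {
  let remaining = n;
  return {
    pull() {
      if (remaining === 0) {
        upstream.cancel(); // back-propagate the end of consumption
        return END;
      }
      remaining -= 1;
      return upstream.pull();
    },
    cancel() {
      upstream.cancel();
    },
  };
}

// Drain a stream into an array.
function toArray(stream) {
  const out = [];
  for (let x = stream.pull(); x !== END; x = stream.pull()) {
    out.push(x);
  }
  return out;
}

const resource = {
  items: [1, 2, 3, 4, 5],
  closed: false,
  close() { this.closed = true; },
};
const result = toArray(take(2, source(resource)));
```

In this sketch the resource ends up closed even though only two of its five items were read.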

LewisJEllis commented 10 years ago

Do you mean:

    if (err !== null) {
      push(err);
      next();
    }
    else if (x === _.nil) {
      resource.close();
      push(null, x);
    }
    else {
      push(null,x);
      next();
    }
jeromew commented 10 years ago

Yes, sorry for the typo, here is an example showing the "bug"

var _ = require('./lib/index.js')

_([1,2,3,4,5])
.consume(function(err, x, push, next) {
    if (err !== null) {
      push(err);
      next();
    }
    else if (x === _.nil) {
      console.log('nil')
      push(null, x);
    }
    else {
      push(null,x);
      next();
    }
})
.take(2)
.resume()

I would have expected 'nil' to be logged but was surprised that it is not. The same code with take(2) placed before the consume does log 'nil'.

LewisJEllis commented 10 years ago

I see, but when take passes on nil, there's no guarantee that the source stream will never be pulled from again since we can have multiple consumers.

var _ = require('./lib/index');

var s = _([1,2,3,4,5])
.consume(function(err, x, push, next) {
  if (err !== null) {
    push(err);
    next();
  }
  else if (x === _.nil) {
    console.log('nil')
    push(null, x);
  }
  else {
    push(null,x);
    next();
  }
});

s.take(2).toArray(function (xs) {
  console.log(xs)
  s.take(3).toArray(function (xs) {
    console.log(xs);
    s.resume();
  });
});
vqvu commented 10 years ago

Should we even allow streams to be pulled from again?

This behavior results in situations like

var s = _([1, 2, 3, 4, 5]);
s.fork().take(1).toArray(_.log);
// => prints [1]

s.fork().take(2).toArray(_.log);
// => prints nothing.

The shared backpressure causes the second fork to never get the second element, since the first fork could be consumed later on.
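
The stall can be reproduced with a toy model in plain JavaScript. This is a sketch under assumptions, not Highland's fork implementation: `forkable`, `fork`, and `request` are hypothetical names, and the only rule modelled is "the source emits a value only when every fork is waiting for one".

```javascript
// Toy model of forks sharing back pressure (hypothetical names throughout).
function forkable(items) {
  const forks = [];
  let i = 0;

  function tryAdvance() {
    // Emit only while every registered fork is waiting for a value.
    while (i < items.length && forks.every((f) => f.waiting)) {
      const x = items[i++];
      forks.forEach((f) => { f.waiting = false; });
      forks.forEach((f) => f.onValue(x));
    }
  }

  return {
    fork(onValue) {
      const f = { waiting: false, onValue };
      forks.push(f);
      return {
        // Signal that this fork is ready for one more value.
        request() {
          f.waiting = true;
          tryAdvance();
        },
      };
    },
  };
}

const s = forkable([1, 2, 3, 4, 5]);
const a = [];
const b = [];

const forkA = s.fork((x) => a.push(x));
forkA.request(); // A receives 1, then never asks again (like take(1))

const forkB = s.fork((x) => b.push(x));
forkB.request(); // stalls: A is registered but not waiting
```

Fork B receives nothing because fork A is still registered and could, in principle, ask for more later.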

ariutta commented 10 years ago

I've been defining all of my forks before consuming them, e.g.:

var s = _([1, 2, 3, 4, 5]);
var sFork1 = s.fork();
var sFork2 = s.fork();
sFork1.take(1).toArray(_.log); // => prints [1]

sFork2.take(2).toArray(_.log); // => prints [1, 2].

But I'd be interested in hearing best practices for this case. It seems like my example isn't the most functional way of handling this.

vqvu commented 10 years ago

@ariutta Can you give a better example of what you're trying to do, as what you seem to be asking doesn't have anything to do with functional vs. not functional?

If you're just asking about whether you should assign your forks to a variable, I think the answer is "no unless you need to reference your fork later on." Otherwise, just take advantage of the chaining API instead of creating superfluous variables that you never really need.

I don't think I've ever run into a situation where I needed a reference to my fork later on.

ariutta commented 9 years ago

@vqvu, your second paragraph answered my question. Thanks!

jeromew commented 9 years ago

@vqvu the way I currently use highland is :ns

Ideally, each "transform" would have its own setup/teardown + an awareness of the global graph it is inserted into. In my example above, I thought that there could be a sort of back-propagation of the fact that the transforms will never be pulled from again.

but that is very fuzzy, now that I understand @LewisJEllis 's point (and yours) that streams can be re-used (I was under the wrong understanding that they had to be discarded when downstream decided everything was over)

I'll let you close this issue as it seems that my question is not a real issue.

ibash commented 9 years ago

I've been looking at using this library in a project and ran into this exact problem: there's no safe time to clean up a resource.

An alternative to forcing streams to be one time use only might be to take the approach bacon.js does.

That is, have the user provide a function that allocates the resource and gets values, and have the user provide a function to cleanup the resource. Make it explicit that their "subscribe" function may be called multiple times (i.e. if a stream is stopped and then re-used the "subscribe" function would be called again).
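
A rough sketch of that shape in plain JavaScript follows. It does not use bacon.js or Highland; `makeTicker`, `takeFrom`, and `emit` are hypothetical helpers for illustration. The user-supplied `subscribe` function allocates the resource and returns a cleanup function, and it is called again if the stream is re-used.

```javascript
// Hypothetical source: "subscribe" opens the resource and returns its cleanup.
function makeTicker() {
  const log = [];
  let sink = null;
  const subscribe = (s) => {
    log.push('open'); // allocate the resource here
    sink = s;
    return () => {
      log.push('close'); // release the resource here
      sink = null;
    };
  };
  // emit() simulates the resource producing a value.
  const emit = (x) => {
    if (sink) sink(x);
  };
  return { subscribe, emit, log };
}

// Take n values, then unsubscribe so the source can clean up.
function takeFrom(subscribe, n, onValue) {
  let remaining = n;
  const unsubscribe = subscribe((x) => {
    onValue(x);
    remaining -= 1;
    if (remaining === 0) unsubscribe();
  });
}

const ticker = makeTicker();
const seen = [];

takeFrom(ticker.subscribe, 2, (x) => seen.push(x));
ticker.emit('a');
ticker.emit('b'); // second value: the consumer unsubscribes
ticker.emit('c'); // dropped, nobody is subscribed

takeFrom(ticker.subscribe, 1, (x) => seen.push(x)); // re-use: subscribe runs again
ticker.emit('d');
```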

Disclaimer: I haven't used bacon.js quite yet (just read some docs / source), as I wanted to try highland first.

LewisJEllis commented 9 years ago

I also ran into a case sort of like this recently and thought about this a bit, esp. the question "Should we even allow streams to be pulled from again?"

I haven't managed to make a strong use case for the current way of allowing it - the example I gave above was just to make a point, and I can pull off the same thing using forks anyway if we disallow multiple pulls; this piece from my example above:

s.take(2).toArray(function (xs) {
  console.log(xs)
  s.take(3).toArray(function (xs) {
    console.log(xs);
    s.resume();
  });
});

could become:

s.fork().take(2).toArray(_.log)
// now we know the first fork is all done
s.fork().drop(2).take(3).toArray(_.log)

(assuming .drop makes its way in, of course)

Does anyone have a good example of where multiple consumptions/thunks on the same stream is useful/necessary? /cc @vqvu, @caolan @jeromew

I like the possibility that a stream should effectively "be discarded when downstream decide(s) everything (is) over" where "downstream decides everything is over" = "a thunk happens". In other words - can we safely add the restriction that every stream (or fork) only ever has one thunk? Then, once that thunk happens, we can back-prop _.nil and thus allow for teardown of encapsulated resources.

Barring that possibility, the setup/teardown handler idea is a good next option, but it feels weird to me for some reason that I haven't quite put my finger on. See also: relevant discussion in #161 on how .done might support either invoking the teardown handler or triggering the back-propagation.

vqvu commented 9 years ago

I'm not a fan of allowing multiple thunks for a single stream for the exact reasons mentioned in this issue; I've never had to do it myself. So I would support changing that behavior to allow for back-prop of nil.

ibash commented 9 years ago

Question about semantics: will there be a way to differentiate between "values are no longer needed" and "this is the end of the stream, but continue to push values"? In the first case you'd want to clean up resources immediately, but in the second you'd probably want to wait. Let me know if an example use case is needed.

vqvu commented 9 years ago

I don't understand your latter case...how can you continue to push values after the end of the stream?

ibash commented 9 years ago

In particular I want to re-implement unix-sort using highland. The unix-sort library spawns the sort command and passes all the stream items to it. When it gets _.nil indicating end of items from the stream, it would send a EOF to the sort command so that everything can be sorted. After that it would send the stream down in sorted order.

I guess in this case it could cleanup in the onDone. The caveat being if something happens and you no longer care about the results of the stream, there's not a clear way to indicate "cleanup and ignore the results".

In pseudo-code it would look like this:

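// initSort() and EOF are placeholders for the unix-sort wrapper.
var sort; // created lazily on the first call and shared across calls

stream.consume(function (error, value, push, next) {
    if (error) {
      // pass along errors
      push(error);
      return next();
    }

    if (!sort) {
      sort = initSort();
      sort.onDone(function() {
        // emit the sorted values, then end the stream
        sort.sortedValues.forEach(function (v) {
          push(null, v);
        });
        push(null, _.nil);
      });
    }

    // stream is finished, tell sort that it can start
    if (value === _.nil) {
      sort.send(EOF);
    } else {
      sort.send(value);
      next();
    }
});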
jeromew commented 9 years ago

There is an open PR on sort : https://github.com/caolan/highland/pull/169

What benefit do you see in using unix-sort memory-wise? In both cases it seems to me that you need to have the whole thing in memory, just in case the last item becomes the first after sorting.

But you are right on the "cancel" pattern. Right now s.sort().take(1) will pull all the items in memory up until a nil is emitted by s, and there is no way to cancel it. This might be problematic in some cases and will probably need some thought.

vqvu commented 9 years ago

@ibash OK, I get what you're saying. There will need to be a way to differentiate between getting a nil from upstream (so you can start your sort) and getting a nil from downstream (a "cancel").

Perhaps what we need is an onDone and onCancel for the two cases, respectively.
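
As a sketch of how the two cases could differ for a buffering stage like the sort above (plain JavaScript; `makeSortStage`, `write`, `onDone`, and `onCancel` are hypothetical names here, not an existing Highland API):

```javascript
// Hypothetical buffering stage with separate "done" and "cancel" paths.
function makeSortStage() {
  let buffer = [];
  const events = [];
  return {
    events,
    // Collect a value from upstream.
    write(x) {
      buffer.push(x);
    },
    // Upstream ended: all input has been seen and the results are wanted.
    onDone() {
      events.push('done');
      const sorted = buffer.slice().sort((p, q) => p - q);
      buffer = [];
      return sorted;
    },
    // Downstream cancelled: nobody wants the results, so just release memory.
    onCancel() {
      events.push('cancel');
      buffer = [];
      return [];
    },
  };
}

const finished = makeSortStage();
[3, 1, 2].forEach((x) => finished.write(x));
const sortedOutput = finished.onDone();

const abandoned = makeSortStage();
[9, 8].forEach((x) => abandoned.write(x));
const cancelledOutput = abandoned.onCancel();
```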

ibash commented 9 years ago

@jeromew as discussed here unix sort makes temporary files so it doesn't have to hold everything in memory.

@vqvu exactly -- not sure the best syntax for it, whether it should be a different value pushed to the consumer like _.cancel or something else.

LewisJEllis commented 9 years ago

@vqvu you mean for onDone and onCancel to be handlers passed to .consume? So the .consume example from the docs would become something like:

var filter = function (f, source) {
    return source.consume({
        onDone: function (push, next) {
            push(null, _.nil);
        },
        handle: function (err, x, push, next) {
            if (err) {
                // pass errors along the stream and consume next value
                push(err);
                next();
            }
            else {
                // pass on the value only if the value passes the predicate
                if (f(x)) {
                    push(null, x);
                }
                next();
            }
        }
    });
};

My first thought was to use an additional special value (like _.cancel), but that would require modifying all the existing .consume implementations to handle that case. That might actually be a good thing, since some would treat it the same as _.nil and some would treat it differently - but it seems pretty boilerplatey for consumes that just go push(null, nil) in either case.

Another option might be to make the idea of a cancel live inside the idea of a nil - replace x === nil with _.isNil(x), which would return true in either the cancel or the nil case, and then have another way to determine if the nil is also a cancel. That way, consumes wouldn't have to care about the cancel case unless they behave differently for it.

I think out of these three options, I like the handlers the best, since consumes could have the option of just not passing the handlers if they don't have special behavior (and they'd default to something like push(null, nil)). That simplifies the boilerplate required for most consumes, and it also feels more like the through API, which takes a write handler and an end handler.
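
For comparison, the second option (a cancel that lives inside the idea of a nil) might look roughly like this in plain JavaScript. The `cancel` marker, `isNil`, and `isCancel` are hypothetical names for this sketch and are not claimed to exist in Highland.

```javascript
// Two end markers: a normal end and a cancel (both hypothetical).
const nil = Object.freeze({ name: 'nil' });
const cancel = Object.freeze({ name: 'cancel' });

// True for either kind of end.
const isNil = (x) => x === nil || x === cancel;
// True only when the end was caused by a cancel.
const isCancel = (x) => x === cancel;

// A consume-style handler that does not care why the stream ended.
function passThrough(err, x, push, next) {
  if (err) {
    push(err);
    next();
  } else if (isNil(x)) {
    push(null, x); // forward whichever end marker arrived
  } else {
    push(null, x);
    next();
  }
}

// Drive the handler by hand with a recording push.
const pushed = [];
const push = (err, x) => { pushed.push(x); };
const next = () => {};

passThrough(null, 1, push, next);
passThrough(null, cancel, push, next);
```

Handlers written against `isNil` keep working unchanged, and only the ones that care need to ask `isCancel`.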

vqvu commented 9 years ago

I'm not sure on the exact API yet, I like handlers better than special values too. Using a special value may be complicated in implementation.

I was thinking more along the lines of

stream.consume(...).onCancel(...);

// Unlike done, onDone just registers a handler. It doesn't cause a thunk.
stream.consume(...).onDone(...);

I like this for a few reasons:

  1. keep backcompat.
  2. We can register an onCancel handler without having to write a consume handler.

We could also go with your handlers-to-consume option and assume that an undefined onCancel means "just push _.nil" to keep backcompat.

ibash commented 9 years ago

@vqvu I dig that syntax. Still on the fence about having a separate error handler, though you can do that with the current api using .errors, it's just a bit awkward.

ibash commented 9 years ago

As another datapoint, I ended up implementing the unix-sort / unix-join. _.nil is not back propagated so there could be some issue with slowly leaking resources.

jgrund commented 9 years ago

At the very least, how about Stream.end() walking back up the stream.source property until it hits null. Then it could write nil to that stream, which should flow back down.

jgrund commented 9 years ago

Or we could have Stream.write do the same if it encounters a nil.

vqvu commented 9 years ago

Writing nil won't work if the streams have buffered data. Not to mention that it won't even be emitted until downstream requests another element (which it won't because it's ended). And then we'd have to handle forks in a special way.

It also won't cover the important "generator holding a resource" case.

Plus, some transforms are implemented with pull and not consume, so walking up stream.source may not even reach the true source.

ibash commented 9 years ago

After working with highland a bit more I think I prefer the syntax recommended here: https://github.com/caolan/highland/issues/172#issuecomment-69215140

I'm also wondering if the cancel is actually needed -- yes, you could waste a bit of work, but I'm not sure the benefits outweigh the complication of having separate handlers. If there was an onDone and onCancel handler -- do you need to have both clean up resources, to play it safe? Or can you get away with just implementing onDone?

vqvu commented 9 years ago

The chaining syntax

Why do you prefer the other syntax?


Not sure about having separate handlers. The "clean up resources" and "fork" use cases that we have would work with just a unified onDestroy handler, and I agree that having one handler is better if we don't need two.

How about having a unified onDestroy (name subject to change) and just passing in the reason (caused by downstream or upstream) to the handler as an argument. That would allow handlers to differentiate between the two cases if they want to, and not if they don't.
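
A minimal sketch of that idea in plain JavaScript, assuming a hypothetical `makeStage` with `endFromUpstream` and `cancelFromDownstream` triggers (none of these names come from Highland): handlers run exactly once, and they may use or ignore the reason.

```javascript
// Hypothetical stage with a unified destroy hook.
function makeStage() {
  const handlers = [];
  let destroyed = false;

  function destroy(reason) {
    if (destroyed) return; // whichever trigger comes first wins
    destroyed = true;
    handlers.forEach((h) => h(reason));
  }

  return {
    // Register a handler; returns the stage so calls can be chained.
    onDestroy(handler) {
      handlers.push(handler);
      return this;
    },
    endFromUpstream() {
      destroy('upstream');
    },
    cancelFromDownstream() {
      destroy('downstream');
    },
  };
}

const seen = [];
const stage = makeStage()
  .onDestroy(() => { seen.push('closed'); }) // ignores the reason
  .onDestroy((reason) => { seen.push('why: ' + reason); });

stage.cancelFromDownstream();
stage.endFromUpstream(); // no effect, already destroyed
```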

vqvu commented 9 years ago

Or just don't bother passing a reason at all until someone comes up with a good use case for doing so...

The minimalist in me probably prefers this more.

ibash commented 9 years ago

In my comment above I linked to the chaining syntax -- I prefer it as well, sorry for the misunderstanding. +1 to doing as little as possible until there's a good reason for it.

vqvu commented 9 years ago

Oh, sorry! My browser must have been misbehaving.

jgrund commented 9 years ago

The case I'm always hitting is needing to rebind destroy to the last transform in a chain so I can destroy the resource as well. something like:

stream = _(function (push, next) {  });

stream._destructors.push(function () {
  console.log('resource cleanup happens here.');
});

s2 = stream.map(function () {...});

s2.destroy = stream.destroy.bind(stream);

@vqvu You are correct that we would have to resume to get the nil flushed down if the stream is paused and internal pulls would pose a problem.

Given the onDestroy syntax, how would that work in the context of walking back up through a fork?

Say we had a stream that could have forks that are added and destroyed dynamically.

How would one decide the difference between "this fork is done" and "this stream is done"?

ibash commented 9 years ago

@jgrund what do you use forks for? I'm curious as to whether they could be replaced with an observe...?

jgrund commented 9 years ago

@ibash observe would have the same problem, would it not? We would still need to remove the observe at some point when we are done with them.

jgrund commented 9 years ago

FWIW I am using them to present data differently in different parts of my application.

vqvu commented 9 years ago

@jgrund you're right that you wouldn't be able to add forks dynamically. The stream would be destroyed once all forks are destroyed.
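
The "destroyed once all forks are destroyed" rule amounts to reference counting. A sketch in plain JavaScript, where `refCountedSource` is a hypothetical helper and not Highland's fork manager:

```javascript
// Hypothetical reference-counted source: the cleanup runs when the last
// live fork is destroyed, and no forks can be added after that.
function refCountedSource(destroySource) {
  let live = 0;
  let sourceDestroyed = false;
  return {
    fork() {
      if (sourceDestroyed) {
        throw new Error('cannot fork a destroyed stream');
      }
      live += 1;
      let forkDestroyed = false;
      return {
        destroy() {
          if (forkDestroyed) return; // destroying twice is a no-op
          forkDestroyed = true;
          live -= 1;
          if (live === 0) {
            sourceDestroyed = true;
            destroySource();
          }
        },
      };
    },
  };
}

let closes = 0;
const shared = refCountedSource(() => { closes += 1; });
const forkOne = shared.fork();
const forkTwo = shared.fork();

forkOne.destroy();
forkOne.destroy();
const closesAfterFirstFork = closes; // still 0: forkTwo is alive

forkTwo.destroy(); // last fork gone, the source is cleaned up

let lateForkError = null;
try {
  shared.fork();
} catch (e) {
  lateForkError = e.message;
}
```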

The way I see highland being used is that you set up all of the transforms that you need, then call something that causes a resume. Once you resume a stream, you really shouldn't add any more transforms to it.

Are you doing anything that really necessitates dynamic creation of forks after data has started flowing?

vqvu commented 9 years ago

Also, one of the new restrictions in 3.0 will be that once you add a transform to a stream (not a fork), you cannot add another. More specifically, consume may be called at most once on any stream, and may not be called after a stream has started flowing, like fork.

This is precisely to solve a similar problem we had with an early-terminating transform like take possibly letting you apply it more than once to a stream.

var s = _([1, 2, 3]);

// Take one
s.take(1).toArray(function (a) {
    // Take two more. This second take works now,
    // but won't in 3.0.
    s.take(2);
});
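
The "at most once" restriction could be enforced with a simple guard. This is a sketch in plain JavaScript with a hypothetical `makeStream`; the actual check and error message in 3.0 may differ.

```javascript
// Hypothetical stream that allows a single consumer.
function makeStream(items) {
  let consumed = false;
  return {
    consume() {
      if (consumed) {
        throw new Error('stream already has a consumer');
      }
      consumed = true;
      return items.slice();
    },
  };
}

const once = makeStream([1, 2, 3]);
const firstResult = once.consume();

let secondError = null;
try {
  once.consume(); // a second transform on the same stream is rejected
} catch (e) {
  secondError = e.message;
}
```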
vqvu commented 9 years ago

After some thought, I guess you would need dynamic forking if your views were dynamically created. And you may want the shared back pressure to synchronize your views. This is probably a good enough case to allow something like dynamic forking.

Perhaps a broadcast method that consumes the current stream and returns an object that has

  1. new() - create a new viewer stream that sees the values emitted by the parent.
  2. destroy() - destroy the parent. No new viewers can be created after this.

The streams are synchronized via shared back pressure like a fork, but

  1. They can be created dynamically, after data has started flowing.
  2. The source does not get destroyed even when there are no viewers.
  3. If there are no viewers, the source will be paused.
  4. It is up to the user of broadcast to destroy the source once they are done with it.

Implementation-wise, the current 3.0 code already uses a similar kind of "fork manager", so this should be easy to do.
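
The lifecycle rules above can be sketched in plain JavaScript. This is an illustration under assumptions, not the 3.0 fork manager: the `source` object with `resume()`, `pause()`, and `destroy()` is hypothetical, `emit` stands in for the parent producing a value, and shared back pressure between viewers is not modelled.

```javascript
// Hypothetical broadcast: viewers come and go, the source outlives them.
function broadcast(source) {
  const viewers = new Set();
  let destroyed = false;
  return {
    // Create a viewer; allowed at any time until destroy() is called.
    new(onValue) {
      if (destroyed) throw new Error('broadcast was destroyed');
      viewers.add(onValue);
      if (viewers.size === 1) source.resume();
      return {
        destroy() {
          // Removing the last viewer pauses the source but keeps it alive.
          if (viewers.delete(onValue) && viewers.size === 0 && !destroyed) {
            source.pause();
          }
        },
      };
    },
    // Deliver one value to every current viewer.
    emit(x) {
      viewers.forEach((v) => v(x));
    },
    // Only the owner of the broadcast destroys the source.
    destroy() {
      if (destroyed) return;
      destroyed = true;
      viewers.clear();
      source.destroy();
    },
  };
}

const log = [];
const source = {
  resume: () => { log.push('resume'); },
  pause: () => { log.push('pause'); },
  destroy: () => { log.push('destroy'); },
};

const hub = broadcast(source);
const got = [];

const viewerOne = hub.new((x) => { got.push(x); });
hub.emit(1);
viewerOne.destroy(); // no viewers left: source is paused, not destroyed

hub.new((x) => { got.push(x * 10); }); // created after data started flowing
hub.emit(2);
hub.destroy();
```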

jgrund commented 9 years ago

@vqvu Nice, I like that approach.

One thing I would add is it would be nice if broadcast could remember the last thing that passed through it, so when a new viewer attaches they can receive that last token. Otherwise a viewer may be left hanging until another value is available.

vqvu commented 9 years ago

I'm hesitant to add specific functionality like that, since other people may not want it.

One way to get what you want is to just implement it yourself. Something like this

var s = makeSource();
var latest = _.nil;
s.observe().consume(function (err, x, push, next) {
    if (!err) {
        latest = x;
    }
    if (x !== _.nil) {
        next();
    }
}).resume();
var broadcast = s.broadcast();

function newViewer() {
    var view = broadcast.new();
    return _(function (push, next) {
        if (latest !== _.nil) {
            push(null, latest);
        }
        next(view);
    });
}

vqvu commented 9 years ago

Also, if you don't need the shared back pressure, using observers will also work. Observers will never destroy their source. This fits with the current behavior since they don't resume their source either. They're just passive observers.

ibash commented 9 years ago

Is there any consensus on what this api should look like?

vqvu commented 9 years ago

I like

s.onDestroy(function () { /* Do destroy stuff */ })
    .map(...)
    ...;

Called after s emits nil or one of s's downstream consumers emits nil, whichever comes first.

I think we should go this way unless there is a significant objection (no comment == implicit approval).

svozza commented 9 years ago

Sounds good to me.

ibash commented 9 years ago

+1