dvlsg / async-csp

CSP style channels using ES7 async/await
MIT License
317 stars 18 forks source link

Branching from channels in a pipeline. #4

Closed trusktr closed 8 years ago

trusktr commented 8 years ago

I haven't actually needed to do this yet, but the idea popped into my head while reading the examples:

Channel.pipeline creates a pipeline, and returns a 2-element array with the first and last channels of the pipeline.

What about returning an array containing all the channels in the pipeline so that we may branch from the pipeline at any point? f.e.:

let [ ch1, ch2, ch3 ] = Channel.pipeline(
    x => x + 2,
    x => x.toString(),
    x => ({ x })
);

let ch4 = new Channel(x => [x, x+2])
ch2.pipe(ch4) // branch.

/*
    +---+
    |ch1|
    +---+
      |
      |
      V
    +---+
    |ch2|-----+
    +---+     |
      |       |
      |       |
      V       V
    +---+   +---+
    |ch3|   |ch4|
    +---+   +---+
*/
dvlsg commented 8 years ago

The thought certainly crossed my mind. I also haven't needed it yet, since I find myself creating channels separately and setting up the pipes more often than using pipeline. Returning all channels probably makes the most sense -- especially since destructuring is essentially guaranteed to be available if you're using async/await.

Technically, you can get access to ch2 in this scenario by doing something like this:

let [ ch1, ch3 ] = Channel.pipeline(
    x => x + 2,
    x => x.toString(),
    x => ({ x })
);

let [ ch2 ] = ch1.pipeline; // pipeline will be an array containing ch2. similarly, ch2's pipeline will contain [ ch3 ]
let ch4 = new Channel(x => [x, x+2])
ch2.pipe(ch4) // branch.

But that feels wrong / hacky / annoying to type.

The downside of returning all channels might be additional difficulty for an end-user with a longer pipeline that really only cares about the entry and exit points. I suspect your case may be more common than this one, however.

trusktr commented 8 years ago

What about creating pipelines from channels with Channel.pipeline? f.e.:

let [ ch1, ch3 ] = Channel.pipeline(ch1, ch2, ch3) // each channel may have a transform.

In that case we already have a ref to ch2 to branch from at some later point.

trusktr commented 8 years ago

By the way, the reason I'm really interested in this async/await channel approach you've got going is that some of us in the community are working on http://infamous.io (name of the group and libraries subject to change) which will let users create a scene graph (basically a tree where each node represents something that can be rendered, and a node in the tree can have any number of child nodes. I have a feeling that async-csp's channel piping might possibly be a good fit for eventing along the tree branches, but I'm not entirely sure yet. There might be two channels per node, one for pipes that merge up the tree, and the other for pipes that split down the tree. I'm still thinking of ideas around this.

dvlsg commented 8 years ago

Piping from channels is certainly fair, and would be very simple to add. I was also considering allowing Channel#pipe() to accept functions and creating channels from them (or just passing the args along to the Channel constructor if the first arg isn't an instance of a Channel).

Personally, I prefer this style, but I know there is a reasonable amount of repeated code which could go away with some of the suggestions above:

let ch1 = new Channel(x => x);
let ch2 = new Channel(x => x);
let ch3 = new Channel(x => x);
ch1.pipe(ch2).pipe(ch3);

Oh nice. I'll have to check that out when I get a chance.

trusktr commented 8 years ago

I like it too for manual things, but ch1.pipe(ch2).pipe(ch3) could get unwieldy with unknown numbers of channels. Suppose we have the array channels of arbitrary and/or unknown length, we could write.

channels.forEach((channel, index) => {
  if (index+1 !== channels.length)
    channel.pipe(channels[index+1])
})

We can see where Channel.pipeline becomes handy.

But anyway, I haven't even needed this yet! Let me actually put this thing to real-world use first... x]

petebacondarwin commented 8 years ago

I would steer clear of returning all the channels. Surely the most common use case is to grab the first and last channel? In which case receiving a arbitrary length array would make getting the first and last less obvious ...

let [ ch1, , ch3 ] = Channel.pipeline(
    x => x + 2,
    x => x.toString(),
    x => ({ x })
);
dvlsg commented 8 years ago

I think I agree. After sitting on it, it seems like Channel.pipeline() should be used to handle any number of callbacks. If we need access to a channel in the middle, you probably already know how the channels should be setup, and have the capability to set them up ahead of time to get reference to the middle channels.

I will continue to return [ first, last ] from Channel.pipeline() for now. I will also add support for passing Channel as one of the arguments in an upcoming minor release (or maybe a patch release, since I'm still cheating with the 0.X.X semver).

trusktr commented 8 years ago

@petebacondarwin

Surely the most common use case is to grab the first and last channel?

It's possible. Maybe better safe than sorry by giving all of them?

In which case receiving a arbitrary length array would make getting the first and last less obvious ...

It's more clear without destructuring in that case:

let pipeline = Channel.pipeline(
    x => x + 2,
    x => x.toString(),
    x => ({ x })
);

let first = pipeline[0]
let last = pipeline[pipeline.length-1]
dvlsg commented 8 years ago

I feel like the destructuring of first and last does lead itself to nice clean code for pulling first and last out of the pipeline, but I also agree that if you do have a use case for picking created channels out of the middle of the pipeline that declaring all the channels ahead of time would be fairly more verbose as well. I think I'm going to sit on returning [ first, last ] for now, if for no other reason than maintaining backwards compatibility. I'm not completely ruling it out, though, for a future patch. I could definitely understand the argument that it would be more intuitive to return an array with the same length as the original input.

By the way, do you have any use cases that wouldn't be covered by allowing something like this?

let ch2 = new Channel(x => x.toString());
let [ first, last ] = Channel.pipeline(
    x => x + 2,
    ch2,
    x => ({ x })
);

I suppose it gets more verbose the more channels you have in the middle, but I'm having a hard time coming up with scenarios where you'd actually want to pick channels out of the middle of a pipeline by position, unless you already had the channels ahead of time and already knew from which one you wanted to branch.

petebacondarwin commented 8 years ago

Maybe better safe than sorry by giving all of them?

One should optimize for the most common use case. Even you have said that you haven't yet need to access the channels inside the pipeline, which implies that in the vast majority of cases you just want the first and last.

Your example demonstrates how unwieldy it would be for this use case if you only wanted the first and last. My suggestion of destructuring was my best attempt at making this not so ugly. BTW one could also use shift() and pop() to get the first and last:

let channels = Channel.pipeline(trans1, trans2, trans3);
let first = channels.shift();
let last = channels.pop();

But this is still verbose compared to simply:

let [first, last] = Channel.pipeline(trans1, trans2, trans3);

@dvlsg's idea of allowing channels to be provided to the pipeline is cleaner:

let ch1 = new Channel(trans1);
let ch2 = new Channel(trans2);
let ch3 = new Channel(trans3)
let [first, last] = Channel.pipeline(ch1, ch2, ch3);
petebacondarwin commented 8 years ago

If you knew you wanted to branch at a certain point then it might be better to make joining two pipelines simpler:

Currently...

let [in, out] = Channel.pipeline(transArray1);
let [in2, out2] = Channel.pipeline(transArray2);
out.pipe(in2);

Perhaps...

let pipeline1 = Channel.pipeline(transArray1);
let pipeline2 = Channel.pipeline(transArray2);
pipeline1.pipe(pipeline2);

I guess what I am saying is that a pipeline could really be considered just a single external channel, which happens to map .put() onto the first internal channel and .take() onto the last internal channel.

dvlsg commented 8 years ago

Absolutely, it could be considered that way. I did consider making Channel.pipeline() return a single channel that used the pipe of provided functions / channels as its internal async transform. I opted against doing that for simplicity, maintainability, and performance reasons, though. It was a hit in all 3 of those categories (in my opinion) for a slight convenience, so I didn't think it was worth it. May be worth a revisit, now that the internals of the code have settled / simplified a bit.

Simplicity issue: A lot of the complexity in trying to support a wrapper Channel came from the ability to close a parent pipe which would (optionally) result in triggering the child pipes to know that know more data would be coming in, and they could close as well, as soon as they caught up (similar to nodejs streams). We get that functionality for free when a pipeline is a simple set of channels piping to each other, but it requires some pretty serious shenanigans to pull off when they are wrapped in a single channel. The ability to close an entire pipe with one call to the parent when incoming data is finished is not functionality I'm willing to lose, as well.

Maintainability issue: Another option was to return an interface object which allowed you to call #put() and #take() on the interface, which would send the methods to the proper beginning / ending of the pipeline, but I would either have to maintain an interface which adheres to every Channel method (which still wouldn't cover the people who prefer calling the static methods like Channel.take(ch)), or accept only allowing certain methods (such as #put() and #take()) on that interface. Neither of those seem like great options to me either.

dvlsg commented 8 years ago

Here, this may do a better job of explaining what I had originally attempted to do (with some of the arg manipulation taken out -- just assume all transformers are being passed as arguments for now, not an array as the first argument):

static pipelineWithWrapper(...args) {
    let channels = args
        .filter(x => x instanceof Function || x instanceof Channel)
        .map(x => x instanceof Channel ? x : new Channel(x));
    let first = channels[0];
    let last = channels.reduce((x, y) => x.pipe(y));
    let piped = new Channel(async x => {
        first.put(x);
        return last.take();
    });
    return piped;
}

This is functional, as far as #put() and #take() go, but what should happen if ch.close(true) is called to try to empty out the entire pipe? I could set piped.pipeline to point to first.pipeline (either directly or through an array which contains first), but we're not actually piping from the wrapper channel to the first channel, so that falls apart pretty quickly, since the closing of the child pipes occurs after piping from parent to children.

Here's an attempt with a partially implemented interface object returned:

static pipelineWithInterface(...args) {
    let channels = args
        .filter(x => x instanceof Function || x instanceof Channel)
        .map(x => x instanceof Channel ? x : new Channel(x));
    let first = channels[0];
    let last = channels.reduce((x, y) => x.pipe(y));
    let piped = {};
    piped.put = first.put.bind(first);
    piped.take = last.take.bind(last);
    piped.close = first.close.bind(first);
    piped.done = last.done.bind(last);
    piped.pipeline = first.pipeline;
    return piped;
}

I think the maintainability problems are fairly clear here, and this isn't really a road I want to go down.

Part of me is tempted to just let Channel#pipe() accept a callback, so code like this will just work (it already works with a Channel arg, so it would just need an extension to accept a function and turn it into a Channel on the fly). I know it still doesn't completely solve all the issues brought up, but it may alleviate some of the need for Channel.pipeline() in the first place.

let first = new Channel(x => x + 1);
let last = first
    .pipe(x => x + 2)
    /* ... */
    .pipe(x => x + 3);
trusktr commented 8 years ago

Interesting ideas! Yeah, I couldn't really think of a good case for when accessing middle channels would be handy. True, that if we're constructing the pipeline, then we can just save the reference to the middle channel that we want, as in your example:

let ch2 = new Channel(x => x.toString());
let [ first, last ] = Channel.pipeline(
    x => x + 2,
    ch2,
    x => ({ x })
);

Now what if the pipeline is something being imported?

import somePipe from 'somewhere'

Now, we don't have references to the middle transforms/channels here, but, I also don't see a good reason to know what the middle channels are since the point of the pipe is to use the overall transform. There would have to be some sort of documentation on the somewhere library describing what channels are in the middle of the pipe and at which positions. At that point, it might just be better to export and document those channels individually rather then telling people to grab them from a certain index in a pipeline.

So yeah, I think first and last is very likely to be all we ever need. :+1:

dvlsg commented 8 years ago

Cool. I'm going to close this out then. We will be able to use a Channel with Channel.pipeline() and the use a Function with Channel#pipe() with the next publish (it's in github, but not npm yet). Planning on publishing the next version at some point this week.