dvlsg / async-csp

CSP style channels using ES7 async/await
MIT License

The "alts" method. #3

Closed trusktr closed 8 years ago

trusktr commented 8 years ago

In this article section (which uses a generator-based lib instead of async/await) they use an alts method in the second example.

How might we do the same as that alts example with async-csp? Here's js-csp's alts docs.

dvlsg commented 8 years ago

Just to make sure I understand the usage of alts, is it similar to Promise.race, where you intend to wait for the first response from 1 or more inputs, and stop blocking as soon as one response is available?

trusktr commented 8 years ago

Yep, that seems right.

Here's an attempt using Channel.merge:

import Channel from 'async-csp'

function listenTo(el, type) {
  let ch = new Channel()
  el.addEventListener(type, event => ch.put(event))
  return ch
}

~async function() {
  let el = document.querySelector('#ui2');
  let mousech = listenTo(el, 'mousemove');
  let clickch = listenTo(el, 'click');
  let allEvents = Channel.merge(mousech, clickch)
  let mousePos = [0, 0];
  let clickPos = [0, 0];

  while(true) {
    let event = await allEvents.take();
    if(event.type === 'mousemove') {
      mousePos = [event.layerX || event.clientX, event.layerY || event.clientY];
    }
    else {
      clickPos = [event.layerX || event.clientX, event.layerY || event.clientY];
    }
    el.innerHTML = (mousePos[0] + ', ' + mousePos[1] + ' — ' +
                    clickPos[0] + ', ' + clickPos[1]);
  }
}()

The only real difference with this approach is that we don't get a reference to the channel that acted first, just the value that went through any channel first, so we're comparing event.type === 'mousemove' instead of v.channel === mousech. I think that's just fine.

I like the semantics of async-csp's piping a whole lot more than alts.

dvlsg commented 8 years ago

Yeah, Channel.merge was the first thought that came to my mind. It's not quite the same, since it seems like with alts only one of the values will resolve, but merge will continue to accept values from both entry points.

I think you could probably wire something up with Promise.race() and two Channel#take()s (one from each channel), but you'd have the same potential issue of not knowing which channel the value came from.

Might be worth adding some sort of race or alts static method, still -- especially in the timeout cancel scenario they provided. It would likely just be a wrapper around some Promise.race() logic, though. I'll think on it.
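The "which input won" gap mentioned above could be closed by tagging each promise before handing it to Promise.race(). This is only a sketch: raceTagged and timeout are hypothetical helpers, not part of async-csp, and with real channels each entry would be something like ['mouse', mousech.take()].

```javascript
// Hypothetical helper (not part of async-csp): race labelled promises
// and report which label won along with its value.
function raceTagged(entries) {
  // entries: array of [label, promiseOrValue] pairs
  return Promise.race(
    entries.map(([label, promise]) =>
      Promise.resolve(promise).then(value => ({ label, value }))
    )
  );
}

// Hypothetical helper: a promise that resolves to `value` after `ms` milliseconds.
function timeout(ms, value) {
  return new Promise(resolve => setTimeout(resolve, ms, value));
}

// Usage: the 5ms entry wins, so this logs { label: 'fast', value: 'b' }
raceTagged([
  ['slow', timeout(40, 'a')],
  ['fast', timeout(5, 'b')],
]).then(result => console.log(result));
```

The losing promises still run to completion; tagging only identifies the winner.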

trusktr commented 8 years ago

When using Channel.merge, I think we can distinguish the channels if we modify listenTo to

function listenTo(el, type) {
  let ch = new Channel()
  el.addEventListener(type, event => ch.put([ch, event]))
  return ch
}

and modify the other part accordingly. I think that's fair, but requires the channel input sources to account for it.

I'm not even sure what alts stands for. A name whose meaning is more obvious might be nicer to use, if you do decide to make such a function.

dvlsg commented 8 years ago

Looks like alts may be from clojure perhaps? I don't have any experience with clojure, so I'm not 100% on that.

I am definitely leaning towards race (if it gets added), due to the already existing/upcoming Promise.race() which strikes me as similar (functionality-wise, anyways).

trusktr commented 8 years ago

Promise.race seems to give us only the winning resolved value, but not a ref to the Promise that actually won, so race in that manner would be unlike alts, and would be the same as merging pipes (but delightfully less verbose). race would be a nice shortcut, and it's way more semantic than alts. It's got my vote. :]

bbarr commented 8 years ago

@dvlsg (and @trusktr) I see we have very similar efforts going here with async/await CSP implementations (I am working on medium)

Anyway, check out my implementation of any(ch1, ch2, ch3...) here, it is my take on "alts".. you can check the tests for how it handles the different cases (pending values, etc). Feel free to tell me if I missed something in the implementation, or if not, feel free to rip it off :)

dvlsg commented 8 years ago

@bbarr Oh that's interesting -- we started our projects at almost exactly the same date, too, haha. I see you opted to return the values as an array of [ val, channelReference ]. I know js-csp opts for returning an object containing value and channel properties. I'm still leaning towards leaving it alone, and allowing the user to collect a reference to the original channel if they need it, so it's up to the user to decide how they want the channel reference formatted.

I've started taking a closer look at this last night, and the more I simplify the code the more I realize it's turning into a loose wrapper for Promise.race() that automatically expands any Channel in the array of arguments into the promise from Channel#take() and passes it along to Promise.race(). This allows for the usage of any number of promises mixed in with any number of channels.

It's ending up a lot simpler than I was imagining. In fact, this is as simple as I can make it while still passing the few unit tests I have written up:


// defined on class Channel
static race(...args: Array<any>) {
    return Promise.race(args.map(arg => arg instanceof Channel ? arg.take() : arg));
}

//...

let ch1 = new Channel(async x => x);
ch1.put(1);
let val = await Channel.race(timeout(5), ch1); // assume timeout returns a promise which resolves after 5ms
assert.equal(val, 1);

The only other downside I can think of is that if you race between multiple channels, all of those Channel#take()s are going to get executed and values will be taken from each channel. Only the fastest will be returned (as expected), but the remainder will just be thrown to the garbage collector after being taken from the channel. Personally I think that's acceptable, since Channel.merge() should cover the use-case where no values should be dropped. Are there any objections to this?
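The dropped-value downside described above can be reproduced with a small stand-in. MiniChannel here is a hypothetical, unbounded FIFO written only for this illustration; it is not async-csp's Channel and has no transforms or backpressure. The race function mirrors the shape of the proposed Channel.race.

```javascript
// MiniChannel is a hypothetical stand-in, NOT async-csp's Channel:
// an unbounded FIFO whose take() returns a promise for the next value.
class MiniChannel {
  constructor() {
    this.values = []; // values waiting for a taker
    this.takers = []; // resolve functions waiting for a value
  }
  put(value) {
    const taker = this.takers.shift();
    if (taker) taker(value);
    else this.values.push(value);
  }
  take() {
    if (this.values.length > 0) return Promise.resolve(this.values.shift());
    return new Promise(resolve => this.takers.push(resolve));
  }
}

// Same shape as the proposed Channel.race: expand channels into take() promises.
function race(...args) {
  return Promise.race(args.map(arg => (arg instanceof MiniChannel ? arg.take() : arg)));
}

async function demo() {
  const a = new MiniChannel();
  const b = new MiniChannel();
  a.put('a1');
  b.put('b1');
  b.put('b2');
  const winner = await race(a, b);  // take() runs on both channels
  const nextFromB = await b.take(); // 'b1' was already consumed by the race
  return { winner, nextFromB };
}

demo().then(result => console.log(result)); // { winner: 'a1', nextFromB: 'b2' }
```

The value 'b1' was taken by the losing take() and never reaches any consumer, which is the behavior being weighed against Channel.merge().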

trusktr commented 8 years ago

@bbarr

On this line it seems like you're choosing a random channel to wait on? If so, that seems like it would not necessarily act on the first channel that finishes, but only on the chosen channel, which I don't think would be the same as Go's alts. Did I misread it?

@dvlsg

a loose wrapper for Promise.race()

What if you pass an arg that isn't a Promise? Will it resolve immediately to that value (therefore ignoring any actual channels and promises)?

Only the fastest will be returned (as expected), but the remainder will just be thrown to the garbage collector after being taken from the channel. Personally I think that's acceptable, since Channel.merge() should cover the use-case where no values should be dropped. Are there any objections to this?

I think that's acceptable. It could obviously be solved internally with a Channel.merge(). What does Go's alts do? js-csp's alts also has the ability to perform a put, not just a take. That might be nice to support too. Using the array notation would limit arrays from being a static value passed to Promise.race().

let val = await Channel.race(timeout(5), ch1, [ch2, valueToPut]);

but that would limit arrays from being passed in to Promise.race. You could detect a special Object literal:

let val = await Channel.race(timeout(5), ch1, {put: ch2, val: valueToPut});
// or
let val = await Channel.race(timeout(5), ch1, {put: [ch2, valueToPut]});

but that's more verbose. I think the array syntax outweighs the need for primitives. What do you think?

dvlsg commented 8 years ago

That should depend on the implementation of Promise.race(), but I believe it actually prefers resolved promises over values (or at least the Babel polyfilled version does).

Try this -- I see 1 consistently in the babel repl, and I think I would expect 2 if the standard values were preferred over resolved promises, since it comes before the promise resolving to 1 in the array provided to race.

async function run() {
  let p1 = Promise.resolve(1);
  let p2 = 2;
  let p3 = new Promise(resolve => {
    setTimeout(resolve, 0);
  });
  let val = await Promise.race([p3, p2, p1]);
  console.log(val);
}

run();

Of course, if you used a value with channels/promises that don't resolve immediately, then it would probably resolve to the value first -- but I think that is what I would expect.

As for put, since it returns a promise, you could skip to something like this:

let val = await Channel.race(timeout(5), ch1, ch2.put(valueToPut));

Although, the return value of that probably isn't super helpful. I believe it would be undefined if the put wins.

trusktr commented 8 years ago

but I believe it actually prefers resolved promises over values (or at least the Babel polyfilled version does).

I think the native implementations in Chrome and Firefox are converting primitives to resolved Promises:

Promise.race([4, 5]).then(val => console.log(val))
Promise.race([5, 4]).then(val => console.log(val))
// output:
// 4
// 5

So this is the same as your example (this is what the browser seems to be doing to primitives, as if I didn't insert that extra Promise.resolve()):

async function run() {
  let p1 = Promise.resolve(1);
  let p2 = 2;
  let p3 = new Promise(resolve => {
    setTimeout(resolve, 0);
  });
  let val = await Promise.race([p3, Promise.resolve(p2), p1]);
                                   // ^ here
  console.log(val);
}

run();

trusktr commented 8 years ago

As for put, since it returns a promise, you could skip to something like this:

let val = await Channel.race(timeout(5), ch1, ch2.put(valueToPut));

Ah, makes sense! Maybe, if Channel.race detects one parameter is the Promise to a put, it can return something special in that case (whatever that may be? Maybe just the value that was put?).

Maybe you can just do what js-csp's alts does:

"Returns" an object with 2 properties: The channel of the succeeding operation, and the value returned by the corresponding put/take operation.

Since we already have Promise.race, which means we can already do

let val = await Promise.race([timeout(5), ch1.take(), ch2.put(valueToPut)])

it might make sense to have Channel.race do the extra thing of providing the two values?

trusktr commented 8 years ago

What does Channel#put's promise resolve to currently? That (if it exists) isn't covered in the README yet. It could resolve to true if an item was successfully put into a channel, false if the put failed (f.e. the channel was already closed).

let value = await channel.put(1)
console.log(value) // logs true
channel.close()
value = await channel.put(2)
console.log(value) // logs false

^ That could be convenient if you don't have control of the channel, but are just using a channel given to you:

// as long as some channel is accepting values:
while (await channel.put(2)) {
  // ...
}

trusktr commented 8 years ago

Speaking of Promise/Channel.race, here's an interesting syntax I haven't seen before:

async function getData() {
    let data = await {
        model1: new MyModel('something'),
        model2: [new MyModel(1), new MyModel(2)]
    };

    console.log(data); //{model1: 'something result', model2: ['result 1', 'result 2']} 
}

found here. What's going on there, some type of structuring? It seems like that could be an alternative to using Promise/Channel.race at all.

bbarr commented 8 years ago

@trusktr I believe that handling of object syntax is courtesy of "bluebird-co", which does some fancy implicit handling of values given to yield (and by extension, await).. So, an interesting idea, but not "native".

Also, in regards to the "random" line in Medium's any function, this is only run if there are multiple puts already pending in the given channels. I believe core-async does it this way, and the intention is to prevent developers from accidentally relying on a specific resolution order that commonly occurs.. It should always be unknowable, which channel is going to win.

I am going to mess around a bit with the code and Promise.race today, will post some more ideas if I have any!

dvlsg commented 8 years ago

@trusktr The object is special syntax, similarly to how co handles yielding objects. Natively, I believe the closest method we have is Promise.all, but neither all nor the object syntax would resolve immediately when the first channel/promise/whatever resolves, which is what we want (I believe) with Channel#race().
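For comparison, a native `await` on a plain object simply returns the object unchanged, so the object form has to be built on Promise.all. A minimal sketch follows; resolveProps is a hypothetical helper name, and unlike the library behavior quoted above it only resolves top-level properties, not nested arrays.

```javascript
// Hypothetical helper: resolve every top-level property of an object.
// Like Promise.all, it waits for ALL properties, so it does not replace a race.
async function resolveProps(obj) {
  const keys = Object.keys(obj);
  const values = await Promise.all(keys.map(key => obj[key]));
  return Object.fromEntries(keys.map((key, i) => [key, values[i]]));
}

// Usage: logs { first: 1, second: 'two' } once both properties have resolved.
resolveProps({
  first: Promise.resolve(1),
  second: new Promise(resolve => setTimeout(resolve, 10, 'two')),
}).then(data => console.log(data));
```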

dvlsg commented 8 years ago

@bbarr the random seems like a reasonable option. Aside from the random selection, I would be concerned about race skipping its place in line (so to speak) if I accessed the buffers directly, though. So for example, if a race gets executed while the internal take queue is not empty, and it sneaks a value off the buffer when that value should have gone to the first take in the queue instead.

May not actually be a problem. I'd have to trace through some of the sliding methods to see if that could actually happen in my case, but at first glance I could definitely see it being a (rare) problem.

bbarr commented 8 years ago

So the more I look at this, the more I think it is important to only allow one channel operation to actually execute. This is how core-async does it, and without this, it gives alt (or any) this weird quantum uncertainty feeling, with data going away just because we asked for it. I just pushed up an implementation on my end with a few tests (supports receiving channels for takes, promises, and [ ch, val ] for puts), and I am going to see if I run into any major issues building something bigger with it. If you can solve things in a similar fashion using race, I would be excited to see how, because it is beyond me :)

dvlsg commented 8 years ago

Here is the main problem I see: async-csp supports asynchronous transforms, I intend to continue supporting them as a main feature of the library, and they really don't seem to mesh well with the concept of alts (or maybe they do, and I'm just not seeing the solution yet).

Using a Channel, we can assume that by the time the value reaches the buffer, it has been fully transformed. However, not every Channel uses a buffer (per a recent update). In those cases, the potentially asynchronous transforms will not begin executing until a take is made available.

For the sake of an example:

let count = 0;
let doubler = new Channel(async x => {
    count++;
    await timeout(200); // fake async work
    return x * 2;
});
let tripler = new Channel(async x => {
    count++;
    await timeout(400); // fake async work
    return x * 3;
});

doubler.put(1);
tripler.put(1);

let winner = await Channel.race(tripler, doubler);
console.log(winner); // => 2, taken from doubler
console.log(count); // => 2, incremented once from each channel

With the way the code is currently set up, both the puts from doubler and tripler must be started via takes in order to properly receive the first resolution.

We could do one of a couple things (I'm sure there are others, but these are the ones that come to mind), but I still see lots of potential issues. We could make both put and take cancel-able, but if a put is canceled in the middle of execution and there are any side effects in the transformer (heaven forbid, but possible), I don't see any way that we could possibly revert those changes.

On the other hand, we could allow every put to continue on to completion, then re-encapsulate them in a special wrapper, putting them back on the puts. However, that could cause an unintended/unexpected resolution of the original put. We would also have to know in what order to put the values back. That could get complicated very quickly, especially if other values/puts arrive during the execution of any of the asynchronous transformers being executed during race/alts.

I'm open to suggestions, but I'm not seeing any nice way to encapsulate all the requirements without losing anything (ie - ditching async transforms, and assuming any value in the puts buffer can be gathered synchronously).

If nothing else, then Channel.merge() can at least be used in the interim. It will still accomplish the end goal of collecting values from multiple channels in order of completion without losing any of them, albeit with slightly different syntax. The cancel timeout execution can be achieved with Promise.race([ch.take(), timeout(500)]). The mixed put and take may not be solvable, but I'm having a hard time thinking of a legitimate use case for that, so that's pretty low on my list.
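The timeout case can be sketched with plain promises. withTimeout and TIMED_OUT are hypothetical names introduced for this example; with a real channel the first argument would be ch.take().

```javascript
// Hypothetical sentinel so a timeout can be told apart from a real value.
const TIMED_OUT = Symbol('timed out');

// Hypothetical helper: resolve with the promise's value, or with TIMED_OUT
// if `ms` milliseconds pass first. The timer is cleared either way.
function withTimeout(promise, ms) {
  let timer;
  const expiry = new Promise(resolve => {
    timer = setTimeout(resolve, ms, TIMED_OUT);
  });
  return Promise.race([promise, expiry]).finally(() => clearTimeout(timer));
}

// Usage: a promise that never settles loses to the 20ms timeout.
withTimeout(new Promise(() => {}), 20).then(result => {
  console.log(result === TIMED_OUT); // true
});
```

Note that losing the race does not cancel the underlying operation: a pending take() would stay queued on the channel and would still consume the next value that arrives.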

dvlsg commented 8 years ago

How do we feel about continuing support for async transforms / expanding one put into multiple values? My original goal was to make piping channels as simple as through2 streams, including the ability to work asynchronously and to expand a single input into multiple outputs.

I think the extended transforms add a lot of value / ease of use (especially when working with piped channels) and have come in handy during my personal use of this library, but the complications / inability to properly support alts makes me wonder if they are worth the hassle, or if they should be pushed off of the library to be handled by the user externally.

Would love to get some thoughts on the matter.

trusktr commented 8 years ago

It's tricky. If we're taking from pipes (in the Channel.race) that receive UI events, then it may not always matter if values taken from the racing channels are lost (similarly to how we care about losing events from the UI when we're using debounced functions as listeners).

Maybe you can add an API to Channels, say Channel#insert (for lack of another name at the moment) that lets the user put a value into a channel at the front of the channel instead of at the end, then in Channel#race, you can just insert the unused items back into the channels synchronously after the await Promise.race() call? Hmmm, but how would we be able to prevent something else from taking from a channel that may have completed before all the channel racing have completed?

trusktr commented 8 years ago

Maybe Channels can have a Channel#cork (for lack of a name, and visually shows the concept) method that blocks things from coming out of the channel. If something takes from the channel, it will be blocked, but while the channel's flow is stopped (has a cork in it) other mechanisms can put onto the back and/or insert onto the front of the channel, then when the channel is finally unblocked (Channel#block?) the call to take will get the value that is currently at the front of the channel (which might have been modified while the channel was blocked).

Then, you can use this in the Channel#race implementation to block each channel as soon as the take() call on each channel is complete, and place the taken value back onto the front of the channel. The Channel#block/cork method would need to do something internal to prevent the next promise from fulfilling in case there's already other things that have called take() on the channel (is that possible?).

Or perhaps there might be a Channel#takeAndBlock method if that makes more sense for the implementation needed in order to take and simultaneously block before the next promise can resolve?

Just throwing some ideas out there. I wish I had time to try an implementation.

dvlsg commented 8 years ago

I think re-inserting the values into puts (or buffer) is probably the simplest way to go about this. It's a bit odd, and we'd need to watch out for accidentally re-transforming the values (not an issue with a buffer, but would be with the puts). I'm still a bit worried about accidentally resolving a put when we didn't intend to, though -- like this scenario:

let ch1 = new Channel();
let ch2 = new Channel();
(async() => {
    await ch1.put(1);
    console.log('made it past ch1 put!');
})();
(async() => {
    await ch2.put(2);
    console.log('made it past ch2 put!');
})();
(async() => {
    let val = await Channel.race(ch1, ch2);
})();

In that case, I'm almost positive both of the console.log lines will be executed, even if we put the value which wasn't taken back onto the channel. There may be a way to squeeze in a property which would tell the remaining puts to not actually resolve when they finish running any asynchronous transform (similar to block/cork, but for put instead of take). Or we could just allow the put to be resolved for simplicity's sake. Might not be the end of the world, but it does break outside of what I would expect, I think. Hm.

I'm short on time as well, unfortunately, between holidays and impending work deadlines. But! I will certainly keep it on the list of implementations to attempt as soon as I do have time.

bbarr commented 8 years ago

So, question about the async transforms. Do puts resolve when a value starts to be asynchronously processed, or do they wait until it has been processed and taken or placed in a buffer?

dvlsg commented 8 years ago

Right now, a put gets resolved only after the transform is completed (async or not). The transform will not start until there is at least one "space" available (either in the form of a take or an empty spot on the buffer). I believe right now that even with asynchronous transforms, only one transform can be executing at a time. So to answer your question -- the second statement in the question is how it works.

Well, that's how it works in most cases, anyways. Expanding a single input to multiple outputs is slightly more complicated, but isn't necessarily relevant right now.

To get around it, I'm thinking that one possible solution would be to have the ability to commit the transformed values, or decide to hang onto them for later usage. It would be a bit wonky, but it would prevent us from wasting execution time by needing to re-execute transforms, and it would prevent the put from resolving. And then the extra take floating around would have to go away. I think that would work. Maybe. Hard to say before starting on it, but it might be best to get a solution that works first, and then worry about making it right later.

trusktr commented 8 years ago

I believe right now that even with asynchronous transforms, only one transform can be executing at a time.

Interesting. Yeah, because the transform code bits are synchronous even if the API is "asynchronous", so if we are only ever on a single thread, it'd be impossible for transform statements to happen simultaneously.

dvlsg commented 8 years ago

Yup. ... Well, sort of. If the transforms were asynchronous it would teeeeechnically be possible like this:

let ch1 = new Channel(async x => {
    console.log('starting transform');
    await timeout(500);
    return x;
});
(async() => {
    ch1.put(1);
    ch1.put(2);
    let take1 = ch1.take();
    let take2 = ch1.take();
})();

take1 will kick off the internal processing. take2 will actually also attempt to kick off internal processing while the first async transform is running, but the channel won't actually move on to the next item while it is processing another. Technically we could make it work that way, even with javascript's single threading, but that would be undesired behavior in this case (in my opinion).
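The "only one transform at a time" behavior described above can be approximated by chaining each call onto the previous one. This is an illustrative sketch, not async-csp's internal slide logic; makeSerialRunner and the local timeout helper are hypothetical names.

```javascript
// Hypothetical sketch: run an async transform strictly one call at a time,
// in call order, by chaining each call onto the previous one.
function makeSerialRunner(transform) {
  let tail = Promise.resolve();
  return function run(value) {
    const result = tail.then(() => transform(value));
    tail = result.catch(() => {}); // keep the chain usable after a rejection
    return result;
  };
}

// Hypothetical helper standing in for the thread's timeout().
const timeout = ms => new Promise(resolve => setTimeout(resolve, ms));

// Usage: the second transform does not start until the first has finished.
const run = makeSerialRunner(async x => {
  console.log('starting transform', x);
  await timeout(20);
  return x;
});
run(1);
run(2);
```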

trusktr commented 8 years ago

Interesting. If you did let it process the second take at the same time, then

    let take1 = await ch1.take();
    let take2 = await ch1.take();

would be in sequence, but

let take1, take2
let winner = await Promise.race([take1 = ch1.take(), take2 = ch1.take()])

could go either way. Is that what you mean? And that you'd prefer for the race to always result in take1 winning (due to the internal enforcement of the order)?

dvlsg commented 8 years ago

Yup. And there are other subtleties like the fact that transforms can drop values, so the first put may not actually have a value prepared for the first take, in which case the second put would be handed to the first take, and other little issues like that. I don't think a channel should work on more than one value at a time anyways. I think async transforms are pretty rare anyways. I've only had to use them once or twice (but they were very handy to have available when I needed them).

We could get more aggressive about running transforms as soon as a put becomes available regardless of whether or not a take or a buffer space was available, but I think I'd prefer the execution to be lazy-loading.

trusktr commented 8 years ago

That's an interesting thing to consider. Should transforms run at the beginning of the time frame between a put of a value and the take that takes the value, or not? There might be cases for both. For example, what about a WebGL renderer that is going to take from a channel values in order to render something? Maybe there can be some static methods (or properties) on Channel in order to configure the behavior that gets applied to new instances?

dvlsg commented 8 years ago

That's certainly a possibility. We could definitely trigger a channel to be eager instead of lazy at creation time, and then alter how the slide method decides when to begin transforming values.

My use case typically falls into lazy loading as I will often have a lot of front-loaded puts from some sort of input (file stream, etc) and I would rather wait to run the transforms until later. Otherwise my resource usage tends to explode, since my cpu will be busy spinning up more and more transforms before allowing the values to stream down the pipeline, and I'd end up with a ton of unnecessary memory usage. It's not that far of a stretch for me to imagine other use cases, though.

dvlsg commented 8 years ago

Well, I took a crack at this over the holiday break. The TL;DR is that I was unable to find a reasonable way to support this, and it is proving to be quite difficult. I got maybe 60-70% of the way there, but at a significant performance cost (unit tests were taking up to 50% longer) / code complexity increase.

Here is the gist of the main issue I haven't been able to solve yet: the order of operations with synchronous transforms is as follows:

let ch1 = new Channel(); // synchronous
let ch2 = new Channel(); // synchronous
ch1.put(1.1);
ch1.put(1.2);
ch2.put(2.1);
ch2.put(2.2);
let winner = await Channel.race(ch1, ch2);
//=> race sets up Channel#take().then() handlers, one for each channel, in an attempt to determine the winner
//=> ch1.take() resolves internally
//=> ch2.take() resolves internally
//=> ch1.take().then() resolves, attempts to cancel the other take, which fails since the other take has already resolved

Using Promise.race() directly resulted in the same order of operations, as far as I could tell. I was a bit surprised, but I think we can confirm the order of operations with code like this:

let make = () => {
  return new Promise(resolve => {
      console.log('resolving make');
      resolve();
  });
}
async function run() {
  let p1 = make();
  p1.then(() => console.log('p1 .then'));
  let p2 = make();
}
run();

//=> resolving make
//=> resolving make
//=> p1 .then

.then must be handled in a setTimeout or something similar. I was unable to find a setup which would reliably result in ch1.take() resolved, ch1.take().then() resolved, then ch2.take() resolved. I could put the value back from the ch2.take() we didn't want to resolve, but the original put is already resolved without a way to undo the put, which is a step in the wrong direction in my opinion.
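On the ordering question: .then callbacks are never run synchronously at resolution time. They are queued as microtasks, which run after the current synchronous code finishes but before any setTimeout callback. A small sketch that records the order (observeOrder is a hypothetical name):

```javascript
// Records the order in which a promise executor, synchronous code,
// a .then callback, and a zero-delay timer run.
function observeOrder() {
  const log = [];
  return new Promise(done => {
    setTimeout(() => {
      log.push('timeout');
      done(log);
    }, 0);
    const p = new Promise(resolve => {
      log.push('executor'); // executors run synchronously
      resolve();
    });
    p.then(() => log.push('then')); // queued as a microtask
    log.push('sync end');
  });
}

observeOrder().then(log => console.log(log));
// [ 'executor', 'sync end', 'then', 'timeout' ]
```

This is why both take() calls have already resolved internally by the time the first .then handler gets a chance to cancel anything.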

Any thoughts? I'm close to shelving this one (at least temporarily).

trusktr commented 8 years ago

Interesting. Hmmm, well, I don't actually need this functionality in a real project yet. Yeah, just shelve it until someone actually needs this. If we really do in the future (if there's some real-project problem that would be neatly solved with this feature) then we can just come back to this idea. :]

dvlsg commented 8 years ago

I think it's still possible, but I'd have to rip apart / refactor the majority of the internal put/take logic to allow a more reflective look into what actually exists on the channel. I think. I'll close the issue for now and re-open if we decide to revisit it in the future.

trusktr commented 8 years ago

Cool. Thanks for the awesome work. I'm using async-csp right now to communicate between my URL router and the UI. I love the decoupled nature of the system. It's so niiiice.

ivan-kleshnin commented 8 years ago

Did anybody find production-ready Task implementation with race method?

Demo usage:

function race(channels) {
  return Promise.race(channels.map(c => { // all Promises fulfill :(
    return new Promise(resolve => {
      return c.take().then(x => {
        resolve([x, c]);
      });
    });
  }));
}

// => 

function race(channels) {
  return Task.race(channels.map(c => { // cancels all but the winning Task :)
    return new Task(resolve => {
      return c.take().then(x => {
        resolve([x, c]);
      });
    });
  }));
}

dvlsg commented 8 years ago

I haven't, yet. Is Task a reference to an existing library?

I think the real issue comes in when the Channel has an asynchronous transform, and both transforms can be running before #take() is resolved. Since the first channel's transform might finish in 300ms, and the second channel's transform might finish in 100ms, we really need to have both running for a proper race, and I don't think any additional task implementation would help us there.

The end result with the current structure and proposed race solution is that we have a transformed value that we don't actually want to take off the channel. So, we could put it back in its original position. Unfortunately, race does not guarantee that no other values can be taken during this time. We could very well have a separate #take() executing on the channel for a value further down the queue which resolves properly in, say, 200ms. If we were to put back the losing value from race, this should have been the value which went to the separate #take(), since it should have been higher up in the queue.

So, putting back values can mess up the queue order, with the way we currently have it. We could possibly implement some sort of #cork() method which would prevent any values from being taken while race is running, but I wouldn't be super thrilled with the hit to concurrency that would incur. I haven't thought of a way around it yet, though.

I'm 100% open to pull requests if you have any suggestions which would not lose the asynchronous transform functionality. I've considered / attempted a few options already, but they were all huge complexity / performance hits, so far.

ivan-kleshnin commented 8 years ago

Is Task a reference to an existing library?

No, it's an imaginary Promise-like, unicast primitive which supports cancellation. The closest version I saw is https://github.com/kriskowal/gtor/blob/master/task.js#L436-L438 but it mentions TODO in race implementation :smile:

I'm 100% open to pull requests if you have any suggestions which would not lose the asynchronous transform functionality. I've considered / attempted a few options already, but they were all huge complexity / performance hits, so far.

Thanks for providing valid points of concern. I don't have enough experience with the subject, but I'll take some time to think about what can be done.

For now I can only say that, IMO, we shouldn't allow mixing different primitives in a single race call the way JS-CSP tries to. Promise vs Task. Task vs Channel... This piles up assumptions and is totally mind-bending. If you want to run a Promise and a Channel in parallel, wrap one into the other. Then the question shifts away from the library... and that's good.
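One way to approximate the imaginary cancellable Task with standard parts is to give each contender an AbortSignal and abort the losers once a winner settles. This is a sketch under assumptions: raceTasks and sleepTask are hypothetical names, a "task" is assumed to be a function from a signal to a promise, and cancellation only helps if the underlying work can actually be stopped, which is the hard part for channels with asynchronous transforms.

```javascript
// Hypothetical: a "task" is a function (signal) => Promise.
// The first task to fulfill wins; every other task's signal is aborted.
// If a task rejects first, the rejection propagates and nothing is aborted.
function raceTasks(tasks) {
  const controllers = tasks.map(() => new AbortController());
  const contenders = tasks.map((task, index) =>
    task(controllers[index].signal).then(value => ({ value, index }))
  );
  return Promise.race(contenders).then(result => {
    controllers.forEach((controller, i) => {
      if (i !== result.index) controller.abort();
    });
    return result;
  });
}

// Hypothetical cancellable task: resolves to `value` after `ms` milliseconds
// unless aborted first, in which case its timer is cleared and it never settles.
function sleepTask(ms, value) {
  return signal =>
    new Promise(resolve => {
      const timer = setTimeout(resolve, ms, value);
      signal.addEventListener('abort', () => clearTimeout(timer), { once: true });
    });
}

// Usage: logs { value: 'fast', index: 1 }; the 50ms timer is cancelled.
raceTasks([sleepTask(50, 'slow'), sleepTask(5, 'fast')]).then(result =>
  console.log(result)
);
```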

smmoosavi commented 6 years ago

The alts function only triggers one of the take operations:

const csp = require('js-csp')

const ch1 = csp.chan()
const ch2 = csp.chan()

function* fn (ch, delay, value) {
  yield csp.timeout(delay)
  yield csp.put(ch, value)
}

csp.go(fn, [ch1, 1000, '1k'])
csp.go(fn, [ch2, 2000, '2k'])

csp.go(function* () {
  const result1 = yield csp.alts([ch1, ch2])
  console.log('result1', result1.value)

  const result2 = yield csp.alts([ch1, ch2])
  console.log('result2', result2.value)
})

output:

result1 1k
result2 2k