dead-claudia / proposal-es-streams

Streams for JS

Make `connect` return a promise #3

Open bergus opened 3 years ago

bergus commented 3 years ago

It would be pretty useful to allow waiting for the end of the stream, and more specifically to access the result value of the iterator and have exceptions propagate outwards:

async function find(stream, predicate) {
  const result = await stream.connect({
//               ^^^^^
    next(value) {
      return { done: predicate(value), value };
    },
  });
  console.log(result); // the first value matched by the predicate
  return result;
}

This promise seems to be more or less readily available in all connect implementations in the readme, so we might as well just return it.

A stream is semantically equivalent to a function that I can use to run an iterator with values. Getting back the completion value of this run seems only proper. It would also alleviate the need to directly report exceptions as unhandled rejections to the host - they would only become unhandled if you chose to ignore the promise.
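A minimal sketch of the idea, assuming a hypothetical array-backed stream (`fromArray` is made up for illustration and is not part of the proposal) and an observer whose `next` returns `{ done, value }` as in the example above. Because the handlers run inside the promise executor, anything they throw turns into a rejection:

```javascript
// Hypothetical array-backed stream; `fromArray` is an illustration only,
// not an API from the proposal.
function fromArray(items) {
  return {
    connect(observer) {
      return new Promise((resolve) => {
        for (const item of items) {
          const step = observer.next(item);
          // The iterator finished early: fulfill with its result value.
          if (step && step.done) return resolve(step.value);
        }
        // Source exhausted: let `return` supply the completion value, if present.
        const end = observer.return ? observer.return() : undefined;
        resolve(end ? end.value : undefined);
      });
    },
  };
}

// Same shape as the `find` above, minus the logging.
async function find(stream, predicate) {
  return stream.connect({
    next(value) {
      return { done: predicate(value), value };
    },
  });
}
```

With this sketch, `find(fromArray([1, 2, 3, 4]), x => x > 2)` fulfills with `3`, and a predicate that throws rejects the returned promise instead of surfacing as an unhandled error.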

dead-claudia commented 3 years ago

The intent was for the return handler to receive the "return" value. To await completion, you could do this:

return new Promise((resolve, reject) => {
  stream.connect({
    throw: reject,
    return: resolve,
    next(value) {
      const done = predicate(value)
      if (done) resolve(value)
      return {done}
    }
  })
})

As for why it must be received synchronously: https://github.com/tc39/proposal-observable/issues/131#issuecomment-269837234

Item of note: it would take me several weeks of dedicated work to collect even most of the relevant material I've gone through leading up to this proposal, BTW. It quite literally spans years, and discussions exist across numerous repos and many distinct communities, as well as quite a bit of private research I'm not quite ready to make public. It ranges from discussion in the observable proposal repo to callbags to even Mithril's Gitter chat room. I've also looked at the way several other languages deal with the problem domain. Go's channels and Clojure's atoms don't even fundamentally work the same way observables do. Haskell uses arrows (take the concept of a function and abstract it to an "operation") and focuses on the interconnects, which at times reads almost like a functional programming equivalent of modern hardware description languages, rather than focusing on the sources like observables or on individual nodes like shell pipes and Node streams.

Edit: formatting + clarification

bergus commented 3 years ago

Oh, I don't want to remove the .return and .throw handlers - I understand well that they must be called synchronously (you might want to put that link into your FAQ, btw). I omitted their default implementations above for brevity:

stream.connect({
  next(value) { return { done: predicate(value), value }; },
  throw(err) { throw err; },
  return(value) { return { done: true, value }; },
});

I just want to get a promise for their return values (or that of .next), without having to promisify connect myself. This includes error handling, e.g. the promise would be rejected if predicate threw an exception. Here's a more complete example:

async function find(stream, predicate) {
  const [idx, result] = await stream.connect({
    i: 0,
    next(event) {
      // Pass the current index to the predicate; increment only once per event.
      const done = predicate(event, this.i);
      if (done) return { done, value: [this.i, event] };
      this.i++;
      return { done };
    },
    return() {
      return { done: true, value: [-1, undefined] };
    },
  });
  if (idx != -1) console.log(result); // the first value matched by the predicate
  return result;
}
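For comparison, the wrapper one would otherwise write by hand might look roughly like the sketch below. `connectAsPromise` is a hypothetical helper name, and the sketch assumes `connect` calls the handlers synchronously, returns nothing useful, and stops when a handler returns `{ done: true }`. The fallbacks mirror the default `throw` and `return` implementations shown earlier:

```javascript
// Hypothetical userland helper: wraps a callback-style `connect` in a promise.
function connectAsPromise(stream, handlers) {
  return new Promise((resolve, reject) => {
    // Run one handler (or its fallback) and settle the promise when the
    // handler finishes the iteration or throws.
    const run = (name, arg, fallback) => {
      try {
        // `handlers[name](arg)` keeps `this` bound to `handlers`.
        const step = handlers[name] ? handlers[name](arg) : fallback(arg);
        if (step && step.done) resolve(step.value);
        return step;
      } catch (err) {
        reject(err);
        return { done: true }; // assumption: this tells the stream to stop
      }
    };
    stream.connect({
      next: (value) => run("next", value, () => ({ done: false })),
      throw: (err) => run("throw", err, (e) => { throw e; }),
      return: (value) => run("return", value, (v) => ({ done: true, value: v })),
    });
  });
}
```

Having `connect` return the promise itself would make this helper unnecessary, which is the point of the request.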

We could also write this with a generator function, I think (sans handling the stream end by itself? especially if empty?):

stream.connect(function*() {
  let i = 0, event = function.sent;
  while (!predicate(event)) {
    i++;
    event = yield;
  }
  return [i, event];
}());

I want these to return promises that fulfill with the tuple when a value is found or reject when the stream emits an error or my iterator throws. I really dislike wrapping stuff in new Promise if the library could've done it for me.
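Since `function.sent` is, as far as I know, still only a proposal and not available in current engines, the generator variant can be approximated today by priming the generator so that the first stream value arrives through `yield`. A rough sketch, where `primed` and `findFirst` are hypothetical names and the loop at the end merely stands in for whatever `connect` would do:

```javascript
// Hypothetical helper: start a generator and advance it to its first `yield`,
// so the first stream value is delivered by `yield` instead of `function.sent`.
function primed(genFn, ...args) {
  const it = genFn(...args);
  it.next(); // runs up to the first `yield`
  return it;
}

function* findFirst(predicate) {
  let i = 0;
  let event = yield;
  while (!predicate(event)) {
    i++;
    event = yield;
  }
  return [i, event];
}

// Feed values by hand, standing in for a stream driving the iterator.
const it = primed(findFirst, (x) => x > 2);
let step;
for (const value of [1, 2, 3, 4]) {
  step = it.next(value);
  if (step.done) break;
}
console.log(step.value); // [ 2, 3 ]: index and first matching value
```

Like the `function.sent` version, this does not handle the stream ending without a match; the generator simply stays suspended.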