reconbot / streaming-iterables

A Swiss army knife for async iterables. Designed to replace your streams.
https://www.npmjs.com/package/streaming-iterables
MIT License
79 stars, 8 forks

parallelMerge() does not propagate return() back to the merged iterables #46

Open: dko-slapdash opened this issue 4 years ago

dko-slapdash commented 4 years ago

Example code:

import delay from "delay";
import { parallelMerge } from "streaming-iterables";

async function* iterable(name: string, dt: number) {
  try {
    for (let i = 0; ; i++) {
      console.log(`${name}: ${i}`);
      yield `${name}: ${i}`;
      await delay(dt);
    }
  } finally {
    console.log(`Exited ${name}`);
  }
}

async function* caller() {
  // Swap in the commented-out line to get the expected output (with "Exited A"):
  //yield* iterable("A", 900);
  yield* parallelMerge(iterable("A", 900));
}

async function main() {
  for await (const message of caller()) {
    if (message.includes("4")) {
      break;
    }

    console.log(`Received ${message}`);
  }

  console.log("Finishing");
  // Keep the process alive long enough that a late "Exited A" would still be printed.
  await delay(3000);
}

main().catch((e) => console.log(e));

In this example I do a "dummy" merge of a single iterable for simplicity (merging several iterables shows the same effect). The output is:

A: 0
Received A: 0
A: 1
Received A: 1
A: 2
Received A: 2
A: 3
Received A: 3
A: 4
Finishing

Notice that the finally {} block in iterable() is never executed, but it should be. Replace the call to parallelMerge() with yield* iterable("A", 900); to see the correct output (with "Exited A"):

A: 0
Received A: 0
A: 1
Received A: 1
A: 2
Received A: 2
A: 3
Received A: 3
A: 4
Exited A
Finishing

How it works: both for await and yield* call the source iterator's return() method when the loop exits early (for example via break), so the signal propagates through every layer of wrapping. I think parallelMerge() just doesn't forward it to the iterables it merges.
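As a sketch of what "propagating return()" could look like, here is a minimal merge written as an async generator. The names mergeWithReturn, numbered and mergeDemo are hypothetical, and this is not how streaming-iterables implements parallelMerge(). The key part is the finally block: it runs when the consumer breaks out of its loop, and it calls return() on every source that has not finished yet.

```typescript
// Hypothetical sketch of a merge() that forwards return() to its sources.
// This is NOT the streaming-iterables implementation.
async function* mergeWithReturn<T>(...sources: AsyncIterable<T>[]): AsyncGenerator<T, void, undefined> {
  const iterators = sources.map((source) => source[Symbol.asyncIterator]());
  // Indices of sources that have not finished yet.
  const active = new Set<number>(iterators.map((_, i) => i));
  // At most one in-flight next() per source, tagged with the source index.
  const pending = new Map<number, Promise<[number, IteratorResult<T>]>>();
  const pull = (i: number): void => {
    pending.set(i, iterators[i].next().then((result): [number, IteratorResult<T>] => [i, result]));
  };
  for (const i of active) pull(i);

  try {
    while (pending.size > 0) {
      const [i, result] = await Promise.race(pending.values());
      pending.delete(i);
      if (result.done) {
        active.delete(i);
        continue;
      }
      // Yield first and pull afterwards, so a consumer that breaks here
      // leaves this source paused at its own yield.
      yield result.value;
      pull(i);
    }
  } finally {
    // Runs on normal completion, on errors, and on early exit by the consumer.
    // return() is optional in the iterator protocol, hence the optional call.
    await Promise.all([...active].map((i) => iterators[i].return?.()));
  }
}

// Usage, mirroring the issue's example without timers.
async function* numbered(name: string) {
  try {
    for (let i = 0; ; i++) yield `${name}: ${i}`;
  } finally {
    console.log(`Exited ${name}`);
  }
}

async function mergeDemo(): Promise<void> {
  for await (const message of mergeWithReturn(numbered("A"), numbered("B"))) {
    console.log(`Received ${message}`);
    if (message.endsWith("1")) break;
  }
  console.log("Finishing");
}

void mergeDemo();
```

One design choice worth noting: the sketch yields a value before requesting the next one from the same source, so after a break that source is paused at its own yield and its finally runs right away. A source that still has a next() in flight (for example one waiting on a delay) is only closed once that next() settles, because an async generator queues return() behind pending requests.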

dko-slapdash commented 4 years ago

A few more notes:

  1. BTW, I think this library does call return(): https://github.com/fraxken/combine-async-iterators/blob/master/index.js
  2. Besides .return(), each iterator also has a .throw() method, which lets you "inject" an exception that is thrown at the point where the generator function is paused at a yield. That could be covered too (but I think it's trickier, and propagating .return() would be enough).
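To illustrate the difference between the two methods, here is a small generator paused at a yield (counter and demoReturnAndThrow are hypothetical names made up for this sketch). return() resumes it as if the yield were a return statement, so only finally runs; throw() resumes it as if the yield threw, so catch runs first and then finally.

```typescript
// Hypothetical generator used only to show how return() and throw()
// resume an async generator that is paused at a yield.
async function* counter(log: string[]): AsyncGenerator<number, string, undefined> {
  try {
    for (let i = 0; ; i++) {
      yield i;
    }
  } catch (e) {
    // throw(err) lands here: the paused yield behaves like `throw err`.
    log.push(`caught ${String(e)}`);
    return "recovered";
  } finally {
    // Runs for both return() and throw().
    log.push("finally ran");
  }
}

async function demoReturnAndThrow(): Promise<string[]> {
  const log: string[] = [];

  // return(): the paused yield behaves like `return "stop"`, so only finally runs.
  const a = counter(log);
  await a.next(); // advance to the first yield; return() before this would skip the body
  const r = await a.return("stop");
  log.push(`return -> ${JSON.stringify(r)}`);

  // throw(): the error is raised at the paused yield, so catch runs, then finally.
  const b = counter(log);
  await b.next();
  const t = await b.throw(new Error("boom"));
  log.push(`throw -> ${JSON.stringify(t)}`);

  return log;
}

void demoReturnAndThrow().then((log) => console.log(log));
```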

reconbot commented 4 years ago

Sounds like something we should fix. I haven't had time to look into it, but I'm happy to merge a fix.

reconbot commented 4 years ago

So I did some research: return() is guaranteed on generators, but the iterator protocol only requires a next() method; return() and throw() are optional. parallelMerge() happens to be a generator, but not all of our functions are generators. It might be worth supporting this everywhere, but I'm wondering whether doing so would break any use cases. 🤔
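To make the protocol point concrete: a hand-written async iterator only has to implement next(), but if it also implements return(), for await still calls it on early exit, generator or not. The sketch below uses a hypothetical Countdown class and a hypothetical closeIterator helper; the helper uses optional chaining, so closing an iterator that lacks return() is a no-op rather than a TypeError.

```typescript
// Hypothetical hand-written async iterator: it is not a generator, and it
// implements the optional return() method alongside the required next().
class Countdown implements AsyncIterableIterator<number> {
  private n: number;
  private readonly log: string[];

  constructor(from: number, log: string[]) {
    this.n = from;
    this.log = log;
  }

  [Symbol.asyncIterator](): AsyncIterableIterator<number> {
    return this;
  }

  async next(): Promise<IteratorResult<number>> {
    if (this.n > 0) return { value: this.n--, done: false };
    return { value: undefined, done: true };
  }

  // Called by for await when the loop exits early (break, return, throw).
  async return(): Promise<IteratorResult<number>> {
    this.log.push("return() called");
    this.n = 0;
    return { value: undefined, done: true };
  }
}

// Hypothetical helper: forward the close signal only if the source supports it.
async function closeIterator<T>(iterator: AsyncIterator<T>): Promise<void> {
  await iterator.return?.();
}

async function countdownDemo(): Promise<string[]> {
  const log: string[] = [];
  for await (const n of new Countdown(5, log)) {
    log.push(`got ${n}`);
    if (n === 4) break;
  }
  // An iterator with only next(): closing it is a no-op, not a TypeError.
  const nextOnly: AsyncIterator<number> = {
    async next(): Promise<IteratorResult<number>> {
      return { value: undefined, done: true };
    },
  };
  await closeIterator(nextOnly);
  log.push("closed next-only iterator");
  return log;
}

void countdownDemo().then((log) => console.log(log));
```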

dko-slapdash commented 4 years ago

Yeah. The whole topic of async iterators (and especially async generators) is extremely complicated. I've spent several days so far trying to handle all the corner cases in my own implementation of merge(), where return() is properly propagated to all merged iterables (and resolves at the right moment), and I still haven't gotten it right. The goal is to guarantee that iterable =~= merge(iterable), where "=~=" means "equivalent in every respect": merge() of a single iterable should behave exactly like the iterable itself, whether or not that iterable comes from a generator.
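One aspect of this equivalence can be checked mechanically: wrap a generator, break out of the loop, and compare the logs. In the sketch below, Wrap, traceBreak, identity, passThrough and leaky are hypothetical names. It only covers the break/return() case, not throw() or the exact moment at which promises settle.

```typescript
// Hypothetical check of "iterable =~= wrap(iterable)" for early exit only.
type Wrap = <T>(inner: AsyncIterable<T>) => AsyncIterable<T>;

async function traceBreak(wrap: Wrap): Promise<string[]> {
  const log: string[] = [];
  async function* gen() {
    try {
      for (let i = 0; ; i++) {
        log.push(`yield ${i}`);
        yield i;
      }
    } finally {
      log.push("finally");
    }
  }
  for await (const value of wrap(gen())) {
    log.push(`got ${value}`);
    if (value === 1) break; // early exit: for await calls return()
  }
  log.push("after loop");
  return log;
}

const identity: Wrap = (inner) => inner;

// yield* forwards return() to the inner iterator automatically.
const passThrough: Wrap = <T>(inner: AsyncIterable<T>) =>
  (async function* () {
    yield* inner;
  })();

// A manual next() loop that never forwards return(): the source leaks.
const leaky: Wrap = <T>(inner: AsyncIterable<T>) =>
  (async function* () {
    const it = inner[Symbol.asyncIterator]();
    for (let r = await it.next(); !r.done; r = await it.next()) {
      yield r.value;
    }
  })();

void (async () => {
  console.log(await traceBreak(identity));
  console.log(await traceBreak(passThrough));
  console.log(await traceBreak(leaky));
})();
```

With identity and passThrough, "finally" is logged right before "after loop"; with leaky it never appears, which is the same symptom reported in this issue.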

For instance, here is one corner case for this equivalence, involving throw() (see the inline comments):

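// Assumes Jest's expect() and a promise-based sleep(ms) helper, e.g.:
import { expect } from "@jest/globals";
import { setTimeout as sleep } from "node:timers/promises";

async function standardThrow(wrap: <T>(inner: T) => T = (inner) => inner) {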
  const log: any[] = [];

  async function* gen() {
    try {
      for (let i = 0; i < 3; i++) {
        await sleep(500);
        yield i;
      }
    } catch (e) {
      log.push(["gen caught", e]);
      throw e;
    }
  }

  const iterator = wrap(gen())[Symbol.asyncIterator]();
  iterator
    .next()
    .then((v) => {
      log.push(["next 1", v]);
      iterator
        .next()
        .then((v) => log.push(["next 2", v]))
        .catch(() => {});
    })
    .catch((e) => log.push(["next 1 e", e]));
  const promise = sleep(100)
    .then(async () => {
      log.push("calling throw()");
      return iterator.throw("test");
    })
    .catch((e) => log.push(["throw e", e]));
  log.push("start");
  await promise;
  log.push("finish");

  // Calling iterator.throw() at an arbitrary moment (while next() is still
  // pending) does not make that next() throw or return done=true. Instead,
  // next() yields its value successfully, and only the following call to
  // next() returns done=true. Also, the promise returned by iterator.throw()
  // settles only after next() returns done=true, not earlier.
  expect(log).toEqual([
    "start",
    "calling throw()",
    ["gen caught", "test"],
    ["next 1", { done: false, value: 0 }],
    ["next 2", { done: true, value: undefined }],
    ["throw e", "test"],
    "finish",
  ]);
}

I have 6 test cases like that in total, and all of this seems to be covered by the https://tc39.es/proposal-async-iteration/ spec (which, BTW, is unusable in practice):