tc39 / proposal-async-iterator-helpers

Methods for working with async iterators in ECMAScript
https://tc39.es/proposal-async-iterator-helpers/

AsyncIterator.race/merge #15

Open bakkot opened 6 months ago

bakkot commented 6 months ago

Wanted to write this down so I don't forget, though it won't be in the first version of this proposal.

We should have a helper for merging or racing multiple AsyncIterators. When next is first called you pull from all of them, and then resolve with the first promise to settle. When next is called again, if any of the previously-pulled promises have already settled you immediately settle with that; otherwise you pull from all the underlying iterators which you aren't currently waiting on and resolve with the first to settle.

RxJS has approximately this (apparently reasonably popular), as do some Rust libraries.

This essay points out that this is the type-theory "sum" to zip's "product", for async iterators. That is, it's the natural extension of Promise.race, where zip is the natural extension of Promise.all.
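The analogy can be seen in miniature with plain promises (my own illustration, not part of the proposal):

```javascript
// The product/sum analogy: Promise.all waits for every promise
// (like zip pairing values); Promise.race settles with the first
// (like the proposed merge/race helper).
const slow = new Promise(resolve => setTimeout(() => resolve("slow"), 50));
const fast = Promise.resolve("fast");

Promise.all([slow, fast]).then(values => console.log(values));  // [ 'slow', 'fast' ]
Promise.race([slow, fast]).then(winner => console.log(winner)); // 'fast'
```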

The async-std library in Rust takes the interesting approach of randomizing the order in which it polls the underlying streams. I'm guessing that's mostly for the case where the first stream makes all of its results available immediately, which would prevent ever getting values from the second stream even if its results were also available immediately. I don't think that's relevant here, since we can (and must) hold on to results from previous calls which haven't yet been merged in, and so can ensure that, in the case of two async iterators which vended immediately-settling promises, we'd alternate between them.

conartist6 commented 6 months ago

Yep, if you're calling out existing implementations I'll toss in mine: https://github.com/iter-tools/iter-tools/blob/d7.5/API.md#asyncinterleaveready

laverdet commented 6 months ago

@conartist6 this implementation does not forward terminations correctly. See:

import { asyncInterleaveReady } from "iter-tools";

async function* range(count) {
    console.log("start");
    try {
        for (let ii = 0; ii < count; ++ii) {
            yield ii;
        }
    } finally {
        console.log("done");
    }
}

try {
    for await (const ii of range(2)) {
        console.log("here", ii);
        throw 1;
    }
} catch {}

console.log("---");

try {
    for await (const ii of asyncInterleaveReady(range(2))) {
        console.log("here", ii);
        throw 1;
    }
} catch {}

Logs:

start
here 0
done
---
start
here 0
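For context, the missing final "done" is the bug: exiting a for await...of loop early calls return() on the iterator, which resumes the generator inside its finally block, and a wrapper has to forward that call. A minimal standalone demo (mine, not from iter-tools):

```javascript
// Demo: leaving a for await...of loop early invokes the iterator's
// return(), which runs the generator's finally block.
const log = [];
async function* range(count) {
  try {
    for (let i = 0; i < count; ++i) yield i;
  } finally {
    log.push("done");
  }
}

const done = (async () => {
  for await (const v of range(2)) {
    log.push(v);
    break; // triggers range's return()
  }
  console.log(log); // [ 0, 'done' ]
})();
```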

bergus commented 6 months ago

As for implementations, this one should fulfill all the requirements:

async function* merge(iterable) {
    // One async iterator per input iterable
    const asyncIterators = Array.from(iterable, o => o[Symbol.asyncIterator]());
    const results = [];
    let count = asyncIterators.length;
    // Sentinel that never settles, used to park finished iterators
    const never = new Promise(() => {});
    function getNext(asyncIterator, index) {
        return asyncIterator.next().then(result => ({
            index,
            result,
        }));
    }
    // Pull one result from each iterator up front
    const nextPromises = asyncIterators.map(getNext);
    try {
        while (count) {
            // Yield from whichever iterator settles first
            const {index, result} = await Promise.race(nextPromises);
            if (result.done) {
                nextPromises[index] = never;
                results[index] = result.value;
                count--;
            } else {
                nextPromises[index] = getNext(asyncIterators[index], index);
                yield result.value;
            }
        }
    } finally {
        // Close any iterators that are still live
        for (const [index, iterator] of asyncIterators.entries())
            if (nextPromises[index] != never && iterator.return != null)
                iterator.return();
        // no await here - see https://github.com/tc39/proposal-async-iteration/issues/126
    }
    return results;
}

(although it does suffer from the potential memory leaks of Promise.race)

conartist6 commented 6 months ago

Ooh, thanks for the bug report, I'll fix that.

laverdet commented 6 months ago

This is the one we use. It avoids the Promise.race leak and terminates the underlying iterators on closure. I'm not sure whether a fast iterable will starve the others; that wasn't a design goal.

The specification should probably take a position on whether or not an iterator which throws should "cut the line" and resolve before other iterators which incremented normally.

// `PromiseCapability` matches the shape of the built-in PromiseWithResolvers<Type>
type PromiseCapability<Type> = PromiseWithResolvers<Type>;

export function collect<Type>(iterables: readonly AsyncIterable<Type>[]): AsyncIterable<Type> {
    switch (iterables.length) {
        case 0: return async function*() {}();
        case 1: return iterables[0]!;
    }
    return async function*() {
        type Accept = () => Type;
        let count = iterables.length;
        let capability: PromiseCapability<Accept | null> | undefined;
        const iterators: AsyncIterator<Type>[] = [];
        const queue: Accept[] = [];
        // Pull one result from an iterator and enqueue a thunk which
        // yields it (or rethrows) and requests the next one
        const accept = async (iterator: AsyncIterator<Type>) => {
            try {
                const next = await iterator.next();
                if (next.done) {
                    if (--count === 0 && capability !== undefined) {
                        capability.resolve(null);
                    }
                } else {
                    push(() => {
                        void accept(iterator);
                        return next.value;
                    });
                }
            } catch (error) {
                push(() => { throw error; });
            }
        };
        const push = (accept: Accept) => {
            if (capability === undefined) {
                queue.push(accept);
            } else {
                // Clear the capability before resolving so a second push
                // arriving before the consumer resumes goes to the queue
                // instead of resolving an already-settled promise and
                // being dropped
                const { resolve } = capability;
                capability = undefined;
                resolve(accept);
            }
        };

        try {
            // Begin all iterators
            for (const iterable of iterables) {
                const iterator = iterable[Symbol.asyncIterator]();
                iterators.push(iterator);
                void accept(iterator);
            }

            // Delegate to iterables as results complete
            while (true) {
                // Drain anything already buffered
                while (true) {
                    const next = queue.shift();
                    if (next === undefined) {
                        break;
                    } else {
                        yield next();
                    }
                }
                if (count === 0) {
                    break;
                } else {
                    // Park until the next result (or completion) arrives
                    capability = Promise.withResolvers();
                    const next = await capability.promise;
                    if (next === null) {
                        break;
                    } else {
                        capability = undefined;
                        yield next();
                    }
                }
            }

        } finally {
            // Unwind remaining iterators on failure or early closure of
            // the outer generator (a catch block alone would miss the
            // return() case)
            try {
                await Promise.all(iterators.map(iterator => iterator.return?.()));
            } catch {}
        }
    }();
}

laverdet commented 6 months ago

Oh yeah, and I believe none of the implementations shared here fires off a new next() call immediately after the previous result resolves. What we're all doing is buffering one result from each iterable in a preamble, but we don't continue to buffer the next result as results come in. So you may end up with a waterfall after the first result if the consumer is asynchronous.
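One way to avoid that waterfall is to buffer one result ahead on every pull. A sketch (a hypothetical `prefetch` helper of my own, not from any of the libraries above):

```javascript
// One-ahead prefetching: next() hands out the buffered promise and
// immediately requests the following value, so the producer keeps
// working while the consumer processes the current one.
function prefetch(iterator) {
  let pending = iterator.next();
  return {
    next() {
      const current = pending;
      pending = iterator.next(); // request the next value right away
      return current;
    },
    [Symbol.asyncIterator]() { return this; },
  };
}

// Spy iterator that records each call to its next()
const calls = [];
const base = {
  i: 0,
  async next() {
    calls.push(this.i);
    return { value: this.i++, done: false };
  },
};
const it = prefetch(base);
it.next(); // by now the second pull has already been issued
console.log(calls); // [ 0, 1 ]
```

A real helper would also need to forward return() and throw(), and accept that one extra value gets pulled past the point where the consumer stops.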

bakkot commented 6 months ago

Thanks for the links!

I didn't mention it in the OP, but there's another requirement (or at least something to think through), which is that ideally when the consumer calls .next multiple times in succession, without waiting for previous calls to settle, that would cause multiple calls to .next on the underlying iterators. I'm not sure if that should be one-to-one or if we should take into account how many promises we're currently waiting on.

That is, suppose you have let it = AsyncIterator.merge(a, b, c), and then do it.next(); it.next(); it.next(); it.next(); it.next(); it.next(); (i.e. a total of 6 calls to it.next()). That should immediately perform at least 2 calls to the .next() method of each of a, b, and c, and maybe it should perform 6 calls to each of them.

As mentioned in the readme, async iterators which support multiple calls to .next() let you get concurrency driven by the consumer, possibly via a built-in helper. A goal of this proposal is to support that whenever possible. Unfortunately it does mean that these helpers cannot be implemented as async generators.
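A small demonstration of that limitation (my own sketch, not from the proposal): even when next() is called a second time before the first call settles, an async generator queues the requests and runs its body one step at a time.

```javascript
// Async generator requests are queued: two next() calls issued
// back-to-back still execute the body serially, so two 50ms steps
// take ~100ms rather than overlapping.
async function* gen() {
  while (true) {
    await new Promise(resolve => setTimeout(resolve, 50));
    yield Date.now();
  }
}

const it = gen();
const start = Date.now();
const elapsed = Promise.all([it.next(), it.next()])
  .then(() => Date.now() - start);
elapsed.then(ms => {
  console.log(ms >= 95); // true: the two steps ran back to back
});
```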

bakkot commented 3 months ago

Some more prior art in this library.

Also this one, which links to a bunch more.