Open tgfisher4 opened 5 months ago
This would indeed be useful. I agree with the option name. PR welcome 🙏
It is a great use case, but I'm trying to approach it differently. It should be a much simpler solution @sindresorhus take a look here https://github.com/lacherogwu/p-map/tree/bug/pMapIterable-concurrency
I haven't touched the backpressure yet, and for some reason, I couldn't get the tests working with my implementation, and I found it hard to debug
this is the concept, I implemented this in a project I was needed it
export async function* mapIterable(items: any[], cb: (item: any) => Promise<any>, options?: { concurrency: number }) {
type P = { done: boolean; value: any; promise: Promise<any> };
const concurrency = options?.concurrency ?? 1;
const next = () => {
const item = items.shift();
const promise = cb(item).then(value => ({ value, done: items.length === 0, promise })) as Promise<P>;
return promise;
};
const promises = Array.from<undefined>({ length: Math.min(concurrency, items.length) }) //
.reduce(set => set.add(next()), new Set<Promise<P>>());
while (promises.size > 0) {
const { value, done, promise } = await Promise.race(promises);
promises.delete(promise);
yield value;
if (!done) {
promises.add(next());
}
}
}
pMapIterable
currently yieldsmapper
results in the order they were obtained from the inputiterable
. In other words,pMapIterable
always waits for, and returns, the first promise in the queue. In my use case, the order of my inputiterable
is insignificant: it is essentially an unordered stream. In this use case, I want toasync
ly map, with bounded concurrency, each element of the iterable and treat the results as another unordered stream: the output order does not need to match the input order. Forcing the output order to match the input order introduces head-of-line blocking:mapper
results later in thepromises
queue that are currently available are blocked from further processing by pending results earlier in the queue. Additionally, new promises cannot be started even though old promises have fulfilled, underutilizing the allowedconcurrency
.I'd like to propose adding a new
pMapIterable
option that treats thepromises
buffer like a pool rather than a queue. In other words, the returned async iterable would yield results from thepromises
buffer as soon as they are available, rather than in FIFO order. Perhaps it could be calledpreserveOrder
and default totrue
.I'd be happy to work on a PR if you're interested in accepting such a feature: I think it should be relatively straightforward to devise an implementation based around the conditional use of
await Promise.race(promises)
instead ofawait promises[0]
.P.S.: Thanks for all these nifty
p-
utils!