Adds a preserveOrder option to pMapIterable, which indicates whether the output iterable should produce the mapper results in the same order as the input iterable produced the corresponding elements.
If false, mapper results are produced in the order they become available, which may not match the order in which the input iterable produced the mapper inputs, but may improve throughput.
Type: boolean\
Default: true
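
To make the difference between the two orderings concrete, here is a small standalone sketch. It does not call pMapIterable; `delay` and `demoOrdering` are hypothetical helpers invented for this illustration, and the "mapper" is just a timer.

```javascript
// Hypothetical helper: resolves with `value` after `ms` milliseconds.
const delay = (ms, value) => new Promise(resolve => setTimeout(() => resolve(value), ms));

// Starts one timer-backed "mapper" per input and records results two ways:
// in input order and in the order the mappers actually finish.
async function demoOrdering(delaysMs) {
	const completionOrder = [];
	const pending = delaysMs.map(ms =>
		delay(ms, ms).then(value => {
			completionOrder.push(value); // recorded as soon as this mapper finishes
			return value;
		}));
	const inputOrder = await Promise.all(pending); // Promise.all keeps input order
	return {inputOrder, completionOrder};
}
```

With delays of `[60, 20, 40]`, `inputOrder` corresponds to what preserveOrder: true is meant to yield, while `completionOrder` corresponds to preserveOrder: false.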
Addresses #72.
This implementation is a bit more complicated than I expected, and I discovered several edge cases and race conditions along the way, so I encourage careful review and the suggestion/addition of more appropriate test cases.
Implementation considerations
In order to
(a) treat promises as an unordered pool for use with Promise.race and
(b) avoid an O(min(concurrency, backpressure)) promises.indexOf to know which promise to remove from promises,
the promise itself must return some identifying information. The promise's inputIndex works well for this purpose, since it is a stable id that does not change when promises are shuffled around. However, inputIndex alone is not enough to determine which member of promises has returned, so we also keep a promisesIndexFromInputIndex record. To record promisesIndexFromInputIndex correctly during mapNext, we "reserve" a spot in the promises array during trySpawn by incrementing its length, so that promises.length can be used to determine promisesIndex. The reservation matters for recursive trySpawn calls in particular: in promises.push(mapNext()), the inner mapNext() expression executes additional trySpawns before the outer .push can run.
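
A minimal sketch of the tagging idea, assuming timer-backed stand-ins for the mapper. `delay` and `raceTagged` are hypothetical names and this is not the code in the PR; only promises, inputIndex, promisesIndex and promisesIndexFromInputIndex mirror names used above.

```javascript
// Hypothetical helper: resolves with `value` after `ms` milliseconds.
const delay = (ms, value) => new Promise(resolve => setTimeout(() => resolve(value), ms));

// Spawns one tagged promise per delay, then reports which one wins the race
// and where it lives in the pool, without calling promises.indexOf.
async function raceTagged(delaysMs) {
	const promises = [];
	const promisesIndexFromInputIndex = {};

	delaysMs.forEach((ms, inputIndex) => {
		// Reserve the slot first, so that anything spawned while the promise
		// is still being built would already see the updated length.
		const promisesIndex = promises.length++;
		promisesIndexFromInputIndex[inputIndex] = promisesIndex;
		// The promise resolves with its own inputIndex as identifying information.
		promises[promisesIndex] = delay(ms, `result ${inputIndex}`)
			.then(value => ({inputIndex, value}));
	});

	const {inputIndex, value} = await Promise.race(promises);
	return {inputIndex, promisesIndex: promisesIndexFromInputIndex[inputIndex], value};
}
```

Here the pool position happens to equal the input index; the two diverge once resolved promises are removed from the pool, which is why the ledger is needed.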
Then, to avoid an O(min(concurrency, backpressure)) promises.splice to remove the resolved promise, we instead swap the resolved promise with promises[promises.length - 1]. We could also overwrite the resolved member of promises in place, but swapping seemed slightly cleaner because the same logic can be reused in the pMapSkip + preserveOrder: true case, which currently uses indexOf + splice. To make the swap, though, we need to update our promisesIndexFromInputIndex ledger, and we cannot know the input index of promises[promises.length - 1] without extra information, so we introduce an additional inputIndexFromPromisesIndex record.
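
The swap-with-last removal and its two ledgers can be sketched as a small standalone structure. `createPool`, `add` and `remove` are hypothetical names for this illustration and the PR's actual code may be organized differently.

```javascript
// A pool with O(1) removal: the removed slot is refilled with the last entry,
// and both ledgers are updated to reflect the move.
function createPool() {
	const promises = [];
	const promisesIndexFromInputIndex = {};
	const inputIndexFromPromisesIndex = [];

	return {
		promises,
		promisesIndexFromInputIndex,
		inputIndexFromPromisesIndex,

		add(inputIndex, promise) {
			const promisesIndex = promises.length;
			promises.push(promise);
			promisesIndexFromInputIndex[inputIndex] = promisesIndex;
			inputIndexFromPromisesIndex[promisesIndex] = inputIndex;
		},

		remove(inputIndex) {
			const promisesIndex = promisesIndexFromInputIndex[inputIndex];
			const lastIndex = promises.length - 1;
			// Without this reverse ledger we could not tell whose promise sits last.
			const lastInputIndex = inputIndexFromPromisesIndex[lastIndex];

			// Move the last entry into the vacated slot and update both ledgers.
			promises[promisesIndex] = promises[lastIndex];
			inputIndexFromPromisesIndex[promisesIndex] = lastInputIndex;
			promisesIndexFromInputIndex[lastInputIndex] = promisesIndex;

			// Drop the now-duplicated tail slot and forget the removed entry.
			promises.pop();
			inputIndexFromPromisesIndex.pop();
			delete promisesIndexFromInputIndex[inputIndex];
		},
	};
}
```

Removing the entry that is already last also works: the self-assignment is harmless and the final `delete` clears its ledger entry.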
To unify the preserveOrder: false logic with the preserveOrder: true case, the latter still treats the promises array as a pool, where the result of mapping any inputIndex might end up anywhere in promises. It additionally tracks an outputIndex, used together with promisesIndexFromInputIndex to determine which promise is next in sequence (and therefore which one to await and process). As mentioned earlier, this pool-based pattern also lets us handle the pMapSkip case in O(1), compared to the existing indexOf + splice strategy.
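
As a rough illustration of ordered output from an unordered pool, the sketch below fills the pool in reverse so that pool positions deliberately differ from input indices, then walks outputIndex forward. `delay`, `drainInInputOrder` and the use of `undefined` entries as stand-ins for skipped inputs are assumptions made for this example only.

```javascript
// Hypothetical helper: resolves with `value` after `ms` milliseconds.
const delay = (ms, value) => new Promise(resolve => setTimeout(() => resolve(value), ms));

// `entries[i]` is either {ms, value} or undefined (a stand-in for a skipped input).
async function drainInInputOrder(entries) {
	const promises = [];
	const promisesIndexFromInputIndex = {};

	// Fill the pool in reverse, so a promise's pool position says nothing
	// about its input index.
	for (let inputIndex = entries.length - 1; inputIndex >= 0; inputIndex--) {
		const entry = entries[inputIndex];
		if (entry === undefined) {
			continue; // skipped inputs never get a ledger entry
		}

		promisesIndexFromInputIndex[inputIndex] = promises.length;
		promises.push(delay(entry.ms, entry.value));
	}

	const results = [];
	for (let outputIndex = 0; outputIndex < entries.length; outputIndex++) {
		const promisesIndex = promisesIndexFromInputIndex[outputIndex];
		if (promisesIndex === undefined) {
			continue; // nothing in the pool for this index, move on
		}

		// Await the promise that is next in sequence, wherever it sits in the pool.
		results.push(await promises[promisesIndex]);
	}

	return results;
}
```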
Further, pMapSkip is now handled within the promise itself, instead of in the main while loop. This is done by breaking out the IIFE assigned to promise into a mapNext function that can then be called recursively when pMapSkip is returned. However, we must also consider the case where preserveOrder: true and the current promise is already being awaited: if concurrency > 1 and we were to call mapNext, the current promise would return the result of mapping an iterable element many inputIndexes in the future, failing to preserve order. Thus, we check whether the myInputIndex + 1 mapper is running and, if so, return its promise after removing our current promise from promises. When the current promise returns, we then process and remove the myInputIndex + 1 promise, as if we had awaited promises[promisesIndexFromInputIndex[myInputIndex + 1]]. But now our outputIndex is behind (it thinks we should return the promise for myInputIndex + 1 next, but we skipped that one, so we should return the promise for myInputIndex + 2 instead). To compensate, we skip indices with promisesIndexFromInputIndex[outputIndex] === undefined.
As a last piece of miscellany, when preserveOrder: false && (await nextPromise()).result.done === true, stopping the async iterable is no longer safe, since other promises may still be pending in the array. So, we continue in this case.
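
The reason for continuing can be seen in a simplified unordered loop. This sketch swaps in a Map keyed by inputIndex instead of the array-plus-ledgers pool described above, omits error handling and backpressure, and uses hypothetical names (`mapUnordered`, `spawn`); it is not the PR's implementation.

```javascript
// Yields mapper results in completion order. Each pooled promise first pulls
// one element from the input, so a "done" result can arrive while other
// mappers are still running.
async function * mapUnordered(asyncIterable, mapper, concurrency) {
	const iterator = asyncIterable[Symbol.asyncIterator]();
	const pool = new Map(); // inputIndex -> tagged promise
	let nextInputIndex = 0;
	let inputDone = false;

	const spawn = () => {
		const inputIndex = nextInputIndex++;
		pool.set(inputIndex, (async () => {
			const next = await iterator.next();
			if (next.done) {
				return {inputIndex, done: true};
			}

			return {inputIndex, done: false, value: await mapper(next.value)};
		})());
	};

	for (let index = 0; index < concurrency; index++) {
		spawn();
	}

	while (pool.size > 0) {
		const result = await Promise.race(pool.values());
		pool.delete(result.inputIndex);

		if (result.done) {
			// The input is exhausted, but other promises may still be pending,
			// so keep draining the pool instead of returning.
			inputDone = true;
			continue;
		}

		if (!inputDone) {
			spawn();
		}

		yield result.value;
	}
}
```

With two inputs and a concurrency of 3, the third pooled promise resolves with `done` before either mapper finishes; returning at that point would drop both results.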
Finally, I made other small optimizations where I spotted an opportunity, like only awaiting when necessary and trySpawning before awaiting the input iterable's .next response, in case we can start another promise chugging, too.