Open tgfisher4 opened 3 months ago
Thanks for working on this. However, there's a lot going on and it's really hard to review as it is. I would leave any optimizations for later PRs. Focus only on the change to add `preserveOrder`.
Instead of using object/array for bookkeeping, maybe a WeakMap/Map would be simpler.
Apologies for my lack of activity on this issue/PR. I have gotten a chance to look back at it again, and noticed some additional bugs that I have now addressed and for which I have added tests. This concurrency stuff is tricky ;)
For the bookkeeping data structures, I chose an object and array precisely because I thought they were the simplest and easiest-to-reason-about options. A `WeakMap` for `promisesIndexFromInputIndex` and `Map` for `inputIndexFromPromisesIndex` would probably work, with the strong references from the latter supporting the weak references from the former, but this introduces the need to consider a new, and in my opinion more complicated (perhaps because I do not have much experience with `WeakMap`), line of reasoning about the state of the bookkeeping data structures. I personally prefer the object/array structure and the clarity afforded by explicit `delete`s (of which there is now only one, anyway).
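For concreteness, the object/array bookkeeping discussed above could look roughly like the sketch below. The ledger names and `popPromise` come from this thread; `pushPromise` and all function bodies are assumptions made for illustration, not the PR's actual code.

```javascript
// Hypothetical sketch: names mirror the discussion, bodies are assumptions.
const promises = []; // Unordered pool of pending promises
const promisesIndexFromInputIndex = {}; // inputIndex -> slot in `promises`
const inputIndexFromPromisesIndex = []; // slot in `promises` -> inputIndex

function pushPromise(inputIndex, promise) {
	const promisesIndex = promises.length;
	promises.push(promise);
	promisesIndexFromInputIndex[inputIndex] = promisesIndex;
	inputIndexFromPromisesIndex[promisesIndex] = inputIndex;
}

function popPromise(inputIndex) {
	const promisesIndex = promisesIndexFromInputIndex[inputIndex];
	const lastIndex = promises.length - 1;
	const removed = promises[promisesIndex];

	// Move the tail element into the vacated slot and repoint both ledgers.
	const tailInputIndex = inputIndexFromPromisesIndex[lastIndex];
	promises[promisesIndex] = promises[lastIndex];
	inputIndexFromPromisesIndex[promisesIndex] = tailInputIndex;
	promisesIndexFromInputIndex[tailInputIndex] = promisesIndex;

	// Drop the tail slot, then forget the removed input: the one explicit `delete`.
	promises.pop();
	inputIndexFromPromisesIndex.pop();
	delete promisesIndexFromInputIndex[inputIndex];
	return removed;
}

pushPromise(0, Promise.resolve('a'));
pushPromise(1, Promise.resolve('b'));
pushPromise(2, Promise.resolve('c'));
popPromise(0); // Slot 0 now holds the promise for input 2
```

Removal touches a fixed number of entries regardless of pool size, which is the property the `indexOf` + `splice` approach lacks.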
As for the optimizations included in this PR, I am hoping we can discuss each class of optimization I have introduced and whether you would consider accepting it in this PR:

- (1) `popPromise`. I would like to keep this one, if possible, since without it, when `preserveOrder: false`, processing a resolved promise entails multiple `O(min(concurrency, backpressure))` operations over the `promises` array (`indexOf` + `splice`), a potentially significant bookkeeping overhead for a feature that is a performance optimization to begin with. Further, I think the `popPromise` logic and bookkeeping ledgers are fairly pedestrian: they just keep some pointers in sync.
- `pMapSkip` from a promise. I've removed this: as it turns out, this was unnecessary and a bit silly, and the code got cleaner and more concise when I removed it.
- (4) `promises` when `!isSyncIterator && backpressure !== Number.POSITIVE_INFINITY`. This was only a 3-LoC optimization (plus some explanatory comments), so I thought it within reason to include it, but I can defer it to another PR for the sake of keeping this one more focused.
- (5) `await` that which is `.then`able. Similar to (4), this is another fairly small and straightforward change, but again, I can defer it to another PR for the sake of keeping this one more focused.

What are your thoughts on the above? Do (4) and (5) complicate the review of this PR enough to justify moving these smaller changes to a different PR? Does it make sense for (1) to be included here, being more important to the performance of a performance-focused feature? Perhaps you're ok with me using optimization (1) for the new `preserveOrder: false` feature, but not for the existing `preserveOrder: true` behavior, to lessen regression risk? Perhaps you would like some of these optimizations to be included in separate PRs for the sake of reviewability, but would be willing to release them together? Let me know what you see as the best path forward here.
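To illustrate what `await`ing only `.then`ables changes, here is a small sketch. `isThenable`, `alwaysAwait`, and `awaitIfThenable` are hypothetical names chosen for this example and are not taken from the PR. The point it shows: an `async` function that skips `await` for plain values keeps running synchronously, while an unconditional `await` always defers by at least one microtask.

```javascript
// Hypothetical helper, not part of p-map's API.
const isThenable = value => typeof value?.then === 'function';

const log = [];

async function alwaysAwait(value) {
	await value; // Defers even when `value` is a plain number
	log.push('always');
}

async function awaitIfThenable(value) {
	if (isThenable(value)) {
		await value;
	}

	log.push('conditional'); // Runs synchronously for non-thenables
}

alwaysAwait(1);
awaitIfThenable(2);
log.push('sync');
// At this point `log` is ['conditional', 'sync']; 'always' is appended a microtask later.
```

The synchronous path is cheaper, but it also means code after the call can no longer assume the callee has not yet run.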
@Richienb @dmitri-gb given your interest in the correctness and efficiency of `pMapIterable`, you may also be interested in this PR, which:

- `promises` when the input iterable is async and `backpressure` is bounded
- `O(min(concurrency, backpressure))` operations like `promises.indexOf`, `promises.splice`, and `promises.shift`
- `await`ing `.then`ables
- `preserveOrder` option to `pMapIterable` to optionally allow results to be produced out of order

These optimizations are useful, but they significantly increase the complexity of the code.
I would like for there to first be a PR just adding the bare minimum 20-or-so lines to get `preserveOrder` working for 99% of cases, and then to add minor fixes approaching the release.
I might do it myself by splitting off from some earlier commit, so that this PR just becomes your ideas on improving performance and edge cases.
Since I already have context on the minimum functionality needed for this feature, perhaps I can save you some work :) https://github.com/sindresorhus/p-map/pull/79
Apologies for the back and forth: I had assumed that it would be best and most convenient to optimize the feature and make it release-ready before submitting a PR. But of course, I should take into consideration the fact that code merged to `main` is not immediately released.
Summary
Adds a `preserveOrder` option to `pMapIterable`, which indicates

Fixes #72. Fixes #76.
This implementation is a bit more complicated than I expected, and I discovered several edge cases and race conditions along the way, so I encourage careful review and the suggestion/addition of further appropriate test cases.
Implementation considerations
In order to treat `promises` as an unordered pool for use with `Promise.race` and avoid an `O(min(concurrency, backpressure))` `promises.indexOf` to know which `promise` to remove from `promises`, the promise itself must return some identifying information. The promise's `inputIndex` is good for this purpose as it is a stable id that will not change when shuffling `promises` around. However, `inputIndex` alone is not enough to determine which member of `promises` has returned, so we also introduce extra bookkeeping `promisesIndexFromInputIndex` information. In order to correctly record `promisesIndexFromInputIndex` during `mapNext`, we "reserve" a spot in the `promises` array during `trySpawn` (from recursive `trySpawn` calls in particular, before we would've had a chance to `promises.push(mapNext())`, since the inner `mapNext()` expression executes additional `trySpawn`s before we can perform the outer `.push` expression) by incrementing its length, so that we can use `promises.length` to determine `promisesIndex`.

Then, to avoid an
`O(min(concurrency, backpressure))` `promises.splice` to remove the resolved promise, we instead swap the resolved promise with `promises[promises.length - 1]`. We could also overwrite the resolved member of `promises` in-place, but it seemed slightly cleaner to do it this way so we could reuse this logic in the `pMapSkip + preserveOrder: true` case where we currently `indexOf` + `splice`. In order to make the aforementioned swap, though, we need to update our `promisesIndexFromInputIndex` ledger: however, we cannot know `promises[promises.length - 1]`'s input index without extra information, so we introduce an additional `inputIndexFromPromisesIndex` record.

Speaking of which, to unify the
`preserveOrder: false` logic with the `preserveOrder: true` case, for the latter we still treat the `promises` array in the same pool-fashion, where the result of mapping any `inputIndex` might end up anywhere in `promises`, but bookkeep an additional `outputIndex` datum to use in conjunction with `promisesIndexFromInputIndex` to determine which promise is next in sequence (and so which to `await` and process). As mentioned earlier, this pool-based pattern also allows us to handle the `pMapSkip` case in an `O(1)` manner, compared to the existing `indexOf` + `splice` strategy.

Further,
`pMapSkip` is now unconditionally handled within a `mapNext` (a helper that is roughly equivalent to the previous IIFE) promise via `popPromise`. This preserves the existing behavior of avoiding counting toward `backpressure` by occupying a position in `promises`, in case the main `while` loop does not `await` and `popPromise` this promise for some time (for example, perhaps `preserveOrder: true` and this promise is deep in the queue). To accomplish this unconditionality:

- the main `while` loop now checks `value === pMapSkip` and `continue`s before `popPromise` or `trySpawn`, since these were already called when `mapNext` observed the `pMapSkip` value
- since `pMapSkip` results in a `popPromise`, it can leave holes in the `promisesIndexFromInputIndex` ledger: to compensate, in the `preserveOrder: true` `nextPromise`, we skip `outputIndex`es where `promisesIndexFromInputIndex[outputIndex] === undefined`
- with `await`ing only when necessary (see last ¶), when `trySpawn` executes `mapNext`, this `mapNext` can end up synchronously observing a `pMapSkip` value and calling `popPromise` as a result, if there are no promises involved (the input iterable is sync and produces a non-promise which maps to a non-promise-wrapped `pMapSkip`). This can cause the `promises[promisesIndex]` assignment in `trySpawn` both to undo `popPromise`'s work by writing a promise to `promises` that was supposed to be removed already, and further, to clobber another element of `promises` that was assigned the position in the `promises` array it no longer manages. To account for this, we perform the assignment if and only if `promises[promisesIndex] === undefined`, since if we have been popped, our `promises` position will already be taken by the `trySpawn` that follows the `pMapSkip` check (there is necessarily headroom in `concurrency` and `backpressure` since we have just decremented `runningMappersCount` and `promises.length` (the latter via `popPromise`); even if we are the last element of the input iterable, `trySpawn` will still try to iterate further and receive a `done` result).

As a last piece of miscellany, when
`preserveOrder: false && (await nextPromise()).result.done === true`, stopping the async iterable is no longer safe, since other `promises` may still be pending in the array. So, we `continue` in this case.

Finally, I made other small optimizations where I spotted an opportunity, like only
`await`ing when necessary and `trySpawn`ing before `await`ing the input `iterable`'s `.next` response, in case we can start another promise chugging, too.
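
As a rough illustration of the core idea in the implementation notes above (each pooled promise resolves with its own `inputIndex` so the winner of `Promise.race` can be removed without searching), here is a deliberately simplified sketch. It is not the PR's code: a `Map` keyed by `inputIndex` stands in for the `promises` array and its two ledgers, only sync input iterables are handled, and `backpressure` and `pMapSkip` are ignored. `mapUnordered` and `delay` are hypothetical names.

```javascript
// Hypothetical, simplified sketch: a Map replaces the PR's array-plus-ledgers pool.
async function * mapUnordered(iterable, mapper, concurrency) {
	const pool = new Map(); // inputIndex -> promise resolving to {inputIndex, value}
	let nextInputIndex = 0;

	const spawn = element => {
		const inputIndex = nextInputIndex++;
		// The promise carries its own id so the race winner identifies itself.
		pool.set(inputIndex, (async () => ({inputIndex, value: await mapper(element)}))());
	};

	for (const element of iterable) {
		spawn(element);
		if (pool.size >= concurrency) {
			const settled = await Promise.race(pool.values());
			pool.delete(settled.inputIndex);
			yield settled.value;
		}
	}

	// Input is exhausted, but other promises may still be pending: keep draining.
	while (pool.size > 0) {
		const settled = await Promise.race(pool.values());
		pool.delete(settled.inputIndex);
		yield settled.value;
	}
}

const delay = (ms, value) => new Promise(resolve => {
	setTimeout(resolve, ms, value);
});

(async () => {
	const results = [];
	for await (const value of mapUnordered([300, 100, 200], ms => delay(ms, ms), 3)) {
		results.push(value);
	}

	console.log(results); // Logs [100, 200, 300]: completion order, not input order
})();
```

The drain loop mirrors the point made above about `done`: reaching the end of the input does not mean the pool is empty when order is not preserved.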