sindresorhus / p-limit

Run multiple promise-returning & async functions with limited concurrency
MIT License

Avoid AsyncResource.bind #83

Closed sugoroku-y closed 3 months ago

sugoroku-y commented 3 months ago

Change to new Promise(r => queue.enqueue(r)).then(...) so that the asynchronous context is not switched, making AsyncResource.bind unnecessary.

Since AsyncResource.bind is no longer needed, the time required to call the limit function will be reduced.

However, because the Promise is used differently, values such as limit.activeCount are now updated later than before. This could be a problem for code that depends on the exact timing of those updates.

Performance

Before:

>node sample.js
queuing: 0.0100483000010252
avg: 0.0036830892265141907
>node sample.js
queuing: 0.01016361999809742
avg: 0.0034371111732547956
>node sample.js
queuing: 0.010111030000448227
avg: 0.003734002007524611

After:

>node sample.js
queuing: 0.004205410000681877
avg: 0.002716399300301685
>node sample.js
queuing: 0.004278499999642372
avg: 0.0024704816498034216
>node sample.js
queuing: 0.00385004999935627
avg: 0.00261593758760986

Source Code:

import pLimit from './index.js';

const limit = pLimit(30);

let end;
const queuingStart = performance.now();
const promises = Array.from({length: 10000}, (_, id) => limit(async () => {
  const lap = end !== undefined ? performance.now() - end : undefined;
  // console.log(id, 'start', limit.activeCount, limit.pendingCount);
  await new Promise(r => setTimeout(r));
  // console.log(id, 'end', limit.activeCount, limit.pendingCount);
  end = performance.now();
  return lap;
}));
const queuing = (performance.now() - queuingStart) / promises.length;
console.log('queuing:', queuing);
Promise.all(promises).then((_laps) => {
  /** @type {number[]} */
  const laps = _laps.filter(e => e !== undefined);
  const avg = laps.reduce((a, b) => a + b) / laps.length;
  console.log('avg:', avg);
});

sindresorhus commented 3 months ago

Change to new Promise(r => queue.enqueue(r)).then(...) so that the asynchronous context is not switched

How did you confirm that the context is not switched in this case?

sugoroku-y commented 3 months ago

Change to new Promise(r => queue.enqueue(r)).then(...) so that the asynchronous context is not switched

How did you confirm that the context is not switched in this case?

I think it is confirmed by the fact that the test case "propagates async execution context properly" passed.

https://github.com/sugoroku-y/p-limit-avoid-async-resource/blob/main/test.js#L44-L59

sindresorhus commented 3 months ago

// @hyperair

sugoroku-y commented 3 months ago

Let me explain what I understand:

import { AsyncLocalStorage } from 'async_hooks';
import pLimit from 'p-limit';

const limit = pLimit(2);
const store = new AsyncLocalStorage();
const result = await Promise.all(
    Array.from({ length: 100 }, async (_, id) =>
        store.run({ id }, () =>
            limit(() => Promise.resolve(store.getStore().id)),
        ),
    ),
);
console.log(result);

When this script is executed with the original p-limit, the output will be:

[
   0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11,
  12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
  24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35,
  36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47,
  48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59,
  60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71,
  72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83,
  84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95,
  96, 97, 98, 99
]

However, if you edit node_modules/p-limit/index.js to remove AsyncResource.bind and then execute the script, the output will be:

[
  0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1,
  0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1,
  0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1,
  0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1,
  0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1,
  0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1,
  0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1,
  0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1,
  0, 1, 0, 1
]

This happens because the function passed to limit is stored with queue.enqueue. When the previous function completes, the function retrieved with queue.dequeue is called from, and therefore runs in, the previous function's asynchronous context.

To avoid this, AsyncResource.bind wraps the function so that it switches back to the context it was enqueued from, but this wrapping adds a slight delay both when enqueuing and when starting execution.
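To make the effect concrete without p-limit itself, here is a minimal sketch. The `drain` helper and the 'drainer' context are made up for illustration and are not part of p-limit; only AsyncLocalStorage and AsyncResource.bind are real Node.js APIs. Callbacks are queued from three different contexts and then called from a fourth one:

```javascript
import {AsyncLocalStorage, AsyncResource} from 'node:async_hooks';

const store = new AsyncLocalStorage();

// Hypothetical helper: queue three callbacks, each from its own store.run()
// context, then call all of them from an unrelated 'drainer' context.
const drain = wrap => {
	const queue = [];
	for (const id of [0, 1, 2]) {
		store.run({id}, () => {
			queue.push(wrap(() => store.getStore().id));
		});
	}

	return store.run({id: 'drainer'}, () => queue.map(task => task()));
};

// Plain functions see the context of whoever calls them.
const unbound = drain(task => task);

// Bound functions switch back to the context they were queued from.
const bound = drain(task => AsyncResource.bind(task));

console.log(unbound); // [ 'drainer', 'drainer', 'drainer' ]
console.log(bound); // [ 0, 1, 2 ]
```

The unbound case corresponds to the 0, 1, 0, 1, ... output above: each dequeued function reports the store of the task that dequeued it.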

With my modified p-limit, function_ is called from within generator itself rather than from another asynchronous context.

So it no longer needs to be wrapped in AsyncResource.bind.
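The same idea can be sketched in isolation. This is an illustration under assumptions, not the code in this PR: `runWhenReleased` and the 'releaser' context are hypothetical names. What gets queued is the Promise's resolve function, so the task is still called by the code that awaited the Promise, in that code's own context:

```javascript
import {AsyncLocalStorage} from 'node:async_hooks';

const store = new AsyncLocalStorage();
const queue = [];

// Hypothetical stand-in for the limiter: park on a Promise whose resolver
// is queued, then call the task after the await.
const runWhenReleased = async task => {
	await new Promise(resolve => {
		queue.push(resolve);
	});
	return task();
};

// Each call is made from its own context.
const pending = [0, 1, 2].map(id =>
	store.run({id}, () => runWhenReleased(() => store.getStore().id)));

// The resolvers are called from an unrelated context.
store.run({id: 'releaser'}, () => {
	for (const resolve of queue) {
		resolve();
	}
});

const results = await Promise.all(pending);
console.log(results); // [ 0, 1, 2 ]
```

The continuation after an await keeps the context in which the await was made, regardless of which context resolves the Promise, so no AsyncResource.bind is involved.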

If you execute the previous script with my modified p-limit, the output will be:

[
   0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11,
  12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
  24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35,
  36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47,
  48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59,
  60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71,
  72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83,
  84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95,
  96, 97, 98, 99
]

You can see that each function is executed in the expected asynchronous context without being wrapped in AsyncResource.bind.

hyperair commented 3 months ago

Looks good to me, but the starter promise is a little weird. I see that tests won't pass without it though, but I don't understand what's happening.

sugoroku-y commented 3 months ago

Looks good to me, but the starter promise is a little weird. I see that tests won't pass without it though, but I don't understand what's happening.

Is it the following code that seems weird to you?

        starter = (async () => {
            await starter;
            if (activeCount < concurrency && queue.size > 0) {
                queue.dequeue()();
            }
        })();

This part of the Promise chain is used to delay the comparison of activeCount and concurrency.

If it were written more like a Promise chain, it would look like this:

        starter = starter.then(() => {
            if (activeCount < concurrency && queue.size > 0) {
                queue.dequeue()();
            }
        });

As mentioned in the original comment, it is necessary to insert an await because the activeCount changes asynchronously.

Furthermore, in this commit activeCount no longer changes immediately when the function retrieved from the queue is called, so the comparison has to be delayed by one more step.
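As a rough illustration of why the comparison has to be deferred, here is a simplified sketch. `startTask` and `scheduleCheck` are hypothetical stand-ins, and the single `await Promise.resolve()` only models the asynchronous hop before the counter changes; it is not the exact sequence in p-limit:

```javascript
let activeCount = 0;
let starter = Promise.resolve();
const observed = [];

// Stand-in for starting a task: the counter is only bumped one microtask later.
const startTask = async () => {
	await Promise.resolve();
	activeCount++;
};

// Stand-in for the deferred comparison, chained onto `starter`.
const scheduleCheck = () => {
	starter = starter.then(() => {
		observed.push(activeCount);
	});
};

startTask();
observed.push(activeCount); // A synchronous check still sees the old value.
scheduleCheck();
await starter;

console.log(observed); // [ 0, 1 ]
```

The synchronous read sees 0, while the read chained onto starter runs after the pending update and sees 1.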

sugoroku-y commented 3 months ago

Although this would be a major change, it may be easier to understand if you do the following:

const resumeNext = () => {
    const resolve = queue.dequeue();
    if (!resolve) {
        return;
    }

    resolve();
    // Since `pendingCount` has been decreased by one, increase `activeCount` by one.
    ++activeCount;
};

const pending = () => {
    (async () => {
        // This function needs to wait until the next microtask before comparing
        // `activeCount` to `concurrency`, because `activeCount` is updated asynchronously
        // after the resolve function is dequeued and called. The comparison in the if-statement
        // needs to happen asynchronously as well to get an up-to-date value for `activeCount`.
        await Promise.resolve();
        if (activeCount < concurrency) {
            resumeNext();
        }
    })();

    // Queue `resolve` function instead of `function_` function
    // to preserve asynchronous context.
    return new Promise(resolve => {
        queue.enqueue(resolve);
    });
};

const generator = async (function_, ...arguments_) => {
    await pending();

    try {
        return await function_(...arguments_);
    } finally {
        activeCount--;

        resumeNext();
    }
};

That said, this may still not reduce the weirdness that @hyperair feels.

2024-06-23 Postscript:

Changed to apply 0ca399d.

2024-06-24 Postscript:

Changed to apply cf5e18a, 4aa57b7.

sugoroku-y commented 3 months ago

I have removed starter and introduced activatingCount to cover the time lag in updating activeCount.

@hyperair Does this eliminate some of the weirdness?

hyperair commented 3 months ago

Oh yeah that looks great now, thanks!

sugoroku-y commented 3 months ago

I merged activatingCount into activeCount because I didn't see much need to keep them separate.
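A heavily simplified sketch of that idea, assuming a plain array as the queue and a hypothetical `createLimit` factory (this is not the merged p-limit code): activeCount is incremented at the moment a queued resolver is released, so the slot counts as taken before the task body actually starts.

```javascript
// Hypothetical, simplified limiter; a plain array stands in for the queue.
const createLimit = concurrency => {
	const queue = [];
	let activeCount = 0;

	const resumeNext = () => {
		if (activeCount < concurrency && queue.length > 0) {
			// Reserve the slot as soon as the resolver is released.
			activeCount++;
			queue.shift()();
		}
	};

	return async (function_, ...arguments_) => {
		await new Promise(resolve => {
			queue.push(resolve);
			resumeNext();
		});

		try {
			return await function_(...arguments_);
		} finally {
			activeCount--;
			resumeNext();
		}
	};
};

// Usage: five tasks, at most two running at once.
const limit = createLimit(2);
let running = 0;
let maxRunning = 0;

const results = await Promise.all([1, 2, 3, 4, 5].map(n => limit(async () => {
	running++;
	maxRunning = Math.max(maxRunning, running);
	await new Promise(resolve => setTimeout(resolve, 5));
	running--;
	return n * 2;
})));

console.log(results, maxRunning); // [ 2, 4, 6, 8, 10 ] 2
```

Because the slot is reserved on release, a single counter is enough to cover both tasks that are running and tasks that are about to run.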

sindresorhus commented 3 months ago

@sugoroku-y This is not a breaking change, correct?

sugoroku-y commented 3 months ago

@sindresorhus

@sugoroku-y This is not a breaking change, correct?

I don't think it's a major issue, but there is one behavior change that I suspect could be considered a breaking change.

import pLimit from 'p-limit';

const limit = pLimit(3);

Promise.all(Array.from({length: 10}, (_, i) => limit(async (id) => {
  console.log(id, 'start', limit.activeCount, limit.pendingCount);
  await Promise.resolve();
  console.log(id, 'end', limit.activeCount, limit.pendingCount);
}, i))).then(() => console.log('end'));

The above script, when run with the original version of p-limit, will output the following:

$> node sample.mjs
0 start 1 9
1 start 2 8
2 start 3 7
0 end 3 7
1 end 3 7
2 end 3 7
3 start 3 6
4 start 3 5
5 start 3 4
3 end 3 4
4 end 3 4
5 end 3 4
6 start 3 3
7 start 3 2
8 start 3 1
6 end 3 1
7 end 3 1
8 end 3 1
9 start 3 0
9 end 1 0
end

After the change, it would be as follows:

$> node sample.mjs
0 start 3 7
1 start 3 7
2 start 3 7
0 end 3 7
1 end 3 7
2 end 3 7
3 start 3 4
4 start 3 4
5 start 3 4
3 end 3 4
4 end 3 4
5 end 3 4
6 start 3 1
7 start 3 1
8 start 3 1
6 end 3 1
7 end 3 1
8 end 3 1
9 start 1 0
9 end 1 0
end

The activeCount and pendingCount values that a task sees when it starts have changed. For example, the first two tasks, which are executed immediately, now print 3 7 instead of 1 9 and 2 8.

sindresorhus commented 3 months ago

Thank you 👍