sindresorhus / p-limit

Run multiple promise-returning & async functions with limited concurrency
MIT License

New feature: concurrency limiting pool/provider #60

Open nl-brett-stime opened 2 years ago

This is kind of a lazy substitute for a proper PR, sorry.

Anyway, the following class acts something like an object/resource pool. However, instead of creating multiple instances of the underlying object, it holds a single instance and only hands it to a caller while the number of in-flight async function calls is below the concurrency limit. Calls beyond the limit are queued until a slot frees up:

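// LimitFunction is only used as a type, so it is imported as one
import pLimit, {type LimitFunction} from 'p-limit';

// Wraps a single shared object (e.g. a service client) with a concurrency limit
export class PLimitProvider<TContext> {
  private readonly _pLimit: LimitFunction;
  private readonly _pLimitedContext: TContext;

  constructor(pLimitedContext: TContext, concurrencyLimit: number) {
    this._pLimitedContext = pLimitedContext;
    this._pLimit = pLimit(concurrencyLimit);
  }

  // Runs f with the shared object once a concurrency slot is available;
  // resolves (or rejects) with the outcome of f
  public run<TResult>(f: (context: TContext) => TResult | Promise<TResult>): Promise<TResult> {
    return this._pLimit(() => f(this._pLimitedContext));
  }
}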

Example usage:

import {S3} from '@aws-sdk/client-s3';

function fetchFromSomeBucket(s3Provider: PLimitProvider<S3>, fileKey: string) {
  return s3Provider.run(client => client.getObject({
    Bucket: 'someBucket',
    Key: fileKey
  }));
}

const fetchWithExpandedKey = (s3Provider: PLimitProvider<S3>, fileKey: string) =>
  fetchFromSomeBucket(s3Provider, `/expanded/${fileKey}`);

async function main() {
  const maxS3Concurrency = 2;
  const s3ClientProvider = new PLimitProvider(new S3({}), maxS3Concurrency);

  const getFilePromise1 = fetchFromSomeBucket(s3ClientProvider, 'someFileKey1');
  const getFilePromise2 = fetchWithExpandedKey(s3ClientProvider, 'someFileKey2');

  // With a limit of 2, this call is queued until one of the two calls above settles
  const getFilePromise3 = fetchFromSomeBucket(s3ClientProvider, 'someFileKey3');

  await Promise.all([getFilePromise1, getFilePromise2, getFilePromise3]);
}

This makes it easier to pass the LimitFunction around together with the client for a given service, and it ensures that every call made through that client is wrapped with the LimitFunction.
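To check that the provider really caps concurrency without needing AWS credentials or any installed packages, here is a rough, self-contained sketch. Everything in it is hypothetical and for illustration only: `createLimit` is a simplified stand-in for `pLimit` (not its actual implementation), `LimitedProvider` mirrors the class above, and `FakeClient` replaces the S3 client with one that records how many calls are in flight at once.

```typescript
// Simplified stand-in for p-limit (assumption: NOT the library's real code).
type Limit = <T>(fn: () => T | Promise<T>) => Promise<T>;

function createLimit(concurrency: number): Limit {
  let active = 0;
  const queue: Array<() => void> = [];

  // Frees a slot and starts the next queued task, if any.
  const next = (): void => {
    active--;
    const run = queue.shift();
    if (run) {
      run();
    }
  };

  return <T>(fn: () => T | Promise<T>): Promise<T> =>
    new Promise<T>((resolve, reject) => {
      const run = (): void => {
        active++;
        // Settle the caller's promise first, then release the slot.
        Promise.resolve().then(fn).then(resolve, reject).then(next);
      };
      if (active < concurrency) {
        run();
      } else {
        queue.push(run);
      }
    });
}

// Same shape as PLimitProvider, built on the stand-in limiter.
class LimitedProvider<TContext> {
  private readonly limit: Limit;
  private readonly context: TContext;

  constructor(context: TContext, concurrencyLimit: number) {
    this.context = context;
    this.limit = createLimit(concurrencyLimit);
  }

  run<TResult>(f: (context: TContext) => TResult | Promise<TResult>): Promise<TResult> {
    return this.limit(() => f(this.context));
  }
}

// Hypothetical client that tracks the peak number of concurrent calls.
class FakeClient {
  inFlight = 0;
  peak = 0;

  async getObject(key: string): Promise<string> {
    this.inFlight++;
    this.peak = Math.max(this.peak, this.inFlight);
    await new Promise<void>((resolve) => setTimeout(resolve, 10));
    this.inFlight--;
    return `contents of ${key}`;
  }
}

// Issues `calls` requests through one provider and reports the observed peak.
async function measurePeakConcurrency(limit: number, calls: number): Promise<number> {
  const client = new FakeClient();
  const provider = new LimitedProvider(client, limit);
  const keys = Array.from({length: calls}, (_, i) => `someFileKey${i + 1}`);
  await Promise.all(keys.map((key) => provider.run((c) => c.getObject(key))));
  return client.peak;
}
```

Calling `measurePeakConcurrency(2, 5)` should resolve to `2`: five requests are issued at once, but the fake client never sees more than two in flight, because every call has to go through `run`.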