sindresorhus / p-map

Map over promises concurrently
MIT License

FR: introduce `p` function for concurrent promise execution with condition #66

Open rentalhost opened 1 year ago

rentalhost commented 1 year ago

I'm not sure if this is the right place to submit an idea, but it would be interesting to have a function p that allows executing a specified number of promises concurrently while a condition is true.

In my specific case, I have a list with 200 items and need to check if at least 5 of them have a certain "quality". Since it is a time-consuming process and the outcome is quite random, I activate 10 concurrent executions and keep performing an items.shift() operation to test each item.

Once I reach 5 successful elements, I pause the processing. However, when the number of elements drops below 5 again, I need to resume searching for new elements. In my particular case, it's not a problem if I find more than 5 elements, as some of the concurrent executions may still be pending even after pausing.

Here's a rough outline of the idea:

const p = pFunctionHere({
  concurrency: 10,
  // Once the condition first returns `false`, it is re-checked every 100 ms
  // to decide whether the concurrent executions should resume.
  afterPausedInterval: 100,
});

const tester = async () => {
  // Get an item from the list.
  const item = items.shift(); 

  if (await isValid(item)) { // Time-consuming process.
    const testItem = addItem(item); // addedItems.push(...)
    testItem.test(); // Test the item every minute.
  }
}

const condition = () => {
  return items.length > 0 && addedItems.length < 5;
}

p.run(tester, condition);

This feature would run the tester function concurrently, up to the specified concurrency limit (10 in this case), for as long as the condition function returns true. Once the condition is no longer met, execution would pause until the condition becomes true again.

It would be great to have this functionality as it provides a convenient way to handle concurrent promise execution with a condition.
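To make the requested behaviour concrete, here is a minimal sketch of the scheduling loop described above: start tasks up to the limit while the condition holds, and poll the condition while paused. The names `runWhile`, `pollInterval`, and `stop` are hypothetical and are not part of p-map or any existing package. The sketch also assumes the condition is a cheap, side-effect-free function.

```typescript
// Hypothetical sketch: `runWhile`, `pollInterval` and `stop` are illustrative names only.
interface RunWhileOptions {
  concurrency: number; // Maximum number of tasks in flight.
  pollInterval: number; // Milliseconds between condition checks while paused.
}

function runWhile(
  task: () => Promise<void>,
  condition: () => boolean,
  { concurrency, pollInterval }: RunWhileOptions,
): { stop: () => void } {
  let active = 0;
  let stopped = false;
  let timer: ReturnType<typeof setTimeout> | undefined;

  const fill = (): void => {
    if (stopped) return;

    // Start tasks until the limit is hit or the condition turns false.
    while (active < concurrency && condition()) {
      active++;
      const done = (): void => {
        active--;
        fill();
      };
      // Assumption: a rejected task just frees its slot; real code should report the error.
      task().then(done, done);
    }

    // Paused: the condition is false, so schedule a single re-check.
    if (!condition() && timer === undefined) {
      timer = setTimeout(() => {
        timer = undefined;
        fill();
      }, pollInterval);
    }
  };

  fill();

  return {
    // Stops starting new tasks and cancels the pending re-check; running tasks finish normally.
    stop(): void {
      stopped = true;
      if (timer !== undefined) clearTimeout(timer);
    },
  };
}
```

With the example from this issue, the call would be something like `runWhile(tester, condition, { concurrency: 10, pollInterval: 100 })`. The `stop` handle is an addition of this sketch: without it, the polling timer would keep running after the list is exhausted.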

rentalhost commented 1 year ago

I did a draft:

interface PConditionalOptions {
  concurrency?: number; // Maximum number of concurrent executions (default: 1).
  restartInterval?: number; // Interval at which the condition is re-checked while execution is paused (default: 100 ms).
}

type Runner = () => Promise<void>; // Function type for the task runner.

type Condition = () => boolean; // Function type for the condition checker.

export class PConditional {
  #concurrency: number; // Maximum number of concurrent executions.

  #currentConcurrences = 0; // Current number of concurrent executions.

  #restartInterval: number; // Interval at which the condition is re-checked while execution is paused.

  #runner?: Runner; // Function representing the task to be executed.

  #condition!: Condition; // Function representing the condition to be checked.

  #restartTimer?: ReturnType<typeof setInterval>; // Timer used to re-check the condition periodically while execution is paused.

  public constructor({ concurrency, restartInterval }: PConditionalOptions) {
    this.#concurrency = concurrency ?? 1;
    this.#restartInterval = restartInterval ?? 100;
  }

  // A getter that returns whether the execution is currently paused.
  public get isPaused() {
    return this.#restartTimer !== undefined;
  }

  // A method to start running the promises with the specified runner and condition.
  public run(runner: Runner, condition: Condition) {
    if (this.#runner !== undefined) {
      throw new Error("run() has already been executed");
    }

    this.#runner = runner;
    this.#condition = condition;
    this.#runConcurrences();
  }

  // Runs the tasks concurrently until the maximum concurrency is reached or the condition is false.
  #runConcurrences() {
    for (
      ;
      this.#currentConcurrences < this.#concurrency;
      this.#currentConcurrences++
    ) {
      if (this.#condition()) {
        void this.#runner!().finally(() => {
          this.#currentConcurrences--;
          this.#runConcurrences();
        });

        continue;
      }

      this.#startConditionalTimer();

      return;
    }
  }

  // Starts the timer that periodically re-checks the condition after it has returned false.
  #startConditionalTimer() {
    if (this.#restartTimer === undefined) {
      this.#restartTimer = setInterval(() => {
        if (this.#condition()) {
          clearInterval(this.#restartTimer);
          this.#restartTimer = undefined;
          this.#runConcurrences();
        }
      }, this.#restartInterval);
    }
  }
}

tommy-mitchell commented 1 year ago

https://github.com/sindresorhus/p-whilst? Alternatively, do any of these fit your use case?

rentalhost commented 1 year ago

@tommy-mitchell from my analysis, p-whilst is just a while() loop that processes one promise at a time, and I need concurrency. Of the other p-functions, none seemed to solve my problem.

In summary, the rules are:

sindresorhus commented 1 year ago

I wonder if this could be solved by p-whilst by adding a concurrency option to it.

Something like this:

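import delay from 'delay'; // Assumption: the `delay` package, or any promise-based sleep helper.

// Must be async because it awaits the delay.
const condition = async () => {
  if (addedItems.length >= 5) {
    await delay(100);
  }

  return items.length > 0 && addedItems.length < 5;
};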
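
For illustration, a concurrency option on top of a whilst-style loop could look roughly like the sketch below. It runs several sequential loops side by side. `pWhilstConcurrent` and its `concurrency` parameter are hypothetical; p-whilst does not currently offer this, and the sketch does not reflect how the maintainers would implement it.

```typescript
// Hypothetical helper: NOT part of the current p-whilst API.
type AsyncCondition = () => boolean | Promise<boolean>;
type Action = () => Promise<void>;

async function pWhilstConcurrent(
  condition: AsyncCondition,
  action: Action,
  concurrency = 1,
): Promise<void> {
  // Each worker is an ordinary sequential whilst loop.
  const worker = async (): Promise<void> => {
    while (await condition()) {
      await action();
    }
  };

  // Run `concurrency` workers side by side and wait for all of them to exit.
  // If any action rejects, the returned promise rejects with that error.
  const workers: Array<Promise<void>> = [];
  for (let i = 0; i < concurrency; i++) {
    workers.push(worker());
  }

  await Promise.all(workers);
}
```

One consequence of this design is that a worker exits for good as soon as the condition resolves to false. To get the pause-and-resume behaviour requested in this issue, the condition would have to keep waiting while the limit is reached (and items remain), rather than return false after a single delay.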