sindresorhus / p-queue

Promise queue with concurrency control
MIT License
3.48k stars 185 forks source link

One Queue, Multiple workers #211

Open gridagribnoy opened 3 months ago

gridagribnoy commented 3 months ago

I suggest adding pools and workers for them, possibly a separate library. This can be useful, for example, when you need to complete tasks in different ways; for this, separate workers are needed. Or, for example, there are tasks to get information from the API, but there is a limit, we can add another worker with a separate API key.

I already made a fork and tried to implement workers, it works. but it seems to me it is better to rewrite everything as a separate library.

Some ideas:

const pool = new Pool({
  // ?workers: []
  // ?resolveWorker: 'loop'|'random'|'priority'
});

const worker1 = new Worker({
  concurrency: 20,
  intervalCap: 100,
  interval: 1000,
  timeout: 1000,
  throwOnTimeout: true,
  // ??priority,
  // ctx: {apiUrl: "https://.."} ??
  // process: async payload => { ??
  //   return await (await fetch('https://specialEndPointForWorker/'+payload.id)).json();
  // }
 });

const worker2 = new Worker({
  concurrency: 50,
  intervalCap: 200,
  interval: 1000,
});

pool.attachWorker(worker1);
pool.attachWorker(worker2); // or pool.detachWorker(worker2); runtime control

pool.add(async ({ worker }) => {
  // console.log(`Fetchind... ${worker.ctx.apiUrl}`); ??
  await setTimeout(1000);
  console.log(`Task done! Worker#${[worker1, worker2].indexOf(worker) + 1}`);
  return "payload string";
});
worker1.pause(); // or pool.detachWorker(worker1);
pool.pause();
pool.on("completed", ({ payload, worker }) => {});
worker1.on("completed", payload => {});
// ...
gridagribnoy commented 3 months ago

See workers in action: https://stackblitz.com/edit/p-queue-workers?file=start.ts This code is not for production, but as an example it needs to be reworked.

gridagribnoy commented 3 months ago

I also suggest storing information in the queue not only in functions but also in the payload. For example, queue.add('https://...') and the worker itself decides how to process this information, this makes it possible to easily view the queue, serialize/deserialize it, for example for storing in the db. In general, you can write anything to the queue, be it a function, be it any data in a free form and structure. And the worker will decide how to process it.