sindresorhus / p-queue

Promise queue with concurrency control
MIT License

Backpressure #87

Open waynerobinson opened 5 years ago

waynerobinson commented 5 years ago

These libraries are awesome and make building consistent asynchronous code much easier.

But is there a mechanism in this library, or in one of the other p-libraries, for a queue-like data structure that implements some form of backpressure, so that once the queue reaches a certain size, the function calling add() pauses until the queue size has dropped?

flux627 commented 5 years ago

Was just about to open this issue, glad to see it already here 👍

sindresorhus commented 4 years ago

Not at the moment, but I can see how that would be useful.

It shouldn't be hard to add a maxSize option and have add() just wait when it's reached. PR welcome for that.

MrNghia123 commented 3 years ago

Currently I achieve some sort of backpressure with this library by using the onEmpty promise:

    import PQueue from 'p-queue';

    const pQueue = new PQueue({concurrency: 1000}); // Adjust this for your needs

    while (true) {
        // Wait until every queued task has been picked up before feeding more
        await pQueue.onEmpty();
        const inputs = await feedMoreInputs();
        pQueue.add(() => work(inputs));
    }

I'm not sure whether this is a common pattern for onEmpty, but it is working for me.

dobesv commented 3 years ago

Actually, since add returns a promise that waits for the task to finish, it doesn't make sense for add to also wait for the queue size to decrease: if you await the result of add, you're already waiting for the queue to work its way up to the item being added.

dobesv commented 3 years ago

A function, onSizeLessThan, has been added, which you can call before calling add to implement backpressure. As I noted above, this couldn't be done as part of add, because add already returns a promise with an incompatible meaning.
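
For anyone landing here, this is a rough sketch of the producer loop that comment describes: wait for room first, then call add without awaiting its result. To keep it runnable with no dependencies, TinyQueue is a hypothetical stand-in, not p-queue's implementation; it only mimics add() and onSizeLessThan(limit). The produce helper, sleep, and the numbers used are likewise made up for illustration, and the sketch assumes a single producer.

```javascript
// Hypothetical stand-in for a promise queue (NOT p-queue's code).
// It mimics only the two methods the loop below relies on.
class TinyQueue {
  constructor({concurrency}) {
    this.concurrency = concurrency;
    this.running = 0;      // tasks currently executing
    this.waiting = [];     // tasks accepted but not yet started
    this.sizeWaiters = []; // callers parked in onSizeLessThan()
  }

  // Number of tasks waiting to start (running tasks are not counted).
  get size() {
    return this.waiting.length;
  }

  // Resolves with the task's result, so only once the task has finished.
  add(task) {
    return new Promise((resolve, reject) => {
      this.waiting.push(() => Promise.resolve().then(task).then(resolve, reject));
      this._drain();
    });
  }

  // Resolves as soon as fewer than `limit` tasks are waiting to start.
  onSizeLessThan(limit) {
    if (this.size < limit) return Promise.resolve();
    return new Promise(resolve => this.sizeWaiters.push({limit, resolve}));
  }

  _drain() {
    // Start waiting tasks while there is spare concurrency.
    while (this.running < this.concurrency && this.waiting.length > 0) {
      const run = this.waiting.shift();
      this.running++;
      run().finally(() => {
        this.running--;
        this._drain();
      });
    }
    // Wake parked producers whose limit is now satisfied.
    this.sizeWaiters = this.sizeWaiters.filter(waiter => {
      if (this.size < waiter.limit) {
        waiter.resolve();
        return false;
      }
      return true;
    });
  }
}

const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));

// Single producer: pause while the backlog is full, then enqueue.
async function produce(queue, items, maxWaiting, worker) {
  const pendingResults = [];
  let peak = 0; // largest backlog observed right after an add()
  for (const item of items) {
    await queue.onSizeLessThan(maxWaiting);             // backpressure point
    pendingResults.push(queue.add(() => worker(item))); // deliberately not awaited
    peak = Math.max(peak, queue.size);
  }
  return {results: await Promise.all(pendingResults), peak};
}

// Example run: 10 items, 2 workers, at most 3 tasks waiting.
produce(new TinyQueue({concurrency: 2}), [...Array(10).keys()], 3, async n => {
  await sleep(1);
  return n * 2;
}).then(({peak}) => console.log('peak backlog:', peak));
```

The two promises stay separate here: the one from onSizeLessThan gates admission, while the one from add still means "this task is done", which is the incompatibility mentioned above. With several concurrent producers, more than one waiter can wake at once and briefly overshoot the limit, so treat the limit as approximate in that case.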

Richienb commented 2 years ago

When using maxSize in conjunction with signal, aborted tasks won't be removed until they reach the front of the queue. We'd have to publicly expose a queue array, which might be a breaking change for libraries that don't use arrays for their queues (e.g. linked lists).

Richienb commented 2 years ago

intervalCap?