taskforcesh / bullmq

BullMQ - Message Queue and Batch processing for NodeJS and Python based on Redis
https://bullmq.io
MIT License

[Feature Request] Weighted Jobs Concurrency #2670

Open · SeanReece opened 3 months ago

SeanReece commented 3 months ago

What's the Problem?

We are running many different kinds of jobs within the same queue, and some are much more expensive to process than others. To avoid overloading the service, we have to reduce our worker concurrency to a lower value, because multiple expensive jobs might run at the same time. The order of these jobs is important, so moving the expensive jobs to another queue is not an option.

Example:

Note: This is a contrived example. We are not processing videos.

Let's say we have three different jobs, ranging from very expensive to cheap:

- encodeVideo: very expensive
- updateThumbnail: moderately expensive
- updateTitle: cheap

In this example, we would have to reduce the worker concurrency to 1, even though the service can handle 10x concurrency in some situations.

Possible solution

What I'm calling "weighted jobs" would allow specifying a "weight" for a job at creation time. This weight essentially specifies how many concurrency slots the job takes up on the worker.

Example:

Let's say we have a worker with 10x concurrency. When creating the jobs above, you would specify weights like so:

await myQueue.add('encodeVideo', jobData, { weight: 10 })
await myQueue.add('updateThumbnail', jobData, { weight: 5 })
await myQueue.add('updateTitle', jobData) // weight defaults to 1

Now the worker could handle a different number of jobs concurrently, depending on their weights: one encodeVideo job, two updateThumbnail jobs, ten updateTitle jobs, or any mix whose total weight stays at or below 10.

The worker would have to wait before taking the next job whenever:

currentProcessingWeight + incomingJobWeight > concurrency

When the worker picks up an incoming job, it could compare that job's weight to how much processing "room" it has left, and wait until there is enough room for the job to start.
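To make the idea concrete, here is a minimal sketch of that accounting as a weighted semaphore. None of this is existing BullMQ API; `acquire`, `release`, and `runWeighted` are hypothetical names, and `weight` would come from the proposed job option.

```ts
// Hypothetical sketch of weighted concurrency accounting (not BullMQ API).
const concurrency = 10;
let currentProcessingWeight = 0;
const waiters: Array<() => void> = [];

async function acquire(weight: number): Promise<void> {
  // Wait until there is enough "room" for this job's weight.
  while (currentProcessingWeight + weight > concurrency) {
    await new Promise<void>((resolve) => waiters.push(resolve));
  }
  currentProcessingWeight += weight;
}

function release(weight: number): void {
  currentProcessingWeight -= weight;
  // Wake all waiters; each re-checks the capacity condition above.
  waiters.splice(0).forEach((resume) => resume());
}

async function runWeighted<T>(weight: number, fn: () => Promise<T>): Promise<T> {
  await acquire(weight);
  try {
    return await fn();
  } finally {
    release(weight);
  }
}

// e.g. runWeighted(10, () => encodeVideo(jobData)) would occupy all 10 slots,
// while runWeighted(1, () => updateTitle(jobData)) would occupy only one.
```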

Describe alternatives you've considered

I think the above solution could be implemented with manual job processing, but this seems like a lot of unnecessary effort. Could this be solved with another pattern?
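For reference, a rough sketch of what the manual route could look like with BullMQ's manual job fetching (a Worker with no processor plus `getNextJob`/`moveToCompleted`). The `weight` field on job data is our own convention, `handle` is a hypothetical processor, and the polling is deliberately crude; note that the job is already claimed before its weight is known, which is part of the unnecessary effort:

```ts
import { Worker, Job } from 'bullmq';

const connection = { host: 'localhost', port: 6379 };
// No processor function: jobs are fetched manually with a token.
const worker = new Worker('myQueue', undefined, { connection });
const token = 'weighted-worker-1';
const concurrency = 10;
let inFlightWeight = 0;

async function pump(): Promise<void> {
  for (;;) {
    const job = (await worker.getNextJob(token)) as Job | undefined;
    if (!job) {
      await new Promise((r) => setTimeout(r, 100)); // nothing ready; poll again
      continue;
    }
    const weight: number = job.data.weight ?? 1; // our own convention
    // The job is already claimed, so all we can do is hold it until there
    // is enough room (its lock has to stay alive in the meantime).
    while (inFlightWeight + weight > concurrency) {
      await new Promise((r) => setTimeout(r, 100));
    }
    inFlightWeight += weight;
    handle(job)
      .then((result) => job.moveToCompleted(result, token, false))
      .catch((err: Error) => job.moveToFailed(err, token))
      .finally(() => { inFlightWeight -= weight; });
  }
}

async function handle(job: Job): Promise<unknown> {
  // Hypothetical: dispatch on job.name (encodeVideo, updateThumbnail, ...).
  return null;
}
```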

Additional context

We are currently going through BullMQ Pro procurement. This could be a BullMQ Pro feature.

rosslavery commented 3 months ago

I think you could use separate queues and then use a flow to dictate the order the jobs run in.

Each incoming video would dispatch a flow, 1 job per queue, constructed in such a way that they run in the order you detailed.
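For illustration, a sketch of that suggestion using a FlowProducer. The queue names and the assumed order (encodeVideo first, then updateThumbnail, then updateTitle) are made up for the example; in a flow, children complete before their parent starts, so the job that must run first sits at the deepest level.

```ts
import { FlowProducer } from 'bullmq';

const flowProducer = new FlowProducer({
  connection: { host: 'localhost', port: 6379 },
});

// Children run before their parent, so the jobs are nested in reverse order.
async function enqueueVideoFlow(videoId: string) {
  await flowProducer.add({
    name: 'updateTitle', // runs last
    queueName: 'titles',
    data: { videoId },
    children: [
      {
        name: 'updateThumbnail', // runs second
        queueName: 'thumbnails',
        data: { videoId },
        children: [
          {
            name: 'encodeVideo', // runs first
            queueName: 'encoding',
            data: { videoId },
          },
        ],
      },
    ],
  });
}
```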

SeanReece commented 3 months ago

Here are a few reasons why I don't think multiple queues really solve this problem

manast commented 3 months ago

@SeanReece I think there are features in Pro that could be used to accomplish what you need. For instance, you can define a different concurrency factor per group: https://docs.bullmq.io/bullmq-pro/groups/local-group-concurrency. You could maybe also use a manual rate-limit per group: https://docs.bullmq.io/bullmq-pro/groups/rate-limiting#manual-rate-limit
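A sketch of those two features, based on the linked docs (exact method names should be verified against your bullmq-pro version; group ids and job names here are illustrative):

```ts
import { QueuePro, WorkerPro } from '@taskforcesh/bullmq-pro';

const connection = { host: 'localhost', port: 6379 };

async function setup() {
  const queue = new QueuePro('videos', { connection });

  // Jobs belong to a group, and each group can get its own local concurrency.
  await queue.add('encodeVideo', { videoId: 'abc' }, { group: { id: 'companyA' } });
  await queue.setGroupConcurrency('companyA', 1); // at most one companyA job at a time
}

// Manual rate-limit applied to a single group from inside the processor.
const worker = new WorkerPro(
  'videos',
  async (job) => {
    if (job.name === 'encodeVideo') {
      await worker.rateLimitGroup(job, 60_000); // hold this group for 60s
      throw WorkerPro.RateLimitError(); // signals "rate limited", not "failed"
    }
    // ...normal processing
  },
  { connection },
);
```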

Another possibility could be to have a specific rate-limit per group; we do not have this feature yet, but maybe it is worth considering as a new feature.

SeanReece commented 3 months ago

@manast Thanks for the ideas. The way we're using groups is to be "fair" about processing the jobs for each tenant/customer. For example, if companyA comes in and loads 1000 videos all at once, and then companyB comes in and loads 1 video, we don't want companyB waiting for hours behind companyA. Having a concurrency specific to companyA doesn't solve the issue, since the service could still pull in too many big jobs at once.

My understanding of how rate-limiting works is that if a worker's rate limit gets triggered (either manually or by number of jobs), the worker stops processing new jobs until the rate-limit window resets. We never want our workers to be idle, as that would slow down processing.