taskforcesh / bullmq

BullMQ - Message Queue and Batch processing for NodeJS and Python based on Redis
https://bullmq.io
MIT License
5.69k stars 369 forks source link

Atomically remove multiple jobs from queue by jobId #1951

Open BigsonLvrocha opened 1 year ago

BigsonLvrocha commented 1 year ago

Is your feature request related to a problem? Please describe.

I need to send a metric every time a job completes, together with a label that depends on the job's data. BUT, there too many jobs running, so the impact in our DataDog custom metric quota was big, so I need to collect metrics in batches. I need to get the jobs, using Queue.getCompleted, group then by return value and some specific data fields and, send the metric and remove the jobs from the queue.

Since the metrics must have specific label dependent on the job data, I cannot use the built in metrics from bull. I can't use queue.clean passing completed because between the call queue.getCompleted() and cleaning the queue, another job or even multiple jobs might be added in the queue and not count towards the metric.

Describe the solution you'd like Have a method in queue like queue.removeJobsById() or queue.removeJobs that accept an array of ids

Describe alternatives you've considered One way is to call job.remove on every job, but that is error prone, may crash in the middle and generate multiple calls to redis. I've noticed there is a lua script (await queue.client).removeJobs, for now I'll use that, but I don't feel confortable using a detail of implementation of BullMQ.

Additional context Love your work

manast commented 1 year ago

I have marked this issue as an enhancement, it should be simple to just expose the internal removeJobs script. It may not be possible for your project, but maybe the new batch feature in the Pro version could also be useful for you: https://docs.bullmq.io/bullmq-pro/batches

BigsonLvrocha commented 1 year ago

After some thought, I'm going an alternative way, which is storing the batch of metrics in the memory of the worker pods and then sending them periodically using @Interval, no need to manually handle job Don't know if others have the need to remove multiple jobs by ids atomically, so I'll leave this open, but feel free to close

mat0pad commented 1 month ago

Atomically remove multiple jobs from queue by jobId would be a nice feature otherwise we would need to write it in Lua ourselves