Open Wolvan opened 3 years ago
This is actually a highly requested feature, and BullMQ does not yet provide a way to do it natively. I would be happy to work on a feature like this if we can come up with a design that works well in JavaScript. Currently the only "cancelable" asynchronous objects that I know of are observables (since promise cancellation is not part of the official specification).
We can design promises in a way that allows cancelling, just like bluebird does. In case of cancellation, we reject the job promise. I can probably try and provide a code snippet of a promise like that later.
It is also important that we extend the job object to include an onCancel callback of some kind, which allows teardown of resources that the worker is using at the moment. Maybe have job extend EventEmitter and emit a "cancel" event, or have an if (typeof job.onCancel === "function") await job.onCancel() somewhere in the code that handles cancelling in the queue.
I am not sure how to best notify the worker that the job should be aborted, but I can help looking into having the worker actually do the cancellation.
Yeah, I know Bluebird's cancellation mechanism, and I am also familiar with the early cancellation specs for promises. The good thing was that you could cancel the promise easily and it would propagate all the way up and down the promise chain. However, AFAIK observables are currently the only "established" cancellation mechanism for async calls...
Now that I think about it, I don't think you even need to bother with cancelling the promise. The core issue here is that once the job has been started, there is no way to signal the worker anymore. Aborting/Cancelling should be handled by the processor function, it can throw/resolve as needed depending on how the queue user wants to handle it. We just need a way to actually notify the worker of a requested change.
Could we turn job into an eventemitter? That way we could potentially also send custom messages and the like to the workers.
@Wolvan an EventEmitter would probably be a bit overkill, but a "cancel" method could do it. However, we would then need a new "state" for cancelled jobs, since currently they can only be completed or failed. Of course they could be placed as "failed" with reason "cancelled", but that would be equivalent to just throwing an exception with that reason as the message. In any case, the hard part to solve is how to cancel the job from the outside, and I think the currently most idiomatic JavaScript way to achieve it would be to make the job processor return an observable instead of a promise.
Definitely an idea on how to handle it. Thinking about all of this, I think the biggest problem is that once a job is running we have no way of communicating with the worker that the job is running under.
Failed with the reason canceled is a valid strategy in my mind. Maybe the core of it is that a worker is not expecting to be cancelled but the result can be discarded (if the worker does not see this in time/at all) and (presumably) since any distributed transaction would have to defer to some rollback mechanism (automatic or initiated by the failure) then it should just work as-is.
If the debate is about whether or not a cancellation is a soft or hard error, it definitely is a hard fault/interrupt and can be seen as the side effect of an error propagation nonetheless. The question then becomes: what happens when you force kill the BullMQ process working the message? The same thing as normal, I suppose with a different error message if possible to notify the initial caller/client.
With cancelable promises it would be pretty straightforward to implement, putting the effort on the processor function, which needs to implement cancellation correctly. However, this is not standard, so the two options left in my view are: 1) require the processor to return an RxJS stream (so it can be cancelled), or 2) add an extra argument to the processor function with a token that emits an event when cancellation of the processor has been requested, so that the processor can shut down gracefully.
new Worker('cancelable', (job, eventEmitter) => {
  // doStuff
  eventEmitter.on('cancel', () => {
    // do cleanup
  });
});
I think it is quite standard to require the cancelation as a synchronous operation, but of course there would be cases where asynchronous is required, so then we also need to consider that the cancellation could hang indefinitely requiring a TTL for the cancellation itself.
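To illustrate the TTL idea for asynchronous cleanup, here is a minimal sketch that races a cleanup function against a timer. The helper name `cleanupWithTtl` and its return values are assumptions made for this example; nothing here is a BullMQ API.

```typescript
// Hypothetical helper: wait for an async cleanup, but give up after ttlMs.
async function cleanupWithTtl(
  cleanup: () => Promise<void>,
  ttlMs: number,
): Promise<"cleaned" | "timed-out"> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<"timed-out">((resolve) => {
    timer = setTimeout(() => resolve("timed-out"), ttlMs);
  });
  try {
    // Whichever settles first decides the outcome.
    return await Promise.race([
      cleanup().then(() => "cleaned" as const),
      timeout,
    ]);
  } finally {
    // Always clear the timer so it does not keep the process alive.
    clearTimeout(timer);
  }
}
```

Note that a cleanup that times out is not stopped by this helper; the caller only stops waiting for it and would still have to decide how to mark the job.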
The problem with the cancel event approach is of course that, at the end of the day, it may even require you to implement your processor as an RxJS stream anyway, since as soon as you use async/await you are back to square one.
I do not like the extra complexity of using rxjs streams, but do we have any other realistic alternatives?
For reference I will leave here a link to nodejs AbortController documentation: https://nodejs.org/api/globals.html#globals_class_abortcontroller This could be used to support job cancellation as in option 1 mentioned above.
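As a rough sketch of how an AbortSignal could be consumed inside an async/await processor, the function below checks the signal between awaited steps. Passing a signal to the processor is an assumption made for this example, and `runSteps` and `Step` are hypothetical names.

```typescript
// Hypothetical shape: a job broken into awaited steps plus an AbortSignal.
// Handing a signal to the processor is an assumption, not an existing API.
type Step = () => Promise<void>;

async function runSteps(steps: Step[], signal: AbortSignal): Promise<number> {
  let done = 0;
  for (const step of steps) {
    // Cooperative check: cancellation only takes effect between steps.
    if (signal.aborted) throw new Error("Aborted");
    await step();
    done++;
  }
  return done;
}
```

The limitation raised earlier still applies: a step that is already awaiting is not interrupted unless the step itself also observes the signal.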
Do you guys know of any workaround I could take in the meantime? I have some jobs that run over 14 hours, if the end user misconfigured some params it has to be cancelled to avoid wasting a lot of time.
I use ioredis to connect to a redis pub/sub and have the queue worker listen to cancellation requests. I use a combination between a job-specific random string and an ID to basically verify the job cancellation.
Just generate a job and have one of the data elements you add to it be that random string. When you have to cancel the job, you use that same random string.
I then use a variable, isCancelled, which starts out false. Since my job is made up of multiple smaller tasks, I just check after each subtask whether that variable is true and, if so, bail on any further tasks and do my cleanup.
At the same time I have a promise whose resolve method I keep a handle to, so when the job cancellation is requested I can call that resolve method.
I use an await Promise.race([cancellationPromise, (async () => { ... })()]) to get immediate cancellation of that task in the queue.
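A minimal sketch of the pattern described above, with the Redis pub/sub part left out: a flag checked between subtasks, plus a promise whose resolver is kept around and raced against the actual work. All names (`createCancellation`, `runCancellable`) are hypothetical and only illustrate the idea.

```typescript
// Hypothetical names throughout; this mirrors the description, not a BullMQ API.
function createCancellation() {
  let cancel!: () => void;
  const state = { isCancelled: false };
  const cancelled = new Promise<"cancelled">((resolve) => {
    // Keep a handle to resolve so it can be called from outside.
    cancel = () => {
      state.isCancelled = true;
      resolve("cancelled");
    };
  });
  return { state, cancelled, cancel };
}

async function runCancellable(
  subtasks: Array<() => Promise<void>>,
  c: ReturnType<typeof createCancellation>,
): Promise<"done" | "cancelled"> {
  const work = (async () => {
    for (const subtask of subtasks) {
      // Bail before starting the next subtask once cancellation was requested.
      if (c.state.isCancelled) return "cancelled" as const;
      await subtask();
    }
    return "done" as const;
  })();
  // The race settles as soon as cancel() is called, even mid-subtask.
  return Promise.race([c.cancelled, work]);
}
```

In the setup described above, the pub/sub message handler would be the place that calls `cancel()`. The subtask that is in flight still runs to completion in the background; only later subtasks are skipped.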
Thank you so much for your quick response! For my understanding, you use ioredis to maintain a separate queue for cancellation events, and use bullmq for your jobs? Or have you sidestepped it altogether?
Not using a queue, but a Redis pub/sub via ioredis. If the worker isn't subscribed to the publisher, it probably isn't running a job anyway. This way I can actually PUSH cancellation commands.
I haven't used Redis pub/sub before, I'll look into it tomorrow. Thanks :)
The Pro version supports TTL-based cancellation, however it requires writing the processor as an RxJS stream rather than a Promise: https://docs.bullmq.io/bullmq-pro/observables/cancelation In the pipeline is a feature that allows cancelling a given job with only its job ID. Btw, pub/sub may work but is not very reliable; it would be better to use Redis streams for a use case like this.
Is there a way to stop a specific job from continuing to run? I have my jobs running with a default of 3 attempts. If for some reason the data is not what it should be, I want to end that job and log it. I threw an error but it continued to run. I also tried to use moveToFailed(...), but that doesn't seem to stop it; it just runs two more times before it has really failed. I can't seem to find anything in the documentation or source code that allows you to abort the job.
As soon as I wrote this I went back to the source code and found that you can discard the job. Is calling job.discard() the recommended way of aborting?
Why can't you just use an abort controller? That seems like the simplest and most standardized solution.
Processors can pass (or not pass) the abort signal to fetch requests to abort them mid flight. You can also periodically check during your long running job for the abort signal and throw an abort error to denote that the job failed if it's aborted...or you can choose to ignore it, and the job will be considered successfully completed at the end.
You can also have a single signal controlling multiple processes if you want to share an abort controller to abort multiple jobs, and it's cross-compatible with other libraries. This is convenient if you have many different things running at once. You don't want a bunch of different ways of cancelling everything; that just adds mental and computational overhead. You could also share the same abort controller across multiple jobs to cancel many jobs at once if there is a use case for that (e.g. cancelling all children of a flow). You can also add event listeners to the abort signal.
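A small sketch of the shared-controller idea: several jobs register a listener on the same signal, and one `abort()` call reaches all of them. `startJob` and `abortLog` are hypothetical names used only to make the behaviour visible.

```typescript
// Hypothetical job starter; `abortLog` only exists to show what happened.
function startJob(name: string, signal: AbortSignal, abortLog: string[]): void {
  if (signal.aborted) {
    // A job started after the abort can refuse to run at all.
    abortLog.push(`${name}: skipped`);
    return;
  }
  // Each job registers its own teardown on the shared signal.
  signal.addEventListener("abort", () => abortLog.push(`${name}: cleaned up`), {
    once: true,
  });
}

const shared = new AbortController();
const abortLog: string[] = [];
startJob("employee", shared.signal, abortLog);
startJob("technician", shared.signal, abortLog);
shared.abort(); // one call notifies every job holding this signal
startJob("call", shared.signal, abortLog);
```

This only covers jobs running in the same process as the controller; reaching workers in other processes still needs some transport for the abort request.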
I've implemented it in my own code, and it works pretty well, but I think it would be better if it were built into the library.
Here's the code I'm using right now, for reference.
// abort.ts
import Redis from "ioredis";

// this is where we keep track of the abort controllers mapped to jobs
export const abortControllers: Record<string, AbortController> = {};

// subscribe to abort events and run the `abort` function if we have the controller (it's in band with our worker)
const subscriber = new Redis();
subscriber.subscribe('abort');
subscriber.on('message', (channel: string, abortId: string) => {
  if (channel === 'abort') {
    const ac = abortControllers[abortId];
    if (ac) {
      ac.abort();
    }
  }
});

// the api to abort the job(s)..go ahead and call `abort` if we have the controller, otherwise, publish the event
const publisher = new Redis();
export const abort = async (abortId: string) => {
  if (abortControllers[abortId]) {
    return abortControllers[abortId].abort();
  }
  await publisher.publish('abort', abortId);
};
// queues.ts
import { Queue, Worker, QueueEvents, Processor } from "bullmq";
import { abortControllers } from "./abort";

// Props, opts, and concurrency are defined elsewhere (not shown here)
const createQueueComponents = <T extends Processor>({ name, runner, flow }: Props<T>) => {
  const queue = {
    queue: new Queue(name, opts),
    worker: new Worker(name, runner, {
      ...opts,
      concurrency,
    }),
    events: new QueueEvents(name, opts),
    name,
    flow,
  };
  // automatically discard the abort controllers after a job is completed
  queue.events.on('completed', ({ returnvalue }) => {
    if (abortControllers[returnvalue]) {
      delete abortControllers[returnvalue];
    }
  });
  // on failure, look the job up to find its abort id
  queue.events.on('failed', async ({ jobId }) => {
    const job = await queue.queue.getJob(jobId);
    if (abortControllers[job?.data?.abortId]) {
      delete abortControllers[job?.data?.abortId];
    }
  });
  return queue;
};
// etl.ts
export const flow = (options: ETLState) => {
  const until = new Date().toISOString();
  const queueName = name;
  const abortId = options.abortId;
  return {
    // the last step signifies that we can remove the abort id
    queueName,
    name: 'end',
    data: {
      until,
      // the abort id is passed to each job so that we can associate jobs with their abort ids
      abortId,
    },
    children: [
      {
        name: ETLStep.Call,
        data: {
          until,
          abortId,
        },
        queueName,
        children: [
          {
            name: ETLStep.Employee,
            data: { until, abortId },
            queueName,
          },
          {
            name: ETLStep.Technician,
            data: { until, abortId },
            queueName,
          },
        ],
      },
    ],
  };
};
export const runner = async (job: Job<ETLState>) => {
  // if the job was ended, we return the abortId so that the listener can remove it
  // so it can be garbage collected...this could just be passed in the job state
  if (job.name === 'end') {
    return job.data?.abortId;
  }
  let ac: AbortController;
  if (!abortControllers[job.data.abortId]) {
    ac = new AbortController();
    abortControllers[job.data.abortId] = ac;
  } else {
    ac = abortControllers[job.data.abortId];
    if (ac.signal.aborted) {
      throw new Error('Aborted');
    }
  }
  // ....more code
  // in the loop, we can check for if the signal was aborted.
  let nextBatch: Promise<Response> | null = getNextBatch(current);
  while (nextBatch) {
    if (ac.signal.aborted) throw new Error('Aborted');
    let response: Response;
Then I can listen for the abort calls on my rest api:
router.delete('/:queue', catchErrors(async (req, res) => {
  const queue = parseQueue(req);
  await queue.drain();
  await queue.clean(0, 1000000, 'wait');
  await queue.clean(0, 1000000, 'delayed');
  if (Array.isArray(req.body.abortIds)) {
    for (const id of req.body.abortIds) {
      await abort(id);
    }
  } else if (req.body.abortIds) {
    await abort(req.body.abortIds);
  }
  return res.json({ message: 'Queue cleaned!' });
}));
and pass the abort ids to the endpoint on my front end (this is remix):
const abortIds = Object.keys(queue
  .map((job) => !job.finishedOn && job.data?.abortId)
  .filter(Boolean)
  .reduce((acc, val) => {
    acc[val] = true;
    return acc;
  }, {}));
const handleAbort = async () => {
  const formData = new FormData();
  for (const id of abortIds) {
    formData.append('abortIds', '' + id);
  }
  fetcher.submit(formData, {
    action: `/tasks/proxy/${task}`,
    method: 'delete',
  });
};
It would also be good to have a way to abort all items in a queue, or maybe just make that the functionality of the clean method on the queue.
Anyway, those are just some thoughts. I have no idea how this would fit into the actual project. I haven't actually even looked at the source code, but just an outsider's perspective.
@roggervalf Are there any plans to implement an AbortController-based solution for this? We are considering using BullMQ Pro + Taskforce in our organization, and not having a convenient cancellation mechanism is rather painful.
@kibertoad proper cancellation is always going to be non-trivial. However, I think that with a correct implementation of a processor using observables, you can achieve a quite robust solution: https://docs.bullmq.io/bullmq-pro/observables
@manast Is triggering cancellation integrated into Taskforce as well already?
what about these plans to add the abort controller?
AbortController is just one piece of the puzzle, but you need to also send a signal from somewhere to your current worker that is working in a given job and then cancel it remotely. Otherwise it is easy, as demonstrated here: https://docs.bullmq.io/patterns/timeout-jobs
Queue processes we are running have a very long time to complete. Sometimes it turns out we made a mistake and need to abort the running task. Is there a way to signal the worker to stop its work/clean up and then discard the job? If not, how would one best go about implementing one? Does the worker still read queue data or something that we can use as some kind of abort list?