Closed JustDoItSascha closed 2 years ago
@JustDoItSascha You can debug the library using Taskforce.sh, just make sure you are using the latest BullMQ Pro version, and you will get a new tab "Groups" when you choose the queue.
Status maxed means that the group has reached the maximum concurrency setting that you have specified for groups, the default is unlimited. So there may be workers processing the jobs that have not been completed. Let's work together to identify the issue.
@manast Thanks for your quick response. Unfortunately it's not possible to make my redis publicable available, so there is no option to use taskforce.sh or can I host it on my own server? Right now I'm using bull-board to get insights in the queue which is working good, but that's not what I meant when I said it's hard to debug.
At the moment I'm trying to isolate the bug. I want to use the ttl option of the worker and therefore I created a processor like the following:
new WorkerPro(
'name',
async job => {
return new Observable(subscriber => {
this.processJob(job).then((x) => { subscriber.next(x); subscriber.complete(); })
return {
unsubscribe() {
console.log('unsubscribe')
}
}
})
}
)
So my questions are:
@JustDoItSascha
- The unsubscribe() method gets triggered when ttl is reached (what is good!) but it gets also triggered when calling subscriber.complete(). How does I know that it's triggered by the ttl and not by complete? In your example in the docs you are not using a real world scenario, because you are just calling next() and complete() in a setTimeout. But in a real world scenario I have no setTimeout() but an async function
Let me come back on this, with a proper answer.
2. What should I do when unsubscribe() is called? It could be, that my async function is still running, I can't cancel a Promise (as far as i know).
Promises cannot be canceled, that's why we use Observables, so somehow you must provide a signal or similar so that the async function that you call can be cancelled, a trick could be throw an error with a special message like "Cancelled".
3. Should I call moveToFailed() or moveToComplete() by myself? Because right now the job is disappearing completely and has no status at all. I attached an image from redis and just remove data, name and timestamp. There is a failedReason with Timeout has occured what looks correct. But at the same time the job is not under failed Jobs and the active count for this group is still on 1...
You should not call moveToCompleted/Failed if using a processor. Could it be that you have set up the removeOnCompleted/Failed setting to true so all jobs gets removed after they finalize?
@manast
- The unsubscribe() method gets triggered when ttl is reached (what is good!) but it gets also triggered when calling subscriber.complete(). How does I know that it's triggered by the ttl and not by complete? In your example in the docs you are not using a real world scenario, because you are just calling next() and complete() in a setTimeout. But in a real world scenario I have no setTimeout() but an async function
Let me come back on this, with a proper answer.
Ok, no problem.
- What should I do when unsubscribe() is called? It could be, that my async function is still running, I can't cancel a Promise (as far as i know).
Promises cannot be canceled, that's why we use Observables, so somehow you must provide a signal or similar so that the async function that you call can be cancelled, a trick could be throw an error with a special message like "Cancelled".
Technically observables can be canceled, that's correct. But I cannot cancel the processing itself once started. In the example of your documentation with .next(1)
and .next(2)
; assume that between next(1) and next(2) would be some time, for example 5 seconds. When the ttl is occuring exactly in this moment, you have no chance to prevent next(2) from happening even the complete()
has already been called.
And a real world scenario: Let's say my jobs are for creating pdfs. And I call the pdf library to create the pdf and my ttl is 10 seconds. Maybe this particular pdf is way too big and it takes 10 minutes to complete. Then I want my job to fail with a timeout error. But there is no way to cancel the creation of the pdf in the pdf library. The pdf will still be created after 10 minutes, but that's ok for my purpose. But right now the job just disappears.
- Should I call moveToFailed() or moveToComplete() by myself? Because right now the job is disappearing completely and has no status at all. I attached an image from redis and just remove data, name and timestamp. There is a failedReason with Timeout has occured what looks correct. But at the same time the job is not under failed Jobs and the active count for this group is still on 1...
You should not call moveToCompleted/Failed if using a processor. Could it be that you have set up the removeOnCompleted/Failed setting to true so all jobs gets removed after they finalize?
No, I'm not using removeOnComplete and not removeOnFailed. And you ignored the fact and my image that the active count for this group is still 1 which prevents the next job from processing... (my concurrency is 1).
An additional info, maybe this helps:
In the bullmq event log I see that the jobs are going to delayed state when the ttl (of 5 seconds) kicks in. Exactly 15 seconds later the jobs are going to waiting state. But the active count for that group is still 1 and so they are stuck in waiting since I have a concurrency of 1. Could this be a bug?
Ok, we are trying to reproduce the issue, I will keep you updated as we know more about it.
Ok thanks! It really seems that the problem is the retry strategy of the jobs. When it should make more than one attempt, then it's not working.
As a control question, are you using the latest version? v2.7.1 ?
Affirmative :)
I wrote the following test. I have been trying with a lot of different settings, different concurrency factors, number of groups or number of jobs, but I could not reproduce the issue, I wonder if you have some settings that would reproduce it or if you can see something in this test that is different from your scenario:
it.only(
'processes group jobs that timeout without getting stuck',
async () => {
const numGroups = 5;
const jobsPerGroup = 38;
for (let i = 0; i < numGroups; i++) {
for (let j = 0; j < jobsPerGroup; j++) {
await queue.add(
'test',
{ foo: `bar${(i + 1) * j}` },
{ ...groupOpts(i), attempts: 2, backoff: 10 },
);
}
}
const concurrency = 100;
const maxGroupConcurrency = 1;
const queueScheduler = new QueueSchedulerPro(queueName, {
connection,
});
let count = numGroups * jobsPerGroup;
expect(await queue.getGroupsJobsCount()).to.be.eq(count);
let worker: WorkerPro;
const processedJobs: { data: any; opts: JobsProOptions }[] = [];
const processing = new Promise<void>(resolve => {
worker = new WorkerPro(
queueName,
async job => {
return new Observable(subscriber => {
const delay = job.attemptsMade == 1 ? 5000 : 25;
const timeout = setTimeout(() => {
subscriber.next();
subscriber.complete();
processedJobs.push({ data: job.data, opts: job.opts });
count--;
if (processedJobs.length == numGroups * jobsPerGroup) {
resolve();
}
}, delay);
return {
unsubscribe() {
console.log('Timedout!', job.id, job.opts?.group.id);
clearTimeout(timeout);
},
};
});
},
{
connection,
concurrency,
group: {
concurrency: maxGroupConcurrency,
},
ttl: 250,
},
);
});
await processing;
expect(processedJobs.length).to.be.equal(numGroups * jobsPerGroup);
await worker.close();
await queueScheduler.close();
},
).timeout(80000);
I will try your code and will try to figure out what the problem is, but here is already something wrong in your code:
That seems to be some typing error, the group option is really there.
What does the groupOpts(i) return in your example?
Found the error: Use a string as group id, not a number. Then the error will happen in your code.
More precisely: Use a string with at least 17 characters, I was using this: 89882390000057462
And when you fix this then another hint from my side: A group.id of 0 is also not working. I assume you are doing something like if(group.id) in your code, but you should explicitly check for undefined and null value like if (group.id == null)
Found the error: Use a string as group id, not a number. Then the error will happen in your code.
Wow. Incredible, I can reproduce it as well now.
And when you fix this then another hint from my side: A group.id of 0 is also not working. I assume you are doing something like if(group.id) in your code, but you should explicitly check for undefined and null value like if (group.id == null)
In the test we are actually using id 0 as number and it works, are you sure about this?
Ok, so the issue comes from the fact that in Redis, when we store a number, we are actually storing a string, so we made a trick to convert it back to numbers, except the trick does not work since parseInt does even parse strings like: "0123" as 123, or "123abc" as 123. The best solution will simply be not supporting numbers as gid's just strings. This will be a small breaking change, requiring a major version update.
Or you could just convert the numbers to strings instead of parsing the string to a number. This requires just a Patch Version Update. Because nothing changes in the API for the consumer. But the bug is fixed...
And yes i'm sure about the thing with the 0. The Job gets processed, but the group concurrency is ignored.
Ok sorry, I read it again. The return value of the group id would be always a string instead of the original number I used initially. Yes, this would be a breaking change.
The fix is ready but I have some tests that are giving me trouble, as soon as everything is green there will be a new release.
Please upgrade to version 3.0.0 for a resolution to this issue.
I have a queue where jobs are not getting processed. When I use the
getGroupStatus
method for a groupId, then I get amaxed
status for all group ids. What does that mean?Besides from that I don't see the jobs in delayed status or any other status. QueueScheduler is running.
The debugging possibilities in the whole library are unfortunately very low... :(