OptimalBits / bull

Premium Queue package for handling distributed jobs and messages in NodeJS.

Issue with moveToFailed function reprocessing jobs and blocking queue #2750

Closed Haris-Ali closed 5 months ago

Haris-Ali commented 5 months ago

Hi, I have been experimenting with Bull because I plan to incorporate it into my project. In my scenario, every job can be retried once (i.e., if it fails a second time, it should be removed from the queue). Each job makes an API call to a third-party server that may not always respond, which is why I use a setInterval to check whether a certain amount of time has elapsed since the job started processing. If it has, I fail the job and send it to retry when this is the first attempt; otherwise I remove it or mark it as failed.

Below is a minimal test case for the issue I am facing. The moveToFailed function moves the job into the waiting state (which doesn't make sense to me; shouldn't it move to the failed state?). The job is then retried once more, after which no other job is processed.

I have to call moveToFailed and/or moveToCompleted manually because of the behavior I need. If there is something I should do differently, please let me know; it is essential that I get this feature working.

import Bull from "bull";
import dotenv from "dotenv";

dotenv.config();
const { REDIS_HOST, REDIS_PORT } = process.env;

const jobInactivityMap = new Map();
const jobTimeout = 180000; // 3 Minutes

const queueOptions = {
  redis: { host: REDIS_HOST, port: REDIS_PORT },
  defaultJobOptions: {
    attempts: 2,
  },
};

const testQueue = new Bull("testQueue4", queueOptions);

testQueue.process(async (payload, done) => {
    console.log(`Processing job ${payload.id} with data: ${JSON.stringify(payload.data)}`);
});

testQueue.on('completed', (job, result) => {
    console.log(`Job ${job.id} completed with result ${result}`);
});

testQueue.on('failed', (job, err) => {
    console.log(`Job ${job.id} failed with error ${err.message || err}`);
});

testQueue.on("active", (job) => {
    console.log("jobInActivityMap", jobInactivityMap);
    jobInactivityMap.set(job.id, Date.now());
    console.log("jobInActivityMap", jobInactivityMap);
});

const jobs = [...new Array(3)].map((_) => ({
    text: "Hello World!"
}));

jobs.forEach((job) => testQueue.add(job));

setInterval(async () => {
    const currentTimestamp = Date.now();
    console.log('jobInactivityMap inside setInterval', jobInactivityMap)
    for (const [jobId, startTime] of jobInactivityMap.entries()) {
        // Check if job elapsed time is greater than or equal to 3 minutes
        console.log(`Job ID: ${jobId} with start time: ${startTime}`);
        if (currentTimestamp - startTime >= jobTimeout) {
            console.log(`Job elapsed time is greater than job timeout`);
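            const job = await testQueue.getJob(jobId);
            if (!job) {
                // Job no longer exists in the queue; stop tracking it
                jobInactivityMap.delete(jobId);
                continue;
            }
            if (job.attemptsMade === 0) {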
                // If this is first attempt, retry the job
                await job.moveToFailed(new Error('first attempt failed'), true);
                jobInactivityMap.set(jobId, Date.now());
            } else {
                // Job has reached/is on max attempts or has received no response
                console.log(`Job's last attempt`);
                jobInactivityMap.delete(jobId);
                await job.discard();
            }
        }
    }
}, 60000);

I am using bull version 4.12.9

manast commented 5 months ago

There are many reasons why your approach won't work, so I will not dig into the details. However, I can give you a couple of hints. First, I recommend using BullMQ instead (newer, more stable, better maintained). Second, you should be able to use the standard retry settings for a use case like this: from what you write, it looks like a standard retry case combined with the "removeOnFail" option.
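One of those reasons can be illustrated with a small model of the retry decision. The sketch below is a simplified, hypothetical reading of how a failure is routed when `attempts` is greater than 1; `nextStateAfterFailure` and its parameter names are invented for illustration and are not part of Bull's API. It assumes `attemptsMade` already counts the attempt that just failed. Under that assumption, with `attempts: 2` the first failure sends the job back to waiting, which matches the behavior reported above.

```javascript
// Simplified, hypothetical model of the retry decision; not Bull's actual source.
function nextStateAfterFailure({ attemptsMade, attempts, backoffDelay = 0, discarded = false }) {
  // Assumption: attemptsMade already includes the attempt that just failed.
  if (discarded || attemptsMade >= attempts) {
    return "failed"; // no attempts left, or retries were explicitly disabled
  }
  // Attempts remain: the job is retried, after a delay if a backoff is configured.
  return backoffDelay > 0 ? "delayed" : "waiting";
}

console.log(nextStateAfterFailure({ attemptsMade: 1, attempts: 2 })); // prints "waiting"
console.log(nextStateAfterFailure({ attemptsMade: 2, attempts: 2 })); // prints "failed"
```

Note also that, as far as the Bull documentation describes it, `job.discard()` only prevents further retries; it does not by itself move an active job to the failed state, so calling it alone on the last attempt would likely leave the job where it is.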

https://docs.bullmq.io/guide/workers/auto-removal-of-jobs
https://docs.bullmq.io/guide/retrying-failing-jobs
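The standard-retry approach suggested above could look roughly like the following. This is a minimal sketch under stated assumptions, not a definitive implementation: `withTimeout`, `processJob`, `callApi`, and `JOB_TIMEOUT_MS` are hypothetical names introduced here, while `attempts` and `removeOnFail` are the job options mentioned in the thread. The idea is that the processor itself rejects when the third-party call takes too long, so the queue handles the retry and the final removal, and no manual moveToFailed call or setInterval bookkeeping is needed.

```javascript
const JOB_TIMEOUT_MS = 180000; // 3 minutes, as in the original snippet

// Two attempts in total; remove the job once it has finally failed.
const defaultJobOptions = {
  attempts: 2,
  removeOnFail: true,
};

// Hypothetical helper: rejects if `promise` does not settle within `ms` milliseconds.
function withTimeout(promise, ms) {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(() => reject(new Error(`Timed out after ${ms} ms`)), ms);
  });
  // Whichever settles first wins; the timer is always cleared afterwards.
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}

// Hypothetical processor body: `callApi` stands in for the third-party request.
// A rejection here is what the queue would see as a failed attempt.
function processJob(job, callApi, timeoutMs = JOB_TIMEOUT_MS) {
  return withTimeout(callApi(job.data), timeoutMs);
}
```

With Bull, the processor would then be registered with a single-argument, promise-returning function, for example `testQueue.process((job) => processJob(job, callApi))`, and `defaultJobOptions` would be passed in the queue options as in the original snippet.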