Automattic / kue

Kue is a priority job queue backed by redis, built for node.js.
http://automattic.github.io/kue
MIT License

Forever active jobs after Kue terminated and recovered #1160

Open denigmus opened 6 years ago

denigmus commented 6 years ago

We are trying to build a fault-tolerant solution for jobs. In testing, we found that if we halt Kue or Node in the middle of job processing, those jobs are never completed after Kue/Node recovers. They stay in the active state forever. Do you plan to solve this problem, e.g. by re-queuing active jobs on start?

joshbeckman commented 6 years ago

The best way to handle restarting stuck active jobs is to set a TTL on them at creation. This causes Kue to automatically fail them and then (if possible) re-queue them. https://github.com/Automattic/kue/issues/604#issuecomment-99548966
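As a rough illustration of the TTL approach, here is a minimal sketch that sets a TTL and a retry count when a job is created. The helper name `createJobWithTtl`, the job type, the payload, and the default values are all assumptions made for the example; `queue` is expected to be the object returned by `kue.createQueue()`.

```javascript
// Sketch: create a job with a TTL so it cannot stay "active" forever.
// `queue` is expected to be the object returned by kue.createQueue().
// The helper name and the default values are illustrative assumptions.
function createJobWithTtl(queue, type, data, options = {}) {
  const { ttlMs = 60 * 1000, attempts = 3 } = options;
  return queue
    .create(type, data)
    .ttl(ttlMs)         // max time (ms) the job may stay active before Kue fails it
    .attempts(attempts) // allow retries so a failed job can be queued again
    .save();
}

// Usage (requires the kue package and a reachable Redis server):
//   const kue = require('kue');
//   const queue = kue.createQueue();
//   createJobWithTtl(queue, 'email', { to: 'user@example.com' }, { ttlMs: 30000 });
```

The TTL should be longer than the slowest legitimate run of the job. Otherwise healthy jobs would be failed and retried too.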

arnaudbesnier commented 5 years ago

Here is the code we use to handle this problem (using Heroku). It is a first attempt and might contain issues.

const P = require('bluebird');
const kue = require('kue');

const processId = process.pid;
// redisUrlForJobs is assumed to be defined elsewhere (e.g. loaded from configuration).
const queue = kue.createQueue({ redis: redisUrlForJobs });

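function deactivateIfCurrentWorkerJob(jobId) {
  return new P((resolve) => {
    kue.Job.get(jobId, (error, job) => {
      // Skip jobs that could not be loaded or have no worker assigned,
      // so a failed lookup never blocks the shutdown.
      const ownedByThisProcess = !error && job && job.workerId
        && job.workerId.includes(`:${processId}:`);
      if (ownedByThisProcess) {
        job.inactive();
        job.log(' (!) Worker shut down - ready to restart');
        job.progress(0);
      }
      resolve();
    });
  });
}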

function setActiveWorkerJobAsInactive() {
  queue.active((error, jobIds) => {
    // If the active list cannot be read, there is nothing to re-queue.
    const ids = error ? [] : jobIds;
    const promises = ids.map((jobId) => deactivateIfCurrentWorkerJob(jobId));

    return P.all(promises)
      .finally(() => { process.exit(0); });
  });
}

// NOTICE: Locally, Ctrl+C sends SIGINT to the process. Heroku sends SIGTERM to dynos that are shutting down
//         (https://devcenter.heroku.com/articles/dynos#graceful-shutdown-with-sigterm).
const shutdownEventName = process.env.NODE_ENV === 'development' ? 'SIGINT' : 'SIGTERM';
process.on(shutdownEventName, setActiveWorkerJobAsInactive);
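A shutdown handler does not run when the process is killed abruptly, so the same idea could also be applied at startup, as the original question suggests. The following is only a sketch under a strong assumption: no other worker is processing jobs while it runs, because it re-queues every active job regardless of which worker owns it. The helper name `requeueActiveJobs` is hypothetical; `queue` stands for the object returned by `kue.createQueue()` and `Job` for `kue.Job`.

```javascript
// Sketch: on worker startup, move every job still marked "active" back to "inactive".
// `queue` is the object returned by kue.createQueue(); `Job` is kue.Job.
// Assumption: no other worker is processing jobs while this runs.
function requeueActiveJobs(queue, Job, done) {
  queue.active((error, jobIds) => {
    if (error) return done(error);
    if (jobIds.length === 0) return done(null, 0);

    let pending = jobIds.length;
    let requeued = 0;
    jobIds.forEach((jobId) => {
      Job.get(jobId, (getError, job) => {
        // Jobs that cannot be loaded are skipped rather than aborting the pass.
        if (!getError && job) {
          job.inactive();
          requeued += 1;
        }
        pending -= 1;
        if (pending === 0) done(null, requeued);
      });
    });
  });
}

// Usage (requires the kue package and a reachable Redis server):
//   requeueActiveJobs(queue, kue.Job, (error, count) => {
//     console.log(error || `Re-queued ${count} job(s)`);
//     // start queue.process(...) here
//   });
```

With several workers sharing one queue, this would also re-queue jobs that other live workers are still processing, so the per-process filter on `workerId` shown above, or a TTL, would be the safer choice.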