Automattic / kue

Kue is a priority job queue backed by redis, built for node.js.
http://automattic.github.io/kue
MIT License
9.45k stars 862 forks source link

my solution for stuck jobs in active and inactive state, but is it possible to make this run in every job process? without restart the server #1240

Open hotaryuzaki opened 4 years ago

hotaryuzaki commented 4 years ago

i don't know this is a good or a bad way, because i'm new in Kue. i got to maintenance my company system, they used Kue when develop this. and sadly this repo no longer maintained.

here is my solution, it works good for me. but i cannot make this works as programatically when server is running. (we have to restart the server to clear all mess)

so my question is it possible to make this run in every job process? maybe run as a function and we can call it when needed??


/***
    REMOVE ALL STUCK JOBS WHEN SAILS LIFT
    THEN RUN QUEUE PROCESS
***/
try {
    // GET SPECIFIC JOB BY JOB TYPE - CURRENTLY JUST FOR CHECKING STATUS
    kue.Job.rangeByType(jobType1, 'inactive', 0, 100, 'asc', function(err, jobs) {
        var ids = [];

      jobs.forEach(function(job) {
        ids.push(job.id);
      });

        console.log(jobType1, '=> STUCK JOBS INACTIVE TOTAL:', jobs.length, '=', ids);
    });

    kue.Job.rangeByType(jobType2, 'inactive', 0, 100, 'asc', function(err, jobs) {
        var ids = [];

      jobs.forEach(function(job) {
        ids.push(job.id);
      });

        console.log(jobType2, '=> STUCK JOBS INACTIVE TOTAL:', jobs.length, '=', ids);
    });

    kue.Job.rangeByType(jobType3, 'inactive', 0, 100, 'asc', function(err, jobs) {
        var ids = [];

      jobs.forEach(function(job) {
        ids.push(job.id);
      });

        console.log(jobType3, '=> STUCK JOBS INACTIVE TOTAL:', jobs.length, '=', ids);
    });

    /*########## KUE GET ALL ACTIVE JOBS ##########*/
    queue.active(function(err, ids) {
        console.log('QUEUE => JOBS ACTIVE STUCK IDS:', ids)

      ids.forEach(function(id) {
        kue.Job.get(id, function(err, job) {
          job.inactive(); // SET ALL STUCK JOBS ACTIVE TO INACTIVE
        });
      });

        /*########## KUE GET ALL INACTIVE JOBS ##########*/
        queue.inactive(function(err, ids) {
            console.log('QUEUE => JOBS INACTIVE STUCK IDS:', ids)

          ids.forEach(function(id) {
            kue.Job.get(id, function(err, job) {
                // REMOVE ALL STUCK JOBS
                    job.remove(function(err){
                    if (err) throw err;
                    console.log('QUEUE =>', job.type, '=> JOB REMOVED #', job.id);
                  });
              });
          });

            queue.process(jobType1, function(job, done){
                try {
                     // process
                     done();
                } catch(e) {
                    done(e);
                }
            });

            queue.process(jobType2, function(job, done){
                try {
                     // process
                     done();
                } catch(e) {
                    done(e);
                }
            });

            queue.process(jobType3, function(job, done){
                try {
                     // process
                     done();
                } catch(e) {
                    done(e);
                }
            });
        });
    });

} catch(e) {
    // statements
    console.log(e);
}```