Automattic / kue

Kue is a priority job queue backed by redis, built for node.js.
http://automattic.github.io/kue
MIT License
9.45k stars 862 forks source link

Kue TTL exceeded error occures #1122

Open DipakTechifySolutions opened 6 years ago

DipakTechifySolutions commented 6 years ago

Hello guys,

I use Redis Redis 3.0.6 64 bit in Ubuntu 16.04LTS.

There is an error occurred "TTL exceeded", I don't know how do I resolve it. I have use kue module of Redis in NodeJS and after some job, there is an error of "TTL exceeded". I referred this error in google and found that when job TTL exceeded error occurred when the current job has been failed and stuck in between and another process which is in the queue also pending in the queue. I have referred about watchStuckJobs() method but I don't get the proper idea how to use it.

My concern is that how do I know stuck jobs and remove or restart the stuck jobs and same taking care for future jobs also.

I am really seeking for the guideline about this issue.

Following is redis-cli output:

` hgetall q:job:10 1) "failed_at" 2) "1507796226312" 3) "error" 4) "TTL exceeded" 5) "started_at" 6) "1507796105591" 7) "backoff" 8) "true" 9) "promote_at" 10) "1507796072433" 11) "delay" 12) "1000" 13) "ttl" 14) "120000" 15) "state" 16) "failed" 17) "type" 18) "import" 19) "removeOnComplete" 20) "true" 21) "updated_at" 22) "1507796226312" 23) "created_at" 24) "1507795948716" 25) "data" 26) "{\"type\":\"sf-org-import\",\"socketId\":\"0fv_xXAyQdph2lh5AAAO\",\"data\":\"f827ab3b-e6fe-477c-9b8d-949221fa52fb\"}" 27) "attempts" 28) "2" 29) "priority" 30) "0" 31) "max_attempts" 32) "2" 33) "workerId" 34) "kue:ip-172-31-71-238:16612:import:1"

Code

var queue = kue.createQueue({
    prefix: 'qt',
    redis: redisConst,
    jobEvents: false,
    removeOnComplete: true
});
var job;
var _io;
var concurrency = 1;

/* queue setting */
queue.on('ready', () => {
    console.info('Queue is ready!');
});

queue.watchStuckJobs(5000);

// A job get executed
queue.on('job enqueue', function (id, type) {
    console.log('Job %s got queued', id);
});

// A job get removed
queue.on('job complete', function (id, result) {
    kue.Job.get(id, function (err, job) {
        if (err)
            return;
        job.remove(function (err) {
            if (err)
                throw err;
            console.log('Removed completed job #%d', job.id);
        });
    });
});

queue.on("job process", function (id, result) {
    kue.Job.get(id, function (err, job) {
        if (err)
            return;
        console.log("job process is done", job.id);
    });
})

queue.on('error', (err) => {
    // handle connection errors here
    console.error('There was an error in the main queue!');
    console.error(err);
    console.error(err.stack);
    kue.Job.get(id, function (err, job) {
        if (err)
            return;
        job.remove(function (err) {
            if (err)
                throw err;
            console.log('Removed err job #%d', job.id);
            findJobCount();
        });
    });
});

process.once('SIGTERM', function (sig) {
    queue.shutdown(5000, function (err) {
        console.log('Kue shutdown: ', err || '');
        process.exit(0);
    });
});

/* workers */
queue.process('import', concurrency, function (job, done) {
    console.log('Start processing job of type "%s"', job.data.type);
    switch (job.data.type) {
        case 'consolidate-contact':
            // consolidation contact code
           done();
            break;
        case 'create-contacts-avatars':
            // phone avatar import
           done();
            break;
        case 'import-calendar-data':
            // import calender data
           done()
            break;
    }
});

function findJobCount() {
    queue.activeCount(function (err, count) {
        if (!err)
            console.log('**** Active: ', count);
    });
    queue.inactiveCount(function (err, count) {
        if (!err)
            console.log('**** Inactive:', count);
    });
}

module.exports = class QueueController {
    static init(io) {
        _io = io;
    }
    /* producers */
    static createJob(name, data) {
        console.log('create a job of type "%s"', data.type);
        job = queue.create(name, data)
                .delay(1000)
                .ttl(120000)
                .attempts(1)
                .backoff(true)
                .removeOnComplete(true);

         job
             .on('start', function () {
                 console.log('Job', job.id, 'is now running');
                 findJobCount();
             })
            .on('complete', function () {
                 console.log('Job', job.id, 'is done');
                 findJobCount();
             })
             .on('failed', function () {
                 console.log('Job', job.id, 'has failed');
                 job.remove();
                 findJobCount();
             });

        job.save(function (err, result) {
            if (err) {
                console.log('Error in adding Job: ' + err);
            } else {
                console.log("Job saved: ", result);
            }
        });
    }
}
popey456963 commented 6 years ago

Possibly linked with #1141