Automattic / kue

Kue is a priority job queue backed by redis, built for node.js.
http://automattic.github.io/kue
MIT License

job complete event is missing after pause/resume #1058

Open fbergama opened 7 years ago

fbergama commented 7 years ago

Hi everybody.

If I run this code

var concurrency = 1;
mainq.process('myjob', concurrency, function(job, ctx, done) {

    ctx.pause( 50000, function(err){
        console.log("Worker is paused... ");
        setTimeout( function(){ ctx.resume(); }, 10000 );
    });

    setTimeout( function() { done(); }, 2000 );
});

I correctly receive the "job complete" events when I set the callback listener with mainq.on("job complete', my_complete_callback). However, if I set concurrency to any value greater than 1, my_complete_callback is not always called and some jobs remain stuck in the "active" state.

Is it a bug? Did I forget something? Thank you

TheGrandmother commented 7 years ago

mainq.on("job complete', my_complete_callback). I'm assuming that the " is a typo?

Do you get the same error if you attach a job specific event?

fbergama commented 7 years ago

mainq.on("job complete', my_complete_callback). I'm assuming that the " is a typo?

Yes, sorry. It's a typo.

Do you get the same error if you attach a job specific event?

Yes, I've tried setting something like:

mainq.create( 'something', {
    ....
}).on('remove', function(result) {
    console.log("JOB REMOVED");
}).on('complete', function(result) {
    console.log("JOB COMPLETE");
}).save();

But "JOB COMPLETE" is not written to the console for every job in the queue. In this case too, everything works as expected with concurrency=1.

TheGrandmother commented 7 years ago

Hmm... I just tested this in my code and it works. My jobs are not actually being executed concurrently (they contain a blocking loop for testing purposes), but the complete event still gets triggered simultaneously and correctly from the queue.

Make sure that all the string constants in the code are correct. Kue has a tendency to just silently continue when you give it faulty parameters. I am going to start working on a PR to introduce some more defensive programming into Kue once I know which branch to work against. I just spent a long time in utter confusion and despair until I realized that I had written .on('completed' ... instead of complete, and Kue gave me no indication that this was the case.

So I recommend carefully reading through everything.
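To illustrate the kind of defensive check meant here, below is a minimal sketch and not part of Kue itself. The helper name strictOn is hypothetical, and a plain Node.js EventEmitter stands in for a Kue job. The list of names is the set of job-level events described in the Kue README; treat it as an assumption and check it against your installed version.

```javascript
var EventEmitter = require('events').EventEmitter;

// Job-level event names as listed in the Kue README (assumption: verify for your version).
var JOB_EVENTS = [
    'enqueue', 'start', 'promotion', 'progress',
    'failed attempt', 'failed', 'complete', 'remove'
];

// Hypothetical helper: attach a listener, but fail loudly on an unknown event name
// instead of silently registering a listener that will never fire.
function strictOn(emitter, event, listener) {
    if (JOB_EVENTS.indexOf(event) === -1) {
        throw new Error('Unknown job event "' + event + '", expected one of: ' + JOB_EVENTS.join(', '));
    }
    return emitter.on(event, listener);
}

// Stand-in for a Kue job; a real job is also an event emitter.
var fakeJob = new EventEmitter();

strictOn(fakeJob, 'complete', function(result) {
    console.log("JOB COMPLETE");
});

try {
    strictOn(fakeJob, 'completed', function() {});
} catch (e) {
    console.log(e.message);
}
```

A check like this only catches misspelled names. It would not explain events that go missing when the name is correct.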

fbergama commented 7 years ago

Make sure that all the string constants in the code are correct.

That can't be the case, since everything works with a concurrency of 1 :(

Anyway, thank you for helping. I'll create a new test script to clearly replicate the problem. If it's working for you, maybe it's because I have a different version?

TheGrandmother commented 7 years ago

Things are indeed strange! I'm running version 0.11.5.

fbergama commented 7 years ago

OK, same version here. I'll post some test code here if I cannot solve the issue by myself. I suppose there must be something odd in my code.

behrad commented 7 years ago

It would be best to send us code that reproduces the issue, @fbergama

fbergama commented 7 years ago

Ok I've created a simple test function:

function testkue( concurrency ) {

    console.log("Testing concurrency=%d", concurrency);

    var kue = require('kue');
    var queue = kue.createQueue();

    queue.on('job enqueue', function(id, type){
        console.log( 'job %s queued', id );

    }).on('job complete', function(id, result){
        console.log('job #%d completed', id);
    });

    queue.process('testtask', concurrency, function(job, ctx, done){

        ctx.pause( 5000, function(err){
            console.log("Worker is paused... ");
            setTimeout( function(){ ctx.resume(); console.log("Worker resumed"); }, 5000 );
        });
        setTimeout( function() { done(); }, 3000 );

    });

    for( var i=0; i<5; ++i ) {
        queue.create( 'testtask', {} ).save();
    }
}

Here is the output I get when I invoke testkue(1):

Testing concurrency=1
job 1 queued
job 2 queued
job 3 queued
job 4 queued
job 5 queued
Worker is paused...
job #1 completed
Worker resumed
job #2 completed
Worker is paused...
Worker resumed
job #3 completed
Worker is paused...
Worker resumed
job #4 completed
Worker is paused...
Worker resumed
job #5 completed
Worker is paused...
Worker resumed

As you can see, the complete event is received for all the jobs. On the other hand, if I invoke testkue(2) I obtain the following:

Testing concurrency=2
job 1 queued
job 2 queued
job 3 queued
job 4 queued
job 5 queued
Worker is paused...
Worker resumed
Worker is paused...
Worker is paused...
job #3 completed
Worker resumed
Worker resumed
Worker is paused...
job #4 completed
job #5 completed
Worker is paused...
Worker resumed
Worker resumed

Basically, jobs 1 and 2 remain stuck in the active state and no complete event is received for them. If I change concurrency to any value other than 1, I still get the same kind of error.
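To make the missing events easier to spot than by reading the log, here is a small sketch that records which enqueued job ids never received a complete event. The helper name trackCompletion is hypothetical, and a plain Node.js EventEmitter stands in for the queue so the snippet runs without Redis. With a real queue you would pass the object returned by kue.createQueue() instead. The emitted ids mirror the testkue(2) run above.

```javascript
var EventEmitter = require('events').EventEmitter;

// Hypothetical helper: listen to the queue-level events used in testkue()
// and remember every job id that was enqueued but has not completed yet.
function trackCompletion(queue) {
    var pending = {};

    queue.on('job enqueue', function(id, type) {
        pending[id] = true;
    });

    queue.on('job complete', function(id, result) {
        delete pending[id];
    });

    return {
        // Returns the ids (as strings) that are still waiting for 'job complete'.
        outstanding: function() {
            return Object.keys(pending);
        }
    };
}

// Stand-in for the Kue queue, replaying the events seen with concurrency=2.
var fakeQueue = new EventEmitter();
var tracker = trackCompletion(fakeQueue);

[1, 2, 3, 4, 5].forEach(function(id) {
    fakeQueue.emit('job enqueue', id, 'testtask');
});
[3, 4, 5].forEach(function(id) {
    fakeQueue.emit('job complete', id);
});

console.log('jobs without a complete event:', tracker.outstanding());
```

Calling tracker.outstanding() from a timer after the last job should have finished gives the list of ids to inspect in the active state.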

Hope that this can help

behrad commented 7 years ago

Thank you @fbergama, this helps us find and fix the issue.

MarkPieszak commented 6 years ago

Was anyone able to uncover what the underlying issue was for this one? I'm seeing the same thing happen with the current/latest version as well. I'm trying to debug the Kue internals, but I'm not that familiar with the library just yet. @behrad @fbergama @TheGrandmother

If anyone has any info / fix for this - I'm all ears! 👍 🙏