OptimalBits / bull

Premium Queue package for handling distributed jobs and messages in NodeJS.

Possible bug in lifo and priority when using delay #945

Open viebig opened 6 years ago

viebig commented 6 years ago

Description

When I add jobs with a delay, the lifo and priority options have no effect.

Test code to reproduce

Code without delay

const Queue = require('bull');

const callQueue = new Queue('q21');
const concurrency = 10;

setTimeout(() => {
    console.log('starting')
    callQueue.process(concurrency, (job, done) => {
        const status = (Math.random() * 2 >= 1) ? "Answer" : "Busy";
        console.log(job.data, status);

        if (status === 'Busy') {
            job.redial = true;
            callQueue.add({ id: job.data.id, redial: true }, { lifo: true });
        }

        setTimeout(() => {
            done();
        }, 1000)
    });
}, 5000)

// Enqueue 1000 jobs up front; processing starts 5 seconds later.
for (let x = 0; x < 1000; x++) {
    callQueue.add({ id: x, redial: false });
}

Output without delay

starting
{ id: 0, redial: false } 'Busy'
{ id: 1, redial: false } 'Busy'
{ id: 2, redial: false } 'Answer'
{ id: 3, redial: false } 'Answer'
{ id: 4, redial: false } 'Answer'
{ id: 5, redial: false } 'Answer'
{ id: 6, redial: false } 'Busy'
{ id: 7, redial: false } 'Answer'
{ id: 8, redial: false } 'Answer'
{ id: 9, redial: false } 'Answer'
{ id: 6, redial: true } 'Busy'
{ id: 1, redial: true } 'Busy'
{ id: 0, redial: true } 'Answer'
{ id: 10, redial: false } 'Answer'
{ id: 11, redial: false } 'Busy'
{ id: 12, redial: false } 'Busy'
{ id: 13, redial: false } 'Busy'
{ id: 14, redial: false } 'Busy'
{ id: 15, redial: false } 'Answer'
{ id: 16, redial: false } 'Answer'
{ id: 14, redial: true } 'Busy'
{ id: 13, redial: true } 'Answer'
{ id: 12, redial: true } 'Busy'
{ id: 11, redial: true } 'Busy'
{ id: 1, redial: true } 'Answer'

Code with delay

const Queue = require('bull');

const callQueue = new Queue('q21');
const concurrency = 10;

setTimeout(() => {
    console.log('starting')
    callQueue.process(concurrency, (job, done) => {
        const status = (Math.random() * 2 >= 1) ? "Answer" : "Busy";
        console.log(job.data, status);

        if (status === 'Busy') {
            job.redial = true;
            callQueue.add({ id: job.data.id, redial: true }, { lifo: true, delay: 1000 });
        }

        setTimeout(() => {
            done();
        }, 1000)
    });
}, 5000)

// Enqueue 1000 jobs up front; processing starts 5 seconds later.
for (let x = 0; x < 1000; x++) {
    callQueue.add({ id: x, redial: false });
}

Output with delay

starting
{ id: 0, redial: false } 'Answer'
{ id: 1, redial: false } 'Answer'
{ id: 2, redial: false } 'Busy'
{ id: 3, redial: false } 'Busy'
{ id: 4, redial: false } 'Answer'
{ id: 5, redial: false } 'Answer'
{ id: 6, redial: false } 'Answer'
{ id: 7, redial: false } 'Answer'
{ id: 8, redial: false } 'Busy'
{ id: 9, redial: false } 'Busy'
{ id: 10, redial: false } 'Answer'
{ id: 11, redial: false } 'Busy'
{ id: 12, redial: false } 'Busy'
{ id: 13, redial: false } 'Busy'
{ id: 14, redial: false } 'Busy'
{ id: 15, redial: false } 'Answer'
{ id: 16, redial: false } 'Busy'
{ id: 17, redial: false } 'Busy'
{ id: 18, redial: false } 'Answer'
{ id: 19, redial: false } 'Busy'
{ id: 20, redial: false } 'Answer'
{ id: 21, redial: false } 'Answer'
{ id: 22, redial: false } 'Busy'
{ id: 23, redial: false } 'Busy'
{ id: 24, redial: false } 'Busy'
{ id: 25, redial: false } 'Answer'
{ id: 26, redial: false } 'Answer'
{ id: 27, redial: false } 'Answer'
{ id: 28, redial: false } 'Answer'
{ id: 29, redial: false } 'Busy'
{ id: 30, redial: false } 'Busy'
{ id: 31, redial: false } 'Busy'
{ id: 32, redial: false } 'Answer'
{ id: 33, redial: false } 'Busy'

Bull version

^3.3.10

Additional information

My workaround was to apply the delay with setTimeout and then add the job with lifo:

  async scheduleInQueue(queueItem, campaign, delay) {
    setTimeout(() => {
      this.queues[queueItem.campaignId].add({ contact: queueItem, tree: campaign.tree }, {lifo: true });
    }, delay);
    return true;
  }

manast commented 6 years ago

Priority support for delayed jobs has been added in 3.4.0: https://github.com/OptimalBits/bull/blob/master/CHANGELOG.md#v340

viebig commented 6 years ago

Thanks! Does it include lifo support?

manast commented 6 years ago

no, not yet.

nitzanav commented 6 years ago

+1

nitzanav commented 6 years ago

@manast @alexkh13 I am looking for guidance on how to PR delayed lifo jobs. I guess that I need to follow this commit for prioritized delayed jobs, right? https://github.com/OptimalBits/bull/commit/e3c1775aa6b1541b623a58f411f62e6d6f2a71a2 But do I need to change anything here: https://github.com/OptimalBits/bull/blob/f1d8cc3b082bfaf0b715438b9a98c75ee9e826a3/lib/job.js#L242 or is that just for retrying jobs, and unrelated?

alexkh13 commented 6 years ago

@nitzanav no, i don't see how this line is related.

i think you should focus on this block https://github.com/OptimalBits/bull/blob/f1d8cc3b082bfaf0b715438b9a98c75ee9e826a3/lib/commands/updateDelaySet-4.lua#L39

based on whether or not the job enables lifo, you should LINSERT to the right place

Currently, it inserts it to the left of the "already inserted" priority jobs. For example, if you insert a priority=2 job into the wait queue, ZCOUNT will count all the 1s and 2s, which is 4. It will then take the length of the list and subtract 4 (6-4=2).

LEFT ||| 3 | 3 | **2** | 2 | 1 | 1 ||| RIGHT
               ^          

the ^ indicates where it will put it (to the left of index=2)

so what you basically want to do when lifo is enabled is to count (priority-1) and insert it to the left. So in the above example it would count only the 1s (6-2=4)

LEFT ||| 3 | 3 | 2 | 2 | **1** | 1 ||| RIGHT
                       ^          

if it doesn't find any (priority-1) jobs, the RPUSH is still ok.

please, double check what i just said.
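
The index arithmetic described above can be sketched in plain JavaScript. This is only a model, not Bull's Lua script: the wait list is an array of priorities read left to right, workers are assumed to take jobs from the right end, priorities are assumed to be integers, and `insertIndex` and `insertJob` are hypothetical names used for illustration.

```javascript
// Model only: the wait list is a plain array of priorities, left to right,
// and workers are assumed to take jobs from the RIGHT end.
// A lower number means a higher priority, as in the example above.

// Index at which the new job is inserted. It lands to the left of the
// element currently at that index; index === length means the right end.
function insertIndex(waitList, priority, lifo) {
  // FIFO counts jobs with priority <= p; LIFO counts only priority <= p - 1.
  const threshold = lifo ? priority - 1 : priority;
  const count = waitList.filter((p) => p <= threshold).length;
  return waitList.length - count;
}

// Returns a new list with the job inserted; the input is left untouched.
function insertJob(waitList, priority, lifo) {
  const next = waitList.slice();
  next.splice(insertIndex(waitList, priority, lifo), 0, priority);
  return next;
}

const wait = [3, 3, 2, 2, 1, 1];
console.log(insertIndex(wait, 2, false)); // 2 (6 - 4)
console.log(insertIndex(wait, 2, true)); // 4 (6 - 2)
console.log(insertIndex([3, 3], 2, true)); // 2, the right end
```

In this model the non-lifo job lands behind the existing priority=2 jobs, while the lifo job lands in front of them but still behind the priority=1 jobs. When nothing with a smaller priority number is waiting, the index equals the list length, which corresponds to the plain RPUSH case.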

nitzanav commented 6 years ago

@alexkh13

Thanks a lot for the thorough input

I see now that there are two kinds of tasks:

  1. prioritized delayed jobs that respect the lifo param
  2. delayed lifo jobs, unrelated to priority

It seems that you describe task 1.

I am interested in task 2.

Maybe it is best to create two separate issues and close this one; I am not sure which is best.

Task 2 (delayed lifo) is just to add a condition that does RPUSH in case of lifo around this line: https://github.com/OptimalBits/bull/blob/f1d8cc3b082bfaf0b715438b9a98c75ee9e826a3/lib/commands/updateDelaySet-4.lua#L39

Then I need to follow your "prioritized delayed" commit to see how to propagate the parameter to the Lua script.

right?

BTW, if anyone has a remote developer who can make several PRs for me and do some Bull development, that would be nice. Please contact me: nitzan.aviram@ongage.com

alexkh13 commented 6 years ago

I'm not sure I understand what the 2 kinds you described are. But yes, you'll have to propagate the lifo param like the "prioritized delayed" commit did with the priority param.
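
The "delayed lifo" case discussed above (no priority involved) could be modelled roughly like this. It is a sketch under assumptions, not the actual updateDelaySet script: the wait list is an array, workers are assumed to take the job at the right end (which is what the RPUSH-for-lifo suggestion implies), and `promoteDelayed` and `takeNext` are hypothetical names.

```javascript
// Hypothetical model, not Bull's Lua script: the wait list is an array,
// the left end is index 0, and workers take the job at the right end.

// Moves a delayed job whose delay has expired into the wait list.
function promoteDelayed(waitList, jobId, lifo) {
  const next = waitList.slice();
  if (lifo) {
    next.push(jobId); // like RPUSH: taken before everything already waiting
  } else {
    next.unshift(jobId); // like LPUSH: taken after everything already waiting
  }
  return next;
}

// The job a worker would pick up next in this model.
function takeNext(waitList) {
  return waitList[waitList.length - 1];
}

const waiting = ['c', 'b', 'a']; // 'a' was added first and is taken first
console.log(takeNext(promoteDelayed(waiting, 'redial', true))); // redial
console.log(takeNext(promoteDelayed(waiting, 'redial', false))); // a
```

The only input the promotion step needs beyond the job id is the lifo flag, which is why the flag has to be carried along with the delayed job until the delay expires.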