jessetane / queue

Asynchronous function queue with adjustable concurrency
MIT License

Cannot update concurrency mid-process #37

Closed sdesalas closed 7 years ago

sdesalas commented 7 years ago

I'm looking to implement back-pressure on a project I'm working on but I noticed that the default behavior in this repository is to assume the queue concurrency is not going to change in the middle of a queue batch process.

Here is an example to illustrate:

const queue = require('queue');

const q = queue();
let count = 0;

const job = () => { 
  return new Promise((resolve, reject) => {
    console.log(`start job ${++count}`);
    setTimeout(() => {
      console.log('finish, q length = ' + q.length);

      if (count === 3) {
        // increase concurrency once the third job finishes
        q.concurrency = 4;
      } else if (count === 16) {
        // add 16 more jobs to the queue when the job counter reaches 16
        q.push(...new Array(16).fill(job));
      } else if (count === 17) {
        // then reduce concurrency back to 1 (back-pressure)
        q.concurrency = 1;
      }
      resolve();
    }, Math.round(Math.random()*1000) + 500);
  });
};

for (let i = 0; i < 19; ++i) {
  q.push(job);
}

// Start with a concurrency of 1 (one job at a time)
q.concurrency = 1;
q.start((err) => {
  if (err) {
    console.error(err);
  } else {
    console.log('yay');
    console.log(q);
  }
});

The output is as follows:

start job 1
finish, q length = 19
start job 2
finish, q length = 18
start job 3
finish, q length = 17
start job 4
start job 5
start job 6
start job 7
finish, q length = 16
start job 8
finish, q length = 15
start job 9
finish, q length = 14
start job 10
finish, q length = 13
start job 11
finish, q length = 12
start job 12
finish, q length = 11
start job 13
finish, q length = 10
start job 14
finish, q length = 9
start job 15
finish, q length = 8
start job 16
finish, q length = 7
start job 17
finish, q length = 22
start job 18
start job 19
start job 20
start job 21
start job 22
start job 23
start job 24
start job 25
start job 26
start job 27
start job 28
start job 29
start job 30
start job 31
start job 32
start job 33
start job 34
start job 35
finish, q length = 21
finish, q length = 20
finish, q length = 19
finish, q length = 18
finish, q length = 17
finish, q length = 16
finish, q length = 15
finish, q length = 14
finish, q length = 13
finish, q length = 12
finish, q length = 11
finish, q length = 10
finish, q length = 9
finish, q length = 8
finish, q length = 7
finish, q length = 6
finish, q length = 5
finish, q length = 4
finish, q length = 3
finish, q length = 2
finish, q length = 1

Note that after pushing more jobs onto the queue and reducing concurrency back down to 1 (at count = 17), we get some weird behaviour: all remaining jobs are started in one go.

Instead, I would expect the queue to keep processing one job at a time until it is finished.
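
To make the expected behaviour concrete, here is a minimal sketch of a scheduler that re-reads `concurrency` every time it dispatches, so a change made mid-run applies to the next job that starts. `TinyQueue` and its `running` counter are hypothetical names for illustration only; this is not the `queue` package's implementation, and it assumes callback-style jobs (`job(done)`) rather than promise-returning ones to keep the example short.

```javascript
// Hypothetical minimal queue, for illustration only.
// Assumption: each job is a function that takes a `done` callback.
class TinyQueue {
  constructor(concurrency = 1) {
    this.concurrency = concurrency; // may be changed at any time
    this.jobs = [];                 // jobs waiting to be started
    this.running = 0;               // jobs currently in flight
  }

  // Enqueue one or more jobs without starting them.
  push(...jobs) {
    this.jobs.push(...jobs);
  }

  // Begin (or resume) dispatching.
  start() {
    this._next();
  }

  _next() {
    // The limit is read from this.concurrency on every pass, never cached,
    // so raising or lowering it takes effect on the next dispatch.
    while (this.running < this.concurrency && this.jobs.length > 0) {
      const job = this.jobs.shift();
      this.running++;
      let finished = false;
      job(() => {
        if (finished) return; // ignore a second call to done()
        finished = true;
        this.running--;
        this._next();
      });
    }
  }
}
```

With this approach, lowering `concurrency` does not cancel jobs that are already in flight. It only prevents new jobs from starting until `running` drops below the new limit, which is the back-pressure behaviour described above.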

jessetane commented 7 years ago

fixed by #38