caolan / async

Async utilities for node and the browser
http://caolan.github.io/async/
MIT License
28.2k stars 2.41k forks source link

Queue not execute when length hight #1421

Closed hieuit7 closed 7 years ago

hieuit7 commented 7 years ago

I use async version 2.4.0:

I have a issues with push 1000 task to queue concurency 100,

here this is code:

let index = 0;
let consumerGroup = new ConsumerGroup(consumerOptions, config.topic);
consumerGroup.on('error', onError);

let queue = async.queue((task, callback) => {
  parse(task).then(() => {
    debug('------>>>> Done task: ');
    callback();
  }).catch((e) => {
    debug('---Have error task!!', e);
    callback();
  });
}, 100);
let state = false;
queue.drain = function () {
  if (state) {
    state = false;
    debug('Resume consumer');
    consumerGroup.resume();
  }
};
queue.error = function(e){

}
consumerGroup.on('message', (message) => {
  if (!state && message) {
    debug('Pause consmer')
    consumerGroup.pause();
    state = true;
  }
  queue.push(message, (e) => {
    if (e) {
      debug('==> Have error', e);
    }
    if (state && queue.length() == 0) {
      state = false;
      debug('Resume consumer');
      consumerGroup.resume();
    } else {
      debug('Finished! ' + queue.length());
    }
  });
});
setInterval(() => {
  if (state && queue.length() == 0) {
    state = false;
    debug('Resume consumer');
    consumerGroup.resume();
  } else {
    debug('Finished! ' + queue.length());
  }
}, 10 * 60 * 1000);

and debug is:


27|consume | Tue, 23 May 2017 07:29:52 GMT Consumer:index Finished! 1006
27|consume | Tue, 23 May 2017 07:39:52 GMT Consumer:index Finished! 1006
27|consume | Tue, 23 May 2017 07:49:52 GMT Consumer:index Finished! 1006
27|consume | Tue, 23 May 2017 07:59:52 GMT Consumer:index Finished! 1006
27|consume | Tue, 23 May 2017 08:09:52 GMT Consumer:index Finished! 1006
27|consume | Tue, 23 May 2017 08:19:52 GMT Consumer:index Finished! 1006
đásd27|consume | Tue, 23 May 2017 08:29:52 GMT Consumer:index Finished! 1006
27|consume | Tue, 23 May 2017 08:39:52 GMT Consumer:index Finished! 1006
27|consume | Tue, 23 May 2017 08:49:52 GMT Consumer:index Finished! 1006
27|consume | Tue, 23 May 2017 08:59:52 GMT Consumer:index Finished! 1006

consumerGroup is kafka-node module, this is module working with kafka, and message event received is a block messages.

The queue length is big and not execute. Please help me debug this.

aearly commented 7 years ago

Sounds like you have an issue with multiple callbacks:

  parse(task).then(() => {
    debug('------>>>> Done task: ');
    callback();
  }).catch((e) => {
    debug('---Have error task!!', e);
    callback();
  });

This code is vulnerable to multiple callbacks if an error is thrown later on in the callback() chain. You can asyncify the parse function -- that will protect against this:

  async.asyncify(parse)(task, (err) => {
    if(err) {
      debug('---Have error task!!', e);
      return callback();
    }
    debug('------>>>> Done task: ');
    callback();
  });

or just:

let  queue = async.queue(async.asyncify(parse), 100)
hieuit7 commented 7 years ago

I use Promise for parse function. Only then or catch error occur.

On log debug, haven't debug error through out!

alasdairhurst commented 7 years ago

@hieuit7 if it's a promise then you can call the callback after the promise chain. at the moment, if it's inside the promise chain and it throws, then it will get caught by the catch. Try using finally.

  parse(task).then(() => {
    debug('------>>>> Done task: ');
  }).catch((e) => {
    debug('---Have error task!!', e);
  })
  .finally(callback);
hieuit7 commented 7 years ago

I'm sorry, this is my wrong.

The parse function has been return without resolve or reject call back by logic bussiness. I will close this thread. Thank you for help!