bbc / sqs-consumer

Build Amazon Simple Queue Service (SQS) based applications without the boilerplate
https://bbc.github.io/sqs-consumer/
Other
1.74k stars 333 forks source link

Why wont the consumer does not consume multiple messages when batchSize is defined. #266

Closed michaelparkadze closed 3 years ago

michaelparkadze commented 3 years ago

I am trying to use sqs-consumer to process multiple messages in parallel and for some reason I can not get it to work, so I hope someone could show me what I am doing wrong here.

Here is the code example:

SqsConsumer class

 export default class SqsConsumer {
  public sqsConsumer: Consumer;

  constructor(params: CreateSqsSubscriberParams) {
    const { queueUrl, handleMessage, sqsOptions, visibilityTimeout, waitTimeSeconds, batchSize } = params;

    this.sqsConsumer = Consumer.create({
      queueUrl,
      handleMessage,
      visibilityTimeout,
      waitTimeSeconds,
      batchSize,
      sqs: new SQS(sqsOptions),
    });
  }

  startReadingMessages(): void {
    console.log('reading messages');
    this.sqsConsumer.start();

    this.sqsConsumer.on('error', (err) => {
      logger.error(err.message);
    });

    this.sqsConsumer.on('processing_error', (err) => {
      logger.error(err.message);
    });

    this.sqsConsumer.on('timeout_error', (err) => {
      logger.error(err.message);
    });

    this.sqsConsumer.on('response_processed', () => {
      logger.info('First Batch has successfully processed');
    });
  }
} 

Example subscriber:

export const assetNotificationSubscribe = () => {
  const sqsConsumer = new SqsConsumer({
    queueUrl: config.awsSqs.assetsAssetNotificationQueueUrl,
    visibilityTimeout: 30,
    waitTimeSeconds: 20,
    batchSize: 5,
    sqsOptions: {
      correctClockSkew: true,
    },
    handleMessage,
  });

  sqsConsumer.startReadingMessages();
};

Even though the batchSize is 5 the 'response_processed' log gets logged each time a single process is done. Thanks in advance.

michaelparkadze commented 3 years ago

Update: I read about handleMessageBatch and passed it in the new SqsConsumer and added it in the constructor of the class and I am still having the processes handled one at a time and not parallel.

michaelparkadze commented 3 years ago

I ran some more test and I found one that even though my batchSize was for example 5, I only got 2 messages in the batch even though there were more in the queue itself. I found that this might happen in the amazon receiveMessage page:

MaxNumberOfMessages: "Amazon SQS never returns more messages than this value (however, fewer messages might be returned). Valid values: 1 to 10. Default: 1. "

So I assume the same might happen in this situation.