spring-attic / spring-cloud-aws

All development has moved to https://github.com/awspring/spring-cloud-aws. Integration for Amazon Web Services APIs with Spring.
https://awspring.io/
Apache License 2.0

@SqsListener scaling #785

Closed. sudiptad2017 closed this issue 2 years ago.

sudiptad2017 commented 2 years ago

Type: Bug

Component: SQS

Describe the bug: I am using @SqsListener to read messages from SQS with the configuration below. I pushed 100 test messages to the queue and started my Spring Boot application. After reading the first batch of 10 messages, the listener waited for those 10 messages to be processed (simulated with a 30-second sleep) before reading the next batch of 10. Given the core pool size of 500, I expected it to keep reading messages and delegating them to worker threads until it had read all 100 messages from the queue. Please advise what I am missing.
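A rough back-of-the-envelope estimate of the difference, assuming each batch takes about as long as one message (the 10 messages in a batch run in parallel on the pool) and ignoring receive and delete latency. Here N is the number of messages, B the batch size (maxNumberOfMessages) and t the per-message processing time:

```latex
T_{\text{batch-then-wait}} \approx \frac{N}{B}\, t = \frac{100}{10} \times 30\,\text{s} = 300\,\text{s}
\qquad
T_{\text{expected}} \approx t = 30\,\text{s}
```

The second figure is what the 500-thread pool could achieve if all 100 messages were in flight at once; it still requires 10 receive calls, since a single SQS receive returns at most 10 messages.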

POM dependencies

...
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-aws</artifactId>
            <version>2.2.1.RELEASE</version>
            <type>pom</type>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-aws-messaging</artifactId>
            <version>2.2.1.RELEASE</version>
        </dependency>
...

SQS Listener custom config

package com.coxauto.dataplatform.s3manager.rest.configs;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.aws.messaging.config.QueueMessageHandlerFactory;
import org.springframework.cloud.aws.messaging.listener.QueueMessageHandler;
import org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class SqsListenerConfiguration {

  @Bean
  @Primary
  public AmazonSQSAsync amazonSQSAsync() {
    return AmazonSQSAsyncClientBuilder.standard().
        withCredentials(new DefaultAWSCredentialsProviderChain()).
        withRegion(Regions.US_EAST_1).
        withClientConfiguration(clientConfiguration()).
        build();
  }

  @Bean
  @ConfigurationProperties(prefix = "aws.configuration")
  public ClientConfiguration clientConfiguration() {
    return new ClientConfiguration();
  }

  @Bean
  @ConfigurationProperties(prefix = "aws.queue")
  public SimpleMessageListenerContainer simpleMessageListenerContainer(AmazonSQSAsync amazonSQSAsync) {
    SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
    simpleMessageListenerContainer.setAmazonSqs(amazonSQSAsync);
    simpleMessageListenerContainer.setMessageHandler(queueMessageHandler());
    simpleMessageListenerContainer.setMaxNumberOfMessages(10);
    simpleMessageListenerContainer.setTaskExecutor(threadPoolTaskExecutor());
    return simpleMessageListenerContainer;
  }

  @Bean
  public QueueMessageHandler queueMessageHandler() {
    QueueMessageHandlerFactory queueMessageHandlerFactory = new QueueMessageHandlerFactory();
    queueMessageHandlerFactory.setAmazonSqs(amazonSQSAsync());
    return queueMessageHandlerFactory.createQueueMessageHandler();
  }

  @Bean
  public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    //Tune core pool size to max out capacity so that if the load reaches a point that requires >= 80% of CPU, it can
    // spin up new tasks
    executor.setCorePoolSize(500);
    executor.setQueueCapacity(0);
    executor.setThreadNamePrefix("TaskExecutor");
    executor.initialize();
    return executor;
  }

}

SQS Listener

package com.coxauto.dataplatform.s3manager.rest.daemon;

import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy;
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class SQSDaemon {

  @SqsListener(value = "${sqs.name}", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
  public void receiveMessage(String rawMessage) throws InterruptedException {
    // Simulate 30 seconds of processing. With ON_SUCCESS, the message is deleted
    // only if this method returns without throwing.
    TimeUnit.SECONDS.sleep(30);
    log.debug("Received message: {}", rawMessage);
  }

}
maciejwalkowiak commented 2 years ago

@sudiptad2017 this is by design: the listener fetches the next batch only after all messages from the current batch have been processed. We could likely come up with a better design, but the current behavior is not a bug.
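The behavior described above can be reproduced with a small, self-contained simulation. This is a sketch, not the library's code: an in-memory queue stands in for SQS, `receiveBatch` is a hypothetical stand-in for one receive call, and a `CountDownLatch` models "wait until the whole batch is done". The worker pool is effectively unbounded, like the 500-thread executor in the issue, yet throughput is still capped by the single poller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Simulation of a "fetch a batch, wait for the whole batch, fetch again" poller.
// No AWS or Spring APIs are used; the queue is an in-memory stand-in for SQS.
public class BatchThenWaitDemo {

    // Hypothetical stand-in for one receive call: take up to `max` messages.
    static List<String> receiveBatch(Queue<String> queue, int max) {
        List<String> batch = new ArrayList<>();
        String msg;
        while (batch.size() < max && (msg = queue.poll()) != null) {
            batch.add(msg);
        }
        return batch;
    }

    /** Drains the queue with a single poller; returns the number of batches fetched. */
    public static int run(int totalMessages, int batchSize, long processingMillis) {
        Queue<String> queue = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < totalMessages; i++) {
            queue.add("msg-" + i);
        }
        // Unbounded pool, so worker threads are never the bottleneck.
        ExecutorService workers = Executors.newCachedThreadPool();
        int batches = 0;
        try {
            List<String> batch;
            while (!(batch = receiveBatch(queue, batchSize)).isEmpty()) {
                batches++;
                CountDownLatch latch = new CountDownLatch(batch.size());
                for (String msg : batch) {
                    workers.execute(() -> {
                        try {
                            Thread.sleep(processingMillis); // simulated processing
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        } finally {
                            latch.countDown();
                        }
                    });
                }
                // The poller blocks here: no new batch is fetched until
                // every message of the current batch has finished.
                latch.await();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            workers.shutdown();
        }
        return batches;
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        int batches = run(100, 10, 300);
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println(batches + " batches, " + elapsedMs + " ms");
    }
}
```

With these numbers `main` reports 10 batches and roughly 3 seconds (10 × 300 ms), even though at most 10 of the pool's threads are ever busy at once.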

You are welcome to experiment and propose another implementation.

Also, please use the latest version, 2.3.1. Many small internal improvements were made between versions 2.2 and 2.3.

sudiptad2017 commented 2 years ago

@maciejwalkowiak Thank you for explaining that, also for pointing me to the updated version. I too was not quite sure whether it was a bug or designed that way.

I haven't gone through the code, but one improvement I can imagine is a configurable setting that controls the number of listener (polling) threads. For a very busy queue, it could then be turned up so that more batches are read and delegated to the worker threads at the same time.
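The proposal of several listener threads instead of one could look roughly like the following. This is a sketch under stated assumptions: `pollerCount` is a hypothetical knob, not an existing Spring Cloud AWS 2.x property, and an in-memory queue stands in for SQS. Each poller still waits for its own batch, but with several pollers sharing one worker pool, several batches are in flight at once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: N polling loops (hypothetical `pollerCount`) feeding one shared worker pool.
public class MultiPollerDemo {

    // Hypothetical stand-in for one receive call: take up to `max` messages.
    static List<String> receiveBatch(Queue<String> queue, int max) {
        List<String> batch = new ArrayList<>();
        String msg;
        while (batch.size() < max && (msg = queue.poll()) != null) {
            batch.add(msg);
        }
        return batch;
    }

    /** Drains the queue with `pollerCount` pollers; returns the number of messages processed. */
    public static int run(int totalMessages, int batchSize, int pollerCount, long processingMillis) {
        Queue<String> queue = new ConcurrentLinkedQueue<>(); // thread-safe: pollers share it
        for (int i = 0; i < totalMessages; i++) {
            queue.add("msg-" + i);
        }
        AtomicInteger processed = new AtomicInteger();
        ExecutorService workers = Executors.newCachedThreadPool();
        ExecutorService pollers = Executors.newFixedThreadPool(pollerCount);
        for (int p = 0; p < pollerCount; p++) {
            pollers.execute(() -> {
                List<String> batch;
                while (!(batch = receiveBatch(queue, batchSize)).isEmpty()) {
                    CountDownLatch latch = new CountDownLatch(batch.size());
                    for (String msg : batch) {
                        workers.execute(() -> {
                            try {
                                Thread.sleep(processingMillis); // simulated processing
                                processed.incrementAndGet();
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            } finally {
                                latch.countDown();
                            }
                        });
                    }
                    try {
                        latch.await(); // this poller waits only for its own batch
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            });
        }
        pollers.shutdown();
        try {
            // Pollers finish only after their last batch completes, so all work is done here.
            pollers.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        workers.shutdown();
        return processed.get();
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        int processed = run(100, 10, 5, 300);
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println(processed + " messages, " + elapsedMs + " ms");
    }
}
```

With 5 pollers the 10 batches take about two rounds of 300 ms instead of ten. The trade-off is that more messages are held in flight at once, which in a real queue means more messages sitting under their visibility timeout while waiting for a worker.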

For future reference, if I have a similar question what would be the right channel to reach out?

maciejwalkowiak commented 2 years ago

We have moved to a new repo: https://github.com/awspring/spring-cloud-aws. The "Discussions" feature is enabled there, so that is the best place for questions.