spring-attic / spring-cloud-aws

All development has moved to https://github.com/awspring/spring-cloud-aws
Integration for Amazon Web Services APIs with Spring
https://awspring.io/
Apache License 2.0

Non-blocking SQS message processing in SimpleMessageListenerContainer? #166

Open Nakiami opened 7 years ago

Nakiami commented 7 years ago

After picking messages off an SQS queue using @SqsListener, it seems that the listener will wait for all message processing worker threads to finish before fetching more messages off the queue.

Looking at SimpleMessageListenerContainer.AsynchronousMessageListener, it seems that the message picking loop will wait for the CountDownLatch before continuing: https://github.com/spring-cloud/spring-cloud-aws/blob/master/spring-cloud-aws-messaging/src/main/java/org/springframework/cloud/aws/messaging/listener/SimpleMessageListenerContainer.java#L266

Is it possible to change this behaviour so the listener continues to pull and process messages off the queue? At the moment, it seems like if one thread becomes blocked for a long time, no other work gets done.
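To make the behaviour concrete, here is a minimal stand-alone sketch of the pattern described above. It is a simplified model using only java.util.concurrent, not the library's actual code: the class and method names are made up for illustration, and Thread.sleep stands in for real message handling.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Simplified model of a batch loop guarded by a latch (hypothetical names).
class BatchLatchDemo {

    // Submits every message of one batch, then blocks until all of them
    // have finished. Returns the elapsed time in milliseconds.
    static long processBatch(List<Long> processingMillis) {
        ExecutorService workers = Executors.newFixedThreadPool(processingMillis.size());
        CountDownLatch batchLatch = new CountDownLatch(processingMillis.size());
        long start = System.nanoTime();
        for (long millis : processingMillis) {
            workers.execute(() -> {
                try {
                    Thread.sleep(millis); // stand-in for real message handling
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    batchLatch.countDown();
                }
            });
        }
        try {
            batchLatch.await(); // the next poll cannot start before this returns
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for batch", e);
        } finally {
            workers.shutdown();
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        // Nine fast messages and one slow one: the batch lasts as long as the slow one.
        long elapsed = processBatch(List.of(10L, 10L, 10L, 10L, 10L, 10L, 10L, 10L, 10L, 300L));
        System.out.println("Batch finished after ~" + elapsed + " ms");
    }
}
```

In this model nine threads sit idle for most of the batch, which is the effect reported here.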

Thanks!

tsaqova commented 7 years ago

Same here. Are there any updates regarding this issue?

Nakiami commented 7 years ago

I implemented a non-blocking queue listener / message processor here: https://github.com/Nakiami/spring-cloud-aws/commit/5bcae3e4bf1fba539de97cf480b86906e616b278

Basically it has MaxNumberOfMessages message slots. After getting MaxNumberOfMessages messages off the queue, it will wait until max(1, MaxNumberOfMessages / 2) message slots are available, and then fetch as many messages as it has slots.
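A rough sketch of the slot bookkeeping, for anyone who does not want to read the whole commit. This is not the code from the linked commit: the class and method names are hypothetical, it uses a plain Semaphore, and it assumes a single polling thread per queue.

```java
import java.util.concurrent.Semaphore;

// Hypothetical helper illustrating the slot idea (names are not from the commit).
class MessageSlots {
    private final int maxNumberOfMessages;
    private final Semaphore freeSlots;

    MessageSlots(int maxNumberOfMessages) {
        this.maxNumberOfMessages = maxNumberOfMessages;
        this.freeSlots = new Semaphore(maxNumberOfMessages);
    }

    // Minimum number of free slots required before polling again.
    int threshold() {
        return Math.max(1, maxNumberOfMessages / 2);
    }

    // Blocks until at least threshold() slots are free, then claims every
    // free slot and returns how many messages the next receive may request.
    // Assumes only one polling thread calls this method.
    int acquireSlotsForNextFetch() {
        int wanted = threshold();
        freeSlots.acquireUninterruptibly(wanted);
        int extra = freeSlots.drainPermits();
        return wanted + extra;
    }

    // Called by a worker when it has finished processing one message.
    void release() {
        freeSlots.release();
    }
}
```

The poller would call acquireSlotsForNextFetch() before each receive and pass the result as the batch size, and each worker would call release() when its message is done. If a receive returns fewer messages than requested, the unused slots would need to be released again.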

demboos commented 7 years ago

@Nakiami thanks for your great effort! I'll test your listener when I have a chance.

What's even more bothersome is that the current implementation only allows 10 messages (per queue) to be processed at a time, even if your server could handle more. I know SQS limits a single call to fetching 10 messages, but the listener could easily fetch the next 10 messages and hand them to the executor for processing, if the thread pool is big enough...

Another issue I found is that the default thread pool size is 1 thread too small (from SimpleMessageListenerContainer.createDefaultTaskExecutor):

  int spinningThreads = this.getRegisteredQueues().size();
  threadPoolTaskExecutor.setMaxPoolSize(spinningThreads * maxNumberOfMessagePerBatch);

Let's imagine we have 1 registered queue, and maxNumberOfMessages is 10. The thread pool max size will be 10. But 1 thread is used by the AsynchronousMessageListener:

messageBatchLatch.await();

which leaves us with 9 threads working on 10 tasks (messages from SQS). So not only do we have to wait until the longest-running task finishes, but we also have to wait for the 10th message, which cannot start until one of the 9 threads is free, and only then will the next 10 messages be fetched from the queue.
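The arithmetic, written out as a small sketch. The helper names are hypothetical, and the "+ 1" variant is only one possible adjustment that reserves a thread per queue for the polling loop. It is not presented as what the library does.

```java
// Worked arithmetic for the default pool size (hypothetical helper names).
class DefaultPoolSizing {

    // Mirrors the formula quoted above: one batch worth of threads per queue.
    static int maxPoolSize(int registeredQueues, int maxNumberOfMessagesPerBatch) {
        return registeredQueues * maxNumberOfMessagesPerBatch;
    }

    // Each queue's polling loop occupies one thread of the same pool,
    // so that many threads are not available for message handling.
    static int threadsLeftForMessages(int registeredQueues, int maxNumberOfMessagesPerBatch) {
        return maxPoolSize(registeredQueues, maxNumberOfMessagesPerBatch) - registeredQueues;
    }

    // One possible adjustment (an assumption): reserve an extra thread per queue.
    static int adjustedMaxPoolSize(int registeredQueues, int maxNumberOfMessagesPerBatch) {
        return registeredQueues * (maxNumberOfMessagesPerBatch + 1);
    }

    public static void main(String[] args) {
        System.out.println("max pool size: " + maxPoolSize(1, 10));
        System.out.println("threads left for messages: " + threadsLeftForMessages(1, 10));
        System.out.println("adjusted max pool size: " + adjustedMaxPoolSize(1, 10));
    }
}
```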

wadechandler commented 7 years ago

One way to scale more is to hand off to your own executor as messages are retrieved, and do as little as possible on the thread calling the annotated listener methods. There are also plenty of cases where we don't use the annotations, but poll ourselves, use converters, and hand off to workers, as we only want to poll if we can do more work; i.e. if saturated, don't poll, because if you are using dead letter queues, messages can end up dead-lettered for invalid reasons.

jayanderson commented 7 years ago

Any news on this bug? Would the above implementation (https://github.com/spring-cloud/spring-cloud-aws/issues/166#issuecomment-246235443) fix the issue?

demboos commented 7 years ago

I believe there's no need to implement anything, because as of version 1.2 you can pass an Acknowledgment object to your listener method and manually acknowledge the message at the moment you want. So the best way to scale out to more than 10 listener threads is to use your own executor and pass the message and the Acknowledgment to its threads.

jayanderson commented 7 years ago

@demboos Interesting. I don't see how that affects this message retrieval issue (the executors still need to finish and the countdown latch has to reach zero before more messages can be fetched: https://github.com/spring-cloud/spring-cloud-aws/blob/3a10f56a9b354027f1975614939994134024dbd2/spring-cloud-aws-messaging/src/main/java/org/springframework/cloud/aws/messaging/listener/SimpleMessageListenerContainer.java#L282). Do you have an example?

demboos commented 7 years ago

In your method annotated with @SqsListener you just need to start processing the message asynchronously (for instance, call a Spring method annotated with @Async or submit a task to an executor you manage yourself).

This way the @SqsListener method will return immediately, and the countdown latch will reach 0 very quickly.

Prior to 1.2, doing so would mean removing the message from SQS even before it was actually processed by your code, but now, since you can pass the Acknowledgment object around, you can control the moment the message is removed from the SQS queue (for instance, at the end of your async method).

I don't have any working example of this, since I haven't migrated my code to 1.2 yet.

toran414 commented 7 years ago

Honestly, I think the best way to handle this is to allow the number of listeners on each queue to be configured, since you can already configure the Task Executor Service yourself. Something similar to how http://docs.spring.io/spring-amqp/docs/current/api/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.html#setConcurrentConsumers-int- works would be brilliant. Even better would be for the SqsListener annotation to have a listenerCount value so it could be configured per queue. This would just make it start up multiple SimpleMessageListenerContainer.AsynchronousMessageListener instances for that queue.

Currently as a workaround I am just creating a second queue, and annotating two methods as listeners.

  public void submitForCommunication(Notification notification) {
    if (queueBalancer.incrementAndGet() % 2 == 0) {
      notificationCommunicationQueue.publish(notification);
    } else {
      moreNotificationCommunicationQueue.publish(notification);
    }
  }

  @BatchCancellable
  @Transactional
  @SqsListener("${notifi.aws.sqs-prefix}-notification-communication")
  public void processQueueForCommunication(Notification notification) {
    processForCommunication(notification);
  }

  @BatchCancellable
  @Transactional
  @SqsListener("${notifi.aws.sqs-prefix}-notification-communication2")
  public void processMoreForCommunication(Notification notification) {
    processForCommunication(notification);
  }

Doing this completely solved the bottleneck issues I was running into, although it feels kinda dirty. The only other option I could think of was to set up my own listeners and not use the annotation.

demboos commented 7 years ago

But the maximum number of threads per queue is only 10, since AWS allows fetching only 10 messages at a time. This is the bottleneck. And if one of those 10 messages takes considerably longer to process than the rest, new messages won't be fetched until the whole previous batch finishes.

toran414 commented 7 years ago

I can't tell which part you are responding to, my suggested fix, or my hackish workaround :). It will only allow you to fetch 10 in a single call to their REST service, but if you had multiple SimpleMessageListenerContainer.AsynchronousMessageListener objects on that queue, you could have more than 10 fetched at any given time. My workaround works the same way by introducing another queue with a new SimpleMessageListenerContainer.AsynchronousMessageListener for it.

To your point, it won't fetch more until they are all done. This does mean that if you fetch 10 and have 1/10 that takes WAY longer than the rest, then just having 2 listeners would not fully fix your issue. If both of these numbers were configurable though, and your system could have that kind of latency, you could just have 10 listeners fetching 1-2 at a time, and it would probably un-bottleneck you. For my use case, all my messages take about the same amount of time to process, so I just needed a way to ensure my executor service always had things to work on.

MrBuddyCasino commented 6 years ago

Moving work to a dedicated thread pool works, as @demboos suggested. However, we have now migrated to the JMS API and have not looked back: it supports everything we need, plus some native AWS support (see https://github.com/awslabs/amazon-sqs-java-messaging-lib).

maciejp commented 6 years ago

I wanted to give it a try with the simplest configuration possible:

@Log4j2
@Component
public class SqsConsumer {

    @SqsListener("${my-queue.arn}")
    public void receiveMessage() {
        log.info("------------------------------ RECEIVED: " + LocalDateTime.now());
    }
}

and I see 2 log lines every 10 seconds. I suppose the execution time of the method is not the issue here, so what am I missing?

@MrBuddyCasino could you post your solution with a dedicated thread pool?

MrBuddyCasino commented 6 years ago

Sure:


  /**
   * Method return will not automatically mark the message as consumed. This is controlled by the
   * supplied {@link Acknowledgment} object.
   */
  @SqsListener(value = "${app.aws.sqs.queues.inventory}", deletionPolicy = NEVER)
  public void saveUpdatedInventory(@Payload String rawEvent, MessageHeaders headers,
      Acknowledgment ack) {

    log.debug("Received {}", rawEvent);

    // workaround for https://github.com/spring-cloud/spring-cloud-aws/issues/166
    // no result is needed; the point is only to move processing off-thread
    CompletableFuture
        .runAsync(() ->
        {
          try {
            // do stuff with message

            // don't forget to acknowledge
            ack.acknowledge();

          } catch (Exception e) {
            log.error("failed to process message", e);
          }
        }, executor);
  }

Configure your executor somewhere:

  @Bean
  public Executor asyncExecutor(
      @Value("${app.executor.maxPoolSize}") int maxPoolSize,
      @Value("${app.executor.queueCapacity}") int queueCapacity) {

    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

    if (maxPoolSize <= 0) {
      // fall back to (cores - 1), but never fewer than one thread (single-core hosts)
      maxPoolSize = Math.max(1, Runtime.getRuntime().availableProcessors() - 1);
    }
    executor.setMaxPoolSize(maxPoolSize);
    executor.setQueueCapacity(queueCapacity);
    executor.setThreadNamePrefix("spring-async-");
    // if the queue is full, throttle task creation by running them in the caller's thread
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

    executor.initialize();
    return executor;
    //return MoreExecutors.directExecutor();
  }
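
The CallerRunsPolicy comment in the executor configuration is worth a small illustration. The following is a stand-alone sketch using a plain java.util.concurrent.ThreadPoolExecutor rather than Spring's ThreadPoolTaskExecutor. The class name and the tiny pool and queue sizes are chosen only for the demonstration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Demonstrates how CallerRunsPolicy throttles the submitting thread (illustrative only).
class CallerRunsDemo {

    // Returns true if the task submitted to a saturated pool ran on the submitting thread.
    static boolean overflowRunsOnCaller() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Thread[] ranOn = new Thread[1];
        try {
            pool.execute(() -> awaitQuietly(release)); // occupies the only worker thread
            pool.execute(() -> { });                   // fills the single queue slot
            // Pool and queue are both full, so this task is rejected and
            // CallerRunsPolicy executes it synchronously on the calling thread.
            pool.execute(() -> ranOn[0] = Thread.currentThread());
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return ranOn[0] == Thread.currentThread();
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("overflow task ran on caller: " + overflowRunsOnCaller());
    }
}
```

In the listener setup, the caller is the thread running the @SqsListener method, so once the pool and its queue are saturated the listener itself does the work and the container's batch latch slows polling down again. That is the throttling the comment refers to.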

b2bking commented 6 years ago

Hello everybody, I am using the @MessageMapping annotation to read messages from a queue. I implemented my application according to this tutorial (https://codemason.me/2016/03/12/amazon-aws-sqs-with-spring-cloud/), which says that maxNumOfMsgs is the number of concurrent threads. I can hardly believe that, because when I looked at the source code, it seems to me that this type of implementation does not really work asynchronously. It depends on the number of queues, and I did not see any message-receiving threads. Does it work asynchronously, and how can I set the number of threads? Thanks in advance!

evantoli commented 5 years ago

Hi @Nakiami,

Thanks for sharing your effort.

I noticed that you forked from the spring-cloud/spring-cloud-aws repo.

I was wondering if you have considered requesting that your code get merged back into their repo.

Again, thanks!

spencergibb commented 5 years ago

PRs welcome

ShivaKumarPeddi commented 5 years ago

Can you please add an implementation that allows multiple threads to poll one queue? As of today only one thread polls, right?

Here is the problem: during a busy hour, according to CloudWatch metrics, the average number of messages received per SQS poll is 4, and each poll takes 750 ms. At that rate I can only get 320 messages per minute. This is limiting the performance of my application.

If I could have multiple threads polling and control that through a flag, it would be a lot of help to many people.
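The estimate above as a quick calculation. The helper is hypothetical, and the multi-thread figure assumes throughput scales linearly with the number of polling threads, which real queues may not deliver.

```java
// Back-of-the-envelope polling throughput (hypothetical helper).
class PollThroughput {

    // Messages per minute given the average batch size, the duration of one
    // poll, and the number of threads polling in parallel.
    // Assumption: pollers do not slow each other down (linear scaling).
    static long messagesPerMinute(double avgMessagesPerPoll, long pollMillis, int pollerThreads) {
        double pollsPerMinute = 60_000.0 / pollMillis;
        return Math.round(avgMessagesPerPoll * pollsPerMinute * pollerThreads);
    }

    public static void main(String[] args) {
        // 60,000 ms / 750 ms = 80 polls per minute, times 4 messages = 320
        System.out.println(messagesPerMinute(4, 750, 1));
        // Four pollers under the linear-scaling assumption
        System.out.println(messagesPerMinute(4, 750, 4));
    }
}
```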

kevinhwang commented 5 years ago

+1 For this issue.

I have a use case where I need to consume from SQS as quickly as possible, and the current maximum throughput is unacceptably low: even if each message received is submitted to a ThreadPoolTaskExecutor to be processed asynchronously, the SimpleMessageListenerContainer can still only get 10 messages per request, and the requests themselves are very slow.

I would love support for polling on multiple threads concurrently.

RaymondHsu commented 4 years ago

Our team ran into the same performance issue and ended up overriding the SimpleMessageListenerContainer. We chose the easier fix: adding a configurable parameter that controls how many times a batch of 10 messages is read. This increases throughput, but does not solve the CountDownLatch wait or the fact that reading and processing happen in sequence.

A better solution is to use a BlockingQueue with a configurable fixed length so that reading and processing run in parallel. It would also need a configurable thread pool for readers to increase read throughput.
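A minimal sketch of that idea, using only java.util.concurrent. Everything here is an assumption for illustration: the class name, the Supplier standing in for the SQS receive call, and the rule that an empty batch means "stop" (a real listener would keep polling). It is not taken from any existing implementation.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Readers and processors decoupled by a bounded buffer (hypothetical sketch).
class BufferedConsumer {
    // Sentinel compared by identity, so a real message "POISON" is unaffected.
    private static final String POISON = new String("POISON");

    // receive: stand-in for an SQS receive call; an empty list ends the demo.
    static int consume(Supplier<List<String>> receive, int readers, int processors, int capacity) {
        BlockingQueue<String> buffer = new ArrayBlockingQueue<>(capacity);
        AtomicInteger processed = new AtomicInteger();
        ExecutorService readerPool = Executors.newFixedThreadPool(readers);
        ExecutorService processorPool = Executors.newFixedThreadPool(processors);
        try {
            for (int i = 0; i < processors; i++) {
                processorPool.execute(() -> {
                    try {
                        while (buffer.take() != POISON) {
                            processed.incrementAndGet(); // stand-in for real processing
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            for (int i = 0; i < readers; i++) {
                readerPool.execute(() -> {
                    try {
                        List<String> batch;
                        while (!(batch = receive.get()).isEmpty()) {
                            for (String message : batch) {
                                buffer.put(message); // blocks while the buffer is full
                            }
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            readerPool.shutdown();
            readerPool.awaitTermination(1, TimeUnit.MINUTES);
            for (int i = 0; i < processors; i++) {
                buffer.put(POISON); // one stop signal per processor
            }
            processorPool.shutdown();
            processorPool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }
}
```

Because put blocks when the buffer is full, readers stop polling once the processors fall behind, so the queue length acts as the back-pressure setting. In this model a slow message only ties up one processor thread instead of the whole batch.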

tcheek-bw commented 4 years ago

Any movement on this, or is it fixed?

huehnerlady commented 4 years ago

Is there any update? We use the latest version of this dependency and still run into this issue. Could someone explain why it makes sense that one message taking longer to process would prevent 9 other threads from continuing processing messages in the first place?

maciejwalkowiak commented 4 years ago

@huehnerlady we will revisit the SQS implementation, but it will take a while. In the meantime you can watch this repository: https://github.com/maciejwalkowiak/reactor-sqs-poc. With help from devs on the Reactor team, I hope to get a simple but efficient implementation for SQS.

nickcaballero commented 3 years ago

https://github.com/awspring/spring-cloud-aws/issues/23