aws / aws-sdk-java-v2

The official AWS SDK for Java - Version 2
Apache License 2.0
2.18k stars 840 forks source link

Sending SQS message using async client sometimes never completes the Future #1207

Closed ochrons closed 5 years ago

ochrons commented 5 years ago

Expected Behavior

Sending messages with SqsAsyncClient.send(req) should return a Future that eventually completes.

Current Behavior

Sending thousands of SQS messages in quick succession triggers a very rare case of single send(req) never completing the Future. No result, no error, just disappears.

Possible Solution

There have been Netty-related issues in this space before, so I would look there first. With a previous version of the SDK (2.4.14) this was more common than in the 2.5 version.

Steps to Reproduce (for bugs)

Send a few hundred thousand SQS messages using the SqsAsyncClient.send method and connect a Future that times out (say, after 10 seconds) with the returned future and see which one completes first.

A Scala example for retrying the send after a timeout.

  private def sendRetry(request: SendMessageRequest, retryCount: Int = 3): Future[SendMessageResponse] = {
    val res     = sqs.sendMessage(request).toScala
    val timeout = APIErrorJVM.delayFuture[SendMessageResponse](Failure(new TimeoutException()), 10.seconds)
    Future.firstCompletedOf(List(res, timeout)) recoverWith {
      case _: TimeoutException if retryCount > 0 =>
        log.error(s"Timeout while sending message $request, retry count = $retryCount")
        sendRetry(request, retryCount - 1)
    }
  }

Context

SQS is used as ground truth in our application, and if sending SQS messages just invisibly fails, the whole application logic is in jeopardy. Had to add an application level timeout to the SDK call to circumvent this.

Your Environment

zoewangg commented 5 years ago

Thank you for reporting!

I think this commit https://github.com/aws/aws-sdk-java-v2/commit/066e65d679d9c2ce8cf9b2b7ef5bc66404185ab6 (released in 2.5.0) might reduce the occurrences of the issue, but looks like there are more cases that could cause the uncompletable future. We will investigate it.

As a side note, the SDK supports timeout features out of box, see https://github.com/aws/aws-sdk-java-v2/blob/master/docs/BestPractices.md#utilize-timeout-configurations

ochrons commented 5 years ago

Actually my comment about 2.4.14 was incorrect, there was another issue that got fixed in 2.5 (received messages were rarely being left "in flight" but never delivered to application for processing). So I cannot say for sure if this uncompleting call behavior has changed from 2.4 to 2.5

millems commented 5 years ago

I've been able to reproduce this issue...

millems commented 5 years ago

Reproduction code:

/*
 * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *  http://aws.amazon.com/apache2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

package software.amazon.awssdk.services.sqs;

import java.time.Duration;
import java.time.Instant;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import software.amazon.awssdk.services.sqs.model.CreateQueueResponse;

public class Issue1207 {
    @Test
    public void test() throws InterruptedException {
        try (SqsAsyncClient client = SqsAsyncClient.create()) {
            String queueName = UUID.randomUUID().toString();
            CreateQueueResponse queue = client.createQueue(r -> r.queueName(queueName)).join();
            try {
                loadTest(client, queue.queueUrl());
            } finally {
                client.deleteQueue(r -> r.queueUrl(queue.queueUrl())).join();
            }
        }
    }

    private void loadTest(SqsAsyncClient client, String queueUrl) throws InterruptedException {
        int concurrentRequests = 100;
        Semaphore concurrencySemaphore = new Semaphore(concurrentRequests);
        Instant endTime = Instant.now().plusSeconds(60);

        System.out.println("Starting...");

        Executors.newSingleThreadExecutor().submit(() -> {
            try {
                while (true) {
                    long timeLeft = Duration.between(Instant.now(), endTime).getSeconds();
                    System.out.println("Seconds left in test: " + timeLeft + ", Open permits: " + concurrencySemaphore.availablePermits());
                    Thread.sleep(5_000);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        while (endTime.isAfter(Instant.now())) {
            concurrencySemaphore.acquire(1);

            client.sendMessage(r -> r.queueUrl(queueUrl).messageBody("{}"))
                  .whenComplete((r, t) -> {
                      if (t != null) {
                          t.printStackTrace();
                      }
                      concurrencySemaphore.release(1);
                  });
        }

        System.out.println("Spinning down...");

        if (!concurrencySemaphore.tryAcquire(concurrentRequests, 30, TimeUnit.SECONDS)) {
            int missingResponses = concurrentRequests - concurrencySemaphore.availablePermits();
            throw new IllegalStateException(missingResponses + " requests didn't complete.");
        }
    }
}
millems commented 5 years ago

This was a tricky one.

It looks like in some rare edge cases, when we acquire a connection from the connection pool, it isn't active, and the health checks at the netty level didn't catch it for us. Fixing the netty-level health check (it looks like it's broken?) improves things slightly, but it was still happening occasionally if the connection was closed between acquiring it from the pool and us attaching our handlers that monitor for the close.

I've moved the health check fully up the stack until after we've added our connection-close monitors and that seems to have fixed the problem.

I'll be running some longer-term tests to make sure it's definitely licked before putting out a PR.

millems commented 5 years ago

A fix will go out for this on Monday's release. Please reopen this issue if you're still seeing the problem at that time. Our tests are no longer able to reproduce it after this change.