awslabs / amazon-sqs-java-messaging-lib

The Amazon SQS Java Messaging Library holds the Java Message Service (JMS)-compatible classes that are used for communicating with Amazon Simple Queue Service.
http://aws.amazon.com/sqs
Apache License 2.0

Spring Infinite Loop for QueueDoesNotExistException #205

Open enkuru opened 1 year ago

enkuru commented 1 year ago
java.util.concurrent.CompletionException: software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException: The specified queue does not exist for this wsdl version. (Service: Sqs, Status Code: 400, Request ID: abf44cc8-bd4a-5abf-85a0-118a756161e8)
    at software.amazon.awssdk.utils.CompletableFutureUtils.errorAsCompletionException(CompletableFutureUtils.java:65)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncExecutionFailureExceptionReportingStage.lambda$execute$0(AsyncExecutionFailureExceptionReportingStage.java:51)
    at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934)
    at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162)
    at software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:79)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeAttemptExecute(AsyncRetryableStage.java:103)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:184)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:170)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$null$0(MakeAsyncHttpRequestStage.java:105)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$executeHttpRequest$3(MakeAsyncHttpRequestStage.java:163)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
    at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)

The log above was repeated nearly 1 million times in a 25-minute period after the related queue was accidentally deleted. This maxed out CPU usage on the servers (in Elastic Beanstalk), and they could no longer handle incoming requests.
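For scale, assuming each logged stack trace corresponds to one failed poll, about 1,000,000 entries in 25 minutes (1,500 s) is roughly 667 failed polls per second. The minimal sketch below compares that with a hypothetical capped exponential backoff (1 s initial delay, doubling after each failure, capped at 60 s). The class name PollRateEstimate and the backoff values are assumptions for illustration, not defaults of any library.

```java
// Back-of-the-envelope comparison: the reported failure rate versus a
// hypothetical capped exponential backoff. All values are assumptions.
final class PollRateEstimate {

    // Average log entries per second over the reported window.
    static double entriesPerSecond(long logEntries, long minutes) {
        return (double) logEntries / (minutes * 60L);
    }

    // Counts poll attempts that start within windowMillis when every attempt
    // fails and is followed by a delay that starts at initialMillis, doubles
    // after each failure, and never exceeds maxMillis.
    static int attemptsWithBackoff(long windowMillis, long initialMillis, long maxMillis) {
        int attempts = 0;
        long elapsed = 0;
        long delay = initialMillis;
        while (elapsed <= windowMillis) {
            attempts++;                             // an attempt starts at 'elapsed' and fails
            elapsed += delay;                       // wait before the next attempt
            delay = Math.min(delay * 2, maxMillis); // grow the delay, but keep it capped
        }
        return attempts;
    }

    public static void main(String[] args) {
        // prints "666.7 entries/s" and then "30 attempts"
        System.out.printf("%.1f entries/s%n", entriesPerSecond(1_000_000L, 25L));
        System.out.println(attemptsWithBackoff(25L * 60_000L, 1_000L, 60_000L) + " attempts");
    }
}
```

Under those assumed values, the same 25 minutes would have produced 30 failed polls instead of roughly a million.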

Our message-consuming class is:

@Slf4j
@Service
@RequiredArgsConstructor
@ConditionalOnProperty(value = "transaction.queue.consumer.enabled")
public class TransactionMessageConsumer {
    private final TransactionService transactionService;

    private final Tracer tracer;

    @SqsListener(value = "${cloud.aws.sqs.transaction-queue}")
    public void receive(final TransactionMessageVo transactionMessageVo) {
        // ... (method body not included in the report)
    }
}

and our configuration class is:

@Configuration
@ConditionalOnProperty("transaction.queue.publisher.enabled")
public class SQSClientConfiguration {

    @Bean
    public SqsAsyncClient sqsAsyncClient() {
        return SqsAsyncClient.create();
    }

    @Bean
    public SqsTemplate sqsTemplate(final SqsAsyncClient sqsAsyncClient) {
        return SqsTemplate.newTemplate(sqsAsyncClient);
    }

    @Bean
    public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory(final SqsAsyncClient sqsAsyncClient) {
        return SqsMessageListenerContainerFactory.builder()
                .sqsAsyncClient(sqsAsyncClient)
                .build();
    }
}

As you can see, our configuration is simple. We tried to find a solution to this problem (slowing down the polling rate on failure), but we did not succeed.

I think the issue lies in the following code, because it does not account for situations like this and loops indefinitely:

AbstractPollingMessageSource.java::191

    private void pollAndEmitMessages() {
        while (isRunning()) {
            try {
                if (!isRunning()) {
                    continue;
                }
                logger.trace("Requesting permits for queue {}", this.pollingEndpointName);
                final int acquiredPermits = this.backPressureHandler.requestBatch();
                if (acquiredPermits == 0) {
                    logger.trace("No permits acquired for queue {}", this.pollingEndpointName);
                    continue;
                }
                logger.trace("{} permits acquired for queue {}", acquiredPermits, this.pollingEndpointName);
                if (!isRunning()) {
                    logger.debug("MessageSource was stopped after permits where acquired. Returning {} permits",
                            acquiredPermits);
                    this.backPressureHandler.release(acquiredPermits);
                    continue;
                }
                // @formatter:off
                managePollingFuture(doPollForMessages(acquiredPermits))
                    .exceptionally(this::handlePollingException)
                    .thenApply(msgs -> releaseUnusedPermits(acquiredPermits, msgs))
                    .thenApply(this::convertMessages)
                    .thenCompose(this::emitMessagesToPipeline)
                    .exceptionally(this::handleSinkException);
                // @formatter:on
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(
                        "MessageSource thread interrupted for endpoint " + this.pollingEndpointName, e);
            }
            catch (Exception e) {
                logger.error("Error in MessageSource for queue {}. Resuming", this.pollingEndpointName, e);
            }
        }
        logger.debug("Execution thread stopped for queue {}", this.pollingEndpointName);
    }
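A minimal sketch of the missing behavior, slowing down polling after failures, is shown below under stated assumptions: it is plain Java, not Spring Cloud AWS code; BackoffPollingLoop, its constructor parameters, and sleepQuietly are hypothetical names; and, unlike AbstractPollingMessageSource, it blocks on each poll for simplicity. After a failed poll it waits with capped exponential backoff instead of going straight back to the next poll, and a successful poll resets the delay.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.LongConsumer;
import java.util.function.Supplier;

// Hypothetical sketch, not Spring Cloud AWS code: a synchronous polling loop
// that waits with capped exponential backoff after each failed poll.
final class BackoffPollingLoop {
    private final Supplier<CompletableFuture<List<String>>> poll; // stands in for doPollForMessages(...)
    private final LongConsumer sleeper;                           // injected so tests need not really sleep
    private final long initialBackoffMillis;
    private final long maxBackoffMillis;

    BackoffPollingLoop(Supplier<CompletableFuture<List<String>>> poll, LongConsumer sleeper,
                       long initialBackoffMillis, long maxBackoffMillis) {
        this.poll = poll;
        this.sleeper = sleeper;
        this.initialBackoffMillis = initialBackoffMillis;
        this.maxBackoffMillis = maxBackoffMillis;
    }

    // Runs up to maxPolls polls (stopping early if interrupted) and returns
    // the delays that were applied after failed polls.
    List<Long> run(int maxPolls) {
        List<Long> appliedDelays = new ArrayList<>();
        long backoff = initialBackoffMillis;
        for (int i = 0; i < maxPolls && !Thread.currentThread().isInterrupted(); i++) {
            try {
                List<String> messages = poll.get().join(); // join() wraps failures in CompletionException
                backoff = initialBackoffMillis;            // a successful poll resets the backoff
                // ... hand 'messages' to the processing pipeline here ...
            } catch (RuntimeException e) {                 // e.g. a wrapped QueueDoesNotExistException
                appliedDelays.add(backoff);
                sleeper.accept(backoff);                   // wait instead of polling again immediately
                backoff = Math.min(backoff * 2, maxBackoffMillis);
            }
        }
        return appliedDelays;
    }

    // Real sleeper for production use; restores the interrupt flag so run() stops.
    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Passing a recording LongConsumer as the sleeper keeps tests fast; in production, BackoffPollingLoop::sleepQuietly would do the waiting. A non-blocking variant closer to the real loop could instead schedule the next poll through CompletableFuture.delayedExecutor(delay, TimeUnit.MILLISECONDS).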
lionelv-kry commented 11 months ago

I have the very same problem. Did you find a workaround?

lionelv-kry commented 11 months ago

Using .autoStartup(enabled) worked for me.

pstorch commented 7 months ago

Same infinite polling loop with authorization issues:

io.awspring.cloud.sqs.listener.source.AbstractPollingMessageSource - Error polling for messages in queue https://sqs.eu-central-1.amazonaws.com/.../queue.fifo
java.util.concurrent.CompletionException: software.amazon.awssdk.services.sqs.model.SqsException: The security token included in the request is invalid. (Service: Sqs, Status Code: 403, Request ID: 6f564cea-...)
    at software.amazon.awssdk.utils.CompletableFutureUtils.errorAsCompletionException(CompletableFutureUtils.java:65)

or

io.awspring.cloud.sqs.listener.source.AbstractPollingMessageSource - Error polling for messages in queue https://sqs.eu-central-1.amazonaws.com/.../queue.fifo
java.util.concurrent.CompletionException: software.amazon.awssdk.core.exception.SdkClientException: Unable to load credentials from any of the providers in the chain AwsCredentialsProviderChain(credentialsProviders=[SystemPropertyCredentialsProvider(), EnvironmentVariableCredentialsProvider(), WebIdentityTokenCredentialsProvider(), ProfileCredentialsProvider(profileName=default, profileFile=ProfileFile(sections=[])), ContainerCredentialsProvider

We also found no way to configure a backoff.
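Errors like a deleted queue or an invalid security token do not resolve by retrying every few milliseconds, so one hypothetical mitigation is to stop polling after a number of consecutive failures. The sketch below is plain Java with an assumed class name, ConsecutiveFailureGuard; it uses an AtomicInteger because, as the stack traces show, the polling futures complete on SDK worker threads. How to actually stop the listener container when the guard trips is left as an assumption, since this thread does not show a supported hook for it.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: tracks consecutive polling failures and reports when
// the caller should give up (e.g. stop the listener) instead of retrying forever.
final class ConsecutiveFailureGuard {
    private final int maxConsecutiveFailures;
    private final AtomicInteger consecutiveFailures = new AtomicInteger(); // updated from callback threads

    ConsecutiveFailureGuard(int maxConsecutiveFailures) {
        if (maxConsecutiveFailures < 1) {
            throw new IllegalArgumentException("maxConsecutiveFailures must be >= 1");
        }
        this.maxConsecutiveFailures = maxConsecutiveFailures;
    }

    // Call after a successful poll: the failure streak is broken.
    void recordSuccess() {
        consecutiveFailures.set(0);
    }

    // Call after a failed poll; returns true once the streak reaches the limit.
    boolean recordFailure() {
        return consecutiveFailures.incrementAndGet() >= maxConsecutiveFailures;
    }

    int consecutiveFailures() {
        return consecutiveFailures.get();
    }
}
```

A guard like this complements a backoff rather than replacing it: backoff limits the rate of failed polls, while the guard bounds how long a clearly broken setup keeps polling at all.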

NightFox7 commented 1 month ago

I have the same issue. Any workarounds? Thanks in advance.