awspring / spring-cloud-aws

The New Home for Spring Cloud AWS
http://awspring.io
Apache License 2.0
856 stars 293 forks source link

Release Polling Permits on Conversion Errors #1051

Closed rerodrig closed 6 months ago

rerodrig commented 7 months ago

Type: Bug

Component: SQS

Describe the bug Spring Cloud AWS 3.1.0.

The message poller stops after trying to process some messages from a queue when they result in a "No class found" error during deserialization, causing messages not to be sent to DLQ after "maximum receives".

Scenario: Class that represents the message was moved to a different package and there are still old messages to be processed by new service version.

Related logs:

i.a.c.s.l.s.AbstractPollingMessageSource : Error processing message
java.lang.IllegalArgumentException: No class found with name com.sample.old.TestObject

Sample

  1. Create a record TestObject:
    
    package com.sample.new;

public record TestObject( String name ) { }

2. Create a simple SQS listener:

@SqsListener(value = {"test-type-queue"}, acknowledgementMode = SqsListenerAcknowledgementMode.MANUAL) public void processMessage(@Valid TestObject message, Acknowledgement acknowledgment) { // process message acknowledgment.acknowledge(); }


3. Post 2 messages to the queue: `{"name":"custom name"}`, with `JavaType` header: `com.sample.old.TestObject`.
tomazfernandes commented 7 months ago

Hi @rerodrig.

Not sure what you mean. Are you saying that after these two errors, no new messages are polled from the queue?

Can you share the full stack trace with the error?

Thanks.

rerodrig commented 7 months ago

Hi @tomazfernandes,

After some errors for both messages, no new messages are polled from the queue.

Here is the stack trace:

2024-02-15T17:09:46.180Z ERROR 19269 --- [test-sqs] [c-response-5-12] i.a.c.s.l.s.AbstractPollingMessageSource : Error processing message

java.lang.IllegalArgumentException: No class found with name com.sample.old.TestObject
    at io.awspring.cloud.sqs.support.converter.AbstractMessagingMessageConverter.defaultHeaderTypeMapping(AbstractMessagingMessageConverter.java:197)
    at io.awspring.cloud.sqs.support.converter.AbstractMessagingMessageConverter.getTargetType(AbstractMessagingMessageConverter.java:179)
    at io.awspring.cloud.sqs.support.converter.AbstractMessagingMessageConverter.convertPayload(AbstractMessagingMessageConverter.java:170)
    at io.awspring.cloud.sqs.support.converter.AbstractMessagingMessageConverter.toMessagingMessage(AbstractMessagingMessageConverter.java:153)
    at io.awspring.cloud.sqs.listener.source.AbstractMessageConvertingMessageSource.convertMessage(AbstractMessageConvertingMessageSource.java:91)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
    at io.awspring.cloud.sqs.listener.source.AbstractMessageConvertingMessageSource.convertMessages(AbstractMessageConvertingMessageSource.java:85)
    at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2179)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallMetricCollectionStage.lambda$execute$0(AsyncApiCallMetricCollectionStage.java:58)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2179)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallTimeoutTrackingStage.lambda$execute$2(AsyncApiCallTimeoutTrackingStage.java:69)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2179)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:177)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2179)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$execute$0(MakeAsyncHttpRequestStage.java:110)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2179)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.completeResponseFuture(MakeAsyncHttpRequestStage.java:253)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$executeHttpRequest$3(MakeAsyncHttpRequestStage.java:167)
    at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934)
    at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911)
    at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1583)

After 10 attempts to process the messages (5 for each message), they stay in the queue and the following logs are displayed:

2024-02-15T17:09:46.970Z TRACE 19269 --- [test-sqs] [Container#0-0-1] c.s.l.a.BatchingAcknowledgementProcessor : Scheduling next acknowledgement execution in 1000ms
2024-02-15T17:09:47.975Z TRACE 19269 --- [test-sqs] [Container#0-0-1] c.s.l.a.BatchingAcknowledgementProcessor : Scheduling next acknowledgement execution in 1000ms
2024-02-15T17:09:48.976Z TRACE 19269 --- [test-sqs] [Container#0-0-1] c.s.l.a.BatchingAcknowledgementProcessor : Scheduling next acknowledgement execution in 1000ms
2024-02-15T17:09:49.980Z TRACE 19269 --- [test-sqs] [Container#0-0-1] c.s.l.a.BatchingAcknowledgementProcessor : Scheduling next acknowledgement execution in 1000ms
2024-02-15T17:09:50.985Z TRACE 19269 --- [test-sqs] [Container#0-0-1] c.s.l.a.BatchingAcknowledgementProcessor : Scheduling next acknowledgement execution in 1000ms
2024-02-15T17:09:51.985Z TRACE 19269 --- [test-sqs] [Container#0-0-1] c.s.l.a.BatchingAcknowledgementProcessor : Scheduling next acknowledgement execution in 1000ms
2024-02-15T17:09:52.989Z TRACE 19269 --- [test-sqs] [Container#0-0-1] c.s.l.a.BatchingAcknowledgementProcessor : Scheduling next acknowledgement execution in 1000ms
2024-02-15T17:09:53.994Z TRACE 19269 --- [test-sqs] [Container#0-0-1] c.s.l.a.BatchingAcknowledgementProcessor : Scheduling next acknowledgement execution in 1000ms
2024-02-15T17:09:54.995Z TRACE 19269 --- [test-sqs] [Container#0-0-1] c.s.l.a.BatchingAcknowledgementProcessor : Scheduling next acknowledgement execution in 1000ms
2024-02-15T17:09:55.592Z TRACE 19269 --- [test-sqs] [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler   : Not able to acquire 10 permits in 10000 milliseconds for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0 in TM HIGH. Permits left: 0
2024-02-15T17:09:55.592Z TRACE 19269 --- [test-sqs] [essage_source-2] i.a.c.s.l.s.AbstractPollingMessageSource : No permits acquired for queue test-type-queue
2024-02-15T17:09:55.592Z TRACE 19269 --- [test-sqs] [essage_source-2] i.a.c.s.l.s.AbstractPollingMessageSource : Requesting permits for queue test-type-queue
2024-02-15T17:09:55.592Z TRACE 19269 --- [test-sqs] [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler   : Acquiring 10 permits for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0 in TM HIGH
2024-02-15T17:09:55.999Z TRACE 19269 --- [test-sqs] [Container#0-0-1] c.s.l.a.BatchingAcknowledgementProcessor : Scheduling next acknowledgement execution in 1000ms
2024-02-15T17:09:56.999Z TRACE 19269 --- [test-sqs] [Container#0-0-1] c.s.l.a.BatchingAcknowledgementProcessor : Scheduling next acknowledgement execution in 1000ms
2024-02-15T17:09:58.000Z TRACE 19269 --- [test-sqs] [Container#0-0-1] c.s.l.a.BatchingAcknowledgementProcessor : Scheduling next acknowledgement execution in 1000ms
2024-02-15T17:09:59.003Z TRACE 19269 --- [test-sqs] [Container#0-0-1] c.s.l.a.BatchingAcknowledgementProcessor : Scheduling next acknowledgement execution in 1000ms
2024-02-15T17:10:00.007Z TRACE 19269 --- [test-sqs] [Container#0-0-1] c.s.l.a.BatchingAcknowledgementProcessor : Scheduling next acknowledgement execution in 1000ms
2024-02-15T17:10:01.009Z TRACE 19269 --- [test-sqs] [Container#0-0-1] c.s.l.a.BatchingAcknowledgementProcessor : Scheduling next acknowledgement execution in 1000ms
2024-02-15T17:10:02.014Z TRACE 19269 --- [test-sqs] [Container#0-0-1] c.s.l.a.BatchingAcknowledgementProcessor : Scheduling next acknowledgement execution in 1000ms
2024-02-15T17:10:03.015Z TRACE 19269 --- [test-sqs] [Container#0-0-1] c.s.l.a.BatchingAcknowledgementProcessor : Scheduling next acknowledgement execution in 1000ms
2024-02-15T17:10:04.019Z TRACE 19269 --- [test-sqs] [Container#0-0-1] c.s.l.a.BatchingAcknowledgementProcessor : Scheduling next acknowledgement execution in 1000ms
2024-02-15T17:10:05.024Z TRACE 19269 --- [test-sqs] [Container#0-0-1] c.s.l.a.BatchingAcknowledgementProcessor : Scheduling next acknowledgement execution in 1000ms
2024-02-15T17:10:05.597Z TRACE 19269 --- [test-sqs] [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler   : Not able to acquire 10 permits in 10000 milliseconds for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0 in TM HIGH. Permits left: 0
2024-02-15T17:10:05.597Z TRACE 19269 --- [test-sqs] [essage_source-2] i.a.c.s.l.s.AbstractPollingMessageSource : No permits acquired for queue test-type-queue
2024-02-15T17:10:05.597Z TRACE 19269 --- [test-sqs] [essage_source-2] i.a.c.s.l.s.AbstractPollingMessageSource : Requesting permits for queue test-type-queue
2024-02-15T17:10:05.597Z TRACE 19269 --- [test-sqs] [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler   : Acquiring 10 permits for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0 in TM HIGH

Thanks.

tomazfernandes commented 7 months ago

Yup, that's a bug @rerodrig, thanks for reporting and for the detailed logs!

When message conversion throws an error the framework does not release the permits so eventually it runs out.

I have an acknowledgement issue in the pipeline to fix that seems more urgent and should be able to look into this soon enough.

Thanks.

liwamdaman commented 6 months ago

Hey @tomazfernandes, curious if there are any updates on this? I think this bug is something my team has observed as well. Thanks!

tomazfernandes commented 6 months ago

Hi @liwamdaman, no updates,I haven't had the time so work on this yet.

If anyone would like to submit a PR with a fix I'd be happy to review and merge it.

tomazfernandes commented 6 months ago

Opened this PR: https://github.com/awspring/spring-cloud-aws/pull/1090/files

Reviews are welcome.