JaidenAshmore / java-dynamic-sqs-listener

Java SQS Listener library built to be customisable and dynamic during runtime
MIT License

Facing an issue while continuously listening on SQS queue. #352

Closed: LathaVoggu closed this issue 3 years ago

LathaVoggu commented 3 years ago

I am facing an issue while continuously listening on an SQS queue: the runtime exception below does not cause the listener to exit, and the listener gets stuck after it. The stack trace follows. The visibility timeout is 3 minutes.

2021-03-17 11:57:29,038 ERROR [LambdaMessageProcessor] Error resolving successfully processed message
java.lang.RuntimeException: The receipt handle has expired
    at com.jashmore.sqs.resolver.batching.BatchingMessageResolver.lambda$null$6(BatchingMessageResolver.java:197)
    at java.util.ArrayList.forEach(ArrayList.java:1249)
    at java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1080)
    at com.jashmore.sqs.resolver.batching.BatchingMessageResolver.lambda$submitMessageDeletionBatch$8(BatchingMessageResolver.java:194)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallMetricCollectionStage.lambda$execute$0(AsyncApiCallMetricCollectionStage.java:56)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)

JaidenAshmore commented 3 years ago

Hey, can you provide more details about what causes this problem to occur? If you have a minimal reproducible example showing the problem, that would help out a lot. There are a lot of examples in the example directory that you could base your changes on. For example, how long does the message take to process, are you using Spring annotations, etc.?

You should note that an SQS listener will not stop running if a message fails to be resolved. It will keep processing other messages until the server shuts down or you explicitly shut it down. What do you expect to happen if you fail to delete the message from the queue?
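To illustrate the log-and-continue behaviour described above, here is a minimal sketch using only the JDK. It is a simplified model, not the library's actual code: the class `ResolveFailureSketch` and the methods `resolve` and `processAll` are hypothetical names made up for this example, and the "deletion" is simulated rather than sent to SQS.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Simplified model only: these names are hypothetical and are not part of
// java-dynamic-sqs-listener or the AWS SDK.
class ResolveFailureSketch {

    /** Simulated resolver: "deleting" the handle "blah" fails, every other handle succeeds. */
    static CompletableFuture<Void> resolve(String receiptHandle) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        if ("blah".equals(receiptHandle)) {
            result.completeExceptionally(new RuntimeException(
                "The input receipt handle \"" + receiptHandle + "\" is not a valid receipt handle."));
        } else {
            result.complete(null);
        }
        return result;
    }

    /** Processes every handle in order; a failed resolve is recorded, never rethrown. */
    static List<String> processAll(List<String> receiptHandles) {
        List<String> failures = new ArrayList<>();
        for (String handle : receiptHandles) {
            // The futures above are already completed, so this callback runs
            // synchronously on the calling thread.
            resolve(handle).whenComplete((ignored, error) -> {
                if (error != null) {
                    // Record the failure and carry on with the next message.
                    failures.add(handle);
                }
            });
        }
        return failures;
    }

    public static void main(String[] args) {
        List<String> failed = processAll(Arrays.asList("a", "blah", "c"));
        System.out.println("Failed to resolve: " + failed);
    }
}
```

In this model the failure for "blah" is recorded and the loop still goes on to handle "c", which is the behaviour I would expect from the listener when a delete fails.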

LathaVoggu commented 3 years ago

Hi, I am not using the Spring implementation; I just took the base core example and am listening on the queue. Regarding the error: after that runtime exception is thrown, the SQS listener does not stop. It keeps running for some time and then gets stuck (it is not able to read any tasks from the queue), and it neither exits nor shuts down. Regarding "What do you expect to happen if you failed to delete the message from the queue?": I would suggest it should throw an error and let the user handle it.

JaidenAshmore commented 3 years ago

Hmm, I haven't seen it stop processing messages after an error before. I can try to create a minimal reproducible example and get back to you.

JaidenAshmore commented 3 years ago

So I have been able to replicate the error by using a fake receiptHandle "blah" when the message is being deleted. This should be similar to a failure if the receipt handle is expired. You can see the stack trace here:

22:01:55.131 [application-enhancer-by-spring-cglib-9c222f09-message-listener-message-resolver] DEBUG c.j.s.r.b.BatchingMessageResolver - Sending batch deletion for 1 messages
22:01:58.453 [sdk-async-response-0-1] DEBUG c.j.s.r.b.BatchingMessageResolver - 0 messages successfully deleted, 1 failed
22:02:02.328 [sdk-async-response-0-1] ERROR c.j.s.p.LambdaMessageProcessor - Error resolving successfully processed message
java.lang.RuntimeException: The input receipt handle "blah" is not a valid receipt handle.
    at com.jashmore.sqs.resolver.batching.BatchingMessageResolver.lambda$null$6(BatchingMessageResolver.java:198)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1080)
    at com.jashmore.sqs.resolver.batching.BatchingMessageResolver.lambda$submitMessageDeletionBatch$8(BatchingMessageResolver.java:195)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallMetricCollectionStage.lambda$execute$0(AsyncApiCallMetricCollectionStage.java:56)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallTimeoutTrackingStage.lambda$execute$2(AsyncApiCallTimeoutTrackingStage.java:69)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:138)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$null$0(MakeAsyncHttpRequestStage.java:106)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$executeHttpRequest$3(MakeAsyncHttpRequestStage.java:207)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
22:02:14.656 [application-enhancer-by-spring-cglib-9c222f09-message-listener-message-retriever] DEBUG c.j.s.r.b.BatchingMessageRetriever - Downloaded 0 messages
22:02:14.656 [application-enhancer-by-spring-cglib-9c222f09-message-listener-message-retriever] DEBUG c.j.s.r.b.BatchingMessageRetriever - Waiting for 5 requests for messages within 2000ms. Total currently waiting: 4
22:02:16.663 [application-enhancer-by-spring-cglib-9c222f09-message-listener-message-retriever] DEBUG c.j.s.r.b.BatchingMessageRetriever - Requesting 4 messages
22:02:22.535 [application-enhancer-by-spring-cglib-9c222f09-message-listener-message-retriever] DEBUG c.j.s.r.b.BatchingMessageRetriever - Downloaded 1 messages

As the log shows, the listener continues to process the SQS queue after this failure, so it doesn't appear to be stuck and can still obtain more messages from the queue. You said you still had messages on the SQS queue that could be obtained but they weren't being picked up.

I don't believe a failure to delete a message because it took too long to process should cause the listener to shut down; it should keep trying to process more messages. Instead, you should make sure you have an appropriate redrive policy with a DLQ to handle these problems (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-dead-letter-queues.html).
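As a rough sketch of what such a redrive policy looks like: SQS stores it in the `RedrivePolicy` queue attribute as a JSON string with the keys `deadLetterTargetArn` and `maxReceiveCount`. The helper below only builds that string with the JDK; the class and method names are hypothetical, the ARN is a placeholder, and applying the attribute to the queue (for example via a SetQueueAttributes call) is left out.

```java
// Hypothetical helper for illustration; not part of the AWS SDK or this library.
class RedrivePolicySketch {

    /**
     * Builds the JSON value for the SQS "RedrivePolicy" queue attribute.
     * Assumes the ARN contains no characters that need JSON escaping.
     */
    static String redrivePolicy(String deadLetterQueueArn, int maxReceiveCount) {
        if (maxReceiveCount < 1) {
            throw new IllegalArgumentException("maxReceiveCount must be at least 1");
        }
        return String.format(
            "{\"deadLetterTargetArn\":\"%s\",\"maxReceiveCount\":\"%d\"}",
            deadLetterQueueArn, maxReceiveCount);
    }

    public static void main(String[] args) {
        // Placeholder ARN: replace with the ARN of your own dead-letter queue.
        String dlqArn = "arn:aws:sqs:us-east-1:123456789012:my-queue-dlq";
        // After 3 receives without a successful delete, SQS moves the message to the DLQ.
        System.out.println(redrivePolicy(dlqArn, 3));
    }
}
```

With a policy like this, a message whose delete keeps failing is eventually moved to the DLQ instead of being redelivered forever, while the listener itself carries on with other messages.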