awslabs / amazon-sqs-java-messaging-lib

The Amazon SQS Java Messaging Library holds the Java Message Service (JMS) compatible classes that are used for communicating with Amazon Simple Queue Service.
http://aws.amazon.com/sqs
Apache License 2.0

Does AWS JMS library work well with Spring JMS Library? #10

Closed · sinusekhar closed this 1 year ago

sinusekhar commented 8 years ago

I am using the AWS JMS library with Spring; the versions are shown below.

        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>amazon-sqs-java-messaging-lib</artifactId>
            <version>1.0.0</version>
            <type>jar</type>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>4.1.4.RELEASE</version>
        </dependency>

In my spring application context, the definition of my listener is as shown below (simplified).

    <bean id="credentialsProviderBean" class="com.amazonaws.auth.DefaultAWSCredentialsProviderChain"/>

    <bean id="clientConfiguration" class="com.amazonaws.ClientConfiguration">
        <!-- Connect via a Proxy -->
        <property name="proxyHost" value="${png.http.proxy.host}"/>
        <property name="proxyPort" value="${png.http.proxy.port}" />
        <property name="maxConnections" value="${png.sqs.connections.consumer.pool.size}"/>
    </bean>

    <bean id="connectionFactoryBuilder" class="com.amazon.sqs.javamessaging.SQSConnectionFactory$Builder">
        <property name="regionName" value="${png.aws.region}"/>
        <property name="awsCredentialsProvider" ref="credentialsProviderBean"/>
        <property name="clientConfiguration" ref="clientConfiguration"/>
    </bean>

    <bean id="ConnectionFactory" class="com.amazon.sqs.javamessaging.SQSConnectionFactory"
          factory-bean="connectionFactoryBuilder"
          factory-method="build" />

<bean id="processorListener" class="com.intuit.mobile.png.notification.processor.ProcessorListener" init-method="init"/> 

    <jms:listener-container container-type="default" connection-factory="ConnectionFactory" acknowledge="auto"
                            concurrency="${png.sqs.connections.consumer.pool.size}">
        <jms:listener destination="${png.processor.queue.normal}" ref="processorListener" method="onMessage" />
    </jms:listener-container>

We are noticing multiple deliveries of messages to consumers running on the same instance, only milliseconds apart. I understand that the contract is at-least-once and that there are cases where duplicate delivery can occur, but can that happen at such short intervals?

Basically, what we are seeing is message duplication at really short intervals, where one message sometimes gets multiplied into 2 or 3.

Some questions come to mind:

jgangemi commented 8 years ago

Just as an FYI, you may be waiting on an answer for this until who knows when. There has not been a lot of activity in this project since it was initially released, and I don't know if anyone is really paying attention.

I only mention this because I filed #8 and have heard nothing but crickets.

pkrish commented 8 years ago

The answer is yes, you can get duplicate messages. It's hidden away in the FAQs: https://aws.amazon.com/sqs/faqs/

Q: How many times will I receive each message?

Amazon SQS is engineered to provide “at least once” delivery of all messages in its queues. Although most of the time each message will be delivered to your application exactly once, you should design your system so that processing a message more than once does not create any errors or inconsistencies.
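Since duplicates cannot be ruled out, a common mitigation is to make the consumer idempotent. Below is a minimal sketch (not part of this library) that skips redeliveries by tracking recently seen JMS message IDs; the cache size and process() method are illustrative placeholders, and it assumes the provider derives JMSMessageID from the underlying SQS message ID, which is stable across redeliveries:

    import java.util.Collections;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.Set;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;

    public class IdempotentListener implements MessageListener {

        // Bounded set of recently seen message IDs; the size is illustrative.
        private final Set<String> seen = Collections.newSetFromMap(
                new LinkedHashMap<String, Boolean>() {
                    @Override
                    protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                        return size() > 10_000;
                    }
                });

        @Override
        public void onMessage(Message message) {
            try {
                String id = message.getJMSMessageID();
                synchronized (seen) { // listeners may run concurrently
                    if (!seen.add(id)) {
                        return; // duplicate delivery: skip processing
                    }
                }
                process(message);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }

        private void process(Message message) {
            // application logic (placeholder)
        }
    }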

jgangemi commented 8 years ago

lol at getting an immediate answer - not to my issue, but anyway: what's the status of this project? Is it seeing some love?

kuba-aws commented 8 years ago

After analyzing the thread dump, it does not seem like there is any locking issue. All of the threads are simply waiting on a message from the SQS queue, which seems to imply there were no more messages available in the queue.

kuba-aws commented 8 years ago

Spring JMS is supported. In fact, any framework relying on JMS is theoretically supported, as this library implements a standard interface.

Spring JMS uses the synchronous JMS API of this library. Prefetching has no mechanism for extending the visibility timeout, so prefetching a lot of messages while consumers are clogged up might mean you will see duplicates as the timeout expires.
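If prefetch-related expiry is the concern, you can lower the prefetch count, even to zero, when building the connection factory. A minimal sketch, assuming the 1.0.x Builder API used in the configuration above (the region value is a placeholder):

    import com.amazon.sqs.javamessaging.SQSConnectionFactory;
    import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;

    SQSConnectionFactory connectionFactory = SQSConnectionFactory.builder()
            .withRegionName("us-east-1")
            .withAWSCredentialsProvider(new DefaultAWSCredentialsProviderChain())
            // No prefetching: a message is only fetched when receive() is
            // called, so it does not burn visibility timeout in a local queue.
            .withNumberOfMessagesToPrefetch(0)
            .build();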

sinusekhar commented 8 years ago

Opened a case with AWS and received a response on some of the questions. attaching it here for everyone's reference.

  1. Once-and-only-once delivery is not supported. The at-least-once delivery pattern is inherited from SQS by the library.
  2. Yes, the at-least-once behavior of SQS is not altered; you still might see duplicate messages being delivered. The library itself does not generate any extra duplicates, nor does it remove them, so the behavior is the same as when using the plain vanilla SQS APIs.
  3. Since we support the JMS specification, you can safely use the Spring JMS framework.
  4. In the case of the synchronous receiver, the prefetched messages (the count of which you can configure through the SQSConnectionFactory$Builder bean) are queued up for the receive calls, and their visibility timeout is not adjusted as they wait. In your case, since you do not specify a custom prefetch count, at most a single message is waiting for the receive call on the SQSMessageConsumer.

smileatom commented 7 years ago

"Since we support the JMS specification, you can safely use the Spring JMS framework."

Um... right. Unfortunately, you will run into performance and functionality issues if you try to do this. Having tested Spring's JmsTemplate, using MessageListener and producer code that works flawlessly with other JMS providers, I removed Spring JMS from the stack when using SQS. Under the hood, this library is not fully JMS, and the code in Spring JMS can't handle the custom client acknowledge mode that this library supports (although you can provide wrapper classes to work around that). I also found that JmsTemplate was about 2-3x slower at posting messages to an SQS queue than using a plain MessageProducer.
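For reference, the custom acknowledge mode in question is the library's non-standard UNORDERED_ACKNOWLEDGE session mode, where acknowledge() deletes only the message it is called on rather than every message received so far. A minimal sketch of using it directly, without Spring (the queue name is a placeholder, and connectionFactory is assumed to be built as shown earlier in the thread):

    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.Queue;
    import javax.jms.Session;
    import com.amazon.sqs.javamessaging.SQSConnection;
    import com.amazon.sqs.javamessaging.SQSSession;

    SQSConnection connection = connectionFactory.createConnection();
    // Non-standard mode: acknowledge() applies to a single message, not to
    // every message received before it on the session.
    Session session = connection.createSession(false, SQSSession.UNORDERED_ACKNOWLEDGE);
    Queue queue = session.createQueue("my-queue");
    MessageConsumer consumer = session.createConsumer(queue);
    connection.start();

    Message message = consumer.receive(5000);
    if (message != null) {
        // process the message, then delete just this one from SQS
        message.acknowledge();
    }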

kuba-aws commented 7 years ago

Marking as feature request, to test more closely with Spring JMS.

sinusekhar commented 7 years ago

Quick update on this thread:

Our team worked closely with the SQS team and determined that there is a bug in the SQS JMS library where the close() methods on the message listener take a long time to release resources within the SQS JMS client. When everything is running fine, Spring's DefaultMessageListenerContainer works flawlessly with the SQS JMS libs, but when there are issues such as SQS throttling (which is hard to believe, but it did happen to us when we had a noisy neighbor in our SQS clusters) or some other network issues, the SQS JMS libraries don't recover fast enough. Due to this, we ran into hung consumers with Spring's JMS libraries, which internally use the SQS JMS libs.

Since the JMS libraries are not aggressively maintained, and since we don't mind the duplicate delivery, we decided to go native by removing all the JMS wrappers (Spring JMS and the SQS JMS libraries) and are using async message consumption wrappers that use the AWS SQS SDK directly. This is working very well for us so far and gives a lot of control over the consumer code. We have also seen from our tests that we can scale much better, and the code is more resilient to failures at the SQS layer (throttling, network issues, timeouts).
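For anyone weighing the same move, here is a minimal sketch of such a native consumer against the AWS SDK for Java v1; the queue URL and handleMessage() are hypothetical placeholders, and backoff/threading are deliberately omitted:

    import com.amazonaws.services.sqs.AmazonSQS;
    import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
    import com.amazonaws.services.sqs.model.Message;
    import com.amazonaws.services.sqs.model.ReceiveMessageRequest;

    public class NativeSqsConsumer {

        public static void main(String[] args) {
            AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
            String queueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue";

            while (!Thread.currentThread().isInterrupted()) {
                ReceiveMessageRequest request = new ReceiveMessageRequest(queueUrl)
                        .withMaxNumberOfMessages(10)
                        .withWaitTimeSeconds(20); // long polling
                for (Message message : sqs.receiveMessage(request).getMessages()) {
                    try {
                        handleMessage(message.getBody());
                        // Delete only after successful processing (at-least-once).
                        sqs.deleteMessage(queueUrl, message.getReceiptHandle());
                    } catch (Exception e) {
                        // Leave the message alone; it becomes visible again after
                        // the visibility timeout and will be redelivered.
                    }
                }
            }
        }

        private static void handleMessage(String body) {
            // application logic (placeholder)
        }
    }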

pkrish commented 7 years ago

@kuba-aws Any update on fixing the issue with the close() methods in the SQS JMS lib? We are facing a similar issue where SQS returns an HTTP 500, which results in blocked consumers. The only solution is to restart the consumer process.

Stacktrace of the exception that causes the issue:

    com.amazonaws.services.sqs.model.AmazonSQSException: We encountered an internal error. Please try again. (Service: AmazonSQS; Status Code: 500; Error Code: InternalError; Request ID: 0436cf79-13c2-5432-9b57-9eaffd70a25b)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1579)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1249)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1030)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:742)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:716)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
        at com.amazonaws.services.sqs.AmazonSQSClient.doInvoke(AmazonSQSClient.java:1689)
        at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:1665)
        at com.amazonaws.services.sqs.AmazonSQSClient.deleteMessage(AmazonSQSClient.java:762)
        at com.amazon.sqs.javamessaging.AmazonSQSExtendedClientBase.deleteMessage(AmazonSQSExtendedClientBase.java:266)
        at com.amazon.sqs.javamessaging.AmazonSQSExtendedClient.deleteMessage(AmazonSQSExtendedClient.java:558)
        at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.deleteMessage(AmazonSQSMessagingClientWrapper.java:141)
        at com.amazon.sqs.javamessaging.acknowledge.AutoAcknowledger.acknowledge(AutoAcknowledger.java:47)
        at com.amazon.sqs.javamessaging.acknowledge.AutoAcknowledger.notifyMessageReceived(AutoAcknowledger.java:56)
        at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.messageHandler(SQSMessageConsumerPrefetch.java:491)
        at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.receive(SQSMessageConsumerPrefetch.java:424)
        at com.amazon.sqs.javamessaging.SQSMessageConsumer.receive(SQSMessageConsumer.java:171)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:412)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:298)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:251)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1164)

ghost commented 7 years ago

We too have started to encounter this at heavy workloads, with high concurrency settings and prefetching, whenever SQS returns 500s (which can just happen from time to time). @sinusekhar it seems like one simple fix would be to update SQSMessageConsumerPrefetch.cannotDeliver() to throw a JMSException instead of returning true. Returning true leads to the "greedy infinite" looping that hogs all resources and doesn't let the consumer stop processing so that cleanup can finish.

EDIT: After digging further, we have found that if an SQSConnection is closed before receiveMessage is invoked (but not during it), this generally reproduces the hang.
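A minimal sketch of that reproduction, assuming the 1.0.x API (the queue name is a placeholder, and connectionFactory is assumed to be built as shown earlier); the key observation is that receive() on the closed consumer returns null instead of throwing:

    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import com.amazon.sqs.javamessaging.SQSConnection;

    SQSConnection connection = connectionFactory.createConnection();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    MessageConsumer consumer = session.createConsumer(session.createQueue("test-queue"));
    connection.start();

    // Close the connection before receive() is invoked, as Spring's shared
    // connection recovery does after a failure.
    connection.close();

    // With the bug present, this returns null rather than throwing, so a
    // Spring polling thread keeps looping against a dead consumer.
    Message message = consumer.receive(1000);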

els-david-young commented 7 years ago

We hit this same issue (blocked, busy threads) on two different servers in the last couple of days. Stacktrace of the error that appeared to trigger the condition:

01:06:08.768 [gatewayMessageListenerContainer-7] ERROR com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper - AmazonServiceException: deleteMessage. RequestId: 2c564b4f-7c59-587e-a70b-1959ab19a3dc
HTTPStatusCode: 500 AmazonErrorCode: InternalError
com.amazonaws.AmazonServiceException: We encountered an internal error. Please try again. (Service: AmazonSQS; Status Code: 500; Error Code: InternalError; Request ID: 2c564b4f-7c59-587e-a70b-1959ab19a3dc)
at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1378) ~[aws-java-sdk-core-1.11.18.jar!/:?]
at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:924) ~[aws-java-sdk-core-1.11.18.jar!/:?]
at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:702) ~[aws-java-sdk-core-1.11.18.jar!/:?]
at com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:454) ~[aws-java-sdk-core-1.11.18.jar!/:?]
at com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:416) ~[aws-java-sdk-core-1.11.18.jar!/:?]
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:365) ~[aws-java-sdk-core-1.11.18.jar!/:?]
at com.amazonaws.services.sqs.AmazonSQSClient.doInvoke(AmazonSQSClient.java:1741) ~[aws-java-sdk-sqs-1.11.18.jar!/:?]
at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:1711) ~[aws-java-sdk-sqs-1.11.18.jar!/:?]
at com.amazonaws.services.sqs.AmazonSQSClient.deleteMessage(AmazonSQSClient.java:792) ~[aws-java-sdk-sqs-1.11.18.jar!/:?]
at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.deleteMessage(AmazonSQSMessagingClientWrapper.java:141) [amazon-sqs-java-messaging-lib-1.0.1.jar!/:?]
at com.amazon.sqs.javamessaging.acknowledge.AutoAcknowledger.acknowledge(AutoAcknowledger.java:47) [amazon-sqs-java-messaging-lib-1.0.1.jar!/:?]
at com.amazon.sqs.javamessaging.acknowledge.AutoAcknowledger.notifyMessageReceived(AutoAcknowledger.java:56) [amazon-sqs-java-messaging-lib-1.0.1.jar!/:?]
at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.messageHandler(SQSMessageConsumerPrefetch.java:491) [amazon-sqs-java-messaging-lib-1.0.1.jar!/:?]
at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.receive(SQSMessageConsumerPrefetch.java:424) [amazon-sqs-java-messaging-lib-1.0.1.jar!/:?]
at com.amazon.sqs.javamessaging.SQSMessageConsumer.receive(SQSMessageConsumer.java:171) [amazon-sqs-java-messaging-lib-1.0.1.jar!/:?]

We have since upgraded to version 1.0.4 of the library in the hope that it may have been fixed, but looking at the changelogs and the above posts, it appears unlikely.

ghost commented 7 years ago

@daveyoungbcat I can confirm it has not been fixed in 1.0.4. Those are precisely the same errors we get. With our SQS message volume and configuration, we generally encounter this no less than every 8-12 hours.

ghost commented 7 years ago

Upon closer inspection, here's what is happening.

Preconditions to reproduce:

Steps to reproduce:

  1. Multiple consumers are spawned that reuse a shared connection
  2. In one consumer the AmazonSQSClient throws an exception (typically during a delete request, but can be anything really)
  3. The Spring JMS AsyncMessageListenerInvoker handles this first failure when using shared connections (based on cache level) by refreshing the shared connection (i.e. closing it and opening a new one).
  4. Closing the shared connection causes all open sessions against that connection to be closed. This will cause all of the SQSMessageConsumers executing against those sessions to be closed, closing their wrapped SQSMessageConsumerPrefetch instances.
  5. The original consumer thread handling the failed request dies and the AsyncMessageListenerInvoker thread completes.
  6. Based on concurrency settings, a new consumer will be spawned. The new consumer will use the new shared connection and a new session appropriately.
  7. All previously running consumers that reused the initial connection are exposed to a race condition:
    1. If the consumer is processing a message against the closed session and invokes any method against the client (such as acknowledge or delete) for a different message, an exception is thrown and the consumer thread dies (closing the already-closed session), but most importantly it does not attempt to recover the shared connection. In this situation the recovery marker has not changed, as no new connection or session was created. This consumer thread dies, and a new consumer is spawned correctly to take its place.
    2. If the consumer was idle, this causes the problem. When this consumer executes its next receive() call, it will enter cannotDeliver(), check the state of the consumer, see that it is closed, and return null, causing executeOngoingLoop() to continue to loop. This results in the busy/hung thread. This consumer has no way to recover or be expelled from the consumer pool.

I may be mistaken, but I believe the simplest and clearest fix is to throw an exception from cannotDeliver() whenever the consumer has been closed.

I have opened PR https://github.com/awslabs/amazon-sqs-java-messaging-lib/pull/41 accordingly.
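For illustration only (the real change is in PR #41; the field and method names below are hypothetical), the shape of the fix is roughly: once the consumer is closed, the receive path should fail fast rather than signal "no message available":

    // Hypothetical sketch of the fix's intent, not the code in PR #41.
    private Message tryDeliver() throws JMSException {
        if (closed) {
            // Fail fast so callers such as Spring's
            // DefaultMessageListenerContainer can detect the dead consumer
            // and trigger recovery, instead of looping on null results.
            throw new javax.jms.IllegalStateException("Consumer is closed");
        }
        return dequeueNextMessage(); // placeholder for the normal path
    }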

robin-aws commented 5 years ago

I've merged PR #41 and it has been released in 1.0.8. There are a lot of testimonials on the PR about the change fixing things for various people, but I'm hoping for confirmation here before I close the issue itself.

SchulteMarkus commented 5 years ago

@robin-aws At least the problem we have in production still occurs with v1.0.8 (Spring Boot 1.5, spring-cloud-aws-messaging:2.1).

java.lang.IllegalStateException: Connection pool shut down
    at org.apache.http.util.Asserts.check(Asserts.java:34)
    at org.apache.http.pool.AbstractConnPool.lease(AbstractConnPool.java:191)
    at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.requestConnection(PoolingHttpClientConnectionManager.java:268)
    at sun.reflect.GeneratedMethodAccessor96.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
    at com.amazonaws.http.conn.$Proxy103.requestConnection(Unknown Source)
    at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:176)
    at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
    at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
    at com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1297)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
    at com.amazonaws.services.sqs.AmazonSQSClient.doInvoke(AmazonSQSClient.java:2214)
    at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:2181)
    at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:2170)
    at com.amazonaws.services.sqs.AmazonSQSClient.executeReceiveMessage(AmazonSQSClient.java:1607)
    at com.amazonaws.services.sqs.AmazonSQSClient.receiveMessage(AmazonSQSClient.java:1578)
    at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.receiveMessage(AmazonSQSMessagingClientWrapper.java:351)
    at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.getMessages(SQSMessageConsumerPrefetch.java:259)
    at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.getMessagesWithBackoff(SQSMessageConsumerPrefetch.java:303)
    at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.run(SQSMessageConsumerPrefetch.java:221)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

dhirenpratap commented 4 years ago

spring-cloud-aws-messaging

@SchulteMarkus Did you find any workaround?

SchulteMarkus commented 4 years ago

@dhirenpratap No. In the end, we decided to suppress this specific error.