spring-projects / spring-integration

Spring Integration provides an extension of the Spring programming model to support the well-known Enterprise Integration Patterns (EIP)
http://projects.spring.io/spring-integration/
Apache License 2.0
1.54k stars 1.1k forks source link

Enable cachability of consumers for reply-side of JmsOutboundGateway [INT-2683] #6660

Closed spring-operator closed 12 years ago

spring-operator commented 12 years ago

Mark Fisher opened INT-2683 and commented

Currently, when an explicit reply Destination is configured, the JmsOutboundGateway uses a MessageSelector that expects a JMSCorrelationID to match the outbound request's JMSMessageID. When used with Spring's CachingConnectionFactory and its 'cacheConsumers' property set to TRUE, that can lead to an OutOfMemoryException since each consumer's cache key is unique (basically the destination + the message selector). Obviously, that also means we are not benefiting from the potential performance benefits of caching MessageConsumer instances either (rather than creating with each request/reply operation).

We should instead provide a more general JMSCorrelationID for outbound requests that can be used as long as the consuming side knows to give precedence to a JMSCorrelationID property on a request (Spring's MessageListenerAdapter does, and hence a Spring Integration inbound-gateway also does). That general JMSCorrelationID's value can be unique to the outbound-gateway instance and request-sending thread but not unique at the per-Message level as in the current code.

Because not every downstream consumer will support the "echoing" of a JMSCorrelationID (some will only take the JMSMessageID of the request and set it as the JMSCorrelationID of the reply), we need to maintain the current behavior as a default (also for backwards compatibility). A boolean flag should be provided to enable the JMSCorrelationID->JMSCorrelationID behavior as an optimized alternative to the JMSMessageID->JMSCorrelationID default.

NOTE: Ultimately the request/reply behavior in Spring Integration's JMS outbound-gateway needs to make its way into the core Spring JmsTemplate (see SPR-3332).


Issue Links:

spring-operator commented 12 years ago

Gary Russell commented

3 Issues found.

  1. Message selector based on Object.hashCode() is not unique - jmsRequest can get the same hashcode as a previous one after garbage collection.

Consider - send, timeout, reply sits in reply queue with old hashcode; gc; new jmsRequest gets same hashcode, send; receive old response because the hashcode matches.

Need to use AtomicLong instead of hashcode.

  1. 2a and 3a could occasionally timeout (depending on server load) exposing problem #3.

Add receive-timeouts to outbound gateways

  1. Crosstalk between pipeline tests - see #2 - when that occurred, because the same infrastructure queues exist, even though the context it closed, AMQ might not shut down and messages from one test are mixed with those from other tests.

Use unique queues for each test.

spring-operator commented 12 years ago

Gary Russell commented

Just to follow up on #3, the crosstalk appears to be some problem where the context.destroy() fails to stop the DMLC in an earlier test...

Named the gateways and added some diagnostics to see if the wrong inbound gateway was consuming a test's messages...

Sending from out-7-2 [Payload=49] ... jms_replyTo=queue://pipeline07-01, jms_messageId=ID:arwen3-59703-1345503566172-19:1:44:1:2}]

In-4-1 received [Payload=49] ...jms_replyTo=queue://pipeline07-02, delay=1005 ... jms_messageId=ID:arwen3-59703-1345503566172-19:1:52:1:3}]

In this test, the first inbound gateway from test #4 consumed test #7's message.

This explains the anomalies; there is no evidence of crosstalk on received messages (aside from the hashcode issue - #1).

spring-operator commented 12 years ago

Gary Russell commented

Doh! - context.destroy() needs to be in a finally block.

Whenever a test fails (timeout), the older test contexts are still running and may handle the later tests' data.

To fix:

  1. put context.destroy() in finally block
  2. use broker.persistence=false (speeds up tests and avoids timeouts in the first place).
  3. use different queues for each test.
spring-operator commented 12 years ago

Gary Russell commented

Superseded by #6715