atomikos / transactions-essentials

Development repository for next major release of
https://www.atomikos.com/Main/TransactionsEssentials
Other
462 stars 139 forks source link

ActiveMQ Redelivery not working with new transaction in consumer #109

Open basecade opened 4 years ago

basecade commented 4 years ago

Hi,

I have a ActiveMQ where I have setup Redelivery on the client side. With a simple consumer it works as expected with the below configurations:

import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.boot.jta.atomikos.AtomikosConnectionFactoryBean;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

...
    @Bean
    public ConnectionFactory atomikosConnectionFactoryBean() {
        String mqUrl = System.getenv("MQ_URL");
        AtomikosConnectionFactoryBean atomikos = new AtomikosConnectionFactoryBean();
        atomikos.setLocalTransactionMode(false);
        atomikos.setMaxPoolSize(10);
        atomikos.setUniqueResourceName("QUEUE_BROKER");

        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setMaximumRedeliveries(4);
        redeliveryPolicy.setBackOffMultiplier(10);
        redeliveryPolicy.setRedeliveryDelay(1000L);
        redeliveryPolicy.setInitialRedeliveryDelay(1000L);
        redeliveryPolicy.setUseExponentialBackOff(true);
        redeliveryPolicy.setMaximumRedeliveryDelay(86400000L);
        ActiveMQXAConnectionFactory xaConnectionFactoryBean = new ActiveMQXAConnectionFactory(System.getenv("MQ_USERNAME"), System.getenv("MQ_PASSWORD"), mqUrl);
        xaConnectionFactoryBean.setRedeliveryPolicy(redeliveryPolicy);
        xaConnectionFactoryBean.setNonBlockingRedelivery(true);
        atomikos.setXaConnectionFactory(xaConnectionFactoryBean);
        return atomikos;
    }

    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerFactory(ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        factory.setErrorHandler(new EHealthEventErrorHandler());
        factory.setMessageConverter(jacksonJmsMessageConverter());
        factory.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
        factory.setDestinationResolver(new EHealthDestinationResolver());
        factory.setSessionTransacted(true);
        return factory;
    }

    @Bean(autowire = Autowire.BY_TYPE)
    public JmsTemplate jmsTemplate() {
        JmsTemplate jmsTemplate = new JmsTemplate(atomikosConnectionFactoryBean());
        jmsTemplate.setDestinationResolver(new EHealthDestinationResolver());
        jmsTemplate.setSessionTransacted(true);
        return jmsTemplate;
    }
    ...
import org.springframework.jms.annotation.JmsListener;
import org.springframework.transaction.annotation.Transactional;

@Transactional
@JmsListener(destination = "XXX")
public void onMessageReceived(XXXEvent event) {
    throw new Exception();
}

So the above works as expected and the message is redelivered with the ExponentialBackOff strategy.

BUT it goes sideways when the message consumer (onMessageReceived) calls a method on a class that sends a message to another queue in a new transaction. Then the message is not redelivered if the exception is thrown after the new transaction have been committed, ex:

import org.springframework.transaction.annotation.Transactional;

public class FooClass {
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void createInNewTransaction() {
        sendMessageToAnotherQueue();
    }
}
import org.springframework.jms.annotation.JmsListener;
import org.springframework.transaction.annotation.Transactional;

@Transactional
@JmsListener(destination = "Foo")
public void onMessageReceived(FooEvent event) {
    fooClass.createInNewTransaction();
    throw new Exception();
}

In the stacktrace below it is seen that the org.apache.activemq.TransactionContext.synchronizations are nulled when sending the message in the new transaction. The TransactionContext.synchronizations contains the ActiveMQMessageConsumer that is used to receive the message and is needed for the redelivery after the exception is thrown. When this is cleared the message is not redelivered: https://ibb.co/ch7NqQ8 (stacktrace)

private void afterRollback() throws JMSException {
        if (synchronizations == null) {
            return;
        }
    ...
}

It is the method com.atomikos.datasource.xa.session.BranchEnlistedStateHandler.checkEnlistBeforeUse() that detects that the transaction context is different and throws an exception that is catched in SessionHandleState.notifyBeforeUse():

TransactionContextStateHandler checkEnlistBeforeUse ( CompositeTransaction currentTx)
            throws InvalidSessionHandleStateException, UnexpectedTransactionContextException 
    {

        if ( currentTx == null || !currentTx.isSameTransaction ( ct ) ) {
            //OOPS! we are being used a different tx context than the one expected...

            //TODO check: what if subtransaction? Possible solution: ignore if serial_jta mode, error otherwise.

            String msg = "The connection/session object is already enlisted in a (different) transaction.";
            if ( LOGGER.isTraceEnabled() ) LOGGER.logTrace ( msg );
            throw new UnexpectedTransactionContextException();
        } 

        //tx context is still the same -> no change in state required
        return null;
    }

Then a new context is created and currentContext.checkEnlistBeforeUse(ct) is called which ends up clearing the TransactionContext.synchronizations

There is a comment in BranchEnlistedStateHandler.checkEnlistBeforeUse(): "//TODO check: what if subtransaction? Possible solution: ignore if serial_jta mode, error otherwise."

I have a subtransaction and have "com.atomikos.icatch.serial_jta_transactions" set to true. Am I just unlucky to have hit something that is not supported yet?

Versions used: "org.springframework:spring-jms:5.1.10.RELEASE", "com.atomikos:transactions:5.0.3", "org.apache.activemq:activemq-client:5.15.10"

Have tried to bump to newest versions, but didn't make a difference.

GuyPardon commented 4 years ago

Well we do have a free trial if you want to try the latest stable customer version?

basecade commented 4 years ago

Hi,

Thank you for your reply. So you think my issue is not supported yet in the free version of atomikos? But yes, I would like to try the latest stable version. "Customer version" is that "ExtremeTransactions"?

GuyPardon commented 4 years ago

Hi, yes it is.

You can get it here:

https://www.atomikos.com/Main/ExtremeTransactionsFreeTrial

Stay safe! Guy