spring-projects / spring-amqp

Spring AMQP - support for Spring programming model with AMQP, especially but not limited to RabbitMQ
https://spring.io/projects/spring-amqp
Apache License 2.0
809 stars 624 forks source link

commitIfNecessary behavior RabbitTemplate with its channelTransacted=true [AMQP-796] #2303

Closed spring-operator closed 6 years ago

spring-operator commented 6 years ago

Raoul de Haard opened AMQP-796 and commented

We use the following configuration:

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setPort(5672);

        return connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setChannelTransacted(true);
        rabbitTemplate.setExchange("MyOwnExchange");

        return rabbitTemplate;
    }

We have the following case: We publish an event to RabbitMQ with the Rabbittemplate.isChannelTransacted set to true. The Exchange were we send the event to does not exists on our RabbitMQ server so we expect an error like: _"reply-code=404, reply-text=NOTFOUND - no exchange 'MyOwnExchange' in vhost '/'"

In my case, a post to my RestController.add methode leads to an event being published to RabbitMQ. The first time I do an post to this add methode, an exception is returned as expected because inside the RabbitTemplate.doSend(...) the _"RabbitUtils.commitIfNecessary(channel);" _gets called and throws an exception on the channel because the Exchange in Rabbit does not exists. But when I do the post to my RestController.add methode again, I actually get an 201 response. This time the "RabbitUtils.commitIfNecessary(channel);" is not called and the exception is not returned in the controller. Only in the TransactionSynchronization.afterCompletion, the channel error will popup.

It seems that only the first time the RabbitTemplate sees the channel as "locallyTransacted" and commits it at the end of the doSend(..) methode. Is this behavior as expected (?) If yes, can we enforce the RabbitTemplate to always use a Channel that is locallyTransacted (?) We are trying to achieve that the RabbitTemplate commits after every event send to RabbitMQ, so we can act on error before our RestController completes the response.

I created an example project(see attachment) which represents our actual case. I added an integration-test which reproduces the problem/behavior. Before running the test in "PublishToUnknownExchangeTest" you need to have RabbitMQ running on localhost:5672 with user:guest and pass:guest(or change the ApplicationConfig.java).


Affects: 2.0.1

Attachments:

Referenced from: pull request https://github.com/spring-projects/spring-amqp/pull/705

Backported to: 1.7.6

spring-operator commented 6 years ago

Gary Russell commented

The channel should never be locally transacted; the RabbitTx should be synchronized with the atomikos TX and committed when the controlled method exits. Investigating...

// Created an amqpAdmin bean here because the RabbitAutoConfiguration.java
// actually created this bean with an RabbitTemplate without the
// isChannelTransacted set to true causing a non-transacted channel in the
// connectionFactory that sometimes is used by my rabbitTemplate bean which
// actually has the channelTransacted set to true.

That should not be necessary, the CCF keeps 2 lists of cached channels, transacted and not transacted.

spring-operator commented 6 years ago

Gary Russell commented

// Second post, nothing changed really, so I expect the same result.
// But it seems that the RabbitTemplate does not commit the transaction.
// When looking at the log of this test, the txCommit on the Channel
// happens to late by TransactionSynchronization.afterCompletion
// The database transaction is also not rolledback.
// When debugging the code of the doSend methode in the RabbitTemplate
// class, only in the first call the channel seems locally transacted
// for this RabbitTemplate causing the rabbitTemplate not to commit.

You can use a ChainedTransactionManager and put a RabbitTransactionManager after your JTA TM; that way, the Rabbit TX will always be committed first.

But I am still concerned that the first attempt uses a local TX.

spring-operator commented 6 years ago

Gary Russell commented

OK; here's what's going on...

  1. When the first request is processed the connection is opened; this triggers the RabbitAdmin to declare any queues etc in the application context.
  2. Because your RabbitAdmin is transactional, it should use the same transactional channel (even though declarations are not transactional)
  3. Bug 1 - the RabbitAdmin binds a channel to the transaction but because of the way the channel creations are nested, the "main" RabbitTemplate gets a different channel and, since it's not bound to the transaction, it "appears" to be locally transacted, which means the commit fails with java.lang.IllegalStateException: Channel closed during transaction - we need to fix the binding or always run admin operations in a non-transactional channel.
  4. Bug 2 - we shouldn't run the admin if there is nothing to declare.
  5. When I make the admin non-transactional; the first post gets the 201.
  6. When we get the 201s, we also see
java.lang.IllegalStateException: Channel closed during transaction
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:896) ~[spring-rabbit-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at com.sun.proxy.$Proxy106.txCommit(Unknown Source) ~[na:na]
    at org.springframework.amqp.rabbit.connection.RabbitResourceHolder.commitAll(RabbitResourceHolder.java:164) ~[spring-rabbit-2.0.1.RELEASE.jar:2.0.1.RELEASE]
  1. This exception is simply logged since the JTA transaction has already committed
try {
    synchronization.afterCompletion(completionStatus);
}
catch (Throwable tsex) {
    logger.error("TransactionSynchronization.afterCompletion threw exception", tsex);
}

So; we have a couple of bugs but, in order to detect the problem before committing the JTA transaction, you should use a ChainedTransactionManager as I discussed above.

spring-operator commented 6 years ago

Raoul de Haard commented

Thank you for the quick response, this cleared up a lot for me. For now I am going to use the chained ChainedTransactionManager. I also remove the setChannelTransacted(true) from the AdminTemplate, now my case works fine in my example code. I will use this solution in my real project later to see if all my other use-cases are covered now. For completeness I added my updated example using the chainedTransactionManager to this issue: [^rabbit-transactional-with-chained-transaction-manager.zip] .