quarkiverse / quarkus-pooled-jms

Quarkus extension for a JMS Connection pool for messaging applications supporting JMS 1.1 and 2.0 clients
Apache License 2.0

Bind jms transaction (not XA) on the transaction manager of quarkus #72

Closed mpumd closed 1 year ago

mpumd commented 1 year ago

Hi all,

We would like the integration with the transaction manager to be extended to also support non-XA transactions. We use Quarkus as the foundation for our microservices on OpenShift and want them to stay stateless, so we do not intend to activate an XA-compliant transaction manager inside Quarkus. Still, we would like to manage transactions on our JMS resource manager through the JTA @Transactional annotation, rather than using the JMS API to manage the transaction boundary on the JMS broker. Specifically, we would like testRollback() in https://github.com/quarkiverse/quarkus-pooled-jms/blob/main/integration-tests/src/test/java/io/quarkiverse/messaginghub/pooled/jms/it/PooledJmsResourceTest.java#L31-L42 to pass even when the property quarkus.pooled-jms.xa.enabled=true is commented out in https://github.com/quarkiverse/quarkus-pooled-jms/blob/main/integration-tests/src/main/resources/application.properties#L2-L3

At the implementation level, we see that PooledXAConnection supports a provided transactionManager. We would like JmsPoolConnectionFactory to support a transaction manager as well, so that we can control the transaction on the broker from the Quarkus transaction manager.

Thanks.

vsevel commented 1 year ago

Hi @zhfeng, any news on this? Is it feasible? Do you think it is relevant? Thanks for your help.

zhfeng commented 1 year ago

Yeah, I'm working on it. The first attempt is to register a Synchronization that commits or rolls back the session after the transaction has completed, something like:

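    transactionManager.getTransaction().registerSynchronization(new Synchronization() {
        @Override
        public void beforeCompletion() {
            // nothing to do before the JTA outcome is known
        }

        @Override
        public void afterCompletion(final int status) {
            // follow the JTA outcome on the local JMS session
            boolean isCommitted = (status == Status.STATUS_COMMITTED);
            try {
                if (isCommitted) {
                    session.commit();
                } else {
                    session.rollback();
                }
            } catch (JMSException e) {
                // afterCompletion cannot throw checked exceptions
                throw new IllegalStateException("Failed to complete the JMS session", e);
            }
        }
    });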
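
To make the idea above easier to try out without a broker, here is a minimal, self-contained sketch of the same pattern. It is only an illustration under stated assumptions: FakeSession, FakeTransaction and the nested Synchronization interface are hypothetical stand-ins written for this example (they are not the JTA or JMS types, and not the code from the extension), and only the two status values mirror javax.transaction.Status.

```java
import java.util.ArrayList;
import java.util.List;

public class LocalTxSyncSketch {

    // Same numeric values as javax.transaction.Status.STATUS_COMMITTED / STATUS_ROLLEDBACK.
    static final int STATUS_COMMITTED = 3;
    static final int STATUS_ROLLEDBACK = 4;

    /** Hypothetical stand-in for javax.transaction.Synchronization. */
    interface Synchronization {
        void beforeCompletion();
        void afterCompletion(int status);
    }

    /** Hypothetical stand-in for a locally transacted JMS session. */
    static final class FakeSession {
        final List<String> pending = new ArrayList<>();
        final List<String> delivered = new ArrayList<>();

        void send(String body) { pending.add(body); }

        // Commit makes the pending messages visible; rollback discards them.
        void commit() { delivered.addAll(pending); pending.clear(); }
        void rollback() { pending.clear(); }
    }

    /** Hypothetical stand-in for a JTA transaction that only tracks synchronizations. */
    static final class FakeTransaction {
        private final List<Synchronization> syncs = new ArrayList<>();

        void registerSynchronization(Synchronization s) { syncs.add(s); }

        void complete(int status) {
            if (status == STATUS_COMMITTED) {
                for (Synchronization s : syncs) s.beforeCompletion();
            }
            // The outcome is already decided when afterCompletion is called.
            for (Synchronization s : syncs) s.afterCompletion(status);
        }
    }

    /** Sends one message inside a simulated transaction and returns what was delivered. */
    static List<String> run(boolean commit) {
        FakeSession session = new FakeSession();
        FakeTransaction tx = new FakeTransaction();

        tx.registerSynchronization(new Synchronization() {
            @Override
            public void beforeCompletion() {
            }

            @Override
            public void afterCompletion(int status) {
                if (status == STATUS_COMMITTED) {
                    session.commit();
                } else {
                    session.rollback();
                }
            }
        });

        session.send("hello");
        tx.complete(commit ? STATUS_COMMITTED : STATUS_ROLLEDBACK);
        return session.delivered;
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // prints [hello]
        System.out.println(run(false)); // prints []
    }
}
```

One consequence of this design is worth noting: the session is committed only after the JTA outcome is final, so a failure in session.commit() can no longer roll back the other resources. This gives best-effort coordination, not the atomicity of XA.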

zhfeng commented 1 year ago

@vsevel @mpusg can you review https://github.com/quarkiverse/quarkus-pooled-jms/pull/80? Is it possible to test it with your use cases?

mpumd commented 1 year ago

Hi @zhfeng

It works.

The producer is transacted.

    @Transactional
    protected void send(String body, ConnectionFactory factory, String queueName) {
        try (JMSContext context = factory.createContext()) {
            JMSProducer producer = context.createProducer();
            producer.send(ActiveMQDestination.createQueue(queueName), body);

            log.info("send {}", body);
        }
    }

We can also control a transacted session manually, as follows:

    @Transactional(Transactional.TxType.NEVER)
    private void consumeMessagesByConnection(ConnectionFactory factory, String queueName, MessageListener listener)
            throws JMSException {
        var queue = ActiveMQDestination.createQueue(queueName);
        log.info("try consuming at {}", LocalDateTime.now());
        try (Connection connection = factory.createConnection();
                Session session = connection.createSession(Session.SESSION_TRANSACTED);
                MessageConsumer consumer = session.createConsumer(queue)) {
            connection.start();
            Message message = consumer.receive(MESSAGE_RECEPTION_TIMEOUT);
            boolean ok;
            while (message != null) {
                ok = false;
                try {
                    listener.onMessage(message);
                    session.commit();
                    ok = true;
                } finally {
                    if (!ok) {
                        session.rollback();
                    }
                }
                message = consumer.receive(MESSAGE_RECEPTION_TIMEOUT);
            }
            log.info("ending session because no more messages ");
        }
    }

@zhfeng Thanks for your work

zhfeng commented 1 year ago

Thanks for your feedback. I will try to port it to the main branch.

mpumd commented 1 year ago

Hi @zhfeng

Just for information, I have a problem in quarkus:dev mode, specifically when I restart: Unsatisfied dependency for type javax.jms.ConnectionFactory and qualifiers [@Default]. It does work when I run mvn clean quarkus:dev.

Do you see the same behavior when you restart in dev mode? Can you try?

Thanks !

zhfeng commented 1 year ago

How did you trigger the restart? Did you just modify some source code?

mpumd commented 1 year ago

I use the s shortcut in my CLI to restart the Quarkus context, without modifying the source code.

zhfeng commented 1 year ago

@mpusg I created a simple test project, https://github.com/zhfeng/jms-test, but cannot reproduce the issue. Can you share your project?

mpumd commented 1 year ago

@zhfeng I tested your project and dev mode works. But after putting our parent POM into your project, it fails, so the problem is on our side. We will allocate time for this later; it is not a priority at the moment.

Thanks for your work.