Azure / azure-sdk-for-java

This repository is for active development of the Azure SDK for Java. For consumers of the SDK we recommend visiting our public developer docs at https://docs.microsoft.com/java/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-java.
MIT License
2.35k stars 1.99k forks source link

[QUERY] Support PEEK-LOCK mode of Service Bus from Azure Spring Boot JMS libraries #26762

Closed yiliuTo closed 2 years ago

yiliuTo commented 2 years ago

Query/Question How to enable the PEEK-LOCK mode of Azure Service Bus from Azure Spring JMS support, which allow users to explicitly settle a message?

The default ack mode of Spring JMS is AUTO_ACKNOWLEDGE which automatically acknowledges messages before / after the message has been successfully processed by listener execution according to the default / simple MessageListenerContainer implementation. And due to the JMS listener and JMS template uses the default and simple message listener container respectively, so their default ack behavior are different as well.

According to the introduction of CLIENT_ACKNOWLEDGE, this mode allows client to explicitly acknowledge a message which is consistent with PEEK-LOCK mode. So how should it be used with JmsTemplate and JmsListener?

yiliuTo commented 2 years ago

The possible solution:

  1. For JmsTemplate, there is an open API to set the session ack mode:
    JmsTemplate#setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);

    And use the execute method with a SessionCallback to consume a message and acknowledge.

  2. For JmsListener, there are two scenarios:
    1. Use the native Spring Jms message listener to consume and acknowledge messages. In this case, the DefaultMessageListenerContainer will be used which will also acknowledge messages after the successful listener execution for the CLIENT_ACKNOWLEDGE mode. So I doubt that users still cannot manually acknowledge a message, we should verify if customized acknowledgment could be supported in this way. Besides, to set the ack mode, the property of spring.jms.listener.acknowledge-mode could be configured.
    2. Customize the message listener and then consume and acknowledge messages in the onMessage method. In this case, another customzied bean of DefaultMessageListenerContainer should be provided to accept this message listener and also set the ack mode to client. We should check if this solution could work as expected.
jialigit commented 2 years ago

https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/jms/listener/AbstractMessageListenerContainer.html

"sessionAcknowledgeMode" set to "AUTO_ACKNOWLEDGE" (default): This mode is container-dependent: For DefaultMessageListenerContainer, it means automatic message acknowledgment before listener execution, with no redelivery in case of an exception and no redelivery in case of other listener execution interruptions either. For SimpleMessageListenerContainer, it means automatic message acknowledgment after listener execution, with no redelivery in case of a user exception thrown but potential redelivery in case of the JVM dying during listener execution. In order to consistently arrange for redelivery with any container variant, consider "CLIENT_ACKNOWLEDGE" mode or - preferably - setting "sessionTransacted" to "true" instead. "sessionAcknowledgeMode" set to "DUPS_OK_ACKNOWLEDGE": Lazy message acknowledgment during (DefaultMessageListenerContainer) or shortly after (SimpleMessageListenerContainer) listener execution; no redelivery in case of a user exception thrown but potential redelivery in case of the JVM dying during listener execution. In order to consistently arrange for redelivery with any container variant, consider "CLIENT_ACKNOWLEDGE" mode or - preferably - setting "sessionTransacted" to "true" instead. "sessionAcknowledgeMode" set to "CLIENT_ACKNOWLEDGE": Automatic message acknowledgment after successful listener execution; best-effort redelivery in case of a user exception thrown as well as in case of other listener execution interruptions (such as the JVM dying). "sessionTransacted" set to "true": Transactional acknowledgment after successful listener execution; guaranteed redelivery in case of a user exception thrown as well as in case of other listener execution interruptions (such as the JVM dying).

As per the docs above describes: Spring JMS framework expose an upper level of abstraction for acknowledgement for messages. Users only need to concern the scenario of whether to redeliver the message if user exception happens.

In our default message listener container, we set the sessionTransacted to be true, that means we guarantee redelivery in case of a user exception.

So, cares should be taken for the bean of message listener container factory and configure the jms option correspondly.

Jms Session exist in two modes: transacted and non-transacted. In transacted mode, only commit and roll back make sense, that means the message sending and acknowledgement all occur in a transaction. The sending and ack operation really take effect only when the transaction success.

And in the non-transacted mode, ack here make sense then with no transaction provided.

So the session acknowledge mode only takes effects when the transacted mode is false.

More details, please refer: https://www.infoworld.com/article/2074123/transaction-and-redelivery-in-jms.html?page=2

yiliuTo commented 2 years ago

What is the behavior of using JmsTemplate#execute and overriding the MessageListener#onMessage to manually ack a message?

jialigit commented 2 years ago

What is the behavior of using JmsTemplate#execute and overriding the MessageListener#onMessage to manually ack a message?

If

 jmsTemplate.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);

is setup, then the execute can not ack automatically. and the message is not deleted.

jmsTemplate.execute(new SessionCallback<Object>() {
           @Override
           public Object doInJms(Session session) throws JMSException {

               MessageConsumer consumer = session.createConsumer(jmsTemplate.getDestinationResolver().resolveDestinationName(session, queueName, false));
               try {
                   Message message = consumer.receive(1);
                   if (message != null) {
                       User user = (User) ((ObjectMessage) message).getObject();
                       LOGGER.info("Received message from queue: {}", user.getName());
                   }

               }
               catch (Exception e) {
                   return null;
               }
               finally {
                   consumer.close();
               }
               return null;
           }
       }, true);

Should invoke message.acknowledge() to ack message.

As of above post said, this style invocation is non-transacted, so message.acknowledge make sense here.

jialigit commented 2 years ago

The possible solution:

  1. For JmsTemplate, there is an open API to set the session ack mode:

    JmsTemplate#setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);

    And use the execute method with a SessionCallback to consume a message and acknowledge.

  2. For JmsListener, there are two scenarios:

    1. Use the native Spring Jms message listener to consume and acknowledge messages. In this case, the DefaultMessageListenerContainer will be used which will also acknowledge messages after the successful listener execution for the CLIENT_ACKNOWLEDGE mode. So I doubt that users still cannot manually acknowledge a message, we should verify if customized acknowledgment could be supported in this way. Besides, to set the ack mode, the property of spring.jms.listener.acknowledge-mode could be configured.
    2. Customize the message listener and then consume and acknowledge messages in the onMessage method. In this case, another customzied bean of DefaultMessageListenerContainer should be provided to accept this message listener and also set the ack mode to client. We should check if this solution could work as expected.

Using the listener container, we need to care the mode of transaction. If it is in a transaction, ack will not take effect when exception occurs. Only when the transaction complete successfully, the ack takes effect then. That means the sending, receiving and ack operation are all managed by the transaction.

jialigit commented 2 years ago

Spring JMS framework provide two implementations for AbstractMessageListenerContainer, i.e. DefaultMessageListenerContainer and SimpleMessageListenerContainer.

For the auto-acknownledge mode, the behavior is different for these two containers as presented above of that after or before the execution of the listener. In addition to this, the DefaultMessageListenerContainer uses pulling style of consumption while the SimpleMessageListenerContainer uses pushing style of consumption. (MessageConsumer.receive() and MessageConsumer.setMessageListener()).

For DefaultMessageListenerContainer.

Message message = receiveMessage(consumerToUse);
            if (message != null) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Received message of type [" + message.getClass() + "] from consumer [" +
                            consumerToUse + "] of " + (transactional ? "transactional " : "") + "session [" +
                            sessionToUse + "]");
                }
                messageReceived(invoker, sessionToUse);
                boolean exposeResource = (!transactional && isExposeListenerSession() &&
                        !TransactionSynchronizationManager.hasResource(obtainConnectionFactory()));
                if (exposeResource) {
                    TransactionSynchronizationManager.bindResource(
                            obtainConnectionFactory(), new LocallyExposedJmsResourceHolder(sessionToUse));
                }
                try {
                    doExecuteListener(sessionToUse, message);
                }

In the receiveMessage(consumerToUse):

 @Override
    public Message receive(long timeout) throws JMSException {
        checkClosed();
        checkMessageListener();

        // Configure for infinite wait when timeout is zero (JMS Spec)
        if (timeout == 0) {
            timeout = -1;
        }

        return copy(ackFromReceive(dequeue(timeout, connection.isReceiveLocalOnly())));
    }

We see ack before actual listener execution.

And for SimpleMessageListenerContainer

 private boolean deliverNextPending() {
        if (session.isStarted() && messageQueue.isRunning() && messageListener != null) {
            dispatchLock.lock();
            try {
                JmsInboundMessageDispatch envelope = messageQueue.dequeueNoWait();
                if (envelope == null) {
                    return false;
                }

                TraceableMessage facade = envelope.getMessage().getFacade();

                if (consumeExpiredMessage(envelope)) {
                    LOG.trace("{} filtered expired message: {}", getConsumerId(), envelope);
                    doAckExpired(envelope);
                    tracer.asyncDeliveryInit(facade, address);
                    tracer.asyncDeliveryComplete(facade, DeliveryOutcome.EXPIRED, null);
                } else if (session.redeliveryExceeded(envelope)) {
                    LOG.trace("{} filtered message with excessive redelivery count: {}", getConsumerId(), envelope);
                    applyRedeliveryPolicyOutcome(envelope);
                    tracer.asyncDeliveryInit(facade, address);
                    tracer.asyncDeliveryComplete(facade, DeliveryOutcome.REDELIVERIES_EXCEEDED, null);
                } else {
                    final JmsMessage copy;

                    boolean deliveryFailed = false;
                    boolean autoAckOrDupsOk = acknowledgementMode == Session.AUTO_ACKNOWLEDGE ||
                                              acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE;
                    if (autoAckOrDupsOk) {
                        copy = copy(doAckDelivered(envelope));
                    } else {
                        copy = copy(ackFromReceive(envelope));
                    }
                    session.clearSessionRecovered();

                    try {
                        tracer.asyncDeliveryInit(facade, address);

                        messageListener.onMessage(copy);
                    } catch (RuntimeException rte) {
                        deliveryFailed = true;
                        tracer.asyncDeliveryComplete(facade, DeliveryOutcome.APPLICATION_ERROR, rte);
                    } finally {
                        if (!deliveryFailed) {
                            tracer.asyncDeliveryComplete(facade, DeliveryOutcome.DELIVERED, null);
                        }
                    }

                    if (autoAckOrDupsOk && !session.isSessionRecovered()) {
                        if (!deliveryFailed) {
                            doAckConsumed(envelope);
                        } else {
                            doAckReleased(envelope);
                        }
                    }
                }
            } catch (Exception e) {
                // An error while attempting to copy the message is the likely cause of this
                // exception case being hit.
                signalExceptionListener(e);
            } finally {
                dispatchLock.unlock();

                if (isPullConsumer()) {
                    try {
                        startConsumerResource();
                    } catch (JMSException e) {
                        LOG.error("Exception during credit replenishment for consumer listener {}", getConsumerId(), e);
                    }
                }
            }
        }

        return !messageQueue.isEmpty();
    }

We see ack after listener execution.

API consume type after or before
DefaultMessageListenerContainer pull before
SimpleMessageListenerContainer push after
jialigit commented 2 years ago

Conclusion: To explicityly ack a message using the JmsTemplate and JmsListener. . For JmsTemplate

  1. Setup the ack mode
    jmsTemplate.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
  2. Execution runner configuration

    
    @Configuration
    public class SimpleConsumerWithTemplate {
    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerWithTemplate.class);
    
    private static final String QUEUE_NAME = "que001";
    
    @Autowired
    private JmsTemplate jmsTemplate;
    
    @Bean
    public ApplicationRunner runner() {
        return new ApplicationRunner() {
    
            @Override
            public void run(ApplicationArguments args) throws Exception {
                jmsTemplate.execute(new SessionCallback<Object>() {
                    @Override
                    public Object doInJms(Session session) throws JMSException {
    
                        MessageConsumer consumer =
                            session.createConsumer(jmsTemplate.getDestinationResolver().resolveDestinationName(session,
                                QUEUE_NAME, false));
                        try {
                            Message message = consumer.receive(1);
                            if (message != null) {
                               // add business logic here
                                message.acknowledge();
                            }
    
                        } catch (Exception e) {
                            return null;
                        } finally {
                            consumer.close();
                        }
                        return null;
                    }
                }, true);
            }
        };
    
    }
    }

.  For JmsListener
In our default message listener container, we set the sessionTransacted to be true, that means we guarantee redelivery in case of a user exception. To explicitly ack the message, we can customize one listener container factory and set the sessionTransacted to be false like beblow:
```java
    // declare customized listener container factory here
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerFactory(
        DefaultJmsListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory jmsListenerContainerFactory = new DefaultJmsListenerContainerFactory();
        configurer.configure(jmsListenerContainerFactory, connectionFactory);
        jmsListenerContainerFactory.setPubSubDomain(Boolean.FALSE);
        configureCommonListenerContainerFactory(jmsListenerContainerFactory);
        // set session transacted to be false, default is true
        jmsListenerContainerFactory.setSessionTransacted(false);
        return jmsListenerContainerFactory;
    }

And then in the logic listener POJO, we ack mannuly like below:

@Service
public class QueueReceiveService {

    private static final String QUEUE_NAME = "que001";

    private final Logger LOGGER = LoggerFactory.getLogger(QueueReceiveService.class);

    @JmsListener(destination = QUEUE_NAME, containerFactory = "jmsListenerContainerFactory")
    public void receiveMessage(Message message) throws JMSException {
       // add business logic here
        message.acknowledge();
    }

}

And setup the configuration option.

spring:
  jms:
    listener:
      acknowledge-mode: client
jialigit commented 2 years ago

close for solved.