Azure / azure-sdk-for-java

This repository is for active development of the Azure SDK for Java. For consumers of the SDK we recommend visiting our public developer docs at https://docs.microsoft.com/java/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-java.
MIT License

[Azure Service Bus Client filter/segregate messages based on a criterion for session-enabled queues.] #8340

Closed ahmads-kainos closed 4 years ago

ahmads-kainos commented 4 years ago

Once messages have been received by the Azure Service Bus client, how can they be filtered? We want messages with a particular messageId to be marked completed and the others to be left unconsumed in the queue for some other process to pick up. The attached code does not seem to do this.

The system does not throw any exceptions.

To Reproduce
1- Use a session-aware Azure Service Bus queue.
2- Send messages to the queue with various session IDs.
3- Receive the messages through a queue client's onMessageAsync handler.
4- The code then tries to filter the messages.
5- The filtering does not seem to work.

Code Snippet

private void registerSessionHandler(IQueueClient queueClient, ExecutorService executorService, String matchId)
      throws Exception {
    queueClient.registerSessionHandler(new ISessionHandler() {
      @Override
      public CompletableFuture<Void> onMessageAsync(IMessageSession iMessageSession,
          IMessage message) {
        logger.info("Message received");
        String messageId = message.getMessageId();

        if(matchId.equalsIgnoreCase(messageId)){
          return queueClient.completeAsync(message.getLockToken());
        }else{
          return queueClient.completeAsync(null);
        }
      }

      @Override
      public CompletableFuture<Void> OnCloseSessionAsync(IMessageSession iMessageSession) {
        return iMessageSession.renewSessionLockAsync();
      }

      @Override
      public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) {
        System.out.println("Exception " + exceptionPhase + "-" + throwable.getMessage());
      }
    }, new SessionHandlerOptions(3, false, Duration.ofMinutes(1)) ,executorService);
  }

Expected behavior: messages with a particular ID can be segregated.


joshfree commented 4 years ago

@yvgopal @hemanttanwar can you please follow up?

yvgopal commented 4 years ago

@ahmads-kainos There is no built-in filtering. ServiceBus follows a competing-consumer model. If a consumer/client accepts a session, it receives all messages of that session, and no other consumer/client can accept the same session until the first one releases it. Abandoning some messages in a session results in the session receiving the same message again and again, as there is no competing consumer. To avoid this, you should ideally close the session as soon as you abandon any message. ISessionHandler is not the right API for this purpose. Instead use ClientFactory.acceptSession... method.
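The "close the session as soon as you abandon a message" advice can be sketched roughly as follows. This is only an illustration of the control flow, not SDK code: `AcceptedSession` and `SessionDrainer` are hypothetical stand-ins for a session obtained from one of the ClientFactory.acceptSession... methods, and the sketch assumes the receive call returns null once no more messages are available.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the receive/settle calls of an accepted session.
interface AcceptedSession {
    String receiveMessageId(); // assumed to return null when nothing is left
    void complete();           // settles the message just received
    void abandon();            // releases the message just received
    void close();              // releases the session lock
}

final class SessionDrainer {
    /**
     * Completes matching messages. On the first non-matching message it
     * abandons that message, stops receiving, and closes the session so the
     * abandoned message is not redelivered to this same consumer in a loop.
     */
    static List<String> drain(AcceptedSession session, String matchId) {
        List<String> completed = new ArrayList<>();
        try {
            String id;
            while ((id = session.receiveMessageId()) != null) {
                if (matchId.equalsIgnoreCase(id)) {
                    session.complete();
                    completed.add(id);
                } else {
                    session.abandon();
                    break; // stop here; the session is closed below
                }
            }
        } finally {
            session.close(); // always release the session lock
        }
        return completed;
    }
}
```

Closing in a `finally` block means the session lock is released even if a receive or settle call throws, so another consumer can accept the session afterwards.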

In your code, you should use iMessageSession.completeAsync(message.getLockToken()) to complete a message. Note that complete is being called on the session, not on the queue client. Messages received with a session must be completed using the session object. Also, the completeAsync method doesn't accept a null argument.
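A minimal sketch of that settlement logic, under stated assumptions: `SessionOps` is a hypothetical stand-in that mirrors only the completeAsync and abandonAsync calls of the session object, and `MatchingSettler` is an illustrative helper, not part of the SDK. Abandoning the non-matching message is one possible choice here; as noted above, the session should then be closed to avoid repeated redelivery.

```java
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the two session methods used in this sketch.
interface SessionOps {
    CompletableFuture<Void> completeAsync(UUID lockToken);
    CompletableFuture<Void> abandonAsync(UUID lockToken);
}

final class MatchingSettler {
    private final String matchId;

    MatchingSettler(String matchId) {
        this.matchId = Objects.requireNonNull(matchId, "matchId");
    }

    /**
     * Settles a message on the session it was received from. A real lock
     * token is always required; null is rejected up front instead of being
     * passed to completeAsync.
     */
    CompletableFuture<Void> settle(SessionOps session, String messageId, UUID lockToken) {
        Objects.requireNonNull(lockToken, "lockToken");
        if (matchId.equalsIgnoreCase(messageId)) {
            return session.completeAsync(lockToken); // matched: remove from the queue
        }
        return session.abandonAsync(lockToken);      // not matched: release the lock
    }
}
```

Inside onMessageAsync the call would look something like `settler.settle(sessionAdapter, message.getMessageId(), message.getLockToken())`, where `sessionAdapter` is a hypothetical wrapper that forwards to the iMessageSession instance.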

I am closing this issue as it is not a bug. You should adjust your usage pattern. There is no built-in filter/message selector on a queue, subscription, or session. I understand JMS has that feature, but ServiceBus doesn't have it yet. We are working on that feature, but it may not be released in the near future. You should ideally consider using more session IDs and using the session ID as the filter.
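To illustrate the idea of using the session ID as the filter, here is a toy in-memory model. It is not the ServiceBus API: `SessionPartitioner` and its methods are hypothetical, and it only shows that when the sender puts the filter criterion into the session ID, a consumer that accepts one session gets exactly the messages it wants while the rest stay queued for others.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of a session-enabled queue: messages are grouped by session ID.
final class SessionPartitioner {
    // sessionId -> message IDs, in arrival order
    private final Map<String, List<String>> sessions = new LinkedHashMap<>();

    /** Sender side: the filter criterion is used as the session ID. */
    void send(String sessionId, String messageId) {
        sessions.computeIfAbsent(sessionId, k -> new ArrayList<>()).add(messageId);
    }

    /** Consumer side: takes every message of one session; other sessions stay queued. */
    List<String> acceptAndDrain(String sessionId) {
        List<String> drained = sessions.remove(sessionId);
        return drained == null ? new ArrayList<>() : drained;
    }

    /** Number of sessions that still hold unconsumed messages. */
    int pendingSessions() {
        return sessions.size();
    }
}
```

With this layout no per-message filtering is needed on the receiving side: the consumer interested in one criterion accepts only that session, and another process can accept the remaining sessions.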