Azure / azure-sdk-for-java

This repository is for active development of the Azure SDK for Java. For consumers of the SDK we recommend visiting our public developer docs at https://docs.microsoft.com/java/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-java.

[QUERY] Customer is not able to complete a Service Bus message using Azure Java SDK 7.15.2 with the V2 stack; with the V2 stack disabled, the message completes without issue #39913

Closed satyayella closed 6 months ago

satyayella commented 6 months ago

Query/Question

The customer is using Java SDK 7.15.2 (versions later than 7.15 provide the V2 stack).

They use a session receiver async client with the V2 opt-in enabled.

They receive a message, defer it, and then receive the deferred message by sequence number with the same client. The subsequent complete call fails.

Error Call Stack:

{'az.sdk.message':'onLinkRemoteOpen','connectionId':'MF_eb2041_1713994904143','entityPath':'test-session-topic/subscriptions/test-session-receiver/$management','linkName':'test-session-topic/subscriptions/test-session-receiver-mgmt:receiver','remoteSource':'Source{address='test-session-topic/subscriptions/test-session-receiver/$management', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}'}

17:41:56.087 [message-receiver-1] INFO  com.ms.settlements.safe3.infra.runtime.integrationtests.servicebus.SessionV2ConsumerDeferTest - retrieve deferred message with id:5fff155c-7c23-416b-a88f-a306cca40583, seq num:2, message state:DEFERRED

17:41:56.087 [message-receiver-1] INFO  com.ms.settlements.safe3.infra.runtime.integrationtests.servicebus.SessionV2ConsumerDeferTest - Completed processing message:5fff155c-7c23-416b-a88f-a306cca40583, completing the message

17:41:56.099 [message-receiver-1] ERROR com.azure.core.amqp.implementation.handler.ReceiveLinkHandler2 - {'az.sdk.message':'Cannot process the disposition request to set the state as 'Accepted{}' for the delivery with delivery tag (id) 'b277a0f5-8ce6-4309-bd80-49a451084189'. Reason: The delivery with the delivery tag does not exist in the link's DeliveryMap.','exception':'Cannot process the disposition request to set the state as 'Accepted{}' for the delivery with delivery tag (id) 'b277a0f5-8ce6-4309-bd80-49a451084189'. Reason: The delivery with the delivery tag does not exist in the link's DeliveryMap.','connectionId':'MF_eb2041_1713994904143'}

17:41:56.102 [message-receiver-1] ERROR com.ms.settlements.safe3.infra.runtime.integrationtests.servicebus.SessionV2ConsumerDeferTest - complete failed
com.azure.messaging.servicebus.ServiceBusException: Cannot process the disposition request to set the state as 'Accepted{}' for the delivery with delivery tag (id) 'b277a0f5-8ce6-4309-bd80-49a451084189'. Reason: The delivery with the delivery tag does not exist in the link's DeliveryMap.
at com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient.lambda$updateDisposition$50(ServiceBusReceiverAsyncClient.java:1659) ~[azure-messaging-servicebus-7.15.2.jar:7.15.2]
at reactor.core.publisher.Mono.lambda$onErrorMap$31(Mono.java:3811) ~[reactor-core-3.4.34.jar:3.4.34]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94) ~[reactor-core-3.4.34.jar:3.4.34]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:278) ~[reactor-core-3.4.34.jar:3.4.34]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:255) ~[reactor-core-3.4.34.jar:3.4.34]
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51) ~[reactor-core-3.4.34.jar:3.4.34]
at reactor.core.publisher.Mono.subscribe(Mono.java:4490) ~[reactor-core-3.4.34.jar:3.4.34]
at reactor.core.publisher.Mono.subscribeWith(Mono.java:4605) ~[reactor-core-3.4.34.jar:3.4.34] 

Without the V2 opt-in, complete succeeds:

17:43:09.707 [message-receiver-1] INFO  com.ms.settlements.safe3.infra.runtime.integrationtests.servicebus.SessionV2ConsumerDeferTest - Processing message with id:8d1896b7-2f3a-45dc-acde-f9e712862ce2 from session:99

17:43:09.707 [message-receiver-1] INFO  com.ms.settlements.safe3.infra.runtime.integrationtests.servicebus.SessionV2ConsumerDeferTest - message seq num:3
17:43:09.818 [message-receiver-1] INFO  com.ms.settlements.safe3.infra.runtime.integrationtests.servicebus.SessionV2ConsumerDeferTest - sleep 10 seconds to simulate long running task

17:43:19.913 [message-receiver-1] INFO  com.ms.settlements.safe3.infra.runtime.integrationtests.servicebus.SessionV2ConsumerDeferTest - retrieve deferred message with id:8d1896b7-2f3a-45dc-acde-f9e712862ce2, seq num:3, message state:DEFERRED

17:43:19.913 [message-receiver-1] INFO  com.ms.settlements.safe3.infra.runtime.integrationtests.servicebus.SessionV2ConsumerDeferTest - Completed processing message:8d1896b7-2f3a-45dc-acde-f9e712862ce2, completing the message

17:43:19.954 [reactor-executor-1] INFO  com.ms.settlements.safe3.infra.runtime.integrationtests.servicebus.SessionV2ConsumerDeferTest - completed successfully
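
One way to read the two logs above, offered as an interpretation rather than a confirmed root cause: the failing run tries to settle the deferred message on the session's receive link, whose delivery map holds only deliveries that arrived on that link, while the deferred message was fetched over the $management link that appears in the onLinkRemoteOpen log line. The toy model below uses only the JDK. ReceiveLink, ManagementChannel, completeLinkOnly, and completeWithFallback are hypothetical names, not Azure SDK types, and the fallback routing is an assumption about what a working path needs to do.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model only: none of these types are Azure SDK classes. */
final class DispositionRoutingSketch {

    /** Stands in for a receive link that tracks unsettled deliveries by tag. */
    static final class ReceiveLink {
        private final Set<String> deliveryMap = new HashSet<>();

        /** A message arrived on this link and is now awaiting settlement. */
        void onDelivery(String deliveryTag) {
            deliveryMap.add(deliveryTag);
        }

        boolean hasDelivery(String deliveryTag) {
            return deliveryMap.contains(deliveryTag);
        }

        /** Settling removes the delivery; unknown tags are rejected. */
        void settle(String deliveryTag) {
            if (!deliveryMap.remove(deliveryTag)) {
                throw new IllegalStateException(
                    "The delivery with the delivery tag does not exist in the link's DeliveryMap.");
            }
        }
    }

    /** Stands in for the management path used to fetch deferred messages. */
    static final class ManagementChannel {
        private final Set<String> lockedTokens = new HashSet<>();

        /** Returns a made-up lock token for the deferred message. */
        String receiveDeferred(long sequenceNumber) {
            String lockToken = "lock-" + sequenceNumber;
            lockedTokens.add(lockToken);
            return lockToken;
        }

        void settle(String lockToken) {
            if (!lockedTokens.remove(lockToken)) {
                throw new IllegalStateException("Unknown lock token: " + lockToken);
            }
        }
    }

    /** Always settles on the link; fails for messages the link never delivered. */
    static String completeLinkOnly(ReceiveLink link, String token) {
        link.settle(token);
        return "link";
    }

    /** Settles on the link when it owns the delivery, otherwise via management. */
    static String completeWithFallback(ReceiveLink link, ManagementChannel mgmt, String token) {
        if (link.hasDelivery(token)) {
            link.settle(token);
            return "link";
        }
        mgmt.settle(token);
        return "management";
    }

    public static void main(String[] args) {
        ReceiveLink link = new ReceiveLink();
        ManagementChannel mgmt = new ManagementChannel();

        link.onDelivery("tag-1");
        link.settle("tag-1"); // deferring settles the original delivery on the link

        String deferredToken = mgmt.receiveDeferred(2L);
        try {
            completeLinkOnly(link, deferredToken);
        } catch (IllegalStateException e) {
            System.out.println("link-only routing failed: " + e.getMessage());
        }
        System.out.println("fallback routing settled via: "
            + completeWithFallback(link, mgmt, deferredToken));
    }
}
```

In this model, the link-only path fails with the same message as the log above because the deferred message's token was never registered on the link, while the fallback path settles it through the management channel. Whether the actual V2 change took this shape is not stated in this thread.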

Why is this not a Bug or a feature Request?

We want to understand why the code sample does not work with the V2 opt-in.

Setup (please complete the following information if applicable):

Libraries used:

import static reactor.core.scheduler.Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import com.azure.core.util.Configuration;
import com.azure.core.util.ConfigurationBuilder;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient;
import com.azure.messaging.servicebus.ServiceBusSessionReceiverAsyncClient;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;

Sample Code:


@Slf4j
class SessionV2ConsumerDeferTest {

  private ServiceBusSessionReceiverAsyncClient sessionReceiverAsyncClient;
  private final Scheduler scheduler = Schedulers.newBoundedElastic(1,
      DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "message-receiver");
  private final String CONNECTION_STRING = System.getenv("LOCALRUN_SERVICEBUS_CONNECTIONSTRING");
  private Configuration configuration;

  void setUp() {
    ServiceBusClientBuilder serviceBusClientBuilder = new ServiceBusClientBuilder().connectionString(CONNECTION_STRING);
    String testSubscription = "test-session-receiver";
    String testTopic = "test-session-topic";
    String SESSION_RECEIVER_V2_CONFIG = "com.azure.messaging.servicebus.session.reactor.asyncReceive.v2";
    configuration = new ConfigurationBuilder().putProperty(SESSION_RECEIVER_V2_CONFIG, "true").build();
    sessionReceiverAsyncClient = createSessionReceiverAsyncClient(serviceBusClientBuilder, testTopic, testSubscription);
  }

  private ServiceBusSessionReceiverAsyncClient createSessionReceiverAsyncClient(ServiceBusClientBuilder serviceBusClientBuilder, String testTopic, String testSubscription) {
    return serviceBusClientBuilder
        .configuration(configuration)
        .sessionReceiver()
        .topicName(testTopic)
        .subscriptionName(testSubscription)
        .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
        .disableAutoComplete()
        .prefetchCount(0)
        .maxAutoLockRenewDuration(Duration.ofSeconds(300)) // 5 minutes
        .buildAsyncClient();
  }

  @Test
  void testSessionConsumerV2_deferredMessage() throws InterruptedException {
    // setup session receiver async client
    setUp();

    // accept a session and start receiving
    Mono<ServiceBusReceiverAsyncClient> receiverMono = sessionReceiverAsyncClient.acceptNextSession();

    Flux<Tuple2<ServiceBusReceivedMessage, ServiceBusReceiverAsyncClient>> sessionMessages = Flux.usingWhen(receiverMono,
        receiver -> {
          LOGGER.info("Receiving messages from session:{}", receiver.getSessionId());
          Flux<Tuple2<ServiceBusReceivedMessage, ServiceBusReceiverAsyncClient>> receivedMessageFlux = receiver.receiveMessages()
              .publishOn(scheduler, 1)
              .zipWith(Mono.just(receiver).cache().repeat());
          return receivedMessageFlux.switchOnFirst((signal, serviceBusReceivedMessageFlux) -> {
            if (signal.hasValue()) {
              LOGGER.debug("Adding timeout of {} millis to close the client if no new messages available for the current session",
                  50000);
              return serviceBusReceivedMessageFlux.timeout(Duration.ofMillis(50000), scheduler);
            }
            return serviceBusReceivedMessageFlux;
          });
        },
        receiver -> Mono.fromRunnable(receiver::close));

    CoreSubscriber<Tuple2<ServiceBusReceivedMessage, ServiceBusReceiverAsyncClient>> subscriber =
        createSubscriber();
    sessionMessages.subscribe(subscriber);

    // wait for the test
    TimeUnit.SECONDS.sleep(600);
  }

  private CoreSubscriber<Tuple2<ServiceBusReceivedMessage, ServiceBusReceiverAsyncClient>> createSubscriber() {

    return new CoreSubscriber<>() {
      private Subscription subscription;

      @Override
      public void onSubscribe(@NonNull Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1);
      }

      @Override
      public void onNext(Tuple2<ServiceBusReceivedMessage, ServiceBusReceiverAsyncClient> receivedMessageClientTuple) {
        ServiceBusReceivedMessage message = receivedMessageClientTuple.getT1();
        ServiceBusReceivedMessage retrievedDeferredMsg = null;
        ServiceBusReceiverAsyncClient receiverAsyncClient = receivedMessageClientTuple.getT2();
        LOGGER.info("Processing message with id:{} from session:{}", message.getMessageId(), receiverAsyncClient.getSessionId());
        try {
          var msgSeqNum = message.getSequenceNumber();
          LOGGER.info("message seq num:{}", msgSeqNum);
          receiverAsyncClient.defer(message).block(Duration.ofSeconds(10));
          LOGGER.info("sleep 10 seconds to simulate long running task");
          TimeUnit.SECONDS.sleep(10);
          retrievedDeferredMsg = receiverAsyncClient.receiveDeferredMessage(msgSeqNum).block();
          assert retrievedDeferredMsg != null;
          LOGGER.info("retrieve deferred message with id:{}, seq num:{}, message state:{}",
              retrievedDeferredMsg.getMessageId(),
              retrievedDeferredMsg.getSequenceNumber(),
              retrievedDeferredMsg.getState().toString());
        } catch (Exception e) {
          LOGGER.error("exception", e);
        }
        assert retrievedDeferredMsg != null;
        LOGGER.info("Completed processing message:{}, completing the message", retrievedDeferredMsg.getMessageId());
        receiverAsyncClient.complete(retrievedDeferredMsg).subscribe(
            null,
            error -> LOGGER.error("complete failed", error),
            () -> LOGGER.info("completed successfully")
        );
        this.subscription.request(1);
      }

      @Override
      public void onError(Throwable throwable) {
        LOGGER.error("Error occurred", throwable);
      }

      @Override
      public void onComplete() {
        LOGGER.info("completed receiving messages for this session.");
      }
    };
  }
}

We want to know why this code fails with the V2 stack enabled and how to correct it.



anuchandy commented 6 months ago

@satyayella, thanks for the report, I'm taking a look.

anuchandy commented 6 months ago

@satyayella, thanks for the code. I can reproduce this and am looking into a fix.

manishtulsiani commented 6 months ago

Thanks @anuchandy

anuchandy commented 6 months ago

This is addressed now, thank you for reporting!