Azure / azure-sdk-for-java

This repository is for active development of the Azure SDK for Java. For consumers of the SDK we recommend visiting our public developer docs at https://docs.microsoft.com/java/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-java.
MIT License

[BUG] Multiple messages aren't picked from the Subscription when using ServiceBusReceiverAsyncClient #19254

Closed: sudharsan2020 closed this issue 3 years ago

sudharsan2020 commented 3 years ago

Describe the bug

Exception or Stack Trace: NA

To Reproduce: Send a batch of messages to the subscription (for example, n * prefetch_count messages). Here we sent 50 messages to the subscription to reproduce the issue.

Code Snippet

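import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.lang.reflect.Type;
import java.time.Duration;
// ILogger, LogLevels, RequestModel and @Inject are defined elsewhere in the application (not shown).

public class WebJobListener {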
    private ServiceBusReceiverAsyncClient serviceBusReceiverAsyncClient;
    private Gson gson;
    private ILogger log;
    private static final Type requestModelType = new TypeToken<RequestModel>() {}.getType();

    @Inject
    public WebJobListener(ILogger log) {
        this.serviceBusReceiverAsyncClient = new ServiceBusClientBuilder()
                .connectionString(System.getenv("AZURE_SERVICEBUS_NAMESPACE_CONNECTION_STRING"))
                .retryOptions(new AmqpRetryOptions().setTryTimeout(Duration.ofSeconds(30)))
                .receiver()
                .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
                .topicName("MY_TOPIC")
                .subscriptionName("MY_SUBSCRIPTION")
                .prefetchCount(10)
                .maxAutoLockRenewDuration(Duration.ofMinutes(10))
                .disableAutoComplete()
                .buildAsyncClient();
        this.gson = new Gson();
        this.log = log;
    }

    public void run() {
        log.trace("Webjob worker starting to Listen", LogLevels.Verbose, null);
        serviceBusReceiverAsyncClient.receiveMessages().subscribe(message -> {
                    // Process the message. Auto-complete is disabled, so the message must be settled explicitly.
                    if ("MY_SUBSCRIPTION".equals(message.getSubject())) {
                        RequestModel requestModel = gson.fromJson(message.getBody().toString(), requestModelType);

                        // My logic goes here
                        //my_test_function(requestModel);

                        // Complete the message after function completes
                        serviceBusReceiverAsyncClient.complete(message);
                    }
                },
                error -> log.trace("Error occurred while receiving message from subscription: " + error, LogLevels.Error, null));
    }
}

Expected behavior

Screenshots: NA

Setup (please complete the following information):

Additional context: version 7.0.1 of the azure-messaging-servicebus library was used.

        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-messaging-servicebus</artifactId>
            <version>7.0.1</version>
        </dependency>


hemanttanwar commented 3 years ago

@sudharsan2020 Thank you for using our library. serviceBusReceiverAsyncClient.complete(message) is an asynchronous call: it returns a Mono, and you are not subscribing to it, so the completion is never sent.

Here is an example that shows how to chain multiple async calls. Please try this:

https://github.com/Azure/azure-sdk-for-java/blob/master/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMessageAndSettleAsyncSample.java#L83
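The effect described in this reply can be reproduced without a Service Bus namespace. The sketch below is a stdlib-only analogy, not Reactor or SDK code: the hypothetical `LazyCompletion` type stands in for the cold `Mono<Void>` returned by `complete(message)`, and the hypothetical `FakeReceiver` stands in for the receiver. Creating the completion settles nothing; only `subscribe()` runs it. In real Reactor code the usual remedy is to return that Mono from an operator such as `flatMap`, for example `receiveMessages().flatMap(message -> serviceBusReceiverAsyncClient.complete(message)).subscribe()`, so that Reactor subscribes to it for you.

```java
import java.util.ArrayList;
import java.util.List;

public class LazyCompletionDemo {

    // Hypothetical stand-in for a cold Mono<Void>: the action runs only when subscribed.
    static final class LazyCompletion {
        private final Runnable action;

        LazyCompletion(Runnable action) {
            this.action = action;
        }

        void subscribe() {
            action.run();
        }
    }

    // Hypothetical stand-in for the receiver; records which message IDs were settled.
    static final class FakeReceiver {
        final List<String> settled = new ArrayList<>();

        LazyCompletion complete(String messageId) {
            // Nothing is recorded here; the settlement is deferred until subscribe().
            return new LazyCompletion(() -> settled.add(messageId));
        }
    }

    public static void main(String[] args) {
        FakeReceiver receiver = new FakeReceiver();

        // Mirrors the bug: the completion is built but never subscribed.
        receiver.complete("message-1");
        System.out.println("settled after unsubscribed call: " + receiver.settled);

        // Mirrors the fix: subscribing triggers the settlement.
        receiver.complete("message-2").subscribe();
        System.out.println("settled after subscribed call: " + receiver.settled);
    }
}
```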

sudharsan2020 commented 3 years ago

@hemanttanwar When fetching the next batch of messages after the prefetch count is reached, I still see the errors below. Will the receiver client stay connected 24/7, or do we need to re-establish the connection after some period of time?

Multiple message handling:

Attaching the entire log for your reference.

10:25:28.712 [reactor-executor-1] INFO  c.a.c.a.i.handler.SessionHandler - onSessionRemoteClose connectionId[nextstep-service-bus/subscriptions/AI_EXTRACTION_RQST], entityName[MF_725f99_1613537389496], condition[Error{condition=null, description='null', info=null}]
10:25:28.712 [reactor-executor-1] INFO  c.a.c.a.i.handler.SessionHandler - onSessionRemoteClose closing a local session for connectionId[MF_725f99_1613537389496], entityName[nextstep-service-bus/subscriptions/AI_EXTRACTION_RQST], condition[null], description[null]
10:25:28.713 [reactor-executor-1] INFO  c.a.c.a.i.ReactorConnection - connectionId[MF_725f99_1613537389496] sessionName[nextstep-service-bus/subscriptions/AI_EXTRACTION_RQST]: Error occurred. Removing and disposing session.
10:25:28.713 [reactor-executor-1] INFO  c.a.c.a.i.ReactorSession - connectionId[MF_725f99_1613537389496], sessionId[nextstep-service-bus/subscriptions/AI_EXTRACTION_RQST], errorCondition[n/a]: Disposing of session.
10:25:28.716 [reactor-executor-1] INFO  c.a.c.a.i.handler.SendLinkHandler - onLinkRemoteClose connectionId[MF_725f99_1613537389496], linkName[cbs:sender], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:6515879c83774e0da496e8f6f415a0ed_G27, SystemTracker:gateway7, Timestamp:2021-02-17T04:55:29]
10:25:28.716 [reactor-executor-1] INFO  c.a.c.a.i.handler.SendLinkHandler - processOnClose connectionId[MF_725f99_1613537389496], linkName[cbs:sender], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:6515879c83774e0da496e8f6f415a0ed_G27, SystemTracker:gateway7, Timestamp:2021-02-17T04:55:29]
10:25:28.724 [reactor-executor-1] WARN  c.a.c.a.i.RequestResponseChannel - Retry #1. Transient error occurred. Retrying after 4511 ms.
The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:6515879c83774e0da496e8f6f415a0ed_G27, SystemTracker:gateway7, Timestamp:2021-02-17T04:55:29, errorContext[NAMESPACE: capture-dev-service-bus.servicebus.windows.net, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 99]
10:25:28.725 [reactor-executor-1] ERROR c.a.c.a.i.RequestResponseChannel - cbs - Exception in RequestResponse links. Disposing and clearing unconfirmed sends.
The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:6515879c83774e0da496e8f6f415a0ed_G27, SystemTracker:gateway7, Timestamp:2021-02-17T04:55:29, errorContext[NAMESPACE: capture-dev-service-bus.servicebus.windows.net, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 99]
10:25:28.726 [reactor-executor-1] INFO  c.a.c.a.i.handler.ReceiveLinkHandler - onLinkRemoteClose connectionId[MF_725f99_1613537389496], linkName[cbs:receiver], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:6515879c83774e0da496e8f6f415a0ed_G27, SystemTracker:gateway7, Timestamp:2021-02-17T04:55:29]
10:25:28.727 [reactor-executor-1] INFO  c.a.c.a.i.handler.ReceiveLinkHandler - processOnClose connectionId[MF_725f99_1613537389496], linkName[cbs:receiver], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:6515879c83774e0da496e8f6f415a0ed_G27, SystemTracker:gateway7, Timestamp:2021-02-17T04:55:29]
10:25:28.727 [reactor-executor-1] INFO  c.a.c.a.i.handler.SessionHandler - onSessionRemoteClose connectionId[cbs-session], entityName[MF_725f99_1613537389496], condition[Error{condition=null, description='null', info=null}]
10:25:28.727 [reactor-executor-1] INFO  c.a.c.a.i.handler.SessionHandler - onSessionRemoteClose closing a local session for connectionId[MF_725f99_1613537389496], entityName[cbs-session], condition[null], description[null]
10:25:28.727 [reactor-executor-1] INFO  c.a.c.a.i.ReactorConnection - connectionId[MF_725f99_1613537389496] sessionName[cbs-session]: Error occurred. Removing and disposing session.
10:25:28.727 [reactor-executor-1] INFO  c.a.c.a.i.ReactorSession - connectionId[MF_725f99_1613537389496], sessionId[cbs-session], errorCondition[n/a]: Disposing of session.
10:25:28.727 [reactor-executor-1] INFO  c.a.c.a.i.handler.ConnectionHandler - onConnectionRemoteClose hostname[capture-dev-service-bus.servicebus.windows.net:5671], connectionId[MF_725f99_1613537389496], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:6515879c83774e0da496e8f6f415a0ed_G27, SystemTracker:gateway7, Timestamp:2021-02-17T04:55:29]
10:25:28.727 [reactor-executor-1] INFO  c.a.c.a.i.handler.ConnectionHandler - onTransportClosed hostname[capture-dev-service-bus.servicebus.windows.net:5671], connectionId[MF_725f99_1613537389496], error[n/a]
10:25:28.728 [reactor-executor-1] INFO  c.a.c.a.i.handler.CustomIOHandler - onTransportClosed connectionId[MF_725f99_1613537389496], hostname[capture-dev-service-bus.servicebus.windows.net:5671]
10:25:28.728 [reactor-executor-1] INFO  c.a.c.a.i.handler.SendLinkHandler - onLinkLocalClose connectionId[MF_725f99_1613537389496], linkName[cbs:sender], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:6515879c83774e0da496e8f6f415a0ed_G27, SystemTracker:gateway7, Timestamp:2021-02-17T04:55:29]
10:25:28.729 [reactor-executor-1] INFO  c.a.c.a.i.handler.ReceiveLinkHandler - onLinkLocalClose connectionId[MF_725f99_1613537389496], linkName[cbs:receiver], errorCondition[null], errorDescription[null]
10:25:28.729 [reactor-executor-1] INFO  c.a.c.a.i.handler.ConnectionHandler - onConnectionLocalClose hostname[capture-dev-service-bus.servicebus.windows.net:5671], connectionId[MF_725f99_1613537389496], errorCondition[null], errorDescription[null]
10:25:28.729 [reactor-executor-1] INFO  c.a.c.a.i.handler.ConnectionHandler - onConnectionUnbound hostname[capture-dev-service-bus.servicebus.windows.net:5671], connectionId[MF_725f99_1613537389496], state[CLOSED], remoteState[CLOSED]
10:25:28.729 [reactor-executor-1] INFO  c.a.c.a.i.handler.SessionHandler - onSessionFinal connectionId[MF_725f99_1613537389496], entityName[nextstep-service-bus/subscriptions/AI_EXTRACTION_RQST], condition[null], description[null]
10:25:28.729 [reactor-executor-1] INFO  c.a.c.a.i.handler.SendLinkHandler - onLinkFinal connectionId[MF_725f99_1613537389496], linkName[cbs:sender]
10:25:28.730 [reactor-executor-1] INFO  c.a.c.a.i.handler.ReceiveLinkHandler - onLinkFinal connectionId[MF_725f99_1613537389496], linkName[cbs:receiver]
10:25:28.730 [reactor-executor-1] INFO  c.a.c.a.i.handler.SessionHandler - onSessionFinal connectionId[MF_725f99_1613537389496], entityName[cbs-session], condition[null], description[null]
10:25:28.730 [reactor-executor-1] INFO  c.a.c.a.i.handler.ConnectionHandler - onConnectionFinal hostname[capture-dev-service-bus.servicebus.windows.net:5671], connectionId[MF_725f99_1613537389496], errorCondition[null], errorDescription[null]
10:25:28.732 [reactor-executor-1] INFO  c.a.m.s.i.ServiceBusConnectionProcessor - namespace[] entityPath[capture-dev-service-bus.servicebus.windows.net]: Channel is closed.
10:25:28.733 [reactor-executor-1] INFO  c.a.c.a.i.ReactorConnection - connectionId[MF_725f99_1613537389496], errorCondition[n/a]: Disposing of ReactorConnection.
10:25:33.237 [parallel-9] INFO  c.a.c.a.i.RequestResponseChannel - Retry #1. Requesting from upstream.
10:25:33.237 [parallel-9] INFO  c.a.c.a.i.RequestResponseChannel - namespace[MF_725f99_1613537389496] entityPath[$cbs]: Connection not requested, yet. Requesting one.
10:25:33.237 [parallel-9] WARN  c.a.c.a.i.RequestResponseChannel - Non-retryable error occurred in connection.
10:25:33.238 [parallel-9] INFO  c.a.c.a.i.RequestResponseChannel - namespace[MF_725f99_1613537389496] entityPath[$cbs]: Error in AMQP channel processor. Notifying 0 subscribers.
10:25:58.734 [reactor-executor-1] INFO  c.a.c.a.i.ReactorExecutor - Unable to acquire dispose reactor semaphore within timeout.
10:25:58.736 [reactor-executor-1] INFO  c.a.c.a.i.AmqpExceptionHandler - Shutdown received: ReactorExecutor.close() was called., isTransient[false], initiatedByClient[true]
10:25:58.739 [reactor-executor-1] WARN  c.a.c.a.i.ReactorExecutor - connectionId[MF_725f99_1613537389496], message[Scheduling reactor failed because the scheduler has been shut down.]
Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@75c8c075 rejected from java.util.concurrent.ScheduledThreadPoolExecutor@487fe083[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
10:25:58.739 [reactor-executor-1] INFO  c.a.c.a.i.ReactorExecutor - connectionId[MF_725f99_1613537389496], message[Stopping the reactor because thread was interrupted or the reactor has no more events to process.]
10:38:42.693 [parallel-8] INFO  c.a.c.a.i.ActiveClientTokenManager - Refreshing token. scopes[amqp://capture-dev-service-bus.servicebus.windows.net/nextstep-service-bus] 
10:38:42.915 [reactor-executor-2] INFO  c.a.c.a.i.ActiveClientTokenManager - Authorization successful. Refreshing token in 1079000 ms. scopes[amqp://capture-dev-service-bus.servicebus.windows.net/nextstep-service-bus]
10:51:03.089 [reactor-executor-2] INFO  c.a.c.a.i.handler.SendLinkHandler - onLinkRemoteClose connectionId[MF_20a878_1613537389305], linkName[nextstep-service-bus], errorCondition[amqp:link:detach-forced], errorDescription[Idle link tracker, link nextstep-service-bus has been idle for 1800000ms TrackingId:95859ace-6f7a-4f96-96a2-8c4aef23bcc3_G19, SystemTracker:client-link7019444, Timestamp:2021-02-17T05:21:02]
10:51:03.089 [reactor-executor-2] INFO  c.a.c.a.i.handler.SendLinkHandler - processOnClose connectionId[MF_20a878_1613537389305], linkName[nextstep-service-bus], errorCondition[amqp:link:detach-forced], errorDescription[Idle link tracker, link nextstep-service-bus has been idle for 1800000ms TrackingId:95859ace-6f7a-4f96-96a2-8c4aef23bcc3_G19, SystemTracker:client-link7019444, Timestamp:2021-02-17T05:21:02]
10:51:03.092 [reactor-executor-2] INFO  c.a.c.a.i.ReactorSession - linkName[nextstep-service-bus]: Error occurred. Removing and disposing send link.
10:51:03.092 [reactor-executor-2] INFO  c.a.c.a.i.handler.SendLinkHandler - onLinkLocalClose connectionId[MF_20a878_1613537389305], linkName[nextstep-service-bus], errorCondition[amqp:link:detach-forced], errorDescription[Idle link tracker, link nextstep-service-bus has been idle for 1800000ms TrackingId:95859ace-6f7a-4f96-96a2-8c4aef23bcc3_G19, SystemTracker:client-link7019444, Timestamp:2021-02-17T05:21:02]

sudharsan2020 commented 3 years ago

@hemanttanwar We are heading toward a critical release based on the Service Bus architecture change, so any help would be really appreciated. The equivalent .NET implementation for Azure Service Bus built by our internal team doesn't run into the issues mentioned above. Why do we see these issues only with the Java API?

Would you suggest using a stable third-party Java API such as JMS to interact with Azure Service Bus? https://docs.microsoft.com/en-us/azure/service-bus-messaging/how-to-use-java-message-service-20

hemanttanwar commented 3 years ago

@sudharsan2020, the client should re-establish the connection on its own after the service closes it for inactivity.

setMaxConcurrentCalls: this option exists on ServiceBusProcessorClient because the processor is not a reactive client, so the SDK provides concurrency control as a convenience API. ServiceBusReceiverAsyncClient is based on Project Reactor, so you can use Reactor's own operators and schedulers for parallel processing: https://projectreactor.io/docs/core/release/reference/#schedulers
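A rough way to picture what the processor's max-concurrent-calls option does is a fixed cap on how many messages are handled at once. The sketch below shows that idea with a plain `ExecutorService`, a swapped-in stdlib technique rather than what the SDK or Reactor does internally; `processAll` and the 10 ms of simulated work are hypothetical. With ServiceBusReceiverAsyncClient, the Reactor counterpart would typically be an operator that takes a concurrency argument, such as `flatMap(mapper, concurrency)`, or `parallel(n).runOn(Schedulers.boundedElastic())`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedConcurrencyDemo {

    // Processes each message on a pool of at most maxConcurrentCalls threads and
    // returns the peak number of messages that were in flight at the same time.
    static int processAll(List<String> messages, int maxConcurrentCalls, List<String> completed) {
        ExecutorService pool = Executors.newFixedThreadPool(maxConcurrentCalls);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();

        for (String message : messages) {
            pool.submit(() -> {
                int now = inFlight.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(10);       // simulated work for one message
                    completed.add(message); // simulated settlement after processing
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inFlight.decrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        for (int i = 1; i <= 20; i++) {
            messages.add("message-" + i);
        }
        List<String> completed = Collections.synchronizedList(new ArrayList<>());
        int peak = processAll(messages, 4, completed);
        System.out.println("completed=" + completed.size() + ", peak concurrency=" + peak);
    }
}
```

Because the pool has exactly `maxConcurrentCalls` threads, the peak in-flight count can never exceed that value, which is the kind of guarantee a concurrency cap is meant to give.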

ServiceBusProcessorClient: the processor passes a ServiceBusReceivedMessageContext to your message handler, and you can call its complete() method.

Prefetch count = 50 scenario: I ran a few tests using ServiceBusReceiverAsyncClient with prefetchCount set to 50 and sent 55 messages to the queue. I received all 55 messages immediately.

I will also run a test for the last scenario you mentioned, "55 messages in a single shot", and update you.

Next steps: Can you send me a code sample for the case where you do not receive the 51st message? If it is a bigger code base, you can upload a sample program to a repo. Please make sure it contains no proprietary information.
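The prefetch result reported in this reply (a prefetchCount of 50, yet all 55 messages arrive) follows from prefetch sizing a local buffer rather than capping total delivery: the buffer is topped up as the application consumes messages. The sketch below is a simplified, single-threaded model; `drain` is hypothetical and does not reproduce the SDK's AMQP link-credit handling. One practical caveat with PEEK_LOCK: a prefetched message is locked when it lands in the buffer, so its lock can run out while it waits there if processing is slow.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class PrefetchModelDemo {

    // Simplified model: the client keeps a local buffer of at most prefetchCount
    // messages and tops it up from the broker whenever the application consumes one.
    // Returns {messages delivered to the application, largest buffer size observed}.
    static int[] drain(int messagesOnBroker, int prefetchCount) {
        // A prefetch of 0 is modeled as fetching one message at a time.
        int capacity = Math.max(1, prefetchCount);
        int remainingOnBroker = messagesOnBroker;
        Deque<Integer> buffer = new ArrayDeque<>();
        int delivered = 0;
        int maxBuffered = 0;
        int nextId = 1;

        while (remainingOnBroker > 0 || !buffer.isEmpty()) {
            // Refill: fetch until the buffer is full or the broker is empty.
            while (buffer.size() < capacity && remainingOnBroker > 0) {
                buffer.addLast(nextId++);
                remainingOnBroker--;
            }
            maxBuffered = Math.max(maxBuffered, buffer.size());
            // The application takes one message from the local buffer.
            buffer.pollFirst();
            delivered++;
        }
        return new int[] {delivered, maxBuffered};
    }

    public static void main(String[] args) {
        int[] result = drain(55, 50);
        System.out.println("delivered=" + result[0] + ", max buffered=" + result[1]);
    }
}
```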

sudharsan2020 commented 3 years ago

@hemanttanwar Thanks for your reply. The code is the same as posted earlier in this thread.

I can now see all the messages being completed when they are sent in batches of 50; I noticed this after upgrading azure-messaging-servicebus from 7.0.1 to 7.0.2. I will monitor this for a week and then close this ticket.

Thanks a lot for your timely support :+1:

hemanttanwar commented 3 years ago

@sudharsan2020 By default, AmqpRetryOptions uses mode = AmqpRetryMode.EXPONENTIAL, which determines how retry attempts back off while recovering from connection failures. Each setter has Javadoc explaining what it does. The default setup should recover from most connection errors, but it is worth observing the behavior in your environment to decide whether to override the default AmqpRetryOptions.
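As a rough illustration of what an exponential retry mode means, the sketch below computes a generic capped exponential backoff. It is not the SDK's exact policy (the SDK may, for example, add random jitter), and the 800 ms base and 10 s cap are assumed example values. In the SDK these knobs are set on AmqpRetryOptions through setters such as `setMode`, `setMaxRetries`, `setDelay`, `setMaxDelay` and `setTryTimeout`.

```java
import java.time.Duration;

public class BackoffDemo {

    // Generic capped exponential backoff without jitter:
    // delay(attempt) = min(baseDelay * 2^(attempt - 1), maxDelay), for attempt >= 1.
    static Duration delayFor(int attempt, Duration baseDelay, Duration maxDelay) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        // Limit the shift so the multiplier cannot overflow for large attempt numbers.
        long factor = 1L << Math.min(attempt - 1, 30);
        Duration candidate = baseDelay.multipliedBy(factor);
        return candidate.compareTo(maxDelay) > 0 ? maxDelay : candidate;
    }

    public static void main(String[] args) {
        Duration base = Duration.ofMillis(800);
        Duration max = Duration.ofSeconds(10);
        for (int attempt = 1; attempt <= 6; attempt++) {
            System.out.println("attempt " + attempt + ": " + delayFor(attempt, base, max).toMillis() + " ms");
        }
    }
}
```

Doubling the wait on each attempt spreads retries out quickly after a transient outage, while the cap keeps a long outage from pushing the delay beyond a tolerable bound.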

sudharsan2020 commented 3 years ago

@hemanttanwar Thanks a lot for your suggestion. I also noticed an odd scenario: the messages aren't completed when I use the sync client to receive them.

I followed the sample below for receiving messages through the sync client: https://github.com/Azure/azure-sdk-for-java/blob/master/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMessageSample.java#L51

Sync client to receive messages:

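import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusReceiverClient;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class AzureServiceBusReceiverClient {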
    private ServiceBusReceiverClient receiverClient;

    public AzureServiceBusReceiverClient() {

        // Build the receiver client
        this.receiverClient = new ServiceBusClientBuilder()
                .connectionString(System.getenv("AZURE_SERVICEBUS_NAMESPACE_CONNECTION_STRING"))
                .receiver()
                .topicName("MY_TOPIC")
                .subscriptionName("MY_SUBSCRIPTION")
                .prefetchCount(10)
                .disableAutoComplete()
                .buildClient();
    }

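    /*
     * Receives a batch of up to 10 messages from the topic subscription, completes the ones that match, and returns them.
     * @param documentProperties: expected document properties; "customer_id" is compared against each message
     */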
    public List<ServiceBusReceivedMessage> receiveMessages(Map<String, String> documentProperties) {

        // Iterate through messages
        List<ServiceBusReceivedMessage> serviceBusReceivedMessageList = new ArrayList<>();
        try {

            for (ServiceBusReceivedMessage serviceBusReceivedMessage :
                    receiverClient.receiveMessages(10)) {

                // Keep (and complete) only messages with the expected subject, a content type, and a matching customer_id
                if (serviceBusReceivedMessage.getSubject() != null
                        && serviceBusReceivedMessage.getContentType() != null
                        && serviceBusReceivedMessage
                        .getSubject()
                        .contentEquals("MY_SUBSCRIPTION")
                        // Objects.equals avoids a NullPointerException when customer_id is absent
                        && java.util.Objects.equals(
                                serviceBusReceivedMessage.getApplicationProperties().get("customer_id"),
                                documentProperties.get("customer_id"))) {

                    // Append the message response
                    serviceBusReceivedMessageList.add(serviceBusReceivedMessage);

                    // Messages from the sync receiver MUST be settled explicitly.
                    receiverClient.complete(serviceBusReceivedMessage);
                }
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        // TODO: sort by page number and remove duplicates (not implemented yet)
        return serviceBusReceivedMessageList;
    }
}

hemanttanwar commented 3 years ago

@sudharsan2020 I took your code above, removed the if condition, and just called receiverClient.complete(serviceBusReceivedMessage); the message was completed as expected.

I believe the if condition evaluates to false in your case, so control never enters the if block. Can you check that?
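One way to follow up on this suggestion is to log which part of the condition fails. The sketch below is a hypothetical helper, `failedChecks`, that mirrors the checks in the if block using plain Java types instead of ServiceBusReceivedMessage; it compares `customer_id` with `Objects.equals`, which avoids a NullPointerException when the property is missing. Note that application properties keep their type, so an Integer 42 set by the producer does not equal the String "42". Messages that fail the check are never completed by this method, so they eventually become available again once their lock expires.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class FilterDiagnosticsDemo {

    // Hypothetical helper mirroring the checks in the receiveMessages() if block.
    // Returns a description of each failed check; an empty list means the message matches.
    static List<String> failedChecks(String subject, String contentType,
                                     Map<String, ?> applicationProperties,
                                     String expectedCustomerId) {
        List<String> failures = new ArrayList<>();
        if (subject == null) {
            failures.add("subject is null");
        } else if (!subject.contentEquals("MY_SUBSCRIPTION")) {
            failures.add("subject is '" + subject + "'");
        }
        if (contentType == null) {
            failures.add("contentType is null");
        }
        Object customerId = applicationProperties == null ? null : applicationProperties.get("customer_id");
        // Objects.equals is null-safe and also rejects values of a different type.
        if (!Objects.equals(customerId, expectedCustomerId)) {
            failures.add("customer_id is " + customerId + ", expected " + expectedCustomerId);
        }
        return failures;
    }

    public static void main(String[] args) {
        // A message sent without a content type fails the check even though the rest matches.
        System.out.println(failedChecks("MY_SUBSCRIPTION", null, Map.of("customer_id", "42"), "42"));
    }
}
```

Calling such a helper inside the loop with each message's `getSubject()`, `getContentType()` and `getApplicationProperties()` would show which check rejects each message.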

sudharsan2020 commented 3 years ago

Closing the ticket as the issue is resolved. Thanks a lot @hemanttanwar for your support :100: :1st_place_medal: