Azure / azure-sdk-for-java

This repository is for active development of the Azure SDK for Java. For consumers of the SDK we recommend visiting our public developer docs at https://docs.microsoft.com/java/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-java.

Receive message connection from ServiceBusProcessorClient closes due to inactivity in service bus #40805

Closed neeraj-metallic closed 2 months ago

neeraj-metallic commented 3 months ago

Describe the bug

The connection that ServiceBusProcessorClient uses to receive messages is closed due to inactivity on Service Bus, and the client is not able to re-create the connection.

Exception or Stack Trace

2024-06-25 13:04:04.282  INFO 14432 --- [     parallel-4] c.a.c.a.i.ActiveClientTokenManager       : {"az.sdk.message":"Refreshing token.","scopes":"amqp://testsb.servicebus.windows.net/test_queue"}
2024-06-25 13:22:03.541  INFO 14432 --- [     parallel-7] c.a.c.a.i.ActiveClientTokenManager       : {"az.sdk.message":"Refreshing token.","scopes":"amqp://testsb.servicebus.windows.net/test_queue"}
2024-06-25 13:40:02.797  INFO 14432 --- [    parallel-10] c.a.c.a.i.ActiveClientTokenManager       : {"az.sdk.message":"Refreshing token.","scopes":"amqp://testsb.servicebus.windows.net/test_queue"}
2024-06-25 14:02:47.478  WARN 14432 --- [ctor-executor-1] c.a.c.a.i.handler.ConnectionHandler      : {"az.sdk.message":"onTransportError","connectionId":"MF_8cd57c_1719236086768","errorCondition":"amqp:connection:framing-error","errorDescription":"connection aborted","hostName":"testsb.servicebus.windows.net"}
2024-06-25 14:02:47.584  INFO 14432 --- [ctor-executor-1] c.a.c.a.i.ReactorConnection              : {"az.sdk.message":"Disposing of ReactorConnection.","connectionId":"MF_8cd57c_1719236086768","isTransient":false,"isInitiatedByClient":false,"shutdownMessage":"connection aborted, errorContext[NAMESPACE: testsb.servicebus.windows.net. ERROR CONTEXT: N/A]"}
2024-06-25 14:02:47.696  INFO 14432 --- [ctor-executor-1] c.a.c.a.i.handler.ConnectionHandler      : {"az.sdk.message":"onConnectionUnbound","connectionId":"MF_8cd57c_1719236086768","hostName":"testsb.servicebus.windows.net","state":"CLOSED","remoteState":"ACTIVE"}
2024-06-25 14:02:47.700  INFO 14432 --- [ctor-executor-1] c.a.c.a.i.handler.ReceiveLinkHandler2    : {"az.sdk.message":"onLinkFinal","connectionId":"MF_8cd57c_1719236086768","linkName":"test_queue_8fb2d1_1719236086750","entityPath":"test_queue"}
2024-06-25 14:02:47.704  INFO 14432 --- [ctor-executor-1] c.a.c.a.i.handler.SessionHandler         : {"az.sdk.message":"onSessionFinal.","connectionId":"MF_8cd57c_1719236086768","sessionName":"test_queue"}
2024-06-25 14:02:47.705  INFO 14432 --- [ctor-executor-1] c.a.c.a.i.handler.SendLinkHandler        : {"az.sdk.message":"onLinkFinal","connectionId":"MF_8cd57c_1719236086768","linkName":"cbs:sender","entityPath":"$cbs"}
2024-06-25 14:02:47.705  WARN 14432 --- [ receiverPump-1] c.a.c.amqp.implementation.MessageFlux    : {"az.sdk.message":"Receiver emitted terminal completion.","messageFlux":"mf_eb0a16_1719236086755","connectionId":"MF_8cd57c_1719236086768","linkName":"test_queue_8fb2d1_1719236086750","entityPath":"test_queue"}
2024-06-25 14:02:47.705  INFO 14432 --- [ctor-executor-1] c.a.c.a.i.handler.ReceiveLinkHandler2    : {"az.sdk.message":"onLinkFinal","connectionId":"MF_8cd57c_1719236086768","linkName":"cbs:receiver","entityPath":"$cbs"}
2024-06-25 14:02:47.707  WARN 14432 --- [ receiverPump-1] c.a.c.amqp.implementation.MessageFlux    : {"az.sdk.message":"Current mediator reached terminal completion-state (retriable:true).","messageFlux":"mf_eb0a16_1719236086755","connectionId":"MF_8cd57c_1719236086768","linkName":"test_queue_8fb2d1_1719236086750","entityPath":"test_queue","retryAfter":1000}
2024-06-25 14:02:47.709  INFO 14432 --- [ctor-executor-1] c.a.c.a.i.AmqpChannelProcessor           : {"az.sdk.message":"Channel is closed. Requesting upstream.","connectionId":"MF_8cd57c_1719236086768","entityPath":"$cbs"}
2024-06-25 14:02:47.710  INFO 14432 --- [ctor-executor-1] c.a.c.a.i.AmqpChannelProcessor           : {"az.sdk.message":"Connection not requested, yet. Requesting one.","connectionId":"MF_8cd57c_1719236086768","entityPath":"$cbs"}
2024-06-25 14:02:47.717  INFO 14432 --- [ctor-executor-1] c.a.c.a.i.ReactorConnection              : {"az.sdk.message":"Emitting new response channel.","connectionId":"MF_8cd57c_1719236086768","entityPath":"$cbs","linkName":"cbs"}
2024-06-25 14:02:47.717  INFO 14432 --- [ctor-executor-1] c.a.c.a.i.AmqpChannelProcessor           : {"az.sdk.message":"Setting next AMQP channel.","connectionId":"MF_8cd57c_1719236086768","entityPath":"$cbs"}
2024-06-25 14:02:47.718  INFO 14432 --- [ctor-executor-1] c.a.c.a.i.AmqpChannelProcessor           : {"az.sdk.message":"Upstream connection publisher was completed. Terminating processor.","connectionId":"MF_8cd57c_1719236086768","entityPath":"$cbs"}
2024-06-25 14:02:47.723  INFO 14432 --- [ctor-executor-1] c.a.c.a.i.handler.SessionHandler         : {"az.sdk.message":"onSessionFinal.","connectionId":"MF_8cd57c_1719236086768","sessionName":"cbs-session"}
2024-06-25 14:02:47.724  INFO 14432 --- [ctor-executor-1] c.a.c.a.i.handler.ConnectionHandler      : {"az.sdk.message":"onConnectionFinal","connectionId":"MF_8cd57c_1719236086768","errorCondition":"amqp:resource-limit-exceeded","errorDescription":"local-idle-timeout expired","hostName":"testsb.servicebus.windows.net"}
2024-06-25 14:02:47.728  INFO 14432 --- [ctor-executor-1] c.a.c.a.i.ReactorConnection              : {"az.sdk.message":"Closing executor.","connectionId":"MF_8cd57c_1719236086768"}
2024-06-25 14:02:47.730  INFO 14432 --- [ctor-executor-1] c.a.c.a.i.handler.SendLinkHandler        : {"az.sdk.message":"Sender link was never active. Closing endpoint states.","connectionId":"MF_8cd57c_1719236086768","linkName":"cbs","entityPath":"$cbs"}
2024-06-25 14:02:47.730  INFO 14432 --- [ctor-executor-1] c.a.c.a.i.handler.ReceiveLinkHandler2    : {"az.sdk.message":"Receiver link was never active. Closing endpoint states","connectionId":"MF_8cd57c_1719236086768","entityPath":"$cbs","linkName":"cbs"}
2024-06-25 14:02:47.731  INFO 14432 --- [ctor-executor-1] c.a.c.a.i.AmqpChannelProcessor           : {"az.sdk.message":"Channel is disposed.","connectionId":"MF_8cd57c_1719236086768","entityPath":"$cbs"}
2024-06-25 14:02:47.731  INFO 14432 --- [ctor-executor-1] c.a.c.a.i.handler.ConnectionHandler      : {"az.sdk.message":"onTransportClosed","connectionId":"MF_8cd57c_1719236086768","errorCondition":"amqp:connection:framing-error","errorDescription":"connection aborted","hostName":"testsb.servicebus.windows.net"}
2024-06-25 14:02:47.731  INFO 14432 --- [ctor-executor-1] c.a.c.a.i.handler.GlobalIOHandler        : {"az.sdk.message":"onTransportClosed","connectionId":"MF_8cd57c_1719236086768","hostName":"testsb.servicebus.windows.net"}
2024-06-25 14:02:48.722  WARN 14432 --- [     parallel-3] c.a.c.amqp.implementation.MessageFlux    : {"az.sdk.message":"Requesting a new mediator.","messageFlux":"mf_eb0a16_1719236086755","connectionId":"MF_8cd57c_1719236086768","linkName":"test_queue_8fb2d1_1719236086750","entityPath":"test_queue"}
2024-06-25 14:02:48.724  INFO 14432 --- [     parallel-3] c.a.c.a.i.ReactorConnectionCache         : {"az.sdk.message":"The connection is closed, requesting a new connection.","entityPath":"N/A","connectionId":"MF_8cd57c_1719236086768"}
2024-06-25 14:02:48.727  INFO 14432 --- [     parallel-3] c.a.c.a.i.ReactorConnectionCache         : {"az.sdk.message":"Waiting to connect and become active.","entityPath":"N/A","connectionId":"MF_cce23d_1719304368725"}
2024-06-25 14:02:48.727  INFO 14432 --- [     parallel-3] c.a.c.a.i.ReactorConnection              : {"az.sdk.message":"Creating and starting connection.","connectionId":"MF_cce23d_1719304368725","hostName":"testsb.servicebus.windows.net","port":5671}
2024-06-25 14:02:48.743  INFO 14432 --- [     parallel-3] c.a.c.a.implementation.ReactorExecutor   : {"az.sdk.message":"Starting reactor.","connectionId":"MF_cce23d_1719304368725"}
2024-06-25 14:02:48.747  INFO 14432 --- [ctor-executor-2] c.a.c.a.i.handler.ConnectionHandler      : {"az.sdk.message":"onConnectionInit","connectionId":"MF_cce23d_1719304368725","hostName":"testsb.servicebus.windows.net","namespace":"testsb.servicebus.windows.net"}
2024-06-25 14:02:48.748  INFO 14432 --- [ctor-executor-2] c.a.c.a.i.handler.ReactorHandler         : {"az.sdk.message":"reactor.onReactorInit","connectionId":"MF_cce23d_1719304368725"}
2024-06-25 14:02:48.748  INFO 14432 --- [ctor-executor-2] c.a.c.a.i.handler.ConnectionHandler      : {"az.sdk.message":"onConnectionLocalOpen","connectionId":"MF_cce23d_1719304368725","hostName":"testsb.servicebus.windows.net"}
2024-06-25 14:02:48.757  INFO 14432 --- [ctor-executor-2] c.a.c.a.i.handler.ConnectionHandler      : {"az.sdk.message":"onConnectionBound","connectionId":"MF_cce23d_1719304368725","hostName":"testsb.servicebus.windows.net","peerDetails":"testsb.servicebus.windows.net:5671"}
2024-06-25 14:02:50.461  INFO 14432 --- [ctor-executor-2] c.a.c.a.i.handler.ConnectionHandler      : {"az.sdk.message":"onConnectionRemoteOpen","connectionId":"MF_cce23d_1719304368725","hostName":"testsb.servicebus.windows.net","remoteContainer":"0befeb3509974c5f9b20ee34e06fedd6_G5"}
2024-06-25 14:02:50.462  INFO 14432 --- [ctor-executor-2] c.a.c.a.i.ReactorConnectionCache         : {"az.sdk.message":"Emitting the new active connection.","entityPath":"N/A","connectionId":"MF_cce23d_1719304368725"}
2024-06-25 14:02:50.713  INFO 14432 --- [ctor-executor-2] c.a.c.a.i.handler.SessionHandler         : {"az.sdk.message":"onSessionRemoteOpen","connectionId":"MF_cce23d_1719304368725","sessionName":"test_queue","sessionIncCapacity":0,"sessionOutgoingWindow":2147483647}
2024-06-25 14:02:50.715  INFO 14432 --- [ctor-executor-2] c.a.c.a.i.ReactorConnection              : {"az.sdk.message":"Setting CBS channel.","connectionId":"MF_cce23d_1719304368725"}
2024-06-25 14:02:50.957  INFO 14432 --- [ctor-executor-2] c.a.c.a.i.handler.SessionHandler         : {"az.sdk.message":"onSessionRemoteOpen","connectionId":"MF_cce23d_1719304368725","sessionName":"cbs-session","sessionIncCapacity":0,"sessionOutgoingWindow":2147483647}
2024-06-25 14:02:50.958  INFO 14432 --- [ctor-executor-2] c.a.c.a.i.ReactorConnection              : {"az.sdk.message":"Emitting new response channel.","connectionId":"MF_cce23d_1719304368725","entityPath":"$cbs","linkName":"cbs"}
2024-06-25 14:02:50.958  INFO 14432 --- [ctor-executor-2] c.a.c.a.i.AmqpChannelProcessor           : {"az.sdk.message":"Setting next AMQP channel.","connectionId":"MF_cce23d_1719304368725","entityPath":"$cbs"}
2024-06-25 14:02:50.959  INFO 14432 --- [ctor-executor-2] c.a.c.a.i.AmqpChannelProcessor           : {"az.sdk.message":"Next AMQP channel received.","connectionId":"MF_cce23d_1719304368725","entityPath":"$cbs","subscriberId":"un_56124c_1719304370717"}
2024-06-25 14:02:51.479  INFO 14432 --- [ctor-executor-2] c.a.c.a.i.handler.SendLinkHandler        : {"az.sdk.message":"onLinkRemoteOpen","connectionId":"MF_cce23d_1719304368725","linkName":"cbs:sender","entityPath":"$cbs","remoteTarget":"Target{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}"}
2024-06-25 14:02:51.480  INFO 14432 --- [ctor-executor-2] c.a.c.a.i.AmqpChannelProcessor           : {"az.sdk.message":"Channel is now active.","connectionId":"MF_cce23d_1719304368725","entityPath":"$cbs"}
2024-06-25 14:02:51.480  INFO 14432 --- [ctor-executor-2] c.a.c.a.i.handler.ReceiveLinkHandler2    : {"az.sdk.message":"onLinkRemoteOpen","connectionId":"MF_cce23d_1719304368725","entityPath":"$cbs","linkName":"cbs:receiver","remoteSource":"Source{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}"}
2024-06-25 14:02:51.722  INFO 14432 --- [ctor-executor-2] c.a.c.a.i.ActiveClientTokenManager       : {"az.sdk.message":"Scheduling refresh token task.","scopes":"amqp://testsb.servicebus.windows.net/test_queue"}
2024-06-25 14:02:51.724  INFO 14432 --- [ctor-executor-2] c.a.c.a.implementation.ReactorSession    : {"az.sdk.message":"Creating a new receiver link.","connectionId":"MF_cce23d_1719304368725","sessionName":"test_queue","linkName":"test_queue_8fb2d1_1719236086750"}
2024-06-25 14:02:51.725  WARN 14432 --- [ctor-executor-2] c.a.c.amqp.implementation.MessageFlux    : {"az.sdk.message":"Setting next mediator and waiting for activation.","messageFlux":"mf_eb0a16_1719236086755","connectionId":"MF_cce23d_1719304368725","linkName":"test_queue_8fb2d1_1719236086750","entityPath":"test_queue"}
2024-06-25 14:02:51.726  INFO 14432 --- [ctor-executor-2] c.a.c.a.implementation.ReactorSession    : {"az.sdk.message":"Returning existing receive link.","connectionId":"MF_cce23d_1719304368725","linkName":"test_queue_8fb2d1_1719236086750","entityPath":"test_queue"}
2024-06-25 14:02:51.730  INFO 14432 --- [ctor-executor-1] c.a.c.a.implementation.ReactorExecutor   : {"az.sdk.message":"Processing all pending tasks and closing old reactor.","connectionId":"MF_8cd57c_1719236086768"}
2024-06-25 14:02:51.732  INFO 14432 --- [ctor-executor-1] c.a.c.a.i.ReactorDispatcher              : {"az.sdk.message":"Reactor selectable is being disposed.","connectionId":"MF_8cd57c_1719236086768"}
2024-06-25 14:02:51.732  INFO 14432 --- [ctor-executor-1] c.a.c.a.i.ReactorConnection              : {"az.sdk.message":"onConnectionShutdown. Shutting down.","connectionId":"MF_8cd57c_1719236086768","isTransient":false,"isInitiatedByClient":false,"shutdownMessage":"connectionId[MF_8cd57c_1719236086768] Reactor selectable is disposed.","namespace":"testsb.servicebus.windows.net"}
2024-06-25 14:02:51.751  WARN 14432 --- [ctor-executor-1] c.a.c.a.i.ReactorConnection              : {"az.sdk.message":"Error occurred while processing connection state.","exception":"connection aborted, errorContext[NAMESPACE: testsb.servicebus.windows.net. ERROR CONTEXT: N/A]","connectionId":"MF_8cd57c_1719236086768"}
2024-06-25 14:02:51.752  INFO 14432 --- [ctor-executor-1] c.a.c.a.i.ReactorConnection              : {"az.sdk.message":"onConnectionShutdown. Shutting down.","connectionId":"MF_8cd57c_1719236086768","isTransient":false,"isInitiatedByClient":false,"shutdownMessage":"Finished processing pending tasks.","namespace":"testsb.servicebus.windows.net"}

To Reproduce

Steps to reproduce the behavior:

Code Snippet

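public void receiveMessages() {
    // Build a processor for the queue. With PEEK_LOCK and auto-complete disabled,
    // the processMessage handler must settle each message itself.
    processorClient = new ServiceBusClientBuilder()
            .connectionString(connectString)
            .processor()
            .queueName(queueName)
            .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
            .disableAutoComplete()
            .processMessage(TestMessageListener::processMessage)
            .processError(context -> processError(context))
            .buildProcessorClient();

    logger.info("Starting message processor");
    // start() begins receiving in the background and returns immediately.
    processorClient.start();
}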

Expected behavior

Setup (please complete the following information):

Application is written in Java and is running in Azure Kubernetes
Service Bus is in Azure

joshfree commented 3 months ago

@anuchandy could you please take a look?

anuchandy commented 3 months ago

In the log I see that the TCP connection did recover and that a new link to the queue entity (with a name of the form test_queue_*) was re-established. Is there any chance that the app logic in the processor handler is stuck? No message will be delivered on the new link until the handler resumes.
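One way to test the "stuck handler" hypothesis is to bound how long the handler logic may run and log when the bound is exceeded. The following is a minimal sketch that uses only the JDK; `BoundedHandler` and `runWithin` are hypothetical names for illustration and are not part of the Azure SDK.

```java
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper (not an Azure SDK type): bounds how long handler logic may run. */
final class BoundedHandler {
    // Daemon worker threads so a hung task cannot keep the JVM alive.
    private static final ExecutorService WORKERS = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "handler-worker");
        t.setDaemon(true);
        return t;
    });

    private BoundedHandler() { }

    /**
     * Runs the task on a worker thread and waits at most the given limit.
     * Returns true if the task finished in time, false if it timed out or failed.
     */
    static boolean runWithin(Duration limit, Runnable task) {
        Future<?> future = WORKERS.submit(task);
        try {
            future.get(limit.toMillis(), TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            // Interrupt the stuck work so the calling thread can move on.
            future.cancel(true);
            System.err.println("Handler exceeded " + limit + "; it may be blocking delivery");
            return false;
        } catch (ExecutionException e) {
            System.err.println("Handler failed: " + e.getCause());
            return false;
        } catch (InterruptedException e) {
            // Preserve the interrupt flag for the caller and stop the task.
            Thread.currentThread().interrupt();
            future.cancel(true);
            return false;
        }
    }
}
```

In the snippet from the report, the body of the processMessage handler could be wrapped in a call such as `BoundedHandler.runWithin(Duration.ofSeconds(30), () -> ...)`, where the 30-second limit is an arbitrary assumption. A timeout message in the application log around the time delivery stops would point at the handler rather than at the connection.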

anuchandy commented 2 months ago

Closing this. The log entries about the connection closing are normal (e.g. idle timeout). As observed in the log, the SDK makes a new connection if the current one is closed. Please refer to the comment above in case no messages are delivered after the re-connection log entries.
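The reconnection described here can be checked mechanically against a captured log. The sketch below is JDK-only and relies on the two message strings that appear in the log above ("Disposing of ReactorConnection." and "Emitting the new active connection."); the class and method names are hypothetical, and the regex-based parsing is an assumption that only fits log lines shaped like the ones in this issue.

```java
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical log check: did a different connection become active after a disposal? */
final class ReconnectCheck {
    private static final Pattern MESSAGE =
            Pattern.compile("\"az\\.sdk\\.message\":\"([^\"]*)\"");
    private static final Pattern CONNECTION =
            Pattern.compile("\"connectionId\":\"([^\"]*)\"");

    private ReconnectCheck() { }

    // Returns the first captured group of the pattern in the line, or null if absent.
    private static String first(Pattern pattern, String line) {
        Matcher m = pattern.matcher(line);
        return m.find() ? m.group(1) : null;
    }

    /** Returns the id of the connection that became active after a disposal, or null if none did. */
    static String recoveredConnectionId(List<String> logLines) {
        String disposedId = null;
        for (String line : logLines) {
            String message = first(MESSAGE, line);
            String id = first(CONNECTION, line);
            if (message == null || id == null) {
                continue; // not an SDK log line with a connection id
            }
            if (message.equals("Disposing of ReactorConnection.")) {
                disposedId = id;
            } else if (disposedId != null
                    && message.equals("Emitting the new active connection.")
                    && !id.equals(disposedId)) {
                return id;
            }
        }
        return null;
    }
}
```

For the log in this issue, the disposed connection is MF_8cd57c_1719236086768 and the connection that later becomes active is MF_cce23d_1719304368725. If such a check reports a recovered connection and messages still do not arrive, the handler is the next place to look.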