Azure / azure-sdk-for-java

This repository is for active development of the Azure SDK for Java. For consumers of the SDK we recommend visiting our public developer docs at https://docs.microsoft.com/java/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-java.
MIT License

[BUG] spring-cloud-azure-starter-servicebus-jms - com.azure.core.amqp.exception.AmqpException: onSessionRemoteClose amqp: connection:forced The connection was closed by container because it did not have any active links in the past 300000 milliseconds #41736

Closed Tom-Van-Asch closed 1 month ago

Tom-Van-Asch commented 1 month ago

Describe the bug

We are using the ServiceBusSenderClient to send messages to a Service Bus topic. When no message is sent for 15 minutes, we see that the connection is freed, but this results in a NullPointerException afterwards because a new session is created on the already closed connection.

Exception or Stack Trace

2024-09-02 13:12:13.557 DEBUG [spring-app-name,,] 80959 --- [tem-execution-4] b.d.s.c.j.o.service.OutboxItemProcessor  : Processing outbox item: 4089 with aggregateId: 6049
2024-09-02 13:27:37.216 ERROR [spring-app-name,,] 80959 --- [ctor-executor-1] reactor.core.publisher.Operators         : Operator called default onErrorDropped

reactor.core.Exceptions$ErrorCallbackNotImplemented: com.azure.core.amqp.exception.AmqpException: onSessionRemoteClose connectionId[MF_27dedd_1725282731155], entityName[service-bus-topic-name] condition[Error{condition=amqp:connection:forced, description='The connection was closed by container 'e571b398fba34f83bfc494365a9a6210_G4' because it did not have any active links in the past 300000 milliseconds. TrackingId:e571b398fba34f83bfc494365a9a6210_G4, SystemTracker:gateway10, Timestamp:2024-09-02T13:27:39', info=null}], errorContext[NAMESPACE: service-bus-name.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: service-bus-topic-name]
Caused by: com.azure.core.amqp.exception.AmqpException: onSessionRemoteClose connectionId[MF_27dedd_1725282731155], entityName[service-bus-topic-name] condition[Error{condition=amqp:connection:forced, description='The connection was closed by container 'e571b398fba34f83bfc494365a9a6210_G4' because it did not have any active links in the past 300000 milliseconds. TrackingId:e571b398fba34f83bfc494365a9a6210_G4, SystemTracker:gateway10, Timestamp:2024-09-02T13:27:39', info=null}], errorContext[NAMESPACE: service-bus-name.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: service-bus-topic-name]
    at com.azure.core.amqp.implementation.ExceptionUtil.toException(ExceptionUtil.java:90)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.FluxDistinctUntilChanged] :
    reactor.core.publisher.Flux.distinctUntilChanged(Flux.java:4744)
    com.azure.core.amqp.implementation.handler.Handler.getEndpointStates(Handler.java:77)
Error has been observed at the following site(s):
    *__Flux.distinctUntilChanged ⇢ at com.azure.core.amqp.implementation.handler.Handler.getEndpointStates(Handler.java:77)
    |_                  Flux.map ⇢ at com.azure.core.amqp.implementation.ReactorSession.<init>(ReactorSession.java:141)
    |_            Flux.doOnError ⇢ at com.azure.core.amqp.implementation.ReactorSession.<init>(ReactorSession.java:148)
    |_         Flux.doOnComplete ⇢ at com.azure.core.amqp.implementation.ReactorSession.<init>(ReactorSession.java:148)
    |_                Flux.cache ⇢ at com.azure.core.amqp.implementation.ReactorSession.<init>(ReactorSession.java:148)
    |_                Flux.cache ⇢ at com.azure.core.amqp.implementation.ReactorSession.<init>(ReactorSession.java:148)
Original Stack Trace:
        at com.azure.core.amqp.implementation.ExceptionUtil.toException(ExceptionUtil.java:90)
        at com.azure.core.amqp.implementation.handler.SessionHandler.onSessionRemoteClose(SessionHandler.java:139)
        at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:152)
        at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
        at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
        at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
        at com.azure.core.amqp.implementation.ReactorExecutor.run(ReactorExecutor.java:91)
        at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
        at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
        at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:840)

2024-09-02 13:27:37.223 ERROR [spring-app-name,,] 80959 --- [ctor-executor-1] reactor.core.publisher.Operators         : Operator called default onErrorDropped

reactor.core.Exceptions$ErrorCallbackNotImplemented: com.azure.core.amqp.exception.AmqpException: onSessionRemoteClose connectionId[MF_27dedd_1725282731155], entityName[cbs-session] condition[Error{condition=amqp:connection:forced, description='The connection was closed by container 'e571b398fba34f83bfc494365a9a6210_G4' because it did not have any active links in the past 300000 milliseconds. TrackingId:e571b398fba34f83bfc494365a9a6210_G4, SystemTracker:gateway10, Timestamp:2024-09-02T13:27:39', info=null}], errorContext[NAMESPACE: service-bus-name.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: cbs-session]
Caused by: com.azure.core.amqp.exception.AmqpException: onSessionRemoteClose connectionId[MF_27dedd_1725282731155], entityName[cbs-session] condition[Error{condition=amqp:connection:forced, description='The connection was closed by container 'e571b398fba34f83bfc494365a9a6210_G4' because it did not have any active links in the past 300000 milliseconds. TrackingId:e571b398fba34f83bfc494365a9a6210_G4, SystemTracker:gateway10, Timestamp:2024-09-02T13:27:39', info=null}], errorContext[NAMESPACE: service-bus-name.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: cbs-session]
    at com.azure.core.amqp.implementation.ExceptionUtil.toException(ExceptionUtil.java:90)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.FluxDistinctUntilChanged] :
    reactor.core.publisher.Flux.distinctUntilChanged(Flux.java:4744)
    com.azure.core.amqp.implementation.handler.Handler.getEndpointStates(Handler.java:77)
Error has been observed at the following site(s):
    *__Flux.distinctUntilChanged ⇢ at com.azure.core.amqp.implementation.handler.Handler.getEndpointStates(Handler.java:77)
    |_                  Flux.map ⇢ at com.azure.core.amqp.implementation.ReactorSession.<init>(ReactorSession.java:141)
    |_            Flux.doOnError ⇢ at com.azure.core.amqp.implementation.ReactorSession.<init>(ReactorSession.java:148)
    |_         Flux.doOnComplete ⇢ at com.azure.core.amqp.implementation.ReactorSession.<init>(ReactorSession.java:148)
    |_                Flux.cache ⇢ at com.azure.core.amqp.implementation.ReactorSession.<init>(ReactorSession.java:148)
    |_                Flux.cache ⇢ at com.azure.core.amqp.implementation.ReactorSession.<init>(ReactorSession.java:148)
Original Stack Trace:
        at com.azure.core.amqp.implementation.ExceptionUtil.toException(ExceptionUtil.java:90)
        at com.azure.core.amqp.implementation.handler.SessionHandler.onSessionRemoteClose(SessionHandler.java:139)
        at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:152)
        at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
        at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
        at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
        at com.azure.core.amqp.implementation.ReactorExecutor.run(ReactorExecutor.java:91)
        at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
        at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
        at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:840)

2024-09-02 13:27:39.023 ERROR [spring-app-name,,] 80959 --- [    parallel-10] c.a.c.a.i.AmqpChannelProcessor           : {"az.sdk.message":"Retry attempts exhausted or exception was not retriable.","exception":"Cannot invoke \"java.util.List.add(Object)\" because \"this._sessions\" is null","connectionId":"MF_27dedd_1725282731155","entityPath":"$cbs","tryCount":1}
2024-09-02 13:27:39.024 ERROR [spring-app-name,,] 80959 --- [    parallel-10] reactor.core.publisher.Operators         : Operator called default onErrorDropped

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.NullPointerException: Cannot invoke "java.util.List.add(Object)" because "this._sessions" is null
Caused by: java.lang.NullPointerException: Cannot invoke "java.util.List.add(Object)" because "this._sessions" is null
    at org.apache.qpid.proton.engine.impl.ConnectionImpl.session(ConnectionImpl.java:91)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoMapFuseable] :
    reactor.core.publisher.Mono.map(Mono.java:3482)
    com.azure.core.amqp.implementation.ReactorConnection.createSession(ReactorConnection.java:307)
Error has been observed at the following site(s):
    *_____________Mono.map ⇢ at com.azure.core.amqp.implementation.ReactorConnection.createSession(ReactorConnection.java:307)
    |_        Mono.flatMap ⇢ at com.azure.core.amqp.implementation.ReactorConnection.createSession(ReactorConnection.java:340)
    |_           Mono.cast ⇢ at com.azure.core.amqp.implementation.ReactorConnection.createRequestResponseChannel(ReactorConnection.java:441)
    |_            Mono.map ⇢ at com.azure.core.amqp.implementation.ReactorConnection.createRequestResponseChannel(ReactorConnection.java:442)
    |_       Mono.doOnNext ⇢ at com.azure.core.amqp.implementation.ReactorConnection.createRequestResponseChannel(ReactorConnection.java:446)
    |_         Mono.repeat ⇢ at com.azure.core.amqp.implementation.ReactorConnection.createRequestResponseChannel(ReactorConnection.java:454)
    *_________Mono.flatMap ⇢ at com.azure.core.amqp.implementation.ReactorConnection.closeAsync(ReactorConnection.java:504)
    |_      Mono.doFinally ⇢ at com.azure.core.amqp.implementation.ReactorConnection.closeAsync(ReactorConnection.java:536)
    *__Mono.whenDelayError ⇢ at com.azure.core.amqp.implementation.ReactorConnection.closeAsync(ReactorConnection.java:535)
    *____________Mono.then ⇢ at com.azure.core.amqp.implementation.ReactorConnection.closeAsync(ReactorConnection.java:541)
    *____________Mono.then ⇢ at com.azure.core.amqp.implementation.ReactorConnection.closeAsync(ReactorConnection.java:544)
Original Stack Trace:
        at org.apache.qpid.proton.engine.impl.ConnectionImpl.session(ConnectionImpl.java:91)
        at org.apache.qpid.proton.engine.impl.ConnectionImpl.session(ConnectionImpl.java:39)
        at com.azure.core.amqp.implementation.ReactorConnection.lambda$createSession$15(ReactorConnection.java:311)
        at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1708)
        at com.azure.core.amqp.implementation.ReactorConnection.lambda$createSession$16(ReactorConnection.java:308)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)
        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.request(MonoIgnoreThen.java:164)
        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:139)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.request(MonoFlatMap.java:194)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144)
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2331)
        at com.azure.core.amqp.implementation.AmqpChannelProcessor.requestUpstream(AmqpChannelProcessor.java:336)
        at com.azure.core.amqp.implementation.AmqpChannelProcessor.lambda$onError$4(AmqpChannelProcessor.java:230)
        at reactor.core.publisher.LambdaMonoSubscriber.onNext(LambdaMonoSubscriber.java:171)
        at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:270)
        at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:285)
        at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
        at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
        at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:840)

To Reproduce

  1. Create a ServiceBusSenderClient instance.
  2. Send a message to the Service Bus topic.
  3. Wait 15 minutes; the exceptions above are logged.

Code Snippet

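    // serviceBusClientBuilder (a ServiceBusClientBuilder) and TYPE_ID are defined elsewhere in the application.
    public void execute() {
        // Build a synchronous sender for the topic.
        ServiceBusSenderClient serviceBusSenderClient = serviceBusClientBuilder
                .connectionString("connectionString")
                .sender()
                .topicName("topicName")
                .buildClient();

        // Create the message with a custom application property and an explicit message ID.
        ServiceBusMessage serviceBusMessage = new ServiceBusMessage("payload");
        serviceBusMessage.getRawAmqpMessage().getApplicationProperties().put(TYPE_ID, "messageType");
        serviceBusMessage.setMessageId("messageId");

        // Send once; with no further sends, the connection then sits idle.
        serviceBusSenderClient.sendMessage(serviceBusMessage);
    }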

Expected behavior

No errors should be logged.

Screenshots

When the doFree method is called on the connection, the _sessions variable is set to null, but afterwards the connection seems to be reused to create a new session, which results in the NullPointerException.
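The failure described above can be reduced to a few lines. The class below is a hypothetical, simplified model written for illustration only; it is not proton-j's `ConnectionImpl` and not the SDK's fix. It assumes, as the report suggests, that freeing a connection nulls its session list, and contrasts an unguarded session call (which fails with a `NullPointerException`, as in the trace) with a guarded one that fails fast with a descriptive error.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of the reported failure; not proton-j's ConnectionImpl. */
public final class ModelConnection {
    private List<String> sessions = new ArrayList<>();
    private boolean freed;

    // Models the reported doFree() behavior: the session list is dropped.
    void doFree() {
        freed = true;
        sessions = null;
    }

    // Unguarded: after doFree() this throws NullPointerException, matching the reported trace.
    void sessionUnguarded(String name) {
        sessions.add(name);
    }

    // Guarded: fails fast so the caller knows it must open a new connection instead.
    void sessionGuarded(String name) {
        if (freed) {
            throw new IllegalStateException("connection already freed; cannot create session " + name);
        }
        sessions.add(name);
    }

    int sessionCount() {
        return sessions == null ? 0 : sessions.size();
    }
}
```

The guarded variant only shows why a freed connection must not be reused for new sessions; the thread below covers how the released SDK avoids this only at the level of the `com.azure.core.amqp.cache` opt-in.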


github-actions[bot] commented 1 month ago

@anuchandy @conniey @lmolkova

github-actions[bot] commented 1 month ago

Thank you for your feedback. Tagging and routing to the team member best able to assist.

anuchandy commented 1 month ago

Hello @Tom-Van-Asch, thank you for the report. Since your screenshot shows the breakpoint being hit, are you able to reproduce this locally on your developer machine? If so, could you provide more details?

anuchandy commented 1 month ago

@Tom-Van-Asch, just to clarify, I mean whether the local setup also hits the reported NPE (in addition to the onSessionRemoteClose and closed-connection error messages).

github-actions[bot] commented 1 month ago

Hi @Tom-Van-Asch. Thank you for opening this issue and giving us the opportunity to assist. To help our team better understand your issue and the details of your scenario please provide a response to the question asked above or the information requested above. This will help us more accurately address your issue.

anuchandy commented 1 month ago

The same stack trace is discussed and root-caused in this thread: https://github.com/Azure/azure-sdk-for-java/issues/41584

Tom-Van-Asch commented 1 month ago

Thanks for the update, when the new version is released I'll test it again and update this issue with my findings.

josebarros2025 commented 1 month ago

We have the same issue, without any solution so far.

anuchandy commented 1 month ago

The release PR has been opened and will ship soon: https://github.com/Azure/azure-sdk-for-java/pull/42088

hylander0 commented 1 month ago

the release PR has been opened and will be shipped soon: #42088

Thank you Anu for the update. Looking forward to this update.

anuchandy commented 1 month ago

Hello @Tom-Van-Asch, @pretti-vusion, @amacbean, @josebarros2025, @hylander0, @padmapriyanalam, @pje477,

The library update has been released; please follow the steps outlined below. Let us know if the experience improves. Note that you will still see the session disconnect/reconnect logs (which is expected) but the new library should address the NullPointerException.

Update to 7.17.4 or 7.17.5 dependency

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-servicebus</artifactId>
    <version>7.17.5</version>
</dependency>

Update the ServiceBusClientBuilder for "com.azure.core.amqp.cache"

When building any client (ServiceBusProcessorClient, ServiceBusReceiverClient, ServiceBusSenderClient, etc.), use the configuration "com.azure.core.amqp.cache" as shown below. Make sure this configuration is set everywhere the application creates a new ServiceBusClientBuilder:

new ServiceBusClientBuilder()
    .connectionString(CONNECTION_STRING)
    .configuration(new ConfigurationBuilder()
        .putProperty("com.azure.core.amqp.cache", "true")
        .build())
    .processor() // or .sender(), .receiver(), ...; then continue building the client as usual

Choosing this configuration is important to resolve the problem - java.lang.NullPointerException: Cannot invoke "java.util.List.add(Object)" because "this._sessions" is null

Ensure right transitive dependencies

Make sure the transitive dependencies (azure-core-amqp, azure-core) are resolved to expected versions.

mvn dependency:tree
[INFO] ...
[INFO] +- com.azure:azure-messaging-servicebus:jar:7.17.5:compile
[INFO] |  +- com.azure:azure-core:jar:1.53.0:compile
[INFO] |  |  +- ..
[INFO] |  |  \- ...
[INFO] |  \- com.azure:azure-core-amqp:jar:2.9.10:compile
[INFO] |     +- com.microsoft.azure:qpid-proton-j-extensions:jar:1.2.5:compile
[INFO] |     \- org.apache.qpid:proton-j:jar:0.34.1:compile
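When checking a resolved tree against the one above, version strings have to be compared numerically: compared as plain strings, "2.9.6" sorts after "2.9.10". The sketch below is a small stdlib-only helper (class and method names are hypothetical) for plain dotted numeric versions; it assumes no qualifiers such as `-beta.1`, which would make `Integer.parseInt` throw. The baselines 1.53.0 and 2.9.10 are simply the azure-core and azure-core-amqp versions shown in the tree above for 7.17.5.

```java
import java.util.Arrays;

public final class VersionCheck {
    private VersionCheck() { }

    /** Compares dotted numeric versions such as "2.9.6" and "2.9.10" component by component. */
    static int compare(String a, String b) {
        int[] x = Arrays.stream(a.split("\\.")).mapToInt(Integer::parseInt).toArray();
        int[] y = Arrays.stream(b.split("\\.")).mapToInt(Integer::parseInt).toArray();
        for (int i = 0; i < Math.max(x.length, y.length); i++) {
            // Missing trailing components count as 0, so "1.53" equals "1.53.0".
            int xi = i < x.length ? x[i] : 0;
            int yi = i < y.length ? y[i] : 0;
            if (xi != yi) {
                return Integer.compare(xi, yi);
            }
        }
        return 0;
    }

    /** True if the resolved version is the expected one or newer. */
    static boolean atLeast(String resolved, String expected) {
        return compare(resolved, expected) >= 0;
    }

    public static void main(String[] args) {
        System.out.println(atLeast("2.9.10", "2.9.10")); // true
        System.out.println(atLeast("2.9.6", "2.9.10"));  // false
        System.out.println(atLeast("1.49.1", "1.53.0")); // false
    }
}
```

The azure-core-amqp 2.9.6 and azure-core 1.49.1 versions that appear in a dependency tree later in this thread both fail this check against the 7.17.5 baselines.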

Note: In later versions the need for opt-in "com.azure.core.amqp.cache" will be removed

anuchandy commented 1 month ago

Hello @pje477, I'm following up on this comment https://github.com/Azure/azure-sdk-for-java/issues/41584#issuecomment-2378284034 you left in the other GitHub issue.

While onSessionRemoteClose (normal) and the NullPointerException (abnormal) existed in the versions you listed, the "reactor-executor leak" is something new. Could you please try the 7.17.4 steps listed in the comment above and check whether they resolve the NPE and associated issues?

A few questions about the environment where you observed the leak (it's fine to respond later, after trying 7.17.4):

  1. Can you provide the code showing how your application uses the Service Bus clients - including any configuration values such as concurrency, receive-mode etc. and code doing client close?
  2. What kind of application is it (e.g., Spring, a basic console application)?
  3. How many Service Bus client instances are used in the application?
  4. Is only one topic used in the affected application?
  5. Are you able to reproduce the leak locally by running application in your dev box?
  6. Do you see a correlation between the number of reactor-executor threads and the number of NullPointerExceptions emitted? Can you share more of your observations?
  7. What is the Service Bus tier you're using?
josebarros2025 commented 1 month ago

I'm seeing errors like this:

java.lang.NoSuchMethodError: 'void com.azure.core.amqp.implementation.ReactorConnection.<init>(java.lang.String, com.azure.core.amqp.implementation.ConnectionOptions, com.azure.core.amqp.implementation.ReactorProvider, com.azure.core.amqp.implementation.ReactorHandlerProvider, com.azure.core.amqp.implementation.AmqpLinkProvider, com.azure.core.amqp.implementation.TokenManagerProvider, com.azure.core.amqp.implementation.MessageSerializer, org.apache.qpid.proton.amqp.transport.SenderSettleMode, org.apache.qpid.proton.amqp.transport.ReceiverSettleMode, boolean, boolean)'

Any tips? My client is built like this:

return new ServiceBusClientBuilder()
            .connectionString(messagingConfiguration.getConnectionString())
            .configuration(new ConfigurationBuilder()
                .putProperty("com.azure.core.amqp.cache", "true")
                .build())
            .processor()
            .queueName(queueName)
            .maxConcurrentCalls(messagingConfiguration.getMaxConcurrentCall())
            .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
            .processMessage(processMessage())
            .processError(processError())
            .disableAutoComplete()
            .buildProcessorClient();

This is a Spring Boot 3.3.3 application.

My dependency tree is different from yours; how should I proceed?

[INFO] +- com.azure.spring:spring-cloud-azure-starter:jar:5.14.0:compile
[INFO] |  \- com.azure.spring:spring-cloud-azure-autoconfigure:jar:5.14.0:compile
[INFO] |     \- com.azure.spring:spring-cloud-azure-service:jar:5.14.0:compile
[INFO] |        \- com.azure.spring:spring-cloud-azure-core:jar:5.14.0:compile
[INFO] |           \- com.azure:azure-core-management:jar:1.15.0:compile
[INFO] +- com.azure:azure-messaging-servicebus:jar:7.17.4:compile
[INFO] |  +- com.azure:azure-core:jar:1.49.1:compile
[INFO] |  |  \- com.azure:azure-json:jar:1.1.0:compile
[INFO] |  +- com.azure:azure-xml:jar:1.0.0:compile
[INFO] |  +- com.azure:azure-core-amqp:jar:2.9.6:compile
[INFO] |  |  \- com.microsoft.azure:qpid-proton-j-extensions:jar:1.2.5:compile
[INFO] |  \- com.azure:azure-core-http-netty:jar:1.15.1:compile
pje477 commented 1 month ago

Hi @anuchandy - Thank you for releasing version 7.17.4 of Service Bus SDK - it appears to have fixed our issue!

Regarding the reactor-executor thread leak, what we observed is that in versions prior to 7.17.4, when the error below was logged for certain Service Bus namespaces, the reactor-executor thread associated with that connection would not be closed, and the number of reactor-executor threads would increase over time.

This was the error that always preceded the thread leak: reactor.core.Exceptions$ErrorCallbackNotImplemented: com.azure.core.amqp.exception.AmqpException: onSessionRemoteClose connectionId[MF_ff675e_1727378638933], entityName[sbt-my-topic-name] condition[Error{condition=amqp:connection:forced, description='The connection was closed by container '6b2b3bba82084087bfd7d760339cdade_G0' because it did not have any active links in the past 300000 milliseconds.

We observed this initially as slowly increasing CPU usage of our application, which is a Spring Boot microservice deployed to Azure Spring Apps. We ran a JFR on the Spring Apps instance and observed the high number of zombie reactor-executor threads. Then during troubleshooting we found that only certain service bus namespaces exhibit this behavior. For us, the only namespace that exhibits this behavior is our production instance, and none of our non-prod service bus namespaces exhibit the behavior, despite the SB namespaces being configured identically (via Terraform) and the same exact code being deployed in prod and non-prod.

In any case, here are the answers you requested:

  1. Code that we are using:
String connectionString = "Endpoint=sb://" + hostname + ";SharedAccessKeyName=" + username + ";SharedAccessKey=" + password;

client = new ServiceBusClientBuilder()
        .connectionString(connectionString)
        .sender()
        .queueName(queueTopic)
        .buildClient();

client.createMessageBatch();

Then wait for 15 minutes. After 15 minutes (exactly), we get the error messages in the attached file. This issue only happens on our production service bus, but for that SB namespace, it happens every time the connection goes idle for 15m or more.

  2. It is a Spring Boot microservice, deployed to Azure Spring Apps.
  3. The behavior can be observed with just a single client connection as shown above. The actual app opens 20-30 connections to Service Bus; in that case, each connection exhibits the behavior.
  4. We are connecting to many different topics from the application. The behavior has been observed for many different topics (but only in our prod SB namespace, as noted above).
  5. Yes, I can reproduce the behavior by running the above code locally on my dev box.
  6. Yes. Whenever the NPE is observed in the log file, a new reactor-executor thread is created while the old one continues to run. In our non-prod SB namespaces, the reactor-executor thread disappears after the session times out (desired behavior). We have observed this both by running JFR in prod and non-prod, and by periodically dumping all active threads to the console from inside the app itself.
  7. We are running Service Bus Premium.
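The periodic thread dumping mentioned above can be approximated with the JDK alone. This is a minimal, hypothetical helper, assuming the SDK's I/O threads are named with the `reactor-executor` prefix (as the truncated `[ctor-executor-1]` thread names in the logs above suggest). It counts live threads by name prefix and can log that count on a schedule so a steady increase becomes visible.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public final class ReactorThreadMonitor {
    // Assumed prefix of the SDK's reactor threads, based on the thread names in the logs.
    static final String REACTOR_PREFIX = "reactor-executor";

    private ReactorThreadMonitor() { }

    /** Counts live threads whose name starts with the given prefix. */
    static long countThreads(String prefix) {
        return Thread.getAllStackTraces().keySet().stream()
                .filter(t -> t.getName().startsWith(prefix))
                .count();
    }

    /** Logs the count every periodSeconds on a daemon thread; shut down the returned scheduler to stop. */
    static ScheduledExecutorService logPeriodically(String prefix, long periodSeconds) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "thread-count-logger");
            t.setDaemon(true);
            return t;
        });
        scheduler.scheduleAtFixedRate(
                () -> System.out.println(prefix + " threads: " + countThreads(prefix)),
                0, periodSeconds, TimeUnit.SECONDS);
        return scheduler;
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = logPeriodically(REACTOR_PREFIX, 60);
        Thread.sleep(1_000); // give the first sample time to print in this demo
        scheduler.shutdown();
    }
}
```

In a long-running service, a count that grows after each idle disconnect would match the leak described here; with the fixed SDK version it should stay flat.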

Again, the thread leak behavior appears to be remediated in version 7.17.4 of Service Bus SDK but I'm documenting this here for others.

Also, our use case is low-latency message transfer, so we open a connection to a database and another connection to Service Bus and keep these connections open for long periods of time. Then, when any message is published from the database, it can be transferred to a Service Bus topic as quickly as possible (without the delay of opening the connection to Service Bus). There are periods when the message volume drops and no messages are transferred in a 15-minute window, which is when we were seeing the behavior.

anuchandy commented 1 month ago

@josebarros2025, thank you for reaching out. I think the issue you're facing is because of spring-cloud-azure-starter:5.16.0 having those core versions (2.9.6, 1.49.1) as the transitive dependency, causing conflicts. The next version of spring-cloud-azure-starter bumping the core version is yet to be released.

Have you tried explicitly specifying the required versions of azure-[core|core-amqp] (in addition to azure-messaging-servicebus:7.17.4) in your Spring app POM, above the spring-cloud-azure-starter dependency? In Maven, a version conflict is typically resolved in favor of the version declared nearest to your project (and, at equal depth, the one declared first); I'm unsure whether Spring's dependency management changes this.

If you're unable to override the versions in the Spring app, then unfortunately you'll need to wait for the Azure Spring team to release a spring-cloud-azure-starter that uses azure-messaging-servicebus:7.17.4. Generally, the Azure Spring team schedules releases after all the Azure SDKs and the BOM for that month are available. Given previous release timelines, I would expect the Azure Spring team's release before mid-October.

anuchandy commented 1 month ago

Hello @pje477, that's wonderful news! Thank you for confirming that 7.17.4 fixes the NPE and the leak, and I appreciate you responding to my questions. Unfortunately, as with your non-prod namespace, we (the SDK team) were also unable to reproduce any of this with our test namespaces. Therefore, your assistance in verifying version 7.17.4 was quite valuable.

padmapriyanalam commented 1 month ago

Hi @anuchandy, thank you for the latest version. I have updated the dependencies (see the attached dependency tree) and updated the code to:

 final ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
                .connectionString(CONNECTION_STRING)
                .configuration(new ConfigurationBuilder()
                                   .putProperty("com.azure.core.amqp.cache", "true")
                                   .build())
                .sender()
                .queueName(queueName())
                .buildClient();

I deployed the changes to the demo environment, but I'm still seeing the exception in the demo logs:

com.azure.core.amqp.exception.AmqpException: onSessionRemoteClose connectionId[connectionId], entityName[entityName] condition[Error{condition=amqp:connection:forced, description='The connection was closed by container 'containerId' because it did not have any active links in the past 300000 milliseconds. TrackingId:TrackingId, SystemTracker:gateway10, Timestamp:2024-10-02T12:26:06', info=null}], errorContext[NAMESPACE: servicebusName. ERROR CONTEXT: N/A, PATH: path]

josebarros2025 commented 1 month ago


Hi, we aren't having this issue anymore. I had to force some dependency versions; otherwise it wouldn't work:

private ServiceBusSenderAsyncClient sender;

    protected AbstractMessageProducer(final String queueName, final MessagingConfiguration messagingConfiguration, final JacksonJsonSerializer jacksonJsonSerializer) {
        log.debug("producer queueName: {}", queueName);
        this.jacksonJsonSerializer = jacksonJsonSerializer;
        this.messagingConfiguration = messagingConfiguration;
        this.queueName = queueName;
        sender = new ServiceBusClientBuilder()
            .connectionString(messagingConfiguration.getConnectionString())
            .configuration(new ConfigurationBuilder()
                .putProperty("com.azure.core.amqp.cache", "true")
                .build())
            .sender()
            .queueName(queueName)
            .buildAsyncClient();
    }

    public void sendToQueue(final ServiceBusMessage message) {
        sendMessage(message);
    }

    private void sendMessage(final ServiceBusMessage message) {
        try {
            sender.sendMessage(message)
                .subscribe(
                    sentSignal -> log.info("Sent Message with ID {} to queue", message.getMessageId()),
                    errorSignal -> log.warn("Error signal: " + errorSignal),
                    () -> {
                        log.info("Send message completed");
                    });
        } catch (Exception e) {
            log.error("error sending message to queue: {}", e.getMessage(), e);
        }
    }
 <azure.servicebus.version>7.17.4</azure.servicebus.version>

<dependency>
            <groupId>com.azure.spring</groupId>
            <artifactId>spring-cloud-azure-starter</artifactId>
            <version>${azure.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>com.azure</groupId>
                    <artifactId>azure-core</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.azure</groupId>
                    <artifactId>azure-core-amqp</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-core</artifactId>
            <version>1.52.0</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-core-amqp</artifactId>
            <version>2.9.9</version>
            <scope>compile</scope>
        </dependency>
anuchandy commented 1 month ago

@josebarros2025, glad to hear you managed to solve the conflicts. Thanks for sharing the solution; I'm sure others will find it helpful.

anuchandy commented 1 month ago

@padmapriyanalam, thanks for the response. As I noted in my previous comment, you will still observe disconnect events: the service disconnects if there is no activity, and the client reconnects on the next send attempt. The log you're seeing is such a disconnect event.

What 7.17.4 addresses is the NullPointerException related to the session disconnect event and the resulting thread leaks in certain environments. I hope this clarifies.
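The expected behavior described here (the service closes an idle connection, and the client opens a new one on the next send) can be modeled as a lazily re-created connection. The sketch below is conceptual only: `LazyReconnectSender` and `FakeConnection` are hypothetical stand-ins, not Service Bus SDK types, and the real client's recovery logic is more involved.

```java
import java.util.Objects;

public final class LazyReconnectSender {
    /** Hypothetical stand-in for an AMQP connection; not an SDK type. */
    static final class FakeConnection {
        private volatile boolean open = true;

        // Simulates the service closing the connection, e.g. after it had no active links for a while.
        void closeByService() {
            open = false;
        }

        boolean isOpen() {
            return open;
        }
    }

    private int connectionsOpened;
    private FakeConnection connection;

    /** Returns the current connection, opening a new one if none exists or the last one was closed. */
    synchronized FakeConnection currentConnection() {
        if (connection == null || !connection.isOpen()) {
            connection = new FakeConnection();
            connectionsOpened++;
        }
        return connection;
    }

    /** Sends on a live connection; a closed connection is never reused for new work. */
    synchronized void send(String payload) {
        Objects.requireNonNull(payload, "payload");
        currentConnection();
        // A real client would open a link on the returned connection and transfer the payload here.
    }

    synchronized int connectionsOpened() {
        return connectionsOpened;
    }
}
```

In this model each idle disconnect costs one new connection on the next send, which is why disconnect log lines are still expected after the NPE fix.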

anuchandy commented 1 month ago

Closing this issue, as 7.17.4 with the fix has been released. Refer to the steps outlined in the earlier comment to use 7.17.4.

padmapriyanalam commented 1 month ago

Thank you @josebarros2025 @anuchandy. We aren't having this issue anymore

robsonkades commented 2 weeks ago

@anuchandy I believe this issue still persists: even after updating the versions, we are still receiving the error.

We are using the implementation: spring-cloud-azure-stream-binder-servicebus

    Caused by: java.lang.NullPointerException: Cannot invoke "java.util.List.add(Object)" because "this._sessions" is null
    at org.apache.qpid.proton.engine.impl.ConnectionImpl.session(ConnectionImpl.java:91)
    at org.apache.qpid.proton.engine.impl.ConnectionImpl.session(ConnectionImpl.java:39)
    at com.azure.core.amqp.implementation.ReactorConnection.lambda$createSession$15(ReactorConnection.java:342)
    at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1708)
    at com.azure.core.amqp.implementation.ReactorConnection.lambda$createSession$16(ReactorConnection.java:339)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
            <dependency>
                <groupId>com.azure.spring</groupId>
                <artifactId>spring-cloud-azure-dependencies</artifactId>
                <version>5.17.1</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
    <dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-messaging-servicebus</artifactId>
      <version>7.17.4</version> <!-- {x-version-update;com.azure:azure-messaging-servicebus;dependency} -->
    </dependency>
anuchandy commented 2 weeks ago

Hello @robsonkades, I think the issue is that the configuration "com.azure.core.amqp.cache" is not enabled. Since the application indirectly uses version 7.17.4 through the Spring library and cannot set it directly in the builder, you can set the system property "com.azure.core.amqp.cache" to true.
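A minimal sketch of that suggestion sets the property before the Spring context (and therefore any Service Bus client) is created, for example at the top of `main`. The class below is hypothetical; it assumes, as the comment above states, that the SDK picks up `com.azure.core.amqp.cache` from JVM system properties when it is not set on the builder. It leaves an explicitly supplied value untouched.

```java
public final class AmqpCacheOptIn {
    // Property name from the maintainer's instructions in this thread.
    static final String AMQP_CACHE_PROPERTY = "com.azure.core.amqp.cache";

    private AmqpCacheOptIn() { }

    /** Sets the opt-in to "true" unless a value was already supplied, e.g. via -D on the command line. */
    static void enable() {
        if (System.getProperty(AMQP_CACHE_PROPERTY) == null) {
            System.setProperty(AMQP_CACHE_PROPERTY, "true");
        }
    }

    public static void main(String[] args) {
        // Must run before any Service Bus client (or a Spring context that builds one) is created.
        enable();
        // SpringApplication.run(MyApplication.class, args); // hypothetical Spring Boot entry point
        System.out.println(AMQP_CACHE_PROPERTY + "=" + System.getProperty(AMQP_CACHE_PROPERTY));
    }
}
```

Passing `-Dcom.azure.core.amqp.cache=true` on the `java` command line should have the same effect without code changes.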