Azure / azure-sdk-for-java

This repository is for active development of the Azure SDK for Java. For consumers of the SDK we recommend visiting our public developer docs at https://docs.microsoft.com/java/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-java.

[BUG] NullPointerException in Apache Qpid Proton-J during Azure Service Bus AMQP Reconnection #41865

Open pretti-vusion opened 3 days ago

pretti-vusion commented 3 days ago

Describe the bug
We are encountering a NullPointerException in the org.apache.qpid.proton library, triggered during session creation while using the azure-messaging-servicebus dependency to connect to Azure Service Bus. The exception occurs intermittently, particularly after the connection has been idle for around 30 seconds, which suggests it is related to reconnect behavior. Despite the exception, normal operations appear unaffected.

Exception or Stack Trace

java.lang.NullPointerException: Cannot invoke "java.util.List.add(Object)" because "this._sessions" is null
at org.apache.qpid.proton.engine.impl.ConnectionImpl.session(ConnectionImpl.java:91)
at org.apache.qpid.proton.engine.impl.ConnectionImpl.session(ConnectionImpl.java:39)
at com.azure.core.amqp.implementation.ReactorConnection.lambda$createSession$15(ReactorConnection.java:311)
at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1708)
at com.azure.core.amqp.implementation.ReactorConnection.lambda$createSession$16(ReactorConnection.java:308)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.request(MonoIgnoreThen.java:164)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.request(FluxHide.java:152)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:139)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.request(MonoFlatMap.java:194)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2331)
at com.azure.core.amqp.implementation.AmqpChannelProcessor.requestUpstream(AmqpChannelProcessor.java:336)
at com.azure.core.amqp.implementation.AmqpChannelProcessor.lambda$onError$4(AmqpChannelProcessor.java:230)
at reactor.core.publisher.LambdaMonoSubscriber.onNext(LambdaMonoSubscriber.java:171)
at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:270)
at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:285)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
at java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.lang.Thread.run(Thread.java:1583)

To Reproduce
Steps to reproduce the behavior:

1. Set up an application using Azure Service Bus via AMQP with the following dependency: com.azure:azure-messaging-servicebus:7.17.0.
2. Leave the connection to Azure Service Bus idle for around 30 seconds, after which the client reconnects.
3. A NullPointerException is intermittently thrown during session re-creation.

Code Snippet
// Sample code illustrating initialization of the Service Bus client

final AmqpRetryOptions retryOptions = new AmqpRetryOptions();
retryOptions.setDelay(Duration.ofSeconds(1));
retryOptions.setTryTimeout(Duration.ofSeconds(120));
retryOptions.setMaxRetries(30);
retryOptions.setMode(AmqpRetryMode.EXPONENTIAL);

ServiceBusReceiverAsyncClient client = new ServiceBusClientBuilder()
    .connectionString(connectionString)
    .retryOptions(retryOptions)     // apply the retry options configured above
    .receiver()
    .queueName(queueName)
    .buildAsyncClient();

Expected behavior
The reconnection process should not result in a NullPointerException. The internal sessions should be properly initialized during reconnection, without throwing errors.

Screenshots
Not applicable

Setup (please complete the following information):

- OS: Linux
- IDE: IntelliJ IDEA
- Library: com.azure:azure-messaging-servicebus:7.17.0
- Java version: 21
- App Server/Environment: Azure Service Bus (AMQP)
- Frameworks: Spring Boot 3.2.5

Additional context
The issue appears to be related to reconnections to Azure Service Bus after a 30-second idle period. The NullPointerException originates from the org.apache.qpid.proton.engine.impl.ConnectionImpl class, which the Azure SDK uses internally, specifically during session creation. This may indicate an issue in how the Azure SDK interacts with the Apache Qpid Proton-J library.

Information Checklist
Kindly make sure that you have added all of the information above and checked off the required fields; otherwise we will treat the issue as an incomplete report.

github-actions[bot] commented 3 days ago

@anuchandy @conniey @lmolkova

github-actions[bot] commented 3 days ago

Thank you for your feedback. Tagging and routing to the team member best able to assist.

anuchandy commented 2 days ago

Hello @pretti-vusion, thank you for reporting. We looked into a similar stack trace here. The code changes with the fix are checked in. Service Bus and Event Hubs releases containing these changes should ship in 1-2 weeks.

pretti-vusion commented 2 days ago

Hi @anuchandy, thanks for the info, I'm looking forward to it.

amacbean commented 1 day ago

@anuchandy Would it be possible to tag this ticket when the release is made? We are seeing the exact same thing.

anuchandy commented 1 day ago

@amacbean, sure, I will update here with a link to the released version and any instructions.

pretti-vusion commented 1 day ago

@anuchandy, has a ticket been filed with the Apache Qpid project as well? Even though your upcoming changes will solve the issue for us, I think it might be a good idea for them to add a null check at this point to prevent the unnecessary NPE.

anuchandy commented 20 hours ago

Hi @pretti-vusion, thanks for the message. We don't need to open a ticket for Qpid; let me explain. The Qpid library assumes that its APIs are always accessed by one and the same thread (the Reactor-Thread) throughout the lifetime of the Connection. This is why there are no locks or concurrency guards in Qpid. Regarding your comment about a check in Qpid: yes, it would be nice if Qpid provided a check and a clearer error message. However, since Qpid assumes its APIs are called serially by a single thread, any code that runs after that thread disposes of the connection is expected to know it can no longer make API calls.
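To make that single-threaded contract concrete, here is a minimal, self-contained sketch. FakeConnection and the executor are illustrative stand-ins I made up, not Qpid or SDK types; the executor plays the role of the Reactor-Thread, and every call on the non-thread-safe object is funneled through it:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SingleThreadConfinement {
    // Stand-in for a Qpid-style object: a plain, unsynchronized session list.
    static class FakeConnection {
        private List<String> sessions = new ArrayList<>();
        String session() { sessions.add("s"); return "session-" + sessions.size(); }
        void free() { sessions = null; } // after this, session() would NPE
    }

    public static String demo() throws Exception {
        FakeConnection conn = new FakeConnection();
        // All access to conn goes through this one thread, mirroring
        // Qpid's "one and the same thread" contract.
        ExecutorService reactorThread = Executors.newSingleThreadExecutor();
        try {
            Future<String> first = reactorThread.submit(conn::session);
            return first.get();
        } finally {
            // Disposal also happens on the confined thread, serially after
            // all other calls, so no caller can race with it.
            reactorThread.submit(conn::free);
            reactorThread.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // session-1
    }
}
```

Because every call is submitted to the same single-thread executor, the calls are serialized and no synchronization on FakeConnection itself is needed, which is exactly why Qpid can omit locks.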

Part of the SB/EH libraries is a multi-threading layer on top of Qpid, so users can scale message/event processing and perform blocking or external service calls. The cause of the NPE was that, as part of recovering from a network disconnect, the AmqpChannelProcessor type (in the SB/EH libraries) was attempting to access a Qpid API from the "retry thread", which runs concurrently with the Reactor-Thread.
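That race can be reduced to a few lines. This is purely illustrative (FakeConnection is a hypothetical stand-in, not the actual ConnectionImpl): once one thread has nulled out the internal session list during disposal, a late caller invoking session() hits exactly this kind of NPE.

```java
import java.util.ArrayList;
import java.util.List;

public class RetryRaceDemo {
    static class FakeConnection {
        private List<String> sessions = new ArrayList<>();
        void session() { sessions.add("s"); }   // NPE if sessions is null
        void free()    { sessions = null; }     // done during disposal
    }

    public static boolean reproduces() {
        FakeConnection conn = new FakeConnection();
        conn.free();              // the connection has been disposed...
        try {
            conn.session();       // ...but a "retry" caller comes in afterwards
            return false;
        } catch (NullPointerException e) {
            return true;          // same class of NPE as in the stack trace above
        }
    }

    public static void main(String[] args) {
        System.out.println(reproduces()); // true
    }
}
```

In the real SDK the two calls come from different threads, which makes the window intermittent rather than deterministic; the sketch collapses the interleaving into one thread to show the failing state.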

We believe the new cache design replacing AmqpChannelProcessor (the PR linked in the earlier comment) will address this issue. It's unfortunate that we (the library owners) are unable to reproduce this NPE while a few customers, including you, run into it randomly. Although the new cache design addresses this code path in theory, we still need affected customers to confirm its effectiveness. We're also running stress tests on the to-be-released bits to confirm things are good.

Hope this helps.