Azure / azure-sdk-for-java

This repository is for active development of the Azure SDK for Java. For consumers of the SDK we recommend visiting our public developer docs at https://docs.microsoft.com/java/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-java.

Spring cloud stream service bus binder health recovery #35266

Open dhananjay12 opened 1 year ago

dhananjay12 commented 1 year ago

Background

We are using the Service Bus binder, which pushes messages to Azure Service Bus topics. The app is running in a K8s environment.

spring-cloud-azure-stream-binder-servicebus - 4.4.1
Spring Boot - 2.7.7

There are situations, with an unknown cause, where connectivity between the app and Azure Service Bus is lost, resulting in the readiness probe being DOWN.

image

In the logs I can see the expected error:

2023-06-02 03:20:39.976 ERROR 63250 --- [ctor-executor-9] reactor.core.publisher.Operators         : Operator called default onErrorDropped

reactor.core.Exceptions$ErrorCallbackNotImplemented: com.azure.messaging.servicebus.ServiceBusException: Retries exhausted: 3/3
Caused by: com.azure.messaging.servicebus.ServiceBusException: Retries exhausted: 3/3
    at com.azure.messaging.servicebus.ServiceBusSenderAsyncClient.mapError(ServiceBusSenderAsyncClient.java:848) ~[azure-messaging-servicebus-7.11.0.jar:7.11.0]
    at reactor.core.publisher.Mono.lambda$onErrorMap$31(Mono.java:3811) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.FluxDoOnEach$DoOnEachSubscriber.onError(FluxDoOnEach.java:195) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onError(MonoPeekTerminal.java:258) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.whenError(FluxRetryWhen.java:225) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.FluxRetryWhen$RetryWhenOtherSubscriber.onError(FluxRetryWhen.java:274) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onError(FluxContextWrite.java:121) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:415) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:251) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:537) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:343) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onError(FluxRetryWhen.java:190) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.onError(FluxTimeout.java:219) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onError(FluxPeekFuseable.java:234) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:192) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:259) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:142) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:142) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onError(MonoPeekTerminal.java:258) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onError(FluxHide.java:142) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:278) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.onError(FluxTimeout.java:219) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:93) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onError(FluxFilterFuseable.java:162) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:865) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:965) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.FluxReplay$ReplaySubscriber.onError(FluxReplay.java:1360) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:278) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:231) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.SinkEmptyMulticast$VoidInner.complete(SinkEmptyMulticast.java:272) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.SinkEmptyMulticast.tryEmitEmpty(SinkEmptyMulticast.java:86) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.SinkEmptySerialized.tryEmitEmpty(SinkEmptySerialized.java:46) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.InternalEmptySink.emitEmpty(InternalEmptySink.java:26) ~[reactor-core-3.4.26.jar:3.4.26]
    at com.azure.core.amqp.implementation.ReactorConnection.lambda$closeConnectionWork$34(ReactorConnection.java:527) ~[azure-core-amqp-2.7.1.jar:2.7.1]
    at reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:73) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:32) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:228) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.SinkEmptyMulticast$VoidInner.complete(SinkEmptyMulticast.java:272) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.SinkEmptyMulticast.tryEmitEmpty(SinkEmptyMulticast.java:86) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.SinkEmptySerialized.tryEmitEmpty(SinkEmptySerialized.java:46) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.publisher.InternalEmptySink.emitEmpty(InternalEmptySink.java:26) ~[reactor-core-3.4.26.jar:3.4.26]
    at com.azure.core.amqp.implementation.ReactorExecutor.close(ReactorExecutor.java:188) ~[azure-core-amqp-2.7.1.jar:2.7.1]
    at com.azure.core.amqp.implementation.ReactorExecutor.lambda$scheduleCompletePendingTasks$1(ReactorExecutor.java:173) ~[azure-core-amqp-2.7.1.jar:2.7.1]
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) ~[reactor-core-3.4.26.jar:3.4.26]
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) ~[reactor-core-3.4.26.jar:3.4.26]
    at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 3/3
    at reactor.core.Exceptions.retryExhausted(Exceptions.java:294) ~[reactor-core-3.4.26.jar:3.4.26]

Spring actuator configurations for readiness/liveness:

management.endpoint.health.group.readiness.include=*
management.endpoint.health.group.readiness.show-details=always
management.endpoint.health.group.liveness.include=ping
management.endpoint.health.group.liveness.show-details=never

readiness.show-details=always is set only to debug the application.

Because the readiness probe is DOWN, K8s stops sending traffic to the pod.

Issue

After the connection is restored, the readiness probe remains down and does not recover. In this state, even though a successful request is technically possible, it never occurs because Kubernetes has halted traffic to the affected container.

When reproducing this issue locally, if I send a request to the service, it establishes a connection and sends the message to the service bus, causing the Health Endpoint status to return to UP. However, the situation differs in Kubernetes, where the traffic is blocked to the container, leaving it in a persistent state of unavailability.

I did find this class in the Azure binder:

image

The health instrumentation for the topic stays DOWN until there is a successful delivery, which is not possible in K8s once traffic to the pod has been stopped.
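To make the behavior described above concrete, here is a minimal sketch of a health flag that is only updated by send attempts. It is a hypothetical model written for illustration (the names `SendDrivenHealth`, `send`, and `status` are assumptions), not the binder's actual instrumentation class.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/** Hypothetical model of a send-driven health flag; not the binder's real class. */
final class SendDrivenHealth {
    enum Status { UNKNOWN, UP, DOWN }

    private final AtomicReference<Status> status = new AtomicReference<>(Status.UNKNOWN);

    /** Attempts a send and records the outcome as the health status. */
    <T> void send(T message, Consumer<T> sender) {
        try {
            sender.accept(message);
            status.set(Status.UP);
        } catch (RuntimeException e) {
            // The failure is swallowed here only to keep the sketch short.
            status.set(Status.DOWN);
        }
    }

    /** Returns the last recorded status; nothing else ever changes it. */
    Status status() {
        return status.get();
    }
}

final class SendDrivenHealthDemo {
    public static void main(String[] args) {
        SendDrivenHealth health = new SendDrivenHealth();
        boolean[] connected = {true};
        Consumer<String> sender = message -> {
            if (!connected[0]) {
                throw new IllegalStateException("Retries exhausted");
            }
        };

        health.send("first", sender);
        System.out.println(health.status()); // UP

        connected[0] = false;
        health.send("second", sender);
        System.out.println(health.status()); // DOWN

        connected[0] = true; // connectivity is back, but no send is attempted
        System.out.println(health.status()); // DOWN

        health.send("third", sender);
        System.out.println(health.status()); // UP
    }
}
```

In this model the status changes only inside `send`, so if a DOWN readiness probe prevents any request from reaching the pod, nothing can move the flag back to UP.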

Is there any other configuration you would suggest to tackle this scenario?

Setup

The only way I could reproduce this situation locally is by running a simple app when the connection with the service bus is stable. I make a few successful requests and then intentionally disable the internet connection on my laptop. When attempting to make another request, it throws a retry exception, causing the readiness probe to fail (and the application to become unavailable if it was in K8s env). Even after restoring the internet connection, the app's readiness probe never recovers.

Simple app - https://github.com/dhananjay12/learn-azure/tree/main/scs-service-bus

NOTE - We connect to the Service Bus topic using a service principal.

dhananjay12 commented 1 year ago

The problem occurs with any kind of exception thrown in the producer: after the exception, the readiness probe goes DOWN. Another way to reproduce the issue is:

// Controller method; assumes an injected StreamBridge field named streamBridge
// and org.json.JSONObject on the classpath.
@GetMapping("/test")
public void test() {
    JSONObject jsonObject = new JSONObject();

    // Add 3001 long entries so that the payload becomes very large.
    for (int i = 0; i <= 3000; i++) {
        String key = "description" + i;
        String value = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Quisque cursus, nisl sed vestibulum tempus, " +
                "lectus leo venenatis massa, ut ullamcorper libero enim sit amet mi. Fusce aliquam aliquet augue, " +
                "vitae pellentesque ex dignissim quis. Sed et neque ligula. Vivamus elementum varius lobortis. Sed " +
                "efficitur, lorem vel efficitur faucibus, libero ex iaculis nisi, id consectetur massa orci ac tortor. " +
                "Nullam malesuada lectus enim, vitae bibendum ex pulvinar nec. Nulla facilisi. Suspendisse potenti.";
        jsonObject.put(key, value);
    }

    // Send the oversized object to the output binding; this send is expected to fail.
    streamBridge.send("hello-out-0", jsonObject);
}

This is a huge object, so the send will obviously throw an exception.
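As a rough check of why this payload fails, the sketch below estimates a lower bound on its size. It assumes a 256 KB message size limit, which is the commonly documented limit for the Standard tier; the thread states neither the tier nor the exact exception. The class name, the 400-character stand-in value (the real value is roughly 500 characters), and the helper `estimateBytes` are all illustrative assumptions.

```java
import java.nio.charset.StandardCharsets;

/** Rough size estimate for the test payload; all names here are illustrative. */
final class PayloadSizeEstimate {
    // Assumption: 256 KB message size limit (Standard tier). Not stated in the thread.
    static final long ASSUMED_LIMIT_BYTES = 256L * 1024;

    /**
     * Lower bound on the serialized size: counts only the UTF-8 bytes of keys
     * and values, ignoring JSON quotes, colons, commas, and braces.
     */
    static long estimateBytes(int entries, String value) {
        long total = 0;
        long valueBytes = value.getBytes(StandardCharsets.UTF_8).length;
        for (int i = 0; i < entries; i++) {
            String key = "description" + i;
            total += key.getBytes(StandardCharsets.UTF_8).length + valueBytes;
        }
        return total;
    }

    public static void main(String[] args) {
        // Conservative stand-in for the Lorem ipsum value used in the endpoint.
        String standInValue = "x".repeat(400);
        long estimate = estimateBytes(3001, standInValue);
        System.out.println("estimated bytes: " + estimate);
        System.out.println("exceeds assumed limit: " + (estimate > ASSUMED_LIMIT_BYTES));
    }
}
```

Even with the shortened stand-in value, 3001 entries come to well over one megabyte, several times the assumed limit.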

image

The readiness probe is DOWN and will only come back UP after a successful request. We are in the same situation as before: in K8s, traffic to the pod is stopped when the readiness probe returns DOWN.

In the code, I see that the instrumentation is added to every producer. Looking at the tests, once it is marked as DOWN, I was wondering whether a successful subsequent request is the only way to bring it back UP?

Other binders, such as Kafka and Rabbit, typically check only the connection and recover automatically once the connection is back up.
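For contrast, a connection-check style of health reporting can be sketched as follows. This is a hypothetical illustration of the general idea, not the Kafka or Rabbit binder code; `ConnectionCheckHealth` and the `BooleanSupplier` standing in for a real connectivity check are assumptions.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical health check that re-evaluates connectivity on every probe. */
final class ConnectionCheckHealth {
    enum Status { UP, DOWN }

    private final BooleanSupplier connectionCheck;

    ConnectionCheckHealth(BooleanSupplier connectionCheck) {
        this.connectionCheck = connectionCheck;
    }

    /** Computes the status at probe time instead of remembering the last send result. */
    Status status() {
        try {
            return connectionCheck.getAsBoolean() ? Status.UP : Status.DOWN;
        } catch (RuntimeException e) {
            // A failing check is reported as DOWN rather than propagated to the probe.
            return Status.DOWN;
        }
    }
}

final class ConnectionCheckHealthDemo {
    public static void main(String[] args) {
        boolean[] connected = {true};
        ConnectionCheckHealth health = new ConnectionCheckHealth(() -> connected[0]);

        System.out.println(health.status()); // UP
        connected[0] = false;
        System.out.println(health.status()); // DOWN
        connected[0] = true; // no message needs to be sent for recovery
        System.out.println(health.status()); // UP
    }
}
```

Because the status is derived on each probe, it recovers as soon as the check succeeds again, without depending on application traffic.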

I guess our expectation would be:

Netyyyy commented 1 year ago

Hi @dhananjay12, thank you for reporting this issue. It seems like a new feature for K8s. We will take it into consideration. We appreciate your input and will review this matter as soon as possible. Please feel free to provide any additional information or context that you think may be helpful. We'll keep you updated on the progress of our review. Thank you for your contribution to improving our project.

dhananjay12 commented 1 year ago

Hey @Netyyyy,

While the issue (the readiness endpoint not recovering automatically) can be seen without deploying to K8s, you can build the image locally by following the readme in the repo and then apply the K8s folder to your local cluster.

image

Check the K8s endpoints:

kubectl get endpoints | grep scs-service-bus

image

Hit http://localhost:8080/test, which is programmed to send a large object. In the logs you will see the exception, which is fine. The important thing is that after 15 to 20 seconds the readiness probe will return DOWN, which will look something like this:

image

If you grep endpoints again - kubectl get endpoints | grep scs-service-bus

image

Locally this does not matter much, but in a cluster the pod is no longer available for traffic.

awsule-cvs commented 5 months ago

@anuchandy and @saragluna, we are experiencing a similar issue: we notice missing data whenever this error appears in our logs. Only a subset of the data reaches Service Bus, and the first few messages are missing. It seems the Spring Cloud Stream binder is sending data before fully confirming the Service Bus connection. Here's a screenshot of our error below:

Is there a way we could enable advanced/verbose tracing on the library and get some help from you to investigate further?

Screen Shot 2024-01-29 at 9 49 26 AM

saragluna commented 5 months ago

@awsule-cvs, but this seems like a different issue?