Closed r1ckr closed 2 years ago
Hi @r1ckr, thank you for reaching out to us on GitHub. Someone from the Azure Service Bus SDK for Java team (@Azure/azsdk-sb-java) will follow up with you shortly.
@ki1729 could you please help take a look?
We got the same problem. Any news?
We've tried to follow the provided steps and see if this can be reproduced to collect logs.
The issue description states that the downtime seems to be related to "Server Errors", so the goal of our deployment was to generate as many server errors as possible. Since this is not in the control of the client (the service decides when to return such an error), we used a "basic" tier namespace, created 36 queues, and started send-receive. The thought was that running such loads on the "basic" tier may cause service throttling.
As expected, over the 7-day run, the service returned ~200 server errors, but none of them left the receivers in a completely unrecoverable state.
This is the main reason we ask for DEBUG logs from the client side, so that we can look at the wire traffic and attempt to find the root cause. Can you please share the SDK DEBUG logs? Logs covering the error duration (+/- 20 minutes around the time of the issue) may provide some insights.
Sadly, we had no DEBUG logs activated, but I will deliver the error logs. The first line is the last successful message receive. After the last line, no message can be received until the pod is restarted by killing it manually.
2022-05-11 18:02:39.668 INFO 7 --- [or-http-epoll-2] c.d.s.s.servicebus.AsyncReceiver : Completing message {"foo": "bar"}
2022-05-11 18:02:39.790 DEBUG 7 --- [ctor-executor-5] c.d.s.s.servicebus.AsyncPublisher : Message sent with payload {"foo":"bar"}
2022-05-11 18:10:30.025 WARN 7 --- [ndedElastic-422] c.a.c.a.i.ReactorDispatcher : {"az.sdk.message":"ReactorDispatcher instance is closed. Should not continue dispatching work to this reactor.","exception":"ReactorDispatcher instance is closed. Should not continue dispatching work to this reactor.","connectionId":"MF_47f5d6_1652286282815"}
2022-05-11 18:10:30.025 WARN 7 --- [ parallel-1] c.a.c.a.i.ReactorDispatcher : {"az.sdk.message":"ReactorDispatcher instance is closed. Should not continue dispatching work to this reactor.","exception":"ReactorDispatcher instance is closed. Should not continue dispatching work to this reactor.","connectionId":"MF_47f5d6_1652286282815"}
2022-05-11 18:10:30.025 WARN 7 --- [ parallel-1] c.a.c.a.i.RequestResponseChannel : {"az.sdk.message":"Unable to open send and receive link.","exception":"Unable to open send and receive link.","connectionId":"MF_47f5d6_1652286282815","linkName":"cbs"}
2022-05-11 18:10:30.025 WARN 7 --- [ parallel-1] c.a.c.a.i.AmqpChannelProcessor : {"az.sdk.message":"Retry attempts exhausted or exception was not retriable.","exception":"Unable to open send and receive link.","connectionId":"MF_47f5d6_1652286282815","entityPath":"$cbs","retry":1}
2022-05-11 18:18:48.096 WARN 7 --- [ctor-executor-6] c.a.m.s.i.ServiceBusReceiveLinkProcessor : {"az.sdk.message":"Transient error occurred.","exception":"The service was unable to process the request; please retry the operation. For more information on exception types and proper exception handling, please refer to http://go.microsoft.com/fwlink/?LinkId=761101 TrackingId:1e678124dc4a4ae8b111dc77f9b218ed_G22, SystemTracker:gateway7, Timestamp:2022-05-11T18:18:47, errorContext[NAMESPACE: my-namespace.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: my-microservice/subscriptions/my-subscription, REFERENCE_ID: my-microservice/subscriptions/my-subscription_d1f5ef_1651226348013, LINK_CREDIT: 1]","linkName":"n/a","entityPath":"n/a","attempt":1,"retryAfter":4511}
2022-05-11 18:18:52.608 WARN 7 --- [ndedElastic-418] c.a.m.s.i.ServiceBusReceiveLinkProcessor : {"az.sdk.message":"Transient error occurred.","exception":"The service was unable to process the request; please retry the operation. For more information on exception types and proper exception handling, please refer to http://go.microsoft.com/fwlink/?LinkId=761101 TrackingId:1e678124dc4a4ae8b111dc77f9b218ed_G22, SystemTracker:gateway7, Timestamp:2022-05-11T18:18:47, errorContext[NAMESPACE: my-namespace.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: my-microservice/subscriptions/my-subscription, REFERENCE_ID: my-microservice/subscriptions/my-subscription_d1f5ef_1651226348013, LINK_CREDIT: 1]","linkName":"n/a","entityPath":"n/a","attempt":2,"retryAfter":14575}
2022-05-11 18:19:12.901 WARN 7 --- [ctor-executor-6] c.a.m.s.i.ServiceBusReceiveLinkProcessor : {"az.sdk.message":"Transient error occurred.","exception":"The service was unable to process the request; please retry the operation. For more information on exception types and proper exception handling, please refer to http://go.microsoft.com/fwlink/?LinkId=761101 TrackingId:1e678124dc4a4ae8b111dc77f9b218ed_G22, SystemTracker:gateway7, Timestamp:2022-05-11T18:19:12, errorContext[NAMESPACE: my-namespace.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: my-microservice/subscriptions/my-subscription, REFERENCE_ID: my-microservice/subscriptions/my-subscription_d1f5ef_1651226348013, LINK_CREDIT: 1]","linkName":"n/a","entityPath":"n/a","attempt":3,"retryAfter":51928}
2022-05-11 18:20:04.830 WARN 7 --- [ndedElastic-418] c.a.m.s.i.ServiceBusReceiveLinkProcessor : {"az.sdk.message":"Non-retryable error occurred in AMQP receive link.","exception":"The service was unable to process the request; please retry the operation. For more information on exception types and proper exception handling, please refer to http://go.microsoft.com/fwlink/?LinkId=761101 TrackingId:1e678124dc4a4ae8b111dc77f9b218ed_G22, SystemTracker:gateway7, Timestamp:2022-05-11T18:19:12, errorContext[NAMESPACE: my-namespace.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: my-microservice/subscriptions/my-subscription, REFERENCE_ID: my-microservice/subscriptions/my-subscription_d1f5ef_1651226348013, LINK_CREDIT: 1]","linkName":"n/a","entityPath":"n/a"}
2022-05-11 18:20:04.830 ERROR 7 --- [ndedElastic-418] .s.FluxAutoLockRenew$LockRenewSubscriber : Errors occurred upstream. The service was unable to process the request; please retry the operation. For more information on exception types and proper exception handling, please refer to http://go.microsoft.com/fwlink/?LinkId=761101 TrackingId:1e678124dc4a4ae8b111dc77f9b218ed_G22, SystemTracker:gateway7, Timestamp:2022-05-11T18:19:12, errorContext[NAMESPACE: my-namespace.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: my-microservice/subscriptions/my-subscription, REFERENCE_ID: my-microservice/subscriptions/my-subscription_d1f5ef_1651226348013, LINK_CREDIT: 1]
2022-05-11 18:20:04.844 ERROR 7 --- [ndedElastic-418] c.d.s.s.servicebus.AsyncReceiver : Could not process message with reason The service was unable to process the request; please retry the operation. For more information on exception types and proper exception handling, please refer to http://go.microsoft.com/fwlink/?LinkId=761101 TrackingId:1e678124dc4a4ae8b111dc77f9b218ed_G22, SystemTracker:gateway7, Timestamp:2022-05-11T18:19:12, errorContext[NAMESPACE: my-namespace.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: my-microservice/subscriptions/my-subscription, REFERENCE_ID: my-microservice/subscriptions/my-subscription_d1f5ef_1651226348013, LINK_CREDIT: 1]. It may be a problem with the network connection.
2022-05-11 18:35:05.240 WARN 7 --- [ctor-executor-6] c.a.c.a.i.RequestResponseChannel : {"az.sdk.message":"Error in SendLinkHandler. Disposing unconfirmed sends.","exception":"The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:1e678124dc4a4ae8b111dc77f9b218ed_G22, SystemTracker:gateway7, Timestamp:2022-05-11T18:35:05, errorContext[NAMESPACE: my-namespace.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 96]","connectionId":"MF_730311_1652291573928","linkName":"cbs"}
2022-05-11 18:36:09.824 WARN 7 --- [ndedElastic-418] c.a.c.a.i.ReactorDispatcher : {"az.sdk.message":"ReactorDispatcher instance is closed. Should not continue dispatching work to this reactor.","exception":"ReactorDispatcher instance is closed. Should not continue dispatching work to this reactor.","connectionId":"MF_730311_1652291573928"}
2022-05-11 18:36:09.824 WARN 7 --- [ parallel-1] c.a.c.a.i.ReactorDispatcher : {"az.sdk.message":"ReactorDispatcher instance is closed. Should not continue dispatching work to this reactor.","exception":"ReactorDispatcher instance is closed. Should not continue dispatching work to this reactor.","connectionId":"MF_730311_1652291573928"}
2022-05-11 18:36:09.824 WARN 7 --- [ parallel-1] c.a.c.a.i.RequestResponseChannel : {"az.sdk.message":"Unable to open send and receive link.","exception":"Unable to open send and receive link.","connectionId":"MF_730311_1652291573928","linkName":"cbs"}
2022-05-11 18:36:09.825 WARN 7 --- [ parallel-1] c.a.c.a.i.AmqpChannelProcessor : {"az.sdk.message":"Retry attempts exhausted or exception was not retriable.","exception":"Unable to open send and receive link.","connectionId":"MF_730311_1652291573928","entityPath":"$cbs","retry":2}
@quentinpopp we suggest enabling DEBUG logs; diagnosing a receiver "hang" requires looking into logs over a long duration, as I mentioned earlier. "Hang" here means the receiver entering a state where it neither produces any message nor emits a terminal error.
Now looking at the trace, it seems the Service Bus receiver didn't "hang" in your case but signaled the terminal error to the AsyncReceiver instance (a type local to your project). I see that AsyncReceiver is logging that signal from its error handler. Is this understanding correct?
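For readers wondering what enabling DEBUG logs could look like: the log lines in this thread have the default Spring Boot console layout, so, assuming a Spring Boot application with its default logging setup (an assumption, not something confirmed by the reporter), a minimal sketch is to raise the level for the two SDK packages whose abbreviated names appear in the trace (c.a.c.a is com.azure.core.amqp, c.a.m.s is com.azure.messaging.servicebus):

```properties
# application.properties (assumes Spring Boot; other logging setups need their own equivalent)
logging.level.com.azure.core.amqp=DEBUG
logging.level.com.azure.messaging.servicebus=DEBUG
```

DEBUG output from these packages can be verbose, so it may be worth checking log retention before leaving it on for days.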
I enabled the DEBUG logs yesterday and will post an update on the logs as soon as the problem occurs.
Your understanding is correct, here is the code from the receiver:
fun subscribeMessages() {
    receiver
        .receiveMessages()
        .flatMap { message: ServiceBusReceivedMessage -> handleReceivedMessage(message) }
        .onErrorResume {
            log.error(
                "Could not process message with reason {}. It may be a problem with the network connection.",
                it.message)
            Mono.empty()
        }
        .subscribe({ log.debug("Message processed successfully") }) { error: Throwable ->
            log.error("Could not process message with reason " + error.message)
        }
}
Did I implement the error handling wrong?
Thanks for sharing the usage code.
The Flux (representing the message stream) returned from "ServiceBusReceiverAsyncClient::receiveMessages()" is a Reactive publisher. Once a Reactive publisher bubbles up an error, the message stream is considered terminated, i.e., "error is a terminal event by reactive contract", and the receiver won't emit any more events (message event or error event).
An error is emitted to the application if the error is not transient or the retry is exhausted. It's good to log such an error as you might want to inspect it later to understand if anything is weird. Now that the current receiver instance is in a disposed state after such an error, you want to close that instance, back off, and create a new receiver instance to continue to receive the messages.
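To make the "error is a terminal event" rule concrete, here is a small sketch that uses only the JDK's own Reactive Streams types (`java.util.concurrent.Flow` and `SubmissionPublisher`) as a stand-in for Reactor's Flux. It is an analogy under that assumption, not the Service Bus SDK's code, and the class name `TerminalErrorDemo` is made up for illustration. The subscriber receives one item, then the error, and a later publish attempt on the same stream is rejected.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class TerminalErrorDemo {

    /** Publishes one item, fails the stream, then tries to publish again. */
    public static List<String> run() {
        List<String> signals = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch firstItemSeen = new CountDownLatch(1);
        CountDownLatch terminated = new CountDownLatch(1);

        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(Long.MAX_VALUE); // unbounded demand
            }
            @Override public void onNext(String item) {
                signals.add("onNext:" + item);
                firstItemSeen.countDown();
            }
            @Override public void onError(Throwable error) {
                signals.add("onError:" + error.getMessage());
                terminated.countDown();
            }
            @Override public void onComplete() {
                signals.add("onComplete");
                terminated.countDown();
            }
        });

        publisher.submit("m1");
        await(firstItemSeen); // make sure the item is delivered before failing the stream

        // Terminal signal: the subscriber gets onError and the stream is finished.
        publisher.closeExceptionally(new IllegalStateException("server error"));
        await(terminated);

        try {
            publisher.submit("m2"); // the closed publisher refuses further items
        } catch (IllegalStateException closed) {
            signals.add("rejected:m2");
        }
        return signals;
    }

    private static void await(CountDownLatch latch) {
        try {
            latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The same reasoning is why a receiver that has emitted an error cannot simply be resubscribed in place: a fresh stream (here, a fresh receiver instance) is needed.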
Thanks for your help!
That's the reason why I log the error and then return a Mono.empty(). I thought that was enough... Is there a better way to handle an error? Do I have to take care of the instance manually? I thought Spring would handle this itself. I am working with the Reactor framework for the first time.
Hi Quentin, you're right that when using "ServiceBusReceiverAsyncClient" you'll have to handle the case where the receiver instance bubbles up a non-retriable error or exhausts its retries. Like I mentioned, "error is a terminal event by reactive contract".
Depending on how much your application logic is integrated with the Reactor framework, the code pattern for recovery you want to use may differ; if we start with a very simplified approach, it will be -
String connectionString = System.getenv(SERVICEBUS_CONNECTION_STRING);
String queueName = System.getenv(SERVICEBUS_QUEUE_NAME);
while (true) {
    // Using the "current thread" to create the client.
    try (ServiceBusReceiverAsyncClient client = new ServiceBusClientBuilder()
        .connectionString(connectionString)
        .receiver()
        .queueName(queueName)
        .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
        .buildAsyncClient()) {
        final CountDownLatch errorLatch = new CountDownLatch(1);
        client.receiveMessages()
            // messages and error are received on "boundedElastic thread".
            .flatMap(message -> {
                return handleReceivedMessage(message);
            })
            .subscribe(
                __ -> {},
                error -> {
                    LOGGER.info("Retry Exhausted or Non-Retriable error happened: {}", error);
                    errorLatch.countDown();
                });
        // block the "current thread" until there is an error.
        errorLatch.await();
        // latch unblocked, let's auto-close the current receiver, backoff and repeat.
    }
    // backoff
    TimeUnit.SECONDS.sleep(1);
}
Let me know how it goes.
If we want a more reactive way of recovery, there is the retryWhen operator.
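The recovery loop earlier in this comment backs off for a fixed one second before recreating the receiver. If the service keeps failing, a growing delay may be gentler on it. The following is a minimal, JDK-only sketch of capped exponential backoff with jitter; the class `ReceiverBackoff`, its method names, and the 1 s / 30 s values are hypothetical and not part of the SDK.

```java
import java.util.concurrent.ThreadLocalRandom;

public final class ReceiverBackoff {
    private ReceiverBackoff() {}

    /**
     * Delay before the given 1-based re-creation attempt:
     * baseMillis doubled per attempt, never exceeding maxMillis.
     */
    public static long delayMillis(int attempt, long baseMillis, long maxMillis) {
        if (attempt < 1 || baseMillis <= 0 || maxMillis < baseMillis) {
            throw new IllegalArgumentException("invalid backoff parameters");
        }
        long delay = baseMillis;
        for (int i = 1; i < attempt && delay < maxMillis; i++) {
            // Double the delay; comparing with half the cap avoids overflow.
            delay = (delay > maxMillis / 2) ? maxMillis : delay * 2;
        }
        return delay;
    }

    /** Picks a random delay in [delayMillis / 2, delayMillis] to spread out reconnects. */
    public static long withJitter(long delayMillis) {
        long lower = delayMillis / 2;
        return ThreadLocalRandom.current().nextLong(lower, delayMillis + 1);
    }

    public static void main(String[] args) {
        for (int attempt = 1; attempt <= 7; attempt++) {
            System.out.println("attempt " + attempt + ": "
                + delayMillis(attempt, 1_000, 30_000) + " ms");
        }
    }
}
```

In the loop, the fixed `TimeUnit.SECONDS.sleep(1)` could then become `TimeUnit.MILLISECONDS.sleep(ReceiverBackoff.withJitter(ReceiverBackoff.delayMillis(attempt, 1_000, 30_000)))`, with an `attempt` counter (also hypothetical) that is reset once messages flow again.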
We will implement this too @anuchandy and will let you know how it goes.
Thanks! It seems like it worked. We added a simple try-catch block, and in the catch case we abandon or dead-letter the message depending on the use case. If the error occurs again I will come back to you.
Hi @quentinpopp, we are trying to build an infinite receiver and are unable to do so. Can you please share your code showing how you are completing or abandoning the message?
You just need to call serviceBusReceiverAsyncClient.complete(serviceBusReceivedMessage); abandoning works the same way. An example in Kotlin:
fun receiveMessages() {
    receiver.receiveMessages().flatMap(::tryHandleReceivedMessage).subscribe {
        log.debug("Message received and handled")
    }
}
fun tryHandleReceivedMessage(message: ServiceBusReceivedMessage): Mono<Void> {
    return try {
        handleReceivedMessage(message)
        receiver.complete(message)
    } catch (ex: IOException) {
        log.error("Failed to convert service bus message: {}", message, ex)
        receiver.deadLetter(message)
    } catch (ex: Exception) {
        log.error("Failed to process service bus message: {}", message, ex)
        if (message.deliveryCount >= 3) receiver.deadLetter(message)
        else receiver.abandon(message)
    }
}
The error stayed away for a few weeks, until last night. Last night a server error occurred and the receiver again stopped consuming events. It's the same known problem that [r1ckr] described.
Hi @quentinpopp, looking at the code sample you shared (pasted below), I have two pieces of feedback -
[...] receiveMessages here, but I don't see the code snippet below incorporating such an error listener and retry:
fun receiveMessages() {
    receiver.receiveMessages().flatMap(::tryHandleReceivedMessage).subscribe {
        log.debug("Message received and handled")
    }
}
[...] handleReceivedMessage method) is taken care of, but it looks like exceptions from complete|abandon|deadLetter are not handled; if such an error goes unhandled, it will stop the receiver:
fun tryHandleReceivedMessage(message: ServiceBusReceivedMessage): Mono<Void> {
    return try {
        handleReceivedMessage(message)
        receiver.complete(message)
    } catch (ex: IOException) {
        log.error("Failed to convert service bus message: {}", message, ex)
        receiver.deadLetter(message)
    } catch (ex: Exception) {
        log.error("Failed to process service bus message: {}", message, ex)
        if (message.deliveryCount >= 3) receiver.deadLetter(message)
        else receiver.abandon(message)
    }
}
I have created an example code showing how to handle the above two cases, you can find it here https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientRetrySample.java
Hi, we're sending this friendly reminder because we haven't heard back from you in a while. We need more information about this issue to help address it. Please be sure to give us your input within the next 7 days. If we don't hear back from you within 14 days of this comment the issue will be automatically closed. Thank you!
Thanks for the feedback @anuchandy! We are implementing this soon and will confirm whether it fixes the issue; 14 days may not be enough to fully validate it.
Describe the bug When there are Server Errors in Service Bus, which happen once or twice per week, the Service Bus queue receiver stops receiving messages, while the queue sender can keep sending them.
The only way we can fix this is to restart the app.
Exception or Stack Trace This is one of the errors after the Server Errors:
To Reproduce Create a receiver, no matter whether using the synchronous or the processor model, and leave it running for a week receiving messages. Wait until there are Server Errors; shortly after that the Receiver stops receiving messages while the Sender can keep sending them.
The problem here is that not all the Server Errors cause this problem, but since they all arrive in bulk, one of them triggers it. Here is a screenshot of the Server Error that caused the problem:
At the top is this log occurrence, in the middle is the number of server errors, and at the bottom are the messages in and out. You can see that it is during the 4th server error that the app logs the error and stops receiving messages. The yellow line is the received messages and the green line is the sent messages.
Code Snippet
Expected behavior The Receiver should reconnect properly and keep receiving the messages.
Setup (please complete the following information):
Additional context We have multiple clusters with the same setup pointing to multiple Service Bus namespaces configured exactly the same way. All of them have the same issue: as soon as there are server errors in Service Bus, the receiver stops receiving messages.