Azure / azure-service-bus-java

☁️ Java client library for Azure Service Bus
https://azure.microsoft.com/services/service-bus
MIT License
60 stars 58 forks source link

Internal receive link of requestresponselink to '$cbs' closed with error. #243

Open gjong opened 6 years ago

gjong commented 6 years ago

As part of a high volume application we are running 4 workers consuming the IOT Hub and passing messages along to the service bus. These workers are deployed in an AKS cluster, where each worker consumes one partition in the IOT Hub.

To preserve connecting overhead we cache the IMessageSender in a local concurrent map per queue and re-use them for every message. However after some time some of the workers are throwing exceptions about a $sbc link being closed after being inactive for 60000ms. Which is strange since there is a constant flow of messages to the service bus.

What we end up with is similar to this:

IMessageSender cachedSender = ClientFactory.createMessageSenderFromConnectionStringBuilder(connectionStringBuilder);

// on every message we receive from the IOTHub we split up the message and send multiple messages to the service bus like this

cachedSender.sendAsync(convertMessage(message, partitionKey))
                .exceptionally(th -> {
                    this.errorCount++;
                    throw new AmqpException("Could not deliver message to destination, exception received from Azure", th);
                })
                .thenRun(() -> errorCount = 0)

Actual Behavior

  1. After some time sending messages using sendAsync exceptions in the transport layer occur

Expected Behavior

  1. All messages are sent to the service bus through the sendAsync

Versions

Stacktrace produced by the java service bus SDK:

18-07-10 13:16:22.577   WARN c.m.a.s.primitives.RequestResponseLink   : Completing all pending requests with exception in request response link to $cbs
2018-07-10 13:16:22.579   WARN c.m.a.s.primitives.RequestResponseLink   : Internal receive link of requestresponselink to '$cbs' closed with error.
com.microsoft.azure.servicebus.primitives.ServiceBusException: Error{condition=amqp:connection:forced, description='The connection was inactive for more than the allowed 60000 milliseconds and is closed by container 'LinkTracker'. TrackingId:ad8af2f1f61b400c9ed9b0e1c1f931fe_G19, SystemTracker:gateway7, Timestamp:7/10/2018 1:16:22 PM', info=null}
        at com.microsoft.azure.servicebus.primitives.ExceptionUtil.toException(ExceptionUtil.java:113)
        at com.microsoft.azure.servicebus.primitives.RequestResponseLink$InternalReceiver.onClose(RequestResponseLink.java:661)
        at com.microsoft.azure.servicebus.amqp.BaseLinkHandler.processOnClose(BaseLinkHandler.java:68)
        at com.microsoft.azure.servicebus.amqp.BaseLinkHandler.onLinkRemoteClose(BaseLinkHandler.java:42)
        at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:176)
        at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
        at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
        at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
        at com.microsoft.azure.servicebus.primitives.MessagingFactory$RunReactor.run(MessagingFactory.java:559)
        at java.lang.Thread.run(Thread.java:748)
yvgopal commented 6 years ago

what you posted here is the trace, not the error. Yes, the service closes idle connections are 10 minutes. The client traces it and reopens the connection. It is seamless, doesn't throw any exceptions to the application. That can't be your problem. If your sends are failing means there may be another problem, but not this one.

When you call sendAsync(), it returns a completable future. what is the exception the future is throwing to your application? If there are no exceptions thrown to the application, that means everything is alright.

gjong commented 6 years ago

This makes me wonder, is there a connection pool underneath in the service bus SDK. I ask since the traffic on the IMessagSender is constant (40 messages per second or so). So an idle connection would be unlikely. I finally got a working code version that continues sending even after the connectivity issues with the Service Bus. I had to change the CompletableFuture handling to:

        completableFuture.exceptionally(th -> {
            logger.error("Failed to send message to the service bus on partition {}", partitionKey, th);
            this.errorCount++;
            return null;
        });
        completableFuture.thenRun(() -> {
            logger.trace("Completed sending of message to partition {}", partitionKey);
            errorCount = 0;
        });

Apparently when you throw an exception in the exceptionally it can cause some havoc triggering locked threads. So this solves my problem for now, still seeing a lot of failed send attempts to the Service Bus though. For now we will move away from the Service Bus and back to JMS.

yvgopal commented 6 years ago

There is no connection pool underneath. If you are creating a sender with the code you posted here, there is one connection per sender. If you are sending something like 40 messages per second, then the connection should not be idle and shouldn't be closed. How many senders do you create? Do you use all of them? what do you mean by send failures? What kind of failures are you seeing? Every send failure will throw an exception to you. What are those exceptions, I mean the 'th' in your code?

eminkucuk commented 5 years ago

@yvgopal

Currently we are using version 1.2.15 of sdk and facing same issue. We have seen following log in our application. and please have a look log levels. WARN and ERROR.

2019-08-27 18:21:26.419 [WARN ] [ ] [d588350] [ c.m.a.servicebus.primitives.RequestResponseLink]: Internal receive link 'RequestResponseLink-Receiver_b80af3_90bf5ce8db06455793738efe1faa1855_G46' of requestresponselink to '$cbs' enco untered error. com.microsoft.azure.servicebus.primitives.ServiceBusException: com.microsoft.azure.servicebus.amqp.AmqpException: The connection was inactive for more than the allowed 60000 milliseconds and is closed by container 'LinkTracker'. TrackingId:90bf5ce8db06455793738efe1 faa1855_G46, SystemTracker:gateway7, Timestamp:2019-08-27T15:21:26 at com.microsoft.azure.servicebus.primitives.ExceptionUtil.toException(ExceptionUtil.java:90) at com.microsoft.azure.servicebus.primitives.RequestResponseLink$InternalReceiver.onClose(RequestResponseLink.java:698) at com.microsoft.azure.servicebus.amqp.BaseLinkHandler.processOnClose(BaseLinkHandler.java:79) at com.microsoft.azure.servicebus.amqp.BaseLinkHandler.onLinkRemoteClose(BaseLinkHandler.java:43) at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:176) at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108) at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324) at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291) at com.microsoft.azure.servicebus.primitives.MessagingFactory$RunReactor.run(MessagingFactory.java:493) at java.lang.Thread.run(Thread.java:748) Caused by: com.microsoft.azure.servicebus.amqp.AmqpException: The connection was inactive for more than the allowed 60000 milliseconds and is closed by container 'LinkTracker'. TrackingId:90bf5ce8db06455793738efe1faa1855_G46, SystemTracker:gateway7, Timestamp:2019- 08-27T15:21:26 ... 10 common frames omitted 2019-08-27 18:21:26.420 [ERROR] [ ] [d588350] [ c.m.azure.servicebus.primitives.MessagingFactory]: Connection error. 'Error{condition=amqp:connection:forced, description='The connection was inactive for more than the allowed 60000 mi lliseconds and is closed by container 'LinkTracker'. TrackingId:90bf5ce8db06455793738efe1faa1855_G46, SystemTracker:gateway7, Timestamp:2019-08-27T15:21:26', info=null}'