Azure / azure-sdk-for-java

This repository is for active development of the Azure SDK for Java. For consumers of the SDK we recommend visiting our public developer docs at https://docs.microsoft.com/java/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-java.
MIT License
2.35k stars 1.99k forks source link

[BUG] Topic subscription randomly stops consuming messages after likely initial connection loss #40608

Closed TheDevOps closed 1 week ago

TheDevOps commented 5 months ago

Describe the bug We have observed the following behavior infrequently in our applications:

More details in the later sections

Exception or Stack Trace I'll add a few logs here showing the individual stages described above (All timestamps are in Europe/Vienna timezone)

1) As said usually at first we notice something likely related to some connection loss or so terminating the subscription clients. These logs are similarly repeated multiple times for different subscriptions and connectionIds, this is just here the first one that happened but within a few seconds we got the same logs for all our subscriptions existing in this app and also the same in other apps

Jun 11, 2024 @ 20:53:36.960 {"az.sdk.message":"Receiver emitted terminal completion.","messageFlux":"mf_9d516e_1718126796078","connectionId":"MF_edc9a5_1718126796076","linkName":"preparer-topic_preparer/subscriptions/preparer_AT01_84ea87_1718126796078","entityPath":"preparer-topic_preparer/subscriptions/preparer_AT01"}
Jun 11, 2024 @ 20:53:36.960 {"az.sdk.message":"Current mediator reached terminal completion-state (retriable:true).","messageFlux":"mf_9d516e_1718126796078","connectionId":"MF_edc9a5_1718126796076","linkName":"preparer-topic_preparer/subscriptions/preparer_AT01_84ea87_1718126796078","entityPath":"preparer-topic_preparer/subscriptions/preparer_AT01","retryAfter":1000}

2) So as expected from the wording of the previous log shortly after we get logs showing that the new connections are opened again

Jun 11, 2024 @ 20:53:38.035 {"az.sdk.message":"Setting next mediator and waiting for activation.","messageFlux":"mf_9d516e_1718126796078","connectionId":"MF_a311b3_1718132016996","linkName":"preparer-topic_preparer/subscriptions/preparer_AT01_84ea87_1718126796078","entityPath":"preparer-topic_preparer/subscriptions/preparer_AT01"}
Jun 11, 2024 @ 20:53:38.097 {"az.sdk.message":"The mediator is active.","messageFlux":"mf_9d516e_1718126796078","connectionId":"MF_a311b3_1718132016996","linkName":"preparer-topic_preparer/subscriptions/preparer_AT01_84ea87_1718126796078","entityPath":"preparer-topic_preparer/subscriptions/preparer_AT01"}

3) So far this would look pretty good again and is what we usually see before the app starts consuming messages again. Here however no message processing logs started to show up but instead a bit later we got a few more logs showing some kind of error, and since one of them specifically asks to report it is why we are already opening this bug

Jun 11, 2024 @ 20:54:38.070 {"az.sdk.message":"Unhandled exception while processing events in reactor, report this error.","exception":"java.lang.NullPointerException: uncorrelated channel: 74","connectionId":"MF_a311b3_1718132016996"}

so some kind of NPE, sadly I was unable to find anything else like a stack or so on the logs, so sorry about this being only so little. After this one we get another few logs a bit later all following roughly this one, so seemingly the connection is closed yet again

Jun 11, 2024 @ 20:55:38.076 {"az.sdk.message":"Receiver emitted terminal completion.","messageFlux":"mf_9d516e_1718126796078","connectionId":"MF_a311b3_1718132016996","linkName":"preparer-topic_preparer/subscriptions/preparer_AT01_84ea87_1718126796078","entityPath":"preparer-topic_preparer/subscriptions/preparer_AT01"}
Jun 11, 2024 @ 20:55:38.076 {"az.sdk.message":"Current mediator reached terminal completion-state (retriable:true).","messageFlux":"mf_9d516e_1718126796078","connectionId":"MF_a311b3_1718132016996","linkName":"preparer-topic_preparer/subscriptions/preparer_AT01_84ea87_1718126796078","entityPath":"preparer-topic_preparer/subscriptions/preparer_AT01","retryAfter":1000}

4) Finally after this 2nd time of apparent connection loss now totally new logs starts to endlessly repeat every few minutes

Jun 11, 2024 @ 21:00:12.218 {"az.sdk.message":"Terminal error signal from Upstream|RetryLoop arrived at MessageFlux.","exception":"Retries exhausted: 3/3","messageFlux":"mf_9d516e_1718126796078","connectionId":"MF_a311b3_1718132016996","linkName":"preparer-topic_preparer/subscriptions/preparer_AT01_84ea87_1718126796078","entityPath":"preparer-topic_preparer/subscriptions/preparer_AT01"}
Jun 11, 2024 @ 21:00:12.218 {"az.sdk.message":"MessageFlux reached a terminal error-state, signaling it downstream.","exception":"Retries exhausted: 3/3","messageFlux":"mf_9d516e_1718126796078","connectionId":"MF_a311b3_1718132016996","linkName":"preparer-topic_preparer/subscriptions/preparer_AT01_84ea87_1718126796078","entityPath":"preparer-topic_preparer/subscriptions/preparer_AT01"}
Jun 11, 2024 @ 21:00:12.218 {"az.sdk.message":"Downstream cancellation signal arrived at MessageFlux.","messageFlux":"mf_9d516e_1718126796078","connectionId":"MF_a311b3_1718132016996","linkName":"preparer-topic_preparer/subscriptions/preparer_AT01_84ea87_1718126796078","entityPath":"preparer-topic_preparer/subscriptions/preparer_AT01"}

These errors randomly show up again every few minutes for hours on end afterwards, during all of this time this application is no longer receiving any messages

5) Optional: This one is then even more rare and does not always happen before we just restart or redeploy our applications. But sometimes after a seemingly random time we see yet another strange error, and even stranger after this one suddenly the application starts consuming messages again for whatever reason

Jun 12, 2024 @ 19:49:52.371 {"az.sdk.message":"Unhandled exception while processing events in reactor, report this error.","exception":"java.lang.NullPointerException","connectionId":"MF_c2211b_1718132083255"}

So yet another NPE. But weirdly after this error showed up the subscribers were yet again recreated, the repeating logs stopped and eventually messages started getting processed again. As said this does not always happen, nor is there a fixed time for it to happen, we had apps running for multiple days never recovering until terminated.

To Reproduce Steps to reproduce the behavior: Sadly I was so far not able to provide a reliable way to reproduce it. I was experimenting a bit with interrupting my local network connection but so far was unable to reproduce this behavior from our openshift deployments on my machine.

One guess maybe from the logs above would be that it may be necessary to loose the connection twice within a short time, the 2nd one while the first reconnect is still ongoing, but this is really just a guess. Also could be some race condition happening only with sufficient load, there's a limit to how much I can manage locally at once. Maybe the NPEs asking to get reported or some other logs do provide someone with deeper code understanding already with some further idea.

We will also continue looking into this on our side and post any updates, while it is not that bad since it seems overall pretty rare usually only noticing it in a few single applications every few days with hundreds of them running and thanks to high available setup usually messages still get processed by other replicas, just maybe with a bit less throughput, we still would prefer to be reliable in this area.

Code Snippet Here's the code how we are usually creating our subscribers

final ServiceBusClientBuilder.ServiceBusProcessorClientBuilder builder =
    resolveBuilder(String.format(CONNECTION_HOST_URL, connection.getHost(), connection.getKeyName(), connection.getKey()))
        .processor()
        .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
        .topicName("the-topic-name")
        .subscriptionName("the-subscription-name")
        .prefetchCount(0)
        .maxConcurrentCalls(<1 to 10 usually>)
        .maxAutoLockRenewDuration(<usually around 10min>)
        .processMessage(serviceBusMessageHandler::receiveMessage)
        .processError(serviceBusMessageHandler::handleError);

Expected behavior Lost subscription connections are always recreated and running applications always reliably recover once the connection can be recreated and start consuming messages again

Screenshots None

Setup (please complete the following information):

Further information If there's anything else needed just let me know and I'll try to provide it, e.g. specific logs

Information Checklist Kindly make sure that you have added all the following information above and checkoff the required fields otherwise we will treat the issuer as an incomplete report

github-actions[bot] commented 5 months ago

@anuchandy @conniey @lmolkova

github-actions[bot] commented 5 months ago

Thank you for your feedback. Tagging and routing to the team member best able to assist.

anuchandy commented 4 months ago

Hi @TheDevOps, thanks for reaching out and sharing the analysis.

A potential flow that can lead to "java.lang.NullPointerException: uncorrelated channel: xx" is -

  1. Using ProtonJ library, the SDK sent a request to the Service bus broker to open a session in the TCP connection.
  2. The SDK waits for a minute but doesn't get a session open signal from the ProtonJ library. The SDK times out and clears the pending session state locally.
  3. Later, the ProtonJ library gets a session open ACK. It doesn't find the session state anymore and it throws uncorrelated (session) channel error.

(This is a known problem in ProtonJ library and when it happens SDK will abort all operations in the TCP connection, re-create a new TCP connection and attempt to recover).

Typically, the session open ACK from the broker arrives in short time less than a seconds. If the ACK takes more than a minute to surface to the ProtonJ library, "one possibility" is the connection’s (single-threaded, shared) IO-Thread is overloaded i.e., It has many tasks to finish, and the timeout runs out before it processes the ACK.

Given there are multiple topic-subscription, there will be at least that many sessions+links multiplexed in the shared connection. So the shared IO-Thread deals with operations among the links. Also cores in the container are additional potential bottlenecks. The SDK uses the global timer enabled parallel thread pool for retries (pool size is equal to the number of cores) and bottle neck in the thread pool for pumping of messages across the topic-subscription can impact the recovery.

Here are the troubleshooting sections about the points mentioned above -

  1. connection-sharing-bottleneck
  2. concurrency-in-servicebusprocessorclient

Could you please take a look at it and see if scaling and load balancing the topic-subscription across containers improves the experience.

Also, is it possible to share additional logs from the impacted replica? -15/+15 of logs at the time of this uncorrelated error? If it is verbose that would be great.

anuchandy commented 3 months ago

Hi @TheDevOps, have you had a chance to look at the previous message and the linked docs? If you've gathered the logs, please provide them. Additionally, could you tell me the number of subscription clients that the affected application had?

TheDevOps commented 3 months ago

Hi @anuchandy and sorry for the delay, I had been out of office and only got back this week and while I did see the mail for the update on Monday by the time I had caught up on internal topics it had totally slipped my mind again, so thanks a lot for the well needed reminder!

I've thankfully had time right away today to start looking into this topic, also discuss it with a few people responsible for our internal libraries in this area and so on to further understand things. Further I've also been "lucky" and happened to spot one application I had full access to that just started the issue like 15 minutes ago and was able to get the raw logs of our deployment and did not need to try some filtered logging tool exports so let's start of with them:

messaging-issue.log

This are all the logs we have with the usual info log level. If you have a specific class or at most package (that generates a 'reasonable' amount of spam) I can try enabling debug logging for them but I can not make any promises if and how soon I can get something there, since as said it's kinda random when and where the issue arises and I can only temporary enable debug logging for running processes I have access to (which are not all where it sometimes happens) and then it has to happen in one where I did before it is restarted again. And I have to take care to not explode our central logging storage ;) So hopefully maybe the logs as provided above can already help a bit more. One that stuck out to me was

2024-07-19 05:44:19.608+0000 [reactor-executor-15] W [//]            c.a.c.a.i.ReactorExecutor - {"az.sdk.message":"scheduleCompletePendingTasks - exception occurred while  processing events.\njava.lang.NullPointerException: uncorrelated channel: 90\norg.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:112)\norg.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)\norg.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:292)\ncom.azure.core.amqp.implementation.ReactorExecutor.lambda$scheduleCompletePendingTasks$1(ReactorExecutor.java:158)\nreactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)\nreactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)\njava.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\njava.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)\njava.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\njava.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\njava.base/java.lang.Thread.run(Thread.java:829)Cause: uncorrelated channel: 90\norg.apache.qpid.proton.engine.impl.TransportImpl.handleBegin(TransportImpl.java:1213)\norg.apache.qpid.proton.engine.impl.TransportImpl.handleBegin(TransportImpl.java:70)\norg.apache.qpid.proton.amqp.transport.Begin.invoke(Begin.java:166)\norg.apache.qpid.proton.engine.impl.TransportImpl.handleFrame(TransportImpl.java:1455)\norg.apache.qpid.proton.engine.impl.FrameParser.input(FrameParser.java:425)\norg.apache.qpid.proton.engine.impl.FrameParser.process(FrameParser.java:536)\norg.apache.qpid.proton.engine.impl.SaslImpl$SwitchingSaslTransportWrapper.process(SaslImpl.java:832)\norg.apache.qpid.proton.engine.impl.HandshakeSniffingTransportWrapper.process(HandshakeSniffingTransportWrapper.java:101)\norg.apache.qpid.proton.engine.impl.ssl.SimpleSslTransportWrapper.unwrapInput(SimpleSslTransportWrapper.java:136)\norg.apache.qpid.proton.engine.impl.ssl.SimpleSslTransportWrapper.process(SimpleSslTransportWrapper.java:370)\norg.apache.qpid.proton.engine.impl.ssl.SslImpl$UnsecureClientAwareTransportWrapper.process(SslImpl.java:153)\norg.apache.qpid.proton.engine.impl.TransportImpl.process(TransportImpl.java:1572)\norg.apache.qpid.proton.reactor.impl.IOHandler$1.run(IOHandler.java:234)\norg.apache.qpid.proton.reactor.impl.SelectableImpl.readable(SelectableImpl.java:118)\norg.apache.qpid.proton.reactor.impl.IOHandler.handleQuiesced(IOHandler.java:61)\norg.apache.qpid.proton.reactor.impl.IOHandler.onUnhandled(IOHandler.java:390)\norg.apache.qpid.proton.engine.BaseHandler.onReactorQuiesced(BaseHandler.java:87)\norg.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:206)\norg.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)\norg.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)\norg.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:292)\ncom.azure.core.amqp.implementation.ReactorExecutor.lambda$scheduleCompletePendingTasks$1(ReactorExecutor.java:158)\nreactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)\nreactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)\njava.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\njava.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)\njava.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\njava.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\njava.base/java.lang.Thread.run(Thread.java:829)","connectionId":"MF_2465ed_1721367794515"}

So some NPE again with a stack this time.

Further I've also created a stackdump for the affected application, maybe it can also help further

messaging-stackdump.log

I had to anonymize a few things in both of them but hopefully this is fine and no relevant information was lost.

Now a few facts to this specific application also with focus on your initial message:

Some additional information in general not just to this app

We are actually looking into ways of better separating subscriptions of specific countries into specific replicas of the application but this is likely a bigger topic requiring big refactoring of many central components with messaging being just one of them that will take quite some time to see progress.

I hope this answers most of the initial questions and am looking forward to further feedback from you!

TheDevOps commented 1 month ago

Hello,

just checking in if there's anything further we could provide or if there are any insights to share, because so far the issue still is happening ever now and then and we are pretty much just monitoring our apps and restarting those we notice stopped processing azure servicebus messages.

jjfraney-cg commented 1 month ago

We observe a similar message outage. However, we are not using: com.azure:azure-messaging-servicebus.

We are use com.azure.spring:spring-cloud-azure-starter-servicebus-jms version 5.14.0.

We originally suspected the idle connection disconnect; speculating there is some fault towards propagating the broker side disconnect up into the spring 'Cached connection handler'. However, the connection is idle quite often, and its usually restored. Many disconnects occur over months of uptime without any disruption.

From where can I learn about diagnostic logging from the qpid and azure libraries? (I admire TheDevOps for the effort and detail provided.)

anuchandy commented 1 month ago

Hello @jjfraney-cg, could you kindly open a new issue for your case? The Spring starter JMS uses another library https://github.com/Azure/azure-servicebus-jms. Since there are no shared layers between that JMS library and the azure-messaging-servicebus library, any pointers discussed here would not be applicable to your case. Looking into your issue probably need involvement of JMS library and the Spring binder maintainers. Thanks!

anuchandy commented 1 month ago

Hi @TheDevOps, I'm sorry for the delay. I got sidetracked during my time off and with other work later.

Thanks for the detailed description of the environment and info logs. I've a few thoughts to brainstorm with you.

Adjusting the timeout

From the info logs, one observation is - A connection with id MF_2465ed_1721367794515 was opened to host 50 links, each linked to a subscription. Most links opened successfully, but one encountered an "uncorrelated channel error", which shut down the parent connection and terminated the other links. The code flow leads to this error, related to a timeout, is outlined here: https://github.com/Azure/azure-sdk-for-java/issues/40608#issuecomment-2200776439. There are two potential reasons -

The first step we can try out is to extend the timeout beyond the default duration of one minute to two minutes or more. This can be configured by setting .tryTimeout(Duration.ofMinutes(timeout)) in a new AmqpRetryOption object and passing this object to builder. With this, I hope we can avoid the "uncorrelated channel" error causing connection shutdown.

Tuning the thread pool

It's a good idea to examine thread pool sizing. Low pool sizing will lead to issues with Processor recovery. To share an estimate around thread usage,

  1. With 50 subscriptions and 10 concurrencies, we have 500 total concurrencies per pod for "message pumping", which uses a global thread pool.
  2. There's an internal thread pool in the library, to channel messages to the above pumping layer ("message pumping"). This is sized 20 times the cores, in your case it’s a thread pool with cap 40.
  3. There is one IO thread per Connection.
  4. Threads for timeouts which belongs to a pool of size equal to number of cores.

There are approximately 543 threads dedicated solely to the bus service library during peak loads. Additionally, the application utilizes other libraries for DB, SolarCloud, and ZooKeeper etc.., each of which may maintain its own thread pools.

Can we ensure the global thread pool for "message pumping" is sized appropriately? The "message pumping" (across all subscription clients) utilizes the global Schedulers.boundedElastic(), which can be adjusted through the system property reactor.schedulers.defaultBoundedElasticSize. I'm unsure if any of the application components you use share this global Schedulers.boundedElastic() thread pool, so the size should account for that as well. Taking all into consideration, should we start with a size between 600 and 700? While experimenting with this, it might be useful to temporarily raise the core count for the affected pods to see if it improves the situation.


We might need debug logs later, but we can do that afterward since gathering them takes some effort as you mentioned.

We're also going to release the next version of azure-messaging-servicebus which improvements around threading and locks. I'll update here once that version is released.

TheDevOps commented 1 month ago

Good morning (at least on my location) @anuchandy,

thanks a lot for the detailed response and no worries about the delay, after all exactly the same had happened to me before here haha.

Here's my thoughts on your points.

Let's start with the first one:

So either way with those points out of the way with some help by the responsible people for our internal messaging lib I've located and adjusted the code as follows

image image image image

I hope this was the correct location to increase this retry timeout, for now we've set it as 5 minutes. If this is not the right place please just let me know and I'll adjust it asap. Sadly the only issue here is, my current project is about as far removed from continuous deployment as possible so until this change makes it to production, where the issue by far happens most frequently, we are probably looking at around 6 to 8 weeks. While the issue does also happen in previous stages like our development, where it should be available today already, it's a lot less frequent there so unless I can observe it being completely gone there for an extended period of time it's hard to really reach a conclusion there if something changed. But I'll try to provide any updates on it as soon as I have something concrete.

Further I've also discussed with the people responsible for the library if we want to start a new builder after a certain number of subscription listeners have been registered with an instance since right now it's reused as long as the servicebus host + keys do not change. They said this definitely doable but would still like the "Input on recommendations how many subs should be registered with a single builder is also highly welcome" since as said before we have to make sure to not pass the servicebus server side connection limits.

Now let's move to point 2:

So far everything is running and processing messages again with the configuration adjustments. As usual the problem is a bit random and may take a few hours to days before it shows up so I'll keep an eye it over the next week if I notice any changes be they positive or negative or non existent and report them once I feel confident in them.

I've also already made sure to let our library guys know that they should expect a new version of the azure servicebus libs soonish and they are already eager to update once they have something to work with! And so am I to observe them once we are using them.

As usual if there's anything else I can do or provide just let me know! And thanks again for the detailed explanations, helps a lot with really understanding what is happening internally of the library and why some things may happen.

anuchandy commented 1 month ago

Hi @TheDevOps, thank you for reviewing the points and sharing your feedback. Let me address first those questions that may be pending to move forward with the deployments -

  1. Regarding the timeout, that is the correct spot to configure it.
  2. Since we've confirmed the global pool probably isn't used elsewhere, 800 pool size and 4 cores seem appropriate.

I'll get back to you soon with the rest of the thoughts, but I wanted to reply to these first to avoid any delays in deployment/testing. Thanks again for the collaboration.

anuchandy commented 1 month ago

Hi @TheDevOps, wanted to update you about release of the next version of azure-messaging-servicebus.

• This version resolves an internal race between IO Thread and worker/timer threads. • It improves internal management around AMQP resources (sessions, management channels), reducing the potential of attempting to use disposed resources.

The changes are outlined here.

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-servicebus</artifactId>
    <version>7.17.4</version>
</dependency>

IMPORTANT : When using the version 7.17.4, the app will need to opt-in a configuration "com.azure.core.amqp.cache", as shown below. Without opting it in, version 7.17.4 will function the same as 7.17.3 and won't include the fixes mentioned earlier.

new ServiceBusClientBuilder()
.connectionString(CONNECTION_STRING)
.configuration(new ConfigurationBuilder()
       .putProperty("com.azure.core.amqp.cache", "true") // <---- opt-in
       .build())
.processor()|sender|…

This opt-in requirement will be removed in upcoming releases and become default.

7.17.4 uses azure-core-amqp: 2.9.9 and azure-core: 1.52.0, its a good idea to runmvn dependency:tree for the app to ensure the right dependencies are resolved.

TheDevOps commented 1 month ago

Hi @anuchandy ,

thanks a lot for the update that the new version is ready. I'll inform and discuss it with our library responsibles right away so hopefully we can have some first DEV stage deployments with it soon even if it will take a bit to get it to production!

Else besides this since it now has been almost 1 week since increasing the boundedElastic threadpool size and CPUs for the one service on PROD I've quickly done a check right now but sadly it still has happened in 6 pods during this time frame that they completely stopped processing messages for at least 1 hour. Which is just about the average we are seeing in this application, it's just deviating quite a bit, there are weeks with (almost) none and sometimes it's 10+. But right now it feels like there hasn't been any obvious improvement at least from changing those two settings at least. But I'll continue to keep an eye on it.

So with this said I'll see that hopefully we can at least get the new version and opt-in, and also the previously done longer retry timeout, out as soon as possible and update once there's anything worth reporting!

anuchandy commented 1 month ago

Hi @TheDevOps, thank you for sharing the update.

I've been thinking and sketching some flow diagrams based on the INFO logs you shared. If you remember you mentioned (also I see same in log file) that, the following retry exhaust logs appear repeatedly -

{"az.sdk.message":"Terminal error signal from Upstream|RetryLoop arrived at MessageFlux.","exception":"Retries exhausted: 3/3","messageFlux":"mf_9d516e_1718126796078","connectionId":"MF_a311b3_1718132016996","linkName":"<link-name>"}

This error indicates that 3 consecutive attempts to obtain an AMQP-Link (which connects to a topic) have failed. Each attempt is supposed to do the following 3 things -

  1. Obtain a new or existing healthy connection from the cache. A connection is represented by the type ReactorConnection and is managed in a cache type ReactorConnectionCache.
  2. From the ReactorConnection in step.1, retrieve a (a new or existing) AMQP-session, represented by the type ReactorSession.
  3. From the ReactorSession in step.2, retrieve a new AMQP-Link, represented by the type ReactorReceiver.

When control is in step.3 (means steps.1 and.2 succeeded), the ReactorSession is supposed to log an INFO statement about creating a new AMQP-Link. We can see there is no such log from ReactorSession type.

I suspect step.2 failed for an unknown reason, causing a timeout in obtaining the session. Retry attempts also failed with the same unknown reason, exhausting all 3 retries. It could be also that step 1 failed but less likely.

The important thing about 7.17.4 is - we redesigned internals of logic around step.2. When the configuration "com.azure.core.amqp.cache" is opt-ed in for 7.17.4, the control flows through this redesigned internal code path.

When deploying 7.17.4 with "com.azure.core.amqp.cache" opt-in, for one of the environments, is it possible to enable DEBUG/VERBOSE log for the following two types participate in step.2?

  1. com.azure.core.amqp.implementation.ReactorSession
  2. com.azure.core.amqp.implementation.ReactorConnection

These two types have lower log spam since the functions in those are invoked only during connectivity establishment phase. For the rest of the types, it can be INFO as it used to be.

TheDevOps commented 1 month ago

Hi again @anuchandy perfect timing for this update, I just wanted to report that our library guys have created a new version with 7.17.4 used and support for a spring/env/system/whatever property to enable this opt-in

image

which I'm planning to pull into my team's application today and set the flag for it as well and get it deployed on our DEV environment. I'll also add the logging config for those 2 loggers as well then.

Any specific log I can/should pay attention to see if the opt-in is correctly enabled then?

Else the only "point to consider, as previously mentioned - since it seems quite likely that the problem is at least somewhat related to the messaging load/throughput - it's pretty rare on our DEV environment and really only happens "regularly" once we are in production with a lot more users on our systems. So it's entirely possible we get "unlucky" and don't see anything relevant on development for quite some time and may need to wait for our current version to reach production before I can make any reliable judgment if something improved or changed at all, as long as stuff generally still works on development, which seems likely. Still I'll obviously keep a closer eye on it the next days and report anything that seems of note to me!

anuchandy commented 1 month ago

Hi @TheDevOps, that sounds like a plan!

If the opt-in is set up properly, we should observe the following INFO log from the class "com.azure.core.amqp.implementation.RequestResponseChannelCache",

[INFO] Waiting for channel to active.

logged from here .

This line should appear at the beginning of the logs once the application connects to Service Bus.

In versions 7.17.3 and earlier, or if the flag is not enabled in version 7.17.4, logs will originate from the class AmqpChannelProcessor instead of RequestResponseChannelCache. When flag is enabled, there must not be any activities logged from AmqpChannelProcessor.

TheDevOps commented 1 month ago

Hello @anuchandy

alright then I think everything looks correct now for the first development deployment

image

We'll be doing some testing today to ensure everything still works correctly there and then at the start of next week I'll check if we maybe can slightly speed it up getting this change to production just for this one application so hopefully I'm slightly faster able to really say if anything changed with regards to this issue!

TheDevOps commented 1 month ago

Hello @anuchandy ,

Some small updates from my side:

While as said everything works on our development cluster I've still seen that several times mass reconnects have happened and figured maybe the logs of them are interesting already and can tell something so I'll attach them. These are all logs created by "com.azure.*" loggers, most up to INFO level and the two previously requested on DEBUG for 1 pod ever since it was started around 4 hours ago.

Sadly with the higher level and not that much luck on spotting them the moment they happened I do not have a nice raw log directly from the kubernetes pod console and can only offer a slightly less convenient export from our elasticsearch logging cluster. One important (and annoying) thing with logs from elasticsearch: Since elastic sorts only by the timestamp with ms as most detailed precision in case 2 or more logs happen in the same ms their order in the search result is no longer guaranteed to be the same as they really happened in. So if something seems weird because log A should happen before log B this is most likely the case. Sorry about this but it's the best I have quickly available, else I'd need to configure our pods to also log to some mounted volume file and try to get it there before the rotation removes it which is a bit annoying so I hope this is still somewhat usable for you!

app-azure-logs.xlsx

Maybe 1 specific type of log I have a small question to because even though it seems they do absolutely not cause any issues since all message processing works perfectly fine, but they seem new ever since we deployed the new version with the opt-in enabled:

E.g. one of them happened in line 220 in the xls "Failed to create receive link app-topic_app/subscriptions/app_SGDEV02/$deadletterqueue_475924_1728298898820 status-code: 401, status-description: InvalidSignature: The token has an invalid signature., errorContext[NAMESPACE: our-company-dev.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:receiver, LINK_CREDIT: 0]"

And shortly before we see those there's always a

e.g. line 240 "{"az.sdk.message":"Error occurred while refreshing token that is not retriable. Not scheduling refresh task. Use ActiveClientTokenManager.authorize() to schedule task again.","exception":"status-code: 401, status-description: InvalidSignature: The token has an invalid signature., errorContext[NAMESPACE: our-company-dev.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:receiver, LINK_CREDIT: 0]","scopes":"amqp://our-company-dev.servicebus.windows.net/app-topic_app/subscriptions/app_SGDEV02/$deadletterqueue","audience":"amqp://our-company-dev.servicebus.windows.net/app-topic_app/subscriptions/app_SGDEV02/$deadletterqueue"}"

From my interpretation it seems like there's some token refresh attempted (for whichever token this exactly is?) that doesn't work and then leads to the link being recreated? As said I've not noticed those 401 invalid signature logs before this update, but maybe they always happened hidden below somewhere and were just not logged in this way before.

Would be great if you could give it a small check if maybe these logs already show something. Else I'll keep you updated once we have this version running on PROD hopefully soon as said.

anuchandy commented 1 month ago

Hi @TheDevOps, thank you for getting back with observations and logs.

Looking at the logs, we can see logs statements originated from AmqpChannelProcessor, which should not happen when the flag com.azure.core.amqp.cache is set to true in the builder. At the same time, there are logs originating from RequestResponseChannelCache, which serves as the replacement for AmqpChannelProcessor when com.azure.core.amqp.cache is set.

I wonder if something like this is happening - a builder with opt-in is used for ServiceBusProcessorClient (resulting logs from new RequestResponseChannelCache), while other builder(s) without opt-in is used for Sender (resulting logs from old AmqpChannelProcessor). Some users mentioned encountering NullPointerException and thread leak (reference) when the Sender was using AmqpChannelProcessor in a low-traffic environment. I did not observe any NPE in the logs you shared though. The opt-in addresses this problem for Sender.

I'll review logs further and get back with any findings, but I thought of letting you know above observation.

anuchandy commented 1 month ago

Hi again @TheDevOps, to follow up on your observation on an occurrence of status-code: 401, status-description: InvalidSignature: The token has an invalid signature, I scanned through the logs.

The library creates one AMQP-Link for each Processor Client associated with a topic subscription. In this case, the topic is "app-topic_app" and subscription is for the DLQ "app_ATDEV03/$deadletterqueue".

If the AMQP-Link disconnects for some reason, then the client will reconnect by creating a new AMQP-Link. This disconnect is usually initiated by the broker, for example, when the link is idle. Sometimes links can also be disconnected from the client if client encounters an error.

Each AMQP-Link in a connection needs to be authenticated with the broker separately. Once authenticated, the auth-session is valid for 20 minutes. Each time before auth-session expires, the library will refresh the auth, extending the validity for another 20 minutes. The library will refresh 2 minutes before the current auth-session expires. For each refresh, the library needs to compute and send the token signature.

What we see in the logs for the topic "app-topic_app" and subscription "app_ATDEV03/$deadletterqueue" is - the first auth and following two auth refresh succeeded, means the AMQP-Link was active for (20 + 20 + 20) 60 minutes. The third attempt to refresh at ~80th minutes, encountered the error status-code: 401, status-description: InvalidSignature: from the broker. Following that, the library created a new AMQP-Link to recover, which is good.

Without having access to the broker logs of the Service Bus namespace, it is difficult to understand why the broker considered the provided token signature is invalid when three previous generated token signatures via the same algorithm were accepted.

The reason I can think of from the client's perspective is, clock skewing in the host. However, this is purely speculative. The algorithm to compute the token signature uses the current system time, other than this current time part, all other parts used to form the signature is constant. Here is how the signature is computed.

I think we should keep an eye on this error type and see if that's one-off/rare or multiple AMQP-Links are encountering this frequently over the period of their lifetime. In that case it deserves a further look. As of now, from the client side, the recovery happens through a new AMQP-Link (at least for this occurrence). <edit> I am currently reaching out to colleagues to find out if they have encountered similar cases.

Update : The status-code: 401, status-description: InvalidSignature: was root caused as a thread safety of the hmac instance used for computing the signature. This issue is addressed in the pr (10/15/2024) and released in version 7.17.5.

TheDevOps commented 1 month ago

Good morning @anuchandy

As usual thanks for the great responses!

Your guess with a different builder being used for the sender is almost certainly correct, while we do have a central library location where we create all our subscribers for topics defined in an applications yaml signature and then all the "tenant groups" (this is always the _XX01 and so on you can see at the end of the sub names, we generally have some 50~80 tenants grouped as e.g. _AT01 as the first Austrian grouping and so on) where the specific application build is active currently. Senders however are generated locally by the application team "as required" and the only thing our library does is adding some interceptors that set the required routing headers (such as the tenant group sending a message so it gets then routed to the correct tenant group subscription) but the builder object is unrelated to the subscriber builder right now and so also does not have any opt-in code right now. Even though we are not currently observing any issues such as the NPE you mentioned I assume it's generally recommended to ideally perform the opt-in for both senders and subscribers? I'll try to sync if we currently have any easy way to maybe also centrally do it for senders in this case. I assume the syntax is generally the same as for subscribers?

For the private stuff you'll get a mail from me any minute!

anuchandy commented 1 month ago

Hello @TheDevOps, good morning!, thanks for the response.

Yes, it’s recommended to use opt-in for Sender as well. you’re correct—the opt-in syntax for Sender remains the same. Just a small note: when transitioning to central management for Sender, please ensure that the current nature (whether dedicated or shared) of the Sender builder remains unchanged to avoid any shifts in connection handling behavior.

TheDevOps commented 3 weeks ago

Hello @anuchandy

As promised here's my Friday update: We updated our application for our PROD cluster with the opt-in enabled this Tuesday and so it has now been running for almost 72 hours. And the so far great news: There hasn't been a single instance of a noticeable timeframe where any pod stopped to process messages. While it is "only" 3 days in this application so far it has been pretty common to see it almost daily in at least 1 pod, so I have a very good feeling that the bigger internal reworkings will also have solved this issue right now. Still if you do not mind I'd like to keep monitoring it for 1 more week so we have 10 days total before I think it would be fine to close this ticket with pointing to the big reworkings and opting-in to them if you need it early or waiting until it becomes the default. If it never happens in this timeframe I'm absolutely certain that this reworkings have fixed this issue. I'd also like to look into the logs a bit more if I can spot anything else since the switch, but from a first quick glance besides the already known and fixed token renewal due to the non threadsafe object (which I hope we will also get around to deploying the fix for it as well soon) that besides some logs is not really causing any observable issues with the message processing I did not immediately spot anything else, but sadly I didn't have as much time as I wanted yet for this topic, but I'll definitely try to get it next week. I'll provide another update next week, but already thanks a lot for all the support and fingers crossed we can soon close this!

TheDevOps commented 2 weeks ago

Hello @anuchandy

alright it's the promised Friday again so time for my final update: Still not a single instance of no messages being processed has been observed in the application with the opt-in enabled! Also yet another detailed review of the logs showed no further issues either. Considering it has been 10 days now in an application where it previously happened almost daily I'm absolutely positive that it is indeed fixed.

So maybe to just summarize the whole ticket again in this final location:

So in short if someone else is regularly affected by the problem described here and also has a shared builder for multiple subscribers setup I strongly recommend to update to 7.17.5 or later and perform the small opt-in change for all builders in the application.

For my project we will check which apps besides the one we did all our tests right now are also affected regularly enough to warrant enabling the opt-in for them and for those we only see it like once a month we will just continue our previous "monitor, get notified and restart" approach until it gets resolved by a small update of the library eventually. For us this is a totally acceptable solution and the issue can be closed if you don't see any need to keep it open any longer. Maybe if possible just drop a small final comment once the version where the current opt-in becomes the default is released, might be interesting for others that stumble into this issue as well.

Besides this thanks for all the help and the great discussions during this issue!

anuchandy commented 1 week ago

@TheDevOps, glad to hear that the issue has been resolved. Thanks for providing summary; it will certainly be helpful for others with similar use cases.

Between as I promised offline, I'm posting below a write up about Kubernetes learning, so everyone can benefit from it.

anuchandy commented 1 week ago

For those who are reading, this comment is not about the RCA of the original issue but discusses how certain CPU settings could theoretically cause the Service Bus client to halt. It is recommended that you review the following two sections if you are containerizing an application that utilizes the Service Bus library.

Average CPU usage can be misleading

This section outlines the side effects experienced by a library due to CPU settings in Kubernetes, in production. We'll refer this library as HEI-client. The HEI-client has similar characteristics and performance expectations as the Service Bus library.

First, let's start with the overview of the threading model in the Service Bus library. There are two categories of threads,

  1. IO-Thread – this single thread responsible for all IO operations, message decoding and resource cleanup in an AMQP connection.
  2. Worker Threads – these are all threads responsible for pumping messages to the Processor handlers.

Image

As we can see, the single IO-Thread is a scarce resource. The overall "performance" and "reliability" of the library depends on health and available system resources for the IO-Thread.

By "performance", we refer to the timely delivery of messages and the processing of dispositions (complete and abandon operations). If the library processes 1000 messages per second, then there will be 1000 dispositions as well, resulting in approximately 2000 events per second enqueued to the IO-Thread's event queue. The IO-Thread must process these events efficiently to maintain high throughput, which in turn depends on available CPU cycles.

By "reliability", we refer to the role IO-thread has when the connection is lost, the IO-Thread is responsible for reaching out to each receiver one by one (serially), draining their in-memory messages and performing cleanup (closing resources, freeing internal states etc..). After the completion of cleanup across all receivers, the current IO-Thread is responsible for setting up the next connection and next IO-Thread. Failure in any of these steps can halt system progress. A halt in IO-Thread won't cause high CPU usage, making it hard to detect by common monitoring systems.

For these reasons, it is recommended to leave one CPU core for Service Bus library IO-Thread and have additional core for Worker Threads depending on the application workload (more on this in the next section).

The HEI-client is like the Service Bus library, with IO-Thread and Worker Threads. Let's examine the resourcing issues encountered by the HEI-client and draw comparisons to the Service Bus client.

The diagram below illustrates the latency spikes experienced by the HEI-client. While the average latency remained approximately 10ms, the application periodically encountered significant latency spikes. The chart identifies two such instances where the latency increased to 50ms and 80ms, respectively, thus impacting the SLA.

Latency

The networking module of HEI-client provides a metric indicating the number of pending tasks in the task-queue associated with its threads. A correlation is observed between latency spikes and the periods when there were numerous outstanding tasks in the thread task-queue.

ThreadPoolQueueLength

Like HEI-client, the IO-Thread in the Service Bus library has a task queue (called Qpid event queue as shown in the threading model diagram). The Service Bus library's Worker Threads from the bounded elastic thread pool also have similar task queues. Unfortunately, neither the Qpid library nor the Reactor bounded elastic pool has task-queue length metric available.

Now looking at the CPU usage of the HEI-client, the average CPU usage stayed 20-25% range, drawing no correlation between CPU usage and the latency spike or thread's task-queue length.

CPUAverage

Upon further investigation, it was determined that average CPU utilization graphs are not reliable indicators for assessing whether limits are being reached. The appropriate method is to inspect the pod's cpu.stat file for the "number of throttled periods" and the "total time spent throttled".

The monitoring system used by HEI-Client had agents capturing these throttling stats from the pods. E.g., Prometheus can alert when container experiences CPU throttling. The diagram below shows the CPU throttling chart for HEI-Client.

CPUThrottle

As we can see there is a correlation between the CPU throttling and latency spike / thread's tasks-queue length.

It was discovered that the containers had CPU "limit" set. Although the Kubernetes cluster has sufficient CPUs available globally, the limit imposed on the container restricted the HEI-Client from utilizing the spare CPU cycles. This led to a lower average CPU in usage graph while there were latency spikes.

The Kubernetes experts involved in the discussion shared the following references outlining problems with "limit" -

  1. Stop Using CPU Limits on Kubernetes
  2. CPUThrottlingHigh Prometheus Alert
  3. Resource-QoS

Tuning CPU and Thread pool size

We have seen a quite handful of Service Bus cases, where application hangs and bumping the cores and/or tuning the thread pool size brought back the processing to normal. The library team's investigations are based on SDK logs to identify any issue in SDKs and don't cover resource constraints imposed by the host environment. The resource allocation depends on user-specific workloads and is outside library team's scope/expertise, but we have the following general guidelines -

A starting point of 30-40 Worker threads per core is generally recommended; however, you may need to adjust based on the application's specific workload characteristics. This could involve either reducing or increasing the number of cores as required. The following formula can be used to compute the optimal number of cores, Ncpu,

Ncpu = Max(2, Thread_pool_size / (Ucpu * (1 + W/C)))

The formula first ensures a minimum of 2 cores, based on the recommendation from Java team at Microsoft for any containerized application, Containerize your Java applications | Microsoft Docs.

The second part of the formula, Thread_pool_size / (Ucpu * (1 + W/C)), is derived from the pool sizing guideline in "Java Concurrency in Practice" book.

Where

Thread_pool_size :

The size of the Worker Thread pool computed based on max-concurrent-calls (220 in the above example).

Ucpu :

Target CPU utilization for the application (0 <= Ucpu <= 1). For example, if we want 70% of CPU to be allocated for the application and 30% for the rest (JVM GC threads, other apps), then this value Ucpu is 0.7.

W/C :

Ratio of Wait-Time (W) to Compute-Time (C), this is what application developer should "measure" (specific to their application workload) using profiling tools -

Profiling the application in a setup close to real traffic will help find these numbers.

For example, consider an application that on average spends 500ms on I/O (e.g., calling a REST API) and 5ms on computation (e.g., processing the JSON response). If the required thread pool size is 220 and the desired CPU utilization (Ucpu) is 0.7, the formula would be applied as follows:

Ncpu       = Max(2, Thread_pool_size / (Ucpu * (1 + W/C))) 
           = Max(2, 220 / (0.7 * (1 + 500/5)))
           = Max(2, 200 / (0.7 * 101))
           = Max(2, 3.1)
           = 3.1 cores = 3100millicore

So, 3.1 core are required to support Worker Thread pool of size 220.

The primary factor determining the core count is the pool size, which is based on the application's chosen "concurrency" level (Processors multiplied by max-concurrent-calls). It is crucial to select a value that aligns with the actual processing requirements. For e.g., with a "concurrency" of 200 (e.g., 20 Processors each with 10 max-concurrent-calls) and an average processing time of 500ms, the application aims to handle 400 messages per second, i.e., 24,000 messages per minute. Therefore, it makes sense to evaluate whether this indeed corresponds to the expected incoming load and if the external service calls within the processor handler (such as database interactions and API calls) are optimized to manage such a volume efficiently. The quality of the network, the Service Bus tier and throughput units are other factors to consider.

It is worth noting that the Worker Thread pool does not immediately create threads equal to the pool size. The pool size represents an upper limit, and the pool will expand or shrink within this limit based on the workload.

anuchandy commented 1 week ago

Closing this, I'll leave a comment when the Service Bus library that opt-in the flag by default is released.