Azure / azure-sdk-for-java

This repository is for active development of the Azure SDK for Java. For consumers of the SDK we recommend visiting our public developer docs at https://docs.microsoft.com/java/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-java.

[BUG] EventProcessorClient sometimes stops reading in low traffic environments #41535

Open Bestyan opened 4 weeks ago

Bestyan commented 4 weeks ago

Describe the bug In our low-traffic environments (~40 messages per second on our Event Hub), we're facing issues with the EventProcessorClient. It runs stably for a while, then simply stops reading. It does not crash and does not throw exceptions; it just stops. Restarting the application fixes the issue until it happens again. The time until it stops varies: sometimes 5 minutes, sometimes a day, sometimes weeks. In the last 7 days, we have observed it 3 times in our prod environment.

Exception or Stack Trace I've replaced the true service name with my-service, just fyi

``` [INFO] 2024-08-15T11:08:34,926: reactor-executor-31 - com.azure.core.amqp.implementation.handler.SessionHandler - {"az.sdk.message":"onSessionRemoteClose","connectionId":"MF_ed5df0_1723716621282","sessionName":"my-service-prod-jpe-eh-vsms"} [INFO] 2024-08-15T11:08:34,926: reactor-executor-31 - com.azure.core.amqp.implementation.handler.SessionHandler - {"az.sdk.message":"onSessionRemoteClose closing a local session.","connectionId":"MF_ed5df0_1723716621282","sessionName":"my-service-prod-jpe-eh-vsms"} [DEBUG] 2024-08-15T11:08:34,926: reactor-executor-31 - com.azure.core.amqp.implementation.ReactorSession - {"az.sdk.message":"Got endpoint state.","connectionId":"MF_ed5df0_1723716621282","sessionName":"my-service-prod-jpe-eh-vsms","state":"CLOSED"} [INFO] 2024-08-15T11:08:34,926: reactor-executor-31 - com.azure.core.amqp.implementation.handler.SendLinkHandler - {"az.sdk.message":"onLinkRemoteClose","connectionId":"MF_ed5df0_1723716621282","errorCondition":"amqp:connection:forced","errorDescription":"The connection was closed by container '4b547255507242e8a5bc0e9949c405dd_G0' because it did not have any active links in the past 300000 milliseconds. TrackingId:4b547255507242e8a5bc0e9949c405dd_G0, SystemTracker:gateway5, Timestamp:2024-08-15T11:08:34","linkName":"cbs:sender","entityPath":"$cbs"} [INFO] 2024-08-15T11:08:34,926: reactor-executor-31 - com.azure.core.amqp.implementation.handler.SendLinkHandler - {"az.sdk.message":"Local link state is not closed.","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs:sender","entityPath":"$cbs","state":"ACTIVE"} [WARN] 2024-08-15T11:08:34,927: reactor-executor-31 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"Error in SendLinkHandler. Disposing unconfirmed sends.","exception":"The connection was closed by container '4b547255507242e8a5bc0e9949c405dd_G0' because it did not have any active links in the past 300000 milliseconds. 
TrackingId:4b547255507242e8a5bc0e9949c405dd_G0, SystemTracker:gateway5, Timestamp:2024-08-15T11:08:34, errorContext[NAMESPACE: my-service-prod-jpe-ehn-vsms.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 98]","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs"} com.azure.core.amqp.exception.AmqpException: The connection was closed by container '4b547255507242e8a5bc0e9949c405dd_G0' because it did not have any active links in the past 300000 milliseconds. TrackingId:4b547255507242e8a5bc0e9949c405dd_G0, SystemTracker:gateway5, Timestamp:2024-08-15T11:08:34, errorContext[NAMESPACE: my-service-prod-jpe-ehn-vsms.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 98] at com.azure.core.amqp.implementation.ExceptionUtil.toException(ExceptionUtil.java:90) ~[azure-core-amqp-2.9.7.jar!/:2.9.7] at com.azure.core.amqp.implementation.handler.LinkHandler.handleRemoteLinkClosed(LinkHandler.java:120) ~[azure-core-amqp-2.9.7.jar!/:2.9.7] at com.azure.core.amqp.implementation.handler.LinkHandler.onLinkRemoteClose(LinkHandler.java:63) ~[azure-core-amqp-2.9.7.jar!/:2.9.7] at com.azure.core.amqp.implementation.handler.SendLinkHandler.onLinkRemoteClose(SendLinkHandler.java:37) ~[azure-core-amqp-2.9.7.jar!/:2.9.7] at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:176) ~[proton-j-0.34.1.jar!/:?] at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108) ~[proton-j-0.34.1.jar!/:?] at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324) ~[proton-j-0.34.1.jar!/:?] at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291) ~[proton-j-0.34.1.jar!/:?] 
at com.azure.core.amqp.implementation.ReactorExecutor.run(ReactorExecutor.java:91) ~[azure-core-amqp-2.9.7.jar!/:2.9.7] at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.ContextPropagationOperator$RunnableWrapper.run(ContextPropagationOperator.java:373) ~[agent.jar:?] at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) ~[reactor-core-3.6.7.jar!/:3.6.7] at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) ~[reactor-core-3.6.7.jar!/:3.6.7] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?] at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?] at java.base/java.lang.Thread.run(Thread.java:1583) [?:?] [DEBUG] 2024-08-15T11:08:34,928: reactor-executor-31 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"Closing request/response channel.","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs"} [INFO] 2024-08-15T11:08:34,928: reactor-executor-31 - com.azure.core.amqp.implementation.AmqpChannelProcessor - {"az.sdk.message":"Transient error occurred. Retrying.","exception":"The connection was closed by container '4b547255507242e8a5bc0e9949c405dd_G0' because it did not have any active links in the past 300000 milliseconds. TrackingId:4b547255507242e8a5bc0e9949c405dd_G0, SystemTracker:gateway5, Timestamp:2024-08-15T11:08:34, errorContext[NAMESPACE: my-service-prod-jpe-ehn-vsms.servicebus.windows.net. 
ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 98]","connectionId":"MF_ed5df0_1723716621282","entityPath":"$cbs","tryCount":0,"intervalMs":1800} com.azure.core.amqp.exception.AmqpException: The connection was closed by container '4b547255507242e8a5bc0e9949c405dd_G0' because it did not have any active links in the past 300000 milliseconds. TrackingId:4b547255507242e8a5bc0e9949c405dd_G0, SystemTracker:gateway5, Timestamp:2024-08-15T11:08:34, errorContext[NAMESPACE: my-service-prod-jpe-ehn-vsms.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 98] at com.azure.core.amqp.implementation.ExceptionUtil.toException(ExceptionUtil.java:90) ~[azure-core-amqp-2.9.7.jar!/:2.9.7] at com.azure.core.amqp.implementation.handler.LinkHandler.handleRemoteLinkClosed(LinkHandler.java:120) ~[azure-core-amqp-2.9.7.jar!/:2.9.7] at com.azure.core.amqp.implementation.handler.LinkHandler.onLinkRemoteClose(LinkHandler.java:63) ~[azure-core-amqp-2.9.7.jar!/:2.9.7] at com.azure.core.amqp.implementation.handler.SendLinkHandler.onLinkRemoteClose(SendLinkHandler.java:37) ~[azure-core-amqp-2.9.7.jar!/:2.9.7] at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:176) ~[proton-j-0.34.1.jar!/:?] at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108) ~[proton-j-0.34.1.jar!/:?] at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324) ~[proton-j-0.34.1.jar!/:?] at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291) ~[proton-j-0.34.1.jar!/:?] at com.azure.core.amqp.implementation.ReactorExecutor.run(ReactorExecutor.java:91) ~[azure-core-amqp-2.9.7.jar!/:2.9.7] at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.ContextPropagationOperator$RunnableWrapper.run(ContextPropagationOperator.java:373) ~[agent.jar:?] 
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) ~[reactor-core-3.6.7.jar!/:3.6.7] at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) ~[reactor-core-3.6.7.jar!/:3.6.7] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?] at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?] at java.base/java.lang.Thread.run(Thread.java:1583) [?:?] [DEBUG] 2024-08-15T11:08:34,928: reactor-executor-31 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"Terminating 0 unconfirmed sends (reason: The connection was closed by container '4b547255507242e8a5bc0e9949c405dd_G0' because it did not have any active links in the past 300000 milliseconds. TrackingId:4b547255507242e8a5bc0e9949c405dd_G0, SystemTracker:gateway5, Timestamp:2024-08-15T11:08:34, errorContext[NAMESPACE: my-service-prod-jpe-ehn-vsms.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 98]).","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs"} [DEBUG] 2024-08-15T11:08:34,928: reactor-executor-31 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"completed the termination of 0 unconfirmed sends (reason: The connection was closed by container '4b547255507242e8a5bc0e9949c405dd_G0' because it did not have any active links in the past 300000 milliseconds. TrackingId:4b547255507242e8a5bc0e9949c405dd_G0, SystemTracker:gateway5, Timestamp:2024-08-15T11:08:34, errorContext[NAMESPACE: my-service-prod-jpe-ehn-vsms.servicebus.windows.net. 
ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 98]).","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs"} [DEBUG] 2024-08-15T11:08:34,928: reactor-executor-31 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"Channel already closed.","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs"} [DEBUG] 2024-08-15T11:08:34,928: reactor-executor-31 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"SendLinkHandler disposed. Remaining: 1","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs"} [DEBUG] 2024-08-15T11:08:34,928: reactor-executor-31 - com.azure.core.amqp.implementation.handler.SessionHandler - {"az.sdk.message":"onSessionLocalClose","connectionId":"MF_ed5df0_1723716621282","sessionName":"my-service-prod-jpe-eh-vsms"} [DEBUG] 2024-08-15T11:08:34,928: reactor-executor-31 - com.azure.core.amqp.implementation.handler.SendLinkHandler - {"az.sdk.message":"onLinkLocalClose","connectionId":"MF_ed5df0_1723716621282","errorCondition":"amqp:connection:forced","errorDescription":"The connection was closed by container '4b547255507242e8a5bc0e9949c405dd_G0' because it did not have any active links in the past 300000 milliseconds. 
TrackingId:4b547255507242e8a5bc0e9949c405dd_G0, SystemTracker:gateway5, Timestamp:2024-08-15T11:08:34","linkName":"cbs:sender","entityPath":"$cbs"} [DEBUG] 2024-08-15T11:08:34,928: reactor-executor-31 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"Closing send link and receive link.","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs"} [DEBUG] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.handler.ReceiveLinkHandler2 - {"az.sdk.message":"onLinkLocalClose","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs:receiver","entityPath":"$cbs"} [INFO] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.handler.ReceiveLinkHandler2 - {"az.sdk.message":"onLinkRemoteClose","connectionId":"MF_ed5df0_1723716621282","errorCondition":"amqp:connection:forced","errorDescription":"The connection was closed by container '4b547255507242e8a5bc0e9949c405dd_G0' because it did not have any active links in the past 300000 milliseconds. TrackingId:4b547255507242e8a5bc0e9949c405dd_G0, SystemTracker:gateway5, Timestamp:2024-08-15T11:08:34","linkName":"cbs:receiver","entityPath":"$cbs"} [DEBUG] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"ReceiveLinkHandler disposed. 
Remaining: 0","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs"} [DEBUG] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"Terminating 0 unconfirmed sends (reason: The RequestResponseChannel didn't receive the acknowledgment for the send due receive link termination.).","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs"} [DEBUG] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"completed the termination of 0 unconfirmed sends (reason: The RequestResponseChannel didn't receive the acknowledgment for the send due receive link termination.).","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs"} [DEBUG] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"Could not emit complete signal.","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs","signalType":"onComplete","emitResult":"FAIL_TERMINATED"} [INFO] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.handler.SessionHandler - {"az.sdk.message":"onSessionRemoteClose","connectionId":"MF_ed5df0_1723716621282","sessionName":"cbs-session"} [INFO] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.handler.SessionHandler - {"az.sdk.message":"onSessionRemoteClose closing a local session.","connectionId":"MF_ed5df0_1723716621282","sessionName":"cbs-session"} [DEBUG] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.ReactorSession - {"az.sdk.message":"Got endpoint state.","connectionId":"MF_ed5df0_1723716621282","sessionName":"cbs-session","state":"CLOSED"} [INFO] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.handler.ConnectionHandler - 
{"az.sdk.message":"onConnectionRemoteClose","connectionId":"MF_ed5df0_1723716621282","errorCondition":"amqp:connection:forced","errorDescription":"The connection was closed by container '4b547255507242e8a5bc0e9949c405dd_G0' because it did not have any active links in the past 300000 milliseconds. TrackingId:4b547255507242e8a5bc0e9949c405dd_G0, SystemTracker:gateway5, Timestamp:2024-08-15T11:08:34","hostName":"my-service-prod-jpe-ehn-vsms.servicebus.windows.net"} [INFO] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.handler.ConnectionHandler - {"az.sdk.message":"onTransportClosed","connectionId":"MF_ed5df0_1723716621282","hostName":"my-service-prod-jpe-ehn-vsms.servicebus.windows.net"} [INFO] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.handler.GlobalIOHandler - {"az.sdk.message":"onTransportClosed","connectionId":"MF_ed5df0_1723716621282","hostName":"my-service-prod-jpe-ehn-vsms.servicebus.windows.net"} [DEBUG] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.handler.SessionHandler - {"az.sdk.message":"onSessionLocalClose","connectionId":"MF_ed5df0_1723716621282","sessionName":"cbs-session"} [INFO] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.handler.ConnectionHandler - {"az.sdk.message":"onConnectionLocalClose","connectionId":"MF_ed5df0_1723716621282","hostName":"my-service-prod-jpe-ehn-vsms.servicebus.windows.net"} [INFO] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.handler.ConnectionHandler - {"az.sdk.message":"onConnectionUnbound","connectionId":"MF_ed5df0_1723716621282","hostName":"my-service-prod-jpe-ehn-vsms.servicebus.windows.net","state":"CLOSED","remoteState":"CLOSED"} [DEBUG] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.ReactorConnection - {"az.sdk.message":"getConnectionState","connectionId":"MF_ed5df0_1723716621282","state":"CLOSED"} 
[DEBUG] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.ReactorConnection - {"az.sdk.message":"Disposing of active sessions due to connection close.","connectionId":"MF_ed5df0_1723716621282"} [INFO] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.ReactorConnection - {"az.sdk.message":"Disposing of ReactorConnection.","connectionId":"MF_ed5df0_1723716621282","isTransient":false,"isInitiatedByClient":false,"shutdownMessage":"Connection handler closed."} [DEBUG] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.ReactorSession - {"az.sdk.message":"Setting error condition and disposing session. Shutdown signal received (Connection handler closed.)","connectionId":"MF_ed5df0_1723716621282","sessionName":"my-service-prod-jpe-eh-vsms"} [DEBUG] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.ReactorSession - {"az.sdk.message":"Setting error condition and disposing session. Shutdown signal received (Connection handler closed.)","connectionId":"MF_ed5df0_1723716621282","sessionName":"cbs-session"} [DEBUG] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.AmqpChannelProcessor - {"az.sdk.message":"Added subscriber.","connectionId":"MF_ed5df0_1723716621282","entityPath":"$cbs","subscriberId":"un_b261dd_1723720114929"} [DEBUG] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.ReactorConnection - {"az.sdk.message":"Closed management nodes.","connectionId":"MF_ed5df0_1723716621282","signalType":"onComplete"} [INFO] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.messaging.eventhubs.implementation.EventHubConnectionProcessor - {"az.sdk.message":"Channel is closed. 
Requesting upstream.","entityPath":"my-service-prod-jpe-eh-vsms"} [DEBUG] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.core.amqp.implementation.ReactorConnection - {"az.sdk.message":"Connection was already closed. Not disposing again.","connectionId":"MF_ed5df0_1723716621282"} [INFO] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.messaging.eventhubs.implementation.EventHubConnectionProcessor - {"az.sdk.message":"Connection not requested, yet. Requesting one.","entityPath":"my-service-prod-jpe-eh-vsms"} [INFO] 2024-08-15T11:08:34,929: reactor-executor-31 - com.azure.messaging.eventhubs.EventHubClientBuilder - {"az.sdk.message":"Emitting a single connection.","connectionId":"MF_6c869a_1723720114929"} [DEBUG] 2024-08-15T11:08:34,930: reactor-executor-31 - com.azure.core.amqp.implementation.ReactorConnection - {"az.sdk.message":"getConnectionState","connectionId":"MF_6c869a_1723720114929","state":"UNINITIALIZED"} [INFO] 2024-08-15T11:08:34,930: reactor-executor-31 - com.azure.messaging.eventhubs.implementation.EventHubConnectionProcessor - {"az.sdk.message":"Setting next AMQP channel.","entityPath":"my-service-prod-jpe-eh-vsms"} [INFO] 2024-08-15T11:08:34,930: reactor-executor-31 - com.azure.core.amqp.implementation.handler.SendLinkHandler - {"az.sdk.message":"onLinkFinal","connectionId":"MF_ed5df0_1723716621282","linkName":"my-service-prod-jpe-eh-vsms","entityPath":"my-service-prod-jpe-eh-vsms"} [INFO] 2024-08-15T11:08:34,930: reactor-executor-31 - com.azure.core.amqp.implementation.handler.SessionHandler - {"az.sdk.message":"onSessionFinal.","connectionId":"MF_ed5df0_1723716621282","sessionName":"my-service-prod-jpe-eh-vsms"} [DEBUG] 2024-08-15T11:08:34,930: reactor-executor-31 - com.azure.core.amqp.implementation.ReactorSession - {"az.sdk.message":"Disposing of active send and receive links due to session close.","connectionId":"MF_ed5df0_1723716621282","sessionName":"my-service-prod-jpe-eh-vsms"} [INFO] 2024-08-15T11:08:34,930: 
reactor-executor-31 - com.azure.core.amqp.implementation.handler.SendLinkHandler - {"az.sdk.message":"onLinkFinal","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs:sender","entityPath":"$cbs"} [DEBUG] 2024-08-15T11:08:34,930: reactor-executor-31 - com.azure.core.amqp.implementation.handler.ReceiveLinkHandler2 - {"az.sdk.message":"Could not emit messages.close when closing handler.","connectionId":"MF_ed5df0_1723716621282","entityPath":"$cbs","linkName":"cbs","emitResult":"FAIL_TERMINATED"} [INFO] 2024-08-15T11:08:34,930: reactor-executor-31 - com.azure.core.amqp.implementation.handler.ReceiveLinkHandler2 - {"az.sdk.message":"onLinkFinal","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs:receiver","entityPath":"$cbs"} [INFO] 2024-08-15T11:08:34,930: reactor-executor-31 - com.azure.core.amqp.implementation.handler.SessionHandler - {"az.sdk.message":"onSessionFinal.","connectionId":"MF_ed5df0_1723716621282","sessionName":"cbs-session"} [DEBUG] 2024-08-15T11:08:34,930: reactor-executor-31 - com.azure.core.amqp.implementation.ReactorSession - {"az.sdk.message":"Disposing of active send and receive links due to session close.","connectionId":"MF_ed5df0_1723716621282","sessionName":"cbs-session"} [INFO] 2024-08-15T11:08:34,930: reactor-executor-31 - com.azure.core.amqp.implementation.handler.ConnectionHandler - {"az.sdk.message":"onConnectionFinal","connectionId":"MF_ed5df0_1723716621282","hostName":"my-service-prod-jpe-ehn-vsms.servicebus.windows.net"} [INFO] 2024-08-15T11:08:36,728: parallel-1 - com.azure.core.amqp.implementation.AmqpChannelProcessor - {"az.sdk.message":"Requesting from upstream.","connectionId":"MF_ed5df0_1723716621282","entityPath":"$cbs","tryCount":0} [INFO] 2024-08-15T11:08:36,728: parallel-1 - com.azure.core.amqp.implementation.AmqpChannelProcessor - {"az.sdk.message":"Connection not requested, yet. 
Requesting one.","connectionId":"MF_ed5df0_1723716621282","entityPath":"$cbs"} [DEBUG] 2024-08-15T11:08:36,729: parallel-1 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"Updating endpoint states.","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs","sendState":null,"receiveState":"UNINITIALIZED"} [DEBUG] 2024-08-15T11:08:36,729: parallel-1 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"Updating endpoint states.","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs","sendState":"UNINITIALIZED","receiveState":"UNINITIALIZED"} [DEBUG] 2024-08-15T11:08:36,729: parallel-1 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"Shutdown signal received.","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs"} [DEBUG] 2024-08-15T11:08:36,729: parallel-1 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"Closing request/response channel.","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs"} [INFO] 2024-08-15T11:08:36,729: parallel-1 - com.azure.core.amqp.implementation.ReactorConnection - {"az.sdk.message":"Emitting new response channel.","connectionId":"MF_ed5df0_1723716621282","entityPath":"$cbs","linkName":"cbs"} [INFO] 2024-08-15T11:08:36,729: parallel-1 - com.azure.core.amqp.implementation.AmqpChannelProcessor - {"az.sdk.message":"Setting next AMQP channel.","connectionId":"MF_ed5df0_1723716621282","entityPath":"$cbs"} [DEBUG] 2024-08-15T11:08:36,729: parallel-1 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"Channel already closed.","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs"} [DEBUG] 2024-08-15T11:08:36,729: reactor-executor-31 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"Closing send link and receive link.","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs"} [INFO] 2024-08-15T11:08:36,729: parallel-1 - 
com.azure.core.amqp.implementation.AmqpChannelProcessor - {"az.sdk.message":"Next AMQP channel received.","connectionId":"MF_ed5df0_1723716621282","entityPath":"$cbs","subscriberId":"un_b261dd_1723720114929"} [INFO] 2024-08-15T11:08:36,729: parallel-1 - com.azure.core.amqp.implementation.AmqpChannelProcessor - {"az.sdk.message":"Upstream connection publisher was completed. Terminating processor.","connectionId":"MF_ed5df0_1723716621282","entityPath":"$cbs"} [DEBUG] 2024-08-15T11:08:36,729: reactor-executor-31 - com.azure.core.amqp.implementation.handler.SendLinkHandler - {"az.sdk.message":"onLinkLocalOpen","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs:sender","entityPath":"$cbs","localTarget":"Target{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}"} [DEBUG] 2024-08-15T11:08:36,729: reactor-executor-31 - com.azure.core.amqp.implementation.handler.ReceiveLinkHandler2 - {"az.sdk.message":"onLinkLocalOpen","connectionId":"MF_ed5df0_1723716621282","entityPath":"$cbs","linkName":"cbs:receiver","localSource":"Source{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}"} [DEBUG] 2024-08-15T11:08:36,729: reactor-executor-31 - com.azure.core.amqp.implementation.handler.SendLinkHandler - {"az.sdk.message":"onLinkLocalClose","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs:sender","entityPath":"$cbs"} [INFO] 2024-08-15T11:08:36,729: reactor-executor-31 - com.azure.core.amqp.implementation.handler.SendLinkHandler - {"az.sdk.message":"Sender link was never active. 
Closing endpoint states.","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs","entityPath":"$cbs"} [DEBUG] 2024-08-15T11:08:36,729: reactor-executor-31 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"Updating endpoint states.","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs","sendState":"CLOSED","receiveState":"UNINITIALIZED"} [DEBUG] 2024-08-15T11:08:36,729: reactor-executor-31 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"Channel already closed.","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs"} [DEBUG] 2024-08-15T11:08:36,729: reactor-executor-31 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"SendLinkHandler disposed. Remaining: 1","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs"} [DEBUG] 2024-08-15T11:08:36,729: reactor-executor-31 - com.azure.core.amqp.implementation.handler.ReceiveLinkHandler2 - {"az.sdk.message":"onLinkLocalClose","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs:receiver","entityPath":"$cbs"} [INFO] 2024-08-15T11:08:36,729: reactor-executor-31 - com.azure.core.amqp.implementation.handler.ReceiveLinkHandler2 - {"az.sdk.message":"Receiver link was never active. Closing endpoint states","connectionId":"MF_ed5df0_1723716621282","entityPath":"$cbs","linkName":"cbs"} [DEBUG] 2024-08-15T11:08:36,729: reactor-executor-31 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"Updating endpoint states.","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs","sendState":"CLOSED","receiveState":"CLOSED"} [DEBUG] 2024-08-15T11:08:36,729: reactor-executor-31 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"Channel already closed.","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs"} [DEBUG] 2024-08-15T11:08:36,729: reactor-executor-31 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"ReceiveLinkHandler disposed. 
Remaining: 0","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs"} [DEBUG] 2024-08-15T11:08:36,729: reactor-executor-31 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"Terminating 0 unconfirmed sends (reason: The RequestResponseChannel didn't receive the acknowledgment for the send due receive link termination.).","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs"} [DEBUG] 2024-08-15T11:08:36,729: reactor-executor-31 - com.azure.core.amqp.implementation.RequestResponseChannel - {"az.sdk.message":"completed the termination of 0 unconfirmed sends (reason: The RequestResponseChannel didn't receive the acknowledgment for the send due receive link termination.).","connectionId":"MF_ed5df0_1723716621282","linkName":"cbs"} [INFO] 2024-08-15T11:08:36,729: reactor-executor-31 - com.azure.core.amqp.implementation.AmqpChannelProcessor - {"az.sdk.message":"Channel is disposed.","connectionId":"MF_ed5df0_1723716621282","entityPath":"$cbs"} [DEBUG] 2024-08-15T11:08:36,729: reactor-executor-31 - com.azure.core.amqp.implementation.ReactorConnection - {"az.sdk.message":"Scheduling closeConnection work.","connectionId":"MF_ed5df0_1723716621282"} [DEBUG] 2024-08-15T11:08:36,729: reactor-executor-31 - com.azure.core.amqp.implementation.ReactorConnection - {"az.sdk.message":"Closed reactor dispatcher.","connectionId":"MF_ed5df0_1723716621282","signalType":"onComplete"} [DEBUG] 2024-08-15T11:08:36,729: reactor-executor-31 - com.azure.core.amqp.implementation.ReactorConnection - {"az.sdk.message":"Closed CBS node.","connectionId":"MF_ed5df0_1723716621282","signalType":"onComplete"} [INFO] 2024-08-15T11:08:36,729: reactor-executor-31 - com.azure.core.amqp.implementation.ReactorConnection - {"az.sdk.message":"Closing executor.","connectionId":"MF_ed5df0_1723716621282"} ```
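The error that repeats throughout the trace above is the service force-closing the connection with `amqp:connection:forced` because it saw no active links for 300000 milliseconds (5 minutes). When scanning larger log files for the same event, a small filter can help. The following is only a sketch: the class and method names are hypothetical, and it assumes the log payload keeps the field name and message wording seen in the excerpt above.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not part of the Azure SDK): picks idle-forced-close events out of log lines.
final class IdleCloseLogParser {
    // Matches the "errorCondition" field of the JSON-style log payload.
    private static final Pattern CONDITION =
        Pattern.compile("\"errorCondition\":\"([^\"]+)\"");
    // Matches the idle window reported in the description, e.g. "in the past 300000 milliseconds".
    private static final Pattern IDLE_MS =
        Pattern.compile("in the past (\\d+) milliseconds");

    // Returns the AMQP error condition on the line, if any.
    static Optional<String> errorCondition(String logLine) {
        Matcher m = CONDITION.matcher(logLine);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    // Returns the idle window in milliseconds reported on the line, if any.
    static Optional<Long> idleWindowMillis(String logLine) {
        Matcher m = IDLE_MS.matcher(logLine);
        return m.find() ? Optional.of(Long.parseLong(m.group(1))) : Optional.empty();
    }

    // True when the line reports a forced close that also names an idle window.
    static boolean isIdleForcedClose(String logLine) {
        return errorCondition(logLine).filter("amqp:connection:forced"::equals).isPresent()
            && idleWindowMillis(logLine).isPresent();
    }
}
```

Counting the lines for which `isIdleForcedClose` returns true, grouped by `connectionId`, would show whether each stall is preceded by one of these idle closes.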

To Reproduce Steps to reproduce the behavior:

Code Snippet The EventProcessorClient setup looks like this:

```java
public void startTelemetryProcessor() {
    LOGGER.debug("starting new EventProcessorClient");
    String fullyQualifiedNamespace = config.getIncomingEventHubNamespace() + config.getEhHostSuffix();
    String consumerGroupName = config.getConsumerGroupName();
    TokenCredential tokenCredential = azureCredentials.getTokenCredential();
    BlobContainerAsyncClient blobContainerClient = createBlobClient(tokenCredential, config);

    telemetryProcessorHost = new EventProcessorClientBuilder()
        .fullyQualifiedNamespace(fullyQualifiedNamespace)
        .eventHubName(config.getIncomingEventHub())
        .credential(tokenCredential)
        .consumerGroup(consumerGroupName)
        .initialPartitionEventPosition((partitionId) -> EventPosition.earliest())
        .checkpointStore(new BlobCheckpointStore(blobContainerClient))
        .loadBalancingUpdateInterval(Duration.ofSeconds(5))
        .partitionOwnershipExpirationInterval(Duration.ofSeconds(20))
        .loadBalancingStrategy(LoadBalancingStrategy.GREEDY)
        // Batches of up to 256 events, waiting at most 2 seconds per batch
        .processEventBatch(createEventConsumer(), 256, Duration.ofSeconds(2))
        .processError(createErrorConsumer())
        .processPartitionInitialization(createInitializationConsumer())
        .processPartitionClose(createCloseConsumer())
        .buildEventProcessorClient();

    try {
        telemetryProcessorHost.start();
        LOGGER.info(
            "Started EventProcessorClient for telemetry with identifier {}",
            telemetryProcessorHost.getIdentifier());
    } catch (Exception ex) {
        LOGGER.error("Failed at startup telemetry EventProcessorClient", ex);
    }
}

private BlobContainerAsyncClient createBlobClient(TokenCredential tokenCredential, Configuration config) {
    return new BlobContainerClientBuilder()
        .credential(tokenCredential)
        .endpoint(config.getCheckpointStorageAccountEndpoint())
        .containerName(config.getTelemetryContainerName())
        .buildAsyncClient();
}

private Consumer<ErrorContext> createErrorConsumer() {
    return context -> {
        PartitionContext partitionContext = context.getPartitionContext();
        LOGGER.error(
            "Error received for Partition {} on {}",
            partitionContext.getPartitionId(),
            partitionContext.getEventHubName(),
            context.getThrowable());
    };
}

private Consumer<CloseContext> createCloseConsumer() {
    return closeContext -> {
        metrics.decrementPartitionCount();
        PartitionContext partitionContext = closeContext.getPartitionContext();
        LOGGER.info(
            "Closed partition {} on {}. Reason: {}",
            partitionContext.getPartitionId(),
            partitionContext.getEventHubName(),
            closeContext.getCloseReason());
    };
}

private Consumer<InitializationContext> createInitializationConsumer() {
    return initializationContext -> {
        metrics.incrementPartitionCount();
        PartitionContext partitionContext = initializationContext.getPartitionContext();
        LOGGER.info(
            "Opened partition {} on {}",
            partitionContext.getPartitionId(),
            partitionContext.getEventHubName());
    };
}

private Consumer<EventBatchContext> createEventConsumer() {
    return eventDataBatch -> {
        // Only non-empty batches are processed and checkpointed
        if (!eventDataBatch.getEvents().isEmpty()) {
            String transactionId = UUID.randomUUID().toString();
            TelemetryProcessResult result =
                eventService.processTelemetry(transactionId, eventDataBatch.getEvents());
            eventService.publishMessages(transactionId, result);
            eventDataBatch.updateCheckpoint();
            LOGGER.debug(
                "Checkpointed Partition {}",
                eventDataBatch.getPartitionContext().getPartitionId());
        }
    };
}
```
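Because the client stops silently, one way to notice the stall from inside the application is to track when the batch handler last ran. The following is a minimal sketch, not a fix for the underlying problem: `StallWatchdog` is a hypothetical class, and it assumes the handler passed to `processEventBatch` keeps being invoked (possibly with empty batches) for as long as the processor is healthy.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical helper (not part of the Azure SDK): flags a consumer that has gone quiet.
final class StallWatchdog {
    private final long maxSilenceNanos;
    private final LongSupplier nanoClock;      // injectable clock, e.g. System::nanoTime
    private final AtomicLong lastActivityNanos;

    StallWatchdog(Duration maxSilence, LongSupplier nanoClock) {
        this.maxSilenceNanos = maxSilence.toNanos();
        this.nanoClock = nanoClock;
        // Start the silence window at construction time.
        this.lastActivityNanos = new AtomicLong(nanoClock.getAsLong());
    }

    // Call at the top of the batch handler, before the isEmpty() check.
    void recordActivity() {
        lastActivityNanos.set(nanoClock.getAsLong());
    }

    // True when no handler invocation has been recorded within the allowed silence window.
    boolean isStalled() {
        return nanoClock.getAsLong() - lastActivityNanos.get() > maxSilenceNanos;
    }
}
```

In the snippet above, `recordActivity()` could be called as the first statement of the lambda returned by `createEventConsumer()`, with a scheduled task polling `isStalled()` and either failing a liveness probe or calling `telemetryProcessorHost.stop()` followed by `start()`. With several partitions, one watchdog per partition would avoid a busy partition masking a stalled one.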

Expected behavior EventProcessorClient keeps reading messages.

Screenshots Azure Portal metrics of the Event Hub at the time it happens (reading stops): image

Grafana metrics of our service: we have 2 pods running and they both stop reading about 1 minute apart. Note that the line continues, meaning the metrics keep getting published because the pods are still healthy. image

Here it happened to both pods 1 hour apart. image

Here both pods stopped reading at the same time. image

Setup:

Additional context Here is the extended log file of the above log snippet where you can see the pump threads processing messages before: sdk-issue-prod-jp.zip

The same issue also occurs in a similar service that uses these dependency versions:

Information Checklist Kindly make sure that you have added all the following information above and checked off the required fields; otherwise we will treat the issue as an incomplete report.

conniey commented 3 weeks ago

Thanks for reporting this.

We've been working on a newer version of the internals for Event Hubs (it's been GA'd for Service Bus for several months), and this behaviour doesn't appear in tests where we have idle clients. We're planning to release a beta where people can opt into using the v2 stack via an environment variable (it can be disabled to switch back to the original v1 stack). Would you be interested in trying out the beta?

Bestyan commented 3 weeks ago

Hi connie, we would definitely be interested in that. However, it won't help us with this issue, as we cannot use a beta library in a production environment.