Azure / azure-event-hubs-java

☁️ Java client library for Azure Event Hubs
https://azure.microsoft.com/services/event-hubs
MIT License
51 stars 60 forks source link

IEventProcessor#onClose not being called during graceful shutdown #462

Closed tomaszszlek closed 3 years ago

tomaszszlek commented 4 years ago

Actual Behavior

I am implementing graceful shutdown in my application. When application got shutdown signal I want my Event Processor to onEvent to not being triggered anymore and finish all pending events processing (wait for IEventProcessor to return). The javadoc of IEventProcessor#onClose suggest that onClose should be invoked when EventProcessor is stopped however neither is not being invoked during graceful shutdown nor by invoking EventProcessorHost#unregisterEventProcessor() explicitly.

Expected Behavior

IEventProcessor#onClose should be invoked either during application graceful shutdown or when EventProcessorHost#unregisterEventProcessor() is explicitly called. Please advice if I missed something.

Versions

JamesBirdsall commented 4 years ago

Invoking unregisterEventProcessor is the way to gracefully shut down a host instance. In that case, IEventProcessor#onClose absolutely should be called, but there may be a delay: if there is no traffic on the event hub, then shutting down the message pump will block until the receive call times out (which can be up to a minute by default), and onClose will not be called until the message pump is shut down. How long are you waiting after calling unregisterEventProcessor?

tomaszszlek commented 4 years ago

Hi @JamesBirdsall, Thank you for explanation! I am waiting 30 seconds only because this is default in Kubernetes which I am using for my application (terminationGracePeriodSeconds setting). I have one more question. Once the unregisterEventProcessor is called, is the events processing blocked ? I am assuming that the effect of calling this method is to wait for all active events processing to be completed but no more messages will be polled from event hub from that point. Am I right ? Also I am not sure why unregisterEventProcessor is waiting for receive call time out if there is no traffic. Shouldn't it finish immediately in that case ?

JamesBirdsall commented 4 years ago

1) Once unregisterEventProcessor is called, the current call to onEvents is allowed to complete but there will be no more. 2) The behavior of the raw receive call is to block until either at least one event is available or the timeout expires. Way down in the code, there are receive loops (one per partition) which will exit after unregisterEventProcessor is called, and then once each loop exits the rest of cleanup and shutdown occurs for that partition. If there is no traffic on a partition, the receive call sits there blocked until the timeout expires. We could rewrite the receive call to be interruptable, but shutdown isn't usually a time-critical operation so there hasn't been the pressure do to so.

tomaszszlek commented 4 years ago

Hi @JamesBirdsall, I have extended waiting time for unregisterEventProcessor completion before sending SIGKILL however in my case even if each processing takes ~10 seconds (I just simulated such a long time, in real it is just few milliseconds) then it takes very long time after invoking unregisterEventProcessor to stop consuming new messages. I didn't debug it, but it looks to me like consumers process every message that they polled from EventHub before unregister was invoked, see below logs:

2019-12-17 12:36:17 ERROR EventHubConsumer:113 - Stop processing
2019-12-17 12:36:17 ERROR EventHubConsumer:85 - Start processing
2019-12-17 12:36:17 ERROR EventHubConsumer:113 - Stop processing
2019-12-17 12:36:17 ERROR EventHubConsumer:85 - Start processing
2019-12-17 12:36:19 INFO  ThreadPoolTaskExecutor:208 - Shutting down ExecutorService 'applicationTaskExecutor'
2019-12-17 12:36:19 INFO  ShutdownEventHubManager:33 - Received shutdown signal. Unregistering event processor
2019-12-17 12:36:19 INFO  EventProcessorHost:292 - host ec-ehub-consumer-xxx: Stopping event processing
2019-12-17 12:36:19 INFO  PartitionManager:120 - host ec-ehub-consumer-xxx: Shutting down all pumps
2019-12-17 12:36:19 INFO  PumpManager:68 - host ec-ehub-consumer-xxx: 0: closing pump for reason Shutdown
2019-12-17 12:36:19 INFO  PartitionPump:382 - host ec-ehub-consumer-xxx: 0: pump shutdown for reason Shutdown
2019-12-17 12:36:19 INFO  PumpManager:68 - host ec-ehub-consumer-xxx: 1: closing pump for reason Shutdown
2019-12-17 12:36:19 INFO  PartitionPump:382 - host ec-ehub-consumer-xxx: 1: pump shutdown for reason Shutdown
2019-12-17 12:36:19 INFO  PumpManager:68 - host ec-ehub-consumer-xxx: 2: closing pump for reason Shutdown
2019-12-17 12:36:19 INFO  PartitionPump:382 - host ec-ehub-consumer-xxx: 2: pump shutdown for reason Shutdown
2019-12-17 12:36:19 INFO  PumpManager:68 - host ec-ehub-consumer-xxx: 3: closing pump for reason Shutdown
2019-12-17 12:36:19 INFO  PartitionPump:382 - host ec-ehub-consumer-xxx: 3: pump shutdown for reason Shutdown
2019-12-17 12:36:19 INFO  PumpManager:68 - host ec-ehub-consumer-xxx: 4: closing pump for reason Shutdown
2019-12-17 12:36:19 INFO  PartitionPump:382 - host ec-ehub-consumer-xxx: 4: pump shutdown for reason Shutdown
2019-12-17 12:36:19 INFO  PumpManager:68 - host ec-ehub-consumer-xxx: 5: closing pump for reason Shutdown
2019-12-17 12:36:19 INFO  PartitionPump:382 - host ec-ehub-consumer-xxx: 5: pump shutdown for reason Shutdown
2019-12-17 12:36:19 INFO  PumpManager:68 - host ec-ehub-consumer-xxx: 6: closing pump for reason Shutdown
2019-12-17 12:36:19 INFO  PartitionPump:382 - host ec-ehub-consumer-xxx: 6: pump shutdown for reason Shutdown
2019-12-17 12:36:19 INFO  PumpManager:68 - host ec-ehub-consumer-xxx: 7: closing pump for reason Shutdown
2019-12-17 12:36:19 INFO  PartitionPump:382 - host ec-ehub-consumer-xxx: 7: pump shutdown for reason Shutdown
2019-12-17 12:36:19 INFO  PumpManager:68 - host ec-ehub-consumer-xxx: 8: closing pump for reason Shutdown
2019-12-17 12:36:19 INFO  PartitionPump:382 - host ec-ehub-consumer-xxx: 8: pump shutdown for reason Shutdown
2019-12-17 12:36:19 INFO  PumpManager:68 - host ec-ehub-consumer-xxx: 9: closing pump for reason Shutdown
2019-12-17 12:36:19 INFO  PartitionPump:382 - host ec-ehub-consumer-xxx: 9: pump shutdown for reason Shutdown
2019-12-17 12:36:19 INFO  PartitionPump:283 - host ec-ehub-consumer-xxx: 0: Setting receive handler to null
2019-12-17 12:36:19 INFO  PartitionPump:283 - host ec-ehub-consumer-xxx: 1: Setting receive handler to null
2019-12-17 12:36:19 INFO  PartitionPump:283 - host ec-ehub-consumer-xxx: 2: Setting receive handler to null
2019-12-17 12:36:19 INFO  PartitionPump:283 - host ec-ehub-consumer-xxx: 3: Setting receive handler to null
2019-12-17 12:36:19 INFO  PartitionPump:283 - host ec-ehub-consumer-xxx: 4: Setting receive handler to null
2019-12-17 12:36:19 INFO  PartitionPump:283 - host ec-ehub-consumer-xxx: 5: Setting receive handler to null
2019-12-17 12:36:19 INFO  PartitionPump:283 - host ec-ehub-consumer-xxx: 6: Setting receive handler to null
2019-12-17 12:36:19 INFO  PartitionPump:283 - host ec-ehub-consumer-xxx: 8: Setting receive handler to null
2019-12-17 12:36:19 INFO  PartitionPump:283 - host ec-ehub-consumer-xxx: 7: Setting receive handler to null
2019-12-17 12:36:19 INFO  PartitionPump:283 - host ec-ehub-consumer-xxx: 9: Setting receive handler to null
2019-12-17 12:36:24 INFO  ReceivePump:75 - Stopping receive pump for eventHub (my-ehub), consumerGroup (dev-consumer), partition (2) as per the request.
2019-12-17 12:36:24 INFO  PartitionPump:297 - host ec-ehub-consumer-xxx: 2: Closing EH receiver
2019-12-17 12:36:24 INFO  ClientEntity:76 - close: clientId[PR_5be84f_1576582523919_MF_a0d983_1576582523155]
2019-12-17 12:36:24 INFO  ClientEntity:76 - close: clientId[PR_5be84f_1576582523919_MF_a0d983_1576582523155-InternalReceiver]
2019-12-17 12:36:24 INFO  ActiveClientTokenManager:44 - clientEntity[PR_5be84f_1576582523919_MF_a0d983_1576582523155-InternalReceiver] - canceling ActiveClientLinkManager
2019-12-17 12:36:24 INFO  BaseLinkHandler:34 - onLinkLocalClose clientName[PR_5be84f_1576582523919_MF_a0d983_1576582523155-InternalReceiver], linkName[LN_c1a230_1576582524239_87ec_G6], errorCondition[null], errorDescription[null]
2019-12-17 12:36:24 INFO  BaseLinkHandler:90 - closeSession for clientName[PR_5be84f_1576582523919_MF_a0d983_1576582523155-InternalReceiver], linkName[LN_c1a230_1576582524239_87ec_G6], errorCondition[null], errorDescription[null]
2019-12-17 12:36:24 INFO  SessionHandler:121 - onSessionLocalClose connectionId[my-ehub/ConsumerGroups/dev-consumer/Partitions/2], entityName[MF_a0d983_1576582523155], condition[Error{condition=null, description='null', info=null}]
2019-12-17 12:36:24 INFO  BaseLinkHandler:47 - onLinkRemoteClose clientName[PR_5be84f_1576582523919_MF_a0d983_1576582523155-InternalReceiver], linkName[LN_c1a230_1576582524239_87ec_G6], errorCondition[null], errorDescription[null]
2019-12-17 12:36:24 INFO  BaseLinkHandler:69 - processOnClose clientName[PR_5be84f_1576582523919_MF_a0d983_1576582523155-InternalReceiver], linkName[LN_c1a230_1576582524239_87ec_G6], errorCondition[null], errorDescription[null]
2019-12-17 12:36:24 INFO  SessionHandler:129 - onSessionRemoteClose connectionId[my-ehub/ConsumerGroups/dev-consumer/Partitions/2], entityName[MF_a0d983_1576582523155], condition[Error{condition=null, description='null', info=null}]
2019-12-17 12:36:24 INFO  PartitionPump:313 - host ec-ehub-consumer-xxx: 2: Closing EH client
2019-12-17 12:36:24 INFO  ClientEntity:76 - close: clientId[EC_c59e64_1576582523155]
2019-12-17 12:36:24 INFO  ClientEntity:76 - close: clientId[MF_a0d983_1576582523155]
2019-12-17 12:36:24 INFO  ConnectionHandler:241 - onConnectionLocalClose hostname[my-mynamespace.servicebus.windows.net:5671], connectionId[MF_a0d983_1576582523155], errorCondition[null], errorDescription[null]
2019-12-17 12:36:24 INFO  BaseLinkHandler:34 - onLinkLocalClose clientName[cbs], linkName[cbs:sender], errorCondition[null], errorDescription[null]
2019-12-17 12:36:24 INFO  BaseLinkHandler:90 - closeSession for clientName[cbs], linkName[cbs:sender], errorCondition[null], errorDescription[null]
2019-12-17 12:36:24 INFO  BaseLinkHandler:34 - onLinkLocalClose clientName[cbs], linkName[cbs:receiver], errorCondition[null], errorDescription[null]
2019-12-17 12:36:24 INFO  SessionHandler:121 - onSessionLocalClose connectionId[cbs-session], entityName[MF_a0d983_1576582523155], condition[Error{condition=null, description='null', info=null}]
2019-12-17 12:36:24 INFO  BaseLinkHandler:47 - onLinkRemoteClose clientName[cbs], linkName[cbs:sender], errorCondition[null], errorDescription[null]
2019-12-17 12:36:24 INFO  BaseLinkHandler:69 - processOnClose clientName[cbs], linkName[cbs:sender], errorCondition[null], errorDescription[null]
2019-12-17 12:36:25 INFO  BaseLinkHandler:47 - onLinkRemoteClose clientName[cbs], linkName[cbs:receiver], errorCondition[null], errorDescription[null]
2019-12-17 12:36:25 INFO  BaseLinkHandler:69 - processOnClose clientName[cbs], linkName[cbs:receiver], errorCondition[null], errorDescription[null]
2019-12-17 12:36:25 INFO  RequestResponseOpener:95 - requestResponseChannel.onClose complete clientId[MF_a0d983_1576582523155], session[cbs-session], link[cbs], endpoint[$cbs]
2019-12-17 12:36:25 INFO  MessagingFactory:543 - messagingFactory[MF_a0d983_1576582523155], hostName[my-mynamespace.servicebus.windows.net], info[cbsChannel closed]
2019-12-17 12:36:25 INFO  ConnectionHandler:261 - onConnectionRemoteClose hostname[my-mynamespace.servicebus.windows.net:5671], connectionId[MF_a0d983_1576582523155], errorCondition[null], errorDescription[null]
2019-12-17 12:36:25 WARN  MessagingFactory:353 - onConnectionError messagingFactory[MF_a0d983_1576582523155], hostname[my-mynamespace.servicebus.windows.net], error[null]
2019-12-17 12:36:25 INFO  ConnectionHandler:201 - onTransportClosed hostname[my-mynamespace.servicebus.windows.net:5671], connectionId[MF_a0d983_1576582523155], error[n/a]
2019-12-17 12:36:25 INFO  CustomIOHandler:31 - onTransportClosed name[MF_a0d983_1576582523155], hostname[my-mynamespace.servicebus.windows.net:5671]
2019-12-17 12:36:25 INFO  ConnectionHandler:159 - onConnectionUnbound hostname[my-mynamespace.servicebus.windows.net:5671], connectionId[MF_a0d983_1576582523155], state[CLOSED], remoteState[CLOSED]
2019-12-17 12:36:25 INFO  SessionHandler:158 - onSessionFinal connectionId[MF_a0d983_1576582523155], entityName[cbs-session], condition[null], description[null]
2019-12-17 12:36:25 INFO  SessionHandler:158 - onSessionFinal connectionId[MF_a0d983_1576582523155], entityName[my-ehub/ConsumerGroups/dev-consumer/Partitions/2], condition[null], description[null]
2019-12-17 12:36:25 INFO  ConnectionHandler:274 - onConnectionFinal hostname[my-mynamespace.servicebus.windows.net:5671], connectionId[MF_a0d983_1576582523155], errorCondition[null], errorDescription[null]
2019-12-17 12:36:25 WARN  MessagingFactory:638 - messagingFactory[MF_a0d983_1576582523155], hostName[my-mynamespace.servicebus.windows.net], message[stopping the reactor because thread was interrupted or the reactor has no more events to process.]
2019-12-17 12:36:25 WARN  EventHubConsumer:70 - Processor closed; partitionId=2; reason=Shutdown
2019-12-17 12:36:27 ERROR EventHubConsumer:113 - Stop processing
2019-12-17 12:36:27 ERROR EventHubConsumer:85 - Start processing
2019-12-17 12:36:27 ERROR EventHubConsumer:113 - Stop processing
2019-12-17 12:36:27 ERROR EventHubConsumer:85 - Start processing
2019-12-17 12:36:37 ERROR EventHubConsumer:113 - Stop processing

I would expect that consuming of new messages should be blocked ASAP once unregister is invoked. Anyways, once consumers completed consuming all remaining messages the unregister method also completes and there are no other messages consumed which is what I expect. The only issue I see is that lots of messages has to be consumed between start invoking unregisterEventProcessor and completion of it.

JamesBirdsall commented 3 years ago

EventProcessorHost is now a legacy client in maintenance mode and we will not be making enhancements. You can find the new Java client azure-messaging-eventhubs at https://github.com/Azure/azure-sdk-for-java/tree/master/sdk/eventhubs/azure-messaging-eventhubs