apache / pinot

Apache Pinot - A realtime distributed OLAP datastore
https://pinot.apache.org/
Apache License 2.0
5.26k stars 1.23k forks source link

LLC stopped realtime ingestion due to deadlock caused by out of order Helix state transition events (Pinot 0.4+) #5960

Open chenboat opened 3 years ago

chenboat commented 3 years ago

We found during our testing that Pinot LLC realtime consumer stopped ingestion due to out of order delivery of Helix state transition events for consecutive segments of a partition. The detailed log is attached.

Note that OFFLINE->CONSUMING for _terra_lever_events08820200901T2252Z_ was delivered slightly before CONSUMING->ONLINE for _terra_lever_events08720200901T1652Z_.

For _terra_lever_events08820200901T2252Z_ to get to the consuming state, HelixTaskExecutor needs to acquire the semaphore for the partition. However the semaphore is hold by the _terra_lever_events08720200901T1652Z_ which can not be released until HelixTaskExecutor executes its CONSUMING->ONLINE transition (which is next in the task queue). As a result, a deadlock occurred and the LLC stop ingestions. In fact the server stopped ingestion for all tables because HelixTaskExecutor has java.util.concurrent.ThreadPoolExecutor default pool size = 1 and active threads = 1.

2020-09-01 22:52:03.795 [terra_lever_events__0__87__20200901T1652Z] INFO  org.apache.pinot.common.utils.FileUploadDownloadClient  - Sending request: http://streampinot-canary01-dca1:7936/segmentConsumed?name=terra_lever_events__0__87__20200901T1652Z&offset=150641945&instance=Server_streampinot-canary04-dca1_7090&reason=timeLimit&memoryUsedBytes=63737644&rowCount=265476 to controller: localhost, version: Unknown
2020-09-01 22:52:03.795 [terra_lever_events__0__87__20200901T1652Z] INFO  o.a.p.server.realtime.ServerSegmentCompletionProtocolHandler  - Controller response {"buildTimeSec":-1,"isSplitCommitType":false,"status":"HOLD","offset":150641945} for http://streampinot-canary01-dca1:7936/segmentConsumed?name=terra_lever_events__0__87__20200901T1652Z&offset=150641945&instance=Server_streampinot-canary04-dca1_7090&reason=timeLimit&memoryUsedBytes=63737644&rowCount=265476
2020-09-01 22:52:06.801 [terra_lever_events__0__87__20200901T1652Z] INFO  org.apache.pinot.common.utils.FileUploadDownloadClient  - Sending request: http://streampinot-canary01-dca1:7936/segmentConsumed?name=terra_lever_events__0__87__20200901T1652Z&offset=150641945&instance=Server_streampinot-canary04-dca1_7090&reason=timeLimit&memoryUsedBytes=63737644&rowCount=265476 to controller: localhost, version: Unknown
2020-09-01 22:52:06.801 [terra_lever_events__0__87__20200901T1652Z] INFO  o.a.p.server.realtime.ServerSegmentCompletionProtocolHandler  - Controller response {"buildTimeSec":-1,"isSplitCommitType":false,"status":"DISCARD","offset":-1} for http://streampinot-canary01-dca1:7936/segmentConsumed?name=terra_lever_events__0__87__20200901T1652Z&offset=150641945&instance=Server_streampinot-canary04-dca1_7090&reason=timeLimit&memoryUsedBytes=63737644&rowCount=265476
2020-09-01 22:52:06.856 [ZkClient-EventThread-66-streampinot-stgzk01-dca1,streampinot-stgzk02-dca1,streampinot-stgzk03-dca1/pinot/pinot-canary] INFO  org.apache.helix.messaging.handling.HelixTaskExecutor  - Scheduling message a195fc82-b406-4de4-9008-7d35a901aaf7: terra_lever_events_REALTIME:terra_lever_events__0__88__20200901T2252Z, OFFLINE->CONSUMING
2020-09-01 22:52:06.856 [ZkClient-EventThread-66-streampinot-stgzk01-dca1,streampinot-stgzk02-dca1,streampinot-stgzk03-dca1/pinot/pinot-canary] INFO  org.apache.helix.messaging.handling.HelixTaskExecutor  - Scheduling message 5c856925-0bb0-4422-8f3b-ab8fffe26e21: terra_lever_events_REALTIME:terra_lever_events__0__87__20200901T1652Z, CONSUMING->ONLINE
2020-09-01 22:52:06.857 [HelixTaskExecutor-message_handle_thread] INFO  o.a.helix.messaging.handling.HelixStateTransitionHandler  - handling message: a195fc82-b406-4de4-9008-7d35a901aaf7 transit terra_lever_events_REALTIME.terra_lever_events__0__88__20200901T2252Z|[] from:OFFLINE to:CONSUMING, relayedFrom: null
2020-09-01 22:52:06.865 [HelixTaskExecutor-message_handle_thread] INFO  o.a.helix.messaging.handling.HelixStateTransitionHandler  - Instance Server_streampinot-canary04-dca1_7090, partition terra_lever_events__0__88__20200901T2252Z received state transition from OFFLINE to CONSUMING on session 36b963066e477de, message id: a195fc82-b406-4de4-9008-7d35a901aaf7
2020-09-01 22:52:06.865 [HelixTaskExecutor-message_handle_thread] INFO  S.a.p.s.s.h.SegmentOnlineOfflineStateModelFactory$SegmentOnlineOfflineStateModel  - SegmentOnlineOfflineStateModel.onBecomeConsumingFromOffline() : ZnRecord=a195fc82-b406-4de4-9008-7d35a901aaf7, {CREATE_TIMESTAMP=1599000726835, ClusterEventName=IdealStateChange, EXECUTE_START_TIMESTAMP=1599000726857, EXE_SESSION_ID=36b963066e477de, FROM_STATE=OFFLINE, MSG_ID=a195fc82-b406-4de4-9008-7d35a901aaf7, MSG_STATE=read, MSG_TYPE=STATE_TRANSITION, PARTITION_NAME=terra_lever_events__0__88__20200901T2252Z, READ_TIMESTAMP=1599000726851, RESOURCE_NAME=terra_lever_events_REALTIME, RESOURCE_TAG=terra_lever_events_REALTIME, SRC_NAME=streampinot-canary01-dca1_7936, SRC_SESSION_ID=26b96306664764b, STATE_MODEL_DEF=SegmentOnlineOfflineStateModel, STATE_MODEL_FACTORY_NAME=DEFAULT, TGT_NAME=Server_streampinot-canary04-dca1_7090, TGT_SESSION_ID=36b963066e477de, TO_STATE=CONSUMING}{}{}, Stat=Stat {_version=0, _creationTime=1599000726843, _modifiedTime=1599000726843, _ephemeralOwner=0}
2020-09-01 22:52:06.865 [HelixTaskExecutor-message_handle_thread] INFO  S.a.p.s.s.h.SegmentOnlineOfflineStateModelFactory$SegmentOnlineOfflineStateModel  - SegmentOnlineOfflineStateModel.onBecomeOnlineFromOffline() : ZnRecord=a195fc82-b406-4de4-9008-7d35a901aaf7, {CREATE_TIMESTAMP=1599000726835, ClusterEventName=IdealStateChange, EXECUTE_START_TIMESTAMP=1599000726857, EXE_SESSION_ID=36b963066e477de, FROM_STATE=OFFLINE, MSG_ID=a195fc82-b406-4de4-9008-7d35a901aaf7, MSG_STATE=read, MSG_TYPE=STATE_TRANSITION, PARTITION_NAME=terra_lever_events__0__88__20200901T2252Z, READ_TIMESTAMP=1599000726851, RESOURCE_NAME=terra_lever_events_REALTIME, RESOURCE_TAG=terra_lever_events_REALTIME, SRC_NAME=streampinot-canary01-dca1_7936, SRC_SESSION_ID=26b96306664764b, STATE_MODEL_DEF=SegmentOnlineOfflineStateModel, STATE_MODEL_FACTORY_NAME=DEFAULT, TGT_NAME=Server_streampinot-canary04-dca1_7090, TGT_SESSION_ID=36b963066e477de, TO_STATE=CONSUMING}{}{}, Stat=Stat {_version=0, _creationTime=1599000726843, _modifiedTime=1599000726843, _ephemeralOwner=0}
2020-09-01 22:52:06.865 [HelixTaskExecutor-message_handle_thread] INFO  o.apache.pinot.server.starter.helix.HelixInstanceDataManager  - Adding segment: terra_lever_events__0__88__20200901T2252Z to table: terra_lever_events_REALTIME
2020-09-01 22:52:06.878 [HelixTaskExecutor-message_handle_thread] INFO  o.a.p.c.d.m.r.LLRealtimeSegmentDataManager_terra_lever_events__0__88__20200901T2252Z  - RealtimeDataResourceZKMetadata contains no information about sorted column for segment terra_lever_events__0__88__20200901

@jackjlli @mcvsubbu

chenboat commented 3 years ago

@jackjlli and @mcvsubbu I wonder if you have encountered the above deadlock issue in your Prod clusters.

jackjlli commented 3 years ago

The default pool size for HelixTaskExecutor shouldn't be just 1. I checked the Helix code and the default value is 40: https://github.com/apache/helix/blob/78f6c8beaf05302703809dd7de231605ff4d74a3/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java#L115 Please adjust the STATE_TRANSITION.maxThreads in CONFIGS/CLUSTER/pinot ZNode and restart all the pinot components sequentially to correctly pick up your new setting (controller first, then broker, server, etc).

jackjlli commented 3 years ago

In Pinot 1.0, we used to have a logic that the value of STATE_TRANSITION.maxThreads is set to 1 if this config is missing in the cluster. This will cause issue as this default value 1 will be used as the thread pool size, which will make Pinot cluster extremely slow. If you try to upgrade Pinot cluster from 1.0 to a higher version, please keep in mind that STATE_TRANSITION.maxThreads be set to a higher number instead of 1 in CONFIGS/CLUSTER/pinot Znode before rolling out to higher Pinot version.

Jackie-Jiang commented 3 years ago

Alternatively, just remove STATE_TRANSITION.maxThreads, and the cluster will use the Helix default value of 40

chenboat commented 3 years ago

Thanks. The issue was resolved by now. But it will be great to explicitly exposed this config to Pinot installation. It is also worthwhile to fix the deadlock when maxThreads == 1.

mcvsubbu commented 3 years ago

It is useful add some notes here on this.

The reason the semaphore was put in place was because Kafka consumers had an issue with dynamically creating new kafka metrics when a new consumer was created on the same partition while keeping the old consumer open. This led to metrics proliferation.

From kafka (or any other stream) point of view, if they are trying to emit per-partition consumer metrics, it probably makes sense that they add different metrics for each consumer so as to get the health of the system correctly. So, we need to handle it somehow.

One way could be that when we implement an internal "queue" of state transitions, holding the consuming transition until the ONLINE transition is received. Of course, we will have to respond back OK for the consuming state transition, but just not allow our software to handle it. A little tricky to implement, and sounds somewhat hacky to me. Also has a downside that we will be responding OK without actually handling the state transition. Helix does not provide a way to mark a transition in ERROR from the participant. We could ask for that feature (useful for other things as well), and then we can mark the consumer in ERROR state if it later fails to transition correctly.

A simple work-around in the short/medium term may be we stop realtime table creation in the controller if the max-threads is not set to at least 2 (we don't expect to have more than one outstanding state transition for the same partition).

I am open to hearing other ideas on how to fix the deadlock.