apache / druid

Apache Druid: a high performance real-time analytics database.
https://druid.apache.org/
Apache License 2.0

kafka indexing tasks may stall when checkpointing during coordinator restart #9913

Open xvrl opened 4 years ago

xvrl commented 4 years ago

Affected Version

0.17.0

Description

Upon coordinator/overlord restart, the overlord may receive a CheckPointDataSourceMetadataAction from an indexing task before the supervisor has initialized all of its task groups. When this happens, the indexing task can stay paused indefinitely, without ever resuming.

Checkpoint notices are handled asynchronously in the supervisor manager, so the overlord returns a successful HTTP status regardless of the state of the supervisor.

The indexing task calls requestPause() right before checkpointing, and since it gets a successful status from the overlord, it transitions to the paused state and waits to be resumed.

During a normal checkpoint, the task resumes once the supervisor processes the notice and sends the task a setEndOffsets call. In this case, however, the supervisor had not registered any of the task groups yet, so it logged an error in response to the checkpoint action and never sent setEndOffsets.

During task discovery, the supervisor does not appear to do anything special for paused tasks, so once the supervisor finally gets up and running it will not resume the paused task. Logs show the supervisor discovered the task after starting, so discovery was not a problem here.
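The sequence described above can be modeled with a small, self-contained Java sketch. This is a toy simulation, not Druid code: ToyTask, ToySupervisor, checkpoint, discover and simulate are hypothetical names invented for illustration, and only requestPause, setEndOffsets and activelyReadingTaskGroups borrow names from the report. It assumes the behavior as described in this issue: the checkpoint call returns success as soon as the notice is queued, and late discovery does nothing for a task that is already paused.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CheckpointRaceSketch {
    enum TaskState { READING, PAUSED }

    /** Toy stand-in for an indexing task (hypothetical, not a Druid class). */
    static class ToyTask {
        volatile TaskState state = TaskState.READING;
        void requestPause() { state = TaskState.PAUSED; }
        void setEndOffsets() { state = TaskState.READING; } // resuming is modeled as "back to READING"
    }

    /** Toy stand-in for the supervisor: notices are handled on a separate thread. */
    static class ToySupervisor {
        final Map<Integer, ToyTask> activelyReadingTaskGroups = new ConcurrentHashMap<>();
        final ExecutorService notices = Executors.newSingleThreadExecutor();
        volatile String lastError;

        /** Returns true (think "HTTP 200") as soon as the notice is queued. */
        boolean checkpoint(int taskGroupId) {
            notices.submit(() -> handleCheckpointNotice(taskGroupId));
            return true;
        }

        void handleCheckpointNotice(int taskGroupId) {
            ToyTask task = activelyReadingTaskGroups.get(taskGroupId);
            if (task == null) {
                // Mirrors the logged error: the notice is dropped and nobody resumes the task.
                lastError = "cannot find taskGroup [" + taskGroupId + "]";
                return;
            }
            task.setEndOffsets();
        }

        /** Registers the task group; does nothing special for an already paused task. */
        void discover(int taskGroupId, ToyTask task) {
            activelyReadingTaskGroups.put(taskGroupId, task);
        }
    }

    /** Runs one checkpoint and returns the final task state. */
    static TaskState simulate(boolean groupsRegisteredFirst) {
        ToyTask task = new ToyTask();
        ToySupervisor supervisor = new ToySupervisor();
        if (groupsRegisteredFirst) {
            supervisor.discover(7, task);
        }

        task.requestPause();    // the task pauses right before checkpointing
        supervisor.checkpoint(7); // always reports success to the task

        // Wait until the queued notice has been handled.
        supervisor.notices.shutdown();
        try {
            supervisor.notices.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        if (!groupsRegisteredFirst) {
            supervisor.discover(7, task); // late discovery, as after a restart
        }
        return task.state;
    }

    public static void main(String[] args) {
        System.out.println("normal checkpoint: " + simulate(true));  // READING
        System.out.println("restart race: " + simulate(false));      // PAUSED
    }
}
```

In the second run the task ends up PAUSED even though the supervisor later knows about task group 7, which matches the stall seen in the logs below.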

Select Overlord logs around the time of the event:

May 20th 2020, 17:54:21.545 qtp1190608890-156   INFO    Performing action for task[index_kafka_mydata_9cf69bbc8fc16f3_amnbifke]: CheckPointDataSourceMetadataAction{supervisorId='mydata', taskGroupId='7', checkpointMetadata=KafkaDataSourceMetadata{SeekableStreamStartSequenceNumbers=SeekableStreamStartSequenceNumbers{stream='mytopic', partitionSequenceNumberMap={55=47763944382, 7=105884059431}, exclusivePartitions=[]}}}

May 20th 2020, 17:54:21.545 qtp1190608890-156   INFO    Checkpointing [KafkaDataSourceMetadata{SeekableStreamStartSequenceNumbers=SeekableStreamStartSequenceNumbers{stream='mytopic', partitionSequenceNumberMap={55=47763944382, 7=105884059431}, exclusivePartitions=[]}}] for taskGroup [7]

May 20th 2020, 17:54:21.563 KafkaSupervisor-mydata  ERROR   SeekableStreamSupervisor[mydata] failed to handle notice: {class=org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor, exceptionType=class org.apache.druid.java.util.common.ISE, exceptionMessage=WTH?! cannot find taskGroup [7] among all activelyReadingTaskGroups [{}], noticeClass=CheckpointNotice}
org.apache.druid.java.util.common.ISE: WTH?! cannot find taskGroup [7] among all activelyReadingTaskGroups [{}]
    at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor$CheckpointNotice.isValidTaskGroup(SeekableStreamSupervisor.java:417)
    at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor$CheckpointNotice.handle(SeekableStreamSupervisor.java:371)
    at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.lambda$tryInit$3(SeekableStreamSupervisor.java:729)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
  [...]
May 20th 2020, 17:54:24.866 KafkaSupervisor-mydata  INFO    Registered listener [KafkaSupervisor-mydata]
  [...]
May 20th 2020, 17:54:25.199 KafkaSupervisor-mydata  INFO    New partition [7] discovered for stream [mytopic], added to task group [7]
  [...]
May 20th 2020, 17:54:25.207 KafkaSupervisor-mydata  INFO    New partition [55] discovered for stream [mytopic], added to task group [7]
May 20th 2020, 17:54:25.680 KafkaSupervisor-mydata-Worker-1 INFO    Creating a new task group for taskGroupId[7]
May 20th 2020, 17:54:25.829 KafkaSupervisor-mydata-Worker-8 INFO    Setting taskGroup sequences to [{0={55=47763944382, 7=105884059431}}] for group [7]
May 20th 2020, 17:54:28.480 KafkaSupervisor-mydata  INFO    [mydata] supervisor is running.
May 20th 2020, 17:54:28.483 KafkaSupervisor-mydata  INFO    {id='mydata', generationTime=2020-05-20T17:54:28.481Z, payload=KafkaSupervisorReportPayload{dataSource='mydata', topic='mytopic', partitions=96, replicas=1, durationSeconds=3600, active=[{id='index_kafka_mydata_b2711ee2015c849_gmafhija', startTime=2020-05-20T17:00:31.267Z, remainingSeconds=362}, {id='index_kafka_mydata_1061c6e2b0c7e8c_negmjmbh', startTime=2020-05-20T17:34:52.905Z, remainingSeconds=2424}, {id='index_kafka_mydata_9099e1097f2a863_knikbalp', startTime=2020-05-20T17:43:45.805Z, remainingSeconds=2957}, {id='index_kafka_mydata_c68cbbc35e6e1e3_hgndooca', startTime=2020-05-20T17:30:40.017Z, remainingSeconds=2171}, {id='index_kafka_mydata_1755d9c74b35a36_jaehbloj', startTime=2020-05-20T17:30:13.524Z, remainingSeconds=2145}, {id='index_kafka_mydata_49ef77f436f5d69_alphfnha', startTime=2020-05-20T17:01:22.959Z, remainingSeconds=414}, {id='index_kafka_mydata_aab51a2b22fd029_iepecbmj', startTime=2020-05-20T17:34:52.952Z, remainingSeconds=2424}, {id='index_kafka_mydata_9cf69bbc8fc16f3_amnbifke', startTime=2020-05-20T17:48:57.214Z, remainingSeconds=3268}, [...] ], publishing=[], suspended=false, healthy=true, state=RUNNING, detailedState=CREATING_TASKS, recentErrors=[org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager$SeekableStreamExceptionEvent@12e1707b]}}

When this happens, the problem can be addressed by manually killing the task, so the supervisor can spawn a new task to resume indexing.
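For reference, a minimal sketch of killing the stuck task through the overlord's task shutdown endpoint (POST /druid/indexer/v1/task/{taskId}/shutdown). The class name KillStuckTaskSketch, the helper shutdownRequest, and the base URL http://localhost:8090 are assumptions for illustration; substitute the overlord address of your own cluster. The sketch only builds the request, and the commented line shows how it could be sent with the Java 11+ HttpClient.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class KillStuckTaskSketch {
    /**
     * Builds a POST request against the overlord's task shutdown endpoint.
     * The task ID is inserted as-is, so it must not need URL encoding
     * (IDs like the one in the logs above are plain ASCII).
     */
    static HttpRequest shutdownRequest(String overlordBaseUrl, String taskId) {
        URI uri = URI.create(overlordBaseUrl + "/druid/indexer/v1/task/" + taskId + "/shutdown");
        return HttpRequest.newBuilder(uri)
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) {
        // Assumed overlord address; the task ID is the paused task from the logs.
        HttpRequest request = shutdownRequest(
                "http://localhost:8090",
                "index_kafka_mydata_9cf69bbc8fc16f3_amnbifke");
        System.out.println(request.method() + " " + request.uri());
        // To actually send it:
        // java.net.http.HttpClient.newHttpClient()
        //     .send(request, java.net.http.HttpResponse.BodyHandlers.ofString());
    }
}
```

Once the task is shut down, the supervisor should notice the missing task on a later run and create a replacement, as described above.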

acherla commented 1 year ago

This is still not resolved in 25.0.0 and can result in literally thousands of backlogged tasks that fail out.