apache / druid

Apache Druid: a high performance real-time analytics database.
https://druid.apache.org/
Apache License 2.0

Druid indexer tasks get stuck in PUBLISHING state #16783

Open hardikbajaj opened 1 month ago

hardikbajaj commented 1 month ago

Druid indexer tasks sometimes get stuck in the PUBLISHING state because executors are not shut down properly.

Affected Version

Druid 25.0.0

Description

We are running a Kafka supervisor ingestion task with a task replication factor of two.

Why did task A2 get stuck in the PUBLISHING state?

I did some debugging; below is the probable cause of the task getting stuck.

| Time | Message | Host | Thread |
|---|---|---|---|
| 2 Jul 2024 @ 03:05:56.224 UTC | Shutting down immediately... | indexer-pod | main thread |
| 2 Jul 2024 @ 03:05:56.258 UTC | Dropped segment[S0]. | indexer-pod | [task_id]-appenderator-persist |

- The task seems to get stuck waiting for the persist executor to shut down. As we can see, the last log line the main thread printed was `Shutting down immediately...`. After that, we [shut down the executors](https://github.com/apache/druid/blob/master/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java#L1081) and wait for them to terminate, **with a timeout of 365 days**:

`Preconditions.checkState(persistExecutor == null || persistExecutor.awaitTermination(365, TimeUnit.DAYS), "persistExecutor not terminated");`

- After `PendingCompletionTimeout` minutes, the task is forcefully killed and marked as FAILED. It is then stopped again, which interrupts the main ingestion thread a second time, and we get this ERROR log line:

2 Jul 2024 @ 03:40:01.670 UTC Exception caught during execution indexer-pod threading-task-runner-executor-0
java.lang.RuntimeException: org.apache.druid.java.util.common.RE: Current thread is interrupted after [0] tries
	at org.apache.druid.storage.s3.S3TaskLogs.pushTaskFile(S3TaskLogs.java:156)
	at org.apache.druid.storage.s3.S3TaskLogs.pushTaskReports(S3TaskLogs.java:141)
	at org.apache.druid.indexing.overlord.ThreadingTaskRunner$1.call(ThreadingTaskRunner.java:223)
	at org.apache.druid.indexing.overlord.ThreadingTaskRunner$1.call(ThreadingTaskRunner.java:152)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.druid.java.util.common.RE: Current thread is interrupted after [0] tries
	at org.apache.druid.java.util.common.RetryUtils.retry(RetryUtils.java:148)
	at org.apache.druid.java.util.common.RetryUtils.retry(RetryUtils.java:81)
	at org.apache.druid.java.util.common.RetryUtils.retry(RetryUtils.java:163)
	at org.apache.druid.java.util.common.RetryUtils.retry(RetryUtils.java:153)
	at org.apache.druid.storage.s3.S3Utils.retryS3Operation(S3Utils.java:101)
	at org.apache.druid.storage.s3.S3TaskLogs.pushTaskFile(S3TaskLogs.java:147)
	... 7 more
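
With a 365-day `awaitTermination`, one worker that never finishes parks the caller until `PendingCompletionTimeout` kills the task. As a hypothetical sketch (not Druid code, and not a confirmed fix), a bounded wait that escalates from `shutdown()` to `shutdownNow()` at least hands control back, so the hang surfaces immediately. `BoundedShutdown`, `shutdownWithDeadline` and the 200 ms timeout are made up for illustration; a `CountDownLatch` that is never counted down stands in for the stuck worker.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BoundedShutdown {

  /**
   * Hypothetical helper: stop an executor but wait at most {@code timeout} per phase,
   * instead of awaitTermination(365, TimeUnit.DAYS). Returns true if it terminated.
   */
  static boolean shutdownWithDeadline(ExecutorService exec, long timeout, TimeUnit unit) {
    exec.shutdown(); // reject new tasks, let already-submitted ones finish
    try {
      if (exec.awaitTermination(timeout, unit)) {
        return true;
      }
      exec.shutdownNow(); // escalate: interrupt workers that are still busy
      return exec.awaitTermination(timeout, unit);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the caller's interrupt status
      exec.shutdownNow();
      return false;
    }
  }

  /** Runs a task that only ends when interrupted, then shuts down with a deadline. */
  static boolean demo(long timeoutMs) {
    ExecutorService exec = Executors.newSingleThreadExecutor();
    CountDownLatch never = new CountDownLatch(1); // nobody ever counts this down
    exec.submit(() -> {
      never.await(); // stands in for a worker that never finishes on its own
      return null;
    });
    return shutdownWithDeadline(exec, timeoutMs, TimeUnit.MILLISECONDS);
  }

  public static void main(String[] args) {
    long start = System.nanoTime();
    boolean terminated = demo(200);
    long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    System.out.println("terminated=" + terminated + ", waited ~" + elapsedMs + " ms");
  }
}
```

The escalation only helps if the stuck worker responds to interruption; the point of the deadline is that the caller never waits longer than roughly twice the timeout either way.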


After this exception, the main thread goes to the [finally](https://github.com/apache/druid/blob/master/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java#L910) block, where it removes the chat handlers and stops the other task tools.
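
The sequence described above (a thread parked in a 365-day `awaitTermination`, interrupted by the second stop, then running its `finally` cleanup) can be reproduced with plain `java.util.concurrent`. This is a hypothetical standalone sketch, not Druid's task runner; `InterruptThenFinally` and the `task-main` thread name are invented for illustration, and the `finally` body only sets a flag where the real code would unregister chat handlers.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class InterruptThenFinally {

  /** Returns {interruptedOutOfWait, finallyRan} for a thread stuck in a 365-day wait. */
  static boolean[] run() {
    ExecutorService stuck = Executors.newSingleThreadExecutor();
    CountDownLatch never = new CountDownLatch(1); // nobody ever counts this down
    stuck.submit(() -> {
      never.await(); // a worker that never finishes on its own
      return null;
    });

    AtomicBoolean interrupted = new AtomicBoolean(false);
    AtomicBoolean finallyRan = new AtomicBoolean(false);

    Thread taskMain = new Thread(() -> {
      try {
        stuck.shutdown();                           // stop the executor ...
        stuck.awaitTermination(365, TimeUnit.DAYS); // ... and wait "forever"
      } catch (InterruptedException e) {
        interrupted.set(true); // the stop/kill interrupts us out of the wait
      } finally {
        finallyRan.set(true);  // stands in for cleanup such as removing chat handlers
      }
    }, "task-main");

    taskMain.start();
    try {
      Thread.sleep(100);    // give task-main time to park in awaitTermination
      taskMain.interrupt(); // simulate the second stop interrupting the main thread
      taskMain.join(5_000);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      stuck.shutdownNow(); // release the demo's stuck worker so the JVM can exit
    }
    return new boolean[] {interrupted.get(), finallyRan.get()};
  }

  public static void main(String[] args) {
    boolean[] r = run();
    System.out.println("interrupted=" + r[0] + ", finallyRan=" + r[1]);
  }
}
```
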
hardikbajaj commented 2 weeks ago

I took some thread dumps while this was happening. From the stack traces, it looks like some kind of deadlock occurs: `intermediateTempExecutor` is stuck in WAITING, and we wait up to 365 days for it to shut down. Below are the stack traces for the affected TASK_ID, i.e. the threads whose names contain TASK_ID.
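
Filtering a full dump by the task ID in the thread name is what narrows it down to a handful of traces. Externally, `jstack <pid>` or `jcmd <pid> Thread.print` produce the full dump; as a hypothetical in-process sketch, the same filtering can be done with `Thread.getAllStackTraces()`. `TaskThreadDump`, `dumpThreadsFor` and the `my-task-id` thread name are illustrative only, and the output is a simplified jstack-like format without lock details.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class TaskThreadDump {

  /** Hypothetical helper: jstack-like text for every live thread whose name contains taskId. */
  static List<String> dumpThreadsFor(String taskId) {
    List<String> dumps = new ArrayList<>();
    for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
      Thread thread = entry.getKey();
      if (!thread.getName().contains(taskId)) {
        continue; // keep only the threads that belong to this task
      }
      StringBuilder sb = new StringBuilder();
      sb.append('"').append(thread.getName()).append("\" ").append(thread.getState()).append('\n');
      for (StackTraceElement frame : entry.getValue()) {
        sb.append("\tat ").append(frame).append('\n');
      }
      dumps.add(sb.toString());
    }
    return dumps;
  }

  public static void main(String[] args) throws InterruptedException {
    // A stand-in thread named like a task-owned executor thread.
    Thread worker = new Thread(() -> {
      try {
        Thread.sleep(60_000);
      } catch (InterruptedException ignored) {
        // interrupted by main() below; just exit
      }
    }, "my-task-id-appenderator-persist");
    worker.setDaemon(true);
    worker.start();
    dumpThreadsFor("my-task-id").forEach(System.out::print);
    worker.interrupt();
    worker.join();
  }
}
```
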

Task main thread stack trace: `[TASK_ID]-threading-task-runner-executor-5`
```
"[TASK_ID]-threading-task-runner-executor-5" #1314 daemon prio=5 os_prio=0 cpu=18599767.26ms elapsed=113684.84s allocated=25572G defined_classes=8 tid=0x00007f754c035210 nid=0x763 waiting on condition [0x00007f6d0d919000]
   java.lang.Thread.State: TIMED_WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@17.0.10/Native Method)
	- parking to wait for <0x000000043ee128f0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.parkNanos(java.base@17.0.10/LockSupport.java:252)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(java.base@17.0.10/AbstractQueuedSynchronizer.java:1672)
	at java.util.concurrent.ThreadPoolExecutor.awaitTermination(java.base@17.0.10/ThreadPoolExecutor.java:1464)
	at com.google.common.util.concurrent.MoreExecutors$ListeningDecorator.awaitTermination(MoreExecutors.java:459)
	at org.apache.druid.segment.realtime.appenderator.StreamAppenderator.closeNow(StreamAppenderator.java:1015)
	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.runInternal(SeekableStreamIndexTaskRunner.java:866)
	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.run(SeekableStreamIndexTaskRunner.java:266)
	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask.runTask(SeekableStreamIndexTask.java:151)
	at org.apache.druid.indexing.common.task.AbstractTask.run(AbstractTask.java:169)
	at org.apache.druid.indexing.overlord.ThreadingTaskRunner$1.call(ThreadingTaskRunner.java:210)
	at org.apache.druid.indexing.overlord.ThreadingTaskRunner$1.call(ThreadingTaskRunner.java:152)
	at java.util.concurrent.FutureTask.run(java.base@17.0.10/FutureTask.java:264)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@17.0.10/ThreadPoolExecutor.java:1136)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@17.0.10/ThreadPoolExecutor.java:635)
	at java.lang.Thread.run(java.base@17.0.10/Thread.java:840)

   Locked ownable synchronizers:
	- <0x00000002ef0002c8> (a java.util.concurrent.ThreadPoolExecutor$Worker)
```
Stack trace: `[TASK_ID]-appenderator-abandon`
```
"[task_id]-appenderator-abandon" #1629 daemon prio=5 os_prio=0 cpu=1.68ms elapsed=4704.25s allocated=3384B defined_classes=0 tid=0x00007f6f14039b60 nid=0xa96 waiting on condition [0x00007f6bdbbaa000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@17.0.10/Native Method)
	- parking to wait for <0x00000003dc000700> (a java.util.concurrent.SynchronousQueue$TransferStack)
	at java.util.concurrent.locks.LockSupport.park(java.base@17.0.10/LockSupport.java:341)
	at java.util.concurrent.SynchronousQueue$TransferStack$SNode.block(java.base@17.0.10/SynchronousQueue.java:288)
	at java.util.concurrent.ForkJoinPool.unmanagedBlock(java.base@17.0.10/ForkJoinPool.java:3465)
	at java.util.concurrent.ForkJoinPool.managedBlock(java.base@17.0.10/ForkJoinPool.java:3436)
	at java.util.concurrent.SynchronousQueue$TransferStack.transfer(java.base@17.0.10/SynchronousQueue.java:397)
	at java.util.concurrent.SynchronousQueue.put(java.base@17.0.10/SynchronousQueue.java:839)
	at org.apache.druid.java.util.common.concurrent.Execs$1.rejectedExecution(Execs.java:148)
	at java.util.concurrent.ThreadPoolExecutor.reject(java.base@17.0.10/ThreadPoolExecutor.java:833)
	at java.util.concurrent.ThreadPoolExecutor.execute(java.base@17.0.10/ThreadPoolExecutor.java:1365)
	at com.google.common.util.concurrent.MoreExecutors$ListeningDecorator.execute(MoreExecutors.java:484)
	at com.google.common.util.concurrent.ExecutionList.executeListener(ExecutionList.java:156)
	at com.google.common.util.concurrent.ExecutionList.execute(ExecutionList.java:145)
	at com.google.common.util.concurrent.ListenableFutureTask.done(ListenableFutureTask.java:91)
	at java.util.concurrent.FutureTask.finishCompletion(java.base@17.0.10/FutureTask.java:381)
	at java.util.concurrent.FutureTask.setException(java.base@17.0.10/FutureTask.java:250)
	at java.util.concurrent.FutureTask.run(java.base@17.0.10/FutureTask.java:269)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@17.0.10/ThreadPoolExecutor.java:1136)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@17.0.10/ThreadPoolExecutor.java:635)
	at java.lang.Thread.run(java.base@17.0.10/Thread.java:840)

   Locked ownable synchronizers:
	- <0x00000003dc0007e8> (a java.util.concurrent.ThreadPoolExecutor$Worker)
```
Stack trace: `[TASK_ID]-publish`
```
"[task_id]-publish" #1626 daemon prio=5 os_prio=0 cpu=63.69ms elapsed=4881.78s allocated=42167K defined_classes=0 tid=0x00007f6f280057f0 nid=0xa93 waiting on condition [0x00007f6d181fe000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@17.0.10/Native Method)
	- parking to wait for <0x00000004427d0578> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.park(java.base@17.0.10/LockSupport.java:341)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionNode.block(java.base@17.0.10/AbstractQueuedSynchronizer.java:506)
	at java.util.concurrent.ForkJoinPool.unmanagedBlock(java.base@17.0.10/ForkJoinPool.java:3465)
	at java.util.concurrent.ForkJoinPool.managedBlock(java.base@17.0.10/ForkJoinPool.java:3436)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(java.base@17.0.10/AbstractQueuedSynchronizer.java:1623)
	at java.util.concurrent.LinkedBlockingQueue.take(java.base@17.0.10/LinkedBlockingQueue.java:435)
	at java.util.concurrent.ThreadPoolExecutor.getTask(java.base@17.0.10/ThreadPoolExecutor.java:1062)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@17.0.10/ThreadPoolExecutor.java:1122)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@17.0.10/ThreadPoolExecutor.java:635)
	at java.lang.Thread.run(java.base@17.0.10/Thread.java:840)

   Locked ownable synchronizers:
	- None
```
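
The abandon thread's trace reads like a self-deadlock: while completing a future (`FutureTask.setException`), it dispatches a listener through Guava's `ExecutionList`, which calls `execute` on an executor whose rejection handler (`Execs$1.rejectedExecution`) then blocks in `SynchronousQueue.put`. Below is a hypothetical, simplified reproduction with plain `java.util.concurrent`, assuming (1) the listener goes to the same single-worker executor and (2) that executor uses a zero-capacity `SynchronousQueue` with a rejection handler that blocks on `put`, as the trace suggests. The nested `execute` stands in for the listener dispatch, and all class and method names are invented for the sketch.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class BlockingExecutorSelfDeadlock {

  /** Assumed shape: one worker, zero-capacity queue, rejection handler that blocks on put(). */
  static ThreadPoolExecutor newBlockingSingleThreaded() {
    return new ThreadPoolExecutor(
        1, 1, 0L, TimeUnit.MILLISECONDS,
        new SynchronousQueue<>(),
        (r, executor) -> {
          try {
            executor.getQueue().put(r); // waits until some worker takes r
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("interrupted while enqueueing", e);
          }
        });
  }

  /** Returns true if the executor hung: it did not terminate and the follow-up never ran. */
  static boolean reproducesHang(long timeoutMs) {
    ThreadPoolExecutor exec = newBlockingSingleThreaded();
    AtomicBoolean followUpRan = new AtomicBoolean(false);
    exec.execute(() -> {
      try {
        // The pool's only worker hands a follow-up task to its own pool. No other
        // worker exists to take it, so put() in the rejection handler parks here.
        exec.execute(() -> followUpRan.set(true));
      } catch (RejectedExecutionException expected) {
        // Thrown only after shutdownNow() below interrupts the parked put().
      }
    });
    exec.shutdown(); // stop accepting work, as closeNow() does ...
    try {
      // ... then wait; with a 365-day timeout this call would effectively never return.
      boolean terminated = exec.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
      exec.shutdownNow(); // interrupt the parked worker so the demo can exit
      exec.awaitTermination(1, TimeUnit.SECONDS);
      return !terminated && !followUpRan.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      exec.shutdownNow();
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println("hang reproduced: " + reproducesHang(500));
  }
}
```

In this sketch the hang is deterministic: a `SynchronousQueue` only accepts an element when another thread is waiting to take it, and the single worker is the thread doing the `put`. Whether Druid's real executors are wired exactly this way is an assumption to verify against `StreamAppenderator` and `Execs`.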