apache / druid

Apache Druid: a high performance real-time analytics database.
https://druid.apache.org/
Apache License 2.0

Kafka supervisor sometimes kills succeeded tasks #6648

Open jihoonson opened 6 years ago

jihoonson commented 6 years ago

Recently I noticed that succeeded tasks are sometimes marked as failed because the supervisor killed them. This happened because:

1) One of the tasks in the same taskGroup completed successfully, and the supervisor sent a stop request to the other tasks in the group.
2) The task didn't respond because of a channel disconnection, even though it had actually succeeded.
3) The supervisor then called TaskQueue.shutdown(), which simply marks the task as failed.

This doesn't cause incorrect results, but it can make Kafka ingestion a bit slower because the supervisor spawns new tasks to reprocess the same data that was already handled by the task marked as failed.
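To make the race concrete, here is a minimal, self-contained sketch of the sequence above. It is not Druid code; every class and method name is a hypothetical stand-in for the supervisor, the task client, and TaskQueue.shutdown().

```java
// Sketch only: simulates the race where a stop request fails on a dropped channel
// and the fallback shutdown path marks an already-succeeded task as FAILED.
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SupervisorShutdownRaceSketch
{
  enum TaskState { RUNNING, SUCCESS, FAILED }

  // Stand-in for the task queue / metadata store: shutdown() unconditionally marks FAILED.
  static final Map<String, TaskState> taskStates = new ConcurrentHashMap<>();

  static void shutdown(String taskId)
  {
    // Mirrors step 3: the task is marked FAILED even if it already finished successfully.
    taskStates.put(taskId, TaskState.FAILED);
  }

  // Stand-in for the HTTP "stop" request, which may fail with a channel disconnection.
  static boolean sendStopRequest(String taskId, boolean channelDisconnected)
  {
    return !channelDisconnected;
  }

  public static void main(String[] args)
  {
    // Two tasks in the same task group; both have actually succeeded,
    // but the supervisor has not yet observed task-1's final status.
    taskStates.put("task-0", TaskState.SUCCESS);
    taskStates.put("task-1", TaskState.SUCCESS);

    // Steps 1-2: the supervisor tries to stop the other task, but the channel drops.
    for (String taskId : List.of("task-1")) {
      boolean stopped = sendStopRequest(taskId, /* channelDisconnected = */ true);
      if (!stopped) {
        // Step 3: fallback path marks the already-succeeded task as FAILED.
        shutdown(taskId);
      }
    }

    System.out.println(taskStates); // task-1 ends up FAILED despite having succeeded
  }
}
```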

I think there are three issues to fix.

1) IndexTaskClient doesn't retry on channel disconnection even though it's supposed to. Here is the stack trace, thrown at https://github.com/apache/incubator-druid/blob/master/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java#L384.

2018-11-18T13:01:09,440 WARN [IndexTaskClient-datasource-8] org.apache.druid.indexing.common.IndexTaskClient - Exception while sending request
java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.jboss.netty.channel.ChannelException: Channel disconnected
        at org.apache.druid.indexing.common.IndexTaskClient.submitRequest(IndexTaskClient.java:306) ~[druid-indexing-service-0.13.0-iap-pre3.jar:0.13.0-iap-pre3]
        at org.apache.druid.indexing.common.IndexTaskClient.submitRequestWithEmptyContent(IndexTaskClient.java:182) ~[druid-indexing-service-0.13.0-iap-pre3.jar:0.13.0-iap-pre3]
        at org.apache.druid.indexing.kafka.KafkaIndexTaskClient.getStatus(KafkaIndexTaskClient.java:155) ~[?:?]
        at org.apache.druid.indexing.kafka.KafkaIndexTaskClient.lambda$getStatusAsync$4(KafkaIndexTaskClient.java:320) ~[?:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_163]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_163]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_163]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_163]
Caused by: java.util.concurrent.ExecutionException: org.jboss.netty.channel.ChannelException: Channel disconnected
        at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299) ~[guava-16.0.1.jar:?]
        at com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286) ~[guava-16.0.1.jar:?]
        at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116) ~[guava-16.0.1.jar:?]
        at org.apache.druid.indexing.common.IndexTaskClient.submitRequest(IndexTaskClient.java:296) ~[druid-indexing-service-0.13.0-iap-pre3.jar:0.13.0-iap-pre3]
        ... 7 more
Caused by: org.jboss.netty.channel.ChannelException: Channel disconnected
        at org.apache.druid.java.util.http.client.NettyHttpClient$1.channelDisconnected(NettyHttpClient.java:351) ~[java-util-0.13.0-iap-pre3.jar:0.13.0-iap-pre3]
        at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:102) ~[netty-3.10.6.Final.jar:?]
        at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) ~[netty-3.10.6.Final.jar:?]
        at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) ~[netty-3.10.6.Final.jar:?]
        at org.jboss.netty.channel.SimpleChannelUpstreamHandler.channelDisconnected(SimpleChannelUpstreamHandler.java:208) ~[netty-3.10.6.Final.jar:?]
        at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:102) ~[netty-3.10.6.Final.jar:?]
        at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) ~[netty-3.10.6.Final.jar:?]
        at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) ~[netty-3.10.6.Final.jar:?]
        at org.jboss.netty.channel.SimpleChannelUpstreamHandler.channelDisconnected(SimpleChannelUpstreamHandler.java:208) ~[netty-3.10.6.Final.jar:?]
        at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:102) ~[netty-3.10.6.Final.jar:?]
        at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) ~[netty-3.10.6.Final.jar:?]
        at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) ~[netty-3.10.6.Final.jar:?]
        at org.jboss.netty.channel.SimpleChannelUpstreamHandler.channelDisconnected(SimpleChannelUpstreamHandler.java:208) ~[netty-3.10.6.Final.jar:?]
        at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:102) ~[netty-3.10.6.Final.jar:?]
        at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) ~[netty-3.10.6.Final.jar:?]
        at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) ~[netty-3.10.6.Final.jar:?]
        at org.jboss.netty.handler.codec.replay.ReplayingDecoder.cleanup(ReplayingDecoder.java:570) ~[netty-3.10.6.Final.jar:?]
        at org.jboss.netty.handler.codec.frame.FrameDecoder.channelDisconnected(FrameDecoder.java:365) ~[netty-3.10.6.Final.jar:?]
        at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:102) ~[netty-3.10.6.Final.jar:?]
        at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:92) ~[netty-3.10.6.Final.jar:?]
        at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) ~[netty-3.10.6.Final.jar:?]
        at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) ~[netty-3.10.6.Final.jar:?]
        at org.jboss.netty.handler.codec.frame.FrameDecoder.cleanup(FrameDecoder.java:493) ~[netty-3.10.6.Final.jar:?]
        at org.jboss.netty.handler.codec.frame.FrameDecoder.channelDisconnected(FrameDecoder.java:365) ~[netty-3.10.6.Final.jar:?]
        at org.jboss.netty.handler.ssl.SslHandler.channelDisconnected(SslHandler.java:580) ~[netty-3.10.6.Final.jar:?]
        at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:102) ~[netty-3.10.6.Final.jar:?]
        at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) ~[netty-3.10.6.Final.jar:?]
        at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559) ~[netty-3.10.6.Final.jar:?]
        at org.jboss.netty.channel.Channels.fireChannelDisconnected(Channels.java:396) ~[netty-3.10.6.Final.jar:?]
        at org.jboss.netty.channel.socket.nio.AbstractNioWorker.close(AbstractNioWorker.java:360) ~[netty-3.10.6.Final.jar:?]
        at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:93) ~[netty-3.10.6.Final.jar:?]
        at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) ~[netty-3.10.6.Final.jar:?]
        at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337) ~[netty-3.10.6.Final.jar:?]
        at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) ~[netty-3.10.6.Final.jar:?]
        at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) ~[netty-3.10.6.Final.jar:?]
        at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) ~[netty-3.10.6.Final.jar:?]
        at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) ~[netty-3.10.6.Final.jar:?]
        ... 3 more
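For illustration, here is a minimal sketch of what treating a channel disconnection as a retryable error could look like. This is not the actual IndexTaskClient code; the exception classification in isTransient() and the retry/backoff parameters are assumptions.

```java
// Sketch only: retry a request when the failure looks like a transient dropped channel.
import java.util.concurrent.Callable;

public final class RetryOnDisconnect
{
  static boolean isTransient(Throwable t)
  {
    // Walk the cause chain; a dropped channel (e.g. org.jboss.netty.channel.ChannelException)
    // usually surfaces wrapped in RuntimeException/ExecutionException.
    for (Throwable cur = t; cur != null; cur = cur.getCause()) {
      if (cur.getClass().getName().endsWith("ChannelException") || cur instanceof java.io.IOException) {
        return true;
      }
    }
    return false;
  }

  static <T> T callWithRetries(Callable<T> request, int maxTries) throws Exception
  {
    long backoffMs = 100;
    for (int attempt = 1; ; attempt++) {
      try {
        return request.call();
      }
      catch (Exception e) {
        if (attempt >= maxTries || !isTransient(e)) {
          throw e; // permanent failure, or transient but out of retries
        }
        Thread.sleep(backoffMs);
        backoffMs = Math.min(backoffMs * 2, 5_000); // capped exponential backoff
      }
    }
  }
}
```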

2) The supervisor should check the latest task status before killing tasks (see the sketch after this list). In the supervisor, task status is cached in memory and updated periodically in updateTaskStatus(), while killing tasks can happen at any time. As a result, the task status in the cache can be out of sync with the status in the metadata store.

3) The failed status doesn't make sense for killed tasks; their final status should be stopped or killed.
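A minimal sketch of the idea behind points 2) and 3): re-read the authoritative status from the metadata store right before killing a task, and record a distinct terminal state for tasks the supervisor stopped on purpose. TaskStatusStore and its methods are hypothetical, not Druid's actual APIs.

```java
// Sketch only: avoid overwriting an already-terminal status, and use KILLED instead of FAILED.
import java.util.Optional;

public class KillWithFreshStatusSketch
{
  enum TerminalState { SUCCESS, FAILED, KILLED }

  interface TaskStatusStore
  {
    // Authoritative read from the metadata store, bypassing the in-memory cache.
    Optional<TerminalState> getLatestTerminalState(String taskId);

    void markTerminal(String taskId, TerminalState state);
  }

  static void killTask(TaskStatusStore store, String taskId)
  {
    Optional<TerminalState> latest = store.getLatestTerminalState(taskId);
    if (latest.isPresent()) {
      // The task already reached a terminal state (e.g. SUCCESS); don't overwrite it.
      return;
    }
    // Point 3: record KILLED rather than FAILED for a task the supervisor stopped deliberately.
    store.markTerminal(taskId, TerminalState.KILLED);
  }
}
```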

pantaoran commented 6 years ago

The failed status doesn't make sense for killed tasks; their final status should be stopped or killed.

Yes, please do this. We trigger email alerts for failed tasks, and such a distinction would make those alerts much less noisy.

github-actions[bot] commented 1 year ago

This issue has been marked as stale due to 280 days of inactivity. It will be closed in 4 weeks if no further activity occurs. If this issue is still relevant, please simply write any comment. Even if closed, you can still revive the issue at any time or discuss it on the dev@druid.apache.org list. Thank you for your contributions.