apache-spark-on-k8s / spark

Apache Spark enhanced with native Kubernetes scheduler back-end: NOTE this repository is being ARCHIVED as all new development for the kubernetes scheduler back-end is now on https://github.com/apache/spark/
https://spark.apache.org/
Apache License 2.0

Spark driver should exit and report a failure when all executors get killed/fail #134

Open varunkatta opened 7 years ago

varunkatta commented 7 years ago

After launching a Spark job and killing the executor pods one by one until no executors were left, the driver didn't seem to recognize that the executors had terminated. Instead of reporting a fatal failure or making progress by launching replacement executors/pods, the driver appeared to be stuck.

Also, the driver logs don't seem to give the whole picture when the driver encounters executor pod failures.

Spark on K8s, upon killing around 5 pods:

└─[1] <> cat /tmp/driver-log | grep -i lost
2017-02-15 13:44:36 ERROR TaskSchedulerImpl:70 - Lost an executor 1 (already removed): Executor heartbeat timed out after 148604 ms

Spark on YARN, upon killing 4 executors (on YARN, Spark requests new executors when they terminate unexpectedly):

17/02/15 05:54:52 INFO DAGScheduler: Executor lost: 1 (epoch 0)
17/02/15 05:55:46 INFO DAGScheduler: Executor lost: 3 (epoch 0)
17/02/15 05:56:01 INFO DAGScheduler: Executor lost: 4 (epoch 0)
17/02/15 05:57:05 INFO DAGScheduler: Executor lost: 5 (epoch 0)
17/02/15 05:57:17 INFO DAGScheduler: Executor lost: 6 (epoch 0)
17/02/15 05:57:34 INFO DAGScheduler: Executor lost: 7 (epoch 0)
17/02/15 05:57:47 INFO DAGScheduler: Executor lost: 2 (epoch 0)
17/02/15 05:57:52 INFO DAGScheduler: Executor lost: 9 (epoch 0)
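
For illustration, the requested behavior could look roughly like the sketch below: the scheduler back-end tracks registered executors and fails the application once every executor has been lost, instead of hanging. The class and method names here are hypothetical, not the actual back-end code.

import java.util.concurrent.atomic.AtomicInteger

// Hypothetical watchdog: once at least one executor has registered and the live
// count later drops to zero, report a fatal failure instead of waiting on
// heartbeats that will never arrive.
class ExecutorLossWatchdog(onAllExecutorsLost: () => Unit) {
  private val liveExecutors = new AtomicInteger(0)
  @volatile private var everHadExecutors = false

  def executorRegistered(): Unit = {
    everHadExecutors = true
    liveExecutors.incrementAndGet()
  }

  def executorLost(): Unit = {
    if (liveExecutors.decrementAndGet() <= 0 && everHadExecutors) {
      onAllExecutorsLost()
    }
  }
}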

duyanghao commented 7 years ago

@foxish @erikerlandson @varunkatta Is there any progress on this issue? I am currently encountering the same problem; maybe we can work on it together.

varunkatta commented 7 years ago

This should be fixed by #244 and #459. Maybe wait until #459 gets merged to verify whether you still see this problem?

duyanghao commented 7 years ago

@varunkatta I will try with #459 merged and will give you feedback as soon as possible (maybe with my own customization).

Also, is the combination of #244 and dynamic allocation considered the solution to #133?

duyanghao commented 7 years ago

@varunkatta Also, another question that bothers me is that some Spark applications still can't recover, for many reasons such as data loss, even after new executors have been launched. What about this problem? Are there any thoughts on how to solve it?

Maybe it is a universal problem in Spark (not only in Spark on K8s), but we can still have a discussion here.
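
For the streaming case specifically, one standard mitigation is to enable checkpointing and the receiver write-ahead log, so that received blocks survive the loss of the executor that stored them. A minimal sketch using stock Spark Streaming APIs; the checkpoint path and batch interval are placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Persist received blocks to the checkpoint directory (write-ahead log) before
// acknowledging them, and checkpoint DStream metadata for recovery.
val conf = new SparkConf()
  .setAppName("spark-static-test")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(10))
// The checkpoint directory must be on fault-tolerant storage (e.g. HDFS),
// not a path local to an executor pod.
ssc.checkpoint("hdfs:///checkpoints/spark-static-test")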

duyanghao commented 7 years ago

This should be fixed by #244 and #459. Maybe wait until #459 gets merged to verify whether you still see this problem?

@varunkatta Merged already.

ash211 commented 7 years ago

@duyanghao both of the PRs you linked have been merged (#244 and #459). Is there another code change (open pull request) you're referring to?

duyanghao commented 7 years ago

@ash211 Thanks, those are all the PRs. But there is still one related issue (#133): is the combination of #244 and dynamic allocation considered the solution to #133?

Addition: @varunkatta @ash211 we can keep reporting problems we hit in practical use here, since this issue has not been closed yet.
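
For reference, dynamic allocation is driven by the standard Spark properties below. This is only a sketch: it assumes the external shuffle service work from #244 is deployed and reachable, and the executor counts are placeholders.

import org.apache.spark.SparkConf

// Standard dynamic-allocation settings; the min/max values are placeholders.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true") // requires the external shuffle service
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "10")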

duyanghao commented 7 years ago

@ash211 @varunkatta The Spark job still aborts even with a newly created executor, as shown below.

Step 1: Run the Spark job normally

kubectl get pods -n=xxx|grep spark-static-test
spark-static-test   1/1       Running             0          7m        192.168.25.118
spark-static-test-exec-1   1/1       Running             0          7m        192.168.25.120    
spark-static-test-exec-2   1/1       Running             0          7m        192.168.25.69 

Step 2: Delete executor 1

kubectl delete pods/spark-static-test-exec-1 -n=xxx
pod "spark-static-test-exec-1" deleted

The driver aborts with the following errors:

2017-09-20 15:20:13 DEBUG KubernetesClusterSchedulerBackend: Removing executor 1 with loss reason Pod spark-static-test-exec-1 deleted or lost.
2017-09-20 15:20:13 DEBUG KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asked to remove executor 1 with reason Pod spark-static-test-exec-1 deleted or lost.
2017-09-20 15:20:13 ERROR KubernetesTaskSchedulerImpl: Lost executor 1 on 192.168.25.120: Pod spark-static-test-exec-1 deleted or lost.
2017-09-20 15:20:13 DEBUG KubernetesTaskSetManager: Valid locality levels for TaskSet 2.0: ANY
2017-09-20 15:20:13 INFO KubernetesTaskSetManager: Task 71 failed because while it was being computed, its executor exited for a reason unrelated to the task. Not counting this failure towards the maximum number of failures for the task.
2017-09-20 15:20:13 DEBUG KubernetesTaskSetManager: Valid locality levels for TaskSet 3.0: ANY
2017-09-20 15:20:13 INFO KubernetesClusterSchedulerBackend: Requesting a new executor, total executors is now 2

...

2017-09-20 15:22:03 INFO KubernetesTaskSetManager: Starting task 0.0 in stage 191.0 (TID 1684, 192.168.25.70, executor 3, partition 0, ANY, 6340 bytes)

...

2017-09-20 15:22:49 ERROR KubernetesTaskSetManager: Task 0 in stage 196.0 failed 4 times; aborting job

...

2017-09-20 15:22:49 ERROR JobScheduler: Error running job streaming job 1505892120000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 196.0 failed 4 times, most recent failure: Lost task 0.3 in stage 196.0 (TID 2096, 192.168.25.70, executor 3): java.lang.Exception: Could not compute split, block input-1-1505891642000 not found
        at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:50)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:925)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:923)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:923)
        at com.tencent.ieg.ds.rt.FrequnceAudit$$anonfun$Run$1.apply(FrequnceAudit.scala:119)
        at com.tencent.ieg.ds.rt.FrequnceAudit$$anonfun$Run$1.apply(FrequnceAudit.scala:118)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:254)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:253)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not compute split, block input-1-1505891642000 not found
        at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:50)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
        ... 3 more
...
2017-09-20 15:22:49 ERROR ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver
2017-09-20 15:22:49 ERROR ReceiverTracker: Deregistered receiver for stream 1: Stopped by driver
...

Meanwhile, executor 3 reports the following errors:

...
2017-09-20 15:22:24 ERROR RetryingBlockFetcher: Exception while beginning fetch of 2 outstanding blocks
java.io.IOException: Failed to connect to /192.168.25.120:33544
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
        at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:97)
        at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
        at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
        at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:106)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:169)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:350)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:286)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:120)
        at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:45)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
        at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:979)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:954)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:894)
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:954)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:700)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
        at org.apache.spark.rdd.PartitionerAwareUnionRDD$$anonfun$compute$1.apply(PartitionerAwareUnionRDD.scala:100)
        at org.apache.spark.rdd.PartitionerAwareUnionRDD$$anonfun$compute$1.apply(PartitionerAwareUnionRDD.scala:99)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:154)
        at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:41)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKeyWithClassTag$1$$anonfun$apply$11.apply(PairRDDFunctions.scala:99)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKeyWithClassTag$1$$anonfun$apply$11.apply(PairRDDFunctions.scala:97)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:796)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:796)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: io.netty.channel.AbstractChannel$AnnotatedNoRouteToHostException: Host is unreachable: /192.168.25.120:33544
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
        at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:257)
        at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:291)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:640)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
        ... 1 more

...

2017-09-20 15:22:49 ERROR RetryingBlockFetcher: Exception while beginning fetch of 2 outstanding blocks (after 3 retries)
java.io.IOException: Failed to connect to /192.168.25.120:33544
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
        at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:97)
        at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
        at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
        at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
        at java.lang.Thread.run(Thread.java:745)
Caused by: io.netty.channel.AbstractChannel$AnnotatedNoRouteToHostException: Host is unreachable: /192.168.25.120:33544
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
        at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:257)
        at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:291)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:640)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
        ... 2 more
2017-09-20 15:22:49 ERROR ShuffleBlockFetcherIterator: Failed to get block(s) from 192.168.25.120:33544
java.io.IOException: Failed to connect to /192.168.25.120:33544
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
        at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:97)
        at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
        at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
        at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
        at java.lang.Thread.run(Thread.java:745)
Caused by: io.netty.channel.AbstractChannel$AnnotatedNoRouteToHostException: Host is unreachable: /192.168.25.120:33544
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
        at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:257)
        at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:291)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:640)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
        ... 2 more
2017-09-20 15:22:49 ERROR ShuffleBlockFetcherIterator: Failed to get block(s) from 192.168.25.120:33544
java.io.IOException: Failed to connect to /192.168.25.120:33544
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
        at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:97)
        at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
        at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
        at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
        at java.lang.Thread.run(Thread.java:745)
Caused by: io.netty.channel.AbstractChannel$AnnotatedNoRouteToHostException: Host is unreachable: /192.168.25.120:33544
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
        at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:257)
        at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:291)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:640)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
        ... 2 more
...

2017-09-20 15:22:49 ERROR Executor: Exception in task 0.0 in stage 196.0 (TID 2090)
java.lang.Exception: Could not compute split, block input-1-1505891642000 not found
        at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:50)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
...

2017-09-20 15:22:49 ERROR Executor: Exception in task 0.3 in stage 196.0 (TID 2096)
java.lang.Exception: Could not compute split, block input-1-1505891642000 not found
        at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:50)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
... 

2017-09-20 15:22:49 ERROR Executor: Exception in task 1.0 in stage 196.0 (TID 2091)
java.lang.Exception: Could not compute split, block input-1-1505891642400 not found
        at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:50)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

...

2017-09-20 15:22:49 ERROR RetryingBlockFetcher: Exception while beginning fetch of 2 outstanding blocks
java.io.IOException: Failed to connect to /192.168.25.120:33544
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
        at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:97)
        at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
        at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
        at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:106)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:169)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:350)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:286)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:120)
        at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:45)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
        at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:979)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:954)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:894)
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:954)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:700)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
        at org.apache.spark.rdd.PartitionerAwareUnionRDD$$anonfun$compute$1.apply(PartitionerAwareUnionRDD.scala:100)
        at org.apache.spark.rdd.PartitionerAwareUnionRDD$$anonfun$compute$1.apply(PartitionerAwareUnionRDD.scala:99)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:154)
        at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:41)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKeyWithClassTag$1$$anonfun$apply$11.apply(PairRDDFunctions.scala:99)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKeyWithClassTag$1$$anonfun$apply$11.apply(PairRDDFunctions.scala:97)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:796)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:796)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.nio.channels.ClosedChannelException
        at io.netty.channel.nio.AbstractNioChannel.doClose()(Unknown Source)
...

Note: 192.168.25.120 in the logs is the IP of executor 1.

kubectl get pods -n=xxx -a|grep spark-static-test        
spark-static-test                                0/1       Error              0          1h           192.168.25.118
spark-static-test-exec-2                         0/1       Completed          0          1h             192.168.25.69
spark-static-test-exec-3                         0/1       Completed          0          1h             192.168.25.70

We can see that the job still aborts even though executor 3 was created after executor 1 was deleted.

I suspect this error traces back to the problem I mentioned earlier:

Also, another question that bothers me is that some Spark applications still can't recover, for many reasons such as data loss, even after new executors have been launched. What about this problem? Are there any thoughts on how to solve it?

duyanghao commented 7 years ago

@mccheah Any suggestions for this problem?

duyanghao commented 7 years ago

@varunkatta Also, with Spark on K8s the Docker container exits with code 137 on out-of-memory instead of -103 or -104, as referenced in the code below:

private object KubernetesClusterSchedulerBackend {
  private val DEFAULT_STATIC_PORT = 10000
  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
  private val VMEM_EXCEEDED_EXIT_CODE = -103
  private val PMEM_EXCEEDED_EXIT_CODE = -104
  private val UNKNOWN_EXIT_CODE = -111
  // Number of times we are allowed check for the loss reason for an executor before we give up
  // and assume the executor failed for good, and attribute it to a framework fault.
  val MAX_EXECUTOR_LOST_REASON_CHECKS = 10

  def memLimitExceededLogMessage(diagnostics: String): String = {
    s"Pod/Container killed for exceeding memory limits. $diagnostics" +
      " Consider boosting spark executor memory overhead."
  }
}
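
For context, a container killed by its memory limit or the kernel OOM killer usually exits with code 137 (128 + SIGKILL), which is different from the YARN-style -103/-104 codes above. A minimal sketch of what an extra mapping inside this object could look like; the constant and helper names are hypothetical, not the actual back-end code:

// Hypothetical addition: treat the common container-kill code as a memory issue too.
private val OOM_KILLED_EXIT_CODE = 137 // 128 + SIGKILL, typical for cgroup/OOM-killer kills

private def isMemoryExceeded(exitCode: Int): Boolean =
  exitCode == VMEM_EXCEEDED_EXIT_CODE ||
    exitCode == PMEM_EXCEEDED_EXIT_CODE ||
    exitCode == OOM_KILLED_EXIT_CODE

// Usage sketch: pick a loss-reason message based on the container exit code.
private def lossReasonMessage(exitCode: Int, diagnostics: String): String =
  if (isMemoryExceeded(exitCode)) memLimitExceededLogMessage(diagnostics)
  else s"Executor container exited with code $exitCode. $diagnostics"
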
duyanghao commented 7 years ago

@varunkatta Is there any reason that DEFAULT_CONTAINER_FAILURE_EXIT_STATUS = -1 and UNKNOWN_EXIT_CODE = -111?

varunkatta commented 7 years ago

@duyanghao These exit codes are the same as what is used for YARN today. If errors are caused by application code, it doesn't make sense to recover the executors, since it is probably the user's fault and not related to the framework. If we can reliably determine that it is a framework fault, then it makes sense to recover the failed executors. If the exit codes have changed and are different on K8s, we should change them.
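
To illustrate the framework-fault vs. user-fault distinction, Spark's scheduler already carries that signal in its internal ExecutorExited(exitCode, exitCausedByApp, reason) loss reason: when exitCausedByApp is false, the failure is not counted against the task's retry limit. A minimal sketch of how an exit code might be classified; the helper and the choice of codes are hypothetical, and ExecutorExited is private[spark], so this only compiles inside Spark's own packages:

import org.apache.spark.scheduler.ExecutorExited

// Hypothetical classification: memory-limit and container-infrastructure kills are
// treated as framework faults; anything else is attributed to the application.
def toLossReason(exitCode: Int, diagnostics: String): ExecutorExited = {
  val frameworkFault = exitCode == -103 || exitCode == -104 || exitCode == 137
  ExecutorExited(
    exitCode,
    exitCausedByApp = !frameworkFault,
    reason = s"Executor exited with code $exitCode. $diagnostics")
}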