Apache Spark enhanced with native Kubernetes scheduler back-end: NOTE this repository is being ARCHIVED as all new development for the kubernetes scheduler back-end is now on https://github.com/apache/spark/
Job :
The Spark SQL job is written in Scala and runs against 1 TB of TPC-DS benchmark data stored as Snappy-compressed Parquet, with Hive tables created on top of it.
KryoSerializer is used, with "spark.kryoserializer.buffer.mb" set to 64 MB.
50 executors are spawned using the spark.executor.instances=50 submit argument.
Issues Observed:
The Spark SQL job terminates abruptly: the driver and executor pods are killed suddenly, seemingly at random, and the job fails.
A few different stack traces were seen across different runs:
Stack Trace 2:
"org.apache.spark.shuffle.FetchFailedException: Failed to connect to /192.178.1.105:38039
at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:442)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:418)"
File attached : StackTrace2.txt
Stack Trace 3:
"18/05/10 11:21:17 WARN KubernetesTaskSetManager: Lost task 3.0 in stage 48.0 (TID 16486, 192.178.1.35, executor 41): FetchFailed(null, shuffleId=29, mapId=-1, reduceId=3, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 29
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:697)
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:693)"
File attached : StackTrace3.txt
Stack Trace 4:
"ERROR KubernetesTaskSchedulerImpl: Lost executor 11 on 192.178.1.123: Executor lost for unknown reasons."
This message repeats constantly, without any stack trace, until all the executors are dead.
Also, we see the following log line:
18/05/11 07:23:23 INFO DAGScheduler: failed: Set()
What does this mean? Is something wrong, or does it say that the failed set is empty, meaning there was no failure?
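One way to check the second reading is to look at how Scala prints an empty set. The sketch below assumes the log line is built by concatenating a label with a Scala Set; the names FailedSetDemo, logLine and failedStages are made up for illustration and are not taken from the Spark source. If that assumption holds, "Set()" is just an empty collection.

```scala
object FailedSetDemo {
  // Hypothetical stand-in for a scheduler's collection of failed stage ids.
  def logLine(failedStages: Set[Int]): String = "failed: " + failedStages

  def main(args: Array[String]): Unit = {
    println(logLine(Set.empty[Int])) // empty set: nothing recorded as failed
    println(logLine(Set(48)))        // set holding a single stage id
  }
}
```

Under this assumption, a non-empty set would show the entries between the parentheses.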
Observations or changes tried out :
Memory and CPU utilisation was monitored across the executors, and none of them hit their limits.
Following some reading and suggestions, spark.network.timeout was increased from 600 to 1800, but this did not help.
Also, the driver and executor memory overhead was left at the default in Set 1 of the config, which works out to 0.1 * 15g = 1.5 GB.
In Set 2 this value was explicitly increased to 4 GB, and the driver and executor memory values were reduced from 15 GB to 11 GB.
This did not yield any improvement; the same failures are observed.
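To make the memory arithmetic for the two sets explicit, here is a small sketch of the per-pod total (heap plus overhead). The 10% factor is the one quoted above; the 384 MiB floor and the names PodMemory, defaultOverheadMiB and podMemoryMiB are assumptions for illustration, not confirmed for this Spark build.

```scala
object PodMemory {
  // Assumed lower bound for the default overhead; not confirmed for this build.
  val MinOverheadMiB = 384L

  // Default overhead: 10% of the heap (the 0.1 factor from the post), with the assumed floor.
  def defaultOverheadMiB(heapMiB: Long): Long =
    math.max(heapMiB / 10, MinOverheadMiB)

  // Total memory for one pod: heap plus overhead (explicit value if given, else the default).
  def podMemoryMiB(heapMiB: Long, overheadMiB: Option[Long] = None): Long =
    heapMiB + overheadMiB.getOrElse(defaultOverheadMiB(heapMiB))

  def main(args: Array[String]): Unit = {
    val set1 = podMemoryMiB(15 * 1024L)                   // 15g heap, default overhead
    val set2 = podMemoryMiB(11 * 1024L, Some(4 * 1024L))  // 11g heap, 4g explicit overhead
    println(s"Set 1: $set1 MiB per pod, ${set1 * 50} MiB for 50 executors")
    println(s"Set 2: $set2 MiB per pod, ${set2 * 50} MiB for 50 executors")
  }
}
```

Under these assumptions Set 2 asks for slightly less memory per pod in total (15 GiB) than Set 1 (16.5 GiB), so the change moves memory from heap to overhead rather than adding any.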
Spark SQL is used to run the queries.
Sample code lines:
val qresult = spark.sql(q)
qresult.show()
No manual repartitioning is being done in the code.
Below is the scenario being tested,
Cluster manager : Kubernetes 1.9.7
Spark distribution : spark-2.2.0-k8s-0.5.0-bin-2.7.3
Spark sql configuration :
Set 1 :
spark.executor.heartbeatInterval 20s
spark.executor.cores 4
spark.driver.cores 4
spark.driver.memory 15g
spark.executor.memory 15g
spark.cores.max 220
spark.rpc.numRetries 5
spark.rpc.retry.wait 5
spark.network.timeout 1800
spark.sql.broadcastTimeout 1200
spark.sql.crossJoin.enabled true
spark.sql.starJoinOptimization true
spark.eventLog.enabled true
spark.eventLog.dir hdfs://namenodeHA/tmp/spark-history
spark.sql.codegen true
spark.kubernetes.allocation.batch.size 30

Set 2 :
spark.executor.heartbeatInterval 20s
spark.executor.cores 4
spark.driver.cores 4
spark.driver.memory 11g
spark.driver.memoryOverhead 4g
spark.executor.memory 11g
spark.executor.memoryOverhead 4g
spark.cores.max 220
spark.rpc.numRetries 5
spark.rpc.retry.wait 5
spark.network.timeout 1800
spark.sql.broadcastTimeout 1200
spark.sql.crossJoin.enabled true
spark.sql.starJoinOptimization true
spark.eventLog.enabled true
spark.eventLog.dir hdfs://namenodeHA/tmp/spark-history
spark.sql.codegen true
spark.kubernetes.allocation.batch.size 30
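For anyone trying to reproduce this, the settings listed above can be passed to spark-submit as --conf key=value pairs. The sketch below only renders those arguments from a few of the Set 2 values; the names SubmitConf, set2 and toConfArgs are made up for illustration, and the remaining keys would follow the same pattern.

```scala
object SubmitConf {
  // A few of the Set 2 settings from the post, plus the executor count; the rest are omitted for brevity.
  val set2: Seq[(String, String)] = Seq(
    "spark.executor.instances" -> "50",
    "spark.executor.cores" -> "4",
    "spark.executor.memory" -> "11g",
    "spark.executor.memoryOverhead" -> "4g",
    "spark.network.timeout" -> "1800"
  )

  // Turn each key/value pair into the two arguments "--conf" and "key=value".
  def toConfArgs(conf: Seq[(String, String)]): Seq[String] =
    conf.flatMap { case (k, v) => Seq("--conf", s"$k=$v") }

  def main(args: Array[String]): Unit =
    println(toConfArgs(set2).mkString(" "))
}
```

Keeping each set as data like this makes it easier to diff Set 1 against Set 2 when comparing runs.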
Stack Trace 1:
"2018-05-10 06:31:28 ERROR ContextCleaner:91 - Error cleaning broadcast 136
org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)"
File attached : StackTrace1.txt
StackTrace1.txt StackTrace2.txt StackTrace3.txt StackTrace4.txt