apache-spark-on-k8s / spark

Apache Spark enhanced with native Kubernetes scheduler back-end: NOTE this repository is being ARCHIVED as all new development for the kubernetes scheduler back-end is now on https://github.com/apache/spark/
https://spark.apache.org/
Apache License 2.0

Append HADOOP_CONF_DIR to SPARK_CLASSPATH in driver/executor Dockerfiles #578

Closed. chenchun closed this 6 years ago

chenchun commented 6 years ago

What changes were proposed in this pull request?

As the title says

How was this patch tested?

manual tests

I'm running a PageRank job that loads its dataset from an HA HDFS with the following command.

export HADOOP_CONF_DIR=`pwd`/hadoopconf

bin/spark-submit \
  --deploy-mode cluster \
  --class org.apache.spark.examples.SparkPageRank \
  --master k8s://http://10.0.0.1:8081 \
  --kubernetes-namespace default \
  --conf spark.executor.instances=2 \
  --conf spark.app.name=ramipagerank \
  --conf spark.kubernetes.driver.docker.image=spark-driver:20171218_17 \
  --conf spark.kubernetes.executor.docker.image=spark-executor:20171218_17 \
  --conf spark.kubernetes.initcontainer.docker.image=spark-init-container:20171218_17 \
  --conf spark.kubernetes.resourceStagingServer.uri=http://10.0.0.2:31000 \
  --conf spark.eventLog.enabled=true \
  examples/jars/spark-examples_2.11-2.2.0-k8s-0.5.0.jar \
  hdfs://hdfsCluster/spark/data/mllib/pagerank_data.txt 10

The driver pod failed with the following exception in its log.

2017-12-19 10:52:37 WARN  FileStreamSink:66 - Error while looking for metadata directory.
Exception in thread "main" java.lang.IllegalArgumentException: java.net.UnknownHostException: hdfsCluster
        at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:374)
        at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:310)
        at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
        at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:668)
        at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:604)
        at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:148)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2598)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2632)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2614)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:350)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:348)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.immutable.List.flatMap(List.scala:344)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:348)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
        at org.apache.spark.sql.DataFrameReader.text(DataFrameReader.scala:623)
        at org.apache.spark.sql.DataFrameReader.textFile(DataFrameReader.scala:657)
        at org.apache.spark.sql.DataFrameReader.textFile(DataFrameReader.scala:632)
        at org.apache.spark.examples.SparkPageRank$.main(SparkPageRank.scala:64)
        at org.apache.spark.examples.SparkPageRank.main(SparkPageRank.scala)
Caused by: java.net.UnknownHostException: hdfsCluster
        ... 26 more

It seems the exception is due to HADOOP_CONF_DIR being missing from SPARK_CLASSPATH. After adding HADOOP_CONF_DIR to SPARK_CLASSPATH in the driver/executor images, my job runs successfully.
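
For reference, a minimal sketch of the kind of change, assuming the image entrypoint assembles SPARK_CLASSPATH in shell (the exact Dockerfile layout may differ):

  # prepend the mounted Hadoop config dir so core-site.xml/hdfs-site.xml
  # end up on the driver/executor classpath
  if [ -n "${HADOOP_CONF_DIR}" ]; then
    SPARK_CLASSPATH="${HADOOP_CONF_DIR}:${SPARK_CLASSPATH}"
  fi

With the Hadoop configuration files on the classpath, the client can resolve the HA nameservice hdfsCluster instead of treating it as a plain hostname.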

Not sure why this Dockerfile change went missing; compare https://github.com/apache-spark-on-k8s/spark/pull/540 to https://github.com/apache-spark-on-k8s/spark/pull/414.

ifilonenko commented 6 years ago

Thank you for this, as I am developing the integration tests right now to catch these errors.

ifilonenko commented 6 years ago

rerun unit tests please

ifilonenko commented 6 years ago

This change would need to be added to all the dockerfiles as this logic is shared amongst the -py and -r versions

liyinan926 commented 6 years ago

LGTM. Thanks for fixing!

echarles commented 6 years ago

I have been successfully using commit 0612195f9027a4641b43c9d444fe8336cfeaa8c0 for a while to connect to S3, passing the credentials with spark.hadoop.fs.s3a....

I have updated to the latest commit (so this commit plus the big one related to Kerberos HDFS), and authentication to S3 no longer works.

I don't know where this issue comes from, but I have rolled back and now everything is fine again. Could it be that the executor reads the conf from the given HADOOP_CONF_DIR and ignores the additional properties that were passed in?
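
Concretely, the flags I mean are of this form (the property names are the standard S3A ones; the values here are just placeholders):

  bin/spark-submit \
    --conf spark.hadoop.fs.s3a.access.key=<ACCESS_KEY> \
    --conf spark.hadoop.fs.s3a.secret.key=<SECRET_KEY> \
    --conf spark.hadoop.fs.s3a.endpoint=<S3_ENDPOINT> \
    ...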

ifilonenko commented 6 years ago

That could be the case. Maybe modify the XML to include those properties? The config picked up from HADOOP_CONF_DIR is probably overwriting them.

echarles commented 6 years ago

@ifilonenko after hours of chasing suspects in moving zones, I have finally found the issue, which is not the one I mentioned (so no worries, all good with this commit).

For the record, I was instantiating a Hadoop Configuration object before calling the Spark read methods for the first time. This apparently led to a Hadoop conf being available that Spark picked up. That Hadoop conf, however, was not fed the needed S3 properties, resulting in access denied (I needed that Hadoop conf to do some honky-tonk work on the bare HDFS files).

The strange thing is that with spark-yarn this sequence works without problems, but with spark-k8s it seems to cause issues. I don't think we have to handle this, but maybe it should be documented...

Btw, looking at the last hdfs-kerberos commit, I see you introduced an additional prop spark.kubernetes.hadoop.executor.hadoopConfigMapName:

  1. This prop is also used to apply the config map on the driver - shouldn't the name be spark.kubernetes.hadoop.hadoopConfigMapName (omitting the executor part)?
  2. While we are discussing naming, spark.kubernetes.initcontainer.executor.configmapname is all lower case, while spark.kubernetes.hadoop.executor.hadoopConfigMapName uses camel case.
  3. Most important, I rely for now on client mode, which has an HDFS configuration mounted via a config map (hdfs-k8s-hdfs-k8s-hdfs-dn in my case) and works well. As I also want to support HDFS access in cluster mode, I have tested passing spark.kubernetes.initcontainer.executor.configmapname=hdfs-k8s-hdfs-k8s-hdfs-dn, and I see a submission with SPARK_JAVA_OPT_14: -Dspark.kubernetes.hadoop.executor.hadoopConfigMapName=zeppelin-k8s-spark-1515498884837-hadoop-config, which is not what I was expecting (I was expecting the driver to benefit from my existing hadoop conf). My goal is to reuse an existing hadoop conf via config map in the cluster driver and executors. Am I missing something?

chenchun commented 6 years ago

My goal is to reuse an existing hadoop conf via config map in the cluster driver and executors.

@echarles I raised the same issue: https://github.com/apache-spark-on-k8s/spark/issues/580. I think spark.kubernetes.hadoop.executor.hadoopConfigMapName is currently an internal config, not a user-facing one.

chenchun commented 6 years ago

cc @ChenLingPeng. Can you submit your patch that makes it possible for the driver/executor to reuse an existing hadoop-conf config map?

ChenLingPeng commented 6 years ago

Will do this ASAP

echarles commented 6 years ago

Thx @ChenLingPeng (cc @chenchun). From the code and behavior, I understand that the DriverConfigurationStepsOrchestrator creates a new hadoop config map each time a driver is created.

private val kubernetesResourceNamePrefix = s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
private val hadoopConfigMapName = s"$kubernetesResourceNamePrefix-hadoop-config"
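
For illustration (namespace and name below are only an example of the pattern above), the generated map can be spotted with:

  # maps are named <app-name>-<launch-time>-hadoop-config
  kubectl get configmaps -n default | grep hadoop-config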

Actually, I am looking for a way to provide my own hadoopConfigMapName via a Spark property. Does your patch implement this?

echarles commented 6 years ago

@ChenLingPeng @chenchun My bad (not sure what I was doing...). Now I can mount an existing hadoop config map in the driver. A new config map is created based on the given one by this code in the DriverConfigurationStepsOrchestrator:

      hadoopConfDir.map { conf =>
        val hadoopStepsOrchestrator =
          new HadoopStepsOrchestrator(
            kubernetesResourceNamePrefix,
            namespace,
            hadoopConfigMapName,
            submissionSparkConf,
            conf)

Once the driver is created with the correct hadoop config map, the executors benefit from it via the classical spark.sparkContext.hadoopConfiguration.