kubeflow / spark-operator

Kubernetes operator for managing the lifecycle of Apache Spark applications on Kubernetes.

Application Dependency not working (spec.deps.packages is ignored) and (sparkConf:spark.jars.packages not propagated to executor) #1132

Open ketanhdoshi opened 3 years ago

ketanhdoshi commented 3 years ago

Application dependencies added using deps.packages seem to be ignored. According to the docs this should work, but other users have reported the same problem in issue #352.

Excerpt from yaml file:

spec:
  deps:
    packages:
      - org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0
      - org.apache.spark:spark-avro_2.12:3.0.0

There is no mention of these packages being loaded in the driver logs. The application then fails with this error:

+ CMD=("$SPARK_HOME/bin/spark-submit" --conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS" --deploy-mode client "$@")
+ exec /usr/bin/tini -s -- /opt/spark/bin/spark-submit --conf spark.driver.bindAddress=10.1.4.90 --deploy-mode client --properties-file /opt/spark/conf/spark.properties --class com.ketan.KafkaAvro local:///tmp/dev/kafkaavro/target/scala-2.12/kafka-avro_2.12-1.0.jar kafka.kd-confluent.svc.cluster.local:9071 /tmp/data
21/01/15 04:22:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/01/15 04:22:29 INFO SparkContext: Running Spark version 3.0.0
...
21/01/15 04:22:44 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/opt/spark/work-dir/spark-warehouse').
21/01/15 04:22:44 INFO SharedState: Warehouse path is 'file:/opt/spark/work-dir/spark-warehouse'.
Exception in thread "main" org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:666)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:194)
at com.ketan.StreamsProcessor.process(KafkaAvro.scala:57)
at com.ketan.KafkaAvro$.main(KafkaAvro.scala:28)
at com.ketan.KafkaAvro.main(KafkaAvro.scala)

I tried a workaround mentioned in the same issue #352, which has worked for some people recently.

spec:
  sparkConf:
    spark.jars.packages: "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0"

But that failed with this error (from the driver logs):

+ CMD=("$SPARK_HOME/bin/spark-submit" --conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS" --deploy-mode client "$@")
+ exec /usr/bin/tini -s -- /opt/spark/bin/spark-submit --conf spark.driver.bindAddress=10.1.4.89 --deploy-mode client --properties-file /opt/spark/conf/spark.properties --class com.ketan.KafkaAvro local:///tmp/dev/kafkaavro/target/scala-2.12/kafka-avro_2.12-1.0.jar kafka.kd-confluent.svc.cluster.local:9071 /tmp/data
Ivy Default Cache set to: /opt/spark/.ivy2/cache
The jars for the packages stored in: /opt/spark/.ivy2/jars
:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-7c17c84c-a8ab-48c8-ade4-26e4b9bb930b;1.0
confs: [default]
Exception in thread "main" java.io.FileNotFoundException: /opt/spark/.ivy2/cache/resolved-org.apache.spark-spark-submit-parent-7c17c84c-a8ab-48c8-ade4-26e4b9bb930b-1.0.xml (No such file or directory)
at java.io.FileOutputStream.open0(Native Method)
at java.io.FileOutputStream.open(FileOutputStream.java:270)
at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
at org.apache.ivy.plugins.parser.xml.XmlModuleDescriptorWriter.write(XmlModuleDescriptorWriter.java:70)
at org.apache.ivy.plugins.parser.xml.XmlModuleDescriptorWriter.write(XmlModuleDescriptorWriter.java:62)
at org.apache.ivy.core.module.descriptor.DefaultModuleDescriptor.toIvyFile(DefaultModuleDescriptor.java:563)
at org.apache.ivy.core.cache.DefaultResolutionCacheManager.saveResolvedModuleDescriptor(DefaultResolutionCacheManager.java:176)
at org.apache.ivy.core.resolve.ResolveEngine.resolve(ResolveEngine.java:245)
at org.apache.ivy.Ivy.resolve(Ivy.java:523)
at org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:1387)
at org.apache.spark.deploy.DependencyUtils$.resolveMavenDependencies(DependencyUtils.scala:54)
at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:308)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:871)
ketanhdoshi commented 3 years ago

I made the change to include spark.jars.ivy, as explained here. A volume for the ivy directory had to be added and mounted in both the driver and executor specs.

spec:
  deps:
    packages:
      - org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0
      - org.apache.spark:spark-avro_2.12:3.0.0
  sparkConf:
    spark.jars.packages: "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0,org.apache.spark:spark-avro_2.12:3.0.0"
    spark.jars.ivy: "/tmp/ivy"
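
For completeness, a minimal sketch of the volume and mount additions referred to above, using the same names and paths as the full manifest further down (the hostPath is specific to my Docker Desktop setup):

spec:
  volumes:
    - name: "ivy"
      hostPath:
        path: "/run/desktop/mnt/host/c/kd/dev/bd/volumes/spark/ivy"
        type: Directory
  driver:
    volumeMounts:
      - name: "ivy"
        mountPath: "/tmp/ivy"    # must match spark.jars.ivy
  executor:
    volumeMounts:
      - name: "ivy"
        mountPath: "/tmp/ivy"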

Now the driver is able to find the dependencies and get further. However, these options are not being passed to the executor, which does not see any of the dependencies. It fails with java.lang.ClassNotFoundException: org.apache.spark.sql.kafka010.KafkaBatchInputPartition as below:

+ CMD=(${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp "$SPARK_CLASSPATH:$SPARK_DIST_CLASSPATH" org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP)
+ exec /usr/bin/tini -s -- /usr/local/openjdk-8/bin/java -Dspark.driver.blockManager.port=7079 -Dspark.driver.port=7078 -Xms512m -Xmx512m -cp ':/opt/spark/jars/*:' org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@kd-kafkaavro-3a99e07715d477b2-driver-svc.spark-app.svc:7078 --executor-id 1 --cores 1 --app-id spark-0a6417c1b3284e97a27f617c0f2dd034 --hostname 10.1.4.171
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/01/18 14:10:13 INFO CoarseGrainedExecutorBackend: Started daemon with process name: 14@kafka-avro-9aba567715d4cb68-exec-1
21/01/18 14:10:13 INFO SignalUtils: Registered signal handler for TERM
21/01/18 14:10:13 INFO SignalUtils: Registered signal handler for HUP
21/01/18 14:10:13 INFO SignalUtils: Registered signal handler for INT
21/01/18 14:10:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/01/18 14:10:14 INFO SecurityManager: Changing view acls to: 185,root
21/01/18 14:10:14 INFO SecurityManager: Changing modify acls to: 185,root
21/01/18 14:10:14 INFO SecurityManager: Changing view acls groups to:
21/01/18 14:10:14 INFO SecurityManager: Changing modify acls groups to:
21/01/18 14:10:14 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(185, root); groups with view permissions: Set(); users  with modify permissions: Set(185, root); groups with modify permissions: Set()
21/01/18 14:10:15 INFO TransportClientFactory: Successfully created connection to kd-kafkaavro-3a99e07715d477b2-driver-svc.spark-app.svc/10.1.4.170:7078 after 129 ms (0 ms spent in bootstraps)
21/01/18 14:10:15 INFO SecurityManager: Changing view acls to: 185,root
21/01/18 14:10:15 INFO SecurityManager: Changing modify acls to: 185,root
21/01/18 14:10:15 INFO SecurityManager: Changing view acls groups to:
21/01/18 14:10:15 INFO SecurityManager: Changing modify acls groups to:
21/01/18 14:10:15 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(185, root); groups with view permissions: Set(); users  with modify permissions: Set(185, root); groups with modify permissions: Set()
21/01/18 14:10:15 INFO TransportClientFactory: Successfully created connection to kd-kafkaavro-3a99e07715d477b2-driver-svc.spark-app.svc/10.1.4.170:7078 after 5 ms (0 ms spent in bootstraps)
21/01/18 14:10:15 INFO DiskBlockManager: Created local directory at /var/data/spark-07f269fd-4b5a-428e-90b1-90e9a250c0d0/blockmgr-02756d41-fd31-4d4d-a650-ceb2fe83a533
21/01/18 14:10:15 INFO MemoryStore: MemoryStore started with capacity 117.0 MiB
21/01/18 14:10:15 INFO CoarseGrainedExecutorBackend: Connecting to driver: spark://CoarseGrainedScheduler@kd-kafkaavro-3a99e07715d477b2-driver-svc.spark-app.svc:7078
21/01/18 14:10:15 INFO ResourceUtils: ==============================================================
21/01/18 14:10:15 INFO ResourceUtils: Resources for spark.executor:
21/01/18 14:10:15 INFO ResourceUtils: ==============================================================
21/01/18 14:10:16 INFO CoarseGrainedExecutorBackend: Successfully registered with driver
21/01/18 14:10:16 INFO Executor: Starting executor ID 1 on host 10.1.4.171
21/01/18 14:10:16 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 44843.
21/01/18 14:10:16 INFO NettyBlockTransferService: Server created on 10.1.4.171:44843
21/01/18 14:10:16 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
21/01/18 14:10:16 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(1, 10.1.4.171, 44843, None)
21/01/18 14:10:16 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(1, 10.1.4.171, 44843, None)
21/01/18 14:10:16 INFO BlockManager: Initialized BlockManager: BlockManagerId(1, 10.1.4.171, 44843, None)
21/01/18 14:10:30 INFO CoarseGrainedExecutorBackend: Got assigned task 0
21/01/18 14:10:31 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
21/01/18 14:10:31 INFO Executor: Fetching file:/tmp/dev/kafkaavro/target/scala-2.12/kafka-avro_2.12-1.0.jar with timestamp 1610979002865
21/01/18 14:10:31 INFO Utils: Copying /tmp/dev/kafkaavro/target/scala-2.12/kafka-avro_2.12-1.0.jar to /var/data/spark-07f269fd-4b5a-428e-90b1-90e9a250c0d0/spark-119a54b8-42c9-42e5-9f26-c85c64f0605a/-21118480811610979002865_cache
21/01/18 14:10:31 INFO Utils: Copying /var/data/spark-07f269fd-4b5a-428e-90b1-90e9a250c0d0/spark-119a54b8-42c9-42e5-9f26-c85c64f0605a/-21118480811610979002865_cache to /opt/spark/work-dir/./kafka-avro_2.12-1.0.jar
21/01/18 14:10:31 INFO Executor: Adding file:/opt/spark/work-dir/./kafka-avro_2.12-1.0.jar to class loader
21/01/18 14:10:31 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.ClassNotFoundException: org.apache.spark.sql.kafka010.KafkaBatchInputPartition
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
ketanhdoshi commented 3 years ago

@liyinan926 any suggestions? Appreciate any help you can provide.

The following is getting ignored altogether.

spec:
  deps:
    packages:
      - org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0
      - org.apache.spark:spark-avro_2.12:3.0.0

And this works fine on the driver but is not getting passed to the executor at all.

spec:
  sparkConf:
    spark.jars.packages: "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0,org.apache.spark:spark-avro_2.12:3.0.0"

To debug this a bit, I tried adding entries like the following. In that case, the executor logs do show these jars being copied over, and the executor is able to find those classes. But so many additional jars are required that this isn't a practical workaround.

  sparkConf:
    spark.jars: "local:///tmp/ivy/jars/org.apache.spark_spark-sql-kafka-0-10_2.12-3.0.0.jar,local:///tmp/ivy/jars/org.apache.kafka_kafka-clients-2.4.1.jar"

Here's the full yaml for completeness.

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: kd-kafkaavro
  namespace: spark-app
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v3.0.0"
  imagePullPolicy: Always
  mainClass: com.ketan.KafkaAvro
  mainApplicationFile: "local:///tmp/dev/kafkaavro/target/scala-2.12/kafka-avro_2.12-1.0.jar"
  arguments: 
    - "kafka.kd-confluent.svc.cluster.local:9071"
    - "/tmp/data"
  deps:
    packages:
      - org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0
      - org.apache.spark:spark-avro_2.12:3.0.0
  sparkVersion: "3.0.0"
  sparkConf:
    spark.jars.packages: "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0,org.apache.spark:spark-avro_2.12:3.0.0"
#    spark.jars: "local:///tmp/ivy/jars/org.apache.spark_spark-sql-kafka-0-10_2.12-3.0.0.jar,local:///tmp/ivy/jars/org.apache.kafka_kafka-clients-2.4.1.jar,local:///tmp/ivy/jars/org.apache.commons_commons-pool2-2.6.2.jar,local:///tmp/ivy/jars/org.xerial.snappy_snappy-java-1.1.7.5.jar,local:///tmp/ivy/jars/com.github.luben_zstd-jni-1.4.4-3.jar,local:///tmp/ivy/jars/org.lz4_lz4-java-1.7.1.jar,local:///tmp/ivy/jars/org.slf4j_slf4j-api-1.7.30.jar"
    spark.jars.ivy: "/tmp/ivy"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/run/desktop/mnt/host/c/kd/dev/bd/volumes/spark/tmp"
        type: Directory
    - name: "data"
      hostPath:
        path: "/run/desktop/mnt/host/c/kd/dev/bd/spark/data"
        type: Directory
    - name: "dev"
      hostPath:
        path: "/run/desktop/mnt/host/c/kd/dev/bd/spark/devp"
        type: Directory
    - name: "ivy"
      hostPath:
        path: "/run/desktop/mnt/host/c/kd/dev/bd/volumes/spark/ivy"
        type: Directory
  driver:
    coreRequest: 100m
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 3.0.0
    serviceAccount: sparkop-spark
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
      - name: "data"
        mountPath: "/tmp/data"
      - name: "dev"
        mountPath: "/tmp/dev"
      - name: "ivy"
        mountPath: "/tmp/ivy"
  executor:
    coreRequest: 100m
    instances: 1
    memory: "512m"
    labels:
      version: 3.0.0
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
      - name: "data"
        mountPath: "/tmp/data"
      - name: "dev"
        mountPath: "/tmp/dev"
      - name: "ivy"
        mountPath: "/tmp/ivy"
vvavepacket commented 3 years ago

@ketanhdoshi were you able to solve this issue?

ketanhdoshi commented 3 years ago

@vvavepacket

I was able to get this to work using a workaround: I used the spec.sparkConf settings for packages, jars, and repositories rather than spec.deps:

  sparkConf:
    spark.jars.packages: "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0,..."
    spark.jars.repositories: "https://packages.confluent.io/maven"
    spark.jars: "local:///tmp/ivy/jars/com.101tec_zkclient-0.10.jar,\
      .....
      local:///tmp/ivy/jars/com.fasterxml.jackson.core_jackson-annotations-2.9.10.jar"
    spark.jars.ivy: "/tmp/ivy"

The documented solutions did not work. I believe the problem is in Spark, not in the operator. I tried running the same programs directly with spark-submit on Kubernetes, without the operator, and encountered the same problem when using "--packages", "--jars", and "--repositories", which correspond to these spec fields:

  deps:
    repositories:
    packages:
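
For reference, a rough sketch of what such a direct spark-submit invocation looks like, built from the same application details as the manifest above (the API server address is a placeholder):

spark-submit \
  --master k8s://https://<api-server>:6443 \
  --deploy-mode cluster \
  --class com.ketan.KafkaAvro \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0,org.apache.spark:spark-avro_2.12:3.0.0 \
  --repositories https://packages.confluent.io/maven \
  --conf spark.kubernetes.container.image=gcr.io/spark-operator/spark:v3.0.0 \
  local:///tmp/dev/kafkaavro/target/scala-2.12/kafka-avro_2.12-1.0.jar \
  kafka.kd-confluent.svc.cluster.local:9071 /tmp/data
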
dwsmith1983 commented 3 years ago

@ketanhdoshi do you know if there is a ticket on spark for this? If so, can you let me know which it is? I want to keep track of the updates.

ketanhdoshi commented 3 years ago

@ketanhdoshi do you know if there is a ticket on spark for this? If so, can you let me know which it is? I want to keep track of the updates.

@dwsmith1983, I hadn't opened one, but I think this one is for the same issue.

axiangcoding commented 3 years ago

I'm facing the same issue. Any updates?

GaruGaru commented 3 years ago

As mentioned by @ocworld: This is related to a Spark issue ( https://issues.apache.org/jira/browse/SPARK-35084 ) which should be solved by https://github.com/apache/spark/pull/32397

rahulbhatia2702 commented 2 years ago

Is there a solution for this? It seems the above PR was never merged. @ocworld

ocworld commented 1 year ago

@ketanhdoshi @GaruGaru @rahulbhatia2702 In Spark, the PR (https://github.com/apache/spark/pull/38828) has now been merged.

javolek commented 1 year ago

I don't know if this is a new issue, but for me it is still not working: dependencies are not added to the driver's SparkContext and so they are not available on the executors afterwards. I'm using the official apache/spark-py images to deploy the driver and executors. I tried deploying a Spark job manually with almost the same settings as the spark operator uses and it was also not working - the same issue. Then I found that if I remove the Spark property spark.kubernetes.submitInDriver=true it works fine. Does anybody know why this property is set?

javolek commented 1 year ago

OK, it was my fault. With PySpark 3.4.0 it is actually working. It had worked for me on an older Spark version in client mode, with the option spark.kubernetes.submitInDriver set to false (the default). With the older version of Spark in cluster mode - and the spark-operator only works in cluster mode - it didn't work anyway. The only remaining question is why Spark tries to resolve dependencies on the submit side in cluster mode, as this unnecessarily slows the submit process down.
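
For illustration only, a sketch of the kind of manual client-mode submission described above (API server address, image, and application file are placeholders); the operator-style property that was removed is shown in the leading comment:

# Property present in the operator-generated submission, removed here:
#   --conf spark.kubernetes.submitInDriver=true
spark-submit \
  --master k8s://https://<api-server>:6443 \
  --deploy-mode client \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0 \
  --conf spark.kubernetes.container.image=<spark-py image> \
  local:///path/to/app.py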

ocworld commented 11 months ago

@javolek I understand your concern about the slowdown caused by resolving dependencies on the spark-operator pod. I'm not an author of that code, so the reasons below are just my guesses.

1) To surface dependency failures early. Creating a pod is an expensive operation, I think: it needs to reserve resources and sometimes download a Docker image. Even in a public cloud like AWS, GCP, or Azure, this incurs costs. Resolving dependencies early helps prevent that.

2) To manage dependencies in a properties file delivered as a ConfigMap, for simplicity.

In my experience, if you want to reduce the waiting time, you can use a persistent volume. Sharing the downloaded-jars path between the spark-operator, driver, and executor pods prevents downloading the same jars again and again.
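
A minimal sketch of that idea in a SparkApplication spec, assuming a pre-created PVC named spark-ivy-cache (the claim name is illustrative; mounting the same volume into the operator's own deployment would be a separate change):

spec:
  sparkConf:
    spark.jars.ivy: "/tmp/ivy"
  volumes:
    - name: "ivy-cache"
      persistentVolumeClaim:
        claimName: spark-ivy-cache    # hypothetical, pre-created claim
  driver:
    volumeMounts:
      - name: "ivy-cache"
        mountPath: "/tmp/ivy"
  executor:
    volumeMounts:
      - name: "ivy-cache"
        mountPath: "/tmp/ivy"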

junzhang999 commented 1 month ago

I had the same problem when starting the Spark driver pod with the spark-operator. My workaround is to fetch all the jars in the Dockerfile with wget and put them into the /opt/spark/jars directory. I had hoped spark.jars.packages would have worked as expected.
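
A minimal sketch of that workaround, as commands for a Dockerfile RUN step (artifact versions are illustrative, and transitive dependencies such as kafka-clients would need to be added the same way, since wget does not resolve them):

# Fetch the connector jars directly into the image's jar directory so that
# neither the driver nor the executors need to resolve them at runtime.
wget -P /opt/spark/jars \
  https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.0.0/spark-sql-kafka-0-10_2.12-3.0.0.jar
wget -P /opt/spark/jars \
  https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.0.0/spark-avro_2.12-3.0.0.jar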