kubeflow / spark-operator

Kubernetes operator for managing the lifecycle of Apache Spark applications on Kubernetes.
Apache License 2.0

How to define dependency packages #352

Open richardxy opened 5 years ago

richardxy commented 5 years ago

Greetings!

Other than using --jars to specify dependency jars, is it possible to provide package names, just like the --packages argument provides a list of dependencies in a spark-submit command?

The problem with --jars is that some jars may not include all of their own transitive dependencies.

Thanks in advance!

Richard

liyinan926 commented 5 years ago

We currently don't have a dedicated field in SparkApplication equivalent to --packages. But you can use spec.arguments for that, e.g.:

spec:
  arguments:
  - --packages <packages>
richardxy commented 5 years ago

That's great to know. Thanks!


richardxy commented 5 years ago

An update: I added the --packages argument to the SparkApplication YAML file, like this:

spec:
  arguments:
    - --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0  

After applying the yaml file, I got the logs of the sparkapplication. One part shows:

+ exec /sbin/tini -s -- /opt/spark/bin/spark-submit --conf spark.driver.bindAddress=10.72.1.48 --deploy-mode client --properties-file /opt/spark/conf/spark.properties --class OneMinuteRawAgg spark-internal '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0'

However, it eventually crashes with this error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Failed to find data source: kafka.

It seems to me that the package hasn't been loaded, and I didn't see it in the log.

Did I do anything wrong?

Thanks!

liyinan926 commented 5 years ago

Hmm, I'm not familiar with how --packages works in general and in other scheduler backends, e.g., YARN or Mesos. Is this something that Spark core supports?

richardxy commented 5 years ago

Yes, --packages used in spark-submit is supported by Spark core. Please see https://spark.apache.org/docs/latest/submitting-applications.html

Is there any intricate formatting detail that I may have missed? I noticed there is a "-" at the beginning of the line under "arguments". Or does the package name need to be in double quotes?


liyinan926 commented 5 years ago

I suspect this has something to do with the fact that arguments are put at the end of the spark-submit command. The best option would be to add a dedicated field named packages under .spec.dep. I can add that after the holiday break.

liyinan926 commented 5 years ago

Actually, it's really straightforward, so I just created PR #353 for that.

liyinan926 commented 5 years ago

I just pushed the latest version of the image gcr.io/spark-operator/spark-operator:v2.4.0-v1alpha1-latest with the --packages support. Please give it a try. The way it works is as follows:

spec:
    deps:
        packages:
        - <your package 1>
        - <your package 2>  

The leading - indicates this is an item in a list and is required.

richardxy commented 5 years ago

Wonderful! Thank you so much! Will get back to you after the holiday break. Merry Christmas!

Richard


richardxy commented 5 years ago

I've created a new spark-operator deployment using the new image, and modified the SparkApplication YAML file by adding the changes below:

  deps:
    packages:
      - org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0  

After applying the yaml file, the log printed:

+ exec /sbin/tini -s -- /opt/spark/bin/spark-submit --conf spark.driver.bindAddress=10.72.1.55 --deploy-mode client --properties-file /opt/spark/conf/spark.properties --class OneMinuteRawAgg spark-internal
2018-12-23 22:11:41 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2018-12-23 22:11:46 WARN  DependencyUtils:66 - Local jar /root/.ivy2/jars/org.apache.spark_spark-sql-kafka-0-10_2.11-2.4.0.jar does not exist, skipping.
2018-12-23 22:11:46 WARN  DependencyUtils:66 - Local jar /root/.ivy2/jars/org.apache.kafka_kafka-clients-2.0.0.jar does not exist, skipping.
...

Then, in the later part:

2018-12-23 22:11:58 ERROR SparkContext:91 - Failed to add file:///root/.ivy2/jars/org.apache.spark_spark-sql-kafka-0-10_2.11-2.4.0.jar to Spark environment
java.io.FileNotFoundException: Jar /root/.ivy2/jars/org.apache.spark_spark-sql-kafka-0-10_2.11-2.4.0.jar not found
        at org.apache.spark.SparkContext.addJarFile$1(SparkContext.scala:1838)
        at org.apache.spark.SparkContext.addJar(SparkContext.scala:1868)
        at org.apache.spark.SparkContext$$anonfun$12.apply(SparkContext.scala:458)
        at org.apache.spark.SparkContext$$anonfun$12.apply(SparkContext.scala:458)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:458)
        at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2520)
        at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:935)
        at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:926)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:926)
        at OneMinuteRawAgg$.main(sparkkafka_df.scala:15)
        at OneMinuteRawAgg.main(sparkkafka_df.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
2018-12-23 22:11:58 ERROR SparkContext:91 - Failed to add file:///root/.ivy2/jars/org.apache.kafka_kafka-clients-2.0.0.jar to Spark environment
java.io.FileNotFoundException: Jar /root/.ivy2/jars/org.apache.kafka_kafka-clients-2.0.0.jar not found
        at org.apache.spark.SparkContext.addJarFile$1(SparkContext.scala:1838)
        at org.apache.spark.SparkContext.addJar(SparkContext.scala:1868)
        at org.apache.spark.SparkContext$$anonfun$12.apply(SparkContext.scala:458)
        at org.apache.spark.SparkContext$$anonfun$12.apply(SparkContext.scala:458)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:458)
        at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2520)
        at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:935)
        at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:926)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:926)
        at OneMinuteRawAgg$.main(sparkkafka_df.scala:15)
...

So it seems to me that the "spec.deps.packages" clause was applied, but the operator was still looking for the jar files locally instead of grabbing them from Maven. Please advise if there is anything I should have done differently.

Thanks!

liyinan926 commented 5 years ago

The logs you showed above seem to be the driver logs, not the operator logs. Does --packages work with client mode? The k8s backend runs the driver in client mode (that's why you saw --deploy-mode client in the spark-submit command in the driver logs).

richardxy commented 5 years ago

The operator log doesn't tell much. It just says something like:

I1227 03:37:56.878272       1 controller.go:509] Trying to update SparkApplication spark/spark-kafka-pipeline, from: [{spark-8caa7dfb10114d8291bb4db6d95b47c9 2018-12-27 03:36:03 +0000 UTC 2018-12-27 03:36:42 +0000 UTC {spark-kafka-pipeline-ui-svc 31225    spark-kafka-pipeline-driver} {FAILED } map[] 1 1}] to [{spark-8caa7dfb10114d8291bb4db6d95b47c9 2018-12-27 03:36:03 +0000 UTC 2018-12-27 03:36:42 +0000 UTC {spark-kafka-pipeline-ui-svc 31225    spark-kafka-pipeline-driver} {FAILED } map[] 1 1}]

The driver was not attempting to fetch packages from the Maven repository; it was only looking for local jars. That caused the failure.

Yes, the --packages option does work with deployment mode "client".

liyinan926 commented 5 years ago

Have you tried using the --packages option with bare spark-submit in Kubernetes mode?

skonto commented 5 years ago

@liyinan926 @richardxy AFAIK this is not yet supported in cluster mode with Spark on K8s (the operator does a spark-submit in cluster mode by default, if I'm not mistaken). I previously did the fix for standalone mode and Mesos (https://issues.apache.org/jira/browse/SPARK-12559), and the last time I tested this with 2.4 it didn't work for K8s. I don't think the property is propagated to the driver at the second submit in client mode there. Packages are resolved locally on the spark-submit side, and the driver pod will fail because no such packages were resolved on the driver pod. In client mode it should work as expected for both the driver and the executors. The way it works in client mode is as follows: the driver resolves the dependency locally, and when an executor needs it, it fetches the dependency from the driver.

liyinan926 commented 5 years ago

Thanks @skonto for the explanation. So the fix would be to propagate --packages to the spark-submit command in the driver?

skonto commented 5 years ago

@liyinan926 sorry for the late reply. Yes, we should; I already did that in the PR for the client dependencies.

ghost commented 5 years ago

Can't this be implemented by specifying spark.jars.packages property? This should be passed on correctly.
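
For reference, in a SparkApplication manifest that suggestion would look roughly like this (a sketch only; the package coordinate is just the Kafka connector discussed earlier in this thread):

spec:
  sparkConf:
    spark.jars.packages: "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0"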

ssvinoth22 commented 4 years ago

I still can't use `deps: packages:`.

ssvinoth22 commented 4 years ago

Is there any other option to use packages if it is not yet implemented?

justinwesleyvizio commented 4 years ago

Is there any other option to use packages if it is not yet implemented?

Here is how I got past this issue. If you would like more details, let me know and I will add them.

  1. Create a new Maven project: https://maven.apache.org/guides/getting-started/maven-in-five-minutes.html
  2. Get the dependency section for your package from Maven website, for my Kafka structured streaming I used: https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10_2.12/3.0.0
  3. Copy the section from the Maven tab of the website and replace the JUnit dependency in the pom.xml file in your Maven project.
  4. Run the following from the root of your project to download all jar files: mvn dependency:copy-dependencies
  5. The previous command created the following directory, target/dependency. All of the required jar files are in that directory.
  6. Create a custom Docker image from an existing Spark image (I used FROM gcr.io/spark-operator/spark-py:v3.0.0-hadoop3 for my purposes), and COPY all of the jar files from the previous step into the /opt/spark/jars/ directory
  7. Upload your custom Docker image to a repo and then reference it in your Spark Application yaml. That should be it!
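
A minimal sketch of step 7, assuming the custom image was pushed as my-repo/spark-py-custom:3.0.0 (the image name is hypothetical):

spec:
  image: "my-repo/spark-py-custom:3.0.0"   # custom image with the jars copied into /opt/spark/jars/
  imagePullPolicy: Always
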
ets commented 3 years ago

Can't this be implemented by specifying spark.jars.packages property? This should be passed on correctly.

I'm successfully loading a packages requirement with v1beta2 using:

  sparkVersion: "3.0.1"
  sparkConf:
    spark.jars.packages: "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1"
kaaquist commented 3 years ago

I'm successfully loading a packages requirement with v1beta2 using:

 sparkVersion: "3.0.1"
 sparkConf:
   spark.jars.packages: "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1"

I also succeeded in loading my package this way. I'm loading org.apache.spark:spark-avro_2.12:3.0.0 with sparkVersion: "3.0.0".

I also tried the other approaches mentioned above, but with no luck.

sushiljacksparrow commented 3 years ago

I am getting Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found

Here is what my sparkapp.yaml file looks like:


apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-app
  namespace: default
spec:
  deps:
    packages:
      - org.apache.hadoop:hadoop-common:3.0.1
      - org.apache.hadoop:hadoop-aws:3.0.1
      - com.amazonaws:aws-java-sdk-bundle:1.11.271
  type: Scala
  mode: cluster
  image: "sushiljacksparrow/spark:3.0.1"
  mainClass: com.app.AppRunner
  arguments: ["10000"]
  mainApplicationFile: s3a://s3-bucker-random/Applications.jar
  sparkVersion: 3.0.1
  driver:
    cores: 1
    coreLimit: 1024m
    memory: 1024m
    labels:
      version: 3.0.1
    serviceAccount: spark
  executor:
    cores: 4
    instances: 200
    memory: 1024m
    coreRequest: 100m
    labels:
      version: 3.0.1

  sparkConf:
    "spark.kubernetes.container.image.pullPolicy": Always
    "spark.kubernetes.authenticate.driver.serviceAccountName": spark
    "spark.jars.packages": "org.apache.hadoop:hadoop-common:3.0.1,org.apache.hadoop:hadoop-aws:3.0.1,com.amazonaws:aws-java-sdk-bundle:1.11.271,org.apache.hadoop:hadoop-cloud-storage:3.0.1"
    "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem"
    "spark.kubernetes.container.image": "sushiljacksparrow/spark:3.0.1"
    "spark.hadoop.fs.s3a.path.style.access": "true"
    "spark.hadoop.fs.s3a.access.key": "<random>"
    "spark.hadoop.fs.s3a.secret.key": "<random_secret>"
    "spark.driver.extraJavaOptions": "-Divy.cache.dir=/tmp -Divy.home=/tmp"
ketanhdoshi commented 3 years ago

Neither of the approaches mentioned above worked for me. deps.packages seems to be ignored and using the workaround mentioned above:

spec:
  sparkConf:
    spark.jars.packages: "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0"

gave me this error:

+ CMD=("$SPARK_HOME/bin/spark-submit" --conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS" --deploy-mode client "$@")
+ exec /usr/bin/tini -s -- /opt/spark/bin/spark-submit --conf spark.driver.bindAddress=10.1.4.89 --deploy-mode client --properties-file /opt/spark/conf/spark.properties --class com.ketan.KafkaAvro local:///tmp/dev/kafkaavro/target/scala-2.12/kafka-avro_2.12-1.0.jar kafka.kd-confluent.svc.cluster.local:9071 /tmp/data
Ivy Default Cache set to: /opt/spark/.ivy2/cache
The jars for the packages stored in: /opt/spark/.ivy2/jars
:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-7c17c84c-a8ab-48c8-ade4-26e4b9bb930b;1.0
confs: [default]
Exception in thread "main" java.io.FileNotFoundException: /opt/spark/.ivy2/cache/resolved-org.apache.spark-spark-submit-parent-7c17c84c-a8ab-48c8-ade4-26e4b9bb930b-1.0.xml (No such file or directory)
at java.io.FileOutputStream.open0(Native Method)
at java.io.FileOutputStream.open(FileOutputStream.java:270)
at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
at org.apache.ivy.plugins.parser.xml.XmlModuleDescriptorWriter.write(XmlModuleDescriptorWriter.java:70)
at org.apache.ivy.plugins.parser.xml.XmlModuleDescriptorWriter.write(XmlModuleDescriptorWriter.java:62)
at org.apache.ivy.core.module.descriptor.DefaultModuleDescriptor.toIvyFile(DefaultModuleDescriptor.java:563)
at org.apache.ivy.core.cache.DefaultResolutionCacheManager.saveResolvedModuleDescriptor(DefaultResolutionCacheManager.java:176)
at org.apache.ivy.core.resolve.ResolveEngine.resolve(ResolveEngine.java:245)
at org.apache.ivy.Ivy.resolve(Ivy.java:523)
at org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:1387)
at org.apache.spark.deploy.DependencyUtils$.resolveMavenDependencies(DependencyUtils.scala:54)
at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:308)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:871)
ketanhdoshi commented 3 years ago

Can't this be implemented by specifying spark.jars.packages property? This should be passed on correctly.

I'm successfully loading a packages requirement with v1beta2 using:

  sparkVersion: "3.0.1"
  sparkConf:
    spark.jars.packages: "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1"

@ets, @kaaquist, would you mind sharing your yaml files that worked for you? I tried the workaround that you mentioned but it didn't work (error included in my post)

kaaquist commented 3 years ago

So here is the YAML I used for my project. I had a custom image with all the jars for S3 access located under Spark's /jars directory. @ketanhdoshi

#
# Copyright 2017 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: graphframes-demo
  namespace: spark-jobs
spec:
  dynamicAllocation:
    enabled: true
    initialExecutors: 2
    minExecutors: 2
    maxExecutors: 30
  driver:
    coreLimit: 1200m
    cores: 1
    labels:
      version: 3.0.0
    memory: 3000m
    nodeSelector:
      role: batchexecutor
    serviceAccount: spark
    tolerations:
      - key: batch
        operator: Equal
        value: c5d-4xl
    volumeMounts:
      - mountPath: /tmp
        name: test-volume
  executor:
    instances: 60
    cores: 6
    memory: 31g
    labels:
      version: 3.0.0
    nodeSelector:
      role: batchexecutor
    tolerations:
      - key: batch
        operator: Equal
        value: c5d-4xl
    volumeMounts:
      - name: "spark-local-dir-1"
        mountPath: "/tmp/spark-local-dir"
  hadoopConf:
    fs.s3.aws.credentials.provider: com.amazonaws.auth.DefaultAWSCredentialsProviderChain
    fs.s3.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
    fs.s3a.aws.credentials.provider: com.amazonaws.auth.DefaultAWSCredentialsProviderChain
    fs.s3a.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
  image: <name of image> 
  imagePullPolicy: Always
  mainApplicationFile: local:///opt/demo/spark-demo_2.12-0.0.1.jar
  mainClass: Main
  arguments: [
    "-e", "s3://<name of bucket>/export/event_all_e_2/period=20200701-20201217/",
    "-v", "s3://<name of bucket>/export/event_all_v_2/period=20200701-20201217/",
    "-d", "s3://<name of bucket>/output/components"
  ]
  mode: cluster
  restartPolicy:
    type: Never
  sparkConf:
    spark.eventLog.dir: s3://<name of bucket>/demo/spark-logs/
    spark.eventLog.enable: "true"
    spark.jars.packages: org.apache.spark:spark-avro_2.12:3.0.0,com.github.scopt:scopt_2.12:3.7.1
    spark.kubernetes.allocation.batch.size: "10"
  sparkVersion: 3.0.1
  type: Scala
  volumes:
    - hostPath:
        path: /local/
        type: Directory
      name: spark-local-dir-1
    - hostPath:
        path: /tmp
        type: Directory
      name: test-volume
ketanhdoshi commented 3 years ago

Thanks, @kaaquist. I'm not able to figure out why the behavior for me is so different from yours.

I was able to make a little progress but ran into another problem:

I made the change to include spark.jars.ivy, as explained here. A volume for the ivy directory had to be added and mounted in both the driver and executor specs.

spec:
  deps:
    packages:
      - org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0
      - org.apache.spark:spark-avro_2.12:3.0.0
  sparkConf:
    spark.jars.packages: "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0,org.apache.spark:spark-avro_2.12:3.0.0"
    spark.jars.ivy: "/tmp/ivy"
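
The volume wiring mentioned above isn't shown in the snippet; it looked roughly like this (a sketch: the volume name and the emptyDir backing are illustrative, not taken from the original comment):

spec:
  volumes:
    - name: ivy-cache              # hypothetical name for the shared ivy directory
      emptyDir: {}
  driver:
    volumeMounts:
      - name: ivy-cache
        mountPath: /tmp/ivy        # must match spark.jars.ivy above
  executor:
    volumeMounts:
      - name: ivy-cache
        mountPath: /tmp/ivy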

Now the driver is able to find the dependencies and go further. However, these options are not being passed to the executor, and it does not see any of the dependencies. It fails with java.lang.ClassNotFoundException: org.apache.spark.sql.kafka010.KafkaBatchInputPartition

kaaquist commented 3 years ago

@ketanhdoshi This did not work for me.

spec:
  deps:
    packages:
      - org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0
      - org.apache.spark:spark-avro_2.12:3.0.0

I also found that the:

spec:
  sparkConf:
    spark.jars.packages: ...

only worked for packages located in mvnrepository (Maven Central). I was using one from Spark Packages, which I had to include separately in the Docker image itself. Not sure if that will help you. Another thing: you can see in the logs for the job, when it starts, that the operator loads the jar files. If that is not happening, then something is wrong. Hope that helps.
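
If the package lives outside the default repository, one option that may help (a sketch, not verified in this thread) is to also point spark.jars.repositories at the extra repository, e.g. the Spark Packages repo:

spec:
  sparkConf:
    spark.jars.packages: "<group>:<artifact>:<version>"
    spark.jars.repositories: "https://repos.spark-packages.org"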

ketanhdoshi commented 3 years ago

@kaaquist. Yes, the spec.deps.packages is ignored for me too.

The following works fine for me on the driver.

spec:
  sparkConf:
    spark.jars.packages: "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0,org.apache.spark:spark-avro_2.12:3.0.0"

However, that setting is not getting propagated to the executor. The executor doesn't find any of those packages and fails as below.

+ CMD=(${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp "$SPARK_CLASSPATH:$SPARK_DIST_CLASSPATH" org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP)
+ exec /usr/bin/tini -s -- /usr/local/openjdk-8/bin/java -Dspark.driver.blockManager.port=7079 -Dspark.driver.port=7078 -Xms512m -Xmx512m -cp ':/opt/spark/jars/*:' org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@kd-kafkaavro-3a99e07715d477b2-driver-svc.spark-app.svc:7078 --executor-id 1 --cores 1 --app-id spark-0a6417c1b3284e97a27f617c0f2dd034 --hostname 10.1.4.171
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/01/18 14:10:13 INFO CoarseGrainedExecutorBackend: Started daemon with process name: 14@kafka-avro-9aba567715d4cb68-exec-1
21/01/18 14:10:13 INFO SignalUtils: Registered signal handler for TERM
21/01/18 14:10:13 INFO SignalUtils: Registered signal handler for HUP
21/01/18 14:10:13 INFO SignalUtils: Registered signal handler for INT
21/01/18 14:10:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/01/18 14:10:14 INFO SecurityManager: Changing view acls to: 185,root
21/01/18 14:10:14 INFO SecurityManager: Changing modify acls to: 185,root
21/01/18 14:10:14 INFO SecurityManager: Changing view acls groups to:
21/01/18 14:10:14 INFO SecurityManager: Changing modify acls groups to:
21/01/18 14:10:14 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(185, root); groups with view permissions: Set(); users  with modify permissions: Set(185, root); groups with modify permissions: Set()
21/01/18 14:10:15 INFO TransportClientFactory: Successfully created connection to kd-kafkaavro-3a99e07715d477b2-driver-svc.spark-app.svc/10.1.4.170:7078 after 129 ms (0 ms spent in bootstraps)
21/01/18 14:10:15 INFO SecurityManager: Changing view acls to: 185,root
21/01/18 14:10:15 INFO SecurityManager: Changing modify acls to: 185,root
21/01/18 14:10:15 INFO SecurityManager: Changing view acls groups to:
21/01/18 14:10:15 INFO SecurityManager: Changing modify acls groups to:
21/01/18 14:10:15 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(185, root); groups with view permissions: Set(); users  with modify permissions: Set(185, root); groups with modify permissions: Set()
21/01/18 14:10:15 INFO TransportClientFactory: Successfully created connection to kd-kafkaavro-3a99e07715d477b2-driver-svc.spark-app.svc/10.1.4.170:7078 after 5 ms (0 ms spent in bootstraps)
21/01/18 14:10:15 INFO DiskBlockManager: Created local directory at /var/data/spark-07f269fd-4b5a-428e-90b1-90e9a250c0d0/blockmgr-02756d41-fd31-4d4d-a650-ceb2fe83a533
21/01/18 14:10:15 INFO MemoryStore: MemoryStore started with capacity 117.0 MiB
21/01/18 14:10:15 INFO CoarseGrainedExecutorBackend: Connecting to driver: spark://CoarseGrainedScheduler@kd-kafkaavro-3a99e07715d477b2-driver-svc.spark-app.svc:7078
21/01/18 14:10:15 INFO ResourceUtils: ==============================================================
21/01/18 14:10:15 INFO ResourceUtils: Resources for spark.executor:
21/01/18 14:10:15 INFO ResourceUtils: ==============================================================
21/01/18 14:10:16 INFO CoarseGrainedExecutorBackend: Successfully registered with driver
21/01/18 14:10:16 INFO Executor: Starting executor ID 1 on host 10.1.4.171
21/01/18 14:10:16 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 44843.
21/01/18 14:10:16 INFO NettyBlockTransferService: Server created on 10.1.4.171:44843
21/01/18 14:10:16 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
21/01/18 14:10:16 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(1, 10.1.4.171, 44843, None)
21/01/18 14:10:16 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(1, 10.1.4.171, 44843, None)
21/01/18 14:10:16 INFO BlockManager: Initialized BlockManager: BlockManagerId(1, 10.1.4.171, 44843, None)
21/01/18 14:10:30 INFO CoarseGrainedExecutorBackend: Got assigned task 0
21/01/18 14:10:31 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
21/01/18 14:10:31 INFO Executor: Fetching file:/tmp/dev/kafkaavro/target/scala-2.12/kafka-avro_2.12-1.0.jar with timestamp 1610979002865
21/01/18 14:10:31 INFO Utils: Copying /tmp/dev/kafkaavro/target/scala-2.12/kafka-avro_2.12-1.0.jar to /var/data/spark-07f269fd-4b5a-428e-90b1-90e9a250c0d0/spark-119a54b8-42c9-42e5-9f26-c85c64f0605a/-21118480811610979002865_cache
21/01/18 14:10:31 INFO Utils: Copying /var/data/spark-07f269fd-4b5a-428e-90b1-90e9a250c0d0/spark-119a54b8-42c9-42e5-9f26-c85c64f0605a/-21118480811610979002865_cache to /opt/spark/work-dir/./kafka-avro_2.12-1.0.jar
21/01/18 14:10:31 INFO Executor: Adding file:/opt/spark/work-dir/./kafka-avro_2.12-1.0.jar to class loader
21/01/18 14:10:31 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.ClassNotFoundException: org.apache.spark.sql.kafka010.KafkaBatchInputPartition
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
vvavepacket commented 3 years ago

@ketanhdoshi

Were you able to solve this? I followed the tutorial and the samples mentioning spec.deps.packages, but it seems they don't work with Spark 3.x.

ketanhdoshi commented 3 years ago

@vvavepacket FYI. @kaaquist, I was able to load external repositories as below.

I was able to get this to work using a workaround. I used the spec.sparkConf settings for packages, jars, and repositories rather than spec.deps:

  sparkConf:
    spark.jars.packages: "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0,..."
    spark.jars.repositories: "https://packages.confluent.io/maven"
    spark.jars: "local:///tmp/ivy/jars/com.101tec_zkclient-0.10.jar,\
      .....
      local:///tmp/ivy/jars/com.fasterxml.jackson.core_jackson-annotations-2.9.10.jar"
    spark.jars.ivy: "/tmp/ivy"

The documented solutions did not work. I believe the problem is in Spark, not in the operator. I tried running the same programs directly using spark-submit on Kubernetes, without the operator, and encountered the same problem when using "--packages", "--jars", and "--repositories", i.e. the documented form:

  deps:
    repositories:
    packages:
Yashg19 commented 3 years ago

@ketanhdoshi I am trying to follow what you did but still getting the error "Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found"

My YAML file looks like:

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: e2edriver
  namespace: default
spec:
  type: Python
  pythonVersion: "3"
  mode: cluster
  image: "XXXX:5000/test/test-spark-py:latest"
  imagePullPolicy: Always
  mainApplicationFile: local:///opt/spark/dependencies/spark-driver/end2end.py
  arguments: ['--input_path', 's3a://bucket/testdata.parquet']
  sparkVersion: "3.0.1"
  sparkConf:
    spark.jars.packages: "com.amazonaws:aws-java-sdk-bundle:1.11.563,org.apache.hadoop:hadoop-aws:3.2.0"
    spark.jars.ivy: "/tmp/ivy"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 3
    onFailureRetryInterval: 10
    onSubmissionFailureRetries: 5
    onSubmissionFailureRetryInterval: 20
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "4096m"
    labels:
      version: 3.0.1
    serviceAccount: default
  executor:
    cores: 1
    instances: 2
    memory: "4096m"
    labels:
      version: 3.0.1
  hadoopConf:
    "fs.s3a.access.key": XXX
    "fs.s3a.secret.key": XXXX
    "fs.s3a.endpoint": XXXX

Can any one of you please help, @kaaquist @ketanhdoshi? I am using Spark 3.0.1 with Hadoop 3.2.0.

kaaquist commented 3 years ago

Hi @Yashg19, I had a custom image with all the jars for S3 access located under Spark's /jars directory. Did you add the jars to your image as well, or can you check that they are there? To access the image you can use:

> docker run --entrypoint "/bin/sh" -it <name of docker image>:<version>

Then make sure that the classpath has the jars loaded. Hope that helps you find your problem.

Yashg19 commented 3 years ago

@kaaquist thank you for your response. I ended up copying the jars into the jars directory and it worked. I was hoping that there would be an easy way to specify packages in the YAML file (like we do in spark-defaults.conf), but I guess there is no easy way to do that. Thank you again!

sajjadGG commented 2 years ago

I have tested almost all of the proposed solutions and none of them worked for me. In the end, I circumvented the problem by editing the Docker image to add the jar files to /opt/spark/jars, using the following line in my Dockerfile:

COPY jarFiles/ /opt/spark/jars/

I used this image and passed it to the following parameter in the spec:

spec:
  image: "<image_path>"

and there was no need to add any other config such as --jars or --packages.

avinovarov commented 2 years ago

I have the same issue: jars are being added to the driver through deps:, but the executors don't get any.

So currently, as of Spark 3.2.0, the only working option seems to be baking all required jars into the Spark image.

Which essentially means that deps: in the SparkApplication YAML manifest does not work for jars downloaded from a Maven repo.

zihao123yang commented 1 year ago

This seems to be a bug in Spark 3.3, where the dependencies aren't added to the SparkContext and thus aren't propagated to the executors: https://issues.apache.org/jira/browse/SPARK-35084.

According to https://github.com/stackabletech/spark-k8s-operator/issues/141, it should be fixed in Spark 3.4.

But if you are using Spark 3.3 (like me 😅), I found the best workaround is to just bake the dependencies into the application image, like everyone else above has done.

ToanP commented 1 year ago

@ketanhdoshi's workaround worked for me. I tried to add some dependent packages and ran into the ".ivy2 cache" issue.

Simply adding the setting below resolves "Exception in thread "main" java.io.FileNotFoundException: /opt/spark/.ivy2/cache/ ....":

sparkConf:
  spark.jars.ivy: "/tmp/ivy"

github-actions[bot] commented 3 weeks ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

danielvincenzi commented 2 weeks ago

Is there any way to import Python packages with some argument?
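
Not via --packages, as far as this thread shows. For plain Python files or zip/egg archives there is a separate deps.pyFiles field in the v1beta2 CRD; a hedged sketch (the path is hypothetical and assumes the archive is already in the image or at a reachable URI):

spec:
  deps:
    pyFiles:
      - local:///opt/spark/deps/mylib.zip   # hypothetical archive baked into the image

Pip-style packages with native dependencies generally still need to be baked into the driver/executor image, as described in the comments above.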