Apache DataFusion Comet Spark Accelerator
https://datafusion.apache.org/comet
Apache License 2.0

Default value is required for sc.getConf.getSizeAsMb(EXECUTOR_MEMORY.key) #1045

Closed. neyama closed this issue 3 weeks ago.

neyama commented 3 weeks ago

Describe the bug

A java.util.NoSuchElementException for spark.executor.memory is thrown from org.apache.spark.CometDriverPlugin.init (Plugins.scala:56).

java.util.NoSuchElementException: spark.executor.memory
        at org.apache.spark.SparkConf.$anonfun$get$1(SparkConf.scala:245)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.SparkConf.get(SparkConf.scala:245)
        at org.apache.spark.SparkConf.$anonfun$getSizeAsMb$1(SparkConf.scala:355)
        at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
        at org.apache.spark.SparkConf.catchIllegalValue(SparkConf.scala:482)
        at org.apache.spark.SparkConf.getSizeAsMb(SparkConf.scala:355)
        at org.apache.spark.CometDriverPlugin.init(Plugins.scala:56)
        at org.apache.spark.internal.plugin.DriverPluginContainer.$anonfun$driverPlugins$1(PluginContainer.scala:53)
        at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
        at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
        at org.apache.spark.internal.plugin.DriverPluginContainer.<init>(PluginContainer.scala:46)
        at org.apache.spark.internal.plugin.PluginContainer$.apply(PluginContainer.scala:210)
        at org.apache.spark.internal.plugin.PluginContainer$.apply(PluginContainer.scala:193)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:574)
        at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
        at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at py4j.Gateway.invoke(Gateway.java:238)
        at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
        at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.base/java.lang.Thread.run(Thread.java:829)

Steps to reproduce

Prerequisites

How to reproduce the bug

Build datafusion-comet from scratch.

$ git clone https://github.com/apache/datafusion-comet.git
$ cd datafusion-comet
$ make release PROFILES="-Pspark-3.5"

Run datafusion-benchmarks with TPC-H (scale factor = 1), without specifying --conf spark.executor.memory=<size>, to reproduce the bug.

$ git clone https://github.com/apache/datafusion-benchmarks.git
$ cd datafusion-benchmarks/runners/datafusion-comet
$ export COMET_JAR=<path-to>/datafusion-comet/spark/target/comet-spark-spark3.5_2.12-0.4.0-SNAPSHOT.jar
$ $SPARK_HOME/bin/spark-submit \
    --master "local[*]" \
    --jars $COMET_JAR \
    --conf spark.driver.extraClassPath=$COMET_JAR \
    --conf spark.executor.extraClassPath=$COMET_JAR \
    --conf spark.plugins=org.apache.spark.CometPlugin \
    --conf spark.comet.enabled=true \
    --conf spark.comet.exec.enabled=true \
    --conf spark.comet.cast.allowIncompatible=true \
    --conf spark.comet.exec.shuffle.enabled=true \
    --conf spark.comet.exec.shuffle.mode=auto \
    --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
    tpcbench.py \
    --benchmark tpch \
    --data <path-to-tpch-sf1-data> \
    --queries ../../tpch/queries/ \
    --output .

When I specify --conf spark.executor.memory=<size> explicitly, the error does not occur.
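For reference, the workaround is just the extra flag on the same spark-submit invocation as above (4g is an arbitrary illustrative size, not a recommended value):

```shell
# Workaround until the plugin handles the unset key: set the value explicitly.
$SPARK_HOME/bin/spark-submit \
    --master "local[*]" \
    --conf spark.executor.memory=4g \
    --jars $COMET_JAR \
    ...
```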

I suspect that spark.executor.memory has no value set in the SparkConf at the point where org.apache.spark.CometDriverPlugin.init (Plugins.scala:56) is called, and the single-argument org.apache.spark.SparkConf.getSizeAsMb (SparkConf.scala:355) throws NoSuchElementException for an unset key. So we should either supply a default value when calling getSizeAsMb, or first check whether spark.executor.memory is present in the conf and call getSizeAsMb only when it is set.
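A minimal sketch of the first option, assuming the plugin can fall back to Spark's documented 1g default for spark.executor.memory (this is a suggestion, not the committed fix):

```scala
// Hypothetical sketch. SparkConf.getSizeAsMb has a two-argument overload
// that returns the parsed value when the key is set and the supplied
// default otherwise, so no explicit contains() check is needed.
// "1g" is Spark's documented default for spark.executor.memory.
val executorMemoryMb: Long =
  sc.getConf.getSizeAsMb("spark.executor.memory", "1g")
```

The two-argument overload also covers the second option implicitly, since it only falls back when the key is absent.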

Expected behavior

TPCH completes successfully.

Additional context

No response