matt-gorman opened 4 months ago
I am having the same issue. Using delta 3.2.0 (I have also tried 3.1.0), this does NOT work:
from pyspark.sql import SparkSession

url = "spark://${SPARK_MASTER_IP}:7077"
spark = (
    SparkSession.builder.master(url)
    .appName("myapp")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:3.2.0,io.delta:delta-contribs_2.12:3.2.0")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)
Here is the error log:
File /usr/local/spark/python/pyspark/sql/session.py:497, in SparkSession.Builder.getOrCreate(self)
495 sparkConf.set(key, value)
496 # This SparkContext may be an existing one.
--> 497 sc = SparkContext.getOrCreate(sparkConf)
498 # Do not update `SparkConf` for existing `SparkContext`, as it's shared
499 # by all sessions.
500 session = SparkSession(sc, options=self._options)
File /usr/local/spark/python/pyspark/context.py:515, in SparkContext.getOrCreate(cls, conf)
513 with SparkContext._lock:
514 if SparkContext._active_spark_context is None:
--> 515 SparkContext(conf=conf or SparkConf())
516 assert SparkContext._active_spark_context is not None
517 return SparkContext._active_spark_context
File /usr/local/spark/python/pyspark/context.py:201, in SparkContext.__init__(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer, conf, gateway, jsc, profiler_cls, udf_profiler_cls, memory_profiler_cls)
195 if gateway is not None and gateway.gateway_parameters.auth_token is None:
196 raise ValueError(
197 "You are trying to pass an insecure Py4j gateway to Spark. This"
198 " is not allowed as it is a security risk."
199 )
--> 201 SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
202 try:
203 self._do_init(
204 master,
205 appName,
(...)
215 memory_profiler_cls,
216 )
File /usr/local/spark/python/pyspark/context.py:436, in SparkContext._ensure_initialized(cls, instance, gateway, conf)
434 with SparkContext._lock:
435 if not SparkContext._gateway:
--> 436 SparkContext._gateway = gateway or launch_gateway(conf)
437 SparkContext._jvm = SparkContext._gateway.jvm
439 if instance:
File /usr/local/spark/python/pyspark/java_gateway.py:107, in launch_gateway(conf, popen_kwargs)
104 time.sleep(0.1)
106 if not os.path.isfile(conn_info_file):
--> 107 raise PySparkRuntimeError(
108 error_class="JAVA_GATEWAY_EXITED",
109 message_parameters={},
110 )
112 with open(conn_info_file, "rb") as info:
113 gateway_port = read_int(info)
PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.
But this works (using delta 2.4.0):
from pyspark.sql import SparkSession

url = "spark://${SPARK_MASTER_IP}:7077"
spark = (
    SparkSession.builder.master(url)
    .appName("myapp")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0,io.delta:delta-contribs_2.12:2.4.0")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)
I am not sure if I can use delta 2.4.0 with Spark 3.5.1 though. Can I?
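For reference, a minimal builder sketch following the Delta quickstart, which also sets spark.sql.extensions (the snippets above configure only the catalog) and uses configure_spark_with_delta_pip so the Delta JAR pulled in matches the installed delta-spark pip package; the master URL and app name are illustrative:

from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

url = "spark://${SPARK_MASTER_IP}:7077"
builder = (
    SparkSession.builder.master(url)
    .appName("myapp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
# configure_spark_with_delta_pip appends the matching io.delta package to spark.jars.packages.
spark = configure_spark_with_delta_pip(builder).getOrCreate()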
Thanks for reporting this. Spark Connect support will be added in Delta 4; see this issue.
Feel free to chime in on #3240 if you have any suggestions!
Hey @matt-gorman, you said that you followed the steps in https://docs.delta.io/latest/delta-spark-connect.html and ran into an error. Could you please confirm what error you are seeing?
@rhazegh Using Delta over Spark Connect will only be fully available in Delta 4.0; however, you can already try the Delta 4.0 Preview. Could you give that a try?
@longvu-db Using Delta Connect on the Spark 4.0 preview with the Delta 4.0 Preview gave the same experience and errors as running Delta 3.2.0 on Spark 3.5.1.
Attempting to write a DataFrame:
pyspark.errors.exceptions.connect.SparkConnectGrpcException: (org.apache.spark.SparkException) Job aborted due to stage failure: Task 2 in stage 1.0 failed 4 times, most recent failure: Lost task 2.3 in stage 1.0 (TID 13) (X.X.X.X executor 1): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.sql.execution.datasources.WriteJobDescription.statsTrackers of type scala.collection.Seq in instance of org.apache.spark.sql.execution.datasources.WriteJobDescription
Attempting to read a DataFrame from HDFS:
pyspark.errors.exceptions.connect.SparkConnectGrpcException: (org.apache.spark.SparkException) Job aborted due to stage failure: Task 0 in stage 4.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4.0 (TID 26) (X.X.X.X executor 1): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.catalyst.expressions.ScalaUDF.f of type scala.Function1 in instance of org.apache.spark.sql.catalyst.expressions.ScalaUDF
The tracebacks similarly pointed back to core.py, which suggested that a package or something similar was missing when running delta-core with the new versions instead of delta-spark with the previous versions.
@matt-gorman Why are you using delta-core? AFAIK, this only works with delta-spark.
Are you able to start two processes, a server and a client, on the same local machine and have them work with each other?
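For example, a local smoke test of that might look like the sketch below (versions and the /tmp path are illustrative; 15002 is the default Spark Connect port):

# Terminal 1: start a local Spark Connect server with the Delta packages.
$SPARK_HOME/sbin/start-connect-server.sh \
  --packages org.apache.spark:spark-connect_2.12:3.5.1,io.delta:delta-spark_2.12:3.2.0 \
  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

# Terminal 2: connect a PySpark client and try a Delta round trip.
python -c "
from pyspark.sql import SparkSession
spark = SparkSession.builder.remote('sc://localhost:15002').getOrCreate()
spark.range(5).write.format('delta').mode('overwrite').save('/tmp/delta-smoke-test')
spark.read.format('delta').load('/tmp/delta-smoke-test').show()
"

Per this thread, the Delta write may still fail with 3.5.1/3.2.0; the point is only to check whether a purely local server/client pair shows the same behavior as the cluster setup.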
@longvu-db Sorry, I mixed these up: delta-core is what we're using now with Spark 3.4.3; running Connect with delta-spark is where we're running into issues. We are using these packages:
org.apache.spark:spark-connect_2.12:3.4.3 io.delta:delta-core_2.12:2.4.0
Connect starts and works fine with this command:
start-connect-server.sh --master spark://master-node:7077 --packages org.apache.spark:spark-connect_2.12:3.4.3,io.delta:delta-core_2.12:2.4.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
When trying to run Spark 3.5.1, delta-spark was the package used, and Connect was started with these package versions:
org.apache.spark:spark-connect_2.12:3.5.1 io.delta:delta-spark_2.12:3.2.0 io.delta:delta-storage:3.2.0
Running with this command gives the errors above:
start-connect-server.sh --master spark://master-node:7077 --packages org.apache.spark:spark-connect_2.12:3.5.1,io.delta:delta-spark_2.12:3.2.0,io.delta:delta-storage:3.2.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
I can run PySpark locally on the master with those same packages without problems:
pyspark --packages io.delta:delta-spark_2.12:3.2.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
However, if I run a PySpark client using Connect started with those packages (the second start-connect-server.sh command above, with delta-spark_2.12:3.2.0) from the master, I get errors.
@matt-gorman I understand the problem now: some fixes needed to go into Spark for Delta over Spark Connect, and those fixes landed after 3.5.1. Could you please use the Spark Connect 4.0.0 preview1 package, as in the guide here?
So for Delta over Spark Connect to work, you need to use the Spark Connect 4.0 preview version as well.
Bug
Which Delta project/connector is this regarding?
Describe the problem
Running Spark and reading/writing Delta from a client connected to a Spark Connect server (start-connect-server.sh) with the Delta packages fails: classes cannot be converted, or a missing storage class is reported. I was unable to find specific documentation about running Delta 3.2.0 on Spark 3.5.1 or about whether any additional packages or configuration are needed. This appears to work using a PySpark shell; however, the same packages with Spark Connect give different results.
Steps to reproduce
sbin/start-connect-server.sh
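Presumably with the packages and confs quoted in the comments above, e.g.:

sbin/start-connect-server.sh --master spark://master-node:7077 \
  --packages org.apache.spark:spark-connect_2.12:3.5.1,io.delta:delta-spark_2.12:3.2.0,io.delta:delta-storage:3.2.0 \
  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"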
Observed results
PySpark (Works)
Using the Delta documentation, this DOES work using a PySpark shell:
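Presumably the same pyspark invocation quoted earlier in the thread:

pyspark --packages io.delta:delta-spark_2.12:3.2.0 \
  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"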
This both successfully reads and writes to a Hadoop cluster in Delta format.
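As a sketch of the kind of round trip that works in the shell (the HDFS path is a placeholder):

# Write a small DataFrame as a Delta table on HDFS, then read it back.
df = spark.range(0, 5)
df.write.format("delta").mode("overwrite").save("hdfs:///tmp/delta-table")
spark.read.format("delta").load("hdfs:///tmp/delta-table").show()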
Spark Connect (Doesn't Work)
Running Spark Connect with the same options does not have the same effect:
Connect to Spark Connect according to the documentation:
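For the client side, the connection would typically look something like this (host name reused from the commands above; 15002 is the default Spark Connect port):

from pyspark.sql import SparkSession

# Connect to the Spark Connect server started with start-connect-server.sh.
spark = SparkSession.builder.remote("sc://master-node:15002").getOrCreate()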
Comparing environment settings, the only differences were the additional JARs on Spark Connect (which was expected):
There were also a few other Spark configurations set differently between the PySpark app and Spark Connect:
I attempted to re-run Spark Connect with these settings; however, the result was the same:
Expected results
I expected Spark Connect to behave the same as a PySpark session on the master node.
Further details
Behavior is the same when connecting from a JupyterHub server:
Running the same commands through Spark Connect produces the same result. Ideally this would work from JupyterHub using a Spark Connect remote client. Previously, the following configuration with Spark Connect worked without any of the issues above, with Spark 3.4.0 and Delta (delta-core) 2.4.0.
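Presumably along the lines of the 3.4.3 server command quoted in the comments, with the older coordinates:

start-connect-server.sh --master spark://master-node:7077 \
  --packages org.apache.spark:spark-connect_2.12:3.4.0,io.delta:delta-core_2.12:2.4.0 \
  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"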
Spark/Delta 4.0.0 preview
I additionally set this up using the documentation here. Spark 4.0.0 appears to also use delta-spark, and the result was the same error messages as above.
Spark 3.4.3/Delta 2.4.0
These versions are close to what was originally being run without such issues (3.4.0/2.4.0), and setting this up proved successful. This combination uses delta-core instead of delta-spark.
Environment information
Connect Python Deps:
Willingness to contribute
The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?