delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive and APIs
https://delta.io
Apache License 2.0

[BUG] Delta 3.2.1 read fails with Pyspark 3.5.1 #3737

Open umartin opened 1 month ago

umartin commented 1 month ago

Bug

Which Delta project/connector is this regarding?

Describe the problem

Using the latest pyspark from conda-forge (pyspark 3.5.1) with the latest Delta (3.2.1) fails when reading the delta format:

Exception in thread "main" java.lang.NoSuchMethodError: 'org.apache.spark.sql.catalyst.expressions.ExpressionSet org.apache.spark.sql.catalyst.expressions.ExpressionSet.$plus$plus(scala.collection.GenTraversableOnce)'
    at org.apache.spark.sql.delta.stats.DeltaScan.filtersUsedForSkipping$lzycompute(DeltaScan.scala:92)
    at org.apache.spark.sql.delta.stats.DeltaScan.filtersUsedForSkipping(DeltaScan.scala:92)
    at org.apache.spark.sql.delta.stats.DeltaScan.allFilters$lzycompute(DeltaScan.scala:93)
    at org.apache.spark.sql.delta.stats.DeltaScan.allFilters(DeltaScan.scala:93)
    ...

Delta 3.2.0 works with Spark 3.5.1. Using the Scala 2.13 artifacts also works.

Steps to reproduce

Set up a Python environment with pyspark 3.5.1 and delta 3.2.1, or set up a JVM environment with the same versions using the Scala 2.12 builds. Run: spark.read.format("delta").load("path to delta files").show()
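For the PySpark case, a minimal repro sketch, assuming the delta-spark 3.2.1 package installed from PyPI/conda-forge and a hypothetical table path:

from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

# Standard Delta-on-PySpark session setup; configure_spark_with_delta_pip adds the
# io.delta:delta-spark_2.12 artifact matching the installed delta-spark version (3.2.1 here).
builder = (
    SparkSession.builder.appName("delta-3.2.1-repro")
    .master("local[*]")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Reading any existing Delta table triggers the NoSuchMethodError described below.
spark.read.format("delta").load("/tmp/some-delta-table").show()  # hypothetical path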

Observed results

An exception is thrown:

Exception in thread "main" java.lang.NoSuchMethodError: 'org.apache.spark.sql.catalyst.expressions.ExpressionSet org.apache.spark.sql.catalyst.expressions.ExpressionSet.$plus$plus(scala.collection.GenTraversableOnce)'
    at org.apache.spark.sql.delta.stats.DeltaScan.filtersUsedForSkipping$lzycompute(DeltaScan.scala:92)
    at org.apache.spark.sql.delta.stats.DeltaScan.filtersUsedForSkipping(DeltaScan.scala:92)
    at org.apache.spark.sql.delta.stats.DeltaScan.allFilters$lzycompute(DeltaScan.scala:93)
    at org.apache.spark.sql.delta.stats.DeltaScan.allFilters(DeltaScan.scala:93)

Expected results

The contents of the data frame are shown.

Further details

It looks like the combination of the Scala 2.12 builds of Spark 3.5.1 and Delta 3.2.1 is the only combination causing the exception. The Scala 2.13 builds of the same versions work fine. Spark 3.5.1 with Delta 3.2.0 works fine. Spark 3.5.3 with Delta 3.2.1 works fine.

Environment information

Willingness to contribute

umartin commented 1 month ago

Full stack trace:

Exception in thread "main" java.lang.NoSuchMethodError: 'org.apache.spark.sql.catalyst.expressions.ExpressionSet org.apache.spark.sql.catalyst.expressions.ExpressionSet.$plus$plus(scala.collection.GenTraversableOnce)'
    at org.apache.spark.sql.delta.stats.DeltaScan.filtersUsedForSkipping$lzycompute(DeltaScan.scala:92)
    at org.apache.spark.sql.delta.stats.DeltaScan.filtersUsedForSkipping(DeltaScan.scala:92)
    at org.apache.spark.sql.delta.stats.DeltaScan.allFilters$lzycompute(DeltaScan.scala:93)
    at org.apache.spark.sql.delta.stats.DeltaScan.allFilters(DeltaScan.scala:93)
    at org.apache.spark.sql.delta.stats.PreparedDeltaFileIndex.matchingFiles(PrepareDeltaScan.scala:355)
    at org.apache.spark.sql.delta.files.TahoeFileIndex.listAddFiles(TahoeFileIndex.scala:111)
    at org.apache.spark.sql.delta.files.TahoeFileIndex.listFiles(TahoeFileIndex.scala:103)
    at org.apache.spark.sql.execution.FileSourceScanLike.selectedPartitions(DataSourceScanExec.scala:256)
    at org.apache.spark.sql.execution.FileSourceScanLike.selectedPartitions$(DataSourceScanExec.scala:251)
    at org.apache.spark.sql.execution.FileSourceScanExec.selectedPartitions$lzycompute(DataSourceScanExec.scala:506)
    at org.apache.spark.sql.execution.FileSourceScanExec.selectedPartitions(DataSourceScanExec.scala:506)
    at org.apache.spark.sql.execution.FileSourceScanLike.dynamicallySelectedPartitions(DataSourceScanExec.scala:286)
    at org.apache.spark.sql.execution.FileSourceScanLike.dynamicallySelectedPartitions$(DataSourceScanExec.scala:267)
    at org.apache.spark.sql.execution.FileSourceScanExec.dynamicallySelectedPartitions$lzycompute(DataSourceScanExec.scala:506)
    at org.apache.spark.sql.execution.FileSourceScanExec.dynamicallySelectedPartitions(DataSourceScanExec.scala:506)
    at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:553)
    at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:537)
    at org.apache.spark.sql.execution.FileSourceScanExec.doExecuteColumnar(DataSourceScanExec.scala:587)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:222)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
    at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:218)
    at org.apache.spark.sql.execution.InputAdapter.doExecuteColumnar(WholeStageCodegenExec.scala:521)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:222)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
    at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:218)
    at org.apache.spark.sql.execution.ColumnarToRowExec.inputRDDs(Columnar.scala:204)
    at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:51)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:751)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:195)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:191)
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:364)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:498)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4332)
    at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3314)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4322)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4320)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4320)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:3314)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:3537)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:838)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:797)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:806)
    at com.github.umartin.spark_labb.Delta.main(Delta.java:15)
TharinduDG commented 1 month ago

I had the same issue when using the Scala API. Then I switched to the Scala 2.13 Delta dependency and it worked. 🤷‍♂️

        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-spark_2.13</artifactId>
            <version>3.2.1</version>
            <scope>test</scope>
        </dependency>
saifat29 commented 1 week ago

Faced the exact same issue. Since the minor versions were not important for me, I downgraded and it worked. This is what worked for me:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "3.5.0",
  "org.apache.spark" %% "spark-sql" % "3.5.0",
  "io.delta" %% "delta-spark" % "3.1.0",
  "io.delta" % "delta-storage" % "3.1.0",
  "org.apache.hadoop" % "hadoop-client" % "3.3.3",
  "org.apache.hadoop" % "hadoop-common" % "3.3.3",
  "org.apache.hadoop" % "hadoop-azure" % "3.3.3",
)
patrickpio commented 3 days ago

I've encountered the same issue. The combination Spark 3.5.1, Scala 2.12, Delta 3.2.1 does not work.

The workaround is to bump Scala to 2.13 or to downgrade Delta to 3.2.0. Both work well for me.

felipepessoto commented 2 days ago

I think Delta 3.2.1 requires Spark 3.5.3
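If that is the case, the pip-level fix for the original setup would be pinning that pair explicitly. A minimal sketch, assuming the PyPI packages pyspark 3.5.3 and delta-spark 3.2.1 and a hypothetical table path:

# Assumed environment: pip install pyspark==3.5.3 delta-spark==3.2.1
import pyspark
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

# Guard against a stray pyspark 3.5.1 install shadowing the pinned version.
assert pyspark.__version__ == "3.5.3", pyspark.__version__

spark = configure_spark_with_delta_pip(
    SparkSession.builder.appName("delta-3.2.1-on-spark-3.5.3")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
).getOrCreate()

spark.read.format("delta").load("/tmp/some-delta-table").show()  # hypothetical path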