delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, and with APIs for Scala, Java, Rust, Ruby, and Python
https://delta.io
Apache License 2.0

delta version 0.7.0 - java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.Alias.<init> while executing Merge against delta table #594

Closed - ncrakesh closed this issue 3 years ago

ncrakesh commented 3 years ago

Trying to run a merge on a Delta table with version 0.7.0 on a Google Dataproc instance:

pyspark --packages io.delta:delta-core_2.12:0.7.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog

Merge SQL:

deltaTable.alias("accnt").merge(
... csv_df_ltst.alias("updates"),\ ... "accnt.acct_nbr = updates.acct_nbr") \ ... .whenMatchedDelete(condition = "updates.cdc_ind='D'") \ ... .whenMatchedUpdateAll(condition = "updates.cdc_ind='U'") \ ... .whenNotMatchedInsertAll(condition = "updates.cdc_ind!='D'")\ ... .execute()

This error was thrown:

Traceback (most recent call last):
  File "<stdin>", line 6, in <module>
  File "/hadoop/spark/tmp/spark-c7ba1c7d-5b84-473b-9299-1ada47acb78f/userFiles-86297b85-576b-4a53-8111-b354374dcc2c/io.delta_delta-core_2.12-0.7.0.jar/delta/tables.py", line 611, in execute
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o178.execute.
: java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.Alias.<init>(Lorg/apache/spark/sql/catalyst/expressions/Expression;Ljava/lang/String;Lorg/apache/spark/sql/catalyst/expressions/ExprId;Lscala/collection/Seq;Lscala/Option;)V
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$buildTargetPlanWithFiles$1(MergeIntoCommand.scala:441)
    at scala.collection.immutable.List.map(List.scala:293)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.buildTargetPlanWithFiles(MergeIntoCommand.scala:433)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.findTouchedFiles(MergeIntoCommand.scala:235)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$run$4(MergeIntoCommand.scala:154)
    at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77)
    at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.recordOperation(MergeIntoCommand.scala:94)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:103)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:89)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.recordDeltaOperation(MergeIntoCommand.scala:94)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$run$2(MergeIntoCommand.scala:154)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$run$2$adapted(MergeIntoCommand.scala:135)
    at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:188)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$run$1(MergeIntoCommand.scala:135)
    at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77)
    at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.recordOperation(MergeIntoCommand.scala:94)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:103)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:89)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.recordDeltaOperation(MergeIntoCommand.scala:94)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.run(MergeIntoCommand.scala:134)
    at io.delta.tables.DeltaMergeBuilder.$anonfun$execute$1(DeltaMergeBuilder.scala:236)
    at org.apache.spark.sql.delta.util.AnalysisHelper.improveUnsupportedOpError(AnalysisHelper.scala:60)
    at org.apache.spark.sql.delta.util.AnalysisHelper.improveUnsupportedOpError$(AnalysisHelper.scala:48)
    at io.delta.tables.DeltaMergeBuilder.improveUnsupportedOpError(DeltaMergeBuilder.scala:121)
    at io.delta.tables.DeltaMergeBuilder.execute(DeltaMergeBuilder.scala:225)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

tdas commented 3 years ago

What version of Spark are you running on? Delta Lake 0.7.0 is only compatible with Spark 3.0.x.

ncrakesh commented 3 years ago

It's using Spark version 3.1.1.

tdas commented 3 years ago

Apache Spark 3.1.1 has not been officially released yet, so there is no version of Delta Lake compatible with 3.1 yet. Please downgrade Spark to 3.0.x. You can use the latest Delta Lake release, 0.8.0, with Spark 3.0.x.
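
As an illustration, here is a minimal sketch of a session pinned to a compatible pairing (assuming pip install pyspark==3.0.1 has been run and Maven Central is reachable; the app name and the assert are illustrative, not required):

    import pyspark
    from pyspark.sql import SparkSession

    # Delta 0.7.x/0.8.x targets Spark 3.0.x; fail fast on anything else.
    assert pyspark.__version__.startswith("3.0."), pyspark.__version__

    spark = (
        SparkSession.builder
        .appName("delta-merge-example")  # illustrative name
        .config("spark.jars.packages", "io.delta:delta-core_2.12:0.8.0")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog",
                "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .getOrCreate()
    )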

ncrakesh commented 3 years ago

Thank you, tdas. Let me give it a try by downgrading the PySpark version to 3.0.1.

caioalmeida97 commented 3 years ago

@tdas do you have any idea of when Delta Lake will be compatible with Spark 3.1?

I'm asking because I'm currently working on a project that uses a Dataproc cluster on GCP, and when choosing an image for my cluster configuration, I have to choose between Apache Spark versions 3.1.x and 2.4.7.

Or do you have any "easy way" to downgrade the Spark version from the cluster?

zsxwing commented 3 years ago

@caioalmeida97 Spark 3.1.1 is not out yet, so we cannot migrate Delta Lake to 3.1.1 right now. Is there any reason you are using an unreleased Spark version?

Or do you have any "easy way" to downgrade the Spark version from the cluster?

I don't know how to do that on GCP. Does GCP not let you pick a Spark version?

caioalmeida97 commented 3 years ago

@zsxwing Google Cloud provides pre-built images for the clusters on Dataproc, which can be seen here https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-release-2.0 or https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-release-1.5

On the first link, GCP provides Apache Spark versions 3.1.1 or 3.1.0, and on the second link it provides Apache Spark version 2.4.5 :(

I don't know how to do that on GCP. Does GCP not allow to pick up a Spark version?

Not with these images. In this case, I'd have to build my own image and then install all the packages at the versions I want. That is a possibility, but I don't think it would be the easiest way of picking my Spark version.

zsxwing commented 3 years ago

It looks weird that GCP picks up an RC version. There are no guarantees for an RC version; it's usually not production-ready. APIs could even change between an RC version and the final release. IMO, I'd prefer not to use any RC version in production.

liberzon commented 3 years ago

@zsxwing we look forward to having Delta run on Spark 3.1.1. Do you have an estimate of when it will be supported?

dnskr commented 3 years ago

I can reproduce this issue with Delta Lake 0.8.0 and Spark 3.1.1.

zsxwing commented 3 years ago

We are working on this. The patch for Spark 3.1.1 support will be out soon, but the next release may take more time. We are still working on some features for the next release.

agchang-cgl commented 3 years ago

I understand this is not yet a 1.0.0 release, but the quick-start documentation as written will produce this error when followed line for line ;-): https://docs.delta.io/latest/quick-start.html#pyspark. Just a heads up.

nitzmali commented 3 years ago

Hey, I see this error on spark-3.0.2-bin-hadoop2.7 with Delta Lake 0.8.0/0.7.0. Any suggestions?

nitzmali commented 3 years ago

I got it working on spark-3.0.1-bin-hadoop3.2 with delta-core 0.8.0. Any ETA on the Spark 3.1.1 patch? Thanks.

dishkakrauch commented 3 years ago

We are working on this. The patch for Spark 3.1.1 support will be out soon, but the next release may take more time. We are still working on some features for the next release.

Hey there! Do you have any news on when the patched Delta for Spark 3.1.1 will be released? Many thanks.

zsxwing commented 3 years ago

We have upgraded Spark to 3.1.1 in the master branch. We are still working on some items before doing a release.

dishkakrauch commented 3 years ago

We have upgraded Spark to 3.1.1 in the master branch. We are still working on some items before doing a release.

We've tested Delta 0.8.0 with Spark 3.1.1 and found that the update operation still fails with this error, but insert and delete are okay.
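
For reference, a minimal sketch of the asymmetry described above (the table path is illustrative, and it assumes a Delta-enabled session named spark); on Spark 3.1.1 with Delta 0.8.0, the write and the delete go through, while the update raises the NoSuchMethodError:

    from delta.tables import DeltaTable

    # Create a small Delta table to test against (path is illustrative).
    spark.range(0, 5).write.format("delta").mode("overwrite").save("/tmp/delta-table")
    dt = DeltaTable.forPath(spark, "/tmp/delta-table")

    dt.delete("id = 4")                               # succeeds
    dt.update(condition="id = 0", set={"id": "100"})  # raises java.lang.NoSuchMethodError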

e-compagno commented 3 years ago

I get the same error on merge operations with Delta Lake 0.8.0 and Spark 3.1.1. The code is the following:

def upsertToDeltaLake(batchDF: DataFrame, batchId: Long): Unit = {
    DeltaTable
      .forPath(spark, deltaTablePath)
      .as("dLakeDF")
      .merge(
        batchDF,
        "dLakeDF.id = batchDF.id"
      )
      .whenMatched().updateAll()
      .whenNotMatched().insertAll()
      .execute()
  }

df
    .writeStream
    .foreachBatch(upsertToDeltaLake _)
    .outputMode("update")
    .start()
    .awaitTermination()

At the moment, the issue can be solved only by downgrading to Spark 3.0.1 on GCP, by creating a Dataproc cluster from the command line with the image 2.0.0-RC22-debian10:

gcloud dataproc clusters create <cluster-name> --image-version=2.0.0-RC22-debian10

MrPowers commented 3 years ago

Any updates on this one? It's preventing us from upgrading to Spark 3.1.1 and the latest Databricks Runtime. Thanks!

diveyseek commented 3 years ago

I'm having the same issue with Spark 3.1.1/Hadoop 3.2.0 and Delta 0.8.

Example code causing the error:

delta_table.alias("existing").merge(
    df.alias("updates"), existing.id=updates.id
    ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
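
(The condition can also be built as a Column expression instead of a SQL string; a sketch reusing the names above:)

    from pyspark.sql import functions as F

    delta_table.alias("existing").merge(
        df.alias("updates"),
        F.col("existing.id") == F.col("updates.id")  # Column form of the join condition
    ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()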

The error I'm getting is ...

Py4JJavaError: An error occurred while calling o947.execute.
: java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.Alias.<init>(Lorg/apache/spark/sql/catalyst/expressions/Expression;Ljava/lang/String;Lorg/apache/spark/sql/catalyst/expressions/ExprId;Lscala/collection/Seq;Lscala/Option;)V
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$writeInsertsOnlyWhenNoMatchedClauses$4(MergeIntoCommand.scala:403)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.immutable.List.map(List.scala:298)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$writeInsertsOnlyWhenNoMatchedClauses$1(MergeIntoCommand.scala:402)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.recordMergeOperation(MergeIntoCommand.scala:622)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.writeInsertsOnlyWhenNoMatchedClauses(MergeIntoCommand.scala:393)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$run$2(MergeIntoCommand.scala:265)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$run$2$adapted(MergeIntoCommand.scala:250)
    at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:188)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$run$1(MergeIntoCommand.scala:250)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.recordMergeOperation(MergeIntoCommand.scala:622)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.run(MergeIntoCommand.scala:249)
    at io.delta.tables.DeltaMergeBuilder.$anonfun$execute$1(DeltaMergeBuilder.scala:239)
    at org.apache.spark.sql.delta.util.AnalysisHelper.improveUnsupportedOpError(AnalysisHelper.scala:60)
    at org.apache.spark.sql.delta.util.AnalysisHelper.improveUnsupportedOpError$(AnalysisHelper.scala:48)
    at io.delta.tables.DeltaMergeBuilder.improveUnsupportedOpError(DeltaMergeBuilder.scala:123)
    at io.delta.tables.DeltaMergeBuilder.execute(DeltaMergeBuilder.scala:228)
    at sun.reflect.GeneratedMethodAccessor255.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

gassayy commented 3 years ago

I am having the same issue with Spark 3.1.1 and Delta-core 0.8.0.

DeltaTable
      .forName(spark, tableName)
      .alias("s")
      .merge(dedupedDf.alias("d"), s"s.uuid=d.uuid")
      .whenMatched()
      .updateAll()
      .whenNotMatched()
      .insertAll()
      .execute()

java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.Alias.<init>(Lorg/apache/spark/sql/catalyst/expressions/Expression;Ljava/lang/String;Lorg/apache/spark/sql/catalyst/expressions/ExprId;Lscala/collection/Seq;Lscala/Option;)V
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$buildTargetPlanWithFiles$1(MergeIntoCommand.scala:577)
    at scala.collection.immutable.List.map(List.scala:293)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.buildTargetPlanWithFiles(MergeIntoCommand.scala:569)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$findTouchedFiles$1(MergeIntoCommand.scala:330)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.recordMergeOperation(MergeIntoCommand.scala:622)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.findTouchedFiles(MergeIntoCommand.scala:306)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$run$2(MergeIntoCommand.scala:267)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$run$2$adapted(MergeIntoCommand.scala:250)
    at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:188)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$run$1(MergeIntoCommand.scala:250)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.recordMergeOperation(MergeIntoCommand.scala:622)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.run(MergeIntoCommand.scala:249)
    at io.delta.tables.DeltaMergeBuilder.$anonfun$execute$1(DeltaMergeBuilder.scala:239)
    at org.apache.spark.sql.delta.util.AnalysisHelper.improveUnsupportedOpError(AnalysisHelper.scala:60)
    at org.apache.spark.sql.delta.util.AnalysisHelper.improveUnsupportedOpError$(AnalysisHelper.scala:48)
    at io.delta.tables.DeltaMergeBuilder.improveUnsupportedOpError(DeltaMergeBuilder.scala:123)
    at io.delta.tables.DeltaMergeBuilder.execute(DeltaMergeBuilder.scala:228)
    at com.bosch.mas.deltalake.operations.StreamingQuery.mergeFn(StreamingQuery.scala:29)
    at com.bosch.mas.ingestion.MasDeltaLakeSpec.$anonfun$new$4(MasDeltaLakeSpec.scala:43)
    at com.bosch.mas.ingestion.MasDeltaLakeSpec.$anonfun$new$4$adapted(MasDeltaLakeSpec.scala:43)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:586)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:584)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:584)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:226)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:333)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
Exception in thread "stream execution thread for test_query [id = f3e6834f-b13e-4095-b890-e83f63b6654d, runId = 3455cde0-42e9-468d-9796-f75c77fca4b7]" java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.Alias.<init>(Lorg/apache/spark/sql/catalyst/expressions/Expression;Ljava/lang/String;Lorg/apache/spark/sql/catalyst/expressions/ExprId;Lscala/collection/Seq;Lscala/Option;)V

Is there any plan to fix this issue? Thanks

dishkakrauch commented 3 years ago

@Gassa we cured all errors like these by building Delta 0.9.0 with sbt.

Antauri commented 3 years ago

ETA for 0.9.0?

Teej42 commented 3 years ago

It looks like 1.0.0 is planned for release in 7-14 days, according to this comment: https://github.com/delta-io/delta/issues/282#issuecomment-842642968

jaceklaskowski commented 3 years ago

Five days left until Data + AI Summit 2021 (May 24-28), which seems like the best time for a 1.0 release, doesn't it?

gassayy commented 3 years ago

@Gassa we cured all errors like these by building Delta 0.9.0 with sbt.

@dishkakrauch and @Teej42 I can't find any new release versioned 0.9.0 on Maven Central; reference: https://mvnrepository.com/artifact/io.delta/delta-core_2.12

Has it been published to the Maven Central repository or elsewhere? Or is it only available for internal usage at the moment?

@Teej42 if delta-core 1.0.0 is going to be released in 7-14 days, are you planning to skip 0.9.0 since it is not published yet?

My team is building our pipeline with a Databricks cluster on Azure. We need the Change Data Feed feature (Databricks Runtime 8.2, https://docs.databricks.com/delta/delta-change-data-feed.html), which is only available in runtime 8.2 with Spark 3.1.1, and that is where I hit the above problems with the merge function.

It is currently blocking our development, and I'm not sure if there is any shortcut by which I can build or obtain either 0.9.0 or 1.0.0, or whether we have to wait another two weeks. It would be appreciated if you could provide some insight into this problem.

Teej42 commented 3 years ago

@Gassa - I am just an interested party, not a developer of this product. I see this: https://github.com/delta-io/delta/milestone/10

tdas commented 3 years ago

@Gassa I am working on the 1.0 release right now (staging the release, testing, etc.). We are definitely going to release by next week. It will be consistent with DBR 8.3, which will come out in the next few weeks as well. If you are curious, here is a draft of the 1.0 release docs: http://docs.delta.io/1.0.0/

In addition, Change Data Feed is going to be available only in DBR as of now.