delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive and APIs
https://delta.io
Apache License 2.0

AnalysisException when merging new dataframe into existing data lake #484

Closed: matthewpick closed this issue 4 years ago

matthewpick commented 4 years ago

Environment

AWS EMR 5.28.1
Delta 0.6.1

Problem

I'm not sure why I keep getting this AnalysisException. The data in S3 and the new Spark DataFrame have the same schema (even the error message contains the correct column names).

The error is inconsistent but typically shows up after 5-10 minutes of processing the Kinesis stream data.

Logs

EMR Logs

20/07/30 20:27:33 INFO DAGScheduler: ResultStage 1529 (start at KinesisToDelta.scala:70) finished in 0.571 s
20/07/30 20:27:33 INFO DAGScheduler: Job 722 finished: start at KinesisToDelta.scala:70, took 0.659081 s
20/07/30 20:27:33 INFO GenerateSymlinkManifest: Generated manifest partitions for s3://mybucket/client1/datalake/delta-production/timeline_1_0_0 [1]:

20/07/30 20:27:33 INFO GenerateSymlinkManifest: Deleted manifest partitions [0]:

20/07/30 20:27:33 INFO GenerateSymlinkManifest: DELTA: Done
20/07/30 20:27:33 INFO JobScheduler: Finished job streaming job 1596140400000 ms.0 from job set of time 1596140400000 ms
20/07/30 20:27:33 ERROR JobScheduler: Error running job streaming job 1596140400000 ms.0
org.apache.spark.sql.AnalysisException: cannot resolve `oldData.myPrimaryKey` in search condition given columns newData.`myPrimaryKey`, newData.`streaming_timestamp`, newData.`_id`, newData.`clientEngagementId`, newData.`deleted`, newData.`duration`, newData.`fileName_uniqueIdentifier`, newData.`fileName_audio`, newData.`fileName_audioReceived`, newData.`cdp_interactionstartdate`; line 1 pos 0
    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
    at org.apache.spark.sql.catalyst.plans.logical.DeltaMergeInto$$anonfun$org$apache$spark$sql$catalyst$plans$logical$DeltaMergeInto$$resolveOrFail$1$3.apply(deltaMerge.scala:259)
    at org.apache.spark.sql.catalyst.plans.logical.DeltaMergeInto$$anonfun$org$apache$spark$sql$catalyst$plans$logical$DeltaMergeInto$$resolveOrFail$1$3.apply(deltaMerge.scala:255)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.sql.catalyst.plans.logical.DeltaMergeInto$.org$apache$spark$sql$catalyst$plans$logical$DeltaMergeInto$$resolveOrFail$1(deltaMerge.scala:255)
    at org.apache.spark.sql.catalyst.plans.logical.DeltaMergeInto$.resolveReferences(deltaMerge.scala:385)
    at io.delta.tables.DeltaMergeBuilder.execute(DeltaMergeBuilder.scala:228)
    at com.topbox.bigdata.delta.DeltaWriter$.write(DeltaWriter.scala:72)
    at com.topbox.bigdata.EventProcessing$$anonfun$processEvents$1$$anonfun$apply$1.apply(EventProcessing.scala:90)
    at com.topbox.bigdata.EventProcessing$$anonfun$processEvents$1$$anonfun$apply$1.apply(EventProcessing.scala:50)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at com.topbox.bigdata.EventProcessing$$anonfun$processEvents$1.apply(EventProcessing.scala:50)
    at com.topbox.bigdata.EventProcessing$$anonfun$processEvents$1.apply(EventProcessing.scala:43)
    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
    at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
    at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:169)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:443)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:149)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Delta merge statement

DeltaWriter.scala

...
val primaryKey = "myPrimaryKey"

deltaTable.as("oldData")
  .merge(df.as("newData"), s"oldData.${primaryKey} = newData.${primaryKey}")
  .whenMatched("newData.deleted = true")
  .delete()
  .whenMatched
  .updateAll()
  .whenNotMatched
  .insertAll()
  .execute()  // Line 72
...

Delta.io Code references

https://github.com/delta-io/delta/blob/v0.6.1/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/deltaMerge.scala#L253-L262

Thoughts

Maybe this could be caused by case sensitivity in the primaryKey?

deltaMerge.scala hasn't changed very much between 0.6.1 and 0.7.0, so I am not confident that upgrading to 0.7.0 will fix this problem (maybe the Spark 3.0 Catalyst code is more robust?).

tdas commented 4 years ago

The error is inconsistent, which means there is probably something non-deterministic going on here. Analysis resolution is deterministic, so it's likely that it's something to do with the DataFrame generation. Can you update your code to catch this error and print the schema of the DeltaTable object and the newData DataFrame?
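A minimal sketch of that suggestion, assuming the merge from DeltaWriter.scala above is wrapped in a helper. The object name `MergeDiagnostics` and the method `mergeWithSchemaDump` are hypothetical and only for illustration; the merge chain itself is copied from the issue.

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.{AnalysisException, DataFrame}

object MergeDiagnostics {
  // Hypothetical helper (not part of Delta): runs the merge from the issue and,
  // if analysis fails, prints both schemas before rethrowing the original error.
  def mergeWithSchemaDump(deltaTable: DeltaTable, df: DataFrame, primaryKey: String): Unit = {
    try {
      deltaTable.as("oldData")
        .merge(df.as("newData"), s"oldData.$primaryKey = newData.$primaryKey")
        .whenMatched("newData.deleted = true")
        .delete()
        .whenMatched()
        .updateAll()
        .whenNotMatched()
        .insertAll()
        .execute()
    } catch {
      case e: AnalysisException =>
        // An empty schema on the oldData side would explain why only
        // newData columns are listed in the error message.
        println("Schema of existing Delta table (oldData):")
        deltaTable.toDF.printSchema()
        println("Schema of incoming DataFrame (newData):")
        df.printSchema()
        throw e
    }
  }
}
```

Rethrowing keeps the streaming job's failure behavior unchanged; the only difference is the extra schema output in the driver log.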

In addition, there have actually been significant changes in the analysis part of merge in 0.7.0, because Spark 3.0 Catalyst now directly understands the Merge logical plan. So it would be good to try it out.

matthewpick commented 4 years ago

@tdas Thanks for the recommendations / insight.

I've been working on upgrading to EMR 6.0.0 and Delta 0.7.0 today and hopefully that helps (I need to upgrade to Spark 3.x at this point anyway). I'll keep this issue up-to-date with my findings.

matthewpick commented 4 years ago

@tdas your recommendation to compare datalake / new dataframe schemas worked great 👍

My delta writer logic (built for streaming purposes) has to choose between writing a brand new dataset or using a merge statement.

A few of my delta tables (I have 150 tables or so) were in a weird state where DeltaTable.isDeltaTable() returned true even though there wasn't any data in the table (and the schema was completely undefined).

By combining the logic of "isDeltaTable" and "isTableEmpty" I was able to properly create/update my delta tables and account for this odd edge case where isDeltaTable returned true but the table did not have any data or a schema defined.

Here is my new delta writer code: https://gist.github.com/matthewpick/d7ca9504179c84df945d37a8c34ed107#file-deltawriter-scala-L39-L42
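The linked gist is not reproduced here. As a rough sketch of the idea described above (not the gist's actual code), a writer could merge only when the path is a Delta table that also has a schema and at least one row, and otherwise write the DataFrame as a fresh table. The object `DeltaWriterSketch`, the `isTableEmpty` helper, and the choice to overwrite with `overwriteSchema` are all assumptions made for illustration.

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.{DataFrame, SparkSession}

object DeltaWriterSketch {
  // Hypothetical helper: treats the table as empty when it has no schema
  // or no rows. The schema check runs first, so no scan is attempted on a
  // table whose schema is undefined.
  def isTableEmpty(spark: SparkSession, path: String): Boolean = {
    val existing = DeltaTable.forPath(spark, path).toDF
    existing.schema.isEmpty || existing.head(1).isEmpty
  }

  def write(spark: SparkSession, df: DataFrame, path: String, primaryKey: String): Unit = {
    // && short-circuits, so isTableEmpty only runs for paths that are Delta tables.
    if (DeltaTable.isDeltaTable(spark, path) && !isTableEmpty(spark, path)) {
      DeltaTable.forPath(spark, path).as("oldData")
        .merge(df.as("newData"), s"oldData.$primaryKey = newData.$primaryKey")
        .whenMatched("newData.deleted = true")
        .delete()
        .whenMatched()
        .updateAll()
        .whenNotMatched()
        .insertAll()
        .execute()
    } else {
      // Assumption: an empty or schema-less table can safely be replaced
      // by the incoming batch, including its schema.
      df.write
        .format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .save(path)
    }
  }
}
```

Overwriting is only reasonable here because the existing table holds no rows; a table with data always goes through the merge branch.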

tdas commented 4 years ago

Glad to hear!