apache / sedona

A cluster computing framework for processing large-scale geospatial data
https://sedona.apache.org/
Apache License 2.0

Sedona fails to write Delta Lake on Databricks 15.3 Beta: ClassCastException #1454

Closed. jacob-talroo closed this issue 5 months ago.

jacob-talroo commented 5 months ago

Expected behavior

Writing out a Delta Lake table should work.

Actual behavior

Fails with:

ClassCastException: class scala.collection.immutable.Map$Map1 cannot be cast to class com.databricks.sql.transaction.tahoe.actions.ParsedAddFileTags (scala.collection.immutable.Map$Map1 and com.databricks.sql.transaction.tahoe.actions.ParsedAddFileTags are in unnamed module of loader 'app')
    at scala.Option.flatMap(Option.scala:271)
    at com.databricks.sql.transaction.tahoe.actions.AddFile.longTag(actions.scala:938)
    at com.databricks.sql.transaction.tahoe.actions.AddFile.insertionTime(actions.scala:917)
    at com.databricks.sql.transaction.tahoe.files.DelayedCommitProtocolEdge.$anonfun$updateInsertionTime$1(DelayedCommitProtocolEdge.scala:116)
    at com.databricks.sql.transaction.tahoe.files.DelayedCommitProtocolEdge.$anonfun$updateInsertionTime$1$adapted(DelayedCommitProtocolEdge.scala:116)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at com.databricks.sql.transaction.tahoe.files.DelayedCommitProtocolEdge.updateInsertionTime(DelayedCommitProtocolEdge.scala:116)
    at com.databricks.sql.transaction.tahoe.files.DelayedCommitProtocolEdge.commitJob(DelayedCommitProtocolEdge.scala:80)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$writeAndCommit$3(FileFormatWriter.scala:400)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:532)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:400)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:430)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$1(FileFormatWriter.scala:300)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:121)
    at com.databricks.sql.transaction.tahoe.commands.WriteIntoDeltaCommand.run(WriteIntoDeltaCommand.scala:121)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.$anonfun$sideEffectResult$5(commands.scala:137)
    at org.apache.spark.sql.execution.SparkPlan.runCommandWithAetherOff(SparkPlan.scala:180)
    at org.apache.spark.sql.execution.SparkPlan.runCommandInAetherOrSpark(SparkPlan.scala:191)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.$anonfun$sideEffectResult$4(commands.scala:137)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:133)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:132)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.$anonfun$doExecute$4(commands.scala:161)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:161)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$2(SparkPlan.scala:329)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:329)
    at org.apache.spark.sql.execution.SparkPlan$.org$apache$spark$sql$execution$SparkPlan$$withExecuteQueryLogging(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:387)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:383)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:324)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$doExecute$1(AdaptiveSparkPlanExec.scala:864)
    at org.apache.spark.sql.execution.adaptive.ResultQueryStageExec.$anonfun$doMaterialize$1(QueryStageExec.scala:663)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1180)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$7(SQLExecution.scala:778)
    at com.databricks.util.LexicalThreadLocal$Handle.runWith(LexicalThreadLocal.scala:63)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$6(SQLExecution.scala:778)
    at com.databricks.util.LexicalThreadLocal$Handle.runWith(LexicalThreadLocal.scala:63)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$5(SQLExecution.scala:778)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$4(SQLExecution.scala:777)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$3(SQLExecution.scala:776)
    at com.databricks.sql.transaction.tahoe.OptimisticTransaction$.withActive(OptimisticTransaction.scala:201)
    at com.databricks.sql.transaction.tahoe.ConcurrencyHelpers$.withOptimisticTransaction(ConcurrencyHelpers.scala:54)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$2(SQLExecution.scala:775)
    at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:97)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:759)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
    at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.$anonfun$run$1(SparkThreadLocalForwardingThreadPoolExecutor.scala:134)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.IdentityClaim$.withClaim(IdentityClaim.scala:48)
    at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.$anonfun$runWithCaptured$4(SparkThreadLocalForwardingThreadPoolExecutor.scala:91)
    at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:45)
    at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:90)
    at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured$(SparkThreadLocalForwardingThreadPoolExecutor.scala:67)
    at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:131)
    at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.run(SparkThreadLocalForwardingThreadPoolExecutor.scala:134)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)

Steps to reproduce the problem

Launch a DBR 15.3 Beta cluster with Sedona installed, then run:

// Minimal repro: a plain non-geometry DataFrame fails to write to Delta Lake.
import spark.implicits._ // pre-imported in Databricks notebooks; listed for completeness

val foo = Seq(1, 2, 3).toDF("a")

foo
  .write
  .mode("overwrite")
  .format("delta")
  .save("/mnt/tmp90/jt/foo1")

I hope the stack trace is helpful to someone.

Settings

Sedona version = 1.5.3 and 1.6.0

Apache Spark version = 3.5

API type = Scala

Scala version = 2.12

JRE version = 17

Python version = 3.11.0

Environment = Databricks 15.3 beta

jiayuasu commented 5 months ago

Geometry-typed data cannot be written to Delta Lake because Delta Lake has no geometry type.

It was not supposed to work in the first place. The correct way to save a geometry column to Delta Lake is to serialize it with ST_AsEWKB or ST_AsEWKT, and read it back using ST_GeomFromWKB or ST_GeomFromEWKT.
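
A minimal sketch of that round-trip in Scala, assuming a SparkSession with Sedona's SQL functions registered, a DataFrame df with a geometry column named geom, and an illustrative output path:

import org.apache.spark.sql.functions.expr

// Serialize the geometry to EWKB (binary) so Delta Lake can store it.
val serialized = df
  .withColumn("geom_ewkb", expr("ST_AsEWKB(geom)"))
  .drop("geom")

serialized.write.mode("overwrite").format("delta").save("/mnt/tmp/geom_table")

// Read it back and rebuild the geometry column.
// Sedona's ST_GeomFromWKB accepts EWKB input as well.
val restored = spark.read.format("delta").load("/mnt/tmp/geom_table")
  .withColumn("geom", expr("ST_GeomFromWKB(geom_ewkb)"))
  .drop("geom_ewkb")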

jacob-talroo commented 5 months ago

Oops. I had a bad test result. I think the issue is DBR 15.3 Beta. DBR 15.2 appears to be working. I'll update the title/description above.

jiayuasu commented 5 months ago

I would be surprised if this were working. I'm not sure how it could have worked in the first place.

jacob-talroo commented 5 months ago

I can confirm that this IS a change on DBR 15.3 Beta. I'll file a defect with Databricks.

jiayuasu commented 5 months ago

@Kontinuation it seems like Databricks Delta Lake can read/write Sedona geometry?

jacob-talroo commented 5 months ago

I couldn't write the geometry directly, so I write the WKB instead; that works on DBR 15.2 and earlier.

However, on DBR 15.3, all writes fail.

jacob-talroo commented 5 months ago

FYI - Databricks provided the following workaround: remove spark.serializer org.apache.spark.serializer.KryoSerializer from the cluster's Spark config.
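
For reference, a sketch of what that config change looks like. The spark.kryo.registrator line is the companion setting from a typical Sedona cluster setup and is an assumption here; the workaround only asks for the spark.serializer line to go:

Before:
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator org.apache.sedona.core.serde.SedonaKryoRegistrator

After (workaround applied):
spark.kryo.registrator org.apache.sedona.core.serde.SedonaKryoRegistrator

With spark.serializer removed, Spark falls back to its default JavaSerializer, and the Kryo registrator setting is ignored.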

I believe they are working on a real fix now.