delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, and APIs for Scala, Java, Rust, Ruby, and Python
https://delta.io
Apache License 2.0

Cannot read/write Delta into ADLS Gen2 #2832

Open ArunPesari2 opened 7 months ago

ArunPesari2 commented 7 months ago

Cannot read/write Delta into ADLS Gen2

Which Delta project/connector is this regarding?

Describe the problem

Could not read or write data in Delta format to ADLS Gen2.

Steps to reproduce

I have attached a file, list_of_steps.txt, with the steps to reproduce.
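The attached list_of_steps.txt is not inlined in the issue text. As a rough sketch only (not the reporter's exact code), a PySpark setup along these lines reproduces the same class of failure; the storage account, container, and key are placeholders, and the delta-core 2.2.0 pin is taken from the fix suggested in the first reply below:

```python
from pyspark.sql import SparkSession

# Hypothetical repro sketch; <storage-account>, <container>, and <account-key>
# are placeholders, not values from the original report.
spark = (
    SparkSession.builder.appName('delta-adls-gen2-repro')
    # The package pin quoted in the reply below as the cause of the failure:
    # delta-core 2.2.0 targets an older Spark release than 3.5.
    .config('spark.jars.packages', 'io.delta:delta-core_2.12:2.2.0')
    .config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension')
    .config('spark.sql.catalog.spark_catalog',
            'org.apache.spark.sql.delta.catalog.DeltaCatalog')
    # Account-key auth; OAuth or SAS configuration would work here as well.
    .config('fs.azure.account.key.<storage-account>.dfs.core.windows.net',
            '<account-key>')
    .getOrCreate()
)

df = spark.createDataFrame([(1, 'a'), (2, 'b')], ['id', 'val'])

# Writing the Delta table to ADLS Gen2, then reading it back, triggers the
# ClassCastException shown under "Observed results".
path = 'abfss://<container>@<storage-account>.dfs.core.windows.net/delta/demo'
df.write.format('delta').mode('overwrite').save(path)
spark.read.format('delta').load(path).show()
```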

Observed results

Py4JJavaError: An error occurred while calling o408.save.
: java.lang.ClassCastException: class org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore$VersionedFileStatus cannot be cast to class org.apache.spark.sql.execution.datasources.FileStatusWithMetadata (org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore$VersionedFileStatus and org.apache.spark.sql.execution.datasources.FileStatusWithMetadata are in unnamed module of loader 'app')
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.sql.execution.FileSourceScanLike.$anonfun$setFilesNumAndSizeMetric$2(DataSourceScanExec.scala:466)
    at org.apache.spark.sql.execution.FileSourceScanLike.$anonfun$setFilesNumAndSizeMetric$2$adapted(DataSourceScanExec.scala:466)
    at scala.collection.immutable.List.map(List.scala:293)
    at org.apache.spark.sql.execution.FileSourceScanLike.setFilesNumAndSizeMetric(DataSourceScanExec.scala:466)
    at org.apache.spark.sql.execution.FileSourceScanLike.selectedPartitions(DataSourceScanExec.scala:257)
    at org.apache.spark.sql.execution.FileSourceScanLike.selectedPartitions$(DataSourceScanExec.scala:251)
    at org.apache.spark.sql.execution.FileSourceScanExec.selectedPartitions$lzycompute(DataSourceScanExec.scala:506)
    at org.apache.spark.sql.execution.FileSourceScanExec.selectedPartitions(DataSourceScanExec.scala:506)
    at org.apache.spark.sql.execution.FileSourceScanLike.dynamicallySelectedPartitions(DataSourceScanExec.scala:286)
    at org.apache.spark.sql.execution.FileSourceScanLike.dynamicallySelectedPartitions$(DataSourceScanExec.scala:267)
    at org.apache.spark.sql.execution.FileSourceScanExec.dynamicallySelectedPartitions$lzycompute(DataSourceScanExec.scala:506)
    at org.apache.spark.sql.execution.FileSourceScanExec.dynamicallySelectedPartitions(DataSourceScanExec.scala:506)
    at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:553)
    at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:537)
    at org.apache.spark.sql.execution.FileSourceScanExec.doExecute(DataSourceScanExec.scala:575)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:195)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:191)
    at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:527)
    at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:455)
    at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:454)
    at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:498)
    at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:51)
    at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:242)
    at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:51)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:751)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:195)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:191)
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:364)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:445)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4332)
    at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:3573)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4322)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4320)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4320)
    at org.apache.spark.sql.Dataset.collect(Dataset.scala:3573)
    at org.apache.spark.sql.delta.Snapshot.protocolAndMetadataReconstruction(Snapshot.scala:215)
    at org.apache.spark.sql.delta.Snapshot.x$1$lzycompute(Snapshot.scala:134)
    at org.apache.spark.sql.delta.Snapshot.x$1(Snapshot.scala:129)
    at org.apache.spark.sql.delta.Snapshot._metadata$lzycompute(Snapshot.scala:129)
    at org.apache.spark.sql.delta.Snapshot._metadata(Snapshot.scala:129)
    at org.apache.spark.sql.delta.Snapshot.metadata(Snapshot.scala:197)
    at org.apache.spark.sql.delta.Snapshot.toString(Snapshot.scala:390)
    at java.base/java.lang.String.valueOf(Unknown Source)
    at java.base/java.lang.StringBuilder.append(Unknown Source)
    at org.apache.spark.sql.delta.Snapshot.$anonfun$new$1(Snapshot.scala:393)
    at org.apache.spark.sql.delta.Snapshot.$anonfun$logInfo$1(Snapshot.scala:370)
    at org.apache.spark.internal.Logging.logInfo(Logging.scala:60)
    at org.apache.spark.internal.Logging.logInfo$(Logging.scala:59)
    at org.apache.spark.sql.delta.Snapshot.logInfo(Snapshot.scala:370)
    at org.apache.spark.sql.delta.Snapshot.<init>(Snapshot.scala:393)
    at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$createSnapshot$4(SnapshotManagement.scala:356)
    at org.apache.spark.sql.delta.SnapshotManagement.createSnapshotFromGivenOrEquivalentLogSegment(SnapshotManagement.scala:480)
    at org.apache.spark.sql.delta.SnapshotManagement.createSnapshotFromGivenOrEquivalentLogSegment$(SnapshotManagement.scala:468)
    at org.apache.spark.sql.delta.DeltaLog.createSnapshotFromGivenOrEquivalentLogSegment(DeltaLog.scala:74)
    at org.apache.spark.sql.delta.SnapshotManagement.createSnapshot(SnapshotManagement.scala:349)
    at org.apache.spark.sql.delta.SnapshotManagement.createSnapshot$(SnapshotManagement.scala:343)
    at org.apache.spark.sql.delta.DeltaLog.createSnapshot(DeltaLog.scala:74)
    at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$createSnapshotAtInitInternal$1(SnapshotManagement.scala:304)
    at scala.Option.map(Option.scala:230)
    at org.apache.spark.sql.delta.SnapshotManagement.createSnapshotAtInitInternal(SnapshotManagement.scala:301)
    at org.apache.spark.sql.delta.SnapshotManagement.createSnapshotAtInitInternal$(SnapshotManagement.scala:298)
    at org.apache.spark.sql.delta.DeltaLog.createSnapshotAtInitInternal(DeltaLog.scala:74)
    at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$getSnapshotAtInit$1(SnapshotManagement.scala:293)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:140)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:138)
    at org.apache.spark.sql.delta.DeltaLog.recordFrameProfile(DeltaLog.scala:74)
    at org.apache.spark.sql.delta.SnapshotManagement.getSnapshotAtInit(SnapshotManagement.scala:288)
    at org.apache.spark.sql.delta.SnapshotManagement.getSnapshotAtInit$(SnapshotManagement.scala:287)
    at org.apache.spark.sql.delta.DeltaLog.getSnapshotAtInit(DeltaLog.scala:74)
    at org.apache.spark.sql.delta.SnapshotManagement.$init$(SnapshotManagement.scala:57)
    at org.apache.spark.sql.delta.DeltaLog.<init>(DeltaLog.scala:80)
    at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$4(DeltaLog.scala:790)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
    at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$3(DeltaLog.scala:785)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:140)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:138)
    at org.apache.spark.sql.delta.DeltaLog$.recordFrameProfile(DeltaLog.scala:595)
    at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:133)
    at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:128)
    at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:117)
    at org.apache.spark.sql.delta.DeltaLog$.recordOperation(DeltaLog.scala:595)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:132)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:122)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:112)
    at org.apache.spark.sql.delta.DeltaLog$.recordDeltaOperation(DeltaLog.scala:595)
    at org.apache.spark.sql.delta.DeltaLog$.createDeltaLog$1(DeltaLog.scala:784)
    at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$5(DeltaLog.scala:802)
    at com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4792)
    at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
    at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
    at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
    at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2257)
    at com.google.common.cache.LocalCache.get(LocalCache.java:4000)
    at com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4789)
    at org.apache.spark.sql.delta.DeltaLog$.getDeltaLogFromCache$1(DeltaLog.scala:801)
    at org.apache.spark.sql.delta.DeltaLog$.apply(DeltaLog.scala:811)
    at org.apache.spark.sql.delta.DeltaLog$.forTable(DeltaLog.scala:643)
    at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:172)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:304)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Unknown Source)

Expected results

Delta format data to be written into ADLS Gen2.

Further details

Please let us know if we should raise this concern through a different portal instead.

Environment information

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

sherlockbeard commented 7 months ago

@ArunPesari2 Try replacing the line

.config('spark.jars.packages', 'io.delta:delta-core_2.12:2.2.0')

with

.config('spark.jars.packages', 'org.apache.hadoop:hadoop-azure:3.3.1,io.delta:delta-spark_2.12:3.1.0')

Spark 3.5 requires Delta Lake 3.1. The delta-core 2.2.0 artifact was built against an older Spark release, which is why the cast to Spark 3.5's FileStatusWithMetadata fails in the stack trace above.
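For reference, a sketch of a corrected session builder under those versions (the ADLS auth configuration is elided; only the package coordinates come from the suggestion above):

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName('delta-adls-gen2')
    # hadoop-azure provides the abfss:// filesystem; delta-spark 3.1.0 matches Spark 3.5.
    .config('spark.jars.packages',
            'org.apache.hadoop:hadoop-azure:3.3.1,io.delta:delta-spark_2.12:3.1.0')
    .config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension')
    .config('spark.sql.catalog.spark_catalog',
            'org.apache.spark.sql.delta.catalog.DeltaCatalog')
    .getOrCreate()
)
```

Note that on distributions that already bundle hadoop-azure, only the Delta artifact needs to change.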

fei819 commented 1 week ago

We hit the same issue with Spark 3.5 and Delta 3.0. Does any Delta 3.x release fix this, or should we roll back to Spark 3.4? Thanks.