delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, and APIs for Scala, Java, Rust, Ruby, and Python
https://delta.io
Apache License 2.0

[QUESTION] Delta Lake + Structured Streaming writing to Azure storage: checkpoint suddenly errors, renaming .tmp file fails #1396

Open kongluo opened 2 years ago

kongluo commented 2 years ago

Bug

Describe the problem

When Delta Lake + Structured Streaming writes to Azure Storage Gen2, after a period of normal operation the streaming query suddenly ends up in the finished state. Checking the logs shows that an error occurred; the stack trace is as follows:

java.io.IOException: rename from abfss://xxx@xxx.dfs.core.windows.net/_checkpoint/ods_sensor/offsets/.415696.dbedeeda-0a23-4852-8d56-371a396af442.tmp to abfss://xxx@xxx.dfs.core.windows.net/_checkpoint/ods_sensor/offsets/415696 failed.
    at org.apache.hadoop.fs.FileSystem.rename(FileSystem.java:1548) ~[hadoop-common-3.2.0.jar:?]
    at org.apache.hadoop.fs.DelegateToFileSystem.renameInternal(DelegateToFileSystem.java:204) ~[hadoop-common-3.2.0.jar:?]
    at org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:769) ~[hadoop-common-3.2.0.jar:?]
    at org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:699) ~[hadoop-common-3.2.0.jar:?]
    at org.apache.hadoop.fs.FileContext.rename(FileContext.java:1032) ~[hadoop-common-3.2.0.jar:?]
    at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:335) ~[spark-sql_2.12-3.1.1.jar:3.1.1]
    at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:147) ~[spark-sql_2.12-3.1.1.jar:3.1.1]
    at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$addNewBatchByStream$2(HDFSMetadataLog.scala:176) ~[spark-sql_2.12-3.1.1.jar:3.1.1]
    at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23) ~[scala-library-2.12.10.jar:?]
    at scala.Option.getOrElse(Option.scala:189) ~[scala-library-2.12.10.jar:?]
    at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.addNewBatchByStream(HDFSMetadataLog.scala:171) ~[spark-sql_2.12-3.1.1.jar:3.1.1]
    at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:116) ~[spark-sql_2.12-3.1.1.jar:3.1.1]
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$12(MicroBatchExecution.scala:432) ~[spark-sql_2.12-3.1.1.jar:3.1.1]
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) ~[scala-library-2.12.10.jar:?]
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357) ~[spark-sql_2.12-3.1.1.jar:3.1.1]
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355) ~[spark-sql_2.12-3.1.1.jar:3.1.1]
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68) ~[spark-sql_2.12-3.1.1.jar:3.1.1]
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:430) ~[spark-sql_2.12-3.1.1.jar:3.1.1]
    at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23) ~[scala-library-2.12.10.jar:?]
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:613) ~[spark-sql_2.12-3.1.1.jar:3.1.1]
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:378) ~[spark-sql_2.12-3.1.1.jar:3.1.1]
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:211) ~[spark-sql_2.12-3.1.1.jar:3.1.1]
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) ~[scala-library-2.12.10.jar:?]
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357) ~[spark-sql_2.12-3.1.1.jar:3.1.1]
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355) ~[spark-sql_2.12-3.1.1.jar:3.1.1]
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68) ~[spark-sql_2.12-3.1.1.jar:3.1.1]
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194) ~[spark-sql_2.12-3.1.1.jar:3.1.1]
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57) ~[spark-sql_2.12-3.1.1.jar:3.1.1]
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188) ~[spark-sql_2.12-3.1.1.jar:3.1.1]
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:333) ~[spark-sql_2.12-3.1.1.jar:3.1.1]
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244) ~[spark-sql_2.12-3.1.1.jar:3.1.1]

I don't know whether this is an internal problem of Delta Lake or something else. The Delta Lake service had been running for more than a year and this is the first time I have seen this error; the stream recovers after a restart.
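
For context, the write path is roughly the following. This is a minimal sketch only; the source table, container/account names, paths, and trigger interval are placeholders, not the actual job:

```scala
// Minimal sketch of the streaming job (source, paths, and trigger are placeholders).
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("ods_sensor_stream").getOrCreate()

// Hypothetical upstream Delta table used as the streaming source.
val source = spark.readStream
  .format("delta")
  .load("abfss://container@account.dfs.core.windows.net/ods_sensor_source")

// Continuously append to the target Delta table; the checkpoint lives on ADLS Gen2,
// which is where the offsets/<batchId> rename in the stack trace above happens.
val query = source.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation",
    "abfss://container@account.dfs.core.windows.net/_checkpoint/ods_sensor")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .start("abfss://container@account.dfs.core.windows.net/ods_sensor")

query.awaitTermination()
```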

Environment information

zsxwing commented 2 years ago

@kongluo this is not a Delta Lake issue. It happened when writing an offset file to the Structured Streaming checkpoint. However, I don't see any issue in Structured Streaming either. This seems to be a transient issue in Azure Storage Gen2.
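
For reference, Structured Streaming commits each offsets/<batchId> file with a write-to-temp-then-rename pattern along the lines of the simplified sketch below. It is illustrative only, not the actual Spark implementation; the helper name and signature are made up:

```scala
// Simplified illustration only; not the actual Spark/Delta source code.
import java.io.IOException
import java.util.UUID
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Hypothetical helper: commit one offsets/<batchId> file the way the checkpoint
// file manager does it -- write a hidden .tmp file, then rename it into place.
def writeOffsetFileAtomically(checkpointOffsetsDir: Path,
                              batchId: Long,
                              bytes: Array[Byte]): Unit = {
  val fs = checkpointOffsetsDir.getFileSystem(new Configuration())
  val finalPath = new Path(checkpointOffsetsDir, batchId.toString)
  // Temp file name mirrors the ".415696.<uuid>.tmp" shape seen in the error.
  val tmpPath = new Path(checkpointOffsetsDir, s".$batchId.${UUID.randomUUID()}.tmp")

  val out = fs.create(tmpPath, true) // overwrite if a stale temp file exists
  try out.write(bytes) finally out.close()

  // The batch is only committed once this rename succeeds; this is the call
  // that failed in the stack trace above.
  if (!fs.rename(tmpPath, finalPath)) {
    throw new IOException(s"rename from $tmpPath to $finalPath failed")
  }
}
```

If the rename fails transiently on the storage side, the query stops with the IOException above; on restart, Structured Streaming retries the batch from the last committed offset, which matches the recovery-after-restart behavior described in the report.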