An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive and APIs
I would like to have a api method ,to rollback /revert latest changes simply like DeltaTable.rolleback(), currently I am handling this scenario just removing latest version of log file and checkpoint file if any with same latest version . just to avoid rewriting/remove data files, vacuum will take care .
In my scenario , ETL pipeline has some steps after delta table merge, if any failure after this step, I need to revert the changes from delta table .
I am using below code snippet -
val deltaTable = DeltaTable.forPath(spark, path)
val lastOperationDF = deltaTable.history(1)
var currentVersion:Long=0
var readVersion:Long=0
lastOperationDF.select("version", "readVersion")
.collect()
.foreach( r => {
currentVersion = r.getLong(0)
readVersion = r.getLong(1)
})
val revertVersionFileName = appContext.deltaLogFileNameFormat.get.format(currentVersion).concat(".json")
val readVersionFileName = appContext.deltaLogFileNameFormat.get.format(readVersion).concat(".json")
revertPath = newPath.stripSuffix("/").concat("/_delta_log/").concat(revertVersionFileName)
currentPath = newPath.stripSuffix("/").concat("/_delta_log/").concat(readVersionFileName)
//Delete code to delete latest version log file and and corresponding checkpoint file if any exist for same version.
In my case as of now I am having this case with non partitioned tables so not sure how to handle checkpoint files with partitioned tables.
Thank you for great work !
I would like to have a api method ,to rollback /revert latest changes simply like DeltaTable.rolleback(), currently I am handling this scenario just removing latest version of log file and checkpoint file if any with same latest version . just to avoid rewriting/remove data files, vacuum will take care .
In my scenario , ETL pipeline has some steps after delta table merge, if any failure after this step, I need to revert the changes from delta table .
I am using below code snippet -
val deltaTable = DeltaTable.forPath(spark, path)
//Delete code to delete latest version log file and and corresponding checkpoint file if any exist for same version.
In my case as of now I am having this case with non partitioned tables so not sure how to handle checkpoint files with partitioned tables.
Thanks