datastrato / gravitino

World's most powerful data catalog service, providing a high-performance, geo-distributed and federated metadata lake.
https://datastrato.ai/docs/
Apache License 2.0

[#2542] feat(spark-connector): support file-level delete operation to Iceberg Table #2641

Closed · caican00 closed this 1 month ago

caican00 commented 2 months ago

What changes were proposed in this pull request?

Support file-level delete operations on Iceberg tables:

delete from catalog.db.table where ...

Why are the changes needed?

Support file-level delete operations on Iceberg tables.

Fix: https://github.com/datastrato/gravitino/issues/2542

Does this PR introduce any user-facing change?

Yes, users can execute DELETE SQL against Iceberg tables.

How was this patch tested?

New IT case.

caican00 commented 2 months ago

Hi @FANNG1, could you help review this PR when you are free? Thank you.

jerryshao commented 2 months ago

LGTM. @FANNG1, would you please also take a look?

caican00 commented 2 months ago

The IT failed and I'm fixing it.

FANNG1 commented 2 months ago

If SupportsRowLevelOperations is not implemented, Iceberg only supports deleting entire data files that match the filter condition.
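
For reference, the file-level path corresponds to Spark's SupportsDelete interface. A minimal sketch of how a catalog wrapper table could delegate it to the underlying Iceberg table (the underlying() accessor is hypothetical, not the connector's actual API):

    import org.apache.spark.sql.connector.catalog.SupportsDelete;
    import org.apache.spark.sql.connector.catalog.Table;
    import org.apache.spark.sql.sources.Filter;

    // Sketch only: a delegating table that exposes Spark's file-level delete path.
    abstract class DelegatingIcebergTable implements Table, SupportsDelete {

      // Hypothetical accessor for the wrapped Iceberg-provided Spark table.
      protected abstract Table underlying();

      @Override
      public boolean canDeleteWhere(Filter[] filters) {
        // Iceberg accepts the delete only when the filters line up with whole
        // data files (e.g. partition boundaries); otherwise the query fails.
        return ((SupportsDelete) underlying()).canDeleteWhere(filters);
      }

      @Override
      public void deleteWhere(Filter[] filters) {
        // File-level delete: matching files are dropped from table metadata;
        // no partially matching file is rewritten.
        ((SupportsDelete) underlying()).deleteWhere(filters);
      }
    }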

jerryshao commented 2 months ago

Yeah, that's the problem. The basic delete just checks the filter to see if it matches the data file; if so, the whole file's data will be deleted, so it is basically not a row-level delete.

Iceberg supports both file-level delete and row-level delete; I think we should clarify which delete we should support in this PR.
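
To illustrate the difference (table name, column names, and values here are hypothetical):

    import org.apache.spark.sql.SparkSession;

    public class DeleteKinds {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().getOrCreate();

        // Partition-aligned predicate on a table partitioned by dt: every
        // matching file is fully covered, so a file-level delete can simply
        // drop those files from the table metadata.
        spark.sql("DELETE FROM catalog.db.events WHERE dt = '2024-03-01'");

        // Arbitrary row predicate: files usually match only partially, so this
        // needs row-level support (SupportsRowLevelOperations, copy-on-write
        // or merge-on-read).
        spark.sql("DELETE FROM catalog.db.events WHERE user_id = 42");
      }
    }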

caican00 commented 1 month ago

> If SupportsRowLevelOperations is not implemented, Iceberg only supports deleting entire data files that match the filter condition.

got it.

caican00 commented 1 month ago

> Yeah, that's the problem. The basic delete just checks the filter to see if it matches the data file; if so, the whole file's data will be deleted, so it is basically not a row-level delete.
>
> Iceberg supports both file-level delete and row-level delete; I think we should clarify which delete we should support in this PR.

Thank you, and the description has been updated.

caican00 commented 1 month ago

Implemented the SupportsMetadataColumns interface to support resolving metadata columns; without it, resolution fails with:

    org.apache.spark.sql.AnalysisException: Unable to resolve _file given [id,name,age].
        at org.apache.spark.sql.errors.QueryCompilationErrors$.cannotResolveAttributeError(QueryCompilationErrors.scala:1469)
        at org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils$.resolveRef(V2ExpressionUtils.scala:48)
        at org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils$.$anonfun$resolveRefs$1(V2ExpressionUtils.scala:53)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
        at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
        at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
        at scala.collection.TraversableLike.map(TraversableLike.scala:286)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
        at scala.collection.AbstractTraversable.map(Traversable.scala:108)
        at org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils$.resolveRefs(V2ExpressionUtils.scala:53)
        at org.apache.spark.sql.catalyst.analysis.RewriteRowLevelCommand.resolveRequiredMetadataAttrs(RewriteRowLevelCommand.scala:74)
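
For context, a minimal sketch of the direction this points to (the underlying() accessor is again hypothetical): implementing SupportsMetadataColumns and delegating to the Iceberg table lets the analyzer resolve _file instead of failing as above.

    import org.apache.spark.sql.connector.catalog.MetadataColumn;
    import org.apache.spark.sql.connector.catalog.SupportsMetadataColumns;
    import org.apache.spark.sql.connector.catalog.Table;

    // Sketch only: delegate metadata columns (_file, _pos, ...) to Iceberg.
    abstract class MetadataColumnAwareTable implements Table, SupportsMetadataColumns {

      // Hypothetical accessor for the wrapped Iceberg-provided Spark table.
      protected abstract Table underlying();

      @Override
      public MetadataColumn[] metadataColumns() {
        // With this in place, _file resolves against [id, name, age] plus the
        // metadata columns instead of raising the AnalysisException above.
        return ((SupportsMetadataColumns) underlying()).metadataColumns();
      }
    }
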
caican00 commented 1 month ago

@FANNG1, could you help review again? Thanks.

FANNG1 commented 1 month ago

LGTM, @jerryshao @qqqttt123 do you have time to review?

qqqttt123 commented 1 month ago

For Iceberg, we'd better test 4 cases:

v1 format vs v2 format
COW vs MOR table

caican00 commented 1 month ago

> For Iceberg, we'd better test 4 cases:
>
> v1 format vs v2 format
> COW vs MOR table

@qqqttt123 I would like to do this in another PR about row-level update. WDYT?

qqqttt123 commented 1 month ago

> For Iceberg, we'd better test 4 cases:
>
> v1 format vs v2 format
> COW vs MOR table
>
> @qqqttt123 I would like to do this in another PR about row-level update. WDYT?

OK for me. But we should finish it before the Spark Iceberg connector is released. More test cases should be added: our current cases don't cover the Iceberg logic we need, and some metadata-column deletion cases are needed. You'd better copy some test cases from Iceberg.

qqqttt123 commented 1 month ago

> For Iceberg, we'd better test 4 cases:
>
> v1 format vs v2 format
> COW vs MOR table
>
> @qqqttt123 I would like to do this in another PR about row-level update. WDYT?

Partitioned vs unpartitioned tables are also needed.

You'd better use parameterized tests to add them.
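
A minimal sketch of that matrix as a JUnit 5 parameterized test (the class name and placeholder steps are illustrative, not the repo's actual test utilities):

    import java.util.stream.Stream;
    import org.junit.jupiter.params.ParameterizedTest;
    import org.junit.jupiter.params.provider.Arguments;
    import org.junit.jupiter.params.provider.MethodSource;

    class IcebergDeleteMatrixTest {

      // 2 format versions x 2 delete modes x partitioned/unpartitioned = 8 runs.
      static Stream<Arguments> cases() {
        Stream.Builder<Arguments> builder = Stream.builder();
        for (int formatVersion : new int[] {1, 2}) {
          for (String deleteMode : new String[] {"copy-on-write", "merge-on-read"}) {
            for (boolean partitioned : new boolean[] {true, false}) {
              builder.add(Arguments.of(formatVersion, deleteMode, partitioned));
            }
          }
        }
        return builder.build();
      }

      @ParameterizedTest
      @MethodSource("cases")
      void testDelete(int formatVersion, String deleteMode, boolean partitioned) {
        // Placeholder steps: create a table with the given format-version,
        // write.delete.mode, and partitioning; run a DELETE; assert on the
        // surviving rows.
      }
    }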

caican00 commented 1 month ago

> For Iceberg, we'd better test 4 cases:
>
> v1 format vs v2 format
> COW vs MOR table
>
> @qqqttt123 I would like to do this in another PR about row-level update. WDYT?
>
> OK for me. But we should finish it before the Spark Iceberg connector is released. More test cases should be added: our current cases don't cover the Iceberg logic we need, and some metadata-column deletion cases are needed. You'd better copy some test cases from Iceberg.

Okay, these PRs are being refined.

caican00 commented 1 month ago

> For Iceberg, we'd better test 4 cases:
>
> v1 format vs v2 format
> COW vs MOR table
>
> @qqqttt123 I would like to do this in another PR about row-level update. WDYT?
>
> Partitioned vs unpartitioned tables are also needed.
>
> You'd better use parameterized tests to add them.

Got it.

caican00 commented 1 month ago

Hi @FANNG1, could you please help re-trigger the CI pipeline? Thank you.

qqqttt123 commented 1 month ago

On second thought, maybe we should add the test cases in this PR. Your current test cases won't cover your logic.

caican00 commented 1 month ago

> On second thought, maybe we should add the test cases in this PR. Your current test cases won't cover your logic.

OK, I will add more test cases in this PR.

caican00 commented 1 month ago

    org.apache.spark.SparkUnsupportedOperationException: MERGE INTO TABLE is not supported temporarily.
        at app//org.apache.spark.sql.errors.QueryExecutionErrors$.ddlUnsupportedTemporarilyError(QueryExecutionErrors.scala:1109)
        at app//org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:898)
        at app//org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
        at app//scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
        at app//scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
        at app//scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
        at app//org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
        at app//org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:70)
        at app//org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
        at app//scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
        at app//scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
        at app//scala.collection.Iterator.foreach(Iterator.scala:943)
        at app//scala.collection.Iterator.foreach$(Iterator.scala:943)
        at app//scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at app//scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
        at app//scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
        at app//scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
        at app//org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
        at app//scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
        at app//scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
        at app//org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
        at app//org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:70)
        at app//org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:476)
        at app//org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:162)
        at app//org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
        at app//org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:202)
        at app//org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
        at app//org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:202)
        at app//org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at app//org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:201)
        at app//org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:162)
        at app//org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:155)
        at app//org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:175)
        at app//org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
        at app//org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:202)
        at app//org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
        at app//org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:202)
        at app//org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at app//org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:201)
        at app//org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:175)
        at app//org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:168)
        at app//org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:221)
        at app//org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:266)

https://github.com/apache/iceberg/blob/main/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala#L127-L186

Hi @FANNG1, I have a problem I want to discuss. For row-level operations, IcebergSparkSqlExtensionsParser checks whether the loaded table is a SparkTable; if not, it cannot apply the rules injected by IcebergSparkSessionExtensions. In our design, the parent class of SparkIcebergTable is not SparkTable, so rules such as RewriteUpdateTable and RewriteMergeIntoTable cannot be applied.

Firstly, I think it is impossible to modify Iceberg to fit our design.

Secondly, iceberg-spark-3.5 removes this check, so this problem does not exist in version 3.5, but it does exist before 3.5.

Thirdly, it looks like we must find a way to inherit from SparkTable to pass this check.
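
A minimal sketch of that inheritance direction (the constructor mirrors Iceberg's public SparkTable constructor; the Gravitino-specific wiring is omitted):

    import org.apache.iceberg.spark.source.SparkTable;

    // Sketch only: extending Iceberg's SparkTable lets the parser's
    // isIcebergTable check pass, so the row-level rewrite rules can apply
    // on Spark 3.4 and earlier.
    public class SparkIcebergTable extends SparkTable {

      public SparkIcebergTable(org.apache.iceberg.Table icebergTable, boolean refreshEagerly) {
        super(icebergTable, refreshEagerly);
      }

      // Gravitino-specific behavior (schema and property conversion, etc.)
      // would be layered on top here.
    }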

FANNG1 commented 1 month ago

> org.apache.spark.SparkUnsupportedOperationException: MERGE INTO TABLE is not supported temporarily.
>
> [stack trace quoted above]
>
> https://github.com/apache/iceberg/blob/main/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala#L127-L186
>
> Hi @FANNG1, I have a problem I want to discuss. For row-level operations, IcebergSparkSqlExtensionsParser checks whether the loaded table is a SparkTable; if not, it cannot apply the rules injected by IcebergSparkSessionExtensions. In our design, the parent class of SparkIcebergTable is not SparkTable, so rules such as RewriteUpdateTable and RewriteMergeIntoTable cannot be applied.
>
> Firstly, I think it is impossible to modify Iceberg to fit our design.
>
> Secondly, iceberg-spark-3.5 removes this check, so this problem does not exist in version 3.5, but it does exist before 3.5.
>
> Thirdly, it looks like we must find a way to inherit from SparkTable to pass this check.

It needs a refactor of SparkXXTable and SparkBaseTable, which seems out of the scope of this PR. @qqqttt123, do you think we should keep doing it in this PR or in a new PR?

qqqttt123 commented 1 month ago

> [quoting the MERGE INTO stack trace and SparkTable discussion above]
>
> It needs a refactor of SparkXXTable and SparkBaseTable, which seems out of the scope of this PR. @qqqttt123, do you think we should keep doing it in this PR or in a new PR?

I think so. This means that this PR has bugs. Alternatively, we can throw an exception to explicitly state that this feature isn't supported.

caican00 commented 1 month ago

> [quoting the MERGE INTO stack trace and SparkTable discussion above]
>
> It needs a refactor of SparkXXTable and SparkBaseTable, which seems out of the scope of this PR. @qqqttt123, do you think we should keep doing it in this PR or in a new PR?
>
> I think so. This means that this PR has bugs. Alternatively, we can throw an exception to explicitly state that this feature isn't supported.

Maybe we could implement only file-level deletion in this PR, then row-level operations in another PR, and refactor SparkBaseTable before that. WDYT? cc @FANNG1 @qqqttt123

FANNG1 commented 1 month ago

> Maybe we could implement only file-level deletion in this PR, then row-level operations in another PR, and refactor SparkBaseTable before that. WDYT? cc @FANNG1 @qqqttt123

+1

qqqttt123 commented 1 month ago

> [quoting the MERGE INTO stack trace and SparkTable discussion above]
>
> Maybe we could implement only file-level deletion in this PR, then row-level operations in another PR, and refactor SparkBaseTable before that. WDYT? cc @FANNG1 @qqqttt123

OK for me.

caican00 commented 1 month ago

> [quoting the MERGE INTO stack trace and SparkTable discussion above]
>
> Maybe we could implement only file-level deletion in this PR, then row-level operations in another PR, and refactor SparkBaseTable before that. WDYT? cc @FANNG1 @qqqttt123

Reverted to file-level deletion only. cc @FANNG1 @qqqttt123

FANNG1 commented 1 month ago

LGTM

FANNG1 commented 1 month ago

@caican00, merged to main, thanks for your work!