datastrato / gravitino

World's most powerful data catalog service, providing a high-performance, geo-distributed, and federated metadata lake.
https://datastrato.ai/docs/
Apache License 2.0

[#3264] feat(spark-connector): Support Iceberg time travel in SQL queries #3265

Open caican00 opened 2 weeks ago

caican00 commented 2 weeks ago

What changes were proposed in this pull request?

Support Iceberg time travel in SQL queries

Why are the changes needed?

Supports time travel in SQL queries using the TIMESTAMP AS OF / FOR SYSTEM_TIME AS OF and VERSION AS OF / FOR SYSTEM_VERSION AS OF clauses.
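For illustration, the four clauses look like this in Spark SQL (the catalog, table, timestamp, snapshot ID, and branch name below are hypothetical, not taken from this PR):

```sql
-- Time travel by timestamp
SELECT * FROM my_catalog.db.sample TIMESTAMP AS OF '2024-05-01 10:00:00';
SELECT * FROM my_catalog.db.sample FOR SYSTEM_TIME AS OF '2024-05-01 10:00:00';

-- Time travel by snapshot ID or branch/tag name
SELECT * FROM my_catalog.db.sample VERSION AS OF 4613919324125570281;
SELECT * FROM my_catalog.db.sample FOR SYSTEM_VERSION AS OF 'my-branch';
```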

Fix: https://github.com/datastrato/gravitino/issues/3264

Does this PR introduce any user-facing change?

No.

How was this patch tested?

New integration tests (ITs).

caican00 commented 3 days ago

Finally, I chose to implement Iceberg time travel by overriding newScanBuilder, for the following reasons:

  1. Although SparkIcebergTable extends SparkTable, it still needs to initialize member variables such as snapshotId or branch before it can directly reuse SparkTable's newScanBuilder implementation.

  2. However, initializing snapshotId or branch is not as easy as initializing refreshEagerly, because it is hard to determine whether snapshotId or branch has been initialized in the real SparkTable. Therefore, it is difficult to selectively initialize snapshotId or branch through the super method.

In this case, the user specifies the version for time travel, but that version may be either a snapshotId or a branch name: https://github.com/apache/iceberg/blob/2058053b0c6e5b1c7e91fa029162f22d109aafb1/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java#L195-L199

  3. Therefore, overriding newScanBuilder is more convenient and does not introduce much maintenance burden.
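The snapshotId-vs-branch ambiguity mentioned above can be shown with a small standalone sketch (the class and method names here are illustrative, not Gravitino's); it mirrors the parse-then-fallback disambiguation in the Iceberg SparkCatalog code linked above:

```java
// Illustrative sketch only: shows how a single version string from
// VERSION AS OF can mean either a numeric snapshot ID or a branch/tag name.
public class VersionResolver {

    // Describe how the version string would be interpreted:
    // numeric -> snapshot ID, otherwise -> branch (or tag) reference.
    static String resolve(String version) {
        try {
            long snapshotId = Long.parseLong(version);
            return "snapshot:" + snapshotId;
        } catch (NumberFormatException e) {
            // Not a number, so treat it as a branch/tag name.
            return "branch:" + version;
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve("4613919324125570281")); // snapshot:4613919324125570281
        System.out.println(resolve("audit-branch"));        // branch:audit-branch
    }
}
```

Because both meanings arrive through the same string parameter, the table cannot know up front which member variable (snapshotId or branch) should be set, which is why selectively initializing them via the super method is awkward.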

cc @FANNG1

FANNG1 commented 2 days ago

How about overriding loadTable(Identifier ident, String version) and loadTable(Identifier ident, long timestamp) for GravitinoIcebergCatalog?
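For context, the suggestion above presumably means something like the following rough sketch (not compilable as-is; GravitinoIcebergCatalog's internals are assumed, and sparkCatalog stands in for whatever field holds the underlying Iceberg SparkCatalog):

```
// Sketch only. Spark's TableCatalog interface declares these two
// time-travel overloads; delegate both to the wrapped Iceberg catalog.
@Override
public Table loadTable(Identifier ident, String version) {
  // The Iceberg SparkCatalog resolves `version` as a snapshot ID
  // or a branch/tag name.
  return sparkCatalog.loadTable(ident, version);
}

@Override
public Table loadTable(Identifier ident, long timestamp) {
  // `timestamp` is the time-travel timestamp passed down by Spark.
  return sparkCatalog.loadTable(ident, timestamp);
}
```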

caican00 commented 2 days ago

How about overriding loadTable(Identifier ident, String version) and loadTable(Identifier ident, long timestamp) for GravitinoIcebergCatalog?

In this way, we still have the problem of initializing snapshotId or branch when invoking the super method of SparkTable.

FANNG1 commented 2 days ago

How about overriding loadTable(Identifier ident, String version) and loadTable(Identifier ident, long timestamp) for GravitinoIcebergCatalog?

In this way, we still have the problem of initializing snapshotId or branch when invoking the super method of SparkTable.

Let me think about it for a while.

caican00 commented 1 day ago

Fixed the conflict. Could you please review again when you are free? Thank you.