datastrato / gravitino

World's most powerful open data catalog for building a high-performance, geo-distributed and federated metadata lake.
https://datastrato.ai/docs/
Apache License 2.0

[#2543] feat(spark-connector): support row-level operations to iceberg Table #2642

Closed caican00 closed 1 month ago

caican00 commented 2 months ago

What changes were proposed in this pull request?

Support row-level operations on Iceberg tables, for example (a usage sketch follows this list):

1. UPDATE tableName SET c1 = v1, c2 = v2, ...

2. MERGE INTO targetTable t
   USING sourceTable s
   ON s.key = t.key
   WHEN MATCHED THEN ...
   WHEN NOT MATCHED THEN ...

3. DELETE FROM tableName WHERE ...
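For illustration, a minimal sketch of what these statements look like when issued through a SparkSession configured with the Gravitino spark-connector. The catalog, schema, table, and column names (iceberg_catalog, db.target, db.source, c1, c2, id, key) are hypothetical examples, not identifiers from this PR.

```java
import org.apache.spark.sql.SparkSession;

public class RowLevelOpsExample {
  public static void main(String[] args) {
    // Assumes the session is already configured with the Gravitino spark-connector
    // and an Iceberg catalog registered as "iceberg_catalog" (hypothetical name).
    SparkSession spark = SparkSession.builder()
        .appName("gravitino-iceberg-row-level-ops")
        .getOrCreate();

    // 1. UPDATE: rewrite the matching rows in place
    spark.sql("UPDATE iceberg_catalog.db.target SET c1 = 'v1', c2 = 'v2' WHERE id = 1");

    // 2. MERGE INTO: upsert rows from a source table into the target table
    spark.sql("MERGE INTO iceberg_catalog.db.target t "
        + "USING iceberg_catalog.db.source s "
        + "ON s.key = t.key "
        + "WHEN MATCHED THEN UPDATE SET t.c1 = s.c1 "
        + "WHEN NOT MATCHED THEN INSERT *");

    // 3. DELETE FROM: remove the matching rows
    spark.sql("DELETE FROM iceberg_catalog.db.target WHERE c1 = 'v1'");

    spark.stop();
  }
}
```

The SQL shapes mirror the three items above; the change in this PR is that such statements are routed through tables loaded from a Gravitino catalog.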

Why are the changes needed?

To support row-level operations on Iceberg tables.

Fix: https://github.com/datastrato/gravitino/issues/2543

Does this PR introduce any user-facing change?

Yes, it adds support for UPDATE ..., MERGE INTO ..., and DELETE FROM ... statements.

How was this patch tested?

New integration tests, and tested locally.

caican00 commented 1 month ago

Hi @FANNG1 could you help review this PR? Thank you!

caican00 commented 1 month ago

Comments have been addressed, please review again. Thank you @FANNG1

FANNG1 commented 1 month ago

Another thought, how about upgrading Iceberg to a newer version since the problem only exists in older versions? @caican00 @qqqttt123 @jerryshao WDYT?

caican00 commented 1 month ago

Another thought, how about upgrading Iceberg to a newer version since the problem only exists in older versions? @caican00 @qqqttt123 @jerryshao WDYT?

@FANNG1

  1. IMO, we have to support multiple Spark versions, such as Spark 3.1, 3.3, 3.4, 3.5, etc.
  2. The Iceberg parser only has this problem before Spark 3.5, but some physical plans in the Iceberg spark-connector, such as AddPartitionFieldExec and SetWriteDistributionAndOrderingExec, have this problem in all versions.

For example, in Spark 3.5: https://github.com/apache/iceberg/blob/426818bfe7fa93e8c677ebf886638d5c50db597b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetWriteDistributionAndOrderingExec.scala#L47

https://github.com/apache/iceberg/blob/426818bfe7fa93e8c677ebf886638d5c50db597b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala#L166

caican00 commented 1 month ago

Another thought, how about upgrading Iceberg to a newer version since the problem only exists in older versions? @caican00 @qqqttt123 @jerryshao WDYT?

@FANNG1

  1. IMO, we have to support multiple Spark versions, such as Spark 3.1, 3.3, 3.4, 3.5, etc.
  2. The Iceberg parser only has this problem before Spark 3.5, but some physical plans in the Iceberg spark-connector, such as AddPartitionFieldExec and SetWriteDistributionAndOrderingExec, have this problem in all versions.

For example, in Spark 3.5: https://github.com/apache/iceberg/blob/426818bfe7fa93e8c677ebf886638d5c50db597b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetWriteDistributionAndOrderingExec.scala#L47

https://github.com/apache/iceberg/blob/426818bfe7fa93e8c677ebf886638d5c50db597b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala#L166

Unless SparkIcebergTable extends SparkTable and GravitinoIcebergCatalog extends SparkCatalog. This solution, discussed earlier, also needs to override some of Scala's methods, such as productElement, productArity, and canEqual. WDYT? cc @FANNG1 @qqqttt123 @jerryshao

FANNG1 commented 1 month ago

My concern is that the current implementation seems hard to maintain, especially across different versions of Spark and Iceberg. If there is no simple solution, I prefer the original implementation, which returns a GravitinoIcebergTable that extends SparkTable.

caican00 commented 1 month ago

My concern is that the current implementation seems hard to maintain, especially across different versions of Spark and Iceberg. If there is no simple solution, I prefer the original implementation, which returns a GravitinoIcebergTable that extends SparkTable.

I prefer this solution too.

caican00 commented 1 month ago

My concern is that the current implementation seems hard to maintain, especially across different versions of Spark and Iceberg. If there is no simple solution, I prefer the original implementation, which returns a GravitinoIcebergTable that extends SparkTable.

I prefer this solution too.

@FANNG1 should I fall back to the original implementation? I would like to finish it today. Another thought: could we make only SparkIcebergTable extend SparkTable, and let Hive, JDBC, and so on stay inconsistent? For example, it seems unnecessary to make SparkHiveTable also extend Kyuubi's HiveTable.

FANNG1 commented 1 month ago

My concern is that the current implementation seems hard to maintain, especially across different versions of Spark and Iceberg. If there is no simple solution, I prefer the original implementation, which returns a GravitinoIcebergTable that extends SparkTable.

I prefer this solution too.

@FANNG1 should I fall back to the original implementation? I would like to finish it today. Another thought: could we make only SparkIcebergTable inherit from SparkTable, and let Hive, JDBC, and so on stay inconsistent? For example, it seems unnecessary to make SparkHiveTable also inherit from Kyuubi's HiveTable.

@qqqttt123 @jerryshao WDYT?

qqqttt123 commented 1 month ago

Some questions.

  1. How does Trino solve this issue?
  2. Is it necessary to support the parser if we support row-level operations?

FANNG1 commented 1 month ago

Some questions.

  1. How does Trino solve this issue?

It seems how Trino solves the issue is not relevant here; Spark and Trino have different frameworks.

caican00 commented 1 month ago

Some questions.

  1. How does Trino solve this issue?
  2. Is it necessary to support the parser if we support row-level operations?

@qqqttt123

  1. Trino's SQL parser does not have this issue.
  2. The Iceberg spark-connector explicitly uses SparkTable to identify whether a table is an Iceberg table, so we have to rewrite the parser or inherit from SparkTable to make row-level commands recognizable (see the sketch below).

https://github.com/apache/iceberg/blob/426818bfe7fa93e8c677ebf886638d5c50db597b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala#L127-L186
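To make the type-check problem concrete, here is a hedged sketch (not code from this PR) of why a table that merely wraps SparkTable is not recognized by Iceberg's extensions, while a subclass would be: the extensions effectively gate on the concrete runtime type of the resolved table.

```java
// Only org.apache.iceberg.spark.source.SparkTable is a real class here;
// GravitinoWrappedTable and isIcebergTable are hypothetical illustration names.
import org.apache.iceberg.spark.source.SparkTable;

class GravitinoWrappedTable {
  // Composition: the Iceberg table is only held as a field, so the wrapper's
  // runtime type has no relationship to SparkTable.
  private final SparkTable delegate;

  GravitinoWrappedTable(SparkTable delegate) {
    this.delegate = delegate;
  }
}

class IcebergTypeCheck {
  // Iceberg's parser and planner rules roughly perform a check like this
  // on the table resolved from the catalog:
  static boolean isIcebergTable(Object table) {
    return table instanceof SparkTable; // false for GravitinoWrappedTable,
                                        // true only for SparkTable and its subclasses
  }
}
```

This is why the discussion keeps coming back to either rewriting the parser or making SparkIcebergTable an actual subclass of SparkTable.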

caican00 commented 1 month ago

Some questions.

  1. How does Trino solve this issue?

It seems how Trino solves the issue is not relevant here; Spark and Trino have different frameworks.

Yes, Trino actually doesn't have the same problem as Spark. Neither does Flink.

qqqttt123 commented 1 month ago

What's the problem with extending SparkTable?

wForget commented 1 month ago

Why not just return org.apache.iceberg.spark.source.SparkTable rather than wrapping it? Do we do any extra work on top of it?

caican00 commented 1 month ago

What's the problem with extending SparkTable?

@qqqttt123

  1. We have to make SparkBaseTable an interface, since a Java class can only extend one class (see the sketch after this list). https://github.com/datastrato/gravitino/blob/main/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/table/SparkBaseTable.java If we switch to a combined interface, it seems we can no longer extract a common base table; each data source would then need to implement the Table, SupportsRead, and SupportsWrite interfaces separately, which causes redundant code and hurts readability.

  2. Some physical rules have the same problem, such as ExtendedDataSourceV2Strategy, which explicitly uses org.apache.iceberg.spark.SparkCatalog to identify whether a catalog is an Iceberg catalog: https://github.com/apache/iceberg/blob/426818bfe7fa93e8c677ebf886638d5c50db597b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala#L166 Therefore, we would also have to make GravitinoIcebergCatalog extend Iceberg's SparkCatalog, and then make Gravitino's BaseCatalog an interface, which causes the same problem as above. https://github.com/datastrato/gravitino/blob/main/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/BaseCatalog.java
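As a hedged illustration of point 1 (class shapes only; SparkBaseTableAsInterface and SparkIcebergTableSketch are hypothetical names, and the SparkTable constructor shown is an assumption about Iceberg's API): once SparkIcebergTable has to extend Iceberg's SparkTable, the shared logic cannot stay in a common superclass, because Java permits only one superclass.

```java
import org.apache.spark.sql.connector.catalog.SupportsRead;
import org.apache.spark.sql.connector.catalog.SupportsWrite;
import org.apache.spark.sql.connector.catalog.Table;

// The common contract would have to become an interface; default methods can carry
// some shared behaviour, but shared state cannot live here.
interface SparkBaseTableAsInterface extends Table, SupportsRead, SupportsWrite {
}

// The Iceberg table must *be* a SparkTable so that Iceberg's extensions recognize it,
// which uses up the single superclass slot.
class SparkIcebergTableSketch
    extends org.apache.iceberg.spark.source.SparkTable
    implements SparkBaseTableAsInterface {

  SparkIcebergTableSketch(org.apache.iceberg.Table icebergTable, boolean refreshEagerly) {
    super(icebergTable, refreshEagerly); // assumed SparkTable constructor
  }
}
```

Each connector table would then have to re-implement or delegate the logic that the abstract SparkBaseTable currently shares, which is the redundancy described above.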

qqqttt123 commented 1 month ago

What's the problem with extending SparkTable?

@qqqttt123

  1. We have to make SparkBaseTable an interface, since a Java class can only extend one class. https://github.com/datastrato/gravitino/blob/main/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/table/SparkBaseTable.java If we switch to a combined interface, it seems we can no longer extract a common base table; each data source would then need to implement the Table, SupportsRead, and SupportsWrite interfaces separately, which causes redundant code and hurts readability.
  2. Some physical rules have the same problem, such as ExtendedDataSourceV2Strategy, which explicitly uses SparkCatalog to identify whether a catalog is an Iceberg catalog: https://github.com/apache/iceberg/blob/426818bfe7fa93e8c677ebf886638d5c50db597b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala#L166 Therefore, we would also have to make GravitinoIcebergCatalog extend Iceberg's SparkCatalog, and then make Gravitino's BaseCatalog an interface, which causes the same problem as above. https://github.com/datastrato/gravitino/blob/main/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/BaseCatalog.java

This is not a problem. We can extend the interface and extract the common logic into a class that the new class holds as a field. It's OK.

caican00 commented 1 month ago

Why not just return org.apache.iceberg.spark.source.SparkTable rather than wrapping it? Do we do any extra work on top of it?

We have wrapped org.apache.iceberg.spark.source.SparkTable inside com.datastrato.gravitino.spark.connector.iceberg.SparkIcebergTable, but SparkIcebergTable's parent class is not org.apache.iceberg.spark.source.SparkTable.

caican00 commented 1 month ago

What's the problem with extending SparkTable?

@qqqttt123

  1. We have to make SparkBaseTable an interface, since a Java class can only extend one class. https://github.com/datastrato/gravitino/blob/main/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/table/SparkBaseTable.java If we switch to a combined interface, it seems we can no longer extract a common base table; each data source would then need to implement the Table, SupportsRead, and SupportsWrite interfaces separately, which causes redundant code and hurts readability.
  2. Some physical rules have the same problem, such as ExtendedDataSourceV2Strategy, which explicitly uses SparkCatalog to identify whether a catalog is an Iceberg catalog: https://github.com/apache/iceberg/blob/426818bfe7fa93e8c677ebf886638d5c50db597b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala#L166 Therefore, we would also have to make GravitinoIcebergCatalog extend Iceberg's SparkCatalog, and then make Gravitino's BaseCatalog an interface, which causes the same problem as above. https://github.com/datastrato/gravitino/blob/main/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/BaseCatalog.java

This is not a problem. We can extend the interface and extract the common logic into a class that the new class holds as a field. It's OK.

ok for me, @FANNG1 WDYT?

caican00 commented 1 month ago

What's the problem with extending SparkTable?

@qqqttt123

  1. We have to make SparkBaseTable an interface, since a Java class can only extend one class. https://github.com/datastrato/gravitino/blob/main/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/table/SparkBaseTable.java If we switch to a combined interface, it seems we can no longer extract a common base table; each data source would then need to implement the Table, SupportsRead, and SupportsWrite interfaces separately, which causes redundant code and hurts readability.
  2. Some physical rules have the same problem, such as ExtendedDataSourceV2Strategy, which explicitly uses SparkCatalog to identify whether a catalog is an Iceberg catalog: https://github.com/apache/iceberg/blob/426818bfe7fa93e8c677ebf886638d5c50db597b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala#L166 Therefore, we would also have to make GravitinoIcebergCatalog extend Iceberg's SparkCatalog, and then make Gravitino's BaseCatalog an interface, which causes the same problem as above. https://github.com/datastrato/gravitino/blob/main/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/BaseCatalog.java

This is not a problem. We can extend the interface and extract the common logic into a class that the new class holds as a field. It's OK.

ok for me, @FANNG1 WDYT?

If we all think it is OK, I will go ahead, thank you.

FANNG1 commented 1 month ago

If we all think it is OK, I will go ahead, thank you.

It's OK for me. A few things to keep consistent before the refactor:

  1. please keep the implementations of HiveTable and IcebergTable consistent.
  2. refactor the common logic in SparkBaseTable into a common helper class, not an interface (a sketch follows below).
  3. close the current PR and propose a new PR.

WDYT? @caican00
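A hedged sketch of the helper-class direction in point 2 (GravitinoTableHelper, its method, and the property-merging behaviour are hypothetical illustrations, not the actual refactor): the logic that the abstract SparkBaseTable currently shares moves into a plain object that every connector table holds as a field, so each table class is free to extend its underlying connector table.

```java
import java.util.HashMap;
import java.util.Map;

// Shared, framework-independent logic lives in a helper instead of a superclass.
class GravitinoTableHelper {
  private final Map<String, String> gravitinoProperties;

  GravitinoTableHelper(Map<String, String> gravitinoProperties) {
    this.gravitinoProperties = gravitinoProperties;
  }

  // Example of shared behaviour: overlay Gravitino metadata on the connector's properties.
  Map<String, String> mergeProperties(Map<String, String> connectorProperties) {
    Map<String, String> merged = new HashMap<>(connectorProperties);
    merged.putAll(gravitinoProperties);
    return merged;
  }
}

// Each connector table then extends its own base class and delegates to the helper,
// e.g. a SparkIcebergTable extending Iceberg's SparkTable could implement
// properties() as `return helper.mergeProperties(super.properties());`.
```

Composition of the helper keeps the Hive and Iceberg table implementations structurally consistent while still satisfying Iceberg's type checks.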

caican00 commented 1 month ago

  1. please keep the implementations of HiveTable and IcebergTable consistent.

@FANNG1 I'm a little confused: is it necessary to keep them consistent? If we do it like this, why not just use the parent class's implementation directly? In that case, why implement SparkBaseTable at all? It seems a little redundant.

caican00 commented 1 month ago

  1. please keep the implementations of HiveTable and IcebergTable consistent.

@FANNG1 I'm a little confused: is it necessary to keep them consistent? If we do it like this, why not just use the parent class's implementation? In that case, why implement SparkBaseTable at all? It seems a bit redundant.

In addition, Kyuubi's HiveTable is a Scala class, and extending it in Java requires overriding some of Scala's methods.

FANNG1 commented 1 month ago

  1. please keep the implementations of HiveTable and IcebergTable consistent.

@FANNG1 I'm a little confused: is it necessary to keep them consistent? If we do it like this, why not just use the parent class's implementation? In that case, why implement SparkBaseTable at all? It seems a bit redundant.

Because keeping them consistent makes maintenance easier. Having SparkIcebergTable extend SparkTable while SparkHiveTable composes Kyuubi's HiveTable is really confusing to new developers: when implementing new features they would have to consider both cases, and if they don't, they may introduce bugs that are hard to maintain.

caican00 commented 1 month ago

  1. please keep the implementations of HiveTable and IcebergTable consistent.

@FANNG1 I'm a little confused: is it necessary to keep them consistent? If we do it like this, why not just use the parent class's implementation? In that case, why implement SparkBaseTable at all? It seems a bit redundant.

Because keeping them consistent makes maintenance easier. Having SparkIcebergTable extend SparkTable while SparkHiveTable composes Kyuubi's HiveTable is really confusing to new developers: when implementing new features they would have to consider both cases, and if they don't, they may introduce bugs that are hard to maintain.

@FANNG1 got it. Should I submit a separate PR to refactor the table implementation, without the row-level operations? In that new PR we can explain the reasons for the refactoring, and then submit the row-level PR.

FANNG1 commented 1 month ago

  1. please keep the implementations of HiveTable and IcebergTable consistent.

@FANNG1 I'm a little confused: is it necessary to keep them consistent? If we do it like this, why not just use the parent class's implementation? In that case, why implement SparkBaseTable at all? It seems a bit redundant.

Because keeping them consistent makes maintenance easier. Having SparkIcebergTable extend SparkTable while SparkHiveTable composes Kyuubi's HiveTable is really confusing to new developers: when implementing new features they would have to consider both cases, and if they don't, they may introduce bugs that are hard to maintain.

@FANNG1 got it. Should I submit a separate PR to refactor the table implementation, without the row-level operations? In that new PR we can explain the reasons for the refactoring, and then submit the row-level PR.

both are ok for me.

caican00 commented 1 month ago

  1. please keep the implementations of HiveTable and IcebergTable consistent.

@FANNG1 I'm a little confused: is it necessary to keep them consistent? If we do it like this, why not just use the parent class's implementation? In that case, why implement SparkBaseTable at all? It seems a bit redundant.

Because keeping them consistent makes maintenance easier. Having SparkIcebergTable extend SparkTable while SparkHiveTable composes Kyuubi's HiveTable is really confusing to new developers: when implementing new features they would have to consider both cases, and if they don't, they may introduce bugs that are hard to maintain.

@FANNG1 got it. Should I submit a separate PR to refactor the table implementation, without the row-level operations? In that new PR we can explain the reasons for the refactoring, and then submit the row-level PR.

both are ok for me.

ok

caican00 commented 1 month ago

Closing this PR; a new PR will be created to refactor the table implementation and support the row-level operations feature.