Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

Spark: Support alter partition in V2 Catalog SparkCatalog Class #3558

Open RussellSpitzer opened 2 years ago

RussellSpitzer commented 2 years ago

Following https://github.com/apache/iceberg/pull/3459/files we now have the beginnings of SupportsPartitionManagement. This issue is open to provide implementations of the ALTER TABLE partition commands.

This would mean we no longer rely on the Spark SQL extensions for our ALTER ... PARTITION syntax.

This was brought up when discussing #3554
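
For reference, the Spark DSv2 interface in question looks roughly like this. The method set matches what is exercised later in this thread, but treat the exact signatures as approximate:

// Spark 3.x DSv2 capability interface (approximate signatures): a *table*
// that implements this can answer ALTER TABLE ... ADD/DROP PARTITION.
public interface SupportsPartitionManagement extends Table {
  StructType partitionSchema();

  void createPartition(InternalRow ident, Map<String, String> properties)
      throws PartitionAlreadyExistsException;

  boolean dropPartition(InternalRow ident);

  void replacePartitionMetadata(InternalRow ident, Map<String, String> properties);

  Map<String, String> loadPartitionMetadata(InternalRow ident);

  InternalRow[] listPartitionIdentifiers(String[] names, InternalRow ident);
}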

RussellSpitzer commented 2 years ago

@felixYyu You wanted to take a crack at this?

felixYyu commented 2 years ago

I'd like to.

felixYyu commented 2 years ago

I tried making SparkTable implement SupportsPartitionManagement:

@Override
public StructType partitionSchema() {
  // Expose the table's partition spec as a Spark StructType.
  return (StructType) SparkSchemaUtil.convert(icebergTable.spec().partitionType());
}

@Override
public void createPartition(InternalRow ident, Map<String, String> properties)
    throws PartitionAlreadyExistsException, UnsupportedOperationException {
  if (partitionExists(ident)) {
    throw new PartitionAlreadyExistsException(name(), ident, schema());
  } else {
    // Sticking point: ident.toSeq(partitionSchema()) yields partition *values*,
    // not a transform expression, so this cast to Term cannot succeed as written.
    icebergTable.updateSpec()
        .addField((Term) ident.toSeq(partitionSchema()))
        .commit();
  }
}

@Override
public boolean dropPartition(InternalRow ident) {
  try {
    if (partitionExists(ident)) {
      icebergTable.updateSpec()
          .removeField(ident.toSeq(partitionSchema()).mkString()) // partition field name
          .commit();
      return true;
    }
  } catch (IllegalArgumentException e) {
    LOG.info("No partition exists for the identifier", e);
  }
  return false;
}

@Override
public void replacePartitionMetadata(InternalRow ident, Map<String, String> properties)
    throws UnsupportedOperationException {
  throw new UnsupportedOperationException("Iceberg partitions do not support metadata");
}

@Override
public Map<String, String> loadPartitionMetadata(InternalRow ident)
    throws UnsupportedOperationException {
  throw new UnsupportedOperationException("Iceberg partitions do not support metadata");
}

@Override
public InternalRow[] listPartitionIdentifiers(String[] names, InternalRow ident) {
  return new InternalRow[0]; // listing not implemented yet
}
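
For comparison, Iceberg's own spec-evolution API takes a Term built from org.apache.iceberg.expressions.Expressions rather than an InternalRow of partition values. A minimal sketch of the same change driven directly through that API (assuming an org.apache.iceberg.Table handle named icebergTable):

import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;

// Sketch: evolve the partition spec through Iceberg's API directly.
// addField/removeField take a Term (or a field name), not a row of values,
// which is exactly the mismatch the InternalRow cast above runs into.
class PartitionSpecEvolution {
  static void addDayField(Table icebergTable) {
    icebergTable.updateSpec()
        .addField(Expressions.day("cre_time")) // equivalent to days(cre_time)
        .commit();
  }

  static void dropDayField(Table icebergTable) {
    icebergTable.updateSpec()
        .removeField(Expressions.day("cre_time"))
        .commit();
  }
}
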
felixYyu commented 2 years ago

The unit test does not register IcebergSparkSessionExtensions (note the commented-out spark.sql.extensions line):

val spark = SparkSession
  .builder()
  .master("local[2]")
  .appName("IcebergAPI")
  .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.hadoop_prod.type", "hadoop")
  .config("spark.sql.catalog.hadoop_prod.warehouse", "file:///D:\\lake-icebergv2\\warehouse")
  // .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
  .config("spark.rapids.sql.enabled", true) // GPU acceleration via the RAPIDS plugin
  .getOrCreate()

def dropPartitionByTable(spark: SparkSession): Unit = {
  spark.sql(
    s"""
       |ALTER TABLE hadoop_prod.$schemaName.$tableName DROP PARTITION FIELD days(cre_time)
       |""".stripMargin)
}

Error log (without the Iceberg SQL extensions registered, Spark's parser does not recognize the PARTITION FIELD clause):

Exception in thread "main" org.apache.spark.sql.catalyst.parser.ParseException: 
missing '(' at 'FIELD'(line 2, pos 61)

== SQL ==

ALTER TABLE hadoop_prod.tenant.t_user_customer ADD PARTITION FIELD bucket(12, id)
-------------------------------------------------------------^^^

    at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:265)
    at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:126)
    at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:51)
    at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:77)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$2(SparkSession.scala:616)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:616)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
felixYyu commented 2 years ago

Does ALTER TABLE partition support have to go through the SQL extensions? I don't have a good idea here; can you help me? cc @RussellSpitzer

felixYyu commented 2 years ago

Sorry, I suddenly realized: SparkCatalog should implement SupportsPartitionManagement, is that right? @RussellSpitzer

RussellSpitzer commented 2 years ago

> Sorry, I suddenly realized: SparkCatalog should implement SupportsPartitionManagement, is that right? @RussellSpitzer

Yep! That's what we should be adding to handle this with the new API.

felixYyu commented 2 years ago

Hi Russell, here's the flow I tested:

1. Made SparkCatalog implement the interface: public class SparkCatalog extends BaseCatalog implements SupportsPartitionManagement
2. Added the following JUnit tests to TestPartitionedWrites:

@Test
public void testAddPartition() {
  sql("ALTER TABLE %s ADD IF NOT EXISTS PARTITION (data='1')", tableName);
  spark.table(tableName).printSchema();
}

@Test
public void testDropPartition() {
  sql("ALTER TABLE %s DROP IF EXISTS PARTITION (data_bucket=3)", tableName);
}

3. Error log:

org.apache.iceberg.spark.sql.TestPartitionedWrites > testAddPartition[catalogName = testhive, implementation = org.apache.iceberg.spark.SparkCatalog, config = {type=hive, default-namespace=default}] FAILED
    org.apache.spark.sql.AnalysisException: Table testhive.default.table does not support partition management.;
    'AddPartitions [unresolvedpartitionspec((data,1), None)], true
    +- ResolvedTable org.apache.iceberg.spark.SparkCatalog@1a4564a2, default.table, testhive.default.table, [id#9L, data#10]
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis(CheckAnalysis.scala:51)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis$(CheckAnalysis.scala:50)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:172)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1(CheckAnalysis.scala:151)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1$adapted(CheckAnalysis.scala:94)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:263)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:94)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:91)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:172)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:195)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:192)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:88)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:196)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:196)
        at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:88)
        at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:86)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:78)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:98)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
        at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
        at org.apache.iceberg.spark.SparkTestBase.sql(SparkTestBase.java:95)
        at org.apache.iceberg.spark.sql.TestPartitionedWrites.testAddPartition(TestPartitionedWrites.java:182)

The error says Table testhive.default.table does not support partition management. How do I make the table support partition management? Thanks.
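
A likely explanation for that error, sketched below in simplified form (this is approximate, not Spark's actual source): the analyzer validates the resolved *table* object, so SparkTable, not SparkCatalog, is what needs to implement SupportsPartitionManagement.

// Sketch (approximate) of Spark's analysis-time check for AddPartitions:
// the check is made against the resolved Table, so implementing the
// interface on the catalog alone does not satisfy it.
static void checkPartitionManagement(ResolvedTable resolved) {
  Table table = resolved.table();
  if (!(table instanceof SupportsPartitionManagement)) {
    // Spark raises an AnalysisException with this message
    throw new UnsupportedOperationException(
        "Table " + table.name() + " does not support partition management.");
  }
}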

felixYyu commented 2 years ago

cc @RussellSpitzer

felixYyu commented 2 years ago

1. Spark SQL ALTER TABLE:

ALTER TABLE table_identifier ADD [IF NOT EXISTS]
    ( partition_spec [ partition_spec ... ] )

Partition to be added. Note that one can use a typed literal (e.g., date'2019-01-02') in the partition spec.

Syntax: PARTITION ( partition_col_name = partition_col_val [ , ... ] )

2. Iceberg SparkSqlExtensions ALTER TABLE:

ALTER TABLE prod.db.sample ADD PARTITION FIELD bucket(16, id)
ALTER TABLE prod.db.sample ADD PARTITION FIELD truncate(data, 4)

3. Iceberg supports adding new partition fields to an existing spec, but Spark SQL's ALTER TABLE syntax has no way to add a partition *field* (only partition values), so I think the Iceberg ALTER TABLE SQL extension commands are the better fit.

4. Regarding the [IF NOT EXISTS] and [IF EXISTS] clauses: the addField method of BaseUpdatePartitionSpec already checks for an existing field, so adding [IF NOT EXISTS] / [IF EXISTS] to IcebergSqlExtensions.g4 is not strictly necessary:

PartitionField existing = transformToField.get(validationKey);
Preconditions.checkArgument(existing == null,
    "Cannot add duplicate partition field %s=%s, conflicts with %s", name, term, existing);
psilos commented 3 months ago

hi @RussellSpitzer @felixYyu

What's the latest on this issue? I would love to use IF EXISTS when modifying partitions. Is there any news on adding this functionality?

Thanks in advance!

szehon-ho commented 3 months ago

Is this still an issue? I think it fell off the radar. Not sure if @felixYyu wants to continue on this; otherwise I think @huaxingao could also help if needed.

huaxingao commented 3 months ago

@szehon-ho Thanks for pinging me. I am happy to help if needed.

huaxingao commented 3 months ago

@psilos Do you need the original Spark syntax

ALTER TABLE table_identifier ADD [IF NOT EXISTS] partition_spec
ALTER TABLE table_identifier DROP [ IF EXISTS ] partition_spec

or do you need IF EXISTS / IF NOT EXISTS added to the Iceberg extended syntax?

ALTER TABLE table_identifier ADD PARTITION FIELD ...
ALTER TABLE table_identifier DROP PARTITION FIELD ...
psilos commented 3 months ago

Hi @huaxingao, I was referring to the latter: adding it to the Iceberg extended syntax.