apache / iceberg

Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

Support deletion in Apache Flink #8718

Open Gerrrr opened 11 months ago

Gerrrr commented 11 months ago

Feature Request / Improvement

Hello community,

Is DELETE FROM support for Apache Flink already on the roadmap? Example:

CREATE TABLE word_count (
    word STRING,
    cnt BIGINT,
    PRIMARY KEY(`word`) NOT ENFORCED
) PARTITIONED BY (word) WITH (
    'format-version'='2', 
    'write.upsert.enabled'='true'
);

DELETE FROM word_count WHERE word = 'z';
[ERROR] Could not execute SQL statement. Reason:
java.lang.UnsupportedOperationException: Can't perform delete operation of the table nessie_catalog.db.word_count because the corresponding dynamic table sink has not yet implemented org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete.

This functionality already exists for Apache Spark, at least: https://iceberg.apache.org/docs/latest/spark-writes/#delete-from

Query engine

Flink

pvary commented 11 months ago

Is this for a V2 table? I have seen row deletion work on a V2 table from Java code using the stream API, but I have yet to try it with SQL.

Gerrrr commented 11 months ago

Yes, this is a V2 table. I have just added the DDL to the description.

> Java code with the stream API, but I have yet to try it with SQL.

This is great news! IcebergTableSink probably just needs to implement SupportsRowLevelDelete.

VidakM commented 8 months ago

Is DELETE FROM still on the roadmap for Flink, or could we help out in some way?

AjayChitumalla commented 6 months ago

> Is this for a V2 table? I have seen row deletion work on a V2 table from Java code using the stream API, but I have yet to try it with SQL.

Can you share a reference for performing row-level deletes on an Iceberg V2 table using the Flink DataStream API?

pvary commented 6 months ago

If your input stream contains -D (RowKind.DELETE) records, the matching rows will be deleted from the table. For example:

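      import org.apache.flink.types.Row;
      import org.apache.flink.types.RowKind;

      // Build a -D (delete) record with three positional fields.
      Row row = Row.withPositions(RowKind.DELETE, 3);
      row.setField(0, "hello");
      row.setField(1, true);
      row.setField(2, 1L);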
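
To make the effect of -D records concrete, here is a minimal plain-Java model of how a changelog could be applied to a table keyed by `word`, as in the word_count DDL from the description. It is only a sketch of the semantics: `ChangelogSketch`, `Kind`, `Change`, and `apply` are hypothetical stand-ins, not Flink or Iceberg classes, and it assumes deletes are matched on the key column alone.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ChangelogSketch {
    // Local stand-in for Flink's RowKind (assumption: only +I and -D are modelled).
    enum Kind { INSERT, DELETE }

    // Hypothetical changelog entry for a (word, cnt) table.
    static final class Change {
        final Kind kind;
        final String word;
        final long cnt;

        Change(Kind kind, String word, long cnt) {
            this.kind = kind;
            this.word = word;
            this.cnt = cnt;
        }
    }

    // Applies the changes in order, keyed by `word`.
    static Map<String, Long> apply(List<Change> changes) {
        Map<String, Long> table = new LinkedHashMap<>();
        for (Change c : changes) {
            if (c.kind == Kind.DELETE) {
                // -D: remove the row with this key; cnt is ignored in this model.
                table.remove(c.word);
            } else {
                // +I with upsert semantics: insert or overwrite the row for this key.
                table.put(c.word, c.cnt);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        Map<String, Long> result = apply(List.of(
                new Change(Kind.INSERT, "a", 1L),
                new Change(Kind.INSERT, "z", 5L),
                new Change(Kind.INSERT, "a", 2L),
                new Change(Kind.DELETE, "z", 0L)));
        System.out.println(result); // prints {a=2}
    }
}
```

In this model the final -D record plays the role of `DELETE FROM word_count WHERE word = 'z'`; the real sink writes delete files rather than mutating an in-memory map.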

github-actions[bot] commented 5 days ago

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in the next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.