Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

Flink SQL with Iceberg snapshots doesn't react if table has upsert #9948

Open VidakM opened 8 months ago

VidakM commented 8 months ago

Query engine

Flink 1.17.2 with Iceberg 1.4.2 libraries

Question

I have a few Iceberg v2 tables defined and a Flink job that reads them in a streaming fashion before transforming to another Iceberg table.

If the source tables are basic, then subscribing to them works great and the SQL query can continuously run.

But if the tables are defined with 'write.upsert.enabled'='true', then the subscribing Flink SQL reads only once and does not react to new snapshots, even if the query hints ask it to monitor at an interval and the streaming starting strategy is set to any of the incremental options.

Flink streaming query that normally works:

INSERT INTO iceberg.target_packaging
SELECT
  usr.`user_id` AS `user_id`,
  usr.`adress` AS `address`,
  ord.`item_id` AS `item_id`,
  ....
FROM
  iceberg.source_users /*+  OPTIONS('streaming'='true', 'monitor-interval'='15s')  */ usr
JOIN
  iceberg.source_orders /*+  OPTIONS('streaming'='true', 'monitor-interval'='15s')  */ ord ON usr.`user_id` = ord.`user_id`;

The streaming join works great if the source Iceberg tables are defined like this:

CREATE TABLE iceberg.source_users (
  `user_id` STRING,
  `adress` STRING,
  ....
  PRIMARY KEY (`user_id`) NOT ENFORCED
) with ('format-version'='2');

Resulting table properties example: 
[current-snapshot-id=7980858807056176990,format=iceberg/parquet,format-version=2,identifier-fields=[user_id],write.parquet.compression-codec=zstd]

But if the source tables are instead defined with upsert enabled, the streaming join runs only once and then stops triggering on new snapshots. The job does not finish; it simply stops reacting to the sources and produces no new records.

CREATE TABLE iceberg.source_users (
  `user_id` STRING,
  `adress` STRING,
  ....
  PRIMARY KEY (`user_id`) NOT ENFORCED
) with ('format-version'='2', 'write.upsert.enabled'='true');

Resulting table properties example: 
[current-snapshot-id=3566387524956156231,format=iceberg/parquet,format-version=2,identifier-fields=[user_id],write.parquet.compression-codec=zstd,write.upsert.enabled=true]
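
The only difference between the two definitions is this table property. For experimenting, it can also be toggled on an existing table; a sketch, assuming the Flink catalog in use supports ALTER TABLE ... SET for Iceberg tables (snapshots already written in upsert mode keep their delete files either way):

-- Sketch: flip the property on an existing table to compare behaviour.
ALTER TABLE iceberg.source_users SET ('write.upsert.enabled'='false');
-- and back again:
ALTER TABLE iceberg.source_users SET ('write.upsert.enabled'='true');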

In my Flink job I simply define the connector and run the SQL join/insert. Both the source and target tables are already defined.

I also noticed that if I run just an SQL join, it too stops streaming when at least one of the tables has upsert enabled.

Looking at the documentation for both Iceberg and Flink, I don't find any indication that enabling upsert should alter this behaviour, but I do remember reading somewhere that the FLIP-27 source only supports appends and not updates / deletes. Is this the reason I'm seeing this behaviour?

pvary commented 8 months ago

Flink streaming read only supports append-only tables at the moment.

When you insert into a table it generates appends, but when you switch to upsert mode, you start adding update changes to the table. Appends are read in streaming mode, but updates are ignored, as we do not have the capability to correctly emit the delete records.
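
One way to see the difference is to compare the snapshot history of the two tables. This is only a sketch: the `$snapshots` metadata-table syntax assumes a Flink / Iceberg version that can read metadata tables (in Spark the equivalent is db.table.snapshots), and the table name is taken from the example above. Plain inserts commit 'append' snapshots, which the streaming source picks up; upsert writes commit 'overwrite' snapshots carrying equality deletes, which it does not emit.

-- Sketch: inspect which snapshot operations the writer produced.
-- Assumes metadata tables are readable from Flink SQL via the $snapshots suffix.
SELECT
  committed_at,
  snapshot_id,
  operation   -- 'append' for plain inserts, 'overwrite' for upsert commits
FROM iceberg.`source_users$snapshots`
ORDER BY committed_at;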

VidakM commented 8 months ago

Thanks for getting back to me.

Looking at the docs, I also see that Spark Structured Streaming carries a similar warning about only supporting appends. A bit of a shame, as it would unlock a lot of interesting solutions.

Is append+upsert (delete) streaming in Flink and Spark something you don't want to support because there is a different preferred approach, or is it simply later in the roadmap? If the latter, any idea on the timeline?

For us this would unlock plenty of interesting use-cases for Iceberg. At the moment we dump raw events into Iceberg for replay storage, and with Flink we also unpack and tidy them up a little. Call it a silver layer if you like.

But if we could subscribe to the more complex upserted silver writes (late-arrival joins, aggregations, pivots, partial GDPR deletes), we could automate pipelines that maintain more “golden” tables: data science teams would get clean, generated and maintained tables, SuperSet users would get better views, and it would be easier to stream transformed data from Iceberg to other products. As it stands, we would probably need to use Kafka or something similar as a buffer and do dual writes to get CDC for upserts.

It would also work around some of Flink's limitations, for example reducing how much we need to buffer in memory for late arrivals, since Flink lacks some of the transformation SQL that Spark has.

We would happily help contribute if possible, given a few pointers. I suppose this PR is the one implementing the upsert-compatible deletes? https://github.com/apache/iceberg/pull/3095

I also saw that Flink implemented FLIP-282 in the 1.17 release, to enable row-level delete / update APIs.

pvary commented 8 months ago

@VidakM: I suggest bringing this up on the mailing list. I would also love to see this feature in Flink.

I checked myself a few months ago, and the main blocker is that the planning part for streaming V2 tables is not implemented yet. The interfaces exist, but there is no working implementation of the planning. Also, the proposed interfaces have a different base class than the one returned by V1 planning, which will make the implementation on the execution-engine side harder.

Seeing these tasks, I started to work on Flink in-job compaction instead, as it has more immediate gains for us, but in the medium term (within a year) I would like to revisit this feature. (No commitment though.)

If you need reviewers, I can help there, so any contribution would be welcome!

Thanks, Peter

CJDrew commented 1 month ago

@pvary

Hi Peter, I am also interested in this feature. I see that iceberg-spark has the "create_changelog_view" procedure, and I wonder how difficult it would be to implement something similar as a Flink source. Do you know if there's an implementation for streaming V2 tables yet? Are there any other major roadblocks you're aware of?
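
For reference, this is roughly how the Spark procedure is invoked today. The catalog, table and snapshot IDs below are placeholders, and the exact option names should be checked against the Spark procedures docs for the Iceberg version in use:

-- Spark SQL sketch (placeholder catalog / table / snapshot ids):
CALL my_catalog.system.create_changelog_view(
  table => 'db.source_users',
  options => map('start-snapshot-id', '1', 'end-snapshot-id', '2'),
  changelog_view => 'source_users_changes'
);

-- Each row in the view carries a _change_type column
-- (INSERT / DELETE, plus UPDATE_BEFORE / UPDATE_AFTER when updates are computed).
SELECT * FROM source_users_changes ORDER BY _change_ordinal;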

I also see there is an ongoing proposal for row lineage here: PR

I'm wondering if it makes sense to pursue Flink CDC right now given these potential changes.

Thanks! Carter

pvary commented 1 month ago

@CJDrew: There’s an open PR (#10935) for the planning part of the incremental reads. That could be a good foundation for implementing CDC reads in Flink.

On the row lineage topic: sadly, I don't see even a theoretical way to generate row lineage information with Flink CDC writes.