
Is there any way on Flink to read newly appended data only (NOT in current Iceberg table snapshot)? #9955

Open · okayhooni opened this issue 4 months ago

okayhooni commented 4 months ago

Query engine

Flink

Question

When I ingest from one Iceberg table to another Iceberg table with Flink, I want the sink-table ingestion to start with only the records newly appended after the current snapshot, similar to consumer.override.auto.offset.reset: "latest" on a Kafka source.

But none of the available connector.iceberg.starting-strategy options covers my use case.

Is there any way to read newly appended data only (NOT in current Iceberg table snapshot)?

dyzcs commented 4 months ago

Hi @okayhooni, regarding this issue I would like to share my opinion. I am currently exploring the use of Iceberg as a replacement for Kafka in near-real-time processing. The problem you mentioned breaks down into two parts.

The first part: Iceberg does not maintain offsets for messages the way Kafka does, but it keeps an internal snapshot list in chronological order. You can obtain the latest snapshot ID each time and then specify incremental consumption starting from that ID.
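For reference, a minimal sketch of that lookup with the Iceberg Java API (the Hadoop table path is a placeholder; any catalog or table-loading mechanism works here):

```java
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;

// Placeholder location; load the table through whatever catalog you actually use.
try (TableLoader loader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/source_tbl")) {
  loader.open();
  Table table = loader.loadTable();
  Snapshot current = table.currentSnapshot(); // null if the table has no snapshots yet
  if (current != null) {
    // Incremental consumption can then be started from this ID (see the read options below).
    System.out.println("Latest snapshot id: " + current.snapshotId());
  }
}
```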

The second part: making Iceberg satisfy Kafka-like message ordering as much as possible. Iceberg would consume snapshots one by one in FIFO order, and sort the incremental files read from each snapshot by their write time before passing them downstream.

The above is just my opinion; I welcome further discussion.

okayhooni commented 4 months ago

@dyzcs, thank you for the kind reply!

Did you mean that the two parts you mentioned would have to be implemented in the Iceberg Flink connector for my use case? (i.e., theoretically possible, but not implemented yet)

Or can it be implemented in the user application layer with the current iceberg-flink-runtime?

dyzcs commented 4 months ago

@okayhooni I'm not sure whether Iceberg has a development plan along these lines, but overall Iceberg won't imitate Kafka's features; my understanding is that Iceberg positions itself as a table format.

Returning to your question, the above two features can be developed based on the current version of Iceberg.

Firstly, obtain the latest snapshot ID: https://iceberg.apache.org/docs/latest/flink-queries/#snapshots shows how to obtain all snapshot information.

Secondly, the read options at https://iceberg.apache.org/docs/latest/flink-configuration/#read-options should meet the requirements. The available starting strategies are:

- TABLE_SCAN_THEN_INCREMENTAL: do a regular table scan, then switch to incremental mode. The incremental mode starts from the current snapshot, exclusive.
- INCREMENTAL_FROM_LATEST_SNAPSHOT: start incremental mode from the latest snapshot, inclusive. If the table is empty, all future append snapshots should be discovered.
- INCREMENTAL_FROM_EARLIEST_SNAPSHOT: start incremental mode from the earliest snapshot, inclusive. If the table is empty, all future append snapshots should be discovered.
- INCREMENTAL_FROM_SNAPSHOT_ID: start incremental mode from a snapshot with a specific ID, inclusive.
- INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP: start incremental mode from a snapshot with a specific timestamp, inclusive. If the timestamp falls between two snapshots, it starts from the snapshot after the timestamp.

These strategies apply to the FLIP-27 source only.
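To make this concrete, here is a sketch of a FLIP-27 streaming read that starts from the latest snapshot, following the builder pattern from the Flink queries docs (warehouse path, source name, and monitor interval are placeholders):

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.IcebergSource;
import org.apache.iceberg.flink.source.StreamingStartingStrategy;
import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;

public class ReadFromLatestSnapshot {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/source_tbl");

    IcebergSource<RowData> source =
        IcebergSource.forRowData()
            .tableLoader(tableLoader)
            .assignerFactory(new SimpleSplitAssignerFactory())
            .streaming(true)
            // Start incremental consumption from the latest snapshot (inclusive, per the docs above).
            .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
            .monitorInterval(Duration.ofSeconds(30))
            .build();

    DataStream<RowData> stream =
        env.fromSource(
            source,
            WatermarkStrategy.noWatermarks(),
            "iceberg-source",
            TypeInformation.of(RowData.class));

    stream.print();
    env.execute("incremental-read-from-latest-snapshot");
  }
}
```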

pvary commented 4 months ago

@okayhooni: IIUC, INCREMENTAL_FROM_LATEST_SNAPSHOT is the reading mode you are looking for.

@dyzcs: There is an option for Flink Iceberg readers to read the splits in commit order. See #7661.

There is also an option to order the splits based on event time. See: #8553

Since the splits are read in parallel, this order can be honored only at the task level, but watermark alignment can be used to reduce the number of records read ahead by a runaway reader.
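A rough sketch of combining event-time split ordering with Flink's watermark alignment, reusing the `env` and `tableLoader` setup from the previous example (the timestamp column, group name, and drift values are illustrative; `watermarkColumn` assumes an Iceberg version that includes #8553):

```java
// ... env and tableLoader set up as in the earlier sketch ...
IcebergSource<RowData> source =
    IcebergSource.forRowData()
        .tableLoader(tableLoader)
        .streaming(true)
        // Splits are ordered and emitted based on this timestamp column (#8553).
        .watermarkColumn("event_ts")
        .build();

env.fromSource(
    source,
    WatermarkStrategy.<RowData>forBoundedOutOfOrderness(Duration.ofSeconds(30))
        // Pause readers that advance more than 1 minute beyond the group's shared watermark.
        .withWatermarkAlignment("iceberg-align-group", Duration.ofMinutes(1), Duration.ofSeconds(1)),
    "iceberg-source-with-watermarks",
    TypeInformation.of(RowData.class));
```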

okayhooni commented 4 months ago

@dyzcs Thank you..! I'll try that

@pvary I already tested ingestion with INCREMENTAL_FROM_LATEST_SNAPSHOT option. But it is not working as I expected!

example)

pvary commented 4 months ago

I do not fully understand your test setup, but using INSERT ... SELECT ... will not guarantee that the snapshots are preserved between the tables. It might happen that both changes occurred during the same monitoring interval, so they were read together and written to the target table in a single commit.

vmaksimenko commented 1 month ago

I think the keyword for you is the checkpoint of your Flink application. If you are reading in streaming mode and restarting your application, restart it from the latest checkpoint by configuring https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/config/#execution-savepoint-path. If you run in batch mode and provide "from" and "to" parameters for your job, use them to configure the starting and ending boundaries. How you obtain these parameters is up to the logic of your application. Apache Iceberg does not provide an application checkpoint store for your queries; it has an API to start from the first, the latest, or a specific snapshot ID.
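For the batch case, a sketch of wiring such "from"/"to" boundaries into the FLIP-27 source, again reusing the setup from the earlier examples (the snapshot IDs are illustrative and would come from your own bookkeeping; `startSnapshotId`/`endSnapshotId` correspond to the documented start-snapshot-id/end-snapshot-id read options):

```java
// ... env and tableLoader set up as in the earlier sketch ...
IcebergSource<RowData> source =
    IcebergSource.forRowData()
        .tableLoader(tableLoader)
        .streaming(false)                       // bounded batch read
        .startSnapshotId(4358109269873137077L)  // "from" boundary (exclusive in incremental scans)
        .endSnapshotId(4537843950462451746L)    // "to" boundary (inclusive)
        .build();

env.fromSource(
        source,
        WatermarkStrategy.noWatermarks(),
        "iceberg-bounded-source",
        TypeInformation.of(RowData.class))
    .print();
```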