apache / iceberg

Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

Detecting duplicates in the Flink Data Stream API #10683

Open lkokhreidze opened 4 months ago

lkokhreidze commented 4 months ago

Query engine

Flink

Question

Hi, I was wondering whether there is a way to detect if a batch being written to an Iceberg table will perform an upsert.

Context: I have an Iceberg table that is written with the Flink DataStream API with upsert enabled. The identifier field is configured to be the ID of the domain. My use case is to mark the rows that are upserted with an appropriate flag. Is there a way to get hold of this information within the DataStream API?

pvary commented 4 months ago

At the moment, reading upsert streams from Iceberg sources is not supported. The Iceberg Flink Source only reads append commits from the Iceberg table.

lkokhreidze commented 4 months ago

Hi @pvary thanks for the answer.

Apologies, I now realise the question might have been a bit misleading. What I am trying to do is keep only the first row for each primary key. I could do the deduplication using Flink stateful APIs, but I was wondering if it's possible to tell Iceberg to keep only the first row per identifier field, similar to Apache Paimon's first-row merge option.

pvary commented 4 months ago

@lkokhreidze: Currently you need to filter the input rows yourself if you want any stateful operation on the Iceberg table. TBH, I do not understand the exact meaning of the "first-row merge option" yet. It is not clear to me what happens if the sink receives multiple rows with the same id. Would it insert 2 rows, or would it insert only the first row into the table?

lkokhreidze commented 4 months ago

Hi @pvary thanks for the reply. I do not know the internals of Paimon, or whether it inserts both rows. But from the reader's perspective, only the first row will be visible. The behaviour is similar to INSERT INTO (...) ON CONFLICT (ID) DO NOTHING in a traditional RDBMS.

pvary commented 4 months ago

Thanks @lkokhreidze! Currently there is no such thing in the Flink Iceberg Sink. You need to build your own operator for it.
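
For readers landing here, the keep-first behaviour discussed above can be sketched in plain Java as follows. This is only an illustration of the logic, not Iceberg or Flink API: the class name FirstRowFilter and its methods are hypothetical, and the in-memory set stands in for the per-key state that a custom Flink operator placed before the sink would have to keep. A real job would presumably hold that state in Flink keyed state, and might need a TTL so it does not grow without bound.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

/**
 * Hypothetical helper (not part of Iceberg or Flink): lets only the first row
 * seen for each identifier pass, similar to ON CONFLICT (ID) DO NOTHING.
 */
final class FirstRowFilter<K, R> {
    // Extracts the identifier (for example the "id" column) from a row.
    private final Function<R, K> keyOf;
    // Identifiers already seen. In a Flink operator this would be keyed state.
    private final Set<K> seen = new HashSet<>();

    FirstRowFilter(Function<R, K> keyOf) {
        this.keyOf = keyOf;
    }

    /** Returns true only for the first row carrying a given identifier. */
    boolean isFirst(R row) {
        // Set.add returns false when the identifier was already present.
        return seen.add(keyOf.apply(row));
    }

    /** Drops every row whose identifier was already seen, preserving order. */
    List<R> keepFirst(List<R> rows) {
        List<R> out = new ArrayList<>();
        for (R row : rows) {
            if (isFirst(row)) {
                out.add(row);
            }
        }
        return out;
    }
}
```

The same isFirst check could be used to flag duplicate rows instead of dropping them, which is closer to the original question about marking upserted rows.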