apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0
5.32k stars 2.42k forks source link

[SUPPORT] Global index upsert when using flink and FLINK_STATE index type. #8203

Open BruceKellan opened 1 year ago

BruceKellan commented 1 year ago

Tips before filing an issue

Hi, team!

Now when partitioning upsert, according to precombine.field parameter, keep the record with the largest value after upserting.

This is widely used to solve the case of out-of-order data, by setting the precombine.field to the event time to keep records with the largest event time.

However, when using the FLINK_STATE index type, if cross-partition occurs, the precombine.field parameter will not fully take effect.

In the case of cross-partitioning, the master's logic uses data that arrives later, even if the event time is smaller.

https://github.com/apache/hudi/blob/b79ce80f709ddffb5ad7aead10711222c086db24/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java#L181-L184

I'm wondering if there is any particular reason for implementing it this way? If not, I'll open a PR to fix it.

It may be necessary to unify the logic of intra-partition upsert and cross-partition upsert, which is convenient for users to understand and use.

Environment Description

danny0405 commented 1 year ago

It's kind of a trade-off, we can implemet the coss-partition update only if we record the version field for each index item, that can increase the index storage significantly, but I think it is feasible.

BruceKellan commented 1 year ago

Thanks for your reply, danny. I will open a ticket to do it.