apache / iceberg

Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

Introduce a parameter to control whether the flink writer is linked with the previous operator #10371

Open huyuanfeng2018 opened 1 month ago

huyuanfeng2018 commented 1 month ago

Feature Request / Improvement

A Flink job that writes to Iceberg can set write-parallelism, which lets the writer run with a different parallelism from the previous operator. However, if the writer's parallelism is later changed so that it is the same as the previous operator's, the job cannot be restored from the checkpoint, because Flink chains the operators together by default. We should therefore have a parameter that explicitly keeps the writer separate from the previous operator.
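To make the scenario concrete, here is a toy model of the chaining decision. It is only a sketch and not Flink's implementation: the class `ChainingSketch` and its methods are hypothetical, and the rule is reduced to two conditions (equal parallelism, chaining not disabled), whereas Flink checks several more. It shows how the job graph changes shape when the writer's parallelism is made equal to the upstream parallelism, and how a per-writer "disable chaining" switch keeps the shape stable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ChainingSketch {

    /** Simplified operator description (hypothetical, for illustration only). */
    static final class Op {
        final String name;
        final int parallelism;
        final boolean chainingDisabled;

        Op(String name, int parallelism, boolean chainingDisabled) {
            this.name = name;
            this.parallelism = parallelism;
            this.chainingDisabled = chainingDisabled;
        }
    }

    /**
     * Toy rule: an operator is chained to its upstream operator only if both
     * have the same parallelism and chaining was not disabled on it.
     * Flink's real rule has additional conditions that are ignored here.
     */
    static boolean chainsWithUpstream(Op upstream, Op op) {
        return upstream.parallelism == op.parallelism && !op.chainingDisabled;
    }

    /** Groups a linear pipeline into job vertices; chained operators share one vertex. */
    static List<String> jobVertices(List<Op> pipeline) {
        List<String> vertices = new ArrayList<>();
        for (int i = 0; i < pipeline.size(); i++) {
            Op op = pipeline.get(i);
            if (i > 0 && chainsWithUpstream(pipeline.get(i - 1), op)) {
                // Append this operator to the vertex that holds its upstream operator.
                int last = vertices.size() - 1;
                vertices.set(last, vertices.get(last) + " -> " + op.name);
            } else {
                vertices.add(op.name);
            }
        }
        return vertices;
    }

    public static void main(String[] args) {
        Op map = new Op("map", 4, false);

        // write-parallelism differs from upstream: two separate vertices.
        System.out.println(jobVertices(Arrays.asList(map, new Op("writer", 2, false))));

        // write-parallelism changed to match upstream: the writer is now chained.
        System.out.println(jobVertices(Arrays.asList(map, new Op("writer", 4, false))));

        // Same parallelism, but chaining disabled on the writer: shape is unchanged.
        System.out.println(jobVertices(Arrays.asList(map, new Op("writer", 4, true))));
    }
}
```

In this model, the first and third pipelines produce the same two vertices, while the second collapses into a single chained vertex. That change of shape is what the requested parameter is meant to avoid.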

Query engine

None

pvary commented 1 month ago

Could you use one of the ways described here to achieve this?

huyuanfeng2018 commented 1 month ago

Could you use one of the ways described here to achieve this?

Yes, it can be achieved through the Flink API, like this:

    // writerDisableChaining() is the option proposed in this issue.
    if (flinkWriteConf.writerDisableChaining()) {
      writerStream.disableChaining();
    }

Thank you for your reply. I will open a PR later.

pvary commented 1 month ago

If we have a possibility to achieve this using Flink features without any changes in Iceberg, then I don't think we need to add this feature to Iceberg too.

huyuanfeng2018 commented 1 month ago

If we have a possibility to achieve this using Flink features without any changes in Iceberg, then I don't think we need to add this feature to Iceberg too.

When using Flink SQL, it is not possible to achieve this with the capabilities that Flink provides. Apache Paimon was in a similar situation, and we have added a similar option to Paimon:

https://github.com/apache/paimon/pull/3300

pvary commented 1 month ago

Do I understand correctly, that this is a missing feature in Flink SQL? Did we consider adding this feature to Flink SQL? I might be able to find reviewers for a Flink PR, if we find that's a better way to move forward.

huyuanfeng2018 commented 1 month ago

Do I understand correctly, that this is a missing feature in Flink SQL? Did we consider adding this feature to Flink SQL? I might be able to find reviewers for a Flink PR, if we find that's a better way to move forward.

It is indeed impossible to do this in Flink SQL at this stage, but I feel it may not be within the scope of Flink SQL, because the Iceberg Flink runtime is a Flink connector. Does Flink SQL have the ability to intervene in the logic inside a connector? I'm not sure. It seems we need Flink developers who know this area better to answer that. Can you help with this?

pvary commented 1 month ago

@huyuanfeng2018: Did we try to add this feature to Flink? What was the response of the community there? It seems suboptimal to me to add this feature to every connector independently...

huyuanfeng2018 commented 1 month ago

@huyuanfeng2018: Did we try to add this feature to Flink? What was the response of the community there? It seems suboptimal to me to add this feature to every connector independently...

Implementing this in Flink sounds like a good idea. I'll try to push this feature in the Flink community.

pvary commented 1 month ago

@huyuanfeng2018: Please link the discussion here, so it's easy to follow. Thanks, Peter

huyuanfeng2018 commented 1 month ago

https://issues.apache.org/jira/browse/FLINK-35462