Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

flink autoscaler: how to set write-parallelism? #10147

Open sannaroby opened 3 months ago

sannaroby commented 3 months ago

Apache Iceberg version

1.5.0

Query engine

Flink (iceberg-flink-1.18:1.5.0)

Question

Hello, I'm using iceberg-flink-1.18-1.5.0 and I've enabled the Flink operator's autoscaler feature. I noticed that the iceberg-stream-writer operator doesn't change its "write-parallelism" when the autoscaler changes the operator parallelism. As you can see in the image below, the parallelism is 32 but only one subtask is doing any work:

[screenshot: autoscaler]

Is there a way to force the "write-parallelism" to match the parallelism set by the autoscaler?
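
For reference, the FlinkSink builder does expose an explicit writer-parallelism knob. The sketch below uses illustrative variable names (rowStream, tableSchema, tableLoader, table are stand-ins for the job's own objects) and pins the writer parallelism statically at job-build time, which is not the same as having it follow the autoscaler's decisions:

import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.flink.sink.FlinkSink;

// Illustrative: writeParallelism(...) fixes the number of writer subtasks
// explicitly, independent of what the autoscaler later decides.
FlinkSink.forRow(rowStream, tableSchema)
        .tableLoader(tableLoader)
        .table(table)
        .distributionMode(DistributionMode.HASH)
        .writeParallelism(32)   // explicit, static writer parallelism
        .append();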

pvary commented 3 months ago

@sannaroby: can you share the Sink code? What distribution mode do you use? Maybe we need a rebalance step before the writer?

sannaroby commented 3 months ago

Hi @pvary, thanks for your reply. We're using the HASH distribution mode; here is an extract from our Flink job:

SingleOutputStreamOperator<Row> mainFunction = env.addSource(kinesisSource)
        .keyBy(inputMessage -> inputMessage.getKey())
        .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
        .process(new InputMessageToRowProcessFunction(), rowTypeInfo);

FlinkSink.forRow(mainFunction, loadedTableSchema)
        .tableLoader(tableLoader)
        .table(icebergTable)
        .equalityFieldColumns(List.of("messageKey", "messageTimestamp"))
        .distributionMode(DistributionMode.HASH)
        .append();

The autoscaler didn't change the parallelism of the upstream operator (the "InputMessageToRowProcessFunction") because it judged that to be unnecessary. What do you mean by a "rebalance step"? Thanks.
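
For readers following along: in Flink, a "rebalance step" usually means calling DataStream#rebalance() on the stream feeding an operator, so records are redistributed round-robin across all of that operator's subtasks. A minimal sketch using the variable names from the snippet above; note that with DistributionMode.HASH the Iceberg sink re-shuffles records by key internally anyway, so a rebalance in front of the writer mainly matters when the distribution mode is NONE:

// Illustrative only: round-robin the rows across all downstream subtasks
// instead of inheriting the keyed (possibly skewed) distribution of the
// window operator. With DistributionMode.HASH the sink re-keys the records
// again, so this alone may not change which writer subtasks receive data.
DataStream<Row> rebalanced = mainFunction.rebalance();

FlinkSink.forRow(rebalanced, loadedTableSchema)
        // ... same builder calls as in the original job ...
        .append();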

pvary commented 3 months ago

Could it be that the table is partitioned and all of the new data is targeting a single partition?

If you start the sink with a higher writer parallelism, what does the data distribution look like?
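
One way to check the skew hypothesis is to tally records per partition from the table metadata. A rough sketch against the Iceberg Java API, assuming a loaded org.apache.iceberg.Table handle named table (e.g. obtained from the same TableLoader) and a context where an IOException may propagate:

import java.util.HashMap;
import java.util.Map;

import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.io.CloseableIterable;

// Sum the record counts of the current data files per partition value.
// A single dominant partition here would explain one busy writer subtask
// under HASH distribution.
Map<String, Long> recordsPerPartition = new HashMap<>();
try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
    for (FileScanTask task : tasks) {
        String partition = task.file().partition().toString();
        recordsPerPartition.merge(partition, task.file().recordCount(), Long::sum);
    }
}
recordsPerPartition.forEach((p, count) -> System.out.println(p + " -> " + count));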