
Flink: add more sink shuffling support #6303

Open stevenzwu opened 1 year ago

stevenzwu commented 1 year ago

Feature Request / Improvement

Today, the Flink Iceberg sink only supports a simple keyBy hash distribution on partition columns. In practice, a keyBy shuffle on partition values doesn't work very well.

We can make the following shuffling enhancements in the Flink streaming writer. More details can be found in the design doc. This is an uber issue for tracking purposes. Here are the rough phases.

  1. [hash distribution] custom partitioner on bucket values. PR 4228 demonstrated that keyBy on low-cardinality partitioning buckets resulted in skewed traffic distribution. The Flink sink can add a custom partitioner that directly maps the bucket value (an integer) to the downstream writer tasks (integers) in round-robin fashion (mod). This is a relatively simple case.

This is the case when write.distribution-mode=hash and there is a bucketing partition column. Other partition columns (like an hourly partition) are ignored for shuffling; the assumption is that the bucketing column is where we want to distribute/cluster the rows. A sketch of the mod-based partitioner follows below.
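A minimal sketch of such a partitioner, assuming the bucket id has already been extracted from each row (the class name and wiring are hypothetical, not the actual Iceberg implementation):

```java
import org.apache.flink.api.common.functions.Partitioner;

// Hypothetical sketch: map the bucket value directly to a writer subtask with
// a mod, so low-cardinality buckets spread evenly instead of hash-colliding.
public class BucketModPartitioner implements Partitioner<Integer> {
  @Override
  public int partition(Integer bucketId, int numPartitions) {
    // Round-robin: bucket 0 -> task 0, bucket 1 -> task 1, ...,
    // wrapping around when there are more buckets than writer tasks.
    return bucketId % numPartitions;
  }
}
```

It would be wired in with something like `stream.partitionCustom(new BucketModPartitioner(), row -> extractBucketId(row))`, where `extractBucketId` is a hypothetical selector applying the bucket transform.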

  2. [hash distribution] bin packing based on traffic distribution statistics. This works well for skewed data on partition columns (like event time). It requires calculating traffic distribution statistics across partition columns and using those statistics to guide shuffling decisions.

This is the case when write.distribution-mode=hash and there is NO bucketing partition column. A greedy bin-packing sketch follows below.
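As a rough illustration of the bin-packing idea (all names here are hypothetical, and the real implementation would need to collect the statistics at runtime), one could greedily assign each partition key to the currently least-loaded writer task:

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical sketch of greedy bin packing: given observed row counts per
// partition key, assign the heaviest keys first, each to the least-loaded task.
public class GreedyBinPacker {
  public static Map<String, Integer> assign(Map<String, Long> keyCounts, int numTasks) {
    // Min-heap of {accumulatedLoad, taskId}, ordered by accumulated load.
    PriorityQueue<long[]> tasks = new PriorityQueue<>(Comparator.comparingLong((long[] t) -> t[0]));
    for (int i = 0; i < numTasks; i++) {
      tasks.add(new long[] {0L, i});
    }

    Map<String, Integer> assignment = new HashMap<>();
    keyCounts.entrySet().stream()
        .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
        .forEach(entry -> {
          long[] task = tasks.poll();
          assignment.put(entry.getKey(), (int) task[1]);
          task[0] += entry.getValue();
          tasks.add(task);
        });
    return assignment;
  }
}
```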

  3. [range distribution] range partitioning based on traffic distribution statistics. This is a variant of 2 above. It works well for "sorting" non-partition columns (e.g. country code, event type) and can improve data clustering by creating data files with narrow value ranges. Note that the Flink streaming writer probably won't sort rows within a file, as that would be very expensive (though not impossible). Even without rows sorted within a file, the improved data clustering can result in effective file pruning; we just won't get the additional benefit of row-group-level skipping (for Parquet) that sorting rows within a file would provide.

This is the case when write.distribution-mode=range and a SortOrder is defined on non-partition columns. Partition columns are ignored for range shuffling, as the assumption is that the non-partition sort columns are what matter here. A range-routing sketch follows below.
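A simplified sketch of the range routing under these assumptions (real traffic statistics would be collected and refreshed continuously; here the split points come from a one-shot sample of sort-key values):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: derive range boundaries from sampled sort-key values,
// then route each row to the writer task whose key range contains it.
public class RangeRouter {
  private final String[] boundaries; // upper split points for tasks 0..n-2

  public RangeRouter(List<String> sampledKeys, int numTasks) {
    List<String> sorted = new ArrayList<>(sampledKeys);
    Collections.sort(sorted);
    boundaries = new String[numTasks - 1];
    for (int i = 0; i < numTasks - 1; i++) {
      // Evenly spaced quantiles of the sample become the split points.
      boundaries[i] = sorted.get((i + 1) * sorted.size() / numTasks);
    }
  }

  public int taskFor(String sortKey) {
    int pos = Arrays.binarySearch(boundaries, sortKey);
    // A negative result encodes the insertion point, which is the target task.
    return pos >= 0 ? pos : -pos - 1;
  }
}
```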

  4. [high-cardinality columns] 2 and 3 above are mostly for low-cardinality columns (e.g. hundreds of unique values), where a simple dictionary of counts per value can be used to track traffic distribution statistics. For a high-cardinality column (like device or user id), we would need a probabilistic data sketch algorithm to calculate the traffic distribution; a reservoir-sampling illustration follows below.
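To illustrate why a bounded-memory sketch is needed for high-cardinality keys, here is a classic reservoir sample (Algorithm R); the actual algorithm used in the implementation may well differ:

```java
import java.util.Random;

// Hypothetical illustration: a fixed-size reservoir sample approximates the
// key distribution in O(capacity) memory, no matter how many distinct keys
// flow through, whereas a per-value counter dictionary would grow unbounded.
public class ReservoirSample<T> {
  private final Object[] reservoir;
  private final Random random = new Random();
  private long seen = 0;

  public ReservoirSample(int capacity) {
    this.reservoir = new Object[capacity];
  }

  public void add(T item) {
    seen++;
    if (seen <= reservoir.length) {
      reservoir[(int) (seen - 1)] = item; // fill phase
    } else {
      // Replace a random slot with probability capacity / seen, which keeps
      // every item seen so far equally likely to be in the sample.
      long idx = (long) (random.nextDouble() * seen);
      if (idx < reservoir.length) {
        reservoir[(int) idx] = item;
      }
    }
  }
}
```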

Query engine

Flink

stevenzwu commented 1 year ago

Created a new project as this is a relatively large scope overall: https://github.com/apache/iceberg/projects/27

hililiwei commented 1 year ago

Great design! I think we can continue adding new issues so that folks can choose the tasks they want to work on.

github-actions[bot] commented 3 months ago

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in the next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the 'not-stale' label, but commenting on the issue is preferred when possible.

bendevera commented 2 months ago

@stevenzwu I was wondering about the status of this project. We have faced issues with the performance of the default HASH distribution mode. This project looked promising, and I saw that some progress has been made on various related tasks.

stevenzwu commented 2 months ago

@bendevera range distribution has been added to the main branch and will be part of the next 1.7 release. You can also see the doc here: https://iceberg.apache.org/docs/nightly/flink-writes/#range-distribution-experimental
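For anyone trying it, enabling the mode amounts to setting the table property discussed earlier in this issue; a minimal sketch with the Iceberg Java API (assuming an already-loaded table handle):

```java
import org.apache.iceberg.Table;

public class EnableRangeDistribution {
  // `table` is an already-loaded Iceberg table; the property name and value
  // follow the write.distribution-mode convention discussed in this issue.
  static void enableRange(Table table) {
    table.updateProperties()
        .set("write.distribution-mode", "range")
        .commit();
  }
}
```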