Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

Support bucket transform on multiple data columns #5626

Open vamen opened 2 years ago

vamen commented 2 years ago

Feature Request / Improvement

Currently (Iceberg 0.14.1), Iceberg supports bucketing on only one data column, i.e. bucket(N, col).

Bucketing is a very important feature in Iceberg: it helps filter and narrow down the files required to answer a query. Querying by primary key is a major access pattern, so bucketing on the primary key speeds up primary-key-based queries. But in most cases the primary key is made of multiple columns, so supporting bucketing on multiple columns would be a major enhancement for primary-key-based queries.

Query engine

Spark

rdblue commented 2 years ago

@vamen, you can just add multiple bucket partitions to bucket by more than one column. We chose to do this rather than hash the fields together so they can be used together or independently.
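For reference, a minimal sketch of what that looks like with the Iceberg Java API (the schema and column names here are made up for illustration); the Spark SQL equivalent is noted in a comment:

```java
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class MultiBucketSpecExample {
  public static void main(String[] args) {
    // Hypothetical schema with a two-column primary key (id_col, sec_col).
    Schema schema = new Schema(
        Types.NestedField.required(1, "id_col", Types.LongType.get()),
        Types.NestedField.required(2, "sec_col", Types.StringType.get()),
        Types.NestedField.optional(3, "data", Types.StringType.get()));

    // Two independent bucket partition fields, one per key column.
    // Spark SQL equivalent: PARTITIONED BY (bucket(16, id_col), bucket(16, sec_col))
    PartitionSpec spec = PartitionSpec.builderFor(schema)
        .bucket("id_col", 16)
        .bucket("sec_col", 16)
        .build();

    System.out.println(spec); // prints both bucket partition fields
  }
}
```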

github-actions[bot] commented 1 year ago

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

advancedxy commented 1 year ago

@vamen, you can just add multiple bucket partitions to bucket by more than one column. We chose to do this rather than hash the fields together so they can be used together or independently.

Hi @rdblue, I think we cannot express the same semantics as a bucket transform on multiple columns with multiple single-column bucket partitions.

For example, bucket(16, id_col, sec_col) means that we need to bucket (id_col, sec_col) into 16 buckets. If we replace it with bucket(16, id_col) + bucket(16, sec_col), it would create 16 * 16 = 256 buckets. The most similar bucket spec would be bucket(4, id_col) + bucket(4, sec_col), which creates 4 * 4 = 16 buckets. However, that assumes the cardinalities of the bucket columns are balanced; it might also need to be bucket(2, id_col) + bucket(8, sec_col). Did I miss anything?

Therefore, I believe it's still valuable to support bucketing on multiple columns, especially when the primary key is made of multiple columns. WDYT?
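To make the distinction concrete, here is a rough sketch of the semantics being asked for (this is hypothetical, not an existing Iceberg transform, and plain Java hashing stands in for Iceberg's Murmur3-based bucket hash): a multi-column bucket hashes the whole key tuple and applies a single modulus, while the per-column workaround applies one modulus per column and therefore multiplies the partition counts:

```java
import java.util.Objects;

public class MultiColumnBucketSketch {
  // Stand-in for Iceberg's 32-bit bucket hash; any well-mixed hash works for this sketch.
  static int hash(Object value) {
    return Objects.hashCode(value) & Integer.MAX_VALUE;
  }

  // Proposed semantics: one hash over all key columns, one modulus -> exactly numBuckets buckets.
  static int bucketCombined(int numBuckets, Object... values) {
    return (Objects.hash(values) & Integer.MAX_VALUE) % numBuckets;
  }

  // Today's workaround: one bucket transform per column -> n1 * n2 partition combinations.
  static int[] bucketPerColumn(int n1, Object v1, int n2, Object v2) {
    return new int[] {hash(v1) % n1, hash(v2) % n2};
  }

  public static void main(String[] args) {
    // The same composite key routed both ways.
    System.out.println(bucketCombined(16, 42L, "user-7"));   // one of 16 buckets
    int[] pair = bucketPerColumn(16, 42L, 16, "user-7");
    System.out.println(pair[0] + "/" + pair[1]);             // one of 16 * 16 = 256 partitions
  }
}
```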

RussellSpitzer commented 1 year ago

@vamen, you can just add multiple bucket partitions to bucket by more than one column. We chose to do this rather than hash the fields together so they can be used together or independently.

Hi @rdblue, I think we cannot express the same semantics as a bucket transform on multiple columns with multiple single-column bucket partitions.

For example, bucket(16, id_col, sec_col) means that we need to bucket (id_col, sec_col) into 16 buckets. If we replace it with bucket(16, id_col) + bucket(16, sec_col), it would create 16 * 16 = 256 buckets. The most similar bucket spec would be bucket(4, id_col) + bucket(4, sec_col), which creates 4 * 4 = 16 buckets. However, that assumes the cardinalities of the bucket columns are balanced; it might also need to be bucket(2, id_col) + bucket(8, sec_col). Did I miss anything?

Therefore, I believe it's still valuable to support bucketing on multiple columns, especially when the primary key is made of multiple columns. WDYT?

Other than being able to do prime numbers of buckets, I'm not sure I see the difference between Bucket(column_a, x) + Bucket(column_b, y) and Bucket(column_a, column_b, x*y). In the example above I don't think 2, 8 or 4, 4 would actually be distributed differently for normally distributed data. For skewed data, both would still most likely have an issue, since only a single bucket would receive the skew.

Could you be a bit more clear?

advancedxy commented 1 year ago

Other than being able to do prime numbers of buckets,

That didn't occur to me; thanks for pointing it out.

In the example above I don't think 2, 8 or 4, 4 would actually be distributed differently for normally distributed data.

Like you said, that holds for normally distributed data; to be more precise, it requires every bucket column to be close to normally distributed. The buckets are more likely to be skewed if any of the bucket columns is skewed or has low cardinality.

Here is an extreme example: suppose col_a is an enum-like column with only a few values, say 1 and 2, and col_b is uniformly distributed. Then bucket(col_a, 4) is going to have only two non-empty buckets, which means bucket(col_a, 4) + bucket(col_b, 4) would leave half of its combined buckets empty (only 2 of col_a's 4 buckets are used), which I don't think is acceptable. In contrast, the buckets of bucket(col_a, col_b, 16) are most likely balanced.

In summary, bucketing on multiple data columns makes data distribution more balanced.
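A quick simulation sketch of this example (assumptions: col_a takes only the values 1 and 2, col_b is roughly uniform, Java's built-in hashing stands in for Iceberg's bucket hash, and the combined bucket(col_a, col_b, 16) transform is hypothetical):

```java
import java.util.Objects;
import java.util.Random;
import java.util.TreeMap;

public class EnumSkewDemo {
  public static void main(String[] args) {
    Random rnd = new Random(42);
    TreeMap<String, Integer> perColumn = new TreeMap<>();  // bucket(col_a, 4) + bucket(col_b, 4)
    TreeMap<Integer, Integer> combined = new TreeMap<>();  // hypothetical bucket(col_a, col_b, 16)

    for (int i = 0; i < 1_000_000; i++) {
      int colA = rnd.nextInt(2) + 1;  // enum-like column: only values 1 and 2
      long colB = rnd.nextLong();     // roughly uniform column

      int bucketA = (Objects.hashCode(colA) & Integer.MAX_VALUE) % 4;
      int bucketB = (Objects.hashCode(colB) & Integer.MAX_VALUE) % 4;
      perColumn.merge(bucketA + "/" + bucketB, 1, Integer::sum);

      int bucketAB = (Objects.hash(colA, colB) & Integer.MAX_VALUE) % 16;
      combined.merge(bucketAB, 1, Integer::sum);
    }

    // Under these assumptions only 8 of the 16 (bucketA, bucketB) combinations are non-empty,
    // while all 16 combined buckets each get roughly 1/16 of the rows.
    System.out.println("per-column buckets: " + perColumn.size() + " non-empty -> " + perColumn);
    System.out.println("combined buckets:   " + combined.size() + " non-empty -> " + combined);
  }
}
```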

RussellSpitzer commented 1 year ago

The example you give is a problem regardless of the bucketing function, though, if the number of buckets is approximately equal to the cardinality of the column (or group of columns). The other thing to note about this example is that we would probably get just as good a distribution of rows if we just bucketed (col_b, 16). If we really wanted to include col_a we would do an identity transform, correct?

advancedxy commented 1 year ago

The example you give is a problem regardless of the bucketing function, though, if the number of buckets is approximately equal to the cardinality of the column (or group of columns). The other thing to note about this example is that we would probably get just as good a distribution of rows if we just bucketed (col_b, 16). If we really wanted to include col_a we would do an identity transform, correct?

I don't think I explained it clearly. Let me try to rephrase it in terms of probability.

TL;DR: bucketing on each column separately multiplies the skewness across columns; bucketing on multiple columns together does not. Instead it reduces the skewness across buckets, since the combined hash has more entropy and is more likely to be balanced.

To keep the example simple, suppose:

  1. we have two columns, col_a and col_b, and we need to create 4 buckets via bucket(col_a, col_b, 4)
  2. the probability of hash(col_a) mod 2 === 0 is 1/3, and of hash(col_a) mod 2 === 1 is 2/3
  3. the probability of hash(col_b) mod 2 === 0 is 1/4, and of hash(col_b) mod 2 === 1 is 3/4
Then, for partition spec bucket(col_a, 2) + bucket(col_b, 2), the probability of each bucket (effectively the expected fraction of rows in that bucket) is [1/12, 1/6, 1/4, 1/2] (see the calculations below, where c1 = bucket(col_a, 2) and c2 = bucket(col_b, 2)).

| c2 \ c1 | c1 = 0 | c1 = 1 |
|---------|--------|--------|
| c2 = 0 | 1/3 * 1/4 = 1/12 | 2/3 * 1/4 = 1/6 |
| c2 = 1 | 1/3 * 3/4 = 1/4 | 2/3 * 3/4 = 1/2 |
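For reference, the composed-spec probabilities in the table follow directly from multiplying the marginals, as in this small sketch (the marginal values are the ones assumed above):

```java
public class ComposedBucketProbabilities {
  public static void main(String[] args) {
    // Marginals from the assumptions above: P(bucket(col_a, 2) = b) and P(bucket(col_b, 2) = b).
    double[] pA = {1.0 / 3, 2.0 / 3};
    double[] pB = {1.0 / 4, 3.0 / 4};

    // With bucket(col_a, 2) + bucket(col_b, 2) the columns are hashed independently,
    // so each of the 4 combined partitions gets the product of the marginals.
    for (int a = 0; a < 2; a++) {
      for (int b = 0; b < 2; b++) {
        System.out.printf("c1=%d, c2=%d -> %.4f%n", a, b, pA[a] * pB[b]);
      }
    }
    // Yields 1/12, 1/4, 1/6, 1/2 -- the [1/12, 1/6, 1/4, 1/2] distribution above.
  }
}
```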

What about the probability of each bucket for bucket(col_a, col_b, 4)? We need to add another assumption:

  1. the probability of hash(col_a) mod 4 === 0 is 1/6, hash(col_a) mod 4 === 1 is 1/3, hash(col_a) mod 4 === 2 is 1/6, hash(col_a) mod 4 === 3 is 1/3 (one may argue that the distribution could split differently; that's true, but this split is the most likely one).
  2. the distribution of hash(col_b) mod 4 is likewise.

Then the probability of each bucket is [7/24, 5/24, 7/24, 5/24] (assuming my calculation is correct and I didn't miss anything).

Comparing [1/12, 1/6, 1/4, 1/2] with [7/24, 5/24, 7/24, 5/24], I believe the second one is more balanced.

RussellSpitzer commented 1 year ago

I think the distribution math there is a bit off since it assumes that the skew is only present at the first bit of the hashing function. The assumption that increasing the number of buckets evenly divides the skew is a bit of an issue since this assumes the skew is generally present and correlated with the hashing function but only at the first bit.

That said, I did run some experiments on my own, and while there wasn't a ton of difference between composition and running the function on its own, there was a benefit to using the combined function. I'll try to finish up my test framework for running more examples.

Anyway, we probably do need to support multi-arg transforms within Iceberg at some point, so this may be a good time to start a design document and work towards adding that to the spec as a first step.

advancedxy commented 1 year ago

I think the distribution math there is a bit off since it assumes that the skew is only present at the first bit of the hashing function. The assumption that increasing the number of buckets evenly divides the skew is a bit of an issue since this assumes the skew is generally present and correlated with the hashing function but only at the first bit.

The assumption is that skew is generally present, especially across multiple columns. I don't agree with the part about the skew only being present at the first bit of the hashing function. When skew is present, it would somehow be related to the hash result.

That said, I did run some experiments on my own, and while there wasn't a ton of difference between composition and running the function on its own, there was a benefit to using the combined function. I'll try to finish up my test framework for running more examples.

This is a good way to demonstrate the distribution. I also wrote a quick program to demonstrate my idea; see https://gist.github.com/advancedxy/236a8db8de03cf40c2ecbfebd4bf07ef for details. It is still a simplified version, but I think it matches my previous calculation.

If we really wanted to include col_a we would do an identity transform, correct?

For this part: when we are dealing with a primary-key table whose primary key is made of multiple columns, I think it's important to include all of the key columns in the bucketing. An identity transform might not be sufficient.

Anyway, we probably do need to support multi-arg transforms within Iceberg at some point, so this may be a good time to start a design document and work towards adding that to the spec as a first step.

+1. I agree with adding that to the spec as a first step.

RussellSpitzer commented 1 year ago

The assumption is that skew is generally present, especially across multiple columns. I don't agree with the part about the skew only being present at the first bit of the hashing function. When skew is present, it would somehow be related to the hash result.

This was in reference to

the probability of hash(col_a) mod 4 === 0 is 1/6, hash(col_a) mod 4 === 1 is 1/3, hash(col_a) mod 4 === 2 is 1/6, hash(col_a) mod 4 === 3 is 1/3 (one may argue that the distribution could split differently; that's true, but this split is the most likely one).

I think we have very different models for skew. In my mind skew is an artifact of a small number of values that are extremely over-represented in a data set, not a systematic alignment with the hashing function. In my model, increasing the number of buckets does not evenly divide their contents. Skew shows up in our buckets because each time we bucket we get some random assortment of partitions, including some number of "skewed" partitions. As a trivial example, if I have a data set consisting of the integers 1, 2, 3, 4, but 3 accounts for 70% of the dataset and the remaining digits cover 10% each, then with two buckets I would see (1, 3 - 80%) and (2, 4 - 20%).

But if I divide my buckets I see

(10%, 70%, 10%, 10%)

Because skew is modeled like this, there is no correlation with the hashing function (at least not in a broad way), and whenever we change our bucketing we are really just changing which random subset of the "skewed" partitions ends up in a specific bucket.
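A tiny sketch of that model (the values, weights, and Integer hashing here are illustrative stand-ins, not Iceberg's actual bucket hash): the single heavy value always lands entirely in one bucket, so adding buckets only spreads the non-skewed remainder:

```java
import java.util.Map;
import java.util.TreeMap;

public class HeavyHitterDemo {
  public static void main(String[] args) {
    // 70% of rows have the value 3; values 1, 2, and 4 get 10% each.
    int[] values = {1, 2, 3, 4};
    double[] weights = {0.10, 0.10, 0.70, 0.10};

    for (int numBuckets : new int[] {2, 4, 16}) {
      Map<Integer, Double> bucketShare = new TreeMap<>();
      for (int i = 0; i < values.length; i++) {
        int bucket = (Integer.hashCode(values[i]) & Integer.MAX_VALUE) % numBuckets;
        bucketShare.merge(bucket, weights[i], Double::sum);
      }
      // With 2 buckets, {1, 3} share one bucket (~80%) and {2, 4} the other (~20%).
      // With more buckets the 70% chunk still sits in exactly one bucket.
      System.out.println(numBuckets + " buckets -> " + bucketShare);
    }
  }
}
```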

advancedxy commented 1 year ago

I think we have very different models for skew.

Ah, yeah, I get your point now. For your model of skewness, bucketing per column vs. bucketing on multiple columns indeed doesn't make a ton of difference.

When I'm modeling skewness, I am more concerned about unbalanced data partitions/buckets; consider the Pareto principle.

pdames commented 11 months ago

+1 to adding support for this feature. On my side, the principal use-case where this is beneficial is when hashing a composite primary key where the cardinality of each individual primary key column is unknown. In this case, we rely on (1) a uniform random hash distribution of all composite primary key values and (2) configuring the number of hash buckets such that all records for each bucket fit on a single node during a distributed shuffle.

This allows us to optimize operations like distributed joins, deduplication, merge by primary key or any other composite unique key, etc. A bit more on the merge-by-primary-key use-case and subsequent optimizations as implemented in Ray can also be reviewed at https://github.com/ray-project/deltacat/blob/main/deltacat/compute/compactor/TheFlashCompactorDesign.pdf. We would like to be able to run our open source implementation of this design in Ray across Iceberg tables with equivalent efficiency, and I believe that the resolution of this issue would help here.