Open raunaqmorarka opened 1 year ago
fyi @ebyhr @alexjo2144 @findepi
Hi @raunaqmorarka, we have been wanting to use colocated join support on Iceberg for a long time! Do you know if anyone is actively working on this task? If not, I'm happy to look into it.
No one is working on it, AFAIK
Hi @raunaqmorarka, I saw this issue when I was trying to create an issue for colocated join support on Iceberg. We at Pinterest have an internal implementation of it and would like to take a stab at the problem. Would that be OK?
> Hi @raunaqmorarka, I saw this issue when I was trying to create an issue for colocated join support on Iceberg. We at Pinterest have an internal implementation of it and would like to take a stab at the problem. Would that be OK?
Sure, here are my high-level thoughts on how to go about it:
For a start we should restrict this to only bucket partitioning transforms in Iceberg and skip normal partitioned tables, to get parity with the Hive connector (see the sketch after these points for what that distinction looks like). Partitioned tables should be tackled as a follow-up.
There can be situations where the number of partitions/buckets is small relative to the size of the cluster. We'd want to avoid "bucketed execution" here so we don't reduce parallelism. Verify that the existing rule DetermineTableScanNodePartitioning is sufficient to tackle this issue.
If the distribution of data per partition/bucket is highly skewed, then we'd also want to avoid "bucketed execution", to avoid creating a big skew in the distribution of work for the query. We could think about using some table-statistics-based approach to detect this scenario.
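For context, a minimal sketch of that distinction using the Iceberg Java API (the schema is made up): a bucket transform hashes a column into a fixed number of buckets, while day/identity transforms are the "normal" partitioning that would be a follow-up.

```java
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class BucketSpecExample
{
    public static void main(String[] args)
    {
        // Hypothetical schema; only the transforms below are relevant here
        Schema schema = new Schema(
                Types.NestedField.required(1, "customer_id", Types.LongType.get()),
                Types.NestedField.required(2, "order_date", Types.DateType.get()));

        // bucket(16, customer_id): rows are hashed into 16 buckets; this is the
        // transform this issue proposes to map to a Trino partitioning handle
        PartitionSpec bucketed = PartitionSpec.builderFor(schema)
                .bucket("customer_id", 16)
                .build();

        // day/identity transforms ("normal" partitioning) would be a follow-up
        PartitionSpec partitioned = PartitionSpec.builderFor(schema)
                .day("order_date")
                .build();

        System.out.println(bucketed);
        System.out.println(partitioned);
    }
}
```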
> If the distribution of data per partition/bucket is highly skewed, then we'd also want to avoid "bucketed execution", to avoid creating a big skew in the distribution of work for the query. We could think about using some table-statistics-based approach to detect this scenario.
The notion of skewness is missing in stats, but maybe the Iceberg connector could itself determine whether the table is highly skewed and not return a partitioning in that case.
> Verify that the existing rule DetermineTableScanNodePartitioning is sufficient to tackle this issue.
For partitions, DetermineTableScanNodePartitioning won't work out of the box because parquet doesn't return a bucket count right now. I think we should know the number of partitions in an Iceberg table beforehand.
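For illustration, a rough sketch using the Iceberg Java API (the surrounding connector wiring is omitted and the helper name is made up) of what "knowing the number of partitions beforehand" means in practice, i.e. walking the manifests during planning:

```java
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

public final class PartitionCounter
{
    private PartitionCounter() {}

    // Counts distinct partition tuples by reading manifests; this is the metadata
    // walk that later comments note can get expensive for huge tables
    public static int countPartitions(Table table)
            throws IOException
    {
        Set<String> partitions = new HashSet<>();
        try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
            for (FileScanTask task : tasks) {
                DataFile file = task.file();
                // key on spec id + the rendered partition tuple; good enough for a count estimate
                partitions.add(file.specId() + ":" + file.partition().toString());
            }
        }
        return partitions.size();
    }
}
```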
@raunaqmorarka, @sopel39, this is a draft PR I have for adding partitioning information along with support for bucketed joins: https://github.com/trinodb/trino/pull/22206
We don't handle skew or unbucketed partition columns yet. For skewed tables, we just turn the session property off :). But Iceberg technically should have this information (though scanning through manifests can be slow sometimes).
We can potentially assign an implicit bucket count to unbucketed partitioning columns. However, it seems that right now we can only generate a PartitioningHandle before getCommonPartitioningHandle (which also doesn't give the connector any information on which columns are used in the join vs. in projections, so the bucketed join condition can easily break if a partitioning column is selected for projection but not for the join). A hint provided by the user, or the engine passing the connector information on which columns are used in the join, would be very helpful to make this more robust and support scenarios like tableA bucketed on [column1, column2], tableB bucketed on [column1], tableC bucketed on [column2]. Right now, we have to be very careful to avoid selecting column2 when joining tableA and tableB.
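To make the tableA/tableB/tableC scenario concrete, here is a self-contained sketch of the kind of decision that becomes straightforward once the join columns are visible to the connector. The helper is illustrative only, not the actual SPI, and it assumes both tables use the same bucket count and hash function, and that the connector can map splits using only the shared bucket columns.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

public final class CommonBucketingSketch
{
    private CommonBucketingSketch() {}

    // Illustrative only: given the bucketing columns of two tables and the equi-join
    // keys, pick the bucketing columns a colocated join could use. This is the decision
    // that is hard today because the join keys are not visible to getCommonPartitioningHandle.
    static Optional<List<String>> commonBucketingColumns(
            List<String> leftBucketing,
            List<String> rightBucketing,
            Set<String> joinKeys)
    {
        // Candidate handle: bucketing columns shared by both tables
        List<String> common = leftBucketing.stream()
                .filter(rightBucketing::contains)
                .toList();
        // The colocated join is only usable if the join keys cover the candidate
        if (common.isEmpty() || !joinKeys.containsAll(common)) {
            return Optional.empty();
        }
        return Optional.of(common);
    }

    public static void main(String[] args)
    {
        List<String> tableA = List.of("column1", "column2");
        List<String> tableB = List.of("column1");

        // Join tableA and tableB on column1: [column1] is a workable common handle
        System.out.println(commonBucketingColumns(tableA, tableB, Set.of("column1")));
        // Join tableA and tableC (bucketed on column2) on column1: no colocated join
        System.out.println(commonBucketingColumns(tableA, List.of("column2"), Set.of("column1")));
    }
}
```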
> There can be situations where the number of partitions/buckets is small relative to the size of the cluster. We'd want to avoid "bucketed execution" here so we don't reduce parallelism. Verify that the existing rule DetermineTableScanNodePartitioning is sufficient to tackle this issue.
How would we draw the line here? In certain cases, for smaller queries, the bucket count will likely be small relative to the cluster size, but a colocated join still provides better performance by avoiding shuffling and giving a better cache hit rate. Would it be possible to add a session property letting the user specify the minimum number of total buckets required for using a bucketed join (with a default based on nodeManager)? We could then return Optional.empty() in getCommonPartitioningHandle to stop doing the colocated join.
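A minimal sketch of that guard (the session property, types, and helper are hypothetical, not part of the existing connector):

```java
import java.util.Optional;

public final class MinBucketsGuard
{
    private MinBucketsGuard() {}

    // Hypothetical stand-in for the handle the connector would otherwise return
    record BucketedPartitioning(int totalBucketCount) {}

    // Sketch of the check proposed above: if the table's total bucket count is below a
    // (hypothetical) session property such as iceberg.min_buckets_for_colocated_join,
    // return Optional.empty() so the engine falls back to a regular repartitioned join
    static Optional<BucketedPartitioning> applyMinBuckets(
            BucketedPartitioning candidate,
            int minBucketsForColocatedJoin)  // default could be derived from the node count
    {
        if (candidate.totalBucketCount() < minBucketsForColocatedJoin) {
            return Optional.empty();
        }
        return Optional.of(candidate);
    }

    public static void main(String[] args)
    {
        System.out.println(applyMinBuckets(new BucketedPartitioning(8), 32));  // Optional.empty
        System.out.println(applyMinBuckets(new BucketedPartitioning(64), 32)); // keeps the handle
    }
}
```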
> If the distribution of data per partition/bucket is highly skewed, then we'd also want to avoid "bucketed execution", to avoid creating a big skew in the distribution of work for the query. We could think about using some table-statistics-based approach to detect this scenario.
We could scan the manifests to collect file sizes per bucket and calculate the variance, I guess? This is likely a bit expensive, though, and we may want to avoid doing it multiple times during planning.
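For what it's worth, a sketch of that manifest walk using the Iceberg Java API (the class and method names here are made up; a real implementation would want to do this once per query and cache the result):

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

public final class BucketSkewEstimator
{
    private BucketSkewEstimator() {}

    // Sums data file sizes per partition tuple by scanning manifests, then returns the
    // coefficient of variation (stddev / mean) as a crude skew signal
    public static double sizeSkew(Table table)
            throws IOException
    {
        Map<String, Long> bytesPerPartition = new HashMap<>();
        try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
            for (FileScanTask task : tasks) {
                DataFile file = task.file();
                bytesPerPartition.merge(file.partition().toString(), file.fileSizeInBytes(), Long::sum);
            }
        }
        if (bytesPerPartition.isEmpty()) {
            return 0;
        }
        double mean = bytesPerPartition.values().stream().mapToLong(Long::longValue).average().orElse(0);
        double variance = bytesPerPartition.values().stream()
                .mapToDouble(size -> (size - mean) * (size - mean))
                .average()
                .orElse(0);
        return mean == 0 ? 0 : Math.sqrt(variance) / mean;
    }
}
```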
> In certain cases, for smaller queries, the bucket count will likely be small relative to the cluster size, but a colocated join still provides better performance by avoiding shuffling and giving a better cache hit rate
If the query is small, then shuffling the data is probably not a problem. Anyway, we have DeterminePartitionCount, which will reduce the task count for intermediate stages if the query is small. We could apply similar logic to DetermineTableScanNodePartitioning (if the number of buckets is small, but the query is small too, then we could use node partitioning for the table scan).
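Something like the following shape, purely as an illustration (the thresholds and names are invented; the real rule would consult statistics the way DeterminePartitionCount does):

```java
public final class BucketedExecutionDecision
{
    private BucketedExecutionDecision() {}

    // Keep the connector's node partitioning for the table scan when either the bucket
    // count is large enough relative to the cluster, or the query is small enough that
    // reduced parallelism does not matter anyway
    static boolean useBucketedTableScan(
            int bucketCount,
            int workerCount,
            long estimatedScanBytes,
            double minBucketToWorkerRatio,   // e.g. 0.5, hypothetical
            long smallQueryScanBytesLimit)   // e.g. a few GB, hypothetical
    {
        boolean enoughBuckets = bucketCount >= workerCount * minBucketToWorkerRatio;
        boolean smallQuery = estimatedScanBytes <= smallQueryScanBytesLimit;
        return enoughBuckets || smallQuery;
    }
}
```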
> We could scan the manifests to collect file sizes per bucket and calculate the variance, I guess? This is likely a bit expensive, though, and we may want to avoid doing it multiple times during planning.
How expensive would it be to get partition count from Iceberg metadata? I'm not even talking about skewness or partition sizes.
> How expensive would it be to get partition count from Iceberg metadata? I'm not even talking about skewness or partition sizes.
Due to Iceberg's design, it is unfortunately the same cost (at least I/O-wise) whether you just get a partition count, the data size per partition, or more complex data.
> Anyway, we have DeterminePartitionCount, which will reduce the task count for intermediate stages if the query is small. We could apply similar logic to DetermineTableScanNodePartitioning (if the number of buckets is small, but the query is small too, then we could use node partitioning for the table scan).
Interesting idea. I think one way to achieve that is to allow returning a bit of extra information:

    ConnectorPartitioningHandle {
        Optional<PartitionStatistics> getPartitionStatistics();
    }

where PartitionStatistics can be something that contains

    int numberOfPartitions;
    OptionalInt totalSize;
    OptionalDouble sizeVariance;
    ...
We would then just determine at the engine level whether it makes sense to use a bucketed join, without needing every connector to implement it.
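To make that concrete, a self-contained sketch of the engine-side check consuming such statistics (PartitionStatistics here is the made-up record from the snippet above, with totalSize widened to a long; none of this is existing SPI):

```java
import java.util.OptionalDouble;
import java.util.OptionalLong;

public final class EngineSideBucketingCheck
{
    private EngineSideBucketingCheck() {}

    // Made-up shape matching the fields sketched above
    record PartitionStatistics(int numberOfPartitions, OptionalLong totalSize, OptionalDouble sizeVariance) {}

    // Engine-level policy: require enough partitions for the cluster and, when the
    // connector reports it, a size skew below some threshold
    static boolean useConnectorPartitioning(PartitionStatistics stats, int workerCount, double maxSkew)
    {
        if (stats.numberOfPartitions() < workerCount) {
            return false;
        }
        return stats.sizeVariance().orElse(0) <= maxSkew;
    }

    public static void main(String[] args)
    {
        PartitionStatistics stats = new PartitionStatistics(128, OptionalLong.of(1L << 35), OptionalDouble.of(0.2));
        System.out.println(useConnectorPartitioning(stats, 32, 1.0)); // true
    }
}
```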
There might be a few problems though:
1. Even if we return numberOfPartitions (I think this will be the product of the bucket counts of the bucketed columns, if we only consider bucketed columns), we will change numberOfPartitions to smaller values later in getCommonPartitioningHandle. The decision we make now may not be good enough later, once we have updated the TableHandle via getCommonPartitioningHandle.
2. Due to Iceberg's design, it is unfortunately the same cost (at least I/O-wise) whether you just get a partition count, the data size per partition, or more complex data.
That's an idea for an Iceberg spec improvement, cc @findepi. Anyway, we need to list partitions during planning anyway, right? See io.trino.plugin.iceberg.IcebergMetadata#getTableProperties. Usually metadata is cached per transaction (per query).
In extreme cases, we have TBs of metadata to read through...
That's pretty bad, are these tables that are not regularly compacted?
> Interesting idea. I think one way to achieve that is to allow returning a bit of extra information:
For just "partition count" this information can be returned as part of io.trino.spi.connector.ConnectorBucketNodeMap
, so we don't immediately need ConnectorPartitioningHandle
. For skewness it's probably better to return io.trino.spi.metrics.Distribution
instead (which connector can internally construct using TDigestHistogram
).
> Even if we return numberOfPartitions (I think this will be the product of the bucket counts of the bucketed columns, if we only consider bucketed columns), we will change numberOfPartitions to smaller values later in getCommonPartitioningHandle. The decision we make now may not be good enough later, once we have updated the TableHandle via getCommonPartitioningHandle.
I don't think this can happen for "normal" partitions like date or identity partitions (we would underestimate the bucket count in that case, even in the case of a join). Anyway, I don't think this would be a big problem and it could be fixed later.
I think https://github.com/trinodb/trino/pull/23432 will make it unnecessary to do this cc: @dain
Implement the TODO here: https://github.com/trinodb/trino/blob/master/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java#L542
Here's an example from Hive: https://github.com/trinodb/trino/blob/master/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java#L2942
The expected benefit is that it could potentially allow the same optimisations we have today for bucketed Hive tables (avoiding remote exchanges and enabling co-located joins). For a start we can restrict this to only bucket partitioning transforms in Iceberg rather than all partitioning transforms.
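For reference, a condensed, self-contained sketch of the shape that TODO could take, restricted to bucket transforms as proposed above. The types here are stand-ins for illustration only; the real change would return io.trino.spi.connector.ConnectorTablePartitioning from IcebergMetadata#getTableProperties, following the linked HiveMetadata example.

```java
import java.util.List;
import java.util.Optional;

public final class IcebergTablePartitioningSketch
{
    private IcebergTablePartitioningSketch() {}

    // Stand-ins for the relevant SPI/connector pieces; the real implementation would use
    // ConnectorTablePartitioning plus a new Iceberg partitioning handle
    record PartitionField(String sourceColumn, String transform, Optional<Integer> bucketCount) {}
    record TablePartitioning(List<String> partitioningColumns, List<Integer> bucketCounts) {}

    // The TODO in IcebergMetadata#getTableProperties: expose a partitioning only when every
    // partition field of the current spec is a bucket transform (the restriction proposed above)
    static Optional<TablePartitioning> bucketPartitioning(List<PartitionField> currentSpecFields)
    {
        if (currentSpecFields.isEmpty()
                || !currentSpecFields.stream().allMatch(field -> field.transform().startsWith("bucket"))) {
            return Optional.empty();
        }
        return Optional.of(new TablePartitioning(
                currentSpecFields.stream().map(PartitionField::sourceColumn).toList(),
                currentSpecFields.stream().map(field -> field.bucketCount().orElseThrow()).toList()));
    }

    public static void main(String[] args)
    {
        // bucket(16, customer_id) -> partitioning exposed; adding day(order_date) -> not exposed (for now)
        System.out.println(bucketPartitioning(List.of(
                new PartitionField("customer_id", "bucket[16]", Optional.of(16)))));
        System.out.println(bucketPartitioning(List.of(
                new PartitionField("customer_id", "bucket[16]", Optional.of(16)),
                new PartitionField("order_date", "day", Optional.empty()))));
    }
}
```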