arhimondr opened 1 year ago
In https://github.com/trinodb/trino/pull/13834 I tried to introduce a local exchange between a TableScan and a PartitionedOutputOperator. This way a new PartitionedOutputOperator is no longer created for each split; instead, a fixed number of PartitionedOutputOperators is created per task.
Unfortunately this introduced a performance regression, so it had to be reverted in https://github.com/trinodb/trino/pull/15358. Details can be found in the PR.
I had a chance to discuss this problem with @lukasz-stec and @sopel39 offline. It seems a strategy worth exploring is to reuse PagePartitioners instead of creating a new one for every new PartitionedOutputOperator.
CC: @losipiuk @linzebing @mwd410 @joshthoward
Currently Trino runs the PartitionedOutputOperator in the same pipeline as the TableScan. Since the TableScan pipeline is source-distributed, Trino creates a new PartitionedOutputOperator instance for every split, and a new PagePartitioner is allocated for every PartitionedOutputOperator.
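The reuse strategy above can be sketched as a small per-task pool that lends PagePartitioners out to operator instances instead of allocating a fresh one per split. This is a minimal illustration, not Trino's actual API; the class and method names (`PagePartitionerPool`, `lease`, `release`) are hypothetical:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: a per-task pool that hands out PagePartitioner
// instances to PartitionedOutputOperators as splits are processed, so the
// number of partitioners is bounded by driver concurrency, not split count.
public class PagePartitionerPool {
    // stand-in for Trino's PagePartitioner; in reality it holds
    // per-partition page builders that could keep accumulating rows
    static class PagePartitioner {
        final int partitionCount;
        PagePartitioner(int partitionCount) { this.partitionCount = partitionCount; }
    }

    private final Deque<PagePartitioner> free = new ArrayDeque<>();
    private final int partitionCount;
    private int created;

    PagePartitionerPool(int partitionCount) { this.partitionCount = partitionCount; }

    // called when a new PartitionedOutputOperator is created for a split
    synchronized PagePartitioner lease() {
        PagePartitioner partitioner = free.poll();
        if (partitioner == null) {
            created++;
            partitioner = new PagePartitioner(partitionCount);
        }
        return partitioner;
    }

    // called when the operator finishes; buffered, partially filled pages
    // stay in the partitioner and can keep growing across splits
    synchronized void release(PagePartitioner partitioner) {
        free.push(partitioner);
    }

    synchronized int createdCount() { return created; }

    public static void main(String[] args) {
        PagePartitionerPool pool = new PagePartitionerPool(1000);
        // simulate 8 splits processed by at most 2 concurrent drivers
        for (int batch = 0; batch < 4; batch++) {
            PagePartitioner a = pool.lease();
            PagePartitioner b = pool.lease();
            pool.release(a);
            pool.release(b);
        }
        // only 2 partitioners were ever created, not one per split
        System.out.println(pool.createdCount()); // prints 2
    }
}
```

The key design point is that releasing a partitioner back to the pool does not flush it, so small per-split outputs get coalesced into fewer, larger pages.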
The standard split size (in the Hive connector) is 64MB. However, splits are created statically, without taking into account the amount of useful data contained in a file. It is not uncommon for a single split to produce only hundreds of kilobytes to several megabytes of useful data after projections and filtering are applied.
When data is partitioned into a small number of partitions (1 - 100), this is usually not a problem. For example, splitting 5MB of data 100 ways still yields pages of moderately decent size (~50KB). However, if 5MB is split 1000 ways (something we want to support for fault-tolerant execution), the resulting pages end up at around 5KB. That is further exacerbated by the fact that the pages produced may be partitioned again further down the line (for example, by a partitioned local exchange before an aggregation).
To support high-cardinality partitioning with decent performance, it is desirable to avoid generating too many small pages.