NVIDIA / spark-rapids

Spark RAPIDS plugin - accelerate Apache Spark with GPUs
https://nvidia.github.io/spark-rapids
Apache License 2.0

[FEA] See if an intermediate merge helps reduce memory in hash aggregate #8390

Open revans2 opened 1 year ago

revans2 commented 1 year ago

Is your feature request related to a problem? Please describe. Currently, when doing a hash aggregate, we do an initial aggregate per batch and save the resulting batch. When we are done, we look at the total size of those intermediate batches; if it is too large, we do a sort + key partitioning pass on the intermediate data and then do the final pass on each batch that comes out.

This generally works, but we might end up holding a lot more intermediate data than we need to, which might cause us to sort more often than necessary. This would happen when we have a lot of input data spread over multiple batches, where the per-batch aggregation reduces the size decently, but the keys are evenly distributed across all of the batches. It is rather common to have something like a GpuExpandExec, or a join that explodes, right before an aggregation that drops the size of the data back down to something really small.

It would be great to test out what happens if we put in a heuristic where we have a merging of intermediate batches.

When a batch is added, we would check whether there is more than one pending batch and whether the combined size of those batches is larger than 1/2 the target batch size. This check should probably ignore batches that have already been merged once. If the check passes, we would concat all of the pending batches together, including ones that have been merged previously (unless the size would be too large...), and do a merge aggregation on them; the merged result would then replace the batches that went into it. If the size of the output data did not go down by at least some percentage (I am thinking 50%, but we should play with this), we would set a flag and stop trying to merge the intermediate results. This would protect us against situations where it looks like we are not going to combine anything at all.
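To make the proposed heuristic concrete, here is a minimal sketch in plain Python. It is not the plugin's code: `Batch`, `merge_aggregate`, and `IntermediateMerger` are hypothetical names, batches are modelled as dictionaries of partial sums, size is counted in rows rather than bytes, and "too large" is assumed to mean that the concatenated input would exceed the target batch size.

```python
from dataclasses import dataclass, field


@dataclass
class Batch:
    rows: dict            # grouping key -> partial sum, one row per key
    merged: bool = False  # True once this batch is the output of a merge

    @property
    def size(self) -> int:
        return len(self.rows)  # assumption: size is measured in rows


def merge_aggregate(batches):
    """Concatenate the batches and combine rows that share a key."""
    out = {}
    for b in batches:
        for key, value in b.rows.items():
            out[key] = out.get(key, 0) + value
    return Batch(out, merged=True)


@dataclass
class IntermediateMerger:
    target_size: int            # target batch size, in rows
    min_reduction: float = 0.5  # required shrink per merge (the 50% idea)
    pending: list = field(default_factory=list)
    stop_merging: bool = False

    def add(self, batch: Batch) -> None:
        self.pending.append(batch)
        if self.stop_merging:
            return
        # Only never-merged batches count toward the decision to merge.
        fresh = [b for b in self.pending if not b.merged]
        fresh_size = sum(b.size for b in fresh)
        if len(fresh) < 2 or fresh_size <= self.target_size / 2:
            return
        # Pull in previously merged batches while the input stays within
        # target_size (assumption: this is what "too large" means).
        chosen, total = list(fresh), fresh_size
        for b in self.pending:
            if b.merged and total + b.size <= self.target_size:
                chosen.append(b)
                total += b.size
        result = merge_aggregate(chosen)
        chosen_ids = {id(b) for b in chosen}
        self.pending = [b for b in self.pending if id(b) not in chosen_ids]
        self.pending.append(result)
        # Not enough combining: set the flag and stop trying.
        if result.size > total * (1 - self.min_reduction):
            self.stop_merging = True
```

In this toy model, feeding in batches that all share the same keys keeps `pending` collapsed to a single small batch, while feeding in batches with disjoint keys trips `stop_merging` on the first merge, so later batches are simply queued.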

This is related to https://github.com/NVIDIA/spark-rapids/issues/7404, where if it really looks like nothing is going to combine on a partial aggregate, it might be worth just passing the batches on through to the shuffle. But we might want the threshold for that heuristic to be a little higher than 50%. We would need to play with both of those to see what really works well. To be clear, #7404 should be separate from this. I just think that if we do this and the numbers look good, it might be nice to see what happens if we add #7404 to it, because it should be fairly simple to do.

As for testing, I want to see a number of different situations tested from both a performance and a reliability standpoint. All of these would involve a lot of data going into a small number of tasks, as if we had way too few shuffle partitions for the size of the data. A lot of this is really going to be about the cardinality and ordering of the grouping keys.

I want to see what happens when the key's

  1. cardinality is high and is highly grouped (the data is almost sorted by the key, should get good combining initially, but not after a first pass)
  2. cardinality is high and is randomly distributed (should get almost no combining in the partial)
  3. cardinality is low and is highly grouped
  4. cardinality is low and is randomly distributed
  5. cardinality is medium and is highly grouped
  6. cardinality is medium and is randomly distributed

By high cardinality I mean each key shows up 2 to 3 times in the entire dataset, for medium 200 to 300 times, and for low 20,000 to 30,000 times. But we want enough data that a single task cannot hold all of it in GPU memory: at least a few hundred GiB of data.
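As a rough illustration of the six key distributions, the sketch below generates toy key columns and measures how much a per-batch partial aggregate would shrink them. Everything here is an assumption made for illustration: `REPEATS`, `make_keys`, and `partial_reduction` are hypothetical names, each key repeats a fixed number of times (the upper end of each range above), "highly grouped" is modelled as fully sorted rather than almost sorted, and the row counts are far below the few hundred GiB a real test would need.

```python
import random

# Occurrences per key for each cardinality level (upper end of each range).
REPEATS = {"high": 3, "medium": 300, "low": 30_000}


def make_keys(num_rows, cardinality, grouped, seed=0):
    """Return a list of grouping keys with the requested shape."""
    repeats = REPEATS[cardinality]
    keys = [i // repeats for i in range(num_rows)]  # sorted by key
    if not grouped:
        random.Random(seed).shuffle(keys)  # randomly distributed
    return keys


def partial_reduction(keys, batch_rows):
    """Fraction of rows removed by aggregating each batch on its own."""
    out_rows = 0
    for start in range(0, len(keys), batch_rows):
        out_rows += len(set(keys[start:start + batch_rows]))
    return 1 - out_rows / len(keys)


if __name__ == "__main__":
    for cardinality in ("high", "medium", "low"):
        for grouped in (True, False):
            keys = make_keys(60_000, cardinality, grouped)
            label = "grouped" if grouped else "random"
            reduction = partial_reduction(keys, batch_rows=10_000)
            print(f"{cardinality:6s} {label:7s} {reduction:.3f}")
```

The printed fraction is only a proxy for how much combining the partial aggregate achieves per batch; it says nothing about GPU memory pressure or spill behavior, which is what the real tests would need to measure.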

binmahone commented 4 months ago

Hi @revans2, is this issue the same thing as what we do in com.nvidia.spark.rapids.GpuMergeAggregateIterator#tryMergeAggregatedBatches today?

revans2 commented 3 months ago

@binmahone sorry for the late reply. No, it is not exactly the same as what happens today with tryMergeAggregatedBatches. tryMergeAggregatedBatches happens after an initial aggregation pass through all of the input data is done. This issue is saying: let's see if we can be more aggressive with the merging. As soon as we hit a merge window size, let's try to merge that data instead of waiting until the end. If it reduces the size significantly, then we can go on and process more data the same way, but with a smaller footprint/spill. If it does not reduce the size, then we might want to take that as feedback and decide whether we should fall back to doing hash repartitioning/sort instead.