NVIDIA / spark-rapids

Spark RAPIDS plugin - accelerate Apache Spark with GPUs
https://nvidia.github.io/spark-rapids
Apache License 2.0

[FEA] Create benchmarks for multi-batch processing cases #11575

Open revans2 opened 1 month ago

revans2 commented 1 month ago

Is your feature request related to a problem? Please describe.

We have seen issues with memory management when a task ends up processing multiple batches of input/output. The issue is described in https://github.com/NVIDIA/spark-rapids/issues/11343, but we don't have a good set of benchmarks to measure our progress in fixing it.

We need to come up with some formal benchmarks, ideally using NDS data sets or our stress test suite. They should cover cases where there are multiple batches of input and/or output to/from a single task. The cases we worry about are ones where a task leaves intermediate spillable data on the GPU, but we also want to verify that cases with little to no intermediate data on the GPU work well.

We should try to measure the impact of spilling and different I/O characteristics on the runtime of these jobs. I believe that eventually this will come down to deciding, in a few different situations, whether it is better to buffer more remote data at the expense of increased local I/O + compute (spilling), and whether we should do more GPU processing at the expense of increased local I/O + compute (spilling). So it might be nice to have a way to simulate different levels of local disk and remote disk throughput/IOPS. I am not 100% sure how to do that, but if all we get are the queries, that is a good start, and we can try to simulate disk speeds later.
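As one rough illustration of simulating a slower disk, a benchmark harness could wrap a writer and cap its throughput in user space. This is only a sketch and is not something the plugin provides: `ThrottledWriter` and its parameters are hypothetical names, and it models throughput only, not IOPS or latency. OS-level I/O limits would likely be more realistic for real runs.

```python
import io
import time


class ThrottledWriter:
    """Hypothetical helper: caps write throughput of a binary file-like object."""

    def __init__(self, raw, bytes_per_sec, clock=time.monotonic, sleep=time.sleep):
        self.raw = raw
        self.bytes_per_sec = float(bytes_per_sec)
        # clock and sleep are injectable so the behavior can be tested quickly
        self._clock = clock
        self._sleep = sleep
        self._start = clock()
        self._written = 0

    def write(self, data):
        n = self.raw.write(data)
        self._written += n
        # Earliest time by which this many bytes could have been written at the cap
        target = self._start + self._written / self.bytes_per_sec
        delay = target - self._clock()
        if delay > 0:
            self._sleep(delay)
        return n


if __name__ == "__main__":
    # Simulate a "disk" limited to 1 MiB/s while writing 256 KiB
    writer = ThrottledWriter(io.BytesIO(), bytes_per_sec=1024 * 1024)
    begin = time.monotonic()
    writer.write(b"\0" * (256 * 1024))
    print(f"elapsed: {time.monotonic() - begin:.2f}s")
```

The same idea could be applied on the read side to approximate slow remote storage, which would let the buffering-versus-spilling trade-off be explored without changing hardware.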

The proposed queries I want to see are:

  1. A join that explodes, so that for most tasks there are multiple output batches going into a file write or a shuffle.
  2. A large window operation where the cardinality of the partition by keys makes it so that most tasks end up with multiple batches worth of data. The window operation itself should ideally be a running window operation just to make that processing simpler.
  3. A large aggregation where there is a lot of input data, meaning multiple batches per task, but the output data is less than a single batch in size.
  4. A project where there are multiple batches being processed per task, but all of the batches are independent of each other.
  5. A Parquet read + filter where the Parquet read itself is likely to explode in size to multiple batches, and the filter does not filter much out.

These would be in addition to the regular NDS runs that we already do to be sure there are no happy-path performance regressions.
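To make the five proposed cases more concrete, here is a rough sketch of what the queries might look like. Everything in it is an assumption for illustration: it presumes the NDS (TPC-DS derived) `store_sales` and `store_returns` tables are registered as Parquet-backed tables, the names `QUERIES`, `SUGGESTED_CONF` and `run_benchmarks` are hypothetical, and the blow-up factors would need tuning per scale factor. Lowering `spark.rapids.sql.batchSizeBytes` is suggested as one way to get multiple batches per task on smaller data sets; the value shown is arbitrary and should be checked against the plugin's configuration docs.

```python
import time

# Assumption: a smaller target batch size makes multi-batch tasks easier to hit.
SUGGESTED_CONF = {"spark.rapids.sql.batchSizeBytes": str(64 * 1024 * 1024)}

# Hypothetical query set, one entry per proposed case.
QUERIES = {
    # 1. Many-to-many join on item, so output is much larger than input
    "exploding_join": """
        SELECT s.ss_ticket_number, r.sr_ticket_number, s.ss_item_sk
        FROM store_sales s JOIN store_returns r ON s.ss_item_sk = r.sr_item_sk
    """,
    # 2. Running window over a low-cardinality partition key
    "running_window": """
        SELECT ss_store_sk, ss_sold_date_sk,
               SUM(ss_sales_price) OVER (
                 PARTITION BY ss_store_sk
                 ORDER BY ss_sold_date_sk, ss_ticket_number
                 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS running_total
        FROM store_sales
    """,
    # 3. Lots of input per task, output far smaller than one batch
    "large_agg_small_output": """
        SELECT ss_store_sk, COUNT(*) AS cnt, SUM(ss_net_paid) AS paid
        FROM store_sales GROUP BY ss_store_sk
    """,
    # 4. Row-wise project, batches are independent of each other
    "independent_project": """
        SELECT ss_quantity * ss_sales_price AS gross,
               ss_net_paid - ss_net_profit AS cost
        FROM store_sales
    """,
    # 5. Wide Parquet read with a filter that keeps almost every row
    "parquet_read_filter": """
        SELECT * FROM store_sales WHERE ss_quantity >= 1
    """,
}


def run_benchmarks(spark, queries=QUERIES):
    """Run each query against the given SparkSession and return wall-clock seconds."""
    timings = {}
    for name, sql in queries.items():
        start = time.perf_counter()
        # The "noop" sink forces full execution without measuring output storage
        spark.sql(sql).write.format("noop").mode("overwrite").save()
        timings[name] = time.perf_counter() - start
    return timings
```

Note that the `noop` sink keeps the measurement focused on the operators themselves. For case 1, swapping it for a real Parquet write or adding a repartition would be needed to exercise the file write and shuffle paths specifically.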