NVIDIA / spark-rapids

Spark RAPIDS plugin - accelerate Apache Spark with GPUs
https://nvidia.github.io/spark-rapids
Apache License 2.0

Consider releasing the GPU semaphore earlier during shuffle partitioning #11281

Open jlowe opened 3 months ago

jlowe commented 3 months ago

#11280 adds asynchronous copying of shuffle data after partitioning and synchronizes on the stream before releasing the GPU semaphore. Instead, we could release the semaphore after freeing the device data but before synchronizing on the stream, e.g. via a patch like this:

```diff
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala
index 6394e2974..7b62f1178 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala
@@ -132,7 +132,6 @@ trait GpuPartitioning extends Partitioning {
       }
     }
     withResource(hostPartColumns) { _ =>
-      Cuda.DEFAULT_STREAM.sync()
       // Leaving the GPU for a while
       GpuSemaphore.releaseIfNecessary(TaskContext.get())

@@ -144,6 +143,8 @@ trait GpuPartitioning extends Partitioning {
         start = idx
       }
       origParts(numPartitions - 1) = sliceBatch(hostPartColumns, start, numRows)
+      // Ensure async copies have completed before examining the host data
+      Cuda.DEFAULT_STREAM.sync()
       val tmp = origParts.zipWithIndex.filter(_._1 != null)
       // Spark CPU shuffle in some cases has limits on the size of the data a single
       //  row can have. It is a little complicated because the limit is on the compressed
```

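For reference, the reordering boils down to: enqueue the device-to-host copies, free the device data, release the GPU semaphore, do the host-side slicing bookkeeping, and only sync the stream right before the copied bytes are actually read. Below is a standalone sketch of that ordering against the cudf Java API (assuming HostMemoryBuffer.copyFromDeviceBufferAsync is available); it is illustrative only, not the plugin's GpuPartitioning code, and the GpuSemaphore call is left as a comment because a standalone program has no Spark TaskContext.

```scala
import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer, HostMemoryBuffer}

object EarlyReleaseSketch {
  def main(args: Array[String]): Unit = {
    val numBytes = 1024L * 1024L
    val dev = DeviceMemoryBuffer.allocate(numBytes)  // uninitialized placeholder "shuffle" data
    val host = HostMemoryBuffer.allocate(numBytes)   // pinned if a pinned pool is configured
    try {
      // 1. Enqueue the device-to-host copy on the default stream; the CPU does not block.
      host.copyFromDeviceBufferAsync(dev, Cuda.DEFAULT_STREAM)

      // 2. Free the device data. With a stream-ordered/pooled allocator the memory only
      //    becomes reusable once the stream has progressed past this point.
      dev.close()

      // 3. This is where the proposal would drop the GPU semaphore, before the sync:
      // GpuSemaphore.releaseIfNecessary(TaskContext.get())

      // 4. Host-side bookkeeping that does not read the copied bytes (e.g. computing
      //    partition slice offsets) can happen here while the copy is still in flight.

      // 5. Only synchronize once the host bytes are actually needed.
      Cuda.DEFAULT_STREAM.sync()
      println(s"first byte after sync: ${host.getByte(0)}")
    } finally {
      host.close()
    }
  }
}
```

In the patch above, the same reordering is achieved simply by moving the existing Cuda.DEFAULT_STREAM.sync() call below the sliceBatch loop.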
With a lot of data being copied back to the host during shuffle via pinned memory, this could release the GPU semaphore significantly earlier, which has the potential to improve query performance.

However, this could also hurt query performance if it leads to excessive spilling. The memory has been freed on the stream, but it only becomes available for allocation once the stream has been synchronized past the point of the frees. This change wouldn't trigger an OOM error, but it might cause spilling, since IIRC we exhaust spilling before executing a cudaDeviceSynchronize to try to free memory.
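To make the spilling concern concrete, here is a minimal sketch of the allocate-retry ordering described above: spill until nothing is left, and only then synchronize so that stream-ordered frees become reclaimable before a final attempt. The tryAllocate and spillOneBuffer callbacks are hypothetical placeholders, not the plugin's actual retry/spill framework, and the real fallback is described above as a full cudaDeviceSynchronize rather than a single-stream sync.

```scala
import ai.rapids.cudf.Cuda

object AllocRetrySketch {
  /**
   * Try a device allocation, spilling before falling back to a sync.
   * tryAllocate and spillOneBuffer are hypothetical callbacks used only to
   * illustrate the ordering: spilling is exhausted first, and only then do we
   * synchronize so memory freed on the stream can actually be handed back out.
   */
  def allocateWithRetry(
      bytes: Long,
      tryAllocate: Long => Option[Long],
      spillOneBuffer: () => Boolean): Long = {
    while (true) {
      tryAllocate(bytes) match {
        case Some(addr) =>
          return addr
        case None =>
          if (!spillOneBuffer()) {
            // Nothing left to spill. Memory freed on the stream is only allocatable
            // once the stream has passed the frees, so catch up and try once more.
            // (The plugin is described above as doing a cudaDeviceSynchronize here.)
            Cuda.DEFAULT_STREAM.sync()
            return tryAllocate(bytes).getOrElse(
              throw new OutOfMemoryError(s"Unable to allocate $bytes bytes on the GPU"))
          }
      }
    }
    throw new IllegalStateException("unreachable")
  }
}
```

The concern is that with the semaphore released before the sync, more allocations from other tasks can land in the window before the stream catches up, taking the spill path even though the freed memory is about to become available.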

binmahone commented 1 month ago

I have a silly question: is it safe to use hostPartColumns before Cuda.DEFAULT_STREAM.sync() is called?

binmahone commented 1 month ago

> I have a silly question: is it safe to use hostPartColumns before Cuda.DEFAULT_STREAM.sync() is called?

Liangcai already explained my question offline. Thanks!