NVIDIA / spark-rapids

Spark RAPIDS plugin - accelerate Apache Spark with GPUs
https://nvidia.github.io/spark-rapids
Apache License 2.0
795 stars 232 forks source link

[QST] How to configure to run multiple Concurrent Tasks on single GPU for Spark RAPIDS Jobs? #11586

Open vara-bonthu opened 5 days ago

vara-bonthu commented 5 days ago

What is your question? I am currently running PySpark data processing jobs to read partitioned CSV data from S3 and perform some aggregations. The same script is being run on both CPU and GPU instances using Spark RAPIDS. However, I am noticing that Spark RAPIDS on GPU is significantly slower than CPU, especially when loading partitioned CSV data from S3 using the s3a protocol.

Issue: When running on CPUs, I can see that all 4 cores are being fully utilized, running 4+ tasks in parallel per executor. However, when using GPUs with Spark RAPIDS, each GPU node is only running 1 task per node, resulting in underutilization and much slower performance (up to 4 times slower than CPU).

GPU Spark Rapids config

  "spark.rapids.sql.concurrentGpuTasks": "4" # 2 tasks on g6.4x large takes 32 mins and 4 tasks takes: 
    "spark.executor.resource.gpu.amount": "1"
    "spark.rapids.memory.pinnedPool.size": "2g"
    "spark.executor.memoryOverhead": "3g"
    "spark.sql.files.maxPartitionBytes": "512m"
    "spark.sql.shuffle.partitions": "10"
    "spark.plugins": "com.nvidia.spark.SQLPlugin"
    "spark.shuffle.manager": "com.nvidia.spark.rapids.spark351.RapidsShuffleManager"
    "spark.executor.resource.gpu.discoveryScript": "/opt/sparkRapidsPlugin/getGpusResources.sh"
    "spark.executor.resource.gpu.vendor": "nvidia.com"
    "spark.executor.extraLibraryPath": "/usr/local/cuda/lib:/usr/local/cuda/lib64:/usr/local/cuda/extras/CUPTI/lib64:/usr/local/cuda/targets/x86_64-linux/lib:/usr/lib/hadoop/lib/native:/usr/lib/hadooplzo/lib/native:/docker/usr/lib/hadoop/lib/native:/docker/usr/lib/hadoop-lzo/lib/native"
    "spark.task.cpus": "4"
    "spark.dynamicAllocation.enabled": "false"
    "spark.rapids.sql.enabled": "true"
    "spark.rapids.cloudSchemes": "s3a"
    "spark.rapids.memory.gpu.pool": "ASYNC"
    "spark.rapids.memory.gpu.allocFraction": "0.8"  # Adjusted to avoid exceeding GPU memory limits
    "spark.rapids.memory.gpu.maxAllocFraction": "0.9"  # Ensure it's consistent
    "spark.rapids.memory.gpu.minAllocFraction": "0.25"
    "spark.rapids.sql.shuffle.spillThreads": "16"
    "spark.rapids.sql.batchSizeBytes": "512m"
    "spark.rapids.sql.reader.batchSizeBytes": "512m"
    "spark.rapids.sql.metrics.level": "MODERATE"  # Optional, set to DEBUG if needed
    "spark.rapids.sql.explain": "ALL"  # Optional for debugging
    "spark.rapids.sql.stableSort.enabled": "true"
    "spark.rapids.sql.variableFloatAgg.enabled": "false"
    "spark.task.resource.gpu.amount": "0.25" # experimenting
    "spark.rapids.sql.explain": "NOT_ON_GPU"
    "spark.rapids.sql.incompatibleOps.enabled": "true"
    "spark.rapids.shuffle.mode": "MULTITHREADED"
    "spark.sql.adaptive.enabled": "true"
-
-
-
executor:
    cores: 4
    instances: 4  # Single executor with one GPU - aiming for g5 or g6 xlarge
    memory: "10g"
    gpu:
      name: "nvidia.com/gpu"
      quantity: 1

CPU PySpark Config


  executor:
    cores: 4
    instances: 2  
    memory: "10g"
    labels:
      version: "3.5.1"

Despite trying multiple configurations, the GPU seems to be running only 1 task per node, while CPUs are efficiently running 4+ tasks per node.

Screenshots: In the attached screenshots, you can see the difference between the Spark History Server for both CPU and GPU runs: Image

Left (CPU): Shows 5 active tasks per node. Right (GPU): Shows only 1 active task per node.

Questions: How can I ensure that multiple tasks can run concurrently on a single GPU, similar to how multiple tasks run on multiple CPU cores?

Is there any specific configuration I am missing that could enable better utilization of the GPU for Spark workloads?

Are there any known limitations or best practices for running Spark tasks on GPUs that might explain this behavior?

Is it recommended to use one instance with multiple GPUs instead of using 4 instances with 1 GPU each?

vara-bonthu commented 5 days ago

I think i found the issue is due to task cpu set to 4. When i changed this from 4 to 1 then i could see the parallel tasks

"spark.task.cpus": "4"

to

"spark.task.cpus": "1"
abellina commented 5 days ago

Yeah spark.task.cpus is a config that can make a task take up more slots, in your case all of them. Was there a reason it was set to 4?

How can I ensure that multiple tasks can run concurrently on a single GPU, similar to how multiple tasks run on multiple CPU cores?

As long as there are multiple tasks on an executor (spark.executor.cores > 1 and concurrentGpuTasks > 1). Artificially changing the cpu allotment per task (spark.task.cpus) makes this math harder.

Is there any specific configuration I am missing that could enable better utilization of the GPU for Spark workloads?

There is a lot in this question and it greatly depends in the workload. We have tools to help you decide what the configs should be, and you can always ask questions here. To get you started:

Are there any known limitations or best practices for running Spark tasks on GPUs that might explain this behavior? Is it recommended to use one instance with multiple GPUs instead of using 4 instances with 1 GPU each?

I think these question are about spark.task.cpus, which you already resolved. That said, I assume when you say instances you either mean executors or spark apps. We support 1 executor per GPU. It depends on your needs on how many apps you should run, and also how you want to organize things logically.

There are lots of configs in your original question. Here is some feedback on them:

For L4 GPU we have a recommendation around 3 concurrent, but this is a guideline. I'd keep it in mind when suggesting 4 concurrent, as we may see OOM in some jobs. That said, it really depends on the queries:

  "spark.rapids.sql.concurrentGpuTasks": "4" # 2 tasks on g6.4x large takes 32 mins and 4 tasks takes: 

We have spent some time with the number of shuffle partitions. Usually, 200 is a really good default. Note that 201+ is a different shuffle algorithm in Apache Spark. It might make sense, for small data, to have 10 partitions, especially if you have 10 spark.executor.cores total (every core has some work to do and they all finish at once).

    "spark.sql.shuffle.partitions": "10"

Related to the above, set this to 1/spark.executor.cores always. For the 4 core case, 0.25 is the right value:

   "spark.task.resource.gpu.amount": "0.25" # experimenting

batchSizeBytes is a good config to tune to find better performance on the GPU. We tend to see that the higher this number is the better, with a maximum of Integer.MAX_VALUE. What we do with this is that at various points of the job, we will accumulate results on GPU or CPU to get to these larger batches, amortizing the cost of calling kernels and getting results back from the GPU, allocations, and more.

    "spark.rapids.sql.batchSizeBytes": "512m"

If you configure this, without a "mode" you are using a multi-threaded shuffle, not the UCX shuffle (I see UCX shuffle referenced in your github repo). See this for more configs around UCX mode: https://docs.nvidia.com/spark-rapids/user-guide/latest/additional-functionality/rapids-shuffle.html#ucx-mode.

    "spark.shuffle.manager": "com.nvidia.spark.rapids.spark351.RapidsShuffleManager"

This is the default:

    "spark.rapids.shuffle.mode": "MULTITHREADED"

I don't think you have to set this, it should be auto populated by the discovery script:

    "spark.executor.resource.gpu.vendor": "nvidia.com"

Note that if you don't use UCX mode (if you use the default multi-threaded mode) you can use dynamicAllocation.

    "spark.dynamicAllocation.enabled": "false"

s3a is already part of the cloudSchemes, shouldn't need this:

    "spark.rapids.cloudSchemes": "s3a"

ASYNC allocator is the default, we shouldn't need to set any of this:

    "spark.rapids.memory.gpu.pool": "ASYNC"
    "spark.rapids.memory.gpu.allocFraction": "0.8"  # Adjusted to avoid exceeding GPU memory limits
    "spark.rapids.memory.gpu.maxAllocFraction": "0.9"  # Ensure it's consistent
    "spark.rapids.memory.gpu.minAllocFraction": "0.25"

This is actually a conf bug. There is a config we don't actually use:

    "spark.rapids.sql.shuffle.spillThreads": "16"

I would keep this value default unless you start seeing OOM near the scan side (parquet/orc reading). It limits the amount of memory we are going to be batching up on the read.

    "spark.rapids.sql.reader.batchSizeBytes": "512m"

Don't set this unless you really want to see more metrics:

    "spark.rapids.sql.metrics.level": "MODERATE"  # Optional, set to DEBUG if needed

These are conflicting with each other. The first one, would log all of explain output with GPU annotations. The second one would log only the pieces of the plan that we don't put on the GPU:

    "spark.rapids.sql.explain": "ALL"  # Optional for debugging
    "spark.rapids.sql.explain": "NOT_ON_GPU"

Why was this enabled? I am mostly curious if you do need a stable sort (https://github.com/NVIDIA/spark-rapids/blob/4866941bbf70f93931d9e23ee8f9a65b0f7c6c01/docs/compatibility.md#ordering-of-output):

    "spark.rapids.sql.stableSort.enabled": "true"

This applies to joins where we are using floating point as the key. Do you have this use case? (https://github.com/NVIDIA/spark-rapids/blob/4866941bbf70f93931d9e23ee8f9a65b0f7c6c01/docs/compatibility.md#floating-point)

    "spark.rapids.sql.variableFloatAgg.enabled": "false"

This enabled operations where we are not 100% compatible with the CPU. Please go to https://nvidia.github.io/spark-rapids/docs/additional-functionality/advanced_configs.html and look for the operations where we are not 100% compatible. Do you want/need them turned on? I'd recommend leaving this set to false and only enabling if you really need an op and understand the differences.

    "spark.rapids.sql.incompatibleOps.enabled": "true"

This should be default:

    "spark.sql.adaptive.enabled": "true"