NVIDIA / spark-rapids

Spark RAPIDS plugin - accelerate Apache Spark with GPUs
https://nvidia.github.io/spark-rapids
Apache License 2.0

[BUG] Non-deterministic query result corruption when RAPIDS shuffle manager is enabled #5818

Open gerashegalov opened 2 years ago

gerashegalov commented 2 years ago

Describe the bug: A query may non-deterministically produce incorrect results when the RAPIDS shuffle manager is enabled.

Steps/Code to reproduce bug: Start pyspark with 2 executors on a local standalone backend or an equivalent local-cluster master:

$SPARK_HOME/bin/pyspark \
  --driver-class-path $PWD/dist/target/rapids-4-spark_2.12-22.06.0-SNAPSHOT-cuda11.jar  \
  --conf spark.executor.extraClassPath=$PWD/dist/target/rapids-4-spark_2.12-22.06.0-SNAPSHOT-cuda11.jar  \
  --conf spark.plugins=com.nvidia.spark.SQLPlugin  \
  --conf spark.rapids.sql.explain=ALL \
  --conf spark.shuffle.manager=com.nvidia.spark.rapids.spark321.RapidsShuffleManager \
  --conf spark.shuffle.service.enabled=false  \
  --conf spark.dynamicAllocation.enabled=false  \
  --conf spark.executorEnv.UCX_ERROR_SIGNALS=  \
  --conf spark.executorEnv.UCX_MEMTYPE_CACHE=n \
  --conf spark.rapids.memory.gpu.minAllocFraction=0   \
  --conf spark.rapids.memory.gpu.allocFraction=0.2 \
  --conf spark.rapids.shuffle.enabled=false  \
  --master local-cluster[2,1,1200]

If running with a full local standalone backend, use

--executor-cores=2 --total-executor-cores=4 --master spark://$(hostname):7077

instead of --master local-cluster[2,1,1200].

Materialize a simple dataframe in Parquet (the same happens with Avro):

>>> spark.createDataFrame([ [1], [10] ], 'a int').write.format('parquet').save(path='/tmp/df.parquet', mode='overwrite')

Repeatedly execute the following query

>>> spark.read.format('parquet').load('/tmp/df.parquet').selectExpr('avg(a)').collect()

and observe the correct result [Row(avg(a)=5.5)] without any issues

Enable RAPIDS Shuffle

>>> spark.conf.set('spark.rapids.shuffle.enabled', True)

and re-execute the query many times; the result bounces between [Row(avg(a)=10.0)], [Row(avg(a)=1.0)], and [Row(avg(a)=5.5)].
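
A minimal loop, run in the same pyspark shell, tallies how often each result shows up (the loop itself is illustrative and not part of the original report):

from collections import Counter

results = Counter()
for _ in range(20):
    row = spark.read.format('parquet').load('/tmp/df.parquet') \
               .selectExpr('avg(a)').collect()[0]
    results[row['avg(a)']] += 1

# On an affected setup with the RAPIDS shuffle enabled this prints a mix of
# 1.0, 10.0, and 5.5; without the bug only 5.5 should ever appear.
print(results)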

Expected behavior: AVG over [1, 10] should produce a deterministic result (5.5) regardless of partitioning and format.

Environment details: see the nvidia-smi output posted in the comments below.

Additional context N/A

abellina commented 2 years ago

@gerashegalov, I am trying to follow these instructions and I can't replicate it so far. I tried local-cluster mode and standalone mode. Could you provide a bit more information:

  1. What version of ucx do you have installed (ucx_info -v)? I am running 1.12.0 locally.
  2. I see the 5.5 average constantly. Do you ever see that, or does it alternate between 1 and 10? I am wondering whether the issue you are seeing is on the write, on the read, or both. It would be nice to get it to happen and then inspect the parquet file on the CPU or with the shuffle manager off (e.g. a read + a show to get its contents); see the sketch below.
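
A quick way to do that CPU check, assuming the file written by the repro above, is to read it back with the RAPIDS SQL plugin disabled so the CPU Parquet reader is used (illustrative sketch, not from the thread):

# Temporarily fall back to the CPU Parquet reader for verification
spark.conf.set('spark.rapids.sql.enabled', 'false')
spark.read.parquet('/tmp/df.parquet').show()
# Re-enable GPU execution afterwards
spark.conf.set('spark.rapids.sql.enabled', 'true')
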
gerashegalov commented 2 years ago

@abellina I downloaded the latest tagged release from https://github.com/openucx/ucx/tags yesterday:

$ ucx_info -v
# UCT version=1.12.1 revision dc92435
# configured with: --disable-logging --disable-debug --disable-assertions --disable-params-check --prefix=/usr --enable-examples --with-java=no

I'll try 1.12.0 too.


UPDATE: downgrading to 1.12.0 yields the same buggy behavior:

$ ucx_info -v
# UCT version=1.12.0 revision d367332
# configured with: --disable-logging --disable-debug --disable-assertions --disable-params-check --prefix=/usr --enable-examples --with-java=no

I see the 5.5 average constantly. Do you ever see that, or does it alternate between 1 and 10? I am wondering whether the issue you are seeing is on the write, on the read, or both. It would be nice to get it to happen and then inspect the parquet file on the CPU or with the shuffle manager off (e.g. a read + a show to get its contents).

I see 5.5 too occasionally (updated description)

gerashegalov commented 2 years ago

@abellina Interestingly, this behavior has not reproduced for me yet on bare-metal Ubuntu 20.04, although it reproduces very easily on a WSL2 Ubuntu 20.04 VM.

gerashegalov commented 2 years ago

After debugging (h/t @abellina), it looks like the workaround is to set --conf spark.executorEnv.UCX_TLS=cuda_copy,tcp; see the sketch below.
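
Because spark.executorEnv.* values only take effect when executors are launched, the workaround belongs in the launch configuration (an extra --conf on the pyspark command above) rather than a runtime spark.conf.set. A minimal sketch of a standalone script applying it, assuming the RAPIDS jar is already on the driver and executor classpaths:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master('local-cluster[2,1,1200]')
         .config('spark.plugins', 'com.nvidia.spark.SQLPlugin')
         .config('spark.shuffle.manager',
                 'com.nvidia.spark.rapids.spark321.RapidsShuffleManager')
         .config('spark.rapids.shuffle.enabled', 'true')
         # Restrict UCX to the CUDA-copy and TCP transports, avoiding the
         # problematic cuda_ipc path on WSL2
         .config('spark.executorEnv.UCX_TLS', 'cuda_copy,tcp')
         .getOrCreate())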

gerashegalov commented 2 years ago
Wed Jun 15 14:41:09 2022
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 515.48.03    Driver Version: 516.25       CUDA Version: 11.7     |
|-------------------------------+----------------------+----------------------+
gerashegalov commented 2 years ago
ucx debug log files for the repro run
$ $SPARK_HOME/bin/pyspark \
  --driver-class-path $PWD/dist/target/rapids-4-spark_2.12-22.08.0-SNAPSHOT-cuda11.jar \
  --conf spark.executor.extraClassPath=$PWD/dist/target/rapids-4-spark_2.12-22.08.0-SNAPSHOT-cuda11.jar \
  --conf spark.plugins=com.nvidia.spark.SQLPlugin \
  --conf spark.rapids.sql.explain=ALL \
  --conf spark.shuffle.manager=com.nvidia.spark.rapids.spark321.RapidsShuffleManager \
  --conf spark.shuffle.service.enabled=false \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.executorEnv.UCX_ERROR_SIGNALS= \
  --conf spark.executorEnv.UCX_LOG_LEVEL=data \
  --conf spark.executorEnv.UCX_LOG_FILE=/tmp/ucx_log_%p \
  --conf spark.executorEnv.LD_LIBRARY_PATH=$HOME/dist/ucx_debug/lib \
  --conf spark.executorEnv.UCX_MEMTYPE_CACHE=n \
  --conf spark.rapids.memory.gpu.minAllocFraction=0 \
  --conf spark.rapids.memory.gpu.allocFraction=0.2 \
  --master local-cluster[2,1,1200]
Python 3.8.13 | packaged by conda-forge | (default, Mar 25 2022, 06:04:10) 
[GCC 10.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
22/06/18 01:19:08 WARN Utils: Your hostname, NV-3L4YVG3 resolves to a loopback address: 127.0.1.1; using 172.22.19.221 instead (on interface eth0)
22/06/18 01:19:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/06/18 01:19:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/06/18 01:19:10 WARN RapidsPluginUtils: RAPIDS Accelerator 22.08.0-SNAPSHOT using cudf 22.08.0-SNAPSHOT.
22/06/18 01:19:10 WARN RapidsPluginUtils: RAPIDS Accelerator is enabled, to disable GPU support set `spark.rapids.sql.enabled` to false.
22/06/18 01:19:10 WARN RapidsShuffleInternalManager: Rapids Shuffle Plugin enabled. Transport enabled (remote fetches will use com.nvidia.spark.rapids.shuffle.ucx.UCXShuffleTransport. To disable the RAPIDS Shuffle Manager set `spark.rapids.shuffle.enabled` to false
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.2.1
      /_/

Using Python version 3.8.13 (default, Mar 25 2022 06:04:10)
Spark context Web UI available at http://172.22.19.221:4040/
Spark context available as 'sc' (master = local-cluster[2,1,1200], app id = app-20220618011911-0000).
SparkSession available as 'spark'.
>>> spark.read.format('parquet').load('/tmp/df.parquet').selectExpr('avg(a)').collect()
22/06/18 01:19:23 WARN GpuOverrides:                                            
*Exec  will run on GPU
  *Expression  avg(a#0) will run on GPU
    *Expression  avg(a#0) will run on GPU
  *Expression  avg(a#0)#2 AS avg(a)#3 will run on GPU
  *Exec  will run on GPU
    *Partitioning  will run on GPU
    *Exec  will run on GPU
      *Expression  partial_avg(a#0) will run on GPU
        *Expression  avg(a#0) will run on GPU
      *Exec  will run on GPU

22/06/18 01:19:23 WARN GpuOverrides: 
*Exec  will run on GPU
  *Expression  avg(a#0) will run on GPU
    *Expression  avg(a#0) will run on GPU
  *Expression  avg(a#0)#2 AS avg(a)#3 will run on GPU
  *Exec  will run on GPU
    *Partitioning  will run on GPU
    *Exec  will run on GPU
      *Expression  partial_avg(a#0) will run on GPU
        *Expression  avg(a#0) will run on GPU
      *Exec  will run on GPU

22/06/18 01:19:23 WARN GpuOverrides: 
*Exec  will run on GPU
  *Expression  avg(a#0) will run on GPU
    *Expression  avg(a#0) will run on GPU
  *Expression  avg(a#0)#2 AS avg(a)#3 will run on GPU
  *Exec  will run on GPU
    *Partitioning  will run on GPU
    *Exec  will run on GPU
      *Expression  partial_avg(a#0) will run on GPU
        *Expression  avg(a#0) will run on GPU
      *Exec  will run on GPU

22/06/18 01:19:23 WARN GpuOverrides: 
*Exec  will run on GPU
  *Partitioning  will run on GPU
  *Exec  will run on GPU
    *Expression  partial_avg(a#0) will run on GPU
      *Expression  avg(a#0) will run on GPU
    *Exec  will run on GPU

22/06/18 01:19:25 WARN GpuOverrides: =>                             (1 + 1) / 2]
*Exec  will run on GPU
  *Expression  avg(a#0) will run on GPU
    *Expression  avg(a#0) will run on GPU
  *Expression  avg(a#0)#2 AS avg(a)#3 will run on GPU

22/06/18 01:19:25 WARN GpuOverrides: 
*Exec  will run on GPU
  *Expression  avg(a#0) will run on GPU
    *Expression  avg(a#0) will run on GPU
  *Expression  avg(a#0)#2 AS avg(a)#3 will run on GPU

[Row(avg(a)=1.0)]                                                               
>>> spark.read.format('parquet').load('/tmp/df.parquet').selectExpr('avg(a)').collect()
22/06/18 01:19:27 WARN GpuOverrides: 
*Exec  will run on GPU
  *Expression  avg(a#31) will run on GPU
    *Expression  avg(a#31) will run on GPU
  *Expression  avg(a#31)#33 AS avg(a)#34 will run on GPU
  *Exec  will run on GPU
    *Partitioning  will run on GPU
    *Exec  will run on GPU
      *Expression  partial_avg(a#31) will run on GPU
        *Expression  avg(a#31) will run on GPU
      *Exec  will run on GPU

22/06/18 01:19:27 WARN GpuOverrides: 
*Exec  will run on GPU
  *Expression  avg(a#31) will run on GPU
    *Expression  avg(a#31) will run on GPU
  *Expression  avg(a#31)#33 AS avg(a)#34 will run on GPU
  *Exec  will run on GPU
    *Partitioning  will run on GPU
    *Exec  will run on GPU
      *Expression  partial_avg(a#31) will run on GPU
        *Expression  avg(a#31) will run on GPU
      *Exec  will run on GPU

22/06/18 01:19:27 WARN GpuOverrides: 
*Exec  will run on GPU
  *Expression  avg(a#31) will run on GPU
    *Expression  avg(a#31) will run on GPU
  *Expression  avg(a#31)#33 AS avg(a)#34 will run on GPU
  *Exec  will run on GPU
    *Partitioning  will run on GPU
    *Exec  will run on GPU
      *Expression  partial_avg(a#31) will run on GPU
        *Expression  avg(a#31) will run on GPU
      *Exec  will run on GPU

22/06/18 01:19:27 WARN GpuOverrides: 
*Exec  will run on GPU
  *Partitioning  will run on GPU
  *Exec  will run on GPU
    *Expression  partial_avg(a#31) will run on GPU
      *Expression  avg(a#31) will run on GPU
    *Exec  will run on GPU

22/06/18 01:19:27 WARN GpuOverrides: 
*Exec  will run on GPU
  *Expression  avg(a#31) will run on GPU
    *Expression  avg(a#31) will run on GPU
  *Expression  avg(a#31)#33 AS avg(a)#34 will run on GPU

22/06/18 01:19:27 WARN GpuOverrides: 
*Exec  will run on GPU
  *Expression  avg(a#31) will run on GPU
    *Expression  avg(a#31) will run on GPU
  *Expression  avg(a#31)#33 AS avg(a)#34 will run on GPU

[Row(avg(a)=10.0)]

ucx_log_sanitized.zip

abellina commented 2 years ago

@gerashegalov the UCX debug logs are useful, but I would like to see the corresponding executor logs as well (especially in standalone mode). If you can turn the log level up to DEBUG in Spark, that would be really great.

gerashegalov commented 2 years ago

@abellina Repro logs, with DEBUG-level Spark logs included: ucx_log_sanitized.zip

abellina commented 2 years ago

OK, thanks for the logs! With @gerashegalov's help I was able to set up WSL2 and repro this on a Windows machine.

The issue is specific to cuda_ipc with wakeup, which uses events to signal when a send/recv is finished. What looks to be happening is that we get the callback from UCX even though the copy is not yet complete, which explains why it sometimes works. Wakeup is our default mode, rather than busy waiting, since we've found it to be better for performance and we know it works in other environments. Setting --conf spark.rapids.shuffle.ucx.useWakeup=false (turning off wakeup and relying on busy waiting) is a potential workaround for WSL2, but it's not great. This looks to be a bug in UCX or CUDA (callback functions). A sketch of wiring in the workaround is below.
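
A minimal sketch of how the busy-wait workaround could be applied when the session (and its executors) are created; since the shuffle transport is set up at executor startup, the flag likely needs to be part of the launch configuration rather than toggled at runtime (builder pattern shown for illustration only):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config('spark.shuffle.manager',
                 'com.nvidia.spark.rapids.spark321.RapidsShuffleManager')
         .config('spark.rapids.shuffle.enabled', 'true')
         # WSL2 workaround: poll for UCX completions (busy wait) instead of
         # relying on event-based wakeup, which fires before copies finish here
         .config('spark.rapids.shuffle.ucx.useWakeup', 'false')
         .getOrCreate())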

I added some logging to capture the batch being received and the corresponding sent batch. This is the batch at the receiving end:

DEBUG fetched cb Table{columns=[ColumnVector{rows=1, type=FLOAT64, nullCount=Optional.empty, offHeap=(ID: 27 7f77914ebbb0)}, ColumnVector{rows=1, type=INT64, nullCount=Optional.empty, offHeap=(ID: 28 7f77914fffd0)}], cudfTable=140151513387040, rows=1}
GPU COLUMN 0 - NC: 0 DATA: DeviceMemoryBufferView{address=0x800000200, length=8, id=-1} VAL: null
COLUMN 0 - FLOAT64
0 0.0
GPU COLUMN 1 - NC: 0 DATA: DeviceMemoryBufferView{address=0x800000240, length=8, id=-1} VAL: null
COLUMN 1 - INT64
0 0

But the sender sent this (before UCX send):

DEBUG to send Table{columns=[ColumnVector{rows=1, type=FLOAT64, nullCount=Optional.empty, offHeap=(ID: 25 7f160808ea00)}, ColumnVector{rows=1, type=INT64, nullCount=Optional.empty, offHeap=(ID: 26 7f160808eb70)}], cudfTable=139732600243792, rows=1}
GPU COLUMN 0 - NC: 0 DATA: DeviceMemoryBufferView{address=0x800000000, length=8, id=-1} VAL: null
COLUMN 0 - FLOAT64
0 10.0
GPU COLUMN 1 - NC: 0 DATA: DeviceMemoryBufferView{address=0x800000040, length=8, id=-1} VAL: null
COLUMN 1 - INT64
0 1

I believe the next step is to see if the above explanation makes sense to @Akshay-Venkatesh, then we may need to help repro at a lower level or help test a fix.

sameerz commented 2 years ago

Removing P1 and removing from 22.08 since the issue only occurs in WSL2 (which we do not support).

Akshay-Venkatesh commented 2 years ago

(Quoting @abellina's comment above in full.)

@abellina Sorry for the late response, but do you happen to know if this issue is specific to WSL2? I want to know if there is an identical Linux setup where the issue doesn't show up.

gerashegalov commented 2 years ago

@Akshay-Venkatesh we haven't been able to reproduce the issue on the bare-metal Ubuntu environments available to us, so it may be related either to the virtualization aspect of WSL2 or to the CUDA Runtime/Driver combination on WSL2. We statically link the CUDA 11.5 Runtime.