NVIDIA / spark-rapids

Spark RAPIDS plugin - accelerate Apache Spark with GPUs
https://nvidia.github.io/spark-rapids
Apache License 2.0

[BUG] significant slow down with VectorUDT and ParquetCachedBatchSerializer #8474

Open eordentlich opened 1 year ago

eordentlich commented 1 year ago

Describe the bug The evaluation step in this example notebook: https://github.com/NVIDIA/spark-rapids-examples/blob/main/examples/XGBoost-Examples/mortgage/notebooks/python/MortgageETL%2BXGBoost.ipynb?short_path=f801328#L1035 is about 600x slower when ParquetCachedBatchSerializer is enabled. The result DataFrame being processed here has several columns of type VectorUDT. If these columns are either dropped, or converted to array type using pyspark.ml.functions.vector_to_array before caching and then converted back upon read using array_to_vector, the slowdown can be avoided. This suggests that ParquetCachedBatchSerializer has an issue with processing VectorUDT columns on the read side.
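For reference, the workaround described above could be sketched roughly as follows. This is only an illustration, not code from the notebook: the helper names are made up for this example, it assumes Spark reports VectorUDT columns with the type string "vector" in DataFrame.dtypes, and it needs PySpark 3.1 or later for array_to_vector. Note that sparse vectors would come back as dense vectors after the round trip.

```python
def vector_column_names(dtypes):
    """Return names of VectorUDT columns from a DataFrame.dtypes-style list.

    Assumption: VectorUDT columns show up with the type string "vector".
    """
    return [name for name, type_name in dtypes if type_name == "vector"]


def cache_with_vectors_as_arrays(df):
    """Convert VectorUDT columns to array<double> and cache the result.

    Returns the cached DataFrame and the names of the converted columns.
    """
    # Imported lazily so the helpers can be defined without Spark present.
    from pyspark.ml.functions import vector_to_array

    vec_cols = vector_column_names(df.dtypes)
    converted = df.select(
        [vector_to_array(df[c]).alias(c) if c in vec_cols else df[c]
         for c in df.columns]
    )
    return converted.cache(), vec_cols


def restore_vectors(cached_df, vec_cols):
    """Convert the array columns back to vectors when reading the cache."""
    from pyspark.ml.functions import array_to_vector

    return cached_df.select(
        [array_to_vector(cached_df[c]).alias(c) if c in vec_cols else cached_df[c]
         for c in cached_df.columns]
    )
```

Usage would be along the lines of `cached, cols = cache_with_vectors_as_arrays(result)` followed by `restore_vectors(cached, cols)` wherever the vectors are needed again.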

Steps/Code to reproduce bug Run the above example notebook with ParquetCachedBatchSerializer enabled.

Expected behavior No 600x slow down.

Environment details (please complete the following information) Standalone.

pyspark                             \
--master ${SPARK_URL}            \
--jars ${PWD}/rapids-4-spark_2.12-23.04.0.jar \
--conf spark.plugins=com.nvidia.spark.SQLPlugin \
--conf spark.rapids.memory.gpu.allocFraction=0.5 --conf spark.rapids.memory.pinnedPool.size=2g \
--conf spark.executor.resource.gpu.amount=1 \
--conf spark.executor.cores=1 \
--conf spark.task.resource.gpu.amount=1 \
--conf spark.sql.execution.arrow.maxRecordsPerBatch=200000 \
--conf spark.executor.resource.gpu.discoveryScript=./getGpusResources.sh \
--files $SPARK_HOME/examples/src/main/scripts/getGpusResources.sh --conf spark.executor.memory=10g --conf spark.sql.cache.serializer=com.nvidia.spark.ParquetCachedBatchSerializer

Additional context Related issue, with likely same underlying problem: https://github.com/NVIDIA/spark-rapids/issues/5975

revans2 commented 1 year ago

I have not run this, so I am just guessing. The GPU does not support user-defined types right now, and VectorUDT is a user-defined type, so we are going to fall back to the CPU to serialize and deserialize them. This might not be ideal because the CPU is really bad at writing Parquet in many cases compared to the GPU. A 600x difference is hard to believe, though, so I need to do some testing.

revans2 commented 1 year ago

@eordentlich do you have any instructions on how to get an environment set up to do this? I tried to use conda to set up an environment following the instructions at https://xgboost.readthedocs.io/en/stable/tutorials/spark_estimator.html

conda create -y -n xgboost_env -c conda-forge conda-pack python=3.9
conda activate xgboost_env
# use conda when the supported version of xgboost (1.7) is released on conda-forge
pip install xgboost
conda install cudf pyarrow pandas -c rapids -c nvidia -c conda-forge

But it didn't work and I had to change the last command to

conda install cudf pyarrow pandas -c rapidsai -c nvidia -c conda-forge

Then when I tried to import xgboost in the notebook

from xgboost.spark import SparkXGBClassifier, SparkXGBClassifierModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

I got an error about no sklearn so I installed it

conda install scikit-learn

And now I am getting what appears to be a CUDA mismatch of some kind.

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "spark_3.4.0/python/lib/pyspark.zip/pyspark/worker.py", line 830, in main
    process()
  File "spark_3.4.0/python/lib/pyspark.zip/pyspark/worker.py", line 822, in process
    serializer.dump_stream(out_iter, outfile)
  File "spark_3.4.0/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 345, in dump_stream
    return ArrowStreamSerializer.dump_stream(self, init_stream_yield_batches(), stream)
  File "spark_3.4.0/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 86, in dump_stream
    for batch in iterator:
  File "spark_3.4.0/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 338, in init_stream_yield_batches
    for series in iterator:
  File "spark_3.4.0/python/lib/pyspark.zip/pyspark/worker.py", line 519, in func
    for result_batch, result_type in result_iter:
  File "xgboost_env/lib/python3.9/site-packages/xgboost/spark/core.py", line 795, in _train_booster
    use_qdm = use_hist and is_cudf_available()
  File "xgboost_env/lib/python3.9/site-packages/xgboost/compat.py", line 83, in is_cudf_available
    import cudf
  File "xgboost_env/lib/python3.9/site-packages/cudf/__init__.py", line 5, in <module>
    validate_setup()
  File "xgboost_env/lib/python3.9/site-packages/cudf/utils/gpu_utils.py", line 20, in validate_setup
    from rmm._cuda.gpu import (
  File "xgboost_env/lib/python3.9/site-packages/rmm/__init__.py", line 16, in <module>
    from rmm import mr
  File "xgboost_env/lib/python3.9/site-packages/rmm/mr.py", line 14, in <module>
    from rmm._lib.memory_resource import (
  File "xgboost_env/lib/python3.9/site-packages/rmm/_lib/__init__.py", line 15, in <module>
    from .device_buffer import DeviceBuffer
  File "device_buffer.pyx", line 1, in init rmm._lib.device_buffer
TypeError: C function cuda.ccudart.cudaStreamSynchronize has wrong signature (expected __pyx_t_4cuda_7ccudart_cudaError_t (__pyx_t_4cuda_7ccudart_cudaStream_t), got cudaError_t (cudaStream_t))

eordentlich commented 1 year ago

Indeed, it looks like those instructions could use some work. I think a conda cudatoolkit package needs to be added to your conda environment create command, e.g., cudatoolkit=11.5, with a version (>= 11.2, <= 11.8) that matches the one installed on your host. If you are running on a single node, you can activate the conda environment and run in either local mode or standalone, with master and worker started in the environment.

That said, I think you can replicate the key issue via the following running in a pyspark shell started as

pyspark --master local[4] --conf spark.driver.memory=20g --jars rapids-4-spark_2.12-23.04.0.jar --conf spark.plugins=com.nvidia.spark.SQLPlugin --conf spark.sql.cache.serializer=com.nvidia.spark.ParquetCachedBatchSerializer

And then in the shell, paste

from pyspark.sql.functions import rand, element_at, sum
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.functions import vector_to_array
import timeit

df = spark.range(10000000)
df = df.select(rand().alias("r0"),rand().alias("r1"))

df_vec = VectorAssembler(inputCols=["r0","r1"],outputCol="vec").transform(df).drop("r0","r1")
timeit.timeit(lambda: print(df_vec.select(sum(element_at(vector_to_array("vec"), 1))).collect()), number=1)
df_vec.cache()
timeit.timeit(lambda: print(df_vec.select(sum(element_at(vector_to_array("vec"), 1))).collect()), number=1)

The first timeit call finishes reasonably fast, while the second, after the cache(), takes "forever". You can try dialing down the range size to compare running times.
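To compare running times across range sizes, a small harness along these lines might help. It is a sketch only: time_cache_effect, make_vector_df and sum_first_element are names invented for this example, and the "cached" timing includes the cost of materializing the cache on the first action after cache(), just as in the snippet above.

```python
import timeit


def time_cache_effect(make_df, run_query, sizes):
    """For each size, time run_query before and after cache().

    Returns a list of (size, uncached_seconds, cached_seconds) tuples.
    """
    results = []
    for n in sizes:
        df = make_df(n)
        uncached = timeit.timeit(lambda: run_query(df), number=1)
        df.cache()
        # First action after cache() both fills and reads the cache.
        cached = timeit.timeit(lambda: run_query(df), number=1)
        df.unpersist()
        results.append((n, uncached, cached))
    return results


def make_vector_df(spark, n):
    """Build n rows with a single VectorUDT column named "vec"."""
    from pyspark.sql.functions import rand
    from pyspark.ml.feature import VectorAssembler

    df = spark.range(n).select(rand().alias("r0"), rand().alias("r1"))
    assembler = VectorAssembler(inputCols=["r0", "r1"], outputCol="vec")
    return assembler.transform(df).drop("r0", "r1")


def sum_first_element(df):
    """Same query as above: sum the first element of each vector."""
    from pyspark.sql.functions import element_at, sum as sum_
    from pyspark.ml.functions import vector_to_array

    return df.select(sum_(element_at(vector_to_array("vec"), 1))).collect()
```

In the pyspark shell this could be run as `time_cache_effect(lambda n: make_vector_df(spark, n), sum_first_element, [10000, 100000, 1000000])`.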

revans2 commented 1 year ago

Thanks for the simplified setup. I was able to reproduce the caching issue. At least, I was able to get Spark to crash with a timeout when using the Parquet cached batch serializer with your example (I made the data bigger because I was using more cores, but I guess I made it too big!).

revans2 commented 1 year ago

I found at least one really bad problem where we were doing code generation for each row in a specific code path. I need to do some more profiling to see what else might be bad about it.
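As a generic illustration of why that kind of pattern hurts (this is not the plugin's code; build_converter is a hypothetical stand-in for an expensive code-generation step), compare building the converter once per row with building it once per batch:

```python
def build_converter(schema, counter):
    """Stand-in for an expensive setup step such as code generation.

    counter["builds"] records how many times the setup ran.
    """
    counter["builds"] += 1
    casts = [float if t == "double" else str for t in schema]

    def convert(row):
        return tuple(cast(value) for cast, value in zip(casts, row))

    return convert


def convert_rows_per_row(rows, schema, counter):
    """Slow pattern: the converter is rebuilt for every row."""
    return [build_converter(schema, counter)(row) for row in rows]


def convert_rows_hoisted(rows, schema, counter):
    """Faster pattern: build the converter once and reuse it."""
    convert = build_converter(schema, counter)
    return [convert(row) for row in rows]
```

Both functions return the same rows, but the first runs the setup once per row while the second runs it once in total, which is the kind of difference that can dominate the running time when the setup is costly.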

revans2 commented 1 year ago

@eordentlich do you have the ability to try out #8495? It is not going to solve all of your problems but it would be good to know if it is good enough for now or if we have to start looking at some of the other optimizations too.

eordentlich commented 1 year ago

Thanks. I'll have to build the jar (unless it is already available from CI/CD somewhere) and give it a try on that notebook.

eordentlich commented 1 year ago

@revans2 I tested the PR and it is a huge improvement. Still 4x slower than mapping vector to array type and back, with PCBS, and about 6x slower than regular non-PCBS caching for the notebook example eval stage.

revans2 commented 1 year ago

@eordentlich glad to hear that it is helping. I'll see if we can get some help in improving the performance even more.

@sameerz it looks like we should spend some time on the other issues I filed, especially https://github.com/NVIDIA/spark-rapids/issues/8496. I think we can make it work without too much difficulty.