NVIDIA / spark-rapids

Spark RAPIDS plugin - accelerate Apache Spark with GPUs
https://nvidia.github.io/spark-rapids
Apache License 2.0

[BUG] Slow/no progress with cascaded pandas udfs/mapInPandas in Databricks #10770

Open eordentlich opened 3 months ago

eordentlich commented 3 months ago

Describe the bug

Successively applied Pandas UDFs and MapInPandas make no progress in Databricks.

Steps/Code to reproduce bug

import pyspark.sql.functions as F
import numpy as np
import pandas as pd
transformed_df = spark.range(1000000) 
from pyspark.sql.functions import pandas_udf

@pandas_udf("int")
def rand_label(col: pd.Series) -> pd.Series: 
  import logging
  logger = logging.getLogger('rand_label')
  logger.info("in rand label")
  print("in rand label 1")
  return pd.Series(np.random.randint(0,999,size=col.shape[0]))

@pandas_udf("int")
def rand_label2(col: pd.Series) -> pd.Series: 
  import logging
  logger = logging.getLogger('rand_label')
  logger.info("in rand label")
  print("in rand label 2")
  return pd.Series(np.random.randint(0,999,size=col.shape[0]))

transformed_df_w_label_2 = transformed_df.withColumn("label", rand_label(F.lit(0)))

The following then is problematic.

transformed_df = spark.read.parquet("s3a://spark-rapids-ml-bm-datasets-public/pca/1m_3k_singlecol_float32_50_files.parquet")
features_col = 'feature_array'
prediction_col = 'label'
centers = np.random.rand(1000,3000)
from pyspark.sql.types import StructType, StructField, DoubleType

sc = transformed_df.rdd.context
centers_bc = sc.broadcast(centers)

def partition_score_udf(
    pdf_iter
) :
    local_centers = centers_bc.value.astype(np.float64)
    partition_score = 0.0
    import logging
    logger = logging.getLogger('partition_score_udf')
    logger.info("in partition score udf")
    for pdf in pdf_iter:
        print("in partition score udf")
        input_vecs = np.array(list(pdf[features_col]), dtype=np.float64)
        predictions = list(pdf[prediction_col])
        center_vecs = local_centers[predictions, :]
        partition_score += np.sum((input_vecs - center_vecs) ** 2)
    yield pd.DataFrame({"partition_score": [partition_score]})

total_score = (
  # the below is extremely slow
  # if instead of transformed_df_w_label_2 we apply to transformed_df_w_label it runs fine
  # one difference is that transformed_df_w_label_2 is itself the output of another pandas udf
  # so data for this case is passing back and forth between jvm and python workers multiple times
    transformed_df_w_label_2.mapInPandas(
        partition_score_udf,  # type: ignore
        StructType([StructField("partition_score", DoubleType(), True)]),
    )
    .agg(F.sum("partition_score").alias("total_score"))
    .toPandas()
)  # type: ignore
total_score = total_score["total_score"][0]  # type: ignore

In this case, at least on Databricks Runtime 13.3 ML, the computation slows dramatically and may be deadlocked.
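
To rule out the UDF body itself, the scoring logic can be exercised without Spark. The following is a minimal sketch, not part of the original repro: it assumes the centers are passed as a plain argument instead of a broadcast variable, and the tiny 2-D batches are made-up illustration data.

```python
import numpy as np
import pandas as pd


def partition_score(pdf_iter, centers, features_col="feature_array", prediction_col="label"):
    """Same arithmetic as partition_score_udf, with centers passed in directly (assumption)."""
    local_centers = np.asarray(centers, dtype=np.float64)
    score = 0.0
    for pdf in pdf_iter:
        # Stack the per-row feature lists into a 2-D array.
        input_vecs = np.array(list(pdf[features_col]), dtype=np.float64)
        predictions = list(pdf[prediction_col])
        # Pick the assigned center for every row and accumulate squared distances.
        center_vecs = local_centers[predictions, :]
        score += np.sum((input_vecs - center_vecs) ** 2)
    yield pd.DataFrame({"partition_score": [score]})


# Hypothetical toy data: two centers, three rows spread over two batches.
centers = np.array([[0.0, 0.0], [1.0, 1.0]])
batches = [
    pd.DataFrame({"feature_array": [[0.0, 1.0], [1.0, 1.0]], "label": [0, 1]}),
    pd.DataFrame({"feature_array": [[2.0, 1.0]], "label": [1]}),
]
result = next(partition_score(iter(batches), centers))
print(result)
```

If this runs quickly on sample batches, the slowdown is more likely in the JVM/Python data exchange than in the UDF code.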

Expected behavior

No slowdowns, like with baseline Spark without the plugin.

Environment details (please complete the following information)

Cluster shape: 2x workers with g5.2xlarge and driver with g4dn.xlarge

Additional context

Also, based on print statement output in the logs, the first udf appears to complete fully before the second one starts. The batches should flow through both python udfs incrementally as is the case with baseline Spark.

Might be related to: https://github.com/NVIDIA/spark-rapids/issues/10751
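
The difference between the observed and expected batch flow can be illustrated with a toy model in plain Python. This is only a sketch of the two execution orders using generators; the names `stage`, `run_pipelined`, and `run_materialized` are hypothetical, and nothing here reflects how Spark or the plugin is actually implemented.

```python
from typing import Iterable, Iterator, List


def stage(name: str, batches: Iterable[int], log: List[str]) -> Iterator[int]:
    """Stand-in for one Python UDF: record each batch it handles, then pass it on."""
    for batch in batches:
        log.append(f"{name}:{batch}")
        yield batch


def run_pipelined(num_batches: int) -> List[str]:
    """Expected behavior: each batch flows through both stages before the next one starts."""
    log: List[str] = []
    first = stage("udf1", range(num_batches), log)
    second = stage("udf2", first, log)
    for _ in second:
        pass
    return log


def run_materialized(num_batches: int) -> List[str]:
    """Observed behavior: the first stage finishes all batches before the second starts."""
    log: List[str] = []
    first = list(stage("udf1", range(num_batches), log))  # forces full completion
    second = stage("udf2", first, log)
    for _ in second:
        pass
    return log


print(run_pipelined(3))     # interleaved udf1/udf2 entries
print(run_materialized(3))  # all udf1 entries, then all udf2 entries
```

In the materialized order, every intermediate batch has to be held somewhere until the second stage starts, which would be consistent with the buffer growth visible in the stack trace later in this thread.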

firestarman commented 3 months ago

Could you first try increasing the value of spark.rapids.sql.concurrentGpuTasks to see if we get any better performance?

eordentlich commented 3 months ago

The computation gets pretty much stuck with essentially no progress. I don't think that will make a difference. Partial stack trace after reaching this point (might be from a similar but not identical example to this repro):


```
    at sun.misc.Unsafe.copyMemory(Native Method)
    at sun.misc.Unsafe.copyMemory(Unsafe.java:560)
    at java.nio.DirectByteBuffer.put(DirectByteBuffer.java:331)
    at org.apache.spark.util.DirectByteBufferOutputStream.grow(DirectByteBufferOutputStream.scala:63)
    at org.apache.spark.util.DirectByteBufferOutputStream.ensureCapacity(DirectByteBufferOutputStream.scala:49)
    at org.apache.spark.util.DirectByteBufferOutputStream.write(DirectByteBufferOutputStream.scala:44)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    - locked <0x0000000768a99630> (a java.io.DataOutputStream)
    at org.apache.spark.sql.rapids.execution.python.BufferToStreamWriter.$anonfun$handleBuffer$1(GpuArrowWriter.scala:48)
    at org.apache.spark.sql.rapids.execution.python.BufferToStreamWriter.$anonfun$handleBuffer$1$adapted(GpuArrowWriter.scala:42)
    at org.apache.spark.sql.rapids.execution.python.BufferToStreamWriter$$Lambda$3498/1244767780.apply(Unknown Source)
    at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
    at org.apache.spark.sql.rapids.execution.python.BufferToStreamWriter.handleBuffer(GpuArrowWriter.scala:42)
    at ai.rapids.cudf.Table.writeArrowIPCArrowChunk(Native Method)
    at ai.rapids.cudf.Table.access$2000(Table.java:41)
    at ai.rapids.cudf.Table$ArrowIPCTableWriter.write(Table.java:1739)
    at org.apache.spark.sql.rapids.execution.python.GpuArrowWriter.$anonfun$write$1(GpuArrowWriter.scala:99)
    at org.apache.spark.sql.rapids.execution.python.GpuArrowWriter.$anonfun$write$1$adapted(GpuArrowWriter.scala:97)
    at org.apache.spark.sql.rapids.execution.python.GpuArrowWriter$$Lambda$3493/108528776.apply(Unknown Source)
    at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
    at org.apache.spark.sql.rapids.execution.python.GpuArrowWriter.write(GpuArrowWriter.scala:97)
    at org.apache.spark.sql.rapids.execution.python.GpuArrowWriter.write$(GpuArrowWriter.scala:96)
    at org.apache.spark.sql.rapids.execution.python.GpuArrowPythonWriter.write(GpuArrowWriter.scala:144)
    at org.apache.spark.sql.rapids.execution.python.GpuArrowWriter.$anonfun$writeAndClose$1(GpuArrowWriter.scala:93)
    at org.apache.spark.sql.rapids.execution.python.GpuArrowWriter.$anonfun$writeAndClose$1$adapted(GpuArrowWriter.scala:92)
    at org.apache.spark.sql.rapids.execution.python.GpuArrowWriter$$Lambda$3492/1674125626.apply(Unknown Source)
    at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
    at org.apache.spark.sql.rapids.execution.python.GpuArrowWriter.writeAndClose(GpuArrowWriter.scala:92)
    at org.apache.spark.sql.rapids.execution.python.GpuArrowWriter.writeAndClose$(GpuArrowWriter.scala:92)
    at org.apache.spark.sql.rapids.execution.python.GpuArrowPythonWriter.writeAndClose(GpuArrowWriter.scala:144)
    at org.apache.spark.sql.rapids.execution.python.shims.GpuArrowPythonRunner$$anon$1.writeNextInputToStream(GpuArrowPythonRunner.scala:74)
    at org.apache.spark.api.python.BasePythonRunner$ReaderInputStream.writeAdditionalInputToPythonWorker(PythonRunner.scala:931)
    at org.apache.spark.api.python.BasePythonRunner$ReaderInputStream.read(PythonRunner.scala:851)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
    - locked <0x0000000765602c48> (a java.io.BufferedInputStream)
    at java.io.DataInputStream.readInt(DataInputStream.java:387)
    at org.apache.spark.sql.rapids.execution.python.shims.GpuArrowPythonOutput$$anon$1.read(GpuArrowPythonOutput.scala:71)
    at org.apache.spark.sql.rapids.execution.python.shims.GpuArrowPythonOutput$$anon$1.read(GpuArrowPythonOutput.scala:48)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:635)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at com.nvidia.spark.rapids.AbstractProjectSplitIterator.hasNext(basicPhysicalOperators.scala:233)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at com.nvidia.spark.rapids.GpuMergeAggregateIterator.$anonfun$next$2(GpuAggregateExec.scala:751)
    at com.nvidia.spark.rapids.GpuMergeAggregateIterator$$Lambda$3706/165795055.apply(Unknown Source)
```

firestarman commented 2 months ago

Hi @eordentlich, where can I get the file s3a://spark-rapids-ml-bm-datasets-public/pca/1m_3k_singlecol_float32_50_files.parquet ?

firestarman commented 2 months ago

Another thing to try, if no GPU is required in the UDFs: set spark.rapids.sql.python.gpu.enabled to false and remove the config line spark.python.daemon.module rapids.daemon_databricks.
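
The config change suggested above could be applied to a cluster's Spark config text with a small helper like the one below. This is a rough sketch: it assumes the config is kept as whitespace-separated `key value` lines (as in the Databricks cluster Spark config box), the function name `without_gpu_python` is hypothetical, and the sample config values are illustrative only.

```python
def without_gpu_python(spark_conf_text: str) -> str:
    """Drop the RAPIDS Python daemon line and force GPU-in-Python off."""
    dropped_keys = {
        "spark.python.daemon.module",           # removed entirely, per the suggestion
        "spark.rapids.sql.python.gpu.enabled",  # re-added below with value false
    }
    kept = []
    for line in spark_conf_text.splitlines():
        parts = line.split(None, 1)
        key = parts[0] if parts else ""
        if key in dropped_keys:
            continue
        kept.append(line)
    kept.append("spark.rapids.sql.python.gpu.enabled false")
    return "\n".join(kept)


# Illustrative input; a real cluster config will contain other settings.
sample = "\n".join([
    "spark.plugins com.nvidia.spark.SQLPlugin",
    "spark.rapids.sql.python.gpu.enabled true",
    "spark.python.daemon.module rapids.daemon_databricks",
])
print(without_gpu_python(sample))
```

These settings are read when the cluster starts, so the cluster would presumably need a restart after the edit.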

eordentlich commented 2 months ago

> Hi @eordentlich, where can i get the file s3a://spark-rapids-ml-bm-datasets-public/pca/1m_3k_singlecol_float32_50_files.parquet ?

It's a public s3 bucket/file. Can you access via spark parquet reader or s3 cli?

firestarman commented 2 months ago

> It's a public s3 bucket/file. Can you access via spark parquet reader or s3 cli?

I tried to reproduce this locally, but I always get the error below. It seems I am missing something.

Caused by: org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException: No AWS Credentials provided by TemporaryAWSCredentialsProvider SimpleAWSCredentialsProvider EnvironmentVariableCredentialsProvider IAMInstanceCredentialsProvider : com.amazonaws.SdkClientException: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY))
    at org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:216)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1269)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:845)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:794)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:781)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:755)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:715)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:697)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:561)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:541)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5456)
    at com.amazonaws.services.s3.AmazonS3Client.getBucketRegionViaHeadRequest(AmazonS3Client.java:6431)
    at com.amazonaws.services.s3.AmazonS3Client.fetchRegionFromCache(AmazonS3Client.java:6404)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5441)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5403)
    at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1372)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getObjectMetadata$10(S3AFileSystem.java:2545)
    at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:414)
    at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:377)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2533)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2513)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3776)
    ... 22 more
Caused by: com.amazonaws.SdkClientException: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY))
    at com.amazonaws.auth.EnvironmentVariableCredentialsProvider.getCredentials(EnvironmentVariableCredentialsProvider.java:50)
    at org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:177)
    ... 43 more

crajive commented 2 months ago

> Caused by: org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException: No AWS Credentials provided by TemporaryAWSCredentialsProvider SimpleAWSCredentialsProvider EnvironmentVariableCredentialsProvider IAMInstanceCredentialsProvider : com.amazonaws.SdkClientException: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY))

Use AnonymousAWSCredentialsProvider.

Set fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider

eg:

hdfs dfs \
    -D fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider -ls \
    s3a://spark-rapids-ml-bm-datasets-public/pca/1m_3k_singlecol_float32_50_files.parquet/

or via aws cli

aws s3 --no-sign-request ls \
    s3://spark-rapids-ml-bm-datasets-public/pca/1m_3k_singlecol_float32_50_files.parquet/ 
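
For a local PySpark repro, the same anonymous provider can be passed through Spark's `spark.hadoop.` prefix. The snippet below is a sketch under assumptions: the helper names and the app name are hypothetical, and it presumes the hadoop-aws S3A connector is already on the classpath. PySpark is imported lazily so the config helper can be inspected without it.

```python
ANON_PROVIDER = "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider"
DATASET = (
    "s3a://spark-rapids-ml-bm-datasets-public/pca/"
    "1m_3k_singlecol_float32_50_files.parquet"
)


def anonymous_s3a_conf() -> dict:
    """Spark config entries that make S3A skip credential lookup for public buckets."""
    return {"spark.hadoop.fs.s3a.aws.credentials.provider": ANON_PROVIDER}


def read_public_dataset(path: str = DATASET):
    """Read the public parquet dataset anonymously (needs pyspark and hadoop-aws)."""
    from pyspark.sql import SparkSession  # imported here so the helper above works without pyspark

    builder = SparkSession.builder.appName("issue-10770-repro")  # hypothetical app name
    for key, value in anonymous_s3a_conf().items():
        builder = builder.config(key, value)
    spark = builder.getOrCreate()
    return spark.read.parquet(path)


print(anonymous_s3a_conf())
```

Calling `read_public_dataset()` would then stand in for the `spark.read.parquet(...)` line in the repro.
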

firestarman commented 2 months ago

I cannot reproduce this locally. And another try is to