NVIDIA / spark-rapids

Spark RAPIDS plugin - accelerate Apache Spark with GPUs
https://nvidia.github.io/spark-rapids
Apache License 2.0

[BUG] ClassNotFound error when running a job #3482

Closed rongou closed 3 years ago

rongou commented 3 years ago

Describe the bug

Running a TPC-DS query fails with:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: ClassNotFound with classloader: scala.tools.nsc.interpreter.IMain$TranslatingClassLoader@46fb0c33

Steps/Code to reproduce bug

Run a TPC-DS query in a Spark shell:

/opt/spark/bin/spark-shell\
 --master spark://127.0.0.1:7077\
 --conf spark.locality.wait=0s\
 --conf spark.sql.files.maxPartitionBytes=1g\
 --conf spark.sql.shuffle.partitions=200\
 --conf spark.sql.adaptive.enabled=true\
 --conf spark.shuffle.manager=com.nvidia.spark.rapids.spark312.RapidsShuffleManager\
 --conf spark.shuffle.service.enabled=false\
 --conf spark.dynamicAllocation.enabled=false\
 --conf spark.sql.broadcastTimeout=600\
 --conf spark.rapids.sql.explain=ALL\
 --conf spark.plugins=com.nvidia.spark.SQLPlugin\
 --conf spark.rapids.cudfVersionOverride=true\
 --conf spark.rapids.sql.concurrentGpuTasks=1\
 --conf spark.rapids.memory.host.spillStorageSize=32G\
 --conf spark.rapids.memory.pinnedPool.size=8G\
 --conf spark.rapids.sql.batchSizeBytes=1g\
 --conf spark.rapids.memory.gpu.direct.storage.spill.enabled=false\
 --conf spark.rapids.memory.gpu.direct.storage.spill.useHostMemory=false\
 --conf spark.rapids.memory.gpu.direct.storage.spill.alignedIO=false\
 --conf spark.rapids.memory.gpu.direct.storage.spill.alignmentThreshold=8m\
 --conf spark.rapids.memory.gpu.unspill.enabled=false\
 --conf spark.rapids.shuffle.transport.enabled=true\
 --conf spark.executorEnv.UCX_ERROR_SIGNALS=\
 --conf spark.executorEnv.UCX_MEMTYPE_CACHE=n\
 --conf spark.executorEnv.UCX_IB_RX_QUEUE_LEN=1024\
 --conf spark.executorEnv.UCX_TLS=cuda_copy,cuda_ipc,rc,tcp\
 --conf spark.executorEnv.UCX_RNDV_SCHEME=put_zcopy\
 --conf spark.executorEnv.UCX_MAX_RNDV_RAILS=1\
 --conf spark.rapids.shuffle.maxMetadataSize=512K\
 --conf spark.rapids.shuffle.ucx.bounceBuffers.size=8M\
 --conf spark.driver.memory=10G\
 --conf spark.driver.maxResultSize=0\
 --conf spark.driver.extraJavaOptions=-Dai.rapids.cudf.nvtx.enabled=false\
 --conf spark.executor.extraClassPath=/opt/rapids/cudf.jar:/opt/rapids/rapids-4-spark.jar\
 --conf spark.executor.extraJavaOptions=-Dai.rapids.cudf.nvtx.enabled=false\
 --conf spark.executor.instances=1\
 --conf spark.executor.cores=24\
 --conf spark.executor.memory=64G\
 --conf spark.executor.resource.gpu.amount=1\
 --conf spark.task.cpus=1\
 --conf spark.task.resource.gpu.amount=0.0416\
 --jars /opt/rapids/cudf.jar,/opt/rapids/rapids-4-spark.jar,/opt/rapids/rapids-4-spark-benchmarks.jar
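For readers wondering where spark.task.resource.gpu.amount=0.0416 comes from, here is a rough sketch of the arithmetic relating it to spark.executor.cores=24. The variable names are made up for illustration. The rounding rule (whole tasks per GPU is 1/amount rounded down) is my understanding of how Spark schedules fractional resource amounts, not something taken from this report.

```shell
#!/bin/sh
# Values copied from the spark-shell command above.
executor_cores=24       # spark.executor.cores
task_cpus=1             # spark.task.cpus
executor_gpus=1         # spark.executor.resource.gpu.amount
task_gpu_amount=0.0416  # spark.task.resource.gpu.amount

# Tasks that fit by CPU: executor cores divided by CPUs per task.
cpu_slots=$((executor_cores / task_cpus))

# Tasks that fit by GPU (assumption: 1/amount is rounded down to whole
# tasks per GPU), multiplied by the number of GPUs per executor.
gpu_slots=$(awk -v g="$executor_gpus" -v a="$task_gpu_amount" \
  'BEGIN { print g * int(1 / a) }')

# The smaller of the two limits bounds concurrent tasks per executor.
if [ "$cpu_slots" -lt "$gpu_slots" ]; then
  max_tasks=$cpu_slots
else
  max_tasks=$gpu_slots
fi

echo "cpu_slots=$cpu_slots gpu_slots=$gpu_slots max_tasks=$max_tasks"
```

Under that assumption, 0.0416 is just below 1/24, so the GPU share does not stop all 24 cores from running a task. Rounding up to 0.0417 would leave room for only 23.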

The script:

import com.nvidia.spark.rapids.tests._
import com.nvidia.spark.rapids.tests.tpcds._

TpcdsLikeSpark.setupAllParquet(spark, "/opt/data/tpcds/sf1000-parquet/useDecimal=false,useDate=true,filterNull=false", false)

val benchmark = new BenchmarkRunner(new TpcdsLikeBench(false))
benchmark.collect(spark, "q1", 1)

Expected behavior

The query should run without failing.

Environment details (please complete the following information)

Additional context

Full log:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/09/14 11:17:35 WARN RapidsPluginUtils: RAPIDS Accelerator 21.10.0-SNAPSHOT using cudf 21.10.0-SNAPSHOT. To disable GPU support set `spark.rapids.sql.enabled` to false
21/09/14 11:17:35 WARN Plugin: Installing rapids UDF compiler extensions to Spark. The compiler is disabled by default. To enable it, set `spark.rapids.sql.udfCompiler.enabled` to true
Spark context Web UI available at http://localhost:4040
Spark context available as 'sc' (master = spark://127.0.0.1:7077, app id = app-20210914111735-0006).
Spark session available as 'spark'.
21/09/14 11:17:46 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[BENCHMARK RUNNER] [q1] Start iteration 0:                                      
21/09/14 11:17:59 WARN InsertAdaptiveSparkPlan: spark.sql.adaptive.enabled is enabled but is not supported for query: TakeOrderedAndProject(limit=100, orderBy=[c_customer_id#413 ASC NULLS FIRST], output=[c_customer_id#413])
+- Project [c_customer_id#413]
   +- SortMergeJoin [ctr_customer_sk#851], [c_customer_sk#412], Inner
      :- Project [ctr_customer_sk#851]
      :  +- BroadcastHashJoin [ctr_store_sk#852], [s_store_sk#664], Inner, BuildRight, false
      :     :- Project [ctr_customer_sk#851, ctr_store_sk#852]
      :     :  +- SortMergeJoin [ctr_store_sk#852], [ctr_store_sk#852#859], Inner, (ctr_total_return#853 > (avg(ctr_total_return) * CAST(1.2 AS DOUBLE))#857)
      :     :     :- Filter isnotnull(ctr_total_return#853)
      :     :     :  +- HashAggregate(keys=[sr_customer_sk#178, sr_store_sk#182], functions=[sum(sr_return_amt#186)], output=[ctr_customer_sk#851, ctr_store_sk#852, ctr_total_return#853])
      :     :     :     +- HashAggregate(keys=[sr_customer_sk#178, sr_store_sk#182], functions=[partial_sum(sr_return_amt#186)], output=[sr_customer_sk#178, sr_store_sk#182, sum#863])
      :     :     :        +- Project [sr_customer_sk#178, sr_store_sk#182, sr_return_amt#186]
      :     :     :           +- BroadcastHashJoin [sr_returned_date_sk#195], [d_date_sk#492], Inner, BuildRight, false
      :     :     :              :- Project [sr_customer_sk#178, sr_store_sk#182, sr_return_amt#186, sr_returned_date_sk#195]
      :     :     :              :  +- Filter (isnotnull(sr_store_sk#182) AND isnotnull(sr_customer_sk#178))
      :     :     :              :     +- FileScan parquet [sr_customer_sk#178,sr_store_sk#182,sr_return_amt#186,sr_returned_date_sk#195] Batched: true, DataFilters: [isnotnull(sr_store_sk#182), isnotnull(sr_customer_sk#178)], Format: Parquet, Location: InMemoryFileIndex[file:/opt/data/tpcds/sf1000-parquet/useDecimal=false,useDate=true,filterNull=fa..., PartitionFilters: [isnotnull(sr_returned_date_sk#195), dynamicpruning#860 [sr_returned_date_sk#195]], PushedFilters: [IsNotNull(sr_store_sk), IsNotNull(sr_customer_sk)], ReadSchema: struct<sr_customer_sk:int,sr_store_sk:int,sr_return_amt:double>
      :     :     :              :           +- Project [d_date_sk#492]
      :     :     :              :              +- Filter ((isnotnull(d_year#498) AND (d_year#498 = 2000)) AND isnotnull(d_date_sk#492))
      :     :     :              :                 +- Relation[d_date_sk#492,d_date_id#493,d_date#494,d_month_seq#495,d_week_seq#496,d_quarter_seq#497,d_year#498,d_dow#499,d_moy#500,d_dom#501,d_qoy#502,d_fy_year#503,d_fy_quarter_seq#504,d_fy_week_seq#505,d_day_name#506,d_quarter_name#507,d_holiday#508,d_weekend#509,d_following_holiday#510,d_first_dom#511,d_last_dom#512,d_same_day_ly#513,d_same_day_lq#514,d_current_day#515,... 4 more fields] parquet
      :     :     :              +- Project [d_date_sk#492]
      :     :     :                 +- Filter ((isnotnull(d_year#498) AND (d_year#498 = 2000)) AND isnotnull(d_date_sk#492))
      :     :     :                    +- FileScan parquet [d_date_sk#492,d_year#498] Batched: true, DataFilters: [isnotnull(d_year#498), (d_year#498 = 2000), isnotnull(d_date_sk#492)], Format: Parquet, Location: InMemoryFileIndex[file:/opt/data/tpcds/sf1000-parquet/useDecimal=false,useDate=true,filterNull=fa..., PartitionFilters: [], PushedFilters: [IsNotNull(d_year), EqualTo(d_year,2000), IsNotNull(d_date_sk)], ReadSchema: struct<d_date_sk:int,d_year:int>
      :     :     +- Filter isnotnull((avg(ctr_total_return) * CAST(1.2 AS DOUBLE))#857)
      :     :        +- HashAggregate(keys=[ctr_store_sk#852], functions=[avg(ctr_total_return#853)], output=[(avg(ctr_total_return) * CAST(1.2 AS DOUBLE))#857, ctr_store_sk#852#859])
      :     :           +- HashAggregate(keys=[ctr_store_sk#852], functions=[partial_avg(ctr_total_return#853)], output=[ctr_store_sk#852, sum#866, count#867L])
      :     :              +- HashAggregate(keys=[sr_customer_sk#178, sr_store_sk#182], functions=[sum(sr_return_amt#186)], output=[ctr_store_sk#852, ctr_total_return#853])
      :     :                 +- HashAggregate(keys=[sr_customer_sk#178, sr_store_sk#182], functions=[partial_sum(sr_return_amt#186)], output=[sr_customer_sk#178, sr_store_sk#182, sum#869])
      :     :                    +- Project [sr_customer_sk#178, sr_store_sk#182, sr_return_amt#186]
      :     :                       +- BroadcastHashJoin [sr_returned_date_sk#195], [d_date_sk#492], Inner, BuildRight, false
      :     :                          :- Project [sr_customer_sk#178, sr_store_sk#182, sr_return_amt#186, sr_returned_date_sk#195]
      :     :                          :  +- Filter isnotnull(sr_store_sk#182)
      :     :                          :     +- FileScan parquet [sr_customer_sk#178,sr_store_sk#182,sr_return_amt#186,sr_returned_date_sk#195] Batched: true, DataFilters: [isnotnull(sr_store_sk#182)], Format: Parquet, Location: InMemoryFileIndex[file:/opt/data/tpcds/sf1000-parquet/useDecimal=false,useDate=true,filterNull=fa..., PartitionFilters: [isnotnull(sr_returned_date_sk#195), dynamicpruning#861 [sr_returned_date_sk#195]], PushedFilters: [IsNotNull(sr_store_sk)], ReadSchema: struct<sr_customer_sk:int,sr_store_sk:int,sr_return_amt:double>
      :     :                          :           +- Project [d_date_sk#492]
      :     :                          :              +- Filter ((isnotnull(d_year#498) AND (d_year#498 = 2000)) AND isnotnull(d_date_sk#492))
      :     :                          :                 +- Relation[d_date_sk#492,d_date_id#493,d_date#494,d_month_seq#495,d_week_seq#496,d_quarter_seq#497,d_year#498,d_dow#499,d_moy#500,d_dom#501,d_qoy#502,d_fy_year#503,d_fy_quarter_seq#504,d_fy_week_seq#505,d_day_name#506,d_quarter_name#507,d_holiday#508,d_weekend#509,d_following_holiday#510,d_first_dom#511,d_last_dom#512,d_same_day_ly#513,d_same_day_lq#514,d_current_day#515,... 4 more fields] parquet
      :     :                          +- Project [d_date_sk#492]
      :     :                             +- Filter ((isnotnull(d_year#498) AND (d_year#498 = 2000)) AND isnotnull(d_date_sk#492))
      :     :                                +- FileScan parquet [d_date_sk#492,d_year#498] Batched: true, DataFilters: [isnotnull(d_year#498), (d_year#498 = 2000), isnotnull(d_date_sk#492)], Format: Parquet, Location: InMemoryFileIndex[file:/opt/data/tpcds/sf1000-parquet/useDecimal=false,useDate=true,filterNull=fa..., PartitionFilters: [], PushedFilters: [IsNotNull(d_year), EqualTo(d_year,2000), IsNotNull(d_date_sk)], ReadSchema: struct<d_date_sk:int,d_year:int>
      :     +- Project [s_store_sk#664]
      :        +- Filter ((isnotnull(s_state#688) AND (s_state#688 = TN)) AND isnotnull(s_store_sk#664))
      :           +- FileScan parquet [s_store_sk#664,s_state#688] Batched: true, DataFilters: [isnotnull(s_state#688), (s_state#688 = TN), isnotnull(s_store_sk#664)], Format: Parquet, Location: InMemoryFileIndex[file:/opt/data/tpcds/sf1000-parquet/useDecimal=false,useDate=true,filterNull=fa..., PartitionFilters: [], PushedFilters: [IsNotNull(s_state), EqualTo(s_state,TN), IsNotNull(s_store_sk)], ReadSchema: struct<s_store_sk:int,s_state:string>
      +- Project [c_customer_sk#412, c_customer_id#413]
         +- Filter isnotnull(c_customer_sk#412)
            +- FileScan parquet [c_customer_sk#412,c_customer_id#413] Batched: true, DataFilters: [isnotnull(c_customer_sk#412)], Format: Parquet, Location: InMemoryFileIndex[file:/opt/data/tpcds/sf1000-parquet/useDecimal=false,useDate=true,filterNull=fa..., PartitionFilters: [], PushedFilters: [IsNotNull(c_customer_sk)], ReadSchema: struct<c_customer_sk:int,c_customer_id:string>
.
21/09/14 11:17:59 WARN GpuOverrides: 
*Exec <ProjectExec> will run on GPU
  *Exec <FilterExec> will run on GPU
    *Expression <And> ((isnotnull(d_year#498) AND (d_year#498 = 2000)) AND isnotnull(d_date_sk#492)) will run on GPU
      *Expression <And> (isnotnull(d_year#498) AND (d_year#498 = 2000)) will run on GPU
        *Expression <IsNotNull> isnotnull(d_year#498) will run on GPU
        *Expression <EqualTo> (d_year#498 = 2000) will run on GPU
      *Expression <IsNotNull> isnotnull(d_date_sk#492) will run on GPU
    *Exec <FileSourceScanExec> will run on GPU

21/09/14 11:17:59 WARN GpuOverrides: 
*Exec <ProjectExec> will run on GPU
  *Exec <FilterExec> will run on GPU
    *Expression <And> ((isnotnull(d_year#498) AND (d_year#498 = 2000)) AND isnotnull(d_date_sk#492)) will run on GPU
      *Expression <And> (isnotnull(d_year#498) AND (d_year#498 = 2000)) will run on GPU
        *Expression <IsNotNull> isnotnull(d_year#498) will run on GPU
        *Expression <EqualTo> (d_year#498 = 2000) will run on GPU
      *Expression <IsNotNull> isnotnull(d_date_sk#492) will run on GPU
    *Exec <FileSourceScanExec> will run on GPU

21/09/14 11:17:59 WARN GpuOverrides: 
*Exec <TakeOrderedAndProjectExec> will run on GPU
  *Expression <SortOrder> c_customer_id#413 ASC NULLS FIRST will run on GPU
  *Exec <ProjectExec> will run on GPU
    *Exec <SortMergeJoinExec> will run on GPU
      #Exec <SortExec> could run on GPU but is going to be removed because replacing sortMergeJoin with shuffleHashJoin
        #Expression <SortOrder> ctr_customer_sk#851 ASC NULLS FIRST could run on GPU but is going to be removed because parent plan is removed
        *Exec <ShuffleExchangeExec> will run on GPU
          *Partitioning <HashPartitioning> will run on GPU
          *Exec <ProjectExec> will run on GPU
            *Exec <BroadcastHashJoinExec> will run on GPU
              *Exec <ProjectExec> will run on GPU
                *Exec <SortMergeJoinExec> will run on GPU
                  *Expression <GreaterThan> (ctr_total_return#853 > (avg(ctr_total_return) * CAST(1.2 AS DOUBLE))#857) will run on GPU
                  #Exec <SortExec> could run on GPU but is going to be removed because replacing sortMergeJoin with shuffleHashJoin
                    #Expression <SortOrder> ctr_store_sk#852 ASC NULLS FIRST could run on GPU but is going to be removed because parent plan is removed
                    *Exec <ShuffleExchangeExec> will run on GPU
                      *Partitioning <HashPartitioning> will run on GPU
                      *Exec <FilterExec> will run on GPU
                        *Expression <IsNotNull> isnotnull(ctr_total_return#853) will run on GPU
                        !Exec <HashAggregateExec> cannot run on GPU because not all expressions can be replaced
                          @Expression <AttributeReference> sr_customer_sk#178 could run on GPU
                          @Expression <AttributeReference> sr_store_sk#182 could run on GPU
                          @Expression <AttributeReference> sr_customer_sk#178 could run on GPU
                          @Expression <AttributeReference> sr_store_sk#182 could run on GPU
                          @Expression <AggregateExpression> sum(sr_return_amt#186) could run on GPU
                            !Expression <Sum> sum(sr_return_amt#186) cannot run on GPU because the GPU will aggregate floating point values in parallel and the result is not always identical each time. This can cause some Spark queries to produce an incorrect answer if the value is computed more than once as part of the same query.  To enable this anyways set spark.rapids.sql.variableFloatAgg.enabled to true.
                              @Expression <AttributeReference> sr_return_amt#186 could run on GPU
                          @Expression <AttributeReference> sum(sr_return_amt#186)#854 could run on GPU
                          @Expression <Alias> sr_customer_sk#178 AS ctr_customer_sk#851 could run on GPU
                            @Expression <AttributeReference> sr_customer_sk#178 could run on GPU
                          @Expression <Alias> sr_store_sk#182 AS ctr_store_sk#852 could run on GPU
                            @Expression <AttributeReference> sr_store_sk#182 could run on GPU
                          @Expression <Alias> sum(sr_return_amt#186)#854 AS ctr_total_return#853 could run on GPU
                            @Expression <AttributeReference> sum(sr_return_amt#186)#854 could run on GPU
                          !Exec <ShuffleExchangeExec> cannot run on GPU because Columnar exchange without columnar children is inefficient
                            @Partitioning <HashPartitioning> could run on GPU
                              @Expression <AttributeReference> sr_customer_sk#178 could run on GPU
                              @Expression <AttributeReference> sr_store_sk#182 could run on GPU
                            !Exec <HashAggregateExec> cannot run on GPU because not all expressions can be replaced
                              @Expression <AttributeReference> sr_customer_sk#178 could run on GPU
                              @Expression <AttributeReference> sr_store_sk#182 could run on GPU
                              @Expression <AggregateExpression> partial_sum(sr_return_amt#186) could run on GPU
                                !Expression <Sum> sum(sr_return_amt#186) cannot run on GPU because the GPU will aggregate floating point values in parallel and the result is not always identical each time. This can cause some Spark queries to produce an incorrect answer if the value is computed more than once as part of the same query.  To enable this anyways set spark.rapids.sql.variableFloatAgg.enabled to true.
                                  @Expression <AttributeReference> sr_return_amt#186 could run on GPU
                              @Expression <AttributeReference> sum#862 could run on GPU
                              @Expression <AttributeReference> sr_customer_sk#178 could run on GPU
                              @Expression <AttributeReference> sr_store_sk#182 could run on GPU
                              @Expression <AttributeReference> sum#863 could run on GPU
                              *Exec <ProjectExec> will run on GPU
                                *Exec <BroadcastHashJoinExec> will run on GPU
                                  *Exec <FilterExec> will run on GPU
                                    *Expression <And> (isnotnull(sr_store_sk#182) AND isnotnull(sr_customer_sk#178)) will run on GPU
                                      *Expression <IsNotNull> isnotnull(sr_store_sk#182) will run on GPU
                                      *Expression <IsNotNull> isnotnull(sr_customer_sk#178) will run on GPU
                                    *Exec <FileSourceScanExec> will run on GPU
                                  *Exec <BroadcastExchangeExec> will run on GPU
                                    *Exec <ProjectExec> will run on GPU
                                      *Exec <FilterExec> will run on GPU
                                        *Expression <And> ((isnotnull(d_year#498) AND (d_year#498 = 2000)) AND isnotnull(d_date_sk#492)) will run on GPU
                                          *Expression <And> (isnotnull(d_year#498) AND (d_year#498 = 2000)) will run on GPU
                                            *Expression <IsNotNull> isnotnull(d_year#498) will run on GPU
                                            *Expression <EqualTo> (d_year#498 = 2000) will run on GPU
                                          *Expression <IsNotNull> isnotnull(d_date_sk#492) will run on GPU
                                        *Exec <FileSourceScanExec> will run on GPU
                  #Exec <SortExec> could run on GPU but is going to be removed because replacing sortMergeJoin with shuffleHashJoin
                    #Expression <SortOrder> ctr_store_sk#852#859 ASC NULLS FIRST could run on GPU but is going to be removed because parent plan is removed
                    *Exec <FilterExec> will run on GPU
                      *Expression <IsNotNull> isnotnull((avg(ctr_total_return) * CAST(1.2 AS DOUBLE))#857) will run on GPU
                      !Exec <HashAggregateExec> cannot run on GPU because not all expressions can be replaced
                        @Expression <AttributeReference> ctr_store_sk#852 could run on GPU
                        @Expression <AttributeReference> ctr_store_sk#852 could run on GPU
                        @Expression <AggregateExpression> avg(ctr_total_return#853) could run on GPU
                          !Expression <Average> avg(ctr_total_return#853) cannot run on GPU because the GPU will aggregate floating point values in parallel and the result is not always identical each time. This can cause some Spark queries to produce an incorrect answer if the value is computed more than once as part of the same query.  To enable this anyways set spark.rapids.sql.variableFloatAgg.enabled to true.
                            @Expression <AttributeReference> ctr_total_return#853 could run on GPU
                        @Expression <AttributeReference> avg(ctr_total_return#853)#856 could run on GPU
                        @Expression <Alias> (avg(ctr_total_return#853)#856 * 1.2) AS (avg(ctr_total_return) * CAST(1.2 AS DOUBLE))#857 could run on GPU
                          @Expression <Multiply> (avg(ctr_total_return#853)#856 * 1.2) could run on GPU
                            @Expression <AttributeReference> avg(ctr_total_return#853)#856 could run on GPU
                            @Expression <Literal> 1.2 could run on GPU
                        @Expression <Alias> ctr_store_sk#852 AS ctr_store_sk#852#859 could run on GPU
                          @Expression <AttributeReference> ctr_store_sk#852 could run on GPU
                        !Exec <ShuffleExchangeExec> cannot run on GPU because Columnar exchange without columnar children is inefficient
                          @Partitioning <HashPartitioning> could run on GPU
                            @Expression <AttributeReference> ctr_store_sk#852 could run on GPU
                          !Exec <HashAggregateExec> cannot run on GPU because not all expressions can be replaced
                            @Expression <AttributeReference> ctr_store_sk#852 could run on GPU
                            @Expression <AggregateExpression> partial_avg(ctr_total_return#853) could run on GPU
                              !Expression <Average> avg(ctr_total_return#853) cannot run on GPU because the GPU will aggregate floating point values in parallel and the result is not always identical each time. This can cause some Spark queries to produce an incorrect answer if the value is computed more than once as part of the same query.  To enable this anyways set spark.rapids.sql.variableFloatAgg.enabled to true.
                                @Expression <AttributeReference> ctr_total_return#853 could run on GPU
                            @Expression <AttributeReference> sum#864 could run on GPU
                            @Expression <AttributeReference> count#865L could run on GPU
                            @Expression <AttributeReference> ctr_store_sk#852 could run on GPU
                            @Expression <AttributeReference> sum#866 could run on GPU
                            @Expression <AttributeReference> count#867L could run on GPU
                            !Exec <HashAggregateExec> cannot run on GPU because not all expressions can be replaced
                              @Expression <AttributeReference> sr_customer_sk#178 could run on GPU
                              @Expression <AttributeReference> sr_store_sk#182 could run on GPU
                              @Expression <AttributeReference> sr_customer_sk#178 could run on GPU
                              @Expression <AttributeReference> sr_store_sk#182 could run on GPU
                              @Expression <AggregateExpression> sum(sr_return_amt#186) could run on GPU
                                !Expression <Sum> sum(sr_return_amt#186) cannot run on GPU because the GPU will aggregate floating point values in parallel and the result is not always identical each time. This can cause some Spark queries to produce an incorrect answer if the value is computed more than once as part of the same query.  To enable this anyways set spark.rapids.sql.variableFloatAgg.enabled to true.
                                  @Expression <AttributeReference> sr_return_amt#186 could run on GPU
                              @Expression <AttributeReference> sum(sr_return_amt#186)#855 could run on GPU
                              @Expression <Alias> sr_store_sk#182 AS ctr_store_sk#852 could run on GPU
                                @Expression <AttributeReference> sr_store_sk#182 could run on GPU
                              @Expression <Alias> sum(sr_return_amt#186)#855 AS ctr_total_return#853 could run on GPU
                                @Expression <AttributeReference> sum(sr_return_amt#186)#855 could run on GPU
                              !Exec <ShuffleExchangeExec> cannot run on GPU because Columnar exchange without columnar children is inefficient
                                @Partitioning <HashPartitioning> could run on GPU
                                  @Expression <AttributeReference> sr_customer_sk#178 could run on GPU
                                  @Expression <AttributeReference> sr_store_sk#182 could run on GPU
                                !Exec <HashAggregateExec> cannot run on GPU because not all expressions can be replaced
                                  @Expression <AttributeReference> sr_customer_sk#178 could run on GPU
                                  @Expression <AttributeReference> sr_store_sk#182 could run on GPU
                                  @Expression <AggregateExpression> partial_sum(sr_return_amt#186) could run on GPU
                                    !Expression <Sum> sum(sr_return_amt#186) cannot run on GPU because the GPU will aggregate floating point values in parallel and the result is not always identical each time. This can cause some Spark queries to produce an incorrect answer if the value is computed more than once as part of the same query.  To enable this anyways set spark.rapids.sql.variableFloatAgg.enabled to true.
                                      @Expression <AttributeReference> sr_return_amt#186 could run on GPU
                                  @Expression <AttributeReference> sum#868 could run on GPU
                                  @Expression <AttributeReference> sr_customer_sk#178 could run on GPU
                                  @Expression <AttributeReference> sr_store_sk#182 could run on GPU
                                  @Expression <AttributeReference> sum#869 could run on GPU
                                  *Exec <ProjectExec> will run on GPU
                                    *Exec <BroadcastHashJoinExec> will run on GPU
                                      *Exec <FilterExec> will run on GPU
                                        *Expression <IsNotNull> isnotnull(sr_store_sk#182) will run on GPU
                                        *Exec <FileSourceScanExec> will run on GPU
                                      *Exec <BroadcastExchangeExec> will run on GPU
                                        *Exec <ProjectExec> will run on GPU
                                          *Exec <FilterExec> will run on GPU
                                            *Expression <And> ((isnotnull(d_year#498) AND (d_year#498 = 2000)) AND isnotnull(d_date_sk#492)) will run on GPU
                                              *Expression <And> (isnotnull(d_year#498) AND (d_year#498 = 2000)) will run on GPU
                                                *Expression <IsNotNull> isnotnull(d_year#498) will run on GPU
                                                *Expression <EqualTo> (d_year#498 = 2000) will run on GPU
                                              *Expression <IsNotNull> isnotnull(d_date_sk#492) will run on GPU
                                            *Exec <FileSourceScanExec> will run on GPU
              *Exec <BroadcastExchangeExec> will run on GPU
                *Exec <ProjectExec> will run on GPU
                  *Exec <FilterExec> will run on GPU
                    *Expression <And> ((isnotnull(s_state#688) AND (s_state#688 = TN)) AND isnotnull(s_store_sk#664)) will run on GPU
                      *Expression <And> (isnotnull(s_state#688) AND (s_state#688 = TN)) will run on GPU
                        *Expression <IsNotNull> isnotnull(s_state#688) will run on GPU
                        *Expression <EqualTo> (s_state#688 = TN) will run on GPU
                      *Expression <IsNotNull> isnotnull(s_store_sk#664) will run on GPU
                    *Exec <FileSourceScanExec> will run on GPU
      #Exec <SortExec> could run on GPU but is going to be removed because replacing sortMergeJoin with shuffleHashJoin
        #Expression <SortOrder> c_customer_sk#412 ASC NULLS FIRST could run on GPU but is going to be removed because parent plan is removed
        *Exec <ShuffleExchangeExec> will run on GPU
          *Partitioning <HashPartitioning> will run on GPU
          *Exec <FilterExec> will run on GPU
            *Expression <IsNotNull> isnotnull(c_customer_sk#412) will run on GPU
            *Exec <FileSourceScanExec> will run on GPU

[BENCHMARK RUNNER] [q1] Iteration 0 failed after 3268 msec.
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
GpuColumnarExchange gpusinglepartitioning$(), ENSURE_REQUIREMENTS, [id=#757]
+- GpuTopN(limit=100, orderBy=[c_customer_id#413 ASC NULLS FIRST], output=[c_customer_id#413])
   +- GpuProject [c_customer_id#413]
      +- GpuShuffledHashJoin [ctr_customer_sk#851], [c_customer_sk#412], Inner, GpuBuildRight, false
         :- GpuCoalesceBatches targetsize(1073741824)
         :  +- GpuColumnarExchange gpuhashpartitioning(ctr_customer_sk#851, 200), ENSURE_REQUIREMENTS, [id=#752]
         :     +- GpuProject [ctr_customer_sk#851]
         :        +- GpuBroadcastHashJoin [ctr_store_sk#852], [s_store_sk#664], Inner, GpuBuildRight
         :           :- GpuProject [ctr_customer_sk#851, ctr_store_sk#852]
         :           :  +- GpuCoalesceBatches targetsize(1073741824)
         :           :     +- GpuFilter (ctr_total_return#853 > (avg(ctr_total_return) * CAST(1.2 AS DOUBLE))#857)
         :           :        +- GpuShuffledHashJoin [ctr_store_sk#852], [ctr_store_sk#852#859], Inner, GpuBuildRight, false
         :           :           :- GpuCoalesceBatches targetsize(1073741824)
         :           :           :  +- GpuColumnarExchange gpuhashpartitioning(ctr_store_sk#852, 200), ENSURE_REQUIREMENTS, [id=#557]
         :           :           :     +- GpuCoalesceBatches targetsize(1073741824)
         :           :           :        +- GpuFilter gpuisnotnull(ctr_total_return#853)
         :           :           :           +- GpuRowToColumnar targetsize(1073741824)
         :           :           :              +- *(2) HashAggregate(keys=[sr_customer_sk#178, sr_store_sk#182], functions=[sum(sr_return_amt#186)], output=[ctr_customer_sk#851, ctr_store_sk#852, ctr_total_return#853])
         :           :           :                 +- Exchange hashpartitioning(sr_customer_sk#178, sr_store_sk#182, 200), ENSURE_REQUIREMENTS, [id=#550]
         :           :           :                    +- *(1) HashAggregate(keys=[sr_customer_sk#178, sr_store_sk#182], functions=[partial_sum(sr_return_amt#186)], output=[sr_customer_sk#178, sr_store_sk#182, sum#863])
         :           :           :                       +- GpuColumnarToRow false
         :           :           :                          +- GpuProject [sr_customer_sk#178, sr_store_sk#182, sr_return_amt#186]
         :           :           :                             +- GpuBroadcastHashJoin [sr_returned_date_sk#195], [d_date_sk#492], Inner, GpuBuildRight
         :           :           :                                :- GpuCoalesceBatches targetsize(1073741824)
         :           :           :                                :  +- GpuFilter (gpuisnotnull(sr_store_sk#182) AND gpuisnotnull(sr_customer_sk#178))
         :           :           :                                :     +- GpuFileGpuScan parquet [sr_customer_sk#178,sr_store_sk#182,sr_return_amt#186,sr_returned_date_sk#195] Batched: true, DataFilters: [isnotnull(sr_store_sk#182), isnotnull(sr_customer_sk#178)], Format: Parquet, Location: InMemoryFileIndex[file:/opt/data/tpcds/sf1000-parquet/useDecimal=false,useDate=true,filterNull=fa..., PartitionFilters: [isnotnull(sr_returned_date_sk#195), dynamicpruningexpression(sr_returned_date_sk#195 IN dynamicp..., PushedFilters: [IsNotNull(sr_store_sk), IsNotNull(sr_customer_sk)], ReadSchema: struct<sr_customer_sk:int,sr_store_sk:int,sr_return_amt:double>
         :           :           :                                :           +- SubqueryBroadcast dynamicpruning#860, 0, [d_date_sk#492], [id=#222]
         :           :           :                                :              +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#221]
         :           :           :                                :                 +- GpuColumnarToRow false
         :           :           :                                :                    +- GpuProject [d_date_sk#492]
         :           :           :                                :                       +- GpuCoalesceBatches targetsize(1073741824)
         :           :           :                                :                          +- GpuFilter ((gpuisnotnull(d_year#498) AND (d_year#498 = 2000)) AND gpuisnotnull(d_date_sk#492))
         :           :           :                                :                             +- GpuFileGpuScan parquet [d_date_sk#492,d_year#498] Batched: true, DataFilters: [isnotnull(d_year#498), (d_year#498 = 2000), isnotnull(d_date_sk#492)], Format: Parquet, Location: InMemoryFileIndex[file:/opt/data/tpcds/sf1000-parquet/useDecimal=false,useDate=true,filterNull=fa..., PartitionFilters: [], PushedFilters: [IsNotNull(d_year), EqualTo(d_year,2000), IsNotNull(d_date_sk)], ReadSchema: struct<d_date_sk:int,d_year:int>
         :           :           :                                +- GpuBroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#439]
         :           :           :                                   +- GpuProject [d_date_sk#492]
         :           :           :                                      +- GpuCoalesceBatches targetsize(1073741824)
         :           :           :                                         +- GpuFilter ((gpuisnotnull(d_year#498) AND (d_year#498 = 2000)) AND gpuisnotnull(d_date_sk#492))
         :           :           :                                            +- GpuFileGpuScan parquet [d_date_sk#492,d_year#498] Batched: true, DataFilters: [isnotnull(d_year#498), (d_year#498 = 2000), isnotnull(d_date_sk#492)], Format: Parquet, Location: InMemoryFileIndex[file:/opt/data/tpcds/sf1000-parquet/useDecimal=false,useDate=true,filterNull=fa..., PartitionFilters: [], PushedFilters: [IsNotNull(d_year), EqualTo(d_year,2000), IsNotNull(d_date_sk)], ReadSchema: struct<d_date_sk:int,d_year:int>
         :           :           +- GpuCoalesceBatches RequireSingleBatch
         :           :              +- GpuFilter gpuisnotnull((avg(ctr_total_return) * CAST(1.2 AS DOUBLE))#857)
         :           :                 +- GpuRowToColumnar targetsize(1073741824)
         :           :                    +- *(5) HashAggregate(keys=[ctr_store_sk#852], functions=[avg(ctr_total_return#853)], output=[(avg(ctr_total_return) * CAST(1.2 AS DOUBLE))#857, ctr_store_sk#852#859])
         :           :                       +- Exchange hashpartitioning(ctr_store_sk#852, 200), ENSURE_REQUIREMENTS, [id=#739]
         :           :                          +- *(4) HashAggregate(keys=[ctr_store_sk#852], functions=[partial_avg(ctr_total_return#853)], output=[ctr_store_sk#852, sum#866, count#867L])
         :           :                             +- *(4) HashAggregate(keys=[sr_customer_sk#178, sr_store_sk#182], functions=[sum(sr_return_amt#186)], output=[ctr_store_sk#852, ctr_total_return#853])
         :           :                                +- Exchange hashpartitioning(sr_customer_sk#178, sr_store_sk#182, 200), ENSURE_REQUIREMENTS, [id=#734]
         :           :                                   +- *(3) HashAggregate(keys=[sr_customer_sk#178, sr_store_sk#182], functions=[partial_sum(sr_return_amt#186)], output=[sr_customer_sk#178, sr_store_sk#182, sum#869])
         :           :                                      +- GpuColumnarToRow false
         :           :                                         +- GpuProject [sr_customer_sk#178, sr_store_sk#182, sr_return_amt#186]
         :           :                                            +- GpuBroadcastHashJoin [sr_returned_date_sk#195], [d_date_sk#492], Inner, GpuBuildRight
         :           :                                               :- GpuCoalesceBatches targetsize(1073741824)
         :           :                                               :  +- GpuFilter gpuisnotnull(sr_store_sk#182)
         :           :                                               :     +- GpuFileGpuScan parquet [sr_customer_sk#178,sr_store_sk#182,sr_return_amt#186,sr_returned_date_sk#195] Batched: true, DataFilters: [isnotnull(sr_store_sk#182)], Format: Parquet, Location: InMemoryFileIndex[file:/opt/data/tpcds/sf1000-parquet/useDecimal=false,useDate=true,filterNull=fa..., PartitionFilters: [isnotnull(sr_returned_date_sk#195), dynamicpruningexpression(sr_returned_date_sk#195 IN dynamicp..., PushedFilters: [IsNotNull(sr_store_sk)], ReadSchema: struct<sr_customer_sk:int,sr_store_sk:int,sr_return_amt:double>
         :           :                                               :           +- ReusedSubquery SubqueryBroadcast dynamicpruning#860, 0, [d_date_sk#492], [id=#222]
         :           :                                               +- ReusedExchange [d_date_sk#492], GpuBroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#439]
         :           +- GpuBroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#478]
         :              +- GpuProject [s_store_sk#664]
         :                 +- GpuCoalesceBatches targetsize(1073741824)
         :                    +- GpuFilter ((gpuisnotnull(s_state#688) AND (s_state#688 = TN)) AND gpuisnotnull(s_store_sk#664))
         :                       +- GpuFileGpuScan parquet [s_store_sk#664,s_state#688] Batched: true, DataFilters: [isnotnull(s_state#688), (s_state#688 = TN), isnotnull(s_store_sk#664)], Format: Parquet, Location: InMemoryFileIndex[file:/opt/data/tpcds/sf1000-parquet/useDecimal=false,useDate=true,filterNull=fa..., PartitionFilters: [], PushedFilters: [IsNotNull(s_state), EqualTo(s_state,TN), IsNotNull(s_store_sk)], ReadSchema: struct<s_store_sk:int,s_state:string>
         +- GpuCoalesceBatches RequireSingleBatch
            +- GpuColumnarExchange gpuhashpartitioning(c_customer_sk#412, 200), ENSURE_REQUIREMENTS, [id=#484]
               +- GpuCoalesceBatches targetsize(1073741824)
                  +- GpuFilter gpuisnotnull(c_customer_sk#412)
                     +- GpuFileGpuScan parquet [c_customer_sk#412,c_customer_id#413] Batched: true, DataFilters: [isnotnull(c_customer_sk#412)], Format: Parquet, Location: InMemoryFileIndex[file:/opt/data/tpcds/sf1000-parquet/useDecimal=false,useDate=true,filterNull=fa..., PartitionFilters: [], PushedFilters: [IsNotNull(c_customer_sk)], ReadSchema: struct<c_customer_sk:int,c_customer_id:string>

    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
    at com.nvidia.spark.rapids.shims.spark312.SparkBaseShims.attachTreeIfSupported(SparkBaseShims.scala:818)
    at org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.doExecuteColumnar(GpuShuffleExchangeExec.scala:224)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:207)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:203)
    at com.nvidia.spark.rapids.GpuCoalesceBatches.doExecuteColumnar(GpuCoalesceBatches.scala:575)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:207)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:203)
    at com.nvidia.spark.rapids.GpuTopN.doExecuteColumnar(limit.scala:292)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:207)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:203)
    at com.nvidia.spark.rapids.GpuColumnarToRowExecParent.doExecute(GpuColumnarToRowExec.scala:301)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:321)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:387)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3696)
    at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2965)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
    at org.apache.spark.sql.Dataset.collect(Dataset.scala:2965)
    at com.nvidia.spark.rapids.tests.common.BenchUtils$.$anonfun$runBench$1(BenchUtils.scala:220)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
    at com.nvidia.spark.rapids.tests.common.BenchUtils$.runBench(BenchUtils.scala:189)
    at com.nvidia.spark.rapids.tests.common.BenchUtils$.collect(BenchUtils.scala:68)
    at com.nvidia.spark.rapids.tests.BenchmarkRunner.collect(BenchmarkRunner.scala:206)
    at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(/dev/fd/63:30)
    at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(/dev/fd/63:44)
    at $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(/dev/fd/63:46)
    at $line14.$read$$iw$$iw$$iw$$iw$$iw.<init>(/dev/fd/63:48)
    at $line14.$read$$iw$$iw$$iw$$iw.<init>(/dev/fd/63:50)
    at $line14.$read$$iw$$iw$$iw.<init>(/dev/fd/63:52)
    at $line14.$read$$iw$$iw.<init>(/dev/fd/63:54)
    at $line14.$read$$iw.<init>(/dev/fd/63:56)
    at $line14.$read.<init>(/dev/fd/63:58)
    at $line14.$read$.<init>(/dev/fd/63:62)
    at $line14.$read$.<clinit>(/dev/fd/63)
    at $line14.$eval$.$print$lzycompute(/dev/fd/63:7)
    at $line14.$eval$.$print(/dev/fd/63:6)
    at $line14.$eval.$print(/dev/fd/63)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:745)
    at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1021)
    at scala.tools.nsc.interpreter.IMain.$anonfun$interpret$1(IMain.scala:574)
    at scala.reflect.internal.util.ScalaClassLoader.asContext(ScalaClassLoader.scala:41)
    at scala.reflect.internal.util.ScalaClassLoader.asContext$(ScalaClassLoader.scala:37)
    at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:41)
    at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:600)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:570)
    at scala.tools.nsc.interpreter.ILoop.$anonfun$pasteCommand$11(ILoop.scala:824)
    at scala.tools.nsc.interpreter.IMain.withLabel(IMain.scala:119)
    at scala.tools.nsc.interpreter.ILoop.interpretCode$1(ILoop.scala:824)
    at scala.tools.nsc.interpreter.ILoop.pasteCommand(ILoop.scala:830)
    at org.apache.spark.repl.SparkILoop.$anonfun$process$8(SparkILoop.scala:177)
    at org.apache.spark.repl.SparkILoop.$anonfun$process$8$adapted(SparkILoop.scala:176)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.repl.SparkILoop.loadInitFiles$1(SparkILoop.scala:176)
    at org.apache.spark.repl.SparkILoop.$anonfun$process$4(SparkILoop.scala:166)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.tools.nsc.interpreter.ILoop.$anonfun$mumly$1(ILoop.scala:168)
    at scala.tools.nsc.interpreter.IMain.beQuietDuring(IMain.scala:214)
    at scala.tools.nsc.interpreter.ILoop.mumly(ILoop.scala:165)
    at org.apache.spark.repl.SparkILoop.loopPostInit$1(SparkILoop.scala:153)
    at org.apache.spark.repl.SparkILoop.$anonfun$process$10(SparkILoop.scala:221)
    at org.apache.spark.repl.SparkILoop.withSuppressedSettings$1(SparkILoop.scala:189)
    at org.apache.spark.repl.SparkILoop.startup$1(SparkILoop.scala:201)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:236)
    at org.apache.spark.repl.Main$.doMain(Main.scala:78)
    at org.apache.spark.repl.Main$.main(Main.scala:58)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
GpuColumnarExchange gpuhashpartitioning(ctr_customer_sk#851, 200), ENSURE_REQUIREMENTS, [id=#752]
+- GpuProject [ctr_customer_sk#851]
   +- GpuBroadcastHashJoin [ctr_store_sk#852], [s_store_sk#664], Inner, GpuBuildRight
      :- GpuProject [ctr_customer_sk#851, ctr_store_sk#852]
      :  +- GpuCoalesceBatches targetsize(1073741824)
      :     +- GpuFilter (ctr_total_return#853 > (avg(ctr_total_return) * CAST(1.2 AS DOUBLE))#857)
      :        +- GpuShuffledHashJoin [ctr_store_sk#852], [ctr_store_sk#852#859], Inner, GpuBuildRight, false
      :           :- GpuCoalesceBatches targetsize(1073741824)
      :           :  +- GpuColumnarExchange gpuhashpartitioning(ctr_store_sk#852, 200), ENSURE_REQUIREMENTS, [id=#557]
      :           :     +- GpuCoalesceBatches targetsize(1073741824)
      :           :        +- GpuFilter gpuisnotnull(ctr_total_return#853)
      :           :           +- GpuRowToColumnar targetsize(1073741824)
      :           :              +- *(2) HashAggregate(keys=[sr_customer_sk#178, sr_store_sk#182], functions=[sum(sr_return_amt#186)], output=[ctr_customer_sk#851, ctr_store_sk#852, ctr_total_return#853])
      :           :                 +- Exchange hashpartitioning(sr_customer_sk#178, sr_store_sk#182, 200), ENSURE_REQUIREMENTS, [id=#550]
      :           :                    +- *(1) HashAggregate(keys=[sr_customer_sk#178, sr_store_sk#182], functions=[partial_sum(sr_return_amt#186)], output=[sr_customer_sk#178, sr_store_sk#182, sum#863])
      :           :                       +- GpuColumnarToRow false
      :           :                          +- GpuProject [sr_customer_sk#178, sr_store_sk#182, sr_return_amt#186]
      :           :                             +- GpuBroadcastHashJoin [sr_returned_date_sk#195], [d_date_sk#492], Inner, GpuBuildRight
      :           :                                :- GpuCoalesceBatches targetsize(1073741824)
      :           :                                :  +- GpuFilter (gpuisnotnull(sr_store_sk#182) AND gpuisnotnull(sr_customer_sk#178))
      :           :                                :     +- GpuFileGpuScan parquet [sr_customer_sk#178,sr_store_sk#182,sr_return_amt#186,sr_returned_date_sk#195] Batched: true, DataFilters: [isnotnull(sr_store_sk#182), isnotnull(sr_customer_sk#178)], Format: Parquet, Location: InMemoryFileIndex[file:/opt/data/tpcds/sf1000-parquet/useDecimal=false,useDate=true,filterNull=fa..., PartitionFilters: [isnotnull(sr_returned_date_sk#195), dynamicpruningexpression(sr_returned_date_sk#195 IN dynamicp..., PushedFilters: [IsNotNull(sr_store_sk), IsNotNull(sr_customer_sk)], ReadSchema: struct<sr_customer_sk:int,sr_store_sk:int,sr_return_amt:double>
      :           :                                :           +- SubqueryBroadcast dynamicpruning#860, 0, [d_date_sk#492], [id=#222]
      :           :                                :              +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#221]
      :           :                                :                 +- GpuColumnarToRow false
      :           :                                :                    +- GpuProject [d_date_sk#492]
      :           :                                :                       +- GpuCoalesceBatches targetsize(1073741824)
      :           :                                :                          +- GpuFilter ((gpuisnotnull(d_year#498) AND (d_year#498 = 2000)) AND gpuisnotnull(d_date_sk#492))
      :           :                                :                             +- GpuFileGpuScan parquet [d_date_sk#492,d_year#498] Batched: true, DataFilters: [isnotnull(d_year#498), (d_year#498 = 2000), isnotnull(d_date_sk#492)], Format: Parquet, Location: InMemoryFileIndex[file:/opt/data/tpcds/sf1000-parquet/useDecimal=false,useDate=true,filterNull=fa..., PartitionFilters: [], PushedFilters: [IsNotNull(d_year), EqualTo(d_year,2000), IsNotNull(d_date_sk)], ReadSchema: struct<d_date_sk:int,d_year:int>
      :           :                                +- GpuBroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#439]
      :           :                                   +- GpuProject [d_date_sk#492]
      :           :                                      +- GpuCoalesceBatches targetsize(1073741824)
      :           :                                         +- GpuFilter ((gpuisnotnull(d_year#498) AND (d_year#498 = 2000)) AND gpuisnotnull(d_date_sk#492))
      :           :                                            +- GpuFileGpuScan parquet [d_date_sk#492,d_year#498] Batched: true, DataFilters: [isnotnull(d_year#498), (d_year#498 = 2000), isnotnull(d_date_sk#492)], Format: Parquet, Location: InMemoryFileIndex[file:/opt/data/tpcds/sf1000-parquet/useDecimal=false,useDate=true,filterNull=fa..., PartitionFilters: [], PushedFilters: [IsNotNull(d_year), EqualTo(d_year,2000), IsNotNull(d_date_sk)], ReadSchema: struct<d_date_sk:int,d_year:int>
      :           +- GpuCoalesceBatches RequireSingleBatch
      :              +- GpuFilter gpuisnotnull((avg(ctr_total_return) * CAST(1.2 AS DOUBLE))#857)
      :                 +- GpuRowToColumnar targetsize(1073741824)
      :                    +- *(5) HashAggregate(keys=[ctr_store_sk#852], functions=[avg(ctr_total_return#853)], output=[(avg(ctr_total_return) * CAST(1.2 AS DOUBLE))#857, ctr_store_sk#852#859])
      :                       +- Exchange hashpartitioning(ctr_store_sk#852, 200), ENSURE_REQUIREMENTS, [id=#739]
      :                          +- *(4) HashAggregate(keys=[ctr_store_sk#852], functions=[partial_avg(ctr_total_return#853)], output=[ctr_store_sk#852, sum#866, count#867L])
      :                             +- *(4) HashAggregate(keys=[sr_customer_sk#178, sr_store_sk#182], functions=[sum(sr_return_amt#186)], output=[ctr_store_sk#852, ctr_total_return#853])
      :                                +- Exchange hashpartitioning(sr_customer_sk#178, sr_store_sk#182, 200), ENSURE_REQUIREMENTS, [id=#734]
      :                                   +- *(3) HashAggregate(keys=[sr_customer_sk#178, sr_store_sk#182], functions=[partial_sum(sr_return_amt#186)], output=[sr_customer_sk#178, sr_store_sk#182, sum#869])
      :                                      +- GpuColumnarToRow false
      :                                         +- GpuProject [sr_customer_sk#178, sr_store_sk#182, sr_return_amt#186]
      :                                            +- GpuBroadcastHashJoin [sr_returned_date_sk#195], [d_date_sk#492], Inner, GpuBuildRight
      :                                               :- GpuCoalesceBatches targetsize(1073741824)
      :                                               :  +- GpuFilter gpuisnotnull(sr_store_sk#182)
      :                                               :     +- GpuFileGpuScan parquet [sr_customer_sk#178,sr_store_sk#182,sr_return_amt#186,sr_returned_date_sk#195] Batched: true, DataFilters: [isnotnull(sr_store_sk#182)], Format: Parquet, Location: InMemoryFileIndex[file:/opt/data/tpcds/sf1000-parquet/useDecimal=false,useDate=true,filterNull=fa..., PartitionFilters: [isnotnull(sr_returned_date_sk#195), dynamicpruningexpression(sr_returned_date_sk#195 IN dynamicp..., PushedFilters: [IsNotNull(sr_store_sk)], ReadSchema: struct<sr_customer_sk:int,sr_store_sk:int,sr_return_amt:double>
      :                                               :           +- ReusedSubquery SubqueryBroadcast dynamicpruning#860, 0, [d_date_sk#492], [id=#222]
      :                                               +- ReusedExchange [d_date_sk#492], GpuBroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#439]
      +- GpuBroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#478]
         +- GpuProject [s_store_sk#664]
            +- GpuCoalesceBatches targetsize(1073741824)
               +- GpuFilter ((gpuisnotnull(s_state#688) AND (s_state#688 = TN)) AND gpuisnotnull(s_store_sk#664))
                  +- GpuFileGpuScan parquet [s_store_sk#664,s_state#688] Batched: true, DataFilters: [isnotnull(s_state#688), (s_state#688 = TN), isnotnull(s_store_sk#664)], Format: Parquet, Location: InMemoryFileIndex[file:/opt/data/tpcds/sf1000-parquet/useDecimal=false,useDate=true,filterNull=fa..., PartitionFilters: [], PushedFilters: [IsNotNull(s_state), EqualTo(s_state,TN), IsNotNull(s_store_sk)], ReadSchema: struct<s_store_sk:int,s_state:string>

    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
    at com.nvidia.spark.rapids.shims.spark312.SparkBaseShims.attachTreeIfSupported(SparkBaseShims.scala:818)
    at org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.doExecuteColumnar(GpuShuffleExchangeExec.scala:224)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:207)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:203)
    at com.nvidia.spark.rapids.GpuCoalesceBatches.doExecuteColumnar(GpuCoalesceBatches.scala:575)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:207)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:203)
    at com.nvidia.spark.rapids.GpuShuffledHashJoinBase.doExecuteColumnar(GpuShuffledHashJoinBase.scala:74)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:207)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:203)
    at com.nvidia.spark.rapids.GpuProjectExec.doExecuteColumnar(basicPhysicalOperators.scala:145)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:207)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:203)
    at com.nvidia.spark.rapids.GpuTopN.doExecuteColumnar(limit.scala:292)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:207)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:203)
    at org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.inputBatchRDD$lzycompute(GpuShuffleExchangeExec.scala:194)
    at org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.inputBatchRDD(GpuShuffleExchangeExec.scala:194)
    at org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.shuffleDependencyColumnar$lzycompute(GpuShuffleExchangeExec.scala:204)
    at org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.shuffleDependencyColumnar(GpuShuffleExchangeExec.scala:202)
    at org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.$anonfun$doExecuteColumnar$1(GpuShuffleExchangeExec.scala:227)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
    ... 102 more
Caused by: java.util.concurrent.ExecutionException: org.apache.spark.SparkException: Job aborted due to stage failure: ClassNotFound with classloader: scala.tools.nsc.interpreter.IMain$TranslatingClassLoader@46fb0c33
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:206)
    at org.apache.spark.sql.rapids.execution.GpuBroadcastExchangeExecBase.executeColumnarBroadcast(GpuBroadcastExchangeExec.scala:413)
    at com.nvidia.spark.rapids.shims.spark312.GpuBroadcastHashJoinExec.doExecuteColumnar(GpuBroadcastHashJoinExec.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:207)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:203)
    at com.nvidia.spark.rapids.GpuProjectExec.doExecuteColumnar(basicPhysicalOperators.scala:145)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:207)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:203)
    at org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.inputBatchRDD$lzycompute(GpuShuffleExchangeExec.scala:194)
    at org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.inputBatchRDD(GpuShuffleExchangeExec.scala:194)
    at org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.shuffleDependencyColumnar$lzycompute(GpuShuffleExchangeExec.scala:204)
    at org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.shuffleDependencyColumnar(GpuShuffleExchangeExec.scala:202)
    at org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.$anonfun$doExecuteColumnar$1(GpuShuffleExchangeExec.scala:227)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
    ... 139 more
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: ClassNotFound with classloader: scala.tools.nsc.interpreter.IMain$TranslatingClassLoader@46fb0c33
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2261)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
    at org.apache.spark.sql.rapids.execution.GpuBroadcastExchangeExecBase$$anon$1.$anonfun$call$2(GpuBroadcastExchangeExec.scala:306)
    at com.nvidia.spark.rapids.Arm.withResource(Arm.scala:28)
    at com.nvidia.spark.rapids.Arm.withResource$(Arm.scala:26)
    at org.apache.spark.sql.rapids.execution.GpuBroadcastExchangeExecBase.withResource(GpuBroadcastExchangeExec.scala:252)
    at org.apache.spark.sql.rapids.execution.GpuBroadcastExchangeExecBase$$anon$1.$anonfun$call$1(GpuBroadcastExchangeExec.scala:300)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withExecutionId$1(SQLExecution.scala:139)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:137)
    at org.apache.spark.sql.rapids.execution.GpuBroadcastExchangeExecBase$$anon$1.call(GpuBroadcastExchangeExec.scala:293)
    at org.apache.spark.sql.rapids.execution.GpuBroadcastExchangeExecBase$$anon$1.call(GpuBroadcastExchangeExec.scala:289)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
tgravescs commented 3 years ago

Marking as P1 to analyze.

revans2 commented 3 years ago

This is really mind-boggling. SparkException is in the same Maven package, and hence the same jar, as the code that is trying to load it. It looks like the thread's context class loader must have been messed up somehow. I'll try to reproduce this at a smaller scale and see if I can make it work.
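For readers unfamiliar with the term, the context class loader is a per-thread setting that code can swap temporarily. The sketch below is only an illustration in plain Java, not code from Spark or the plugin; `ContextLoaderDemo` and `withContextClassLoader` are hypothetical names introduced here.

```java
import java.util.concurrent.Callable;

public class ContextLoaderDemo {

    /** Runs body with the given context class loader, then restores the previous one. */
    public static <T> T withContextClassLoader(ClassLoader loader, Callable<T> body)
            throws Exception {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            return body.call();
        } finally {
            // Always restore, even if body throws, so later work on this thread
            // does not see a loader it was never meant to use.
            current.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) throws Exception {
        ClassLoader original = Thread.currentThread().getContextClassLoader();
        // An empty anonymous loader that simply delegates to the original one.
        ClassLoader custom = new ClassLoader(original) {};

        ClassLoader seen = withContextClassLoader(
                custom, () -> Thread.currentThread().getContextClassLoader());

        System.out.println("inside body, custom loader visible: " + (seen == custom));
        System.out.println("after body, original restored: "
                + (Thread.currentThread().getContextClassLoader() == original));
    }
}
```

If some code path sets the loader and never restores it, or a thread is created before the right loader is installed, later class lookups on that thread can fail even though the class is on the classpath.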

gerashegalov commented 3 years ago

The REPL has been more difficult to get working in the prototype phase. Typically either the classloader was not set up in time before deserialization, or something is shimmed when it should not be.

rongou commented 3 years ago

Hmm, I think both this issue and #3468 are caused by using spark-shell. Switching to spark-submit seems to work.

tgravescs commented 3 years ago

I don't see how #3468 works with spark-submit... that class isn't exposed, so I would expect failures. Perhaps you just mean it works for this issue?

rongou commented 3 years ago

If I switch to spark-submit, the query runs to completion, even after adding back GpuKryoRegistrator. ¯_(ツ)_/¯

Here is the command:

/opt/spark/bin/spark-submit\
 --master spark://127.0.0.1:7077\
 --conf spark.serializer=org.apache.spark.serializer.KryoSerializer\
 --conf spark.kryoserializer.buffer=128m\
 --conf spark.kryo.registrator=com.nvidia.spark.rapids.GpuKryoRegistrator\
 --conf spark.locality.wait=0s\
 --conf spark.sql.files.maxPartitionBytes=1g\
 --conf spark.sql.shuffle.partitions=200\
 --conf spark.sql.adaptive.enabled=true\
 --conf spark.shuffle.manager=com.nvidia.spark.rapids.spark312.RapidsShuffleManager\
 --conf spark.shuffle.service.enabled=false\
 --conf spark.dynamicAllocation.enabled=false\
 --conf spark.sql.broadcastTimeout=600\
 --conf spark.plugins=com.nvidia.spark.SQLPlugin\
 --conf spark.rapids.cudfVersionOverride=true\
 --conf spark.rapids.sql.concurrentGpuTasks=1\
 --conf spark.rapids.memory.host.spillStorageSize=32G\
 --conf spark.rapids.memory.pinnedPool.size=8G\
 --conf spark.rapids.sql.batchSizeBytes=1g\
 --conf spark.rapids.memory.gpu.direct.storage.spill.enabled=false\
 --conf spark.rapids.memory.gpu.direct.storage.spill.useHostMemory=false\
 --conf spark.rapids.memory.gpu.direct.storage.spill.alignedIO=false\
 --conf spark.rapids.memory.gpu.direct.storage.spill.alignmentThreshold=8m\
 --conf spark.rapids.memory.gpu.unspill.enabled=false\
 --conf spark.rapids.shuffle.transport.enabled=true\
 --conf spark.executorEnv.UCX_ERROR_SIGNALS=\
 --conf spark.executorEnv.UCX_MEMTYPE_CACHE=n\
 --conf spark.executorEnv.UCX_IB_RX_QUEUE_LEN=1024\
 --conf spark.executorEnv.UCX_TLS=cuda_copy,cuda_ipc,rc,tcp\
 --conf spark.executorEnv.UCX_RNDV_SCHEME=put_zcopy\
 --conf spark.executorEnv.UCX_MAX_RNDV_RAILS=1\
 --conf spark.rapids.shuffle.maxMetadataSize=512K\
 --conf spark.rapids.shuffle.ucx.bounceBuffers.size=8M\
 --conf spark.driver.memory=10G\
 --conf spark.driver.maxResultSize=0\
 --conf spark.driver.extraJavaOptions=-Dai.rapids.cudf.nvtx.enabled=false\
 --conf spark.executor.extraClassPath=/opt/rapids/cudf.jar:/opt/rapids/rapids-4-spark.jar\
 --conf spark.executor.extraJavaOptions=-Dai.rapids.cudf.nvtx.enabled=false\
 --conf spark.executor.instances=1\
 --conf spark.executor.cores=24\
 --conf spark.executor.memory=64G\
 --conf spark.executor.resource.gpu.amount=1\
 --conf spark.task.cpus=1\
 --conf spark.task.resource.gpu.amount=0.0416\
 --jars /opt/rapids/cudf.jar,/opt/rapids/rapids-4-spark.jar\
 --class com.nvidia.spark.rapids.tests.BenchmarkRunner\
 /opt/rapids/rapids-4-spark-benchmarks.jar\
 --benchmark tpcds\
 --query q1\
 --input /opt/data/tpcds/sf1000-parquet/useDecimal=false,useDate=true,filterNull=false\
 --input-format parquet\
 --summary-file-prefix tpcds-q1-gpu\
 --iterations 1
gerashegalov commented 3 years ago

thanks @rongou, this helps narrow it down.

revans2 commented 3 years ago

I was able to reproduce something like this locally, but this is a different class that cannot be loaded.

org.apache.spark.SparkException: Job aborted due to stage failure: ClassNotFound with classloader: scala.tools.nsc.interpreter.IMain$TranslatingClassLoader@8aeab9e
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2261)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
    at org.apache.spark.sql.rapids.execution.GpuBroadcastExchangeExecBase$$anon$1.$anonfun$call$2(GpuBroadcastExchangeExec.scala:307)
    at com.nvidia.spark.rapids.Arm.withResource(Arm.scala:28)
    at com.nvidia.spark.rapids.Arm.withResource$(Arm.scala:26)
    at org.apache.spark.sql.rapids.execution.GpuBroadcastExchangeExecBase.withResource(GpuBroadcastExchangeExec.scala:253)
    at org.apache.spark.sql.rapids.execution.GpuBroadcastExchangeExecBase$$anon$1.$anonfun$call$1(GpuBroadcastExchangeExec.scala:301)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withExecutionId$1(SQLExecution.scala:139)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:137)
    at org.apache.spark.sql.rapids.execution.GpuBroadcastExchangeExecBase$$anon$1.call(GpuBroadcastExchangeExec.scala:294)
    at org.apache.spark.sql.rapids.execution.GpuBroadcastExchangeExecBase$$anon$1.call(GpuBroadcastExchangeExec.scala:290)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

I should now be able to do some debugging.

revans2 commented 3 years ago

So Spark was not including the actual class not found error in the error message. I hacked up Spark to do it and I found...

Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.rapids.execution.SerializeBatchDeserializeHostBuffer
  at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:72)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
  at java.lang.Class.forName0(Native Method)
  at java.lang.Class.forName(Class.java:348)
  at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
  at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1986)
  at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
  at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2048)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1655)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
  at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
  at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:109)
  at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:103)
  at org.apache.spark.scheduler.TaskResultGetter$$anon$3.$anonfun$run$1(TaskResultGetter.scala:75)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1996)
  at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:63)
  ... 3 more

So my guess right now is that the context class loader is not set properly for the driver thread that is trying to deserialize the broadcast data on the driver side. I am going to have to dig in and see how all of this is set.
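The failure mode in the trace above can be reproduced in miniature with plain Java serialization. The sketch below assumes nothing about the plugin: `PluginPayload`, `LoaderObjectInputStream`, and `blindLoader` are hypothetical names. It resolves classes against an explicit loader via `Class.forName(name, false, loader)`, which is the same kind of lookup the `JavaDeserializationStream.resolveClass` frame performs, and shows that the same bytes deserialize with one loader and fail with another.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class LoaderAwareDeserialization {

    /** Hypothetical stand-in for a plugin class returned in a task result. */
    public static class PluginPayload implements Serializable {
        private static final long serialVersionUID = 1L;
        public final int rows;
        public PluginPayload(int rows) { this.rows = rows; }
    }

    /** Resolves every class in the stream against one explicit loader. */
    static class LoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
            return Class.forName(desc.getName(), false, loader);
        }
    }

    /** A loader with no parent: it sees JDK bootstrap classes but no application classes. */
    public static ClassLoader blindLoader() {
        return new ClassLoader((ClassLoader) null) {};
    }

    public static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    public static Object deserialize(byte[] data, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                new LoaderObjectInputStream(new ByteArrayInputStream(data), loader)) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] data = serialize(new PluginPayload(3));

        // The loader that defined PluginPayload can resolve it.
        PluginPayload ok =
                (PluginPayload) deserialize(data, PluginPayload.class.getClassLoader());
        System.out.println("rows = " + ok.rows);

        // The same bytes fail when the loader cannot see the class.
        try {
            deserialize(data, blindLoader());
        } catch (ClassNotFoundException e) {
            System.out.println("ClassNotFoundException: " + e.getMessage());
        }
    }
}
```

The point is that the serialized bytes are fine in both cases; only the loader handed to the deserializer decides whether the class can be found.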

revans2 commented 3 years ago

I think I found a fix. The issue was that the tmpClassLoader was not good enough. It works just fine when you go through the front door to our plugin. But Java serialization does not go through that front door, so it could not find what it needed (the class to deserialize into as part of a broadcast). I was able to find a way to update the Scala REPL class loader on the driver side, similar to how we update the ExecutorClassLoader on the executor.
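The thread does not show the actual change, so the following is not the plugin's fix. It is only a minimal sketch of the general idea of making extra classes reachable from a loader that cannot see them on its own. `FallbackClassLoader` is a hypothetical name, and wrapping a loader this way is an assumption made for illustration; the real fix updates the existing REPL loader rather than wrapping it.

```java
public class FallbackClassLoader extends ClassLoader {
    private final ClassLoader fallback;

    /**
     * @param parent   loader consulted first (standard parent delegation)
     * @param fallback loader consulted only when the parent cannot find the class
     */
    public FallbackClassLoader(ClassLoader parent, ClassLoader fallback) {
        super(parent);
        this.fallback = fallback;
    }

    @Override
    protected Class<?> findClass(String name) throws ClassNotFoundException {
        // loadClass() calls findClass() only after the parent chain has failed.
        return fallback.loadClass(name);
    }

    public static void main(String[] args) throws Exception {
        String name = FallbackClassLoader.class.getName();
        ClassLoader appLoader = FallbackClassLoader.class.getClassLoader();

        // A loader with no parent cannot see application classes.
        ClassLoader blind = new ClassLoader((ClassLoader) null) {};
        try {
            blind.loadClass(name);
            System.out.println("unexpected: blind loader found the class");
        } catch (ClassNotFoundException e) {
            System.out.println("blind loader alone: class not found");
        }

        // With the fallback attached, the same lookup succeeds.
        ClassLoader patched = new FallbackClassLoader(blind, appLoader);
        System.out.println("with fallback, same class returned: "
                + (patched.loadClass(name) == FallbackClassLoader.class));
    }
}
```

Because the fallback returns the class already defined by the application loader, no duplicate class definition is created, which avoids `ClassCastException` between two copies of the same class.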