NVIDIA / spark-rapids

Spark RAPIDS plugin - accelerate Apache Spark with GPUs
https://nvidia.github.io/spark-rapids
Apache License 2.0

[BUG] NDS run hits DPP error on Databricks 10.4 when the Alluxio cache is enabled. #7933

Closed res-life closed 1 year ago

res-life commented 1 year ago

Describe the bug
An NDS run hits a DPP error on Databricks 10.4 when the Alluxio cache is enabled.

: org.apache.spark.SparkUnsupportedOperationException: Cannot evaluate expression: dynamicpruning#98816 98815
    at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotEvaluateExpressionError(QueryExecutionErrors.scala:81)
...
    at com.nvidia.spark.rapids.AlluxioUtils$.replacePathIfNeeded(AlluxioUtils.scala:536)

Steps/Code to reproduce bug
Create a Databricks cluster. Run the NDS tests against the cluster.

Environment details
Databricks Runtime Version: 10.4 LTS (includes Apache Spark 3.2.1, Scala 2.12)
How to set up a Databricks cluster: https://github.com/NVIDIA/spark-rapids-container/tree/dev/Databricks

Additional context
It seems Databricks upgraded the runtime recently, and the tests now fail on DPP. Nothing changed on our side; we only restarted the cluster, so the Databricks upgrade is the suspect.

Detailed log:

====== Creating TempView for table web_sales ======
Time taken: 3279 millis for table web_sales
====== Creating TempView for table catalog_sales ======
Time taken: 3279 millis for table catalog_sales
====== Creating TempView for table store_sales ======
Time taken: 3183 millis for table store_sales
====== Run query96 ======
Not found com.nvidia.spark.rapids.listener.Manager 'JavaPackage' object is not callable
Time taken: [8559] millis for query96
====== Run query7 ======
Not found com.nvidia.spark.rapids.listener.Manager 'JavaPackage' object is not callable
An error occurred while calling o3349.save.
: org.apache.spark.SparkUnsupportedOperationException: Cannot evaluate expression: dynamicpruning#98816 98815
    at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotEvaluateExpressionError(QueryExecutionErrors.scala:81)
    at org.apache.spark.sql.catalyst.expressions.Unevaluable.eval(Expression.scala:446)
    at org.apache.spark.sql.catalyst.expressions.Unevaluable.eval$(Expression.scala:445)
    at org.apache.spark.sql.catalyst.expressions.DynamicPruningSubquery.eval(DynamicPruning.scala:49)
    at org.apache.spark.sql.catalyst.expressions.And.eval(predicates.scala:769)
    at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate.eval(predicates.scala:68)
    at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.$anonfun$prunePartitions$4(PartitioningAwareFileIndex.scala:214)
    at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.$anonfun$prunePartitions$4$adapted(PartitioningAwareFileIndex.scala:214)
    at scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:304)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:303)
    at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:297)
    at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
    at scala.collection.TraversableLike.filter(TraversableLike.scala:395)
    at scala.collection.TraversableLike.filter$(TraversableLike.scala:395)
    at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
    at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.prunePartitions(PartitioningAwareFileIndex.scala:214)
    at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.listFiles(PartitioningAwareFileIndex.scala:114)
    at com.nvidia.spark.rapids.AlluxioUtils$.replacePathIfNeeded(AlluxioUtils.scala:536)
    at com.nvidia.spark.rapids.shims.FileSourceScanExecMeta.convertToGpu(FileSourceScanExecMeta.scala:143)
    at com.nvidia.spark.rapids.shims.FileSourceScanExecMeta.convertToGpu(FileSourceScanExecMeta.scala:29)
    at com.nvidia.spark.rapids.SparkPlanMeta.convertIfNeeded(RapidsMeta.scala:761)
    at com.nvidia.spark.rapids.GpuOverrides$$anon$203.convertToGpu(GpuOverrides.scala:3810)
    at com.nvidia.spark.rapids.GpuOverrides$$anon$203.convertToGpu(GpuOverrides.scala:3808)
    at com.nvidia.spark.rapids.SparkPlanMeta.convertIfNeeded(RapidsMeta.scala:761)
    at org.apache.spark.sql.rapids.execution.GpuBroadcastHashJoinMeta.$anonfun$convertToGpu$3(GpuBroadcastHashJoinExec.scala:39)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.sql.rapids.execution.GpuBroadcastHashJoinMeta.convertToGpu(GpuBroadcastHashJoinExec.scala:39)
    at org.apache.spark.sql.rapids.execution.GpuBroadcastHashJoinMeta.convertToGpu(GpuBroadcastHashJoinExec.scala:26)
    at com.nvidia.spark.rapids.SparkPlanMeta.convertIfNeeded(RapidsMeta.scala:761)
    at com.nvidia.spark.rapids.GpuProjectExecMeta.convertToGpu(basicPhysicalOperators.scala:50)
    at com.nvidia.spark.rapids.GpuProjectExecMeta.convertToGpu(basicPhysicalOperators.scala:41)
    at com.nvidia.spark.rapids.SparkPlanMeta.convertIfNeeded(RapidsMeta.scala:761)
    at org.apache.spark.sql.rapids.execution.GpuBroadcastHashJoinMeta.$anonfun$convertToGpu$3(GpuBroadcastHashJoinExec.scala:39)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.sql.rapids.execution.GpuBroadcastHashJoinMeta.convertToGpu(GpuBroadcastHashJoinExec.scala:39)
    at org.apache.spark.sql.rapids.execution.GpuBroadcastHashJoinMeta.convertToGpu(GpuBroadcastHashJoinExec.scala:26)
    at com.nvidia.spark.rapids.SparkPlanMeta.convertIfNeeded(RapidsMeta.scala:761)
    at com.nvidia.spark.rapids.GpuProjectExecMeta.convertToGpu(basicPhysicalOperators.scala:50)
    at com.nvidia.spark.rapids.GpuProjectExecMeta.convertToGpu(basicPhysicalOperators.scala:41)
    at com.nvidia.spark.rapids.SparkPlanMeta.convertIfNeeded(RapidsMeta.scala:761)
    at org.apache.spark.sql.rapids.execution.GpuBroadcastHashJoinMeta.$anonfun$convertToGpu$3(GpuBroadcastHashJoinExec.scala:39)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.sql.rapids.execution.GpuBroadcastHashJoinMeta.convertToGpu(GpuBroadcastHashJoinExec.scala:39)
    at org.apache.spark.sql.rapids.execution.GpuBroadcastHashJoinMeta.convertToGpu(GpuBroadcastHashJoinExec.scala:26)
    at com.nvidia.spark.rapids.SparkPlanMeta.convertIfNeeded(RapidsMeta.scala:761)
    at com.nvidia.spark.rapids.GpuProjectExecMeta.convertToGpu(basicPhysicalOperators.scala:50)
    at com.nvidia.spark.rapids.GpuProjectExecMeta.convertToGpu(basicPhysicalOperators.scala:41)
    at com.nvidia.spark.rapids.SparkPlanMeta.convertIfNeeded(RapidsMeta.scala:761)
    at org.apache.spark.sql.rapids.execution.GpuBroadcastHashJoinMeta.$anonfun$convertToGpu$3(GpuBroadcastHashJoinExec.scala:39)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.sql.rapids.execution.GpuBroadcastHashJoinMeta.convertToGpu(GpuBroadcastHashJoinExec.scala:39)
    at org.apache.spark.sql.rapids.execution.GpuBroadcastHashJoinMeta.convertToGpu(GpuBroadcastHashJoinExec.scala:26)
    at com.nvidia.spark.rapids.SparkPlanMeta.convertIfNeeded(RapidsMeta.scala:761)
    at com.nvidia.spark.rapids.GpuProjectExecMeta.convertToGpu(basicPhysicalOperators.scala:50)
    at com.nvidia.spark.rapids.GpuProjectExecMeta.convertToGpu(basicPhysicalOperators.scala:41)
    at com.nvidia.spark.rapids.SparkPlanMeta.convertIfNeeded(RapidsMeta.scala:761)
    at com.nvidia.spark.rapids.GpuBaseAggregateMeta.convertToGpu(aggregate.scala:973)
    at com.nvidia.spark.rapids.GpuBaseAggregateMeta.convertToGpu(aggregate.scala:828)
    at com.nvidia.spark.rapids.SparkPlanMeta.convertIfNeeded(RapidsMeta.scala:761)
    at org.apache.spark.sql.rapids.execution.GpuShuffleMetaBase.convertToGpu(GpuShuffleExchangeExecBase.scala:117)
    at org.apache.spark.sql.rapids.execution.GpuShuffleMetaBase.convertToGpu(GpuShuffleExchangeExecBase.scala:43)
    at com.nvidia.spark.rapids.SparkPlanMeta.convertIfNeeded(RapidsMeta.scala:761)
    at com.nvidia.spark.rapids.GpuBaseAggregateMeta.convertToGpu(aggregate.scala:973)
    at com.nvidia.spark.rapids.GpuBaseAggregateMeta.convertToGpu(aggregate.scala:828)
    at com.nvidia.spark.rapids.SparkPlanMeta.convertIfNeeded(RapidsMeta.scala:761)
    at com.nvidia.spark.rapids.GpuOverrides$$anon$200.convertToGpu(GpuOverrides.scala:3767)
    at com.nvidia.spark.rapids.GpuOverrides$$anon$200.convertToGpu(GpuOverrides.scala:3741)
    at com.nvidia.spark.rapids.SparkPlanMeta.convertIfNeeded(RapidsMeta.scala:761)
    at com.nvidia.spark.rapids.GpuOverrides$.com$nvidia$spark$rapids$GpuOverrides$$doConvertPlan(GpuOverrides.scala:4073)
    at com.nvidia.spark.rapids.GpuOverrides.applyOverrides(GpuOverrides.scala:4412)
    at com.nvidia.spark.rapids.GpuOverrides.$anonfun$applyWithContext$3(GpuOverrides.scala:4278)
    at com.nvidia.spark.rapids.GpuOverrides$.logDuration(GpuOverrides.scala:452)
    at com.nvidia.spark.rapids.GpuOverrides.$anonfun$applyWithContext$1(GpuOverrides.scala:4275)
    at com.nvidia.spark.rapids.GpuOverrideUtil$.$anonfun$tryOverride$1(GpuOverrides.scala:4241)
    at com.nvidia.spark.rapids.GpuOverrides.applyWithContext(GpuOverrides.scala:4295)
    at com.nvidia.spark.rapids.GpuQueryStagePrepOverrides.$anonfun$apply$1(GpuOverrides.scala:4258)
    at com.nvidia.spark.rapids.GpuOverrideUtil$.$anonfun$tryOverride$1(GpuOverrides.scala:4241)
    at com.nvidia.spark.rapids.GpuQueryStagePrepOverrides.apply(GpuOverrides.scala:4261)
    at com.nvidia.spark.rapids.GpuQueryStagePrepOverrides.apply(GpuOverrides.scala:4254)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$.$anonfun$applyPhysicalRules$3(AdaptiveSparkPlanExec.scala:1089)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$.$anonfun$applyPhysicalRules$2(AdaptiveSparkPlanExec.scala:1089)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:91)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$.applyPhysicalRules(AdaptiveSparkPlanExec.scala:1088)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$initialPlan$1(AdaptiveSparkPlanExec.scala:293)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.<init>(AdaptiveSparkPlanExec.scala:292)
    at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.applyInternal(InsertAdaptiveSparkPlan.scala:83)
    at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.apply(InsertAdaptiveSparkPlan.scala:48)
    at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.applyInternal(InsertAdaptiveSparkPlan.scala:63)
    at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.apply(InsertAdaptiveSparkPlan.scala:48)
    at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.apply(InsertAdaptiveSparkPlan.scala:42)
    at org.apache.spark.sql.execution.QueryExecution$.$anonfun$prepareForExecution$2(QueryExecution.scala:596)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
    at org.apache.spark.sql.execution.QueryExecution$.$anonfun$prepareForExecution$1(QueryExecution.scala:596)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:91)
    at org.apache.spark.sql.execution.QueryExecution$.prepareForExecution(QueryExecution.scala:595)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$2(QueryExecution.scala:232)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:268)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:265)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:265)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:228)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:222)
    at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:298)
    at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:361)
    at org.apache.spark.sql.execution.QueryExecution.explainStringLocal(QueryExecution.scala:325)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:202)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:386)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:186)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:141)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:336)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:160)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:156)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:590)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:168)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:590)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:268)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:264)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:566)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:156)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:324)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:156)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:141)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:132)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:186)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:959)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:427)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:396)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:250)
    at sun.reflect.GeneratedMethodAccessor467.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:295)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:750)

Query7 is:

select [_LIMITB] i_item_id, 
        avg(ss_quantity) agg1,
        avg(ss_list_price) agg2,
        avg(ss_coupon_amt) agg3,
        avg(ss_sales_price) agg4 
 from store_sales, customer_demographics, date_dim, item, promotion
 where ss_sold_date_sk = d_date_sk and
       ss_item_sk = i_item_sk and
       ss_cdemo_sk = cd_demo_sk and
       ss_promo_sk = p_promo_sk and
       cd_gender = '[GEN]' and 
       cd_marital_status = '[MS]' and
       cd_education_status = '[ES]' and
       (p_channel_email = 'N' or p_channel_event = 'N') and
       d_year = [YEAR] 
 group by i_item_id
 order by i_item_id
 [_LIMITC];

If you can't reproduce it, reach out to me.

tgravescs commented 1 year ago

Does this require a specific configuration, like using CONVERT_TIME, or does it also happen with TASK_TIME?

NVnavkumar commented 1 year ago

I suspect that this logic (in AlluxioUtils.scala) is failing for some reason on Databricks 10.4. It should be filtering out the DynamicPruningExpression in this case. Since Databricks evaluates DynamicPruningExpressions differently from Apache Spark, there might be something missing in the Databricks case at this point:

          // With the base Spark FileIndex type we don't know how to modify it to
          // just replace the paths so we have to try to recompute.
          def isDynamicPruningFilter(e: Expression): Boolean =
            e.find(_.isInstanceOf[PlanExpression[_]]).isDefined

          val partitionDirs = relation.location.listFiles(
            partitionFilters.filterNot(isDynamicPruningFilter), dataFilters)
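
For illustration, here is a minimal, self-contained sketch of that predicate and of the failure mode the stack trace shows. The names mirror the snippet above; the commented usage lines are illustrative, not the plugin's actual code path.

    import org.apache.spark.sql.catalyst.expressions.{Expression, PlanExpression}

    // Same predicate as in the snippet above: any expression tree that contains a
    // PlanExpression (which includes DynamicPruningSubquery) is treated as a
    // dynamic pruning filter and must not be evaluated locally.
    def isDynamicPruningFilter(e: Expression): Boolean =
      e.find(_.isInstanceOf[PlanExpression[_]]).isDefined

    // Illustrative usage: only statically evaluable filters should reach listFiles.
    // If a DynamicPruningSubquery slips through, PartitioningAwareFileIndex.prunePartitions
    // ends up calling eval() on an Unevaluable expression, which is exactly the
    // SparkUnsupportedOperationException in the stack trace above.
    // val staticFilters = partitionFilters.filterNot(isDynamicPruningFilter)
    // relation.location.listFiles(staticFilters, dataFilters)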

NVnavkumar commented 1 year ago

Figured out how to reproduce this:

First set spark.rapids.alluxio.replacement.algo to CONVERT_TIME. Then either tune spark.rapids.alluxio.large.file.threshold or simply set spark.rapids.alluxio.slow.disk to false. Basically, the scan that includes the DynamicPruningExpression must not read directly from S3.
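
For reference, a minimal reproduction sketch using the property names from this thread. The SparkSession builder usage is illustrative; on Databricks these would normally go into the cluster's Spark config (see the config dump later in this thread).

    import org.apache.spark.sql.SparkSession

    // Sketch only: force Alluxio path replacement at plan-conversion time and keep the
    // scan from falling back to reading directly from S3, which reproduces the failure.
    val spark = SparkSession.builder()
      .appName("nds-alluxio-dpp-repro")
      .config("spark.rapids.alluxio.replacement.algo", "CONVERT_TIME")
      .config("spark.rapids.alluxio.slow.disk", "false")
      // Alternatively, tune spark.rapids.alluxio.large.file.threshold instead of slow.disk.
      .getOrCreate()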

res-life commented 1 year ago

It seems the CONVERT_TIME algorithm combined with the large-file feature was never tested before this issue was filed.

tgravescs commented 1 year ago

@res-life can you confirm you needed to use CONVERT_TIME? One of our customers reported failures (though we don't have logs), and they were not using CONVERT_TIME.

res-life commented 1 year ago

Just tested: the CONVERT_TIME algorithm runs into the error, while the TASK_TIME algorithm (the default) is OK.
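
As a workaround sketch based on that result (not an official fix), keeping the default TASK_TIME algorithm avoids the error; for example, it can be pinned explicitly on an existing session:

    // TASK_TIME is the default per this thread and is the configuration reported as working.
    spark.conf.set("spark.rapids.alluxio.replacement.algo", "TASK_TIME")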

Details:

Spark config:

spark.rapids.alluxio.slow.disk false
spark.task.resource.gpu.amount 0.125
spark.shuffle.manager com.nvidia.spark.rapids.spark321db.RapidsShuffleManager
spark.hadoop.fs.s3a.access.key {{secrets/chongg-s3/access_key}}
spark.plugins com.nvidia.spark.SQLPlugin
spark.locality.wait 3s
spark.rapids.alluxio.automount.enabled true
spark.rapids.memory.pinnedPool.size 4G
spark.hadoop.fs.s3a.path.style.access true
spark.hadoop.fs.s3a.secret.key {{secrets/chongg-s3/secret_access_key}}
spark.sql.files.maxPartitionBytes 1G
spark.rapids.sql.multiThreadedRead.numThreads 100
spark.rapids.sql.concurrentGpuTasks 2
spark.rapids.alluxio.home /opt/alluxio-2.9.0

Environment variables

ENABLE_ALLUXIO=1