apache / datafusion-comet

Apache DataFusion Comet Spark Accelerator
https://datafusion.apache.org/comet
Apache License 2.0

TPCDS query 91 throws java.io.IOException: Could not read object from config with key parquet.private.read.filter.predicate exception #182

Closed sagarlakshmipathy closed 1 month ago

sagarlakshmipathy commented 3 months ago

Describe the bug

I am running TPC-DS on Comet + Spark. In particular, query 91 throws this error:

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.comet.CometNativeException: General execution error with reason java.lang.RuntimeException: java.io.IOException: Could not read object from config with key parquet.private.read.filter.predicate
Caused by: java.io.IOException: Could not read object from config with key parquet.private.read.filter.predicate
Caused by: java.lang.ClassNotFoundException: org.apache.comet.parquet.ParquetFilters$$anon$1.
  at org.apache.comet.Native.executePlan(Native Method)
  at org.apache.comet.CometExecIterator.executeNative(CometExecIterator.scala:71)
  at org.apache.comet.CometExecIterator.getNextBatch(CometExecIterator.scala:123)
  at org.apache.comet.CometExecIterator.hasNext(CometExecIterator.scala:138)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
  at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1(ObjectHashAggregateExec.scala:92)
  at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1$adapted(ObjectHashAggregateExec.scala:90)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:875)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:875)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
  at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
  at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
  at org.apache.spark.scheduler.Task.run(Task.scala:139)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:750)

I see that you have run the TPC-DS benchmarking tests, but I couldn't find the TPC-DS queries in the resources dir in the repo. Can you point me to the query that worked?

Steps to reproduce

Start Spark Shell

./spark-3.4.1-bin-hadoop3/bin/spark-shell     \
    --master yarn \
    --deploy-mode client \
    --driver-memory 19g     \
    --executor-memory 19g       \
    --executor-cores 5     \
    --num-executors 29     \
    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer     \
    --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension,org.apache.comet.CometSparkSessionExtensions     \
    --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \
    --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \
    --conf spark.sql.catalogImplementation=in-memory     \
    --conf spark.ui.proxyBase="" \
    --conf 'spark.eventLog.enabled=true' --conf 'spark.eventLog.dir=hdfs:///var/log/spark/apps' \
    --jars /home/hadoop/hudi-benchmarks-0.1-SNAPSHOT.jar,/home/hadoop/hudi-spark3.4-bundle_2.12-0.14.1.jar,/home/hadoop/comet-spark-spark3.4_2.12-0.1.0-SNAPSHOT.jar \
    --packages org.apache.hadoop:hadoop-aws:3.2.4 \
    --conf spark.comet.enabled=true \
    --conf spark.comet.exec.enabled=true \
    --conf spark.comet.exec.all.enabled=true

Run TPCDS Query 91

--TPC-DS Q91
select  
        cc_call_center_id Call_Center,
        cc_name Call_Center_Name,
        cc_manager Manager,
        sum(cr_net_loss) Returns_Loss
from
        call_center,
        catalog_returns,
        date_dim,
        customer,
        customer_address,
        customer_demographics,
        household_demographics
where
        cr_call_center_sk       = cc_call_center_sk
and     cr_returned_date_sk     = d_date_sk
and     cr_returning_customer_sk= c_customer_sk
and     cd_demo_sk              = c_current_cdemo_sk
and     hd_demo_sk              = c_current_hdemo_sk
and     ca_address_sk           = c_current_addr_sk
and     d_year                  = 2002 
and     d_moy                   = 11
and     ( (cd_marital_status       = 'M' and cd_education_status     = 'Unknown')
        or(cd_marital_status       = 'W' and cd_education_status     = 'Advanced Degree'))
and     hd_buy_potential like 'Unknown%'
and     ca_gmt_offset           = -6
group by cc_call_center_id,cc_name,cc_manager,cd_marital_status,cd_education_status
order by sum(cr_net_loss) desc;

Expected behavior

Query to pass successfully

Additional context

No response

viirya commented 3 months ago

I see that you have run the TPC-DS benchmarking tests, but I couldn't find the TPC-DS queries in the resources dir in the repo. Can you point me to the query that worked?

We reuse the TPC-DS queries from the Spark resources; that's why you don't see TPC-DS queries in the Comet resources dir.

sagarlakshmipathy commented 3 months ago

I checked query 91 in the Spark repo and it doesn't look vastly different, except for some filter values in the WHERE clause.

https://github.com/apache/spark/blob/76b1c122cb7d77e8f175b25b935b9296a669d5d8/sql/core/src/test/resources/tpcds/q91.sql

SELECT
  cc_call_center_id Call_Center,
  cc_name Call_Center_Name,
  cc_manager Manager,
  sum(cr_net_loss) Returns_Loss
FROM
  call_center, catalog_returns, date_dim, customer, customer_address,
  customer_demographics, household_demographics
WHERE
  cr_call_center_sk = cc_call_center_sk
    AND cr_returned_date_sk = d_date_sk
    AND cr_returning_customer_sk = c_customer_sk
    AND cd_demo_sk = c_current_cdemo_sk
    AND hd_demo_sk = c_current_hdemo_sk
    AND ca_address_sk = c_current_addr_sk
    AND d_year = 1998
    AND d_moy = 11
    AND ((cd_marital_status = 'M' AND cd_education_status = 'Unknown')
    OR (cd_marital_status = 'W' AND cd_education_status = 'Advanced Degree'))
    AND hd_buy_potential LIKE 'Unknown%'
    AND ca_gmt_offset = -7
GROUP BY cc_call_center_id, cc_name, cc_manager, cd_marital_status, cd_education_status
ORDER BY sum(cr_net_loss) DESC

I wonder why Comet doesn't like my query, though.

sagarlakshmipathy commented 3 months ago

I ran the query from Spark's repo as well; it still failed with the same error.

FWIW here's the entire log.

====================================================================================================
RUNNING: Query # 91 (round 1) (1 statements)
----------------------------------------------------------------------------------------------------
24/03/10 01:07:26 WARN QueryPlanSerde: Comet native execution is disabled due to: unsupported Spark expression: 'might_contain(Subquery subquery#212, [id=#315], xxhash64(c_current_addr_sk#141, 42))' of class 'org.apache.spark.sql.catalyst.expressions.BloomFilterMightContain
24/03/10 01:07:26 WARN QueryPlanSerde: Comet native execution is disabled due to: unsupported Spark expression: 'might_contain(Subquery subquery#215, [id=#352], xxhash64(c_current_cdemo_sk#139, 42))' of class 'org.apache.spark.sql.catalyst.expressions.BloomFilterMightContain
24/03/10 01:07:26 WARN QueryPlanSerde: Comet native execution is disabled due to: unsupported Spark expression: 'might_contain(Subquery subquery#218, [id=#389], xxhash64(c_current_hdemo_sk#140, 42))' of class 'org.apache.spark.sql.catalyst.expressions.BloomFilterMightContain
24/03/10 01:07:26 WARN QueryPlanSerde: Comet native execution is disabled due to: unsupported Spark expression: 'might_contain(Subquery subquery#212, [id=#315], xxhash64(c_current_addr_sk#141, 42))' of class 'org.apache.spark.sql.catalyst.expressions.BloomFilterMightContain
24/03/10 01:07:26 WARN QueryPlanSerde: Comet native execution is disabled due to: unsupported Spark expression: 'might_contain(Subquery subquery#215, [id=#352], xxhash64(c_current_cdemo_sk#139, 42))' of class 'org.apache.spark.sql.catalyst.expressions.BloomFilterMightContain
24/03/10 01:07:26 WARN QueryPlanSerde: Comet native execution is disabled due to: unsupported Spark expression: 'might_contain(Subquery subquery#218, [id=#389], xxhash64(c_current_hdemo_sk#140, 42))' of class 'org.apache.spark.sql.catalyst.expressions.BloomFilterMightContain
24/03/10 01:07:28 WARN TaskSetManager: Lost task 0.0 in stage 108.0 (TID 2899) (ip-10-0-66-142.us-west-2.compute.internal executor 14): org.apache.comet.CometNativeException: General execution error with reason java.lang.RuntimeException: java.io.IOException: Could not read object from config with key parquet.private.read.filter.predicate
Caused by: java.io.IOException: Could not read object from config with key parquet.private.read.filter.predicate
Caused by: java.lang.ClassNotFoundException: org.apache.comet.parquet.ParquetFilters$$anon$1.
    at org.apache.comet.Native.executePlan(Native Method)
    at org.apache.comet.CometExecIterator.executeNative(CometExecIterator.scala:71)
    at org.apache.comet.CometExecIterator.getNextBatch(CometExecIterator.scala:123)
    at org.apache.comet.CometExecIterator.hasNext(CometExecIterator.scala:138)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.columnartorow_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

24/03/10 01:07:28 WARN TaskSetManager: Lost task 0.0 in stage 104.0 (TID 2865) (ip-10-0-100-128.us-west-2.compute.internal executor 8): org.apache.comet.CometNativeException: General execution error with reason java.lang.RuntimeException: java.io.IOException: Could not read object from config with key parquet.private.read.filter.predicate
Caused by: java.io.IOException: Could not read object from config with key parquet.private.read.filter.predicate
Caused by: java.lang.ClassNotFoundException: org.apache.comet.parquet.ParquetFilters$$anon$1.
    at org.apache.comet.Native.executePlan(Native Method)
    at org.apache.comet.CometExecIterator.executeNative(CometExecIterator.scala:71)
    at org.apache.comet.CometExecIterator.getNextBatch(CometExecIterator.scala:123)
    at org.apache.comet.CometExecIterator.hasNext(CometExecIterator.scala:138)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
    at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1(ObjectHashAggregateExec.scala:92)
    at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1$adapted(ObjectHashAggregateExec.scala:90)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:875)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:875)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

24/03/10 01:07:29 ERROR TaskSetManager: Task 0 in stage 104.0 failed 4 times; aborting job
24/03/10 01:07:29 ERROR TaskSetManager: Task 0 in stage 108.0 failed 4 times; aborting job

Failed to process:
----------------------------------------------------------------------------------------------------
--TPC-DS Q91
SELECT
  cc_call_center_id Call_Center,
  cc_name Call_Center_Name,
  cc_manager Manager,
  sum(cr_net_loss) Returns_Loss
FROM
  call_center, catalog_returns, date_dim, customer, customer_address,
  customer_demographics, household_demographics
WHERE
  cr_call_center_sk = cc_call_center_sk
    AND cr_returned_date_sk = d_date_sk
    AND cr_returning_customer_sk = c_customer_sk
    AND cd_demo_sk = c_current_cdemo_sk
    AND hd_demo_sk = c_current_hdemo_sk
    AND ca_address_sk = c_current_addr_sk
    AND d_year = 1998
    AND d_moy = 11
    AND ((cd_marital_status = 'M' AND cd_education_status = 'Unknown')
    OR (cd_marital_status = 'W' AND cd_education_status = 'Advanced Degree'))
    AND hd_buy_potential LIKE 'Unknown%'
    AND ca_gmt_offset = -7
GROUP BY cc_call_center_id, cc_name, cc_manager, cd_marital_status, cd_education_status
ORDER BY sum(cr_net_loss) DESC
----------------------------------------------------------------------------------------------------
Exception thrown in awaitResult: 
org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:339)
org.apache.spark.sql.execution.SubqueryExec.executeCollect(basicPhysicalOperators.scala:861)
org.apache.spark.sql.execution.ScalarSubquery.updateResult(subquery.scala:82)
org.apache.spark.sql.execution.SparkPlan.$anonfun$waitForSubqueries$1(SparkPlan.scala:276)
org.apache.spark.sql.execution.SparkPlan.$anonfun$waitForSubqueries$1$adapted(SparkPlan.scala:275)
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
org.apache.spark.sql.execution.SparkPlan.waitForSubqueries(SparkPlan.scala:275)
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:245)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:92)
org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:92)
org.apache.spark.sql.execution.FilterExec.produce(basicPhysicalOperators.scala:216)
org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:660)
org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:723)
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:195)
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:191)
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD$lzycompute(ShuffleExchangeExec.scala:135)
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD(ShuffleExchangeExec.scala:135)
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture$lzycompute(ShuffleExchangeExec.scala:140)
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture(ShuffleExchangeExec.scala:139)
org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.$anonfun$submitShuffleJob$1(ShuffleExchangeExec.scala:68)
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob(ShuffleExchangeExec.scala:68)
org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob$(ShuffleExchangeExec.scala:67)
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.submitShuffleJob(ShuffleExchangeExec.scala:115)
org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture$lzycompute(QueryStageExec.scala:181)
org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture(QueryStageExec.scala:181)
org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.doMaterialize(QueryStageExec.scala:183)
org.apache.spark.sql.execution.adaptive.QueryStageExec.materialize(QueryStageExec.scala:82)
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5(AdaptiveSparkPlanExec.scala:266)
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5$adapted(AdaptiveSparkPlanExec.scala:264)
scala.collection.Iterator.foreach(Iterator.scala:943)
scala.collection.Iterator.foreach$(Iterator.scala:943)
scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
scala.collection.IterableLike.foreach(IterableLike.scala:74)
scala.collection.IterableLike.foreach$(IterableLike.scala:73)
scala.collection.AbstractIterable.foreach(Iterable.scala:56)
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:264)
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:236)
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:381)
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:354)
org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4177)
org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:3418)
org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4167)
org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4165)
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
org.apache.spark.sql.Dataset.withAction(Dataset.scala:4165)
org.apache.spark.sql.Dataset.collect(Dataset.scala:3418)
org.apache.hudi.benchmarks.TPCDS.$anonfun$runSqlInternal$6(TPCDS.scala:318)
org.apache.hudi.benchmarks.TPCDS.$anonfun$runSqlInternal$6$adapted(TPCDS.scala:317)
scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
org.apache.hudi.benchmarks.TPCDS.$anonfun$runSqlInternal$5(TPCDS.scala:317)
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
org.apache.spark.sql.SparkSession.time(SparkSession.scala:731)
org.apache.hudi.benchmarks.TPCDS.runSqlInternal(TPCDS.scala:317)
org.apache.hudi.benchmarks.TPCDS.$anonfun$runQueryNums$2(TPCDS.scala:286)
scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
scala.collection.immutable.List.foreach(List.scala:431)
org.apache.hudi.benchmarks.TPCDS.$anonfun$runQueryNums$1(TPCDS.scala:284)
scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
org.apache.hudi.benchmarks.TPCDS.runQueryNums(TPCDS.scala:283)
$line22.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:89)
$line22.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:95)
$line22.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:97)
$line22.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:99)
$line22.$read$$iw$$iw$$iw$$iw.<init>(<console>:101)
$line22.$read$$iw$$iw$$iw.<init>(<console>:103)
$line22.$read$$iw$$iw.<init>(<console>:105)
$line22.$read$$iw.<init>(<console>:107)
$line22.$read.<init>(<console>:109)
$line22.$read$.<init>(<console>:113)
$line22.$read$.<clinit>(<console>)
$line22.$eval$.$print$lzycompute(<console>:7)
$line22.$eval$.$print(<console>:6)
$line22.$eval.$print(<console>)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:747)
scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1020)
scala.tools.nsc.interpreter.IMain.$anonfun$interpret$1(IMain.scala:568)
scala.reflect.internal.util.ScalaClassLoader.asContext(ScalaClassLoader.scala:36)
scala.reflect.internal.util.ScalaClassLoader.asContext$(ScalaClassLoader.scala:116)
scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:41)
scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:567)
scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:594)
scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:564)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:865)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:883)
scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:733)
scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:435)
scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:456)
org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:239)
org.apache.spark.repl.Main$.doMain(Main.scala:78)
org.apache.spark.repl.Main$.main(Main.scala:58)
org.apache.spark.repl.Main.main(Main.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1020)
org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:192)
org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:215)
org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1111)
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1120)
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
"

org.apache.spark.SparkException: Exception thrown in awaitResult:
  at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:339)
  at org.apache.spark.sql.execution.SubqueryExec.executeCollect(basicPhysicalOperators.scala:861)
  at org.apache.spark.sql.execution.ScalarSubquery.updateResult(subquery.scala:82)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$waitForSubqueries$1(SparkPlan.scala:276)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$waitForSubqueries$1$adapted(SparkPlan.scala:275)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.sql.execution.SparkPlan.waitForSubqueries(SparkPlan.scala:275)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:245)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
  at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:92)
  at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:92)
  at org.apache.spark.sql.execution.FilterExec.produce(basicPhysicalOperators.scala:216)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:660)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:723)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:195)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:191)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD$lzycompute(ShuffleExchangeExec.scala:135)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD(ShuffleExchangeExec.scala:135)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture$lzycompute(ShuffleExchangeExec.scala:140)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture(ShuffleExchangeExec.scala:139)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.$anonfun$submitShuffleJob$1(ShuffleExchangeExec.scala:68)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob(ShuffleExchangeExec.scala:68)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob$(ShuffleExchangeExec.scala:67)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.submitShuffleJob(ShuffleExchangeExec.scala:115)
  at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture$lzycompute(QueryStageExec.scala:181)
  at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture(QueryStageExec.scala:181)
  at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.doMaterialize(QueryStageExec.scala:183)
  at org.apache.spark.sql.execution.adaptive.QueryStageExec.materialize(QueryStageExec.scala:82)
  at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5(AdaptiveSparkPlanExec.scala:266)
  at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5$adapted(AdaptiveSparkPlanExec.scala:264)
  at scala.collection.Iterator.foreach(Iterator.scala:943)
  at scala.collection.Iterator.foreach$(Iterator.scala:943)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
  at scala.collection.IterableLike.foreach(IterableLike.scala:74)
  at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
  at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:264)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
  at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:236)
  at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:381)
  at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:354)
  at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4177)
  at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:3418)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4167)
  at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4165)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4165)
  at org.apache.spark.sql.Dataset.collect(Dataset.scala:3418)
  at org.apache.hudi.benchmarks.TPCDS.$anonfun$runSqlInternal$6(TPCDS.scala:318)
  at org.apache.hudi.benchmarks.TPCDS.$anonfun$runSqlInternal$6$adapted(TPCDS.scala:317)
  at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
  at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
  at org.apache.hudi.benchmarks.TPCDS.$anonfun$runSqlInternal$5(TPCDS.scala:317)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.apache.spark.sql.SparkSession.time(SparkSession.scala:731)
  at org.apache.hudi.benchmarks.TPCDS.runSqlInternal(TPCDS.scala:317)
  at org.apache.hudi.benchmarks.TPCDS.$anonfun$runQueryNums$2(TPCDS.scala:286)
  at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
  at scala.collection.immutable.List.foreach(List.scala:431)
  at org.apache.hudi.benchmarks.TPCDS.$anonfun$runQueryNums$1(TPCDS.scala:284)
  at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
  at org.apache.hudi.benchmarks.TPCDS.runQueryNums(TPCDS.scala:283)
  ... 109 elided
Caused by: java.util.concurrent.ExecutionException: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 104.0 failed 4 times, most recent failure: Lost task 0.3 in stage 104.0 (TID 2907) (ip-10-0-108-73.us-west-2.compute.internal executor 17): org.apache.comet.CometNativeException: General execution error with reason java.lang.RuntimeException: java.io.IOException: Could not read object from config with key parquet.private.read.filter.predicate
Caused by: java.io.IOException: Could not read object from config with key parquet.private.read.filter.predicate
Caused by: java.lang.ClassNotFoundException: org.apache.comet.parquet.ParquetFilters$$anon$1.
    at org.apache.comet.Native.executePlan(Native Method)
    at org.apache.comet.CometExecIterator.executeNative(CometExecIterator.scala:71)
    at org.apache.comet.CometExecIterator.getNextBatch(CometExecIterator.scala:123)
    at org.apache.comet.CometExecIterator.hasNext(CometExecIterator.scala:138)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
    at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1(ObjectHashAggregateExec.scala:92)
    at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1$adapted(ObjectHashAggregateExec.scala:90)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:875)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:875)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
  at java.util.concurrent.FutureTask.report(FutureTask.java:122)
  at java.util.concurrent.FutureTask.get(FutureTask.java:192)
  at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:331)
  ... 185 more
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 104.0 failed 4 times, most recent failure: Lost task 0.3 in stage 104.0 (TID 2907) (ip-10-0-108-73.us-west-2.compute.internal executor 17): org.apache.comet.CometNativeException: General execution error with reason java.lang.RuntimeException: java.io.IOException: Could not read object from config with key parquet.private.read.filter.predicate
Caused by: java.io.IOException: Could not read object from config with key parquet.private.read.filter.predicate
Caused by: java.lang.ClassNotFoundException: org.apache.comet.parquet.ParquetFilters$$anon$1.
    at org.apache.comet.Native.executePlan(Native Method)
    at org.apache.comet.CometExecIterator.executeNative(CometExecIterator.scala:71)
    at org.apache.comet.CometExecIterator.getNextBatch(CometExecIterator.scala:123)
    at org.apache.comet.CometExecIterator.hasNext(CometExecIterator.scala:138)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
    at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1(ObjectHashAggregateExec.scala:92)
    at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1$adapted(ObjectHashAggregateExec.scala:90)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:875)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:875)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.comet.CometNativeException: General execution error with reason java.lang.RuntimeException: java.io.IOException: Could not read object from config with key parquet.private.read.filter.predicate
Caused by: java.io.IOException: Could not read object from config with key parquet.private.read.filter.predicate
Caused by: java.lang.ClassNotFoundException: org.apache.comet.parquet.ParquetFilters$$anon$1.
  at org.apache.comet.Native.executePlan(Native Method)
  at org.apache.comet.CometExecIterator.executeNative(CometExecIterator.scala:71)
  at org.apache.comet.CometExecIterator.getNextBatch(CometExecIterator.scala:123)
  at org.apache.comet.CometExecIterator.hasNext(CometExecIterator.scala:138)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
  at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1(ObjectHashAggregateExec.scala:92)
  at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1$adapted(ObjectHashAggregateExec.scala:90)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:875)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:875)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
  at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
  at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
  at org.apache.spark.scheduler.Task.run(Task.scala:139)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:750)

sunchao commented 3 months ago

@sagarlakshmipathy This is interesting. How did you run this? If ClassNotFoundException: org.apache.comet.parquet.ParquetFilters$$anon$1 is the real issue here, then you should be able to see other TPC-DS queries fail for the same reason too.

Have you checked $SPARK_HOME/jars to see if org.apache.comet.parquet.ParquetFilters$$anon$1 is on the classpath? We do run TPC-DS queries through CometTPCDSQuerySuite and I don't remember seeing this.
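
One quick way to check (just a sketch; the Comet jar path below is taken from the --jars value in the report and may differ in your setup) is to list the jar contents and scan $SPARK_HOME/jars for the class:

# confirm the class is packaged inside the Comet jar passed via --jars
jar tf /home/hadoop/comet-spark-spark3.4_2.12-0.1.0-SNAPSHOT.jar | grep 'parquet/ParquetFilters'

# check whether any jar under $SPARK_HOME/jars also ships a (possibly conflicting) copy of the class
for j in $SPARK_HOME/jars/*.jar; do
    jar tf "$j" | grep -q 'org/apache/comet/parquet/ParquetFilters' && echo "$j"
done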

advancedxy commented 3 months ago

Could you try running Comet without the Hudi bundled jars, just with vanilla Spark 3.4?

There might be incompatible Parquet classes (for example, Parquet from different versions), which causes org.apache.comet.parquet.ParquetFilters$$anon$1 to fail to load in the JVM.
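
As a sketch of what that stripped-down run could look like (paths reused from the original report; the Hudi extension, catalog, and bundle jars are simply dropped):

./spark-3.4.1-bin-hadoop3/bin/spark-shell \
    --master yarn \
    --deploy-mode client \
    --jars /home/hadoop/comet-spark-spark3.4_2.12-0.1.0-SNAPSHOT.jar \
    --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
    --conf spark.comet.enabled=true \
    --conf spark.comet.exec.enabled=true \
    --conf spark.comet.exec.all.enabled=true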

andygrove commented 1 month ago

I just ran into the same issue with TPC-H q2 on my MBP. I will debug and add some notes here once I know more.

andygrove commented 1 month ago

My repro:

Using the latest commit from main (1a04805be5e0f3a634521a821b24c0e0efb43d31), I ran make release.

Started Spark shell with:

$SPARK_HOME/bin/spark-shell \
    --jars spark/target/comet-spark-spark3.4_2.12-0.1.0-SNAPSHOT.jar \
    --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
    --conf spark.comet.enabled=true \
    --conf spark.comet.exec.enabled=true \
    --conf spark.comet.exec.all.enabled=true \
    --conf spark.comet.explainFallback.enabled=true

Ran this code:

val tables = Seq("customer", "lineitem", "nation", "orders", "part", "partsupp", "region", "supplier")
tables.foreach(t => spark.read.parquet(s"/Users/andy/Data/sf100-parquet/${t}.parquet").createOrReplaceTempView(t))
val sql = scala.io.Source.fromFile("/Users/andy/git/datafusion-contrib/sqlbench-h/queries/sf=100/q2.sql").mkString
spark.time(spark.sql(sql).collect)

The Parquet files were produced by DataFusion. I have been benchmarking with these same files and Comet on my Linux desktop with no issues.

andygrove commented 1 month ago

I do see the same issue on Linux. I am using JDK 11 in both cases.

andygrove commented 1 month ago

Error:

org.apache.comet.CometNativeException: General execution error with reason java.lang.RuntimeException: java.io.IOException: Could not read object from config with key parquet.private.read.filter.predicate
Caused by: java.io.IOException: Could not read object from config with key parquet.private.read.filter.predicate
Caused by: java.lang.ClassNotFoundException: org.apache.comet.parquet.ParquetFilters$$anon$2.
    at org.apache.comet.Native.executePlan(Native Method)

Signature of class org.apache.comet.parquet.ParquetFilters$$anon$2:

package org.apache.comet.parquet;

class ParquetFilters$$anon$2 {
  org.apache.spark.unsafe.types.UTF8String suffixStr;

  org.apache.spark.unsafe.types.UTF8String suffixStr();

  boolean canDrop(org.apache.parquet.filter2.predicate.Statistics);

  boolean inverseCanDrop(org.apache.parquet.filter2.predicate.Statistics);

  boolean keep(org.apache.parquet.io.api.Binary);

  boolean keep(java.lang.Comparable);

  void <init>(org.apache.comet.parquet.ParquetFilters, java.lang.String);

}
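
(A signature like the one above can be reproduced with something along these lines; the jar path matches the --jars value used earlier, and the exact javap output format may differ slightly.)

# dump the members of the anonymous filter class directly from the Comet jar
javap -p -classpath spark/target/comet-spark-spark3.4_2.12-0.1.0-SNAPSHOT.jar \
    'org.apache.comet.parquet.ParquetFilters$$anon$2'
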
viirya commented 1 month ago

I don't use spark-shell to test it. Maybe that is why I cannot reproduce the issue.

andygrove commented 1 month ago

The issue is that we need to specify the extra classpath. This resolved it for me:

export COMET_JAR=spark/target/comet-spark-spark3.4_2.12-0.1.0-SNAPSHOT.jar
$SPARK_HOME/bin/spark-shell \
    --jars $COMET_JAR \
    --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
    --conf spark.comet.enabled=true \
    --conf spark.comet.exec.enabled=true \
    --conf spark.comet.exec.all.enabled=true \
    --conf spark.comet.explainFallback.enabled=true \
    --conf spark.driver.extraClassPath=$COMET_JAR \
    --conf spark.executor.extraClassPath=$COMET_JAR

The difference was adding these two lines:

    --conf spark.driver.extraClassPath=$COMET_JAR \
    --conf spark.executor.extraClassPath=$COMET_JAR

I will create a PR to update the user guide, which is where I copied the spark-shell command from.

andygrove commented 1 month ago

@sagarlakshmipathy fyi

viirya commented 1 month ago

Based on https://spark.apache.org/docs/latest/submitting-applications.html

When using spark-submit, the application jar along with any jars included with the --jars option will be automatically transferred to the cluster. URLs supplied after --jars must be separated by commas. That list is included in the driver and executor classpaths. Directory expansion does not work with --jars.

It seems the jars supplied with --jars are included in the driver and executor classpaths. Maybe that is why spark-submit is okay and only spark-shell has the issue.
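
For completeness, an alternative that should put the Comet jar on the system classpath of both the driver and the executors (a sketch, not verified in this thread; it assumes every node shares the same $SPARK_HOME layout) is to drop the jar into Spark's jars directory instead of relying on --jars:

# copy the Comet jar into Spark's jars directory so it is always on the JVM classpath
cp spark/target/comet-spark-spark3.4_2.12-0.1.0-SNAPSHOT.jar $SPARK_HOME/jars/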