apache / incubator-gluten

Gluten is a middle layer responsible for offloading JVM-based SQL engines' execution to native engines.
https://gluten.apache.org/
Apache License 2.0

[VL] Offload struct type for parquet cause VeloxUserError #7178

Closed zml1206 closed 1 month ago

zml1206 commented 2 months ago

Backend

VL (Velox)

Bug description

Found when reading a Delta Lake table with a checkpoint, on version 3.0.0rc1. Simplified reproduction logic (the two optimizer rules are excluded so the filter stays above the union instead of being pushed below it into each scan):

    withSQLConf(("spark.sql.optimizer.excludedRules", "org.apache.spark.sql.catalyst.optimizer.PushDownPredicates,org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughNonJoin")) {
      val df = spark.range(10).toDF("aA")
        .withColumn("a", struct("aA"))
      df.write.mode("overwrite").parquet("tmp/t1")
      spark.read.parquet("tmp/t1")
        .union(spark.read.parquet("tmp/t1"))
        .filter("a.aA > 3")
        .collect()
    }
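
For completeness, a sketch of the same reproduction outside a Spark test suite; withSQLConf is a test-only helper, so this version sets the excluded rules through spark.conf.set (the session setup here is an assumption, not part of the original report):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.struct

    val spark = SparkSession.builder().getOrCreate()
    // Keep the filter above the union by excluding predicate pushdown.
    spark.conf.set(
      "spark.sql.optimizer.excludedRules",
      "org.apache.spark.sql.catalyst.optimizer.PushDownPredicates," +
        "org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughNonJoin")

    // Write a struct column whose child field name is mixed-case ("aA").
    spark.range(10).toDF("aA").withColumn("a", struct("aA"))
      .write.mode("overwrite").parquet("tmp/t1")

    // Filtering on a.aA above a union of two scans raises the VeloxUserError.
    spark.read.parquet("tmp/t1")
      .union(spark.read.parquet("tmp/t1"))
      .filter("a.aA > 3")
      .collect()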

Spark version

None

Spark configurations

No response

System information

No response

Relevant logs

Job aborted due to stage failure: Task 1 in stage 3.0 failed 1 times, most recent failure: Lost task 1.0 in stage 3.0 (TID 4) (192.168.130.126 executor driver): org.apache.gluten.exception.GlutenException: org.apache.gluten.exception.GlutenException: Exception: VeloxUserError
Error Source: USER
Error Code: INVALID_ARGUMENT
Reason: Field not found: aA. Available fields are: aa.
Retriable: False
Context: (n0_1).aA
Additional Context: Top-level Expression: and(isnotnull((n0_1).aA), greaterthan((n0_1).aA, 3:BIGINT))
Function: getChildIdx
File: /Users/zml/Desktop/git_hub/incubator-gluten/ep/build-velox/build/velox_ep/velox/type/Type.cpp
Line: 424
Stack trace:
# 0 ... # 31 (32 empty frames in the captured log)

    at org.apache.gluten.vectorized.GeneralOutIterator.hasNext(GeneralOutIterator.java:39)
    at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:45)
    at org.apache.gluten.iterator.IteratorsV1$InvocationFlowProtection.hasNext(IteratorsV1.scala:159)
    at org.apache.gluten.iterator.IteratorsV1$IteratorCompleter.hasNext(IteratorsV1.scala:71)
    at org.apache.gluten.iterator.IteratorsV1$PayloadCloser.hasNext(IteratorsV1.scala:37)
    at org.apache.gluten.iterator.IteratorsV1$LifeTimeAccumulator.hasNext(IteratorsV1.scala:100)
    at scala.collection.Iterator.isEmpty(Iterator.scala:387)
    at scala.collection.Iterator.isEmpty$(Iterator.scala:387)
    at org.apache.gluten.iterator.IteratorsV1$LifeTimeAccumulator.isEmpty(IteratorsV1.scala:90)
    at org.apache.gluten.execution.VeloxColumnarToRowExec$.toRowIterator(VeloxColumnarToRowExec.scala:124)
    at org.apache.gluten.execution.VeloxColumnarToRowExec.$anonfun$doExecuteInternal$1(VeloxColumnarToRowExec.scala:79)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:853)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:853)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.gluten.exception.GlutenException: Exception: VeloxUserError
Error Source: USER
Error Code: INVALID_ARGUMENT
Reason: Field not found: aA. Available fields are: aa.
Retriable: False
Context: (n0_1).aA
Additional Context: Top-level Expression: and(isnotnull((n0_1).aA), greaterthan((n0_1).aA, 3:BIGINT))
Function: getChildIdx
File: /Users/zml/Desktop/git_hub/incubator-gluten/ep/build-velox/build/velox_ep/velox/type/Type.cpp
Line: 424
Stack trace:
# 0 ... # 31 (32 empty frames in the captured log)

    at org.apache.gluten.vectorized.ColumnarBatchOutIterator.nativeHasNext(Native Method)
    at org.apache.gluten.vectorized.ColumnarBatchOutIterator.hasNextInternal(ColumnarBatchOutIterator.java:61)
    at org.apache.gluten.vectorized.GeneralOutIterator.hasNext(GeneralOutIterator.java:37)
    ... 27 more
zml1206 commented 2 months ago

It cannot be reproduced without the union or the filter. I haven't found the root cause yet; could you help take a look, @zhztheplayer @rui-mo? Thank you.
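
For reference, a sketch of the two non-reproducing variants described above, assuming the same spark session and the table written in the reproduction:

    // Without the union: a single scan plus the struct-field filter works.
    spark.read.parquet("tmp/t1").filter("a.aA > 3").collect()

    // Without the filter: the union of the two scans works.
    spark.read.parquet("tmp/t1")
      .union(spark.read.parquet("tmp/t1"))
      .collect()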

FelixYBW commented 2 months ago

@yma11 Any insights?

yma11 commented 2 months ago

    spark.read.parquet("tmp/t1")
      .union(spark.read.parquet("tmp/t1"))
      .filter("a.aA > 3")
      .collect()

Will it work if you use a.aa?

      spark.read.parquet("tmp/t1")
        .union(spark.read.parquet("tmp/t1"))
        .filter("a.aa > 3")
        .collect()

I think the column name becomes lowercase after some operation, so it can no longer be matched.
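
Spark's analyzer is case-insensitive by default (spark.sql.caseSensitive=false), so a.aA resolves fine on the Spark side; the failing lookup is Velox's case-sensitive getChildIdx. A small diagnostic sketch to see where the casing diverges, assuming the same session as the reproduction; note the Spark-side schema may well still print aA in both cases, which would place the lowercasing below Spark, in the native plan conversion:

    // Field name as written: the struct child should print as "aA".
    spark.read.parquet("tmp/t1").printSchema()

    // Schema after the union: if it still shows "aA" here, the lowercasing
    // to "aa" happens in the Gluten-to-Velox schema conversion instead.
    spark.read.parquet("tmp/t1")
      .union(spark.read.parquet("tmp/t1"))
      .printSchema()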

zml1206 commented 2 months ago

will it work if you use a.aa?

Yes, it worked with a.aa.

zml1206 commented 2 months ago

It also worked when the scan falls back to vanilla Spark, or when the union is removed.
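
For anyone blocked by this, a sketch of the scan-fallback workaround mentioned above; spark.gluten.sql.columnar.filescan is the flag I believe disables native file scan offload in Gluten, but treat the exact key (and whether it is honored at runtime rather than at session start) as an assumption to verify for your version:

    // Assumed flag: fall the parquet scan back to vanilla Spark, so the
    // case-sensitive Velox field lookup is never reached.
    spark.conf.set("spark.gluten.sql.columnar.filescan", "false")

    spark.read.parquet("tmp/t1")
      .union(spark.read.parquet("tmp/t1"))
      .filter("a.aA > 3")
      .collect()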

yma11 commented 2 months ago

Then it should be a problem of the native Parquet scan combined with union. Would you prefer to fix it yourself?