apache / incubator-gluten

Gluten is a middle layer responsible for offloading JVM-based SQL engines' execution to native engines.
https://gluten.apache.org/
Apache License 2.0

[VL] window function error #6600

Closed FelixYBW closed 3 weeks ago

FelixYBW commented 1 month ago

Backend

VL (Velox)

Bug description

24/07/26 00:58:09 ERROR [Driver] datasources.FileFormatWriter: Aborting job c89ecb16-71d7-49b6-8963-cd58dd9873e1.
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object
    at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:160)
    at org.apache.spark.sql.types.StructType$.$anonfun$fromAttributes$1(StructType.scala:548)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)

The written columns are all string and bigint, except for one date column which is used as the partition column.

@JkSelf is the date column the reason?

Spark version

Spark-3.2.x

Spark configurations

No response

System information

No response

Relevant logs

No response

LoseYSelf commented 1 month ago

What are the SQL and the table metadata?

FelixYBW commented 1 month ago

The error is caused by window function:

sum(a) OVER (
  PARTITION BY b, c
  ORDER BY date RANGE BETWEEN 6 PRECEDING AND CURRENT ROW
)
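For context, because the ORDER BY column here is a date, "6 PRECEDING" in a RANGE frame means a 6-day value offset, not 6 rows. Below is a minimal reference model of that semantics in plain Java; the names (`Row`, `rangeSumLast7Days`) are hypothetical and this is not Gluten or Velox code.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;

// Hypothetical reference model (not Gluten code) of the frame:
//   sum(a) OVER (PARTITION BY b, c ORDER BY date
//                RANGE BETWEEN 6 PRECEDING AND CURRENT ROW)
record Row(String b, String c, LocalDate date, long a) {}

class RangeFrameModel {
    // For each row, sum a over rows in the same (b, c) partition whose
    // date lies in [current date - 6 days, current date], inclusive.
    static List<Long> rangeSumLast7Days(List<Row> rows) {
        List<Long> out = new ArrayList<>();
        for (Row cur : rows) {
            long sum = 0;
            for (Row r : rows) {
                boolean samePartition =
                        r.b().equals(cur.b()) && r.c().equals(cur.c());
                boolean inFrame =
                        !r.date().isBefore(cur.date().minusDays(6))
                                && !r.date().isAfter(cur.date());
                if (samePartition && inFrame) sum += r.a();
            }
            out.add(sum);
        }
        return out;
    }
}
```

Note the frame boundaries are computed from the date value, so two rows with the same date are peers and always share a frame.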
zml1206 commented 1 month ago

Can you assign this issue to me? I'd like to try to solve it. Thank you. @FelixYBW

JkSelf commented 1 month ago

I have disabled the date type in https://github.com/apache/incubator-gluten/pull/6637. @zml1206, if you have the time, you are welcome to continue work on supporting the date type in the window range frame. Thank you for your contributions.
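For readers following along: disabling a type here means the validation rejects offload for that window, so Spark's vanilla execution is used instead. A hedged sketch of that kind of check, with hypothetical names and types (not the actual Gluten validation code):

```java
import java.util.Set;

// Hypothetical sketch of a range-frame offload check: reject ORDER BY
// types the native backend cannot yet handle (here, DATE), so the
// window operator falls back to vanilla Spark execution.
class RangeFrameValidator {
    enum SqlType { INT, BIGINT, STRING, DATE, TIMESTAMP }

    // Types supported for a range-frame ORDER BY column in this sketch.
    private static final Set<SqlType> SUPPORTED =
            Set.of(SqlType.INT, SqlType.BIGINT);

    // true  -> the window can be offloaded to the native engine
    // false -> fall back to vanilla Spark
    static boolean canOffloadRangeFrame(SqlType orderByType) {
        return SUPPORTED.contains(orderByType);
    }
}
```

A fallback like this keeps queries correct while native support for the type is still being implemented.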

FelixYBW commented 1 month ago

@JkSelf Is there a dependency on your current streaming window PR?

FelixYBW commented 4 weeks ago

@JkSelf Looks like the issue isn't fixed by #6600.

zml1206 commented 4 weeks ago

https://github.com/apache/incubator-gluten/blob/51d0a37faafc0457d594d180f7787e8c0da1f518/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala#L377

The UT verification here passes. Is there a special case that has been missed? Can you provide the test SQL? @FelixYBW

FelixYBW commented 4 weeks ago

@JkSelf Did you verify the SQL I shared? It failed in my test.

JkSelf commented 3 weeks ago

@FelixYBW I tested the fix in my local environment and confirmed that the issue has been resolved. However, I encountered the following exception when executing:

select sum(impression_count_last7days) from (
  SELECT sum(impression_count) OVER (
    PARTITION BY user_id, surface
    ORDER BY date RANGE BETWEEN 6 PRECEDING AND CURRENT ROW
  ) AS impression_count_last7days from d233
)

24/08/12 16:13:26 WARN TaskSetManager: Lost task 86.1 in stage 3.0 (TID 263) (sr246 executor 8): TaskKilled (Stage cancelled: Job aborted due to stage failure: Task 90 in stage 3.0 failed 4 times, most recent failure: Lost task 90.3 in stage 3.0 (TID 254) (sr246 executor 6): java.lang.NullPointerException
        at org.apache.gluten.expression.ExpressionMappings$.expressionsMap(ExpressionMappings.scala:341)
        at org.apache.gluten.expression.ExpressionConverter$.replaceWithExpressionTransformer(ExpressionConverter.scala:54)
        at org.apache.gluten.expression.ExpressionConverter.replaceWithExpressionTransformer(ExpressionConverter.scala)
        at org.apache.gluten.substrait.expression.WindowFunctionNode.setBound(WindowFunctionNode.java:102)
        at org.apache.gluten.substrait.expression.WindowFunctionNode.toProtobuf(WindowFunctionNode.java:180)
        at org.apache.gluten.substrait.rel.WindowRelNode.toProtobuf(WindowRelNode.java:77)
        at org.apache.gluten.substrait.rel.ProjectRelNode.toProtobuf(ProjectRelNode.java:69)
        at org.apache.gluten.substrait.rel.AggregateRelNode.toProtobuf(AggregateRelNode.java:89)
        at org.apache.gluten.substrait.plan.PlanNode.toProtobuf(PlanNode.java:74)
        at org.apache.gluten.backendsapi.velox.VeloxIteratorApi.genFinalStageIterator(VeloxIteratorApi.scala:238)
        at org.apache.gluten.execution.WholeStageZippedPartitionsRDD.$anonfun$compute$1(WholeStageZippedPartitionsRDD.scala:59)
        at org.apache.gluten.utils.Arm$.withResource(Arm.scala:25)
        at org.apache.gluten.metrics.GlutenTimeMetric$.millis(GlutenTimeMetric.scala:37)
        at org.apache.gluten.execution.WholeStageZippedPartitionsRDD.compute(WholeStageZippedPartitionsRDD.scala:46)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
        at org.apache.spark.scheduler.Task.run(Task.scala:141)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)

@zml1206 Do you have time to look at this issue? If not, I can follow up. Thanks.
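As a debugging reference, the nested query's expected result can be modeled in plain Java: the outer aggregation simply sums the per-row window output. All names below (`Impression`, `NestedQueryModel`) are hypothetical and this is not Gluten code.

```java
import java.time.LocalDate;
import java.util.List;

// Hypothetical model of the nested query from the comment above:
//   select sum(impression_count_last7days) from (
//     SELECT sum(impression_count) OVER (PARTITION BY user_id, surface
//       ORDER BY date RANGE BETWEEN 6 PRECEDING AND CURRENT ROW)
//     AS impression_count_last7days from d233)
record Impression(String userId, String surface, LocalDate date, long count) {}

class NestedQueryModel {
    // Inner step: per-row windowed sum over the last 7 days (date-range frame).
    static long windowSum(List<Impression> rows, Impression cur) {
        long sum = 0;
        for (Impression r : rows) {
            if (r.userId().equals(cur.userId())
                    && r.surface().equals(cur.surface())
                    && !r.date().isBefore(cur.date().minusDays(6))
                    && !r.date().isAfter(cur.date())) {
                sum += r.count();
            }
        }
        return sum;
    }

    // Outer step: a plain aggregation over the window output.
    static long total(List<Impression> rows) {
        return rows.stream().mapToLong(r -> windowSum(rows, r)).sum();
    }
}
```

The NPE above is raised while the window plan is serialized to Substrait, before any of this arithmetic runs, which is why the outer sum never gets a chance to execute.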

zml1206 commented 3 weeks ago

> Do you have time to look at this issue? If not, I can follow up. Thanks.

I'll try to reproduce and solve it. @JkSelf

FelixYBW commented 3 weeks ago

Fixed by #6803.