apache / incubator-gluten

Gluten is a middle layer responsible for offloading JVM-based SQL engines' execution to native engines.
https://gluten.apache.org/
Apache License 2.0

[VL] error when group by complex data type #7000

Open · FelixYBW opened 2 months ago

FelixYBW commented 2 months ago

Backend

VL (Velox)

Bug description

The table is empty, with schema:

    v1: map<string,int>
    v2: map<string,int>
    v3: map<string,struct<max:int,avg:int>>

SQL:

SELECT
    x1.v1
FROM (
    SELECT *
    FROM empty_table
) x1
GROUP BY 1
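
A minimal end-to-end reproduction sketch for the Spark shell (the CREATE TABLE DDL is hypothetical, reconstructed from the schema above; with the VL backend the query then fails as in the trace below):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("gluten-issue-7000-repro").getOrCreate()

// Empty table with the complex-typed columns described above (hypothetical DDL).
spark.sql("""
  CREATE TABLE empty_table (
    v1 map<string,int>,
    v2 map<string,int>,
    v3 map<string,struct<max:int,avg:int>>
  ) USING parquet
""")

// With Gluten's Velox backend this fails during plan validation:
spark.sql("SELECT x1.v1 FROM (SELECT * FROM empty_table) x1 GROUP BY 1").collect()
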
24/08/24 23:17:29 ERROR [Driver] yarn.ApplicationMaster: User class threw exception: org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object
    at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:160)
    at org.apache.spark.sql.types.StructType$.$anonfun$fromAttributes$1(StructType.scala:548)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.immutable.List.map(List.scala:298)
    at org.apache.spark.sql.types.StructType$.fromAttributes(StructType.scala:548)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.schema$lzycompute(QueryPlan.scala:372)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.schema(QueryPlan.scala:372)
    at org.apache.gluten.extension.GlutenPlan.doValidate(GlutenPlan.scala:68)
    at org.apache.gluten.extension.GlutenPlan.doValidate$(GlutenPlan.scala:66)
    at org.apache.gluten.execution.ProjectExecTransformer.doValidate(BasicPhysicalOperatorTransformer.scala:163)
    at org.apache.gluten.backendsapi.velox.VeloxSparkPlanExecApi.genColumnarShuffleExchange(VeloxSparkPlanExecApi.scala:364)
    at org.apache.gluten.extension.columnar.OffloadExchange.offload(OffloadSingleNode.scala:114)
    at org.apache.gluten.extension.columnar.MiscColumnarRules$TransformPreOverrides$$anonfun$$nestedInanonfun$apply$2$1.applyOrElse(MiscColumnarRules.scala:60)
    at org.apache.gluten.extension.columnar.MiscColumnarRules$TransformPreOverrides$$anonfun$$nestedInanonfun$apply$2$1.applyOrElse(MiscColumnarRules.scala:60)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$2(TreeNode.scala:535)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:535)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:508)
    at org.apache.gluten.extension.columnar.MiscColumnarRules$TransformPreOverrides.$anonfun$apply$2(MiscColumnarRules.scala:60)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:89)
    at org.apache.gluten.extension.columnar.MiscColumnarRules$TransformPreOverrides.apply(MiscColumnarRules.scala:60)
    at org.apache.gluten.extension.columnar.MiscColumnarRules$TransformPreOverrides.apply(MiscColumnarRules.scala:49)
    at org.apache.gluten.extension.columnar.ColumnarRuleApplier$LoggedRule.$anonfun$apply$1(ColumnarRuleApplier.scala:73)
    at org.apache.gluten.metrics.GlutenTimeMetric$.withNanoTime(GlutenTimeMetric.scala:41)
    at org.apache.gluten.metrics.GlutenTimeMetric$.withMillisTime(GlutenTimeMetric.scala:46)
    at org.apache.gluten.metrics.GlutenTimeMetric$.recordMillisTime(GlutenTimeMetric.scala:50)
    at org.apache.gluten.extension.columnar.ColumnarRuleApplier$LoggedRule.apply(ColumnarRuleApplier.scala:73)
    at org.apache.gluten.extension.columnar.ColumnarRuleApplier$LoggedRule.apply(ColumnarRuleApplier.scala:56)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
    at scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
    at scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
    at scala.collection.mutable.ArrayBuffer.foldLeft(ArrayBuffer.scala:49)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
    at org.apache.gluten.extension.columnar.heuristic.HeuristicApplier.transformPlan(HeuristicApplier.scala:80)
    at org.apache.gluten.extension.columnar.heuristic.HeuristicApplier.$anonfun$makeRule$3(HeuristicApplier.scala:61)
    at org.apache.gluten.extension.columnar.heuristic.HeuristicApplier.prepareFallback(HeuristicApplier.scala:87)
    at org.apache.gluten.extension.columnar.heuristic.HeuristicApplier.$anonfun$makeRule$2(HeuristicApplier.scala:60)
    at org.apache.gluten.utils.QueryPlanSelector.maybe(QueryPlanSelector.scala:74)
    at org.apache.gluten.extension.columnar.heuristic.HeuristicApplier.org$apache$gluten$extension$columnar$heuristic$HeuristicApplier$$$anonfun$makeRule$1(HeuristicApplier.scala:58)
    at org.apache.gluten.extension.columnar.heuristic.HeuristicApplier$$anonfun$makeRule$4.apply(HeuristicApplier.scala:57)
    at org.apache.gluten.extension.columnar.heuristic.HeuristicApplier$$anonfun$makeRule$4.apply(HeuristicApplier.scala:57)
    at org.apache.gluten.extension.columnar.heuristic.HeuristicApplier.apply(HeuristicApplier.scala:53)
    at org.apache.gluten.extension.ColumnarOverrideRules.org$apache$gluten$extension$ColumnarOverrideRules$$$anonfun$postColumnarTransitions$1(ColumnarOverrides.scala:120)
    at org.apache.gluten.extension.ColumnarOverrideRules$$anonfun$postColumnarTransitions$2.apply(ColumnarOverrides.scala:115)
    at org.apache.gluten.extension.ColumnarOverrideRules$$anonfun$postColumnarTransitions$2.apply(ColumnarOverrides.scala:115)
    at org.apache.spark.sql.execution.ApplyColumnarRulesAndInsertTransitions.$anonfun$apply$2(Columnar.scala:547)
    at org.apache.spark.sql.execution.ApplyColumnarRulesAndInsertTransitions.$anonfun$apply$2$adapted(Columnar.scala:546)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.sql.execution.ApplyColumnarRulesAndInsertTransitions.apply(Columnar.scala:546)
    at org.apache.spark.sql.execution.ApplyColumnarRulesAndInsertTransitions.apply(Columnar.scala:504)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$.$anonfun$applyPhysicalRules$2(AdaptiveSparkPlanExec.scala:766)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:89)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$.applyPhysicalRules(AdaptiveSparkPlanExec.scala:765)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.newQueryStage(AdaptiveSparkPlanExec.scala:530)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:485)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:515)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:515)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:515)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:515)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:481)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:515)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:515)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:229)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:776)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:224)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:363)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:336)
    at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:421)
    at org.apache.spark.sql.execution.HiveResult$.hiveResultString(HiveResult.scala:76)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.$anonfun$run$2(SparkSQLDriver.scala:77)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:111)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:172)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:97)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:776)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:70)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:71)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:389)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:517)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:511)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:511)
    at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:336)
    at org.apache.hadoop.hive.cli.CliDriver.processReader(CliDriver.java:474)
    at org.apache.hadoop.hive.cli.CliDriver.processFile(CliDriver.java:490)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:213)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:737)
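
Reading the trace: Gluten's validation (GlutenPlan.doValidate) asks the plan for its schema, which forces dataType on an UnresolvedAttribute that survived analysis, and that call always throws. A minimal sketch of the failing pattern for the Spark shell, distilled from the trace (not Gluten's actual code):

import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute

// Asking any unresolved attribute for its type reproduces the exception above.
val attr = UnresolvedAttribute("v1")
attr.dataType // throws UnresolvedException: Invalid call to dataType on unresolved object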

Spark version

None

Spark configurations

No response

System information

No response

Relevant logs

No response

FelixYBW commented 2 months ago

@JkSelf can you take a look?

JkSelf commented 2 months ago

@FelixYBW Sure. I will look at this issue later. Thanks.

zml1206 commented 2 months ago

Spark does not support map types as grouping expressions. The error should be: org.apache.spark.sql.AnalysisException: expression x1.v1 cannot be used as a grouping expression because its data type map<string,int> is not an orderable data type. I don't know why your error message is different. What's your Spark version? @FelixYBW
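
For context, vanilla Spark enforces this in CheckAnalysis through an orderability check on grouping keys. A small Spark-shell sketch using Spark's internal RowOrdering helper (internal API, so its exact location may differ across versions):

import org.apache.spark.sql.catalyst.expressions.RowOrdering
import org.apache.spark.sql.types._

// Map types are not orderable, so they are rejected as grouping keys.
RowOrdering.isOrderable(MapType(StringType, IntegerType)) // false

// Struct types are orderable when all their fields are, so grouping by v3's
// struct values (rather than by an enclosing map) would be accepted.
RowOrdering.isOrderable(new StructType().add("max", IntegerType).add("avg", IntegerType)) // true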

JkSelf commented 2 months ago

@FelixYBW @zml1206 It seems Spark doesn't support grouping by map types. I tried both Spark 3.2 and 3.4 and got the following exception:

org.apache.spark.sql.AnalysisException: expression type1.map cannot be used as a grouping expression because its data type map<string,string> is not an orderable data type.;
Aggregate [map#14], [byte#0, short#1, int#2, long#3L, float#4, double#5, decimal#6, string#7, binary#8, bool#9, date#10, timestamp#11, array#12, struct#13, map#14]
+- SubqueryAlias type1
   +- View (`type1`, [byte#0,short#1,int#2,long#3L,float#4,double#5,decimal#6,string#7,binary#8,bool#9,date#10,timestamp#11,array#12,struct#13,map#14])
      +- RelationV2[byte#0, short#1, int#2, long#3L, float#4, double#5, decimal#6, string#7, binary#8, bool#9, date#10, timestamp#11, array#12, struct#13, map#14] parquet file:/mnt/DP_disk3/jk/projects/gluten/backends-velox/target/scala-2.12/test-classes/data-type-validation-data/type1

        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis(CheckAnalysis.scala:52)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis$(CheckAnalysis.scala:51)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:182)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkValidGroupingExprs$1(CheckAnalysis.scala:328)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$15(CheckAnalysis.scala:340)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$15$adapted(CheckAnalysis.scala:340)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1(CheckAnalysis.scala:340)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1$adapted(CheckAnalysis.scala:97)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:263)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:97)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:92)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:182)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:205)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:202)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:75)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:183)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:183)
        at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:75)
        at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:73)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:65)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:98)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
        at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
        at org.apache.gluten.execution.WholeStageTransformerSuite.$anonfun$compareResultsAgainstVanillaSpark$1(WholeStageTransformerSuite.scala:297)
        at org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf(SQLHelper.scala:54)
        at org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf$(SQLHelper.scala:38)
        at org.apache.gluten.execution.WholeStageTransformerSuite.org$apache$spark$sql$test$SQLTestUtilsBase$$super$withSQLConf(WholeStageTransformerSuite.scala:40)

FelixYBW commented 2 months ago

It's odd. The query passed here when I used vanilla Spark.

zml1206 commented 2 months ago

> It's odd. The query passed here when I used vanilla Spark.

Have you replaced the Analyzer? @FelixYBW

FelixYBW commented 1 month ago

The customer enhanced their Spark to add this support, which is why the analyzer check didn't fire here.