apache/paimon

[core] Fix limit push-down: sorting splits violates the comparator's general contract. #4491

Closed · LinMingQiang closed this 1 week ago

LinMingQiang commented 1 week ago

Purpose

Exception stack:


```
java.lang.IllegalArgumentException: Comparison method violates its general contract!
        at java.util.TimSort.mergeLo(TimSort.java:777)
        at java.util.TimSort.mergeAt(TimSort.java:514)
        at java.util.TimSort.mergeCollapse(TimSort.java:441)
        at java.util.TimSort.sort(TimSort.java:245)
        at java.util.Arrays.sort(Arrays.java:1512)
        at java.util.ArrayList.sort(ArrayList.java:1462)
        at org.apache.paimon.table.source.DataTableBatchScan.applyPushDownLimit(DataTableBatchScan.java:105)
        at org.apache.paimon.table.source.DataTableBatchScan.plan(DataTableBatchScan.java:79)
        at org.apache.paimon.spark.PaimonBaseScan.getOriginSplits(PaimonBaseScan.scala:73)
        at org.apache.paimon.spark.PaimonBaseScan.lazyInputPartitions(PaimonBaseScan.scala:82)
        at org.apache.paimon.spark.PaimonScan.$anonfun$outputPartitioning$1(PaimonScan.scala:83)
        at scala.Option.map(Option.scala:230)
        at org.apache.paimon.spark.PaimonScan.outputPartitioning(PaimonScan.scala:83)
        at org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioningAndOrdering$$anonfun$partitioning$1.applyOrElse(V2ScanPartitioningAndOrdering.scala:44)
        at org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioningAndOrdering$$anonfun$partitioning$1.applyOrElse(V2ScanPartitioningAndOrdering.scala:42)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:517)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1249)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1248)
        at org.apache.spark.sql.catalyst.plans.logical.LocalLimit.mapChildren(basicLogicalOperators.scala:1563)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:517)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
```

The exception is triggered by a simple SQL query: `select * from xxxx limit 1;`
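
For background, `java.util.TimSort` throws this `IllegalArgumentException` whenever a comparator is not antisymmetric, i.e. `compare(a, b)` and `compare(b, a)` do not have opposite signs. Below is a minimal, self-contained sketch of how a split-ordering comparator of that shape fails; the names are hypothetical and only illustrate the failure mode, not Paimon's actual code:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Random;

public class BrokenComparatorDemo {
    public static void main(String[] args) {
        // A comparator meant to put "rawConvertible" items first, but it
        // only inspects its left argument: compare(a, b) and compare(b, a)
        // can both return -1, which violates antisymmetry.
        Comparator<Boolean> rawConvertibleFirst = (a, b) -> a ? -1 : 1;

        // TimSort only notices the violation when it merges sorted runs,
        // which requires more than 32 elements, so build a large input.
        List<Boolean> splits = new ArrayList<>();
        Random random = new Random(42);
        for (int i = 0; i < 10_000; i++) {
            splits.add(random.nextBoolean());
        }

        // Typically throws java.lang.IllegalArgumentException:
        // "Comparison method violates its general contract!"
        splits.sort(rawConvertibleFirst);
    }
}
```

Because `a ? -1 : 1` never returns 0 and ignores `b`, two equal elements compare as both less-than and greater-than each other, which TimSort detects while merging runs.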

How to fix:

We do not need to sort the dataSplits at all. Instead, just pick out the dataSplits that are rawConvertible, as in the sketch below.
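
A minimal sketch of the idea, assuming a stable single-pass partition in place of the sort; `Split` and `pickRawConvertibleFirst` are hypothetical stand-ins for illustration, not the actual patch:

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for Paimon's DataSplit; only the flag relevant here is modeled.
interface Split {
    boolean rawConvertible();
}

class LimitPushDown {
    // Single stable pass: rawConvertible splits come first, and the
    // original order is preserved within each group. No comparator is
    // involved, so there is no contract to violate.
    static List<Split> pickRawConvertibleFirst(List<Split> splits) {
        List<Split> rawConvertible = new ArrayList<>();
        List<Split> others = new ArrayList<>();
        for (Split split : splits) {
            (split.rawConvertible() ? rawConvertible : others).add(split);
        }
        rawConvertible.addAll(others);
        return rawConvertible;
    }
}
```

A stable partition keeps the planner's original split order within each group and runs in a single pass, so it cannot raise the TimSort contract exception.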

Tests

API and Format

Documentation