apache / datafusion-comet

Apache DataFusion Comet Spark Accelerator
https://datafusion.apache.org/comet
Apache License 2.0
823 stars 163 forks source link

Some aggregate functions return 0.0 instead of NaN in some cases #1038

Open andygrove opened 3 weeks ago

andygrove commented 3 weeks ago

Describe the bug

SQL

SELECT c79, c54, stddev_pop(c73) FROM test1 GROUP BY c79,c54 ORDER BY c79, c54;

c79 is Byte, c54 is either Float or Double

Spark Plan

AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(3) Sort [c79#279 ASC NULLS FIRST, c54#254 ASC NULLS FIRST], true, 0
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 1
         +- Exchange rangepartitioning(c79#279 ASC NULLS FIRST, c54#254 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=20613]
            +- *(2) HashAggregate(keys=[c79#279, c54#254], functions=[stddev_pop(c73#273)], output=[c79#279, c54#254, stddev_pop(c73)#28050])
               +- AQEShuffleRead coalesced
                  +- ShuffleQueryStage 0
                     +- Exchange hashpartitioning(c79#279, c54#254, 200), ENSURE_REQUIREMENTS, [plan_id=20585]
                        +- *(1) HashAggregate(keys=[c79#279, knownfloatingpointnormalized(normalizenanandzero(c54#254)) AS c54#254], functions=[partial_stddev_pop(c73#273)], output=[c79#279, c54#254, n#28038, avg#28039, m2#28040])
                           +- *(1) ColumnarToRow
                              +- FileScan parquet [c54#254,c73#273,c79#279] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/andy/git/apache/datafusion-comet/fuzz-testing/test1.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c54:float,c73:double,c79:tinyint>
+- == Initial Plan ==
   Sort [c79#279 ASC NULLS FIRST, c54#254 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(c79#279 ASC NULLS FIRST, c54#254 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=20567]
      +- HashAggregate(keys=[c79#279, c54#254], functions=[stddev_pop(c73#273)], output=[c79#279, c54#254, stddev_pop(c73)#28050])
         +- Exchange hashpartitioning(c79#279, c54#254, 200), ENSURE_REQUIREMENTS, [plan_id=20564]
            +- HashAggregate(keys=[c79#279, knownfloatingpointnormalized(normalizenanandzero(c54#254)) AS c54#254], functions=[partial_stddev_pop(c73#273)], output=[c79#279, c54#254, n#28038, avg#28039, m2#28040])
               +- FileScan parquet [c54#254,c73#273,c79#279] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/andy/git/apache/datafusion-comet/fuzz-testing/test1.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c54:float,c73:double,c79:tinyint>

Comet Plan

AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(1) ColumnarToRow
   +- CometSort [c79#279, c54#254, stddev_pop(c73)#28131], [c79#279 ASC NULLS FIRST, c54#254 ASC NULLS FIRST]
      +- AQEShuffleRead coalesced
         +- ShuffleQueryStage 1
            +- CometColumnarExchange rangepartitioning(c79#279 ASC NULLS FIRST, c54#254 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=20746]
               +- !CometHashAggregate [c79#279, c54#254, n#28119, avg#28120, m2#28121], Final, [c79#279, c54#254], [stddev_pop(c73#273)]
                  +- AQEShuffleRead coalesced
                     +- ShuffleQueryStage 0
                        +- CometExchange hashpartitioning(c79#279, c54#254, 200), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=20701]
                           +- !CometHashAggregate [c54#254, c73#273, c79#279], Partial, [c79#279, knownfloatingpointnormalized(normalizenanandzero(c54#254)) AS c54#254], [partial_stddev_pop(c73#273)]
                              +- CometScan parquet [c54#254,c73#273,c79#279] Batched: true, DataFilters: [], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/home/andy/git/apache/datafusion-comet/fuzz-testing/test1.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c54:float,c73:double,c79:tinyint>
+- == Initial Plan ==
   CometSort [c79#279, c54#254, stddev_pop(c73)#28131], [c79#279 ASC NULLS FIRST, c54#254 ASC NULLS FIRST]
   +- CometColumnarExchange rangepartitioning(c79#279 ASC NULLS FIRST, c54#254 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=20682]
      +- !CometHashAggregate [c79#279, c54#254, n#28119, avg#28120, m2#28121], Final, [c79#279, c54#254], [stddev_pop(c73#273)]
         +- CometExchange hashpartitioning(c79#279, c54#254, 200), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=20680]
            +- !CometHashAggregate [c54#254, c73#273, c79#279], Partial, [c79#279, knownfloatingpointnormalized(normalizenanandzero(c54#254)) AS c54#254], [partial_stddev_pop(c73#273)]
               +- CometScan parquet [c54#254,c73#273,c79#279] Batched: true, DataFilters: [], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/home/andy/git/apache/datafusion-comet/fuzz-testing/test1.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c54:float,c73:double,c79:tinyint>

First difference at row 4: Spark: -127,0.31308997,NaN Comet: -127,0.31308997,0.0

Steps to reproduce

No response

Expected behavior

No response

Additional context

No response