Closed · andygrove closed 2 months ago
With the latest DataFusion, I see that count performs about the same as sum.
> select l_shipmode, sum(l_linenumber) from 'lineitem.parquet' group by 1;
+------------+------------------------------------+
| l_shipmode | sum(lineitem.parquet.l_linenumber) |
+------------+------------------------------------+
| AIR        | 257182624                          |
| REG AIR    | 257116345                          |
| MAIL       | 257171951                          |
| FOB        | 257164573                          |
| RAIL       | 257195346                          |
| SHIP       | 257156584                          |
| TRUCK      | 257165514                          |
+------------+------------------------------------+
7 row(s) fetched.
Elapsed 1.136 seconds.
> select l_shipmode, sum(1) from 'lineitem.parquet' group by 1;
+------------+---------------+
| l_shipmode | sum(Int64(1)) |
+------------+---------------+
| RAIL       | 85734456      |
| AIR        | 85729153      |
| SHIP       | 85718196      |
| TRUCK      | 85719689      |
| REG AIR    | 85710420      |
| MAIL       | 85714788      |
| FOB        | 85711200      |
+------------+---------------+
7 row(s) fetched.
Elapsed 1.039 seconds.
> select l_shipmode, count(l_linenumber) from 'lineitem.parquet' group by 1;
+------------+--------------------------------------+
| l_shipmode | count(lineitem.parquet.l_linenumber) |
+------------+--------------------------------------+
| REG AIR    | 85710420                             |
| FOB        | 85711200                             |
| TRUCK      | 85719689                             |
| SHIP       | 85718196                             |
| AIR        | 85729153                             |
| RAIL       | 85734456                             |
| MAIL       | 85714788                             |
+------------+--------------------------------------+
7 row(s) fetched.
Elapsed 1.027 seconds.
> select l_shipmode, count(*) from 'lineitem.parquet' group by 1;
+------------+----------+
| l_shipmode | count(*) |
+------------+----------+
| REG AIR    | 85710420 |
| FOB        | 85711200 |
| TRUCK      | 85719689 |
| MAIL       | 85714788 |
| SHIP       | 85718196 |
| RAIL       | 85734456 |
| AIR        | 85729153 |
+------------+----------+
7 row(s) fetched.
Elapsed 1.122 seconds.
When running these same queries in a cluster, I could not reproduce the issue:
scala> spark.time(spark.sql("select l_shipmode, sum(1) from lineitem group by 1").collect)
24/07/31 16:53:59 WARN CometSparkSessionExtensions$CometExecRule: Comet cannot execute some parts of this plan natively (set spark.comet.explainFallback.enabled=false to disable this logging):
CometHashAggregate
+- AQEShuffleRead [COMET: AQEShuffleRead is not supported]
+- CometSinkPlaceHolder
+- CometExchange
+- CometHashAggregate
+- CometScan parquet
Time taken: 1572 ms
res3: Array[org.apache.spark.sql.Row] = Array([AIR,85729153], [MAIL,85714788], [RAIL,85734456], [SHIP,85718196], [TRUCK,85719689], [REG AIR,85710420], [FOB,85711200])
scala> spark.time(spark.sql("select l_shipmode, count(1) from lineitem group by 1").collect)
24/07/31 16:54:06 WARN CometSparkSessionExtensions$CometExecRule: Comet cannot execute some parts of this plan natively (set spark.comet.explainFallback.enabled=false to disable this logging):
CometHashAggregate
+- AQEShuffleRead [COMET: AQEShuffleRead is not supported]
+- CometSinkPlaceHolder
+- CometExchange
+- CometHashAggregate
+- CometScan parquet
Time taken: 1633 ms
res4: Array[org.apache.spark.sql.Row] = Array([AIR,85729153], [MAIL,85714788], [RAIL,85734456], [SHIP,85718196], [TRUCK,85719689], [REG AIR,85710420], [FOB,85711200])
I managed to reproduce it with a real-world query in a cluster:
scala> val df = spark.sql("select cs_sold_date_sk, cs_item_sk, cs_order_number, count(*) from catalog_sales group by cs_sold_date_sk, cs_item_sk, cs_order_number")
df: org.apache.spark.sql.DataFrame = [cs_sold_date_sk: int, cs_item_sk: int ... 2 more fields]
scala> spark.time(df.write.format("noop").mode("overwrite").save())
24/08/05 16:13:44 WARN CometSparkSessionExtensions$CometExecRule: Comet cannot execute some parts of this plan natively (set spark.comet.explainFallback.enabled=false to disable this logging):
OverwriteByExpression [COMET: OverwriteByExpression is not supported]
+- CometHashAggregate
+- CometExchange
+- CometHashAggregate
+- CometScan parquet
24/08/05 16:14:03 WARN CometSparkSessionExtensions$CometExecRule: Comet cannot execute some parts of this plan natively (set spark.comet.explainFallback.enabled=false to disable this logging):
CometHashAggregate
+- AQEShuffleRead [COMET: AQEShuffleRead is not supported]
+- CometSinkPlaceHolder
+- CometExchange
+- CometHashAggregate
+- CometScan parquet
Time taken: 134709 ms
scala> val df = spark.sql("select cs_sold_date_sk, cs_item_sk, cs_order_number, sum(cs_quantity) from catalog_sales group by cs_sold_date_sk, cs_item_sk, cs_order_number")
df: org.apache.spark.sql.DataFrame = [cs_sold_date_sk: int, cs_item_sk: int ... 2 more fields]
scala> spark.time(df.write.format("noop").mode("overwrite").save())
24/08/05 16:16:53 WARN CometSparkSessionExtensions$CometExecRule: Comet cannot execute some parts of this plan natively (set spark.comet.explainFallback.enabled=false to disable this logging):
OverwriteByExpression [COMET: OverwriteByExpression is not supported]
+- CometHashAggregate
+- CometExchange
+- CometHashAggregate
+- CometScan parquet
24/08/05 16:16:58 WARN CometSparkSessionExtensions$CometExecRule: Comet cannot execute some parts of this plan natively (set spark.comet.explainFallback.enabled=false to disable this logging):
CometHashAggregate
+- AQEShuffleRead [COMET: AQEShuffleRead is not supported]
+- CometSinkPlaceHolder
+- CometExchange
+- CometHashAggregate
+- CometScan parquet
Time taken: 11439 ms
Debug logging with stats
AggregateExec: mode=Final, gby=[col_0@0 as col_0, col_1@1 as col_1, col_2@2 as col_2], aggr=[sum], metrics=[output_rows=7919602, elapsed_compute=1.479441583s]
AggregateExec: mode=Final, gby=[col_0@0 as col_0, col_1@1 as col_1, col_2@2 as col_2], aggr=[sum], metrics=[output_rows=7928167, elapsed_compute=1.50249403s]
AggregateExec: mode=Final, gby=[col_0@0 as col_0, col_1@1 as col_1, col_2@2 as col_2], aggr=[sum], metrics=[output_rows=7917152, elapsed_compute=1.521509782s]
AggregateExec: mode=Final, gby=[col_0@0 as col_0, col_1@1 as col_1, col_2@2 as col_2], aggr=[sum], metrics=[output_rows=7922306, elapsed_compute=1.525740231s]
AggregateExec: mode=Final, gby=[col_0@0 as col_0, col_1@1 as col_1, col_2@2 as col_2], aggr=[sum], metrics=[output_rows=7917057, elapsed_compute=1.841284372s]
AggregateExec: mode=Final, gby=[col_0@0 as col_0, col_1@1 as col_1, col_2@2 as col_2], aggr=[count], metrics=[output_rows=9359786, elapsed_compute=54.775684465s]
AggregateExec: mode=Final, gby=[col_0@0 as col_0, col_1@1 as col_1, col_2@2 as col_2], aggr=[count], metrics=[output_rows=9360626, elapsed_compute=54.641645191s]
AggregateExec: mode=Final, gby=[col_0@0 as col_0, col_1@1 as col_1, col_2@2 as col_2], aggr=[count], metrics=[output_rows=9359130, elapsed_compute=54.842657479s]
AggregateExec: mode=Final, gby=[col_0@0 as col_0, col_1@1 as col_1, col_2@2 as col_2], aggr=[count], metrics=[output_rows=9360684, elapsed_compute=54.877841075s]
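The metrics above suggest the count aggregates spend roughly an order of magnitude more compute time than the sum aggregates for similar output row counts. A quick back-of-the-envelope check of the ratio (values copied from the `elapsed_compute` metrics in the logs above):

```python
# elapsed_compute values (seconds) copied from the AggregateExec metrics above
sum_times = [1.479441583, 1.50249403, 1.521509782, 1.525740231, 1.841284372]
count_times = [54.775684465, 54.641645191, 54.842657479, 54.877841075]

avg_sum = sum(sum_times) / len(sum_times)
avg_count = sum(count_times) / len(count_times)

print(f"avg sum:   {avg_sum:.2f}s")
print(f"avg count: {avg_count:.2f}s")
print(f"ratio:     {avg_count / avg_sum:.0f}x")
```

So the count partitions average around 35x more compute time than the sum partitions, even though they produce only about 18% more output rows.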
Looks like the issue is in the count implementation?
The two queries take almost identical time in standalone DataFusion, and we are using its count implementation.
What is the problem the feature request solves?
The benchmarks in CometAggregateBenchmark show that COUNT is slower than Spark, but SUM is faster than Spark. There should not be so much difference between these two aggregates. I could not reproduce the performance difference between SUM and COUNT in standalone DataFusion.
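For reference, `COUNT(*)` and `SUM(1)` are semantically interchangeable per group (every row contributes exactly 1 to both), which is why the performance gap is surprising. A minimal Python sketch with hypothetical rows illustrating the equivalence:

```python
from collections import defaultdict

# hypothetical rows: (l_shipmode, l_linenumber)
rows = [("AIR", 1), ("AIR", 2), ("MAIL", 1), ("RAIL", 4), ("AIR", 3)]

counts = defaultdict(int)  # GROUP BY l_shipmode ... COUNT(*)
sums = defaultdict(int)    # GROUP BY l_shipmode ... SUM(1)
for mode, _ in rows:
    counts[mode] += 1  # COUNT(*) adds one per row
    sums[mode] += 1    # SUM(1) also adds one per row

assert counts == sums  # identical results for every group
```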
Describe the potential solution
No response
Additional context
No response