apache / datafusion-comet

Apache DataFusion Comet Spark Accelerator
https://datafusion.apache.org/comet
Apache License 2.0

Implement Common Subexpression Elimination optimizer rule #942

Open andygrove opened 2 weeks ago

andygrove commented 2 weeks ago

What is the problem the feature request solves?

When running TPC-H q1 in Spark/Comet, the expression l_extendedprice#21 * (1 - l_discount#22) appears twice in the query (once in sum_disc_price and once in sum_charge) and is currently evaluated twice. This could be optimized so that the expression is evaluated only once. I tested the idea by manually rewriting the query to compute the expression in a subquery.

Original Query

select
    l_returnflag,
    l_linestatus,
    sum(l_quantity) as sum_qty,
    sum(l_extendedprice) as sum_base_price,
    sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
    sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
    avg(l_quantity) as avg_qty,
    avg(l_extendedprice) as avg_price,
    avg(l_discount) as avg_disc,
    count(*) as count_order
from
    lineitem
where
    l_shipdate <= date '1998-12-01' - interval '68 days'
group by
    l_returnflag,
    l_linestatus
order by
    l_returnflag,
    l_linestatus;

Optimized Query

select
    l_returnflag,
    l_linestatus,
    sum(l_quantity) as sum_qty,
    sum(l_extendedprice) as sum_base_price,
    sum(foo) as sum_disc_price,
    sum(foo * (1 + l_tax)) as sum_charge,
    avg(l_quantity) as avg_qty,
    avg(l_extendedprice) as avg_price,
    avg(l_discount) as avg_disc,
    count(*) as count_order
from (select
          l_returnflag,
          l_linestatus,
          l_quantity,
          l_extendedprice,
          l_extendedprice * (1 - l_discount) as foo,
          l_tax,
          l_discount
  from lineitem
  where
    l_shipdate <= date '1998-12-01' - interval '68 days')
group by
    l_returnflag,
    l_linestatus
order by
    l_returnflag,
    l_linestatus;
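
The manual rewrite above factors out the one expression that both aggregates share. As a rough illustration of how such a repeated expression could be detected automatically, here is a minimal Python sketch. The tuple-based expression encoding and the helper names (subexpressions, common_subexpressions) are hypothetical and are not taken from Spark, Comet, or DataFusion.

```python
from collections import Counter

# Hypothetical encoding: (operator, left, right); leaves are column names or literals.
disc_price = ("*", "l_extendedprice", ("-", 1, "l_discount"))
charge = ("*", disc_price, ("+", 1, "l_tax"))


def subexpressions(expr):
    """Yield every non-leaf subexpression of expr, including expr itself."""
    if isinstance(expr, tuple):
        yield expr
        for child in expr[1:]:
            yield from subexpressions(child)


def common_subexpressions(exprs):
    """Return the largest subexpressions that occur more than once across exprs."""
    counts = Counter(sub for e in exprs for sub in subexpressions(e))
    repeated = [sub for sub, n in counts.items() if n > 1]
    # Drop repeated expressions that are nested inside another repeated one.
    # Simplification: a nested expression that also occurs on its own elsewhere
    # would be dropped too; a real rule would need to handle that case.
    return [
        sub
        for sub in repeated
        if not any(sub != other and sub in set(subexpressions(other)) for other in repeated)
    ]


common = common_subexpressions([disc_price, charge])
print(common)
```

For the two aggregate inputs from q1, the only expression reported is l_extendedprice * (1 - l_discount), which is the one named foo in the rewritten query.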

Timings (Original)

13.752424478530884,
11.648030281066895,
11.35965895652771,
11.335061311721802,
11.383593797683716,
11.291191101074219,
11.31091046333313,
11.351991653442383,
11.32134222984314,
11.374904155731201

Timings (Optimized)

12.734412908554077,
10.684742212295532,
10.157625198364258,
10.06518030166626,
10.043614149093628,
9.986022233963013,
9.939271688461304,
9.925782918930054,
10.024176120758057,
10.018519401550293
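
To put a number on the difference between the two timing lists, one option is to compare medians, which are less sensitive to the slower first run in each list. The sketch below copies the ten values from each list above; using the median as the summary statistic is my assumption, not something stated in the issue.

```python
from statistics import median

# Values copied from "Timings (Original)" and "Timings (Optimized)" above.
original = [
    13.752424478530884, 11.648030281066895, 11.35965895652771,
    11.335061311721802, 11.383593797683716, 11.291191101074219,
    11.31091046333313, 11.351991653442383, 11.32134222984314,
    11.374904155731201,
]
optimized = [
    12.734412908554077, 10.684742212295532, 10.157625198364258,
    10.06518030166626, 10.043614149093628, 9.986022233963013,
    9.939271688461304, 9.925782918930054, 10.024176120758057,
    10.018519401550293,
]

median_original = median(original)
median_optimized = median(optimized)

# Fraction of the original median time saved by the rewritten query.
reduction = (median_original - median_optimized) / median_original
print(f"median original:  {median_original:.3f}")
print(f"median optimized: {median_optimized:.3f}")
print(f"reduction:        {reduction:.1%}")
```

On these numbers the rewritten query's median time is roughly 11 to 12 percent lower than the original's.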

Spark UI (Original)

Screenshot from 2024-09-13 12-16-22

Spark UI (Optimized)

Screenshot from 2024-09-13 12-15-51

Describe the potential solution

No response

Additional context

No response

viirya commented 2 weeks ago

Good finding. I think this kind of optimization should be in the Spark optimizer instead.

viirya commented 2 weeks ago

I remember Spark SQL has a corresponding optimization rule, but I am not sure why it doesn't affect this query.

andygrove commented 1 week ago

Related to this, it would be nice if we could improve the metrics for CometHashAggregate to show the time for evaluating the aggregate input expressions. I am not sure how much work that would be though.

viirya commented 1 week ago

> Related to this, it would be nice if we could improve the metrics for CometHashAggregate to show the time for evaluating the aggregate input expressions. I am not sure how much work that would be though.

Sounds good. It should be added to the DataFusion hash aggregate operator.

andygrove commented 4 days ago

> Good finding. I think this kind of optimization should be in Spark optimizer instead.

It would make sense for Spark to add this, but I think that it could also be beneficial for DataFusion to support this as a physical optimizer rule.

I filed https://github.com/apache/datafusion/issues/12599

eejbyfeldt commented 2 days ago

Spark has it, but not at the plan level. Instead they do it as part of their code generation: https://github.com/apache/spark/blob/v3.5.3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala#L1064-L1098C1
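
To illustrate the difference between rewriting the plan and eliminating the repeated work during expression evaluation, here is a rough Python sketch of the second approach: each distinct subexpression is computed once per row and then reused from a cache. This is only a conceptual illustration, not Spark's generated code; the evaluate function, the tuple encoding, and the sample row values are all hypothetical.

```python
import operator

OPS = {"+": operator.add, "-": operator.sub, "*": operator.mul}

# Hypothetical encoding: (operator, left, right); leaves are column names or literals.
disc_price = ("*", "l_extendedprice", ("-", 1, "l_discount"))
charge = ("*", disc_price, ("+", 1, "l_tax"))


def evaluate(expr, row, stats, cache=None):
    """Evaluate expr against row; reuse results from cache when one is given."""
    if not isinstance(expr, tuple):
        # Leaf: a string is a column reference, anything else is a literal.
        return row[expr] if isinstance(expr, str) else expr
    if cache is not None and expr in cache:
        return cache[expr]
    op, left, right = expr
    stats["ops"] += 1  # count each operator that is actually executed
    value = OPS[op](evaluate(left, row, stats, cache), evaluate(right, row, stats, cache))
    if cache is not None:
        cache[expr] = value
    return value


row = {"l_extendedprice": 100.0, "l_discount": 0.5, "l_tax": 0.25}  # made-up sample row

naive_stats = {"ops": 0}
naive = [evaluate(e, row, naive_stats) for e in (disc_price, charge)]

cached_stats = {"ops": 0}
row_cache = {}  # one cache per row, shared by all expressions evaluated on that row
cached = [evaluate(e, row, cached_stats, row_cache) for e in (disc_price, charge)]

print(naive, naive_stats)
print(cached, cached_stats)
```

Both runs produce the same values, but the cached run executes four operators for this row instead of six, because l_extendedprice * (1 - l_discount) is computed only once.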