pingcap / tidb


need to support pulling aggregate up upon join #6895

Open zz-jason opened 6 years ago

zz-jason commented 6 years ago

The original query 17 in the TPC-H benchmark is:

select
    sum(l_extendedprice) / 7.0 as avg_yearly
from
    lineitem,
    part
where
    p_partkey = l_partkey
    and p_brand = 'Brand#44'
    and p_container = 'WRAP PKG'
    and l_quantity < (
        select
            0.2 * avg(l_quantity)
        from
            lineitem
        where
            l_partkey = p_partkey
    );

The physical plan generated by TiDB optimizer is:

+------------------------------+------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+
| id                           | task | operator info                                                                                                                                                      | count       |
+------------------------------+------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+
| Projection_15                | root | div(11_col_0, 7.0)                                                                                                                                                 | 1.00        |
|   StreamAgg_20               | root | funcs:sum(tpch.lineitem.l_extendedprice)                                                                                                                           | 1.00        |
|     Projection_43            | root | tpch.lineitem.l_partkey, tpch.lineitem.l_quantity, tpch.lineitem.l_extendedprice, tpch.part.p_partkey, tpch.part.p_brand, tpch.part.p_container, mul(0.2, 7_col_0) | 47187.51    |
|       Selection_44           | root | lt(tpch.lineitem.l_quantity, mul(0.2, 7_col_0))                                                                                                                    | 47187.51    |
|         HashLeftJoin_45      | root | left outer join, inner:HashAgg_39, equal:[eq(tpch.part.p_partkey, tpch.lineitem.l_partkey)]                                                                        | 58984.39    |
|           HashRightJoin_51   | root | inner join, inner:TableReader_34, equal:[eq(tpch.part.p_partkey, tpch.lineitem.l_partkey)]                                                                         | 58984.39    |
|             TableReader_34   | root | data:Selection_33                                                                                                                                                  | 1958.42     |
|               Selection_33   | cop  | eq(tpch.part.p_brand, Brand#44), eq(tpch.part.p_container, WRAP PKG)                                                                                               | 1958.42     |
|                 TableScan_32 | cop  | table:part, range:[-inf,+inf], keep order:false                                                                                                                    | 2000000.00  |
|             TableReader_53   | root | data:TableScan_52                                                                                                                                                  | 59986052.00 |
|               TableScan_52   | cop  | table:lineitem, range:[-inf,+inf], keep order:false                                                                                                                | 59986052.00 |
|           HashAgg_39         | root | group by:col_3, funcs:avg(col_0, col_1), firstrow(col_2)                                                                                                           | 1991680.00  |
|             TableReader_40   | root | data:HashAgg_35                                                                                                                                                    | 1991680.00  |
|               HashAgg_35     | cop  | group by:tpch.lineitem.l_partkey, funcs:avg(tpch.lineitem.l_quantity), firstrow(tpch.lineitem.l_partkey)                                                           | 1991680.00  |
|                 TableScan_38 | cop  | table:lineitem, range:[-inf,+inf], keep order:false                                                                                                                | 59986052.00 |
+------------------------------+------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+
15 rows in set (0.00 sec)

We handle the subquery as "aggregate (HashAgg_39), then join (HashLeftJoin_45)". With scale factor 10, this query takes about 3 minutes on my computer.

In fact, we can pull the aggregate HashAgg_39 up and handle the subquery as "join, then aggregate", which means the query can be rewritten into the following form:

select sum(sumb)/7.0 as avg_yearly
from (
    select a,
           sumb,
           0.2*avg(l_quantity) as filter
    from (
        select l_quantity as a,
               l_extendedprice as b,
               p_partkey as c,
               sum(l_extendedprice) as sumb
        from lineitem
        inner join part
        on p_partkey = l_partkey and p_brand = 'Brand#44' and p_container = 'WRAP PKG'
        group by a, b, c
    ) tmp
    left outer join lineitem
    on l_partkey = c
    group by a, b, c, sumb
    having a < filter
) xx;

The corresponding execution plan for this modified query is:

+----------------------------------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+
| id                               | task | operator info                                                                                                                                                                                                                               | count       |
+----------------------------------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+
| Projection_15                    | root | div(12_col_0, 7.0)                                                                                                                                                                                                                          | 1.00        |
|   StreamAgg_20                   | root | funcs:sum(tmp.sumb)                                                                                                                                                                                                                         | 1.00        |
|     Projection_43                | root | sumb, mul(0.2, 8_col_0), tpch.lineitem.a                                                                                                                                                                                                    | 47187.51    |
|       Selection_44               | root | lt(tpch.lineitem.a, mul(0.2, 8_col_0))                                                                                                                                                                                                      | 47187.51    |
|         HashAgg_47               | root | group by:tpch.lineitem.a, tpch.lineitem.b, tpch.part.c, sumb, funcs:avg(tpch.lineitem.l_quantity), firstrow(tpch.lineitem.a), firstrow(sumb)                                                                                                | 58984.39    |
|           HashLeftJoin_26        | root | left outer join, inner:TableReader_42, equal:[eq(tpch.part.c, tpch.lineitem.l_partkey)]                                                                                                                                                     | 1776510.56  |
|             HashAgg_29           | root | group by:tpch.lineitem.l_quantity, tpch.lineitem.l_extendedprice, tpch.part.p_partkey, funcs:sum(tpch.lineitem.l_extendedprice), firstrow(tpch.lineitem.l_quantity), firstrow(tpch.lineitem.l_extendedprice), firstrow(tpch.part.p_partkey) | 58984.39    |
|               HashLeftJoin_34    | root | inner join, inner:TableReader_40, equal:[eq(tpch.lineitem.l_partkey, tpch.part.p_partkey)]                                                                                                                                                  | 58984.39    |
|                 TableReader_37   | root | data:TableScan_36                                                                                                                                                                                                                           | 59986052.00 |
|                   TableScan_36   | cop  | table:lineitem, range:[-inf,+inf], keep order:false                                                                                                                                                                                         | 59986052.00 |
|                 TableReader_40   | root | data:Selection_39                                                                                                                                                                                                                           | 1958.42     |
|                   Selection_39   | cop  | eq(tpch.part.p_brand, Brand#44), eq(tpch.part.p_container, WRAP PKG)                                                                                                                                                                        | 1958.42     |
|                     TableScan_38 | cop  | table:part, range:[-inf,+inf], keep order:false                                                                                                                                                                                             | 2000000.00  |
|             TableReader_42       | root | data:TableScan_41                                                                                                                                                                                                                           | 59986052.00 |
|               TableScan_41       | cop  | table:lineitem, range:[-inf,+inf], keep order:false                                                                                                                                                                                         | 59986052.00 |
+----------------------------------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+
15 rows in set (0.00 sec)

This modified version runs in only 45 seconds and produces the same result as the original one.
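The equivalence claimed above can be checked on toy data. The following is a minimal sketch, assuming SQLite (through Python's standard `sqlite3` module) as a stand-in for TiDB and a handful of made-up rows. The rewritten query is lightly adapted for SQLite: the `filter` alias is dropped and its expression is inlined into `having`. It only shows that the two forms agree on this small data set, not that they agree in general.

```python
import sqlite3

# In-memory database with made-up rows (not TPC-H data).
conn = sqlite3.connect(":memory:")
conn.executescript("""
    create table part (p_partkey integer primary key, p_brand text, p_container text);
    create table lineitem (l_partkey integer, l_quantity integer, l_extendedprice real);
    insert into part values
        (1, 'Brand#44', 'WRAP PKG'),
        (2, 'Brand#44', 'WRAP PKG'),
        (3, 'Brand#11', 'JUMBO BOX');
    insert into lineitem values
        (1, 1, 100.0), (1, 1, 100.0), (1, 10, 500.0), (1, 50, 900.0),
        (2, 2, 70.0), (2, 40, 300.0), (2, 48, 400.0),
        (3, 1, 999.0), (3, 100, 1.0);
""")

# Original form: correlated subquery ("aggregate then join").
ORIGINAL = """
select sum(l_extendedprice) / 7.0 as avg_yearly
from lineitem, part
where p_partkey = l_partkey
  and p_brand = 'Brand#44'
  and p_container = 'WRAP PKG'
  and l_quantity < (
      select 0.2 * avg(l_quantity) from lineitem where l_partkey = p_partkey
  )
"""

# Rewritten form from the issue ("join then aggregate"), adapted for SQLite.
REWRITTEN = """
select sum(sumb) / 7.0 as avg_yearly
from (
    select a, sumb
    from (
        select l_quantity as a,
               l_extendedprice as b,
               p_partkey as c,
               sum(l_extendedprice) as sumb
        from lineitem
        inner join part
        on p_partkey = l_partkey and p_brand = 'Brand#44' and p_container = 'WRAP PKG'
        group by l_quantity, l_extendedprice, p_partkey
    ) tmp
    left outer join lineitem
    on l_partkey = c
    group by a, b, c, sumb
    having a < 0.2 * avg(l_quantity)
) xx
"""

original_result = conn.execute(ORIGINAL).fetchone()[0]
rewritten_result = conn.execute(REWRITTEN).fetchone()[0]
print(original_result, rewritten_result)
```

Note that the toy data includes two identical lineitem rows for part 1, so the inner `sum(l_extendedprice) as sumb` has duplicates to fold together.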

However, this rule is not guaranteed to always generate a better plan, so we should take cost into consideration. In the physical plan exhausting phase, we should consider not only the implementation rules (which physical operator implementation to use, for example hash join or merge join), but also the transformation rules, such as aggregate push-down and aggregate pull-up.
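As a rough illustration of searching over both kinds of rules, here is a minimal sketch. It is not TiDB's optimizer code: the `Candidate` type and both cost tables are hypothetical, the shape costs loosely reuse the runtimes reported in this issue (in seconds), and the join costs are made-up placeholders.

```python
from dataclasses import dataclass
from itertools import product


@dataclass(frozen=True)
class Candidate:
    shape: str      # plan shape produced by a transformation rule
    join_impl: str  # physical join chosen by an implementation rule
    cost: float     # estimated cost in arbitrary units


# Hypothetical costs. The shape costs echo the ~3 minutes and 45 seconds
# reported above; the join costs are placeholders, not measurements.
SHAPE_COST = {"aggregate-then-join": 180.0, "join-then-aggregate": 45.0}
JOIN_IMPL_COST = {"hash join": 10.0, "merge join": 25.0}


def enumerate_candidates():
    """Cross every transformation alternative with every implementation alternative."""
    return [
        Candidate(shape, impl, SHAPE_COST[shape] + JOIN_IMPL_COST[impl])
        for shape, impl in product(SHAPE_COST, JOIN_IMPL_COST)
    ]


def cheapest(candidates):
    """Pick the candidate with the lowest estimated cost."""
    return min(candidates, key=lambda c: c.cost)


candidates = enumerate_candidates()
best = cheapest(candidates)
print(best)
```

The point of the sketch is only that the plan shape is chosen by comparing costs, alongside the choice of physical operator, instead of being fixed by a rule that always fires.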

winoros commented 6 years ago

Is the result still correct? Maybe you should try a merge join hint.

zz-jason commented 6 years ago

@winoros The result is correct, and merge join will not improve the performance. The main overhead here is that we aggregate a table with about 60 million rows into about 2 million rows, which takes a lot of time. Instead, we should do the outer join first, which can produce a smaller result set.

winoros commented 6 years ago

This rewrite is not reasonable. sum(l_extendedprice) should be an aggregate function on the outer lineitem table, not on the subquery. Also, this rewrite is equivalent to the original SQL only because p_partkey is the primary key of the part table, so the join won't expand the data of the inner lineitem. Pulling the aggregate up and adding group-by fields is not reasonable, because it may destroy the data distribution. So it just happens to be right. It is not a uniform rewriting method.

In most cases, a rule should operate on a small scope, usually just a plan and its children. It should consider neither the plan's parent nor the children of its children.
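The point about the primary key concerns join fan-out in general. As a generic illustration (not a claim about the specific rewrite above), the sketch below uses SQLite through Python's `sqlite3` module and two hypothetical tables, `part_unique` and `part_dup`, to show that a sum computed after a join changes when the join key is not unique.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    create table lineitem (l_partkey integer, l_extendedprice real);
    insert into lineitem values (1, 100.0), (1, 50.0);

    -- p_partkey is unique here, as it is in TPC-H.
    create table part_unique (p_partkey integer primary key);
    insert into part_unique values (1);

    -- Hypothetical variant in which the join key is not unique.
    create table part_dup (p_partkey integer);
    insert into part_dup values (1), (1);
""")


def total_after_join(part_table):
    """Sum l_extendedprice after joining lineitem with the given part table."""
    sql = (
        "select sum(l_extendedprice) "
        f"from lineitem join {part_table} on p_partkey = l_partkey"
    )
    return conn.execute(sql).fetchone()[0]


sum_unique = total_after_join("part_unique")  # each lineitem row matches once
sum_dup = total_after_join("part_dup")        # each lineitem row matches twice
print(sum_unique, sum_dup)
```

With the unique key, every lineitem row contributes once. With the duplicated key, every row is counted twice, so an aggregate moved across such a join needs extra care.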

zz-jason commented 6 years ago

@winoros You are right, a rule only needs to consider a piece of the operator tree. But here I only meant to show that we need to support the "join then aggregate" operation, so maybe you could focus on that point. The plan I showed in this case can be generated through a combination of other rules.

zz-jason commented 6 years ago

And yes, before applying a rule, we need to consider the equivalence between the two operator trees before and after applying it. And as you said, we may need to take data distribution, column indexes, etc. into consideration. For these subquery rewriting methods, I think maybe we can learn from "Orthogonal Optimization of Subqueries and Aggregation".

winoros commented 6 years ago

@zz-jason What actually speeds up the query is three parts: