pingcap / tidb

TiDB - the open-source, cloud-native, distributed SQL database designed for modern applications.
https://pingcap.com
Apache License 2.0

The execution plan of joining two tables with large number of duplicated keys is not optimal #52162

Status: Open. JaySon-Huang opened this issue 8 months ago.

JaySon-Huang commented 8 months ago

Enhancement

In some scenarios, users join two tables that both contain a large number of duplicated join keys.

  1. If the aggregation is applied after joining the two tables, the join generates a huge number of rows. This is unacceptable because it consumes a lot of memory and CPU, and the query may fail to execute.
  2. If the aggregation is applied only on the build side, the join no longer generates that many rows. But the plan is still not optimal when the probe side also contains a large number of duplicated keys.
  3. The optimal plan applies aggregation on both the build side and the probe side, then joins the aggregated results. This may require generating new aggregate functions and rewriting existing ones so that the final result stays correct (for example, sum(b.num) becomes sum(a.cnt * b.num), where a.cnt is the count(*) computed on the probe side).
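The three strategies can be compared with a small in-memory sketch. This is a toy Python model, not TiDB's executor. The data, the function names (plan1_join_then_agg and so on) and the nested-loop join are illustrative assumptions. The sketch counts how many rows each join emits and checks that all three plans return the same SUM(num) per (factory, goods).

```python
from collections import Counter, defaultdict

# Hypothetical toy data shaped like the comp1/comp2 example:
# comp1 rows are (factory, goods); comp2 rows are (factory, goods, num).
comp1 = [("A", "A")] * 6 + [("B", "C")] * 3
comp2 = [("A", "A", 2)] * 5 + [("B", "C", 7)] * 4


def plan1_join_then_agg(a_rows, b_rows):
    """Plan 1: join every matching row pair first, then SUM(num) per key."""
    result, join_rows = defaultdict(int), 0
    for key_a in a_rows:
        for f, g, num in b_rows:
            if key_a == (f, g):
                join_rows += 1
                result[key_a] += num
    return dict(result), join_rows


def plan2_agg_build_side(a_rows, b_rows):
    """Plan 2: pre-aggregate SUM(num) on comp2 (build side), then join."""
    b_sum = defaultdict(int)
    for f, g, num in b_rows:
        b_sum[(f, g)] += num
    result, join_rows = defaultdict(int), 0
    for key_a in a_rows:  # the probe side still has every duplicated row
        if key_a in b_sum:
            join_rows += 1
            result[key_a] += b_sum[key_a]
    return dict(result), join_rows


def plan3_agg_both_sides(a_rows, b_rows):
    """Plan 3: COUNT(*) on comp1, SUM(num) on comp2, join, then SUM(cnt * num)."""
    a_cnt = Counter(a_rows)
    b_sum = defaultdict(int)
    for f, g, num in b_rows:
        b_sum[(f, g)] += num
    result, join_rows = defaultdict(int), 0
    for key, cnt in a_cnt.items():  # one row per distinct key on each side
        if key in b_sum:
            join_rows += 1
            result[key] += cnt * b_sum[key]
    return dict(result), join_rows


for plan in (plan1_join_then_agg, plan2_agg_build_side, plan3_agg_both_sides):
    print(plan.__name__, *plan(comp1, comp2))
```

On this toy data, plan 1 emits 42 join rows, plan 2 emits 9 (one per comp1 row), and plan 3 emits 2 (one per distinct key). All three return 60 for ('A', 'A') and 84 for ('B', 'C'). The gap widens as the number of duplicates per key grows.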

On a test data set, the 1st plan ran for more than 20 minutes and then failed. The 2nd plan took about 7 minutes, and the 3rd plan took only 3 seconds!
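Plan 1 explodes because of the output size of an inner equi-join: it emits the sum over all keys k of count_a(k) × count_b(k) rows. In the default plan of the example below, every row of both tables has the key ('A', 'A'). As a result, HashJoin_44's estimate of 243,605,176,320 rows is exactly 540,672 × 450,560, the estimated row counts of the two table scans. A minimal Python check of that arithmetic follows; inner_join_rows is a hypothetical helper, not a TiDB function.

```python
from collections import Counter
from typing import Hashable, Mapping


def inner_join_rows(left: Mapping[Hashable, int], right: Mapping[Hashable, int]) -> int:
    """Rows emitted by an inner equi-join, given the per-key row counts of each input."""
    return sum(n * right[k] for k, n in left.items() if k in right)


# Per-key row counts taken from the estRows of TableFullScan_26 (comp1)
# and TableFullScan_22 (comp2) in the default plan; all rows share one key.
comp1_counts = {("A", "A"): 540_672}
comp2_counts = {("A", "A"): 450_560}
print(inner_join_rows(comp1_counts, comp2_counts))  # 243605176320

# After pre-aggregating both sides, each key appears once per side.
print(inner_join_rows({("A", "A"): 1}, {("A", "A"): 1}))  # 1

# Small sanity check: x and y match on both sides, z has no partner.
print(inner_join_rows(Counter("xxy"), Counter("xyyz")))  # 4 (x: 2*1, y: 1*2)
```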

It would be best that:

For example:

CREATE TABLE `comp1` (
  `factory` varchar(20) DEFAULT NULL,
  `goods` varchar(20) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
CREATE TABLE `comp2` (
  `factory` varchar(20) DEFAULT NULL,
  `goods` varchar(20) DEFAULT NULL,
  `num` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

alter table comp1 set tiflash replica 1;
alter table comp2 set tiflash replica 1;

-- prepare data
insert into comp1(factory, goods) values ('A', 'A'),('A', 'A'),('A', 'A'),('A', 'A'),('A', 'A'),('A', 'A');
-- repeat the next statement until comp1 contains more than 50,000 rows
insert into comp1 select * from comp1;
insert into comp2(factory, goods, num) values ('A', 'A', 2),('A', 'A', 2),('A', 'A', 2),('A', 'A', 2),('A', 'A', 2);
-- repeat the next statement until comp2 contains more than 40,000 rows
insert into comp2 select * from comp2;

-- default execution plan
> select @@tidb_opt_agg_push_down;
+--------------------------+
| @@tidb_opt_agg_push_down |
+--------------------------+
| 0                        |
+--------------------------+
> explain select a.factory, a.goods, sum(b.num)
from comp1 a
inner join comp2 b
on a.goods=b.goods and a.factory=b.factory
group by a.factory, a.goods
+--------------------------------------------------+-----------------+--------------+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| id                                               | estRows         | task         | access object | operator info                                                                                                                                                                             |
+--------------------------------------------------+-----------------+--------------+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| TableReader_73                                   | 1.00            | root         |               | MppVersion: 2, data:ExchangeSender_72                                                                                                                                                     |
| └─ExchangeSender_72                              | 1.00            | mpp[tiflash] |               | ExchangeType: PassThrough                                                                                                                                                                 |
|   └─Projection_10                                | 1.00            | mpp[tiflash] |               | test.comp1.factory, test.comp1.goods, Column#8                                                                                                                                            |
|     └─Projection_68                              | 1.00            | mpp[tiflash] |               | Column#8, test.comp1.factory, test.comp1.goods                                                                                                                                            |
|       └─HashAgg_69                               | 1.00            | mpp[tiflash] |               | group by:test.comp1.factory, test.comp1.goods, funcs:sum(Column#15)->Column#8, funcs:firstrow(test.comp1.factory)->test.comp1.factory, funcs:firstrow(test.comp1.goods)->test.comp1.goods |
|         └─ExchangeReceiver_71                    | 1.00            | mpp[tiflash] |               |                                                                                                                                                                                           |
|           └─ExchangeSender_70                    | 1.00            | mpp[tiflash] |               | ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: test.comp1.factory, collate: utf8mb4_bin], [name: test.comp1.goods, collate: utf8mb4_bin]                               |
|             └─HashAgg_66                         | 1.00            | mpp[tiflash] |               | group by:Column#19, Column#20, funcs:sum(Column#18)->Column#15                                                                                                                            |
|               └─Projection_74                    | 243605176320.00 | mpp[tiflash] |               | cast(test.comp2.num, decimal(10,0) BINARY)->Column#18, test.comp1.factory->Column#19, test.comp1.goods->Column#20                                                                         |
|                 └─Projection_55                  | 243605176320.00 | mpp[tiflash] |               | test.comp1.factory, test.comp1.goods, test.comp2.num                                                                                                                                      |
|                   └─HashJoin_44                  | 243605176320.00 | mpp[tiflash] |               | inner join, equal:[eq(test.comp2.goods, test.comp1.goods) eq(test.comp2.factory, test.comp1.factory)]                                                                                     |
|                     ├─ExchangeReceiver_25(Build) | 450560.00       | mpp[tiflash] |               |                                                                                                                                                                                           |
|                     │ └─ExchangeSender_24        | 450560.00       | mpp[tiflash] |               | ExchangeType: Broadcast, Compression: FAST                                                                                                                                                |
|                     │   └─Selection_23           | 450560.00       | mpp[tiflash] |               | not(isnull(test.comp2.factory)), not(isnull(test.comp2.goods))                                                                                                                            |
|                     │     └─TableFullScan_22     | 450560.00       | mpp[tiflash] | table:b       | pushed down filter:empty, keep order:false                                                                                                                                                |
|                     └─Selection_27(Probe)        | 540672.00       | mpp[tiflash] |               | not(isnull(test.comp1.factory)), not(isnull(test.comp1.goods))                                                                                                                            |
|                       └─TableFullScan_26         | 540672.00       | mpp[tiflash] | table:a       | pushed down filter:empty, keep order:false                                                                                                                                                |
+--------------------------------------------------+-----------------+--------------+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

-- With tidb_opt_agg_push_down enabled, TiDB pushes the aggregation down to the build side (comp2 is aggregated before the join)
> select @@tidb_opt_agg_push_down;
+--------------------------+
| @@tidb_opt_agg_push_down |
+--------------------------+
| 1                        |
+--------------------------+
> explain select a.factory, a.goods, sum(b.num)
from comp1 a
inner join comp2 b
on a.goods=b.goods and a.factory=b.factory
group by a.factory, a.goods
+------------------------------------------------------+-----------+--------------+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| id                                                   | estRows   | task         | access object | operator info                                                                                                                                                                             |
+------------------------------------------------------+-----------+--------------+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| TableReader_93                                       | 1.00      | root         |               | MppVersion: 2, data:ExchangeSender_92                                                                                                                                                     |
| └─ExchangeSender_92                                  | 1.00      | mpp[tiflash] |               | ExchangeType: PassThrough                                                                                                                                                                 |
|   └─Projection_11                                    | 1.00      | mpp[tiflash] |               | test.comp1.factory, test.comp1.goods, Column#8                                                                                                                                            |
|     └─Projection_91                                  | 1.00      | mpp[tiflash] |               | Column#8, test.comp1.factory, test.comp1.goods                                                                                                                                            |
|       └─HashAgg_90                                   | 1.00      | mpp[tiflash] |               | group by:test.comp1.factory, test.comp1.goods, funcs:sum(Column#9)->Column#8, funcs:firstrow(test.comp1.factory)->test.comp1.factory, funcs:firstrow(test.comp1.goods)->test.comp1.goods  |
|         └─ExchangeReceiver_86                        | 540672.00 | mpp[tiflash] |               |                                                                                                                                                                                           |
|           └─ExchangeSender_85                        | 540672.00 | mpp[tiflash] |               | ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: test.comp1.factory, collate: utf8mb4_bin], [name: test.comp1.goods, collate: utf8mb4_bin]                               |
|             └─Projection_83                          | 540672.00 | mpp[tiflash] |               | test.comp1.factory, test.comp1.goods, Column#9                                                                                                                                            |
|               └─HashJoin_79                          | 540672.00 | mpp[tiflash] |               | inner join, equal:[eq(test.comp2.goods, test.comp1.goods) eq(test.comp2.factory, test.comp1.factory)]                                                                                     |
|                 ├─ExchangeReceiver_37(Build)         | 1.00      | mpp[tiflash] |               |                                                                                                                                                                                           |
|                 │ └─ExchangeSender_36                | 1.00      | mpp[tiflash] |               | ExchangeType: Broadcast, Compression: FAST                                                                                                                                                |
|                 │   └─Projection_32                  | 1.00      | mpp[tiflash] |               | Column#9, test.comp2.goods, test.comp2.factory                                                                                                                                            |
|                 │     └─HashAgg_33                   | 1.00      | mpp[tiflash] |               | group by:test.comp2.factory, test.comp2.goods, funcs:sum(Column#10)->Column#9, funcs:firstrow(test.comp2.goods)->test.comp2.goods, funcs:firstrow(test.comp2.factory)->test.comp2.factory |
|                 │       └─ExchangeReceiver_35        | 1.00      | mpp[tiflash] |               |                                                                                                                                                                                           |
|                 │         └─ExchangeSender_34        | 1.00      | mpp[tiflash] |               | ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: test.comp2.goods, collate: utf8mb4_bin], [name: test.comp2.factory, collate: utf8mb4_bin]                               |
|                 │           └─HashAgg_22             | 1.00      | mpp[tiflash] |               | group by:Column#23, Column#24, funcs:sum(Column#22)->Column#10                                                                                                                            |
|                 │             └─Projection_94        | 450560.00 | mpp[tiflash] |               | cast(test.comp2.num, decimal(10,0) BINARY)->Column#22, test.comp2.goods->Column#23, test.comp2.factory->Column#24                                                                         |
|                 │               └─Selection_31       | 450560.00 | mpp[tiflash] |               | not(isnull(test.comp2.factory)), not(isnull(test.comp2.goods))                                                                                                                            |
|                 │                 └─TableFullScan_30 | 450560.00 | mpp[tiflash] | table:b       | pushed down filter:empty, keep order:false                                                                                                                                                |
|                 └─Selection_39(Probe)                | 540672.00 | mpp[tiflash] |               | not(isnull(test.comp1.factory)), not(isnull(test.comp1.goods))                                                                                                                            |
|                   └─TableFullScan_38                 | 540672.00 | mpp[tiflash] | table:a       | pushed down filter:empty, keep order:false                                                                                                                                                |
+------------------------------------------------------+-----------+--------------+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

-- The optimal plan for this data distribution:
-- push the aggregation down below the join on both the build side and the probe side.
-- Notice that sum(b.num) in the original query is rewritten as sum(a.cnt * b.num)
> explain select a.factory, a.goods, sum(a.cnt * b.num)
from (
select factory, goods, count(*) as cnt from
comp1 group by factory, goods
) a inner join (
select factory, goods, sum(num) as num from
comp2 group by factory, goods
) b
on a.goods=b.goods and a.factory=b.factory
group by a.factory, a.goods
+----------------------------------------------------+-----------+--------------+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| id                                                 | estRows   | task         | access object | operator info                                                                                                                                                                             |
+----------------------------------------------------+-----------+--------------+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| TableReader_152                                    | 1.00      | root         |               | MppVersion: 2, data:ExchangeSender_151                                                                                                                                                    |
| └─ExchangeSender_151                               | 1.00      | mpp[tiflash] |               | ExchangeType: PassThrough                                                                                                                                                                 |
|   └─Projection_14                                  | 1.00      | mpp[tiflash] |               | test.comp1.factory, test.comp1.goods, Column#10                                                                                                                                           |
|     └─Projection_146                               | 1.00      | mpp[tiflash] |               | Column#10, test.comp1.factory, test.comp1.goods                                                                                                                                           |
|       └─HashAgg_144                                | 1.00      | mpp[tiflash] |               | group by:Column#53, Column#54, funcs:sum(Column#50)->Column#10, funcs:firstrow(Column#51)->test.comp1.factory, funcs:firstrow(Column#52)->test.comp1.goods                                |
|         └─Projection_154                           | 1.00      | mpp[tiflash] |               | mul(cast(Column#4, decimal(20,0) BINARY), Column#9)->Column#50, test.comp1.factory->Column#51, test.comp1.goods->Column#52, test.comp1.factory->Column#53, test.comp1.goods->Column#54    |
|           └─Projection_121                         | 1.00      | mpp[tiflash] |               | Column#4, test.comp1.factory, test.comp1.goods, Column#9                                                                                                                                  |
|             └─HashJoin_123                         | 1.00      | mpp[tiflash] |               | inner join, equal:[eq(test.comp2.goods, test.comp1.goods) eq(test.comp2.factory, test.comp1.factory)]                                                                                     |
|               ├─ExchangeReceiver_42(Build)         | 1.00      | mpp[tiflash] |               |                                                                                                                                                                                           |
|               │ └─ExchangeSender_41                | 1.00      | mpp[tiflash] |               | ExchangeType: Broadcast, Compression: FAST                                                                                                                                                |
|               │   └─Projection_37                  | 1.00      | mpp[tiflash] |               | Column#9, test.comp2.factory, test.comp2.goods                                                                                                                                            |
|               │     └─HashAgg_38                   | 1.00      | mpp[tiflash] |               | group by:test.comp2.factory, test.comp2.goods, funcs:sum(Column#11)->Column#9, funcs:firstrow(test.comp2.factory)->test.comp2.factory, funcs:firstrow(test.comp2.goods)->test.comp2.goods |
|               │       └─ExchangeReceiver_40        | 1.00      | mpp[tiflash] |               |                                                                                                                                                                                           |
|               │         └─ExchangeSender_39        | 1.00      | mpp[tiflash] |               | ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: test.comp2.factory, collate: utf8mb4_bin], [name: test.comp2.goods, collate: utf8mb4_bin]                               |
|               │           └─HashAgg_27             | 1.00      | mpp[tiflash] |               | group by:Column#48, Column#49, funcs:sum(Column#47)->Column#11                                                                                                                            |
|               │             └─Projection_153       | 450560.00 | mpp[tiflash] |               | cast(test.comp2.num, decimal(10,0) BINARY)->Column#47, test.comp2.factory->Column#48, test.comp2.goods->Column#49                                                                         |
|               │               └─Selection_36       | 450560.00 | mpp[tiflash] |               | not(isnull(test.comp2.factory)), not(isnull(test.comp2.goods))                                                                                                                            |
|               │                 └─TableFullScan_35 | 450560.00 | mpp[tiflash] | table:comp2   | pushed down filter:empty, keep order:false                                                                                                                                                |
|               └─Projection_127(Probe)              | 1.00      | mpp[tiflash] |               | Column#4, test.comp1.factory, test.comp1.goods                                                                                                                                            |
|                 └─HashAgg_128                      | 1.00      | mpp[tiflash] |               | group by:test.comp1.factory, test.comp1.goods, funcs:sum(Column#35)->Column#4, funcs:firstrow(test.comp1.factory)->test.comp1.factory, funcs:firstrow(test.comp1.goods)->test.comp1.goods |
|                   └─ExchangeReceiver_130           | 1.00      | mpp[tiflash] |               |                                                                                                                                                                                           |
|                     └─ExchangeSender_129           | 1.00      | mpp[tiflash] |               | ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: test.comp1.factory, collate: utf8mb4_bin], [name: test.comp1.goods, collate: utf8mb4_bin]                               |
|                       └─HashAgg_125                | 1.00      | mpp[tiflash] |               | group by:test.comp1.factory, test.comp1.goods, funcs:count(1)->Column#35                                                                                                                  |
|                         └─Selection_51             | 540672.00 | mpp[tiflash] |               | not(isnull(test.comp1.factory)), not(isnull(test.comp1.goods))                                                                                                                            |
|                           └─TableFullScan_50       | 540672.00 | mpp[tiflash] | table:comp1   | pushed down filter:empty, keep order:false                                                                                                                                                |
+----------------------------------------------------+-----------+--------------+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
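Running both queries side by side checks that the manual rewrite above preserves the result. This sketch uses Python's built-in sqlite3 module as a stand-in for TiDB. It uses a small hypothetical data set (including a NULL key and an unmatched key) and adds an ORDER BY so the result lists compare deterministically.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE comp1 (factory TEXT, goods TEXT);
    CREATE TABLE comp2 (factory TEXT, goods TEXT, num INTEGER);
""")
# Hypothetical data with duplicated keys; NULL keys and unmatched keys are dropped by the inner join.
conn.executemany("INSERT INTO comp1 VALUES (?, ?)",
                 [("A", "A")] * 6 + [("B", "C")] * 3 + [(None, "A")] * 2)
conn.executemany("INSERT INTO comp2 VALUES (?, ?, ?)",
                 [("A", "A", 2)] * 5 + [("B", "C", 7)] * 4 + [("A", "Z", 1)])

# The original query: join first, aggregate afterwards.
ORIGINAL = """
    SELECT a.factory, a.goods, SUM(b.num)
    FROM comp1 a INNER JOIN comp2 b
      ON a.goods = b.goods AND a.factory = b.factory
    GROUP BY a.factory, a.goods
    ORDER BY a.factory, a.goods
"""
# The manual rewrite: aggregate both sides, join, then SUM(a.cnt * b.num).
REWRITTEN = """
    SELECT a.factory, a.goods, SUM(a.cnt * b.num)
    FROM (SELECT factory, goods, COUNT(*) AS cnt
          FROM comp1 GROUP BY factory, goods) a
    INNER JOIN (SELECT factory, goods, SUM(num) AS num
                FROM comp2 GROUP BY factory, goods) b
      ON a.goods = b.goods AND a.factory = b.factory
    GROUP BY a.factory, a.goods
    ORDER BY a.factory, a.goods
"""
original_rows = conn.execute(ORIGINAL).fetchall()
rewritten_rows = conn.execute(REWRITTEN).fetchall()
print(original_rows)
print(rewritten_rows)
```

Both queries should return [('A', 'A', 60), ('B', 'C', 84)]. Each derived table has at most one row per (factory, goods), and the outer GROUP BY uses the join keys. So every outer group contains exactly one joined row. This is consistent with the other database's plan in the next comment, which has no aggregation above its HASH JOIN.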
JaySon-Huang commented 8 months ago

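The execution plan that another database generates for the same query. Both inputs are aggregated with HASH GROUP BY before the HASH JOIN, so the join sees only one row per key on each side.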

select a.factory, a.goods, sum(b.num)
from comp1 a
inner join comp2 b
on a.goods=b.goods and a.factory=b.factory
group by a.factory, a.goods

=======================================================
|ID|OPERATOR             |NAME |EST.ROWS |EST.TIME(us)|
-------------------------------------------------------
|0 |HASH JOIN            |     |256      |44074020    |
|1 |├─SUBPLAN SCAN       |VIEW1|130      |37090580    |
|2 |│ └─HASH GROUP BY    |     |130      |37090579    |
|3 |│   └─TABLE FULL SCAN|a    |300000001|5107107     |
|4 |└─SUBPLAN SCAN       |VIEW2|201      |6983257     |
|5 |  └─HASH GROUP BY    |     |201      |6983256     |
|6 |    └─TABLE FULL SCAN|b    |50000000 |1652634     |
=======================================================
Outputs & filters:
-------------------------------------
  0 - output([VIEW1.a.factory], [VIEW1.a.good], [cast(VIEW1.T_FUN_COUNT(*), DECIMAL(20, 0)) * VIEW2.T_FUN_SUM(b.num)]), filter(nil), rowset=256
      equal_conds([VIEW1.a.good = VIEW2.b.good], [VIEW1.a.factory = VIEW2.b.comp]), other_conds(nil)
  1 - output([VIEW1.a.good], [VIEW1.a.factory], [VIEW1.T_FUN_COUNT(*)]), filter(nil), rowset=256
      access([VIEW1.a.good], [VIEW1.a.factory], [VIEW1.T_FUN_COUNT(*)])
  2 - output([a.good], [a.factory], [T_FUN_COUNT(*)]), filter(nil), rowset=256
      group([a.good], [a.factory]), agg_func([T_FUN_COUNT(*)])
  3 - output([a.good], [a.factory]), filter(nil), rowset=256
      access([a.good], [a.factory]), partitions(p0)
      is_index_back=false, is_global_index=false, 
      range_key([a.__pk_increment]), range(MIN ; MAX)always true
  4 - output([VIEW2.b.good], [VIEW2.b.comp], [VIEW2.T_FUN_SUM(b.num)]), filter(nil), rowset=256
      access([VIEW2.b.good], [VIEW2.b.comp], [VIEW2.T_FUN_SUM(b.num)])
  5 - output([b.good], [b.comp], [T_FUN_SUM(b.num)]), filter(nil), rowset=256
      group([b.good], [b.comp]), agg_func([T_FUN_SUM(b.num)])
  6 - output([b.good], [b.comp], [b.num]), filter(nil), rowset=256
      access([b.good], [b.comp], [b.num]), partitions(p0)
      is_index_back=false, is_global_index=false, 
      range_key([b.__pk_increment]), range(MIN ; MAX)always true
elsa0520 commented 8 months ago

Yes, OceanBase can push the aggregation down to both sides of the join. For example, select xxx, sum(a.sum) from a join b on a.id = b.id group by a.id is rewritten to select xxx, sum(a.sum * b.cnt) from (select a.id, sum(a.sum) as sum from a group by a.id) a join (select b.id, count(*) as cnt from b group by b.id) b on a.id = b.id group by a.id
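The mirrored rewrite described above can be checked the same way. Here the summed column comes from a, so a keeps SUM and b contributes COUNT(*). The minimal Python sketch below uses hypothetical rows of (id, value) for a and plain ids for b; the function names are illustrative only.

```python
from collections import Counter, defaultdict


def naive_sum(a_rows, b_ids):
    """select a.id, sum(a.val) from a join b on a.id = b.id group by a.id"""
    out = defaultdict(int)
    for a_id, a_val in a_rows:
        for b_id in b_ids:  # every matching b row repeats a.val once
            if a_id == b_id:
                out[a_id] += a_val
    return dict(out)


def pushed_down_sum(a_rows, b_ids):
    """sum(a.sum * b.cnt) over the two pre-aggregated derived tables."""
    a_sum = defaultdict(int)  # select a.id, sum(a.val) as sum from a group by a.id
    for a_id, a_val in a_rows:
        a_sum[a_id] += a_val
    b_cnt = Counter(b_ids)    # select b.id, count(*) as cnt from b group by b.id
    return {k: s * b_cnt[k] for k, s in a_sum.items() if k in b_cnt}


a_rows = [(1, 10), (1, 5), (2, 3), (3, 4)]
b_ids = [1, 1, 1, 2]
print(naive_sum(a_rows, b_ids))        # {1: 45, 2: 3}
print(pushed_down_sum(a_rows, b_ids))  # {1: 45, 2: 3}
```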