apache / doris

Apache Doris is an easy-to-use, high performance and unified analytics database.
https://doris.apache.org
Apache License 2.0
12.66k stars 3.27k forks source link

Colocate plan #5589

Closed EmmyMiao87 closed 3 years ago

EmmyMiao87 commented 3 years ago

Is your feature request related to a problem? Please describe.

Currently Doris supports colocate table building and colocate join. But it did not make full use of the advantages of colocate. For example, when there is an Aggregation Node between Hash Join Node and Scan Node, colocate join cannot be performed. For another example, Aggregation Node, Sort Node, and Set Operation nodes can also be absorbed by child nodes when the data distribution matches, but they are not currently available.

Describe the solution you'd like

In the case of reasonable data distribution, the upper-level operators can be absorbed by the lower-level operators, thereby reducing unnecessary network transmission and serialization and deserialization operations.

Describe alternatives you've considered

The main operators for colocate optimization include the following 4 plan node: Hash Join Node Set Operation Node Aggregation Node Sort Node (in window function)

Step1: create table

CREATE TABLE `test_colocate` (
  `k1` int(11) NULL COMMENT "",
  `k2` int(11) NULL COMMENT "",
  `k3` int(11) NULL COMMENT "",
  `k4` int(11) NULL COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(`k1`, `k2`, `k3`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 10
PROPERTIES (
"replication_num" = "2",
"in_memory" = "false",
"storage_format" = "V2"
);

Step2: colocate join

explain select * from (select k1, k2 from test_colocate group by k1, k2) a , test_colocate b where a.k1=b.k1 and a.k2=b.k2;
+-------------------------------------------------------------------------------------------------------------------------+
| Explain String                                                                                                          |
+-------------------------------------------------------------------------------------------------------------------------+
| PLAN FRAGMENT 0                                                                                                         |
|  OUTPUT EXPRS:<slot 2> `k1` | <slot 3> `k2` | `b`.`k1` | `b`.`k2` | `b`.`k3` | `b`.`k4`                                 |
|   PARTITION: UNPARTITIONED                                                                                              |
|                                                                                                                         |
|   RESULT SINK                                                                                                           |
|                                                                                                                         |
|   4:EXCHANGE                                                                                                            |
|                                                                                                                         |
| PLAN FRAGMENT 1                                                                                                         |
|  OUTPUT EXPRS:                                                                                                          |
|   PARTITION: HASH_PARTITIONED: `default_cluster:test`.`test_colocate`.`k1`, `default_cluster:test`.`test_colocate`.`k2` |
|                                                                                                                         |
|   STREAM DATA SINK                                                                                                      |
|     EXCHANGE ID: 04                                                                                                     |
|     UNPARTITIONED                                                                                                       |
|                                                                                                                         |
|   3:HASH JOIN                                                                                                           |
|   |  join op: INNER JOIN                                                                                                |
|   |  hash predicates:                                                                                                   |
|   |  colocate: true                                                                                                     |
|   |  equal join conjunct: <slot 2> `k1` = `b`.`k1`                                                                      |
|   |  equal join conjunct: <slot 3> `k2` = `b`.`k2`                                                                      |
|   |                                                                                                                     |
|   |----2:OlapScanNode                                                                                                   |
|   |       TABLE: test_colocate                                                                                          |
|   |                                                                                                                     |
|   1:AGGREGATE (update finalize)                                                                                         |
|   |  group by: `k1`, `k2`                                                                                               |
|   |                                                                                                                     |
|   0:OlapScanNode                                                                                                        |
|      TABLE: test_colocate                                                                                               |
+-------------------------------------------------------------------------------------------------------------------------+
47 rows in set (0.020 sec)

Step3: colocate aggregation node Condition: The input partition of aggregation node >= the data partition of child fragment

 explain select k1, k2  from test_colocate where k1=1  group by k1, k2;
+-------------------------------------------------------------------------------------------------------------------------+
| Explain String                                                                                                          |
+-------------------------------------------------------------------------------------------------------------------------+
| PLAN FRAGMENT 0                                                                                                         |
|  OUTPUT EXPRS:<slot 2> `k1` | <slot 3> `k2`                                                                             |
|   PARTITION: UNPARTITIONED                                                                                              |
|                                                                                                                         |
|   RESULT SINK                                                                                                           |
|                                                                                                                         |
|   2:EXCHANGE                                                                                                            |
|                                                                                                                         |
| PLAN FRAGMENT 1                                                                                                         |
|  OUTPUT EXPRS:                                                                                                          |
|   PARTITION: HASH_PARTITIONED: `default_cluster:test`.`test_colocate`.`k1`, `default_cluster:test`.`test_colocate`.`k2` |
|                                                                                                                         |
|   STREAM DATA SINK                                                                                                      |
|     EXCHANGE ID: 02                                                                                                     |
|     UNPARTITIONED                                                                                                       |
|                                                                                                                         |
|   1:AGGREGATE (update finalize)                                                                                         |
|   |  group by: `k1`, `k2`                                                                                               |
|   |                                                                                                                     |
|   0:OlapScanNode                                                                                                        |
|      TABLE: test_colocate                                                                                               |
+-------------------------------------------------------------------------------------------------------------------------+
30 rows in set (0.011 sec)

Step4: sort node (in window function) Condition: The sort columns of sort node >= the data partition of child fragment

The sort column is k1 and k2.

explain select k1, sum(k2) over(partition by k1 order by k2) from test_colocate;
+-------------------------------------------------------------------------------------------------------------------------+
| Explain String                                                                                                          |
+-------------------------------------------------------------------------------------------------------------------------+
| PLAN FRAGMENT 0                                                                                                         |
|  OUTPUT EXPRS:<slot 4> <slot 0> | <slot 3>                                                                              |
|   PARTITION: UNPARTITIONED                                                                                              |
|                                                                                                                         |
|   RESULT SINK                                                                                                           |
|                                                                                                                         |
|   3:EXCHANGE                                                                                                            |
|                                                                                                                         |
| PLAN FRAGMENT 1                                                                                                         |
|  OUTPUT EXPRS:                                                                                                          |
|   PARTITION: HASH_PARTITIONED: `default_cluster:test`.`test_colocate`.`k1`, `default_cluster:test`.`test_colocate`.`k2` |
|                                                                                                                         |
|   STREAM DATA SINK                                                                                                      |
|     EXCHANGE ID: 03                                                                                                     |
|     UNPARTITIONED                                                                                                       |
|                                                                                                                         |
|   2:ANALYTIC                                                                                                            |
|   |  functions: [, sum(<slot 5> ), ]                                                                                    |
|   |  partition by: `k1`                                                                                                 |
|   |  order by: <slot 5>  ASC                                                                                            |
|   |  window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW                                                          |
|   |                                                                                                                     |
|   1:SORT                                                                                                                |
|   |  order by: <slot 4> <slot 0> ASC, <slot 5>  ASC                                                                     |
|   |  offset: 0                                                                                                          |
|   |                                                                                                                     |
|   0:OlapScanNode                                                                                                        |
|      TABLE: test_colocate                                                                                               |
+-------------------------------------------------------------------------------------------------------------------------+
36 rows in set (0.028 sec)

Colocate set operation node Condition1: The data partition of all child fragments >= input partition of set operation node Condition2: If there is no exchange node in child fragment, it will mean that the data hasn't been rehashed. Condition3: Scan node are all in the same colocate group

step1: create table

CREATE TABLE `t1` (
    `id` int(11) COMMENT "",
    `value` varchar(8) COMMENT ""
    ) ENGINE=OLAP
    DUPLICATE KEY(`id`)
    DISTRIBUTED BY HASH(`id`) BUCKETS 10
    PROPERTIES (
    "colocate_with" = "t1",
    "replication_num" = "2"
    );

    CREATE TABLE `t2` (
    `id` int(11) COMMENT "",
    `value` varchar(8) COMMENT ""
    ) ENGINE=OLAP
    DUPLICATE KEY(`id`)
    DISTRIBUTED BY HASH(`id`) BUCKETS 10
    PROPERTIES (
    "colocate_with" = "t1",
    "replication_num" = "2"
    );

    CREATE TABLE `t3` (
    `id` int(11) COMMENT "",
    `value` varchar(8) COMMENT ""
    ) ENGINE=OLAP
    DUPLICATE KEY(`id`)
    DISTRIBUTED BY HASH(`id`) BUCKETS 10
    PROPERTIES (
    "colocate_with" = "t1",
    "replication_num" = "2"
    );

step2: colocate query

The t1, t2, t3 are in the same colocate group and the intersect column id is same as the distributed column id.

explain select id from t1 intersect select id from t2 intersect select id from t3;
+----------------------------------------------------------------------------------+
| Explain String                                                                   |
+----------------------------------------------------------------------------------+
| PLAN FRAGMENT 0                                                                  |
|  OUTPUT EXPRS:<slot 3> `id` `id` `id`                                            |
|   PARTITION: UNPARTITIONED                                                       |
|                                                                                  |
|   RESULT SINK                                                                    |
|                                                                                  |
|   4:EXCHANGE                                                                     |
|                                                                                  |
| PLAN FRAGMENT 1                                                                  |
|  OUTPUT EXPRS:                                                                   |
|   PARTITION: HASH_PARTITIONED: `default_cluster:test`.`t1`.`id`                  |
|                                                                                  |
|   STREAM DATA SINK                                                               |
|     EXCHANGE ID: 04                                                              |
|     UNPARTITIONED                                                                |
|                                                                                  |
|   0:INTERSECT                                                                    |
|   |  colocate=true                                                               |
|   |                                                                              |
|   |----2:OlapScanNode                                                            |
|   |       TABLE: t2                                                              |
|   |                                                                              |
|   |----3:OlapScanNode                                                            |
|   |       TABLE: t3                                                              |
|   |                                                                              |
|   1:OlapScanNode                                                                 |
|      TABLE: t1                                                                   |
+----------------------------------------------------------------------------------+

Additional context

The colocate plan will also have some bad cases. For example, when the data is skewed, because colocate eliminates the problem of rehash, the query will be more affected by the data skew. In this case, it is best to give priority to solving the problem of data skew and set a reasonable fragmentation column. Of course, you can also turn off colocate through session configuration.

EmmyMiao87 commented 3 years ago

5184 The feature can resolve this issue.