StarRocks / starrocks

The world's fastest open query engine for sub-second analytics both on and off the data lakehouse. With the flexibility to support nearly any scenario, StarRocks provides best-in-class performance for multi-dimensional analytics, real-time analytics, and ad-hoc queries. A Linux Foundation project.
https://starrocks.io
Apache License 2.0

One-stage Agg skips an exchange when the Gather property is already satisfied #203

Closed: Youngwb closed this 3 years ago

Youngwb commented 3 years ago

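Reduce one data shuffle: when the child of a one-stage aggregation already satisfies the Gather property (its output is already on a single node), no EXCHANGE is needed before the aggregation.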
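The rule can be pictured as a property check made when the optimizer decides whether to insert an exchange. The Java sketch below is a minimal illustration under simplifying assumptions: `GatherCheck`, the `Distribution` enum and `needsExchange` are hypothetical names, not StarRocks' optimizer API, and distributions are reduced to three values (a real cost-based optimizer also tracks hash-distribution columns).

```java
// Hypothetical model of the check described in #203; not StarRocks' optimizer API.
public class GatherCheck {
    /** Simplified data distribution of a plan node's output. */
    enum Distribution {
        ANY,    // parent accepts any layout
        GATHER, // all rows on a single node (UNPARTITIONED)
        RANDOM  // rows spread across nodes, e.g. straight from a scan
    }

    /**
     * Returns true if an EXCHANGE must be inserted between parent and child
     * so that the child's output matches what the parent requires.
     */
    static boolean needsExchange(Distribution required, Distribution provided) {
        if (required == Distribution.ANY) {
            return false;
        }
        // A GATHER requirement is met only by output that is already on one node.
        return required != provided;
    }

    public static void main(String[] args) {
        // The one-stage sum (update finalize) requires GATHER. Its child, the
        // broadcast HASH JOIN, already runs in an UNPARTITIONED fragment.
        System.out.println(needsExchange(Distribution.GATHER, Distribution.GATHER)); // false
        // A one-stage agg directly over a scan would still need a gather exchange.
        System.out.println(needsExchange(Distribution.GATHER, Distribution.RANDOM)); // true
    }
}
```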

before:

mysql> explain
    -> select sum(p1)
    -> from (
    ->     select t0.p1
    ->     from (
    ->         select count(n1.P_PARTKEY) as p1
    ->         from part n1
    ->     ) t0
    ->     join (
    ->         select count(n2.P_PARTKEY) as p2
    ->         from part n2
    ->     ) t1 on p1 = p2
    -> ) t2;
+------------------------------------------------------------------------------+
| Explain String                                                               |
+------------------------------------------------------------------------------+
| WORK ON CBO OPTIMIZER                                                        |
| PLAN FRAGMENT 0                                                              |
|  OUTPUT EXPRS:23: sum(11: count(1: P_PARTKEY))                               |
|   PARTITION: UNPARTITIONED                                                   |
|                                                                              |
|   RESULT SINK                                                                |
|                                                                              |
|   12:AGGREGATE (update finalize)                                             |
|   |  output: sum(11: count(1: P_PARTKEY))                                    |
|   |  group by:                                                               |
|   |  use vectorized: true                                                    |
|   |                                                                          |
|   11:EXCHANGE                                                                |
|      use vectorized: true                                                    |
|                                                                              |
| PLAN FRAGMENT 1                                                              |
|  OUTPUT EXPRS:                                                               |
|   PARTITION: UNPARTITIONED                                                   |
|                                                                              |
|   STREAM DATA SINK                                                           |
|     EXCHANGE ID: 11                                                          |
|     UNPARTITIONED                                                            |
|                                                                              |
|   10:Project                                                                 |
|   |  <slot 11> : 11: count(1: P_PARTKEY)                                     |
|   |  use vectorized: true                                                    |
|   |                                                                          |
|   9:HASH JOIN                                                                |
|   |  join op: INNER JOIN (BROADCAST)                                         |
|   |  hash predicates:                                                        |
|   |  colocate: false, reason:                                                |
|   |  equal join conjunct: 11: count(1: P_PARTKEY) = 22: count(12: P_PARTKEY) |
|   |  use vectorized: true                                                    |
|   |                                                                          |
|   |----8:EXCHANGE                                                            |
|   |       use vectorized: true                                               |
|   |                                                                          |
|   3:AGGREGATE (merge finalize)                                               |
|   |  output: count(11: count(1: P_PARTKEY))                                  |
|   |  group by:                                                               |
|   |  use vectorized: true                                                    |
|   |                                                                          |
|   2:EXCHANGE                                                                 |
|      use vectorized: true                                                    |
|                                                                              |
| PLAN FRAGMENT 2                                                              |
|  OUTPUT EXPRS:                                                               |
|   PARTITION: UNPARTITIONED                                                   |
|                                                                              |
|   STREAM DATA SINK                                                           |
|     EXCHANGE ID: 08                                                          |
|     UNPARTITIONED                                                            |
|                                                                              |
|   7:AGGREGATE (merge finalize)                                               |
|   |  output: count(22: count(12: P_PARTKEY))                                 |
|   |  group by:                                                               |
|   |  use vectorized: true                                                    |
|   |                                                                          |
|   6:EXCHANGE                                                                 |
|      use vectorized: true                                                    |
|                                                                              |
| PLAN FRAGMENT 3                                                              |
|  OUTPUT EXPRS:                                                               |
|   PARTITION: RANDOM                                                          |
|                                                                              |
|   STREAM DATA SINK                                                           |
|     EXCHANGE ID: 06                                                          |
|     UNPARTITIONED                                                            |
|                                                                              |
|   5:AGGREGATE (update serialize)                                             |
|   |  output: count(12: P_PARTKEY)                                            |
|   |  group by:                                                               |
|   |  use vectorized: true                                                    |
|   |                                                                          |
|   4:OlapScanNode                                                             |
|      TABLE: part                                                             |
|      PREAGGREGATION: ON                                                      |
|      partitions=1/1                                                          |
|      rollup: part                                                            |
|      tabletRatio=10/10                                                       |
|      tabletList=34648,34650,34652,34654,34656,34658,34660,34662,34664,34666  |
|      cardinality=2000000                                                     |
|      avgRowSize=4.0                                                          |
|      numNodes=0                                                              |
|      use vectorized: true                                                    |
|                                                                              |
| PLAN FRAGMENT 4                                                              |
|  OUTPUT EXPRS:                                                               |
|   PARTITION: RANDOM                                                          |
|                                                                              |
|   STREAM DATA SINK                                                           |
|     EXCHANGE ID: 02                                                          |
|     UNPARTITIONED                                                            |
|                                                                              |
|   1:AGGREGATE (update serialize)                                             |
|   |  output: count(1: P_PARTKEY)                                             |
|   |  group by:                                                               |
|   |  use vectorized: true                                                    |
|   |                                                                          |
|   0:OlapScanNode                                                             |
|      TABLE: part                                                             |
|      PREAGGREGATION: ON                                                      |
|      partitions=1/1                                                          |
|      rollup: part                                                            |
|      tabletRatio=10/10                                                       |
|      tabletList=34648,34650,34652,34654,34656,34658,34660,34662,34664,34666  |
|      cardinality=2000000                                                     |
|      avgRowSize=4.0                                                          |
|      numNodes=0                                                              |
|      use vectorized: true                                                    |
+------------------------------------------------------------------------------+
Youngwb commented 3 years ago

after:

mysql> explain
    -> select sum(p1)
    -> from (
    ->     select t0.p1
    ->     from (
    ->         select count(n1.P_PARTKEY) as p1
    ->         from part n1
    ->     ) t0
    ->     join (
    ->         select count(n2.P_PARTKEY) as p2
    ->         from part n2
    ->     ) t1 on p1 = p2
    -> ) t2;
+------------------------------------------------------------------------------+
| Explain String                                                               |
+------------------------------------------------------------------------------+
| WORK ON CBO OPTIMIZER                                                        |
| PLAN FRAGMENT 0                                                              |
|  OUTPUT EXPRS:23: sum(11: count(1: P_PARTKEY))                               |
|   PARTITION: UNPARTITIONED                                                   |
|                                                                              |
|   RESULT SINK                                                                |
|                                                                              |
|   11:AGGREGATE (update finalize)                                             |
|   |  output: sum(11: count(1: P_PARTKEY))                                    |
|   |  group by:                                                               |
|   |  use vectorized: true                                                    |
|   |                                                                          |
|   10:Project                                                                 |
|   |  <slot 11> : 11: count(1: P_PARTKEY)                                     |
|   |  use vectorized: true                                                    |
|   |                                                                          |
|   9:HASH JOIN                                                                |
|   |  join op: INNER JOIN (BROADCAST)                                         |
|   |  hash predicates:                                                        |
|   |  colocate: false, reason:                                                |
|   |  equal join conjunct: 11: count(1: P_PARTKEY) = 22: count(12: P_PARTKEY) |
|   |  use vectorized: true                                                    |
|   |                                                                          |
|   |----8:EXCHANGE                                                            |
|   |       use vectorized: true                                               |
|   |                                                                          |
|   3:AGGREGATE (merge finalize)                                               |
|   |  output: count(11: count(1: P_PARTKEY))                                  |
|   |  group by:                                                               |
|   |  use vectorized: true                                                    |
|   |                                                                          |
|   2:EXCHANGE                                                                 |
|      use vectorized: true                                                    |
|                                                                              |
| PLAN FRAGMENT 1                                                              |
|  OUTPUT EXPRS:                                                               |
|   PARTITION: UNPARTITIONED                                                   |
|                                                                              |
|   STREAM DATA SINK                                                           |
|     EXCHANGE ID: 08                                                          |
|     UNPARTITIONED                                                            |
|                                                                              |
|   7:AGGREGATE (merge finalize)                                               |
|   |  output: count(22: count(12: P_PARTKEY))                                 |
|   |  group by:                                                               |
|   |  use vectorized: true                                                    |
|   |                                                                          |
|   6:EXCHANGE                                                                 |
|      use vectorized: true                                                    |
|                                                                              |
| PLAN FRAGMENT 2                                                              |
|  OUTPUT EXPRS:                                                               |
|   PARTITION: RANDOM                                                          |
|                                                                              |
|   STREAM DATA SINK                                                           |
|     EXCHANGE ID: 06                                                          |
|     UNPARTITIONED                                                            |
|                                                                              |
|   5:AGGREGATE (update serialize)                                             |
|   |  output: count(12: P_PARTKEY)                                            |
|   |  group by:                                                               |
|   |  use vectorized: true                                                    |
|   |                                                                          |
|   4:OlapScanNode                                                             |
|      TABLE: part                                                             |
|      PREAGGREGATION: ON                                                      |
|      partitions=1/1                                                          |
|      rollup: part                                                            |
|      tabletRatio=10/10                                                       |
|      tabletList=34648,34650,34652,34654,34656,34658,34660,34662,34664,34666  |
|      cardinality=2000000                                                     |
|      avgRowSize=4.0                                                          |
|      numNodes=0                                                              |
|      use vectorized: true                                                    |
|                                                                              |
| PLAN FRAGMENT 3                                                              |
|  OUTPUT EXPRS:                                                               |
|   PARTITION: RANDOM                                                          |
|                                                                              |
|   STREAM DATA SINK                                                           |
|     EXCHANGE ID: 02                                                          |
|     UNPARTITIONED                                                            |
|                                                                              |
|   1:AGGREGATE (update serialize)                                             |
|   |  output: count(1: P_PARTKEY)                                             |
|   |  group by:                                                               |
|   |  use vectorized: true                                                    |
|   |                                                                          |
|   0:OlapScanNode                                                             |
|      TABLE: part                                                             |
|      PREAGGREGATION: ON                                                      |
|      partitions=1/1                                                          |
|      rollup: part                                                            |
|      tabletRatio=10/10                                                       |
|      tabletList=34648,34650,34652,34654,34656,34658,34660,34662,34664,34666  |
|      cardinality=2000000                                                     |
|      avgRowSize=4.0                                                          |
|      numNodes=0                                                              |
|      use vectorized: true                                                    |
+------------------------------------------------------------------------------+

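The topmost AGG (the one-stage sum, 11:AGGREGATE (update finalize)) no longer has a shuffle beneath it: it now runs in PLAN FRAGMENT 0 directly on top of the Project and HASH JOIN, whose output is already UNPARTITIONED. The former EXCHANGE 11 and its fragment are gone, so the plan drops from five fragments to four.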
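To see the effect on the whole plan, the sketch below models the "before" plan as a tree of operators and exchanges, drops every gather exchange whose input already satisfies the Gather property, and counts the remaining exchanges. The model (`PruneGatherExchange`, `Node`, `Kind`, `prune`) is hypothetical and much simpler than StarRocks' optimizer; it assumes a broadcast join keeps the distribution of its probe (left) side, consistent with the plans above where 9:HASH JOIN runs in an UNPARTITIONED fragment.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified plan model for illustration; not StarRocks' optimizer code.
public class PruneGatherExchange {
    enum Kind { OPERATOR, GATHER_EXCHANGE, BROADCAST_EXCHANGE }

    /** A plan node; outputGathered means all of its output rows are on one node. */
    record Node(String name, Kind kind, boolean outputGathered, List<Node> children) {}

    static Node op(String name, boolean gathered, Node... children) {
        return new Node(name, Kind.OPERATOR, gathered, List.of(children));
    }

    static Node gatherExchange(Node child) {
        // A gather exchange always delivers its rows to a single node.
        return new Node("EXCHANGE (gather)", Kind.GATHER_EXCHANGE, true, List.of(child));
    }

    static Node broadcastExchange(Node child) {
        // Not treated as satisfying GATHER in this sketch.
        return new Node("EXCHANGE (broadcast)", Kind.BROADCAST_EXCHANGE, false, List.of(child));
    }

    /** Removes gather exchanges whose input is already on a single node. */
    static Node prune(Node n) {
        List<Node> kids = new ArrayList<>();
        for (Node c : n.children()) {
            kids.add(prune(c));
        }
        if (n.kind() == Kind.GATHER_EXCHANGE && kids.get(0).outputGathered()) {
            return kids.get(0); // Gather property already satisfied: drop the shuffle.
        }
        return new Node(n.name(), n.kind(), n.outputGathered(), kids);
    }

    static int countExchanges(Node n) {
        int count = n.kind() == Kind.OPERATOR ? 0 : 1;
        for (Node c : n.children()) {
            count += countExchanges(c);
        }
        return count;
    }

    /** The "before" plan from #203, keeping only operators and exchanges. */
    static Node beforePlan() {
        Node left = op("3:AGGREGATE (merge finalize)", true,
                gatherExchange(op("1:AGGREGATE (update serialize)", false, op("0:OlapScanNode", false))));
        Node right = op("7:AGGREGATE (merge finalize)", true,
                gatherExchange(op("5:AGGREGATE (update serialize)", false, op("4:OlapScanNode", false))));
        // Assumption: a broadcast join keeps the distribution of its left input.
        Node join = op("9:HASH JOIN", left.outputGathered(), left, broadcastExchange(right));
        Node project = op("10:Project", join.outputGathered(), join);
        return op("12:AGGREGATE (update finalize)", true, gatherExchange(project));
    }

    public static void main(String[] args) {
        Node before = beforePlan();
        Node after = prune(before);
        System.out.println("exchanges before: " + countExchanges(before)); // 4
        System.out.println("exchanges after:  " + countExchanges(after));  // 3
        System.out.println("top agg child:    " + after.children().get(0).name()); // 10:Project
    }
}
```

The counts line up with the EXPLAIN output: the before plan has exchanges 11, 08, 06 and 02, the after plan only 08, 06 and 02. Broadcast exchanges are deliberately left alone here, since exchange 08 also survives in the real plan.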