StarRocks / starrocks

The world's fastest open query engine for sub-second analytics both on and off the data lakehouse. With the flexibility to support nearly any scenario, StarRocks provides best-in-class performance for multi-dimensional analytics, real-time analytics, and ad-hoc queries. A Linux Foundation project.
https://starrocks.io
Apache License 2.0

Transform count(distinct pk) into count(pk) #50974

Closed satanson closed 1 month ago

satanson commented 2 months ago

Enhancement

In the following example, count(distinct user_id) can be converted into count(user_id) to speed up the query, but the optimizer fails to do so.

-- prepare data
create table t0 (user_id int, value int) primary key(user_id) properties("replication_num"="1");
insert into t0 values(1,0),(2,1),(3,0),(4,1);
-- Q1
explain costs select case when(value=1) then 'A' else 'B' end as flag, count(distinct user_id) from t0 group by 1;

We can rewrite Q1 as follows when the column used by count distinct is the key column of a primary key table.

-- Q2
select case when(value=1) then 'A' else 'B' end as flag, count(user_id) from t0 group by 1;

It seems that GroupByCountDistinctRewriteRule should be updated to support this conversion.
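
To illustrate why the rewrite is safe, here is a minimal in-memory sketch in Java. It is not StarRocks code: the class name PkCountSketch and its helpers are hypothetical, and rows are modeled as {user_id, value} pairs. It assumes user_id is a single-column primary key, so it is unique and non-null.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Toy in-memory model of t0(user_id, value); each int[] is {user_id, value}.
// Hypothetical illustration only, not part of the StarRocks code base.
final class PkCountSketch {
    // Grouping expression from Q1/Q2: case when(value=1) then 'A' else 'B' end.
    static String flag(int[] row) {
        return row[1] == 1 ? "A" : "B";
    }

    // Q1: count(distinct user_id) per flag, which needs a per-group set.
    static Map<String, Long> countDistinct(List<int[]> rows) {
        Map<String, Set<Integer>> seen = new TreeMap<>();
        for (int[] row : rows) {
            seen.computeIfAbsent(flag(row), k -> new HashSet<>()).add(row[0]);
        }
        Map<String, Long> counts = new TreeMap<>();
        for (Map.Entry<String, Set<Integer>> e : seen.entrySet()) {
            counts.put(e.getKey(), (long) e.getValue().size());
        }
        return counts;
    }

    // Q2: count(user_id) per flag, a plain counter with no deduplication.
    static Map<String, Long> countPlain(List<int[]> rows) {
        Map<String, Long> counts = new TreeMap<>();
        for (int[] row : rows) {
            counts.merge(flag(row), 1L, Long::sum);
        }
        return counts;
    }

    // The four rows inserted into t0 in the example above.
    static List<int[]> sampleRows() {
        return List.of(new int[]{1, 0}, new int[]{2, 1}, new int[]{3, 0}, new int[]{4, 1});
    }

    public static void main(String[] args) {
        List<int[]> rows = sampleRows();
        System.out.println(countDistinct(rows)); // {A=2, B=2}
        System.out.println(countPlain(rows));    // {A=2, B=2}
    }
}
```

Because every user_id occurs exactly once, each per-group set has as many elements as the group has rows, so both functions return the same counts. With a non-unique column the two results can differ, which is why the rewrite has to be restricted to key columns.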

danielhumanmod commented 2 months ago

Hi @satanson, I can take a look and implement this enhancement.

satanson commented 2 months ago

@danielhumanmod Ok

danielhumanmod commented 1 month ago

Hi @satanson, I tried to reproduce the optimization failure in SelectStmtTest.java, but didn't find anything related to "distinct" in the execution plan. I am not sure whether that is because I didn't reproduce it correctly or because I didn't examine the execution result in the right way. Would you mind providing more context?

Here is the code I used to try to reproduce the optimization failure:

// SelectStmtTest.java

// 1. Table creation with primary key:
String createTableWithPrimaryKey = "CREATE TABLE db1.t_with_pk (" +
    "user_id INT," +
    "value INT) " +
    "PRIMARY KEY (user_id) " +
    "PROPERTIES('replication_num' = '1');";

starRocksAssert = new StarRocksAssert();
starRocksAssert.withDatabase("db1").useDatabase("db1");
starRocksAssert.withTable(createTblStmtStr)
    .withTable(createBaseAllStmtStr)
    .withTable(createDateTblStmtStr)
    .withTable(createPartitionTableStr)
    .withTable(createTable1)
    .withTable(createTableWithPrimaryKey);

// 2. Test case for distinct count on primary key:
@Test
public void testDistinctCountOnPrimaryKey() throws Exception {
    String insertData = "INSERT INTO db1.t_with_pk VALUES (1,0),(2,1),(3,0),(4,1);";
    starRocksAssert.query(insertData);

    String sql = "SELECT CASE WHEN(value = 1) THEN 'A' ELSE 'B' END as flag, COUNT(DISTINCT user_id) " +
        "FROM db1.t_with_pk " +
        "GROUP BY 1;";

    String plan = starRocksAssert.query(sql).explainQuery();
    System.out.println(plan);
}

And here is the resulting execution plan:

PLAN FRAGMENT 0
 OUTPUT EXPRS:3: case | 4: count
  PARTITION: HASH_PARTITIONED: 3: case

  RESULT SINK

  5:AGGREGATE (merge finalize)
  |  output: count(4: count)
  |  group by: 3: case
  |  
  4:EXCHANGE

PLAN FRAGMENT 1
 OUTPUT EXPRS:
  PARTITION: RANDOM

  STREAM DATA SINK
    EXCHANGE ID: 04
    HASH_PARTITIONED: 3: case

  3:AGGREGATE (update serialize)
  |  STREAMING
  |  output: count(1: user_id)
  |  group by: 3: case
  |  
  2:AGGREGATE (update finalize)
  |  group by: 3: case, 1: user_id
  |  
  1:Project
  |  <slot 1> : 1: user_id
  |  <slot 3> : if(2: value = 1, 'A', 'B')
  |  
  0:OlapScanNode
     TABLE: t_with_pk
     PREAGGREGATION: ON
     partitions=0/1
     rollup: t_with_pk
     tabletRatio=0/0
     tabletList=
     cardinality=1
     avgRowSize=3.0

satanson commented 1 month ago

@danielhumanmod count(distinct column) is computed by a plan with 3-stage aggregation.

For example:

-- Q1
select shape, count(distinct color)
from chess
group by shape;
-- Q2
with cte0 as (
select distinct shape, color
from chess
)
select shape, count(color) from cte0
group by shape;

In fact, Q1 is optimized into Q2, and Q2 can be illustrated by the following picture. [image]

Try to recognize the 3-stage aggregation in your plan. If that is hard to follow, you can set new_planner_agg_stage = 2 to force the final plan into a 2-stage aggregation; multi_count_distinct(column) will then appear in the plan, which is the physical implementation corresponding to count(distinct column).
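
The Q1-to-Q2 rewrite can also be sketched in plain Java. This is only an in-memory illustration under stated assumptions: the class DistinctAsDedupSketch, its helpers, and the sample chess rows are hypothetical, and color is assumed to be non-null. The distinct count is computed by first deduplicating (shape, color) pairs and then running an ordinary count per shape.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical illustration of "count(distinct) = dedup, then count"; not StarRocks code.
final class DistinctAsDedupSketch {
    // cte0 in Q2: select distinct shape, color (equivalent to group by shape, color).
    static Set<List<String>> dedupPairs(List<String[]> rows) {
        Set<List<String>> pairs = new LinkedHashSet<>();
        for (String[] row : rows) {
            pairs.add(List.of(row[0], row[1]));
        }
        return pairs;
    }

    // Outer query of Q2: select shape, count(color) ... group by shape.
    static Map<String, Long> countPerShape(Set<List<String>> pairs) {
        Map<String, Long> counts = new TreeMap<>();
        for (List<String> pair : pairs) {
            counts.merge(pair.get(0), 1L, Long::sum);
        }
        return counts;
    }

    // Made-up sample rows for the chess(shape, color) table.
    static List<String[]> sampleChess() {
        return List.of(
            new String[]{"pawn", "white"},
            new String[]{"pawn", "black"},
            new String[]{"pawn", "white"},
            new String[]{"rook", "black"},
            new String[]{"rook", "black"});
    }

    public static void main(String[] args) {
        Set<List<String>> pairs = dedupPairs(sampleChess());
        System.out.println(countPerShape(pairs)); // {pawn=2, rook=1}
    }
}
```

Read against the plan posted earlier in this thread, the deduplication step appears to correspond to the aggregate that groups by both the case expression and user_id, and the count step to the two count aggregates (partial, then merge) above it. When the deduplicated column is already unique, the first step removes nothing, which is what makes it redundant.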

danielhumanmod commented 1 month ago

Thanks for such a detailed explanation @satanson, I really appreciate it! I have created the initial PR for this enhancement, and I'm looking forward to any feedback or suggestions from you :)