cloudnativecube / octopus


Trino sql PushIntoTableScan #56

Closed. FishermanZzhang closed this issue 3 years ago.

FishermanZzhang commented 3 years ago

Taking join as an example:

Call path: PushJoinIntoTableScan → JdbcMetadata.applyJoin → BaseJdbcClient (implementJoin and isSupportedJoinCondition)
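The call path above can be modeled as follows. This is a minimal sketch using simplified, hypothetical types (`Condition`, `Client`, `MYSQL_LIKE`, and the method shapes), not Trino's real SPI classes. The optimizer rule asks the metadata layer to apply the join, the metadata layer delegates to the client, and a single unsupported condition makes the client decline, so the join stays in Trino.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified model of the join pushdown call path:
// rule -> metadata.applyJoin -> client.implementJoin -> client.isSupportedJoinCondition.
// None of these types or signatures are Trino's real API.
public class JoinPushdownCallPath {

    enum Operator { EQUAL, NOT_EQUAL, IS_DISTINCT_FROM }

    enum ColumnType { INTEGER, VARCHAR }

    record Condition(String left, String right, Operator op, ColumnType type) {}

    // Stand-in for the connector's JDBC client (cf. BaseJdbcClient).
    interface Client {
        boolean isSupportedJoinCondition(Condition c);

        // Builds remote SQL only if every condition is supported; otherwise returns empty.
        default Optional<String> implementJoin(String leftSql, String rightSql, List<Condition> conditions) {
            for (Condition c : conditions) {
                if (!isSupportedJoinCondition(c)) {
                    return Optional.empty(); // one unsupported condition vetoes the whole pushdown
                }
            }
            StringBuilder on = new StringBuilder();
            for (Condition c : conditions) {
                if (on.length() > 0) {
                    on.append(" AND ");
                }
                String op = switch (c.op()) {
                    case EQUAL -> " = ";
                    case NOT_EQUAL -> " <> ";
                    case IS_DISTINCT_FROM -> " IS DISTINCT FROM ";
                };
                on.append("l.").append(c.left()).append(op).append("r.").append(c.right());
            }
            return Optional.of("SELECT * FROM (" + leftSql + ") l INNER JOIN (" + rightSql + ") r ON " + on);
        }
    }

    // MySQL-like client: rejects IS DISTINCT FROM and varchar join columns, mirroring the checks
    // quoted later in this thread.
    static final Client MYSQL_LIKE =
            c -> c.op() != Operator.IS_DISTINCT_FROM && c.type() != ColumnType.VARCHAR;

    // Stand-in for the metadata layer (cf. JdbcMetadata.applyJoin): it simply delegates to the client.
    static Optional<String> applyJoin(Client client, String leftSql, String rightSql, List<Condition> conditions) {
        return client.implementJoin(leftSql, rightSql, conditions);
    }

    // Stand-in for the rule (cf. PushJoinIntoTableScan): keep the Trino-side join if applyJoin declines.
    static String pushJoinIntoTableScan(String leftSql, String rightSql, Condition... conditions) {
        return applyJoin(MYSQL_LIKE, leftSql, rightSql, List.of(conditions))
                .map(sql -> "TableScan[" + sql + "]")
                .orElse("InnerJoin (executed in Trino)");
    }

    public static void main(String[] args) {
        System.out.println(pushJoinIntoTableScan("SELECT id FROM a", "SELECT id FROM b",
                new Condition("id", "id", Operator.EQUAL, ColumnType.INTEGER)));
        System.out.println(pushJoinIntoTableScan("SELECT name FROM a", "SELECT name FROM b",
                new Condition("name", "name", Operator.EQUAL, ColumnType.VARCHAR)));
    }
}
```

The integer join becomes a single remote query, while the varchar join falls back to a join executed by Trino.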

FishermanZzhang commented 3 years ago

#6874

FishermanZzhang commented 3 years ago

Aggregation pushdown

trino> explain select id, count(*) from mysql.test.stu_0  group by id;
                                                                               Query Plan
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Fragment 0 [SINGLE]
     Output layout: [id_0, _pfgnrtd_1]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     Output[id, _col1]
     │   Layout: [id_0:integer, _pfgnrtd_1:bigint]
     │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: ?}
     │   id := id_0
     │   _col1 := _pfgnrtd_1
     └─ RemoteSource[1]
            Layout: [id_0:integer, _pfgnrtd_1:bigint]

 Fragment 1 [SOURCE]
     Output layout: [id_0, _pfgnrtd_1]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     TableScan[mysql:Query[SELECT `id`, count(*) AS `_pfgnrtd_0` FROM `test`.`stu_0` GROUP BY `id`] columns=[id:integer:INT, _pfgnrtd_0:bigint:bigint], grouped = false]
         Layout: [id_0:integer, _pfgnrtd_1:bigint]
         Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
         _pfgnrtd_1 := _pfgnrtd_0:bigint:bigint
         id_0 := id:integer:INT

Reference: https://trino.io/docs/current/optimizer/pushdown.html#aggregation-pushdown
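To make the TableScan above concrete, here is a minimal sketch of turning `SELECT id, count(*) ... GROUP BY id` into one remote MySQL query of the same shape. It is not Trino's code: `buildAggregationQuery` and `quote` are hypothetical helpers, and the `_pfgnrtd_` alias prefix is simply copied from the plan output.

```java
import java.util.List;
import java.util.StringJoiner;

// Hypothetical sketch of aggregation pushdown SQL generation; not Trino's implementation.
public class AggregationPushdownSketch {

    // Quote a MySQL identifier with backticks, doubling any embedded backtick.
    static String quote(String identifier) {
        return "`" + identifier.replace("`", "``") + "`";
    }

    // groupingColumns: GROUP BY columns; aggregates: remote expressions such as "count(*)".
    static String buildAggregationQuery(String schema, String table,
                                        List<String> groupingColumns, List<String> aggregates) {
        StringJoiner select = new StringJoiner(", ");
        groupingColumns.forEach(c -> select.add(quote(c)));
        for (int i = 0; i < aggregates.size(); i++) {
            // Each aggregate gets a synthetic alias so the engine can map it back to an output symbol.
            select.add(aggregates.get(i) + " AS " + quote("_pfgnrtd_" + i));
        }
        StringJoiner groupBy = new StringJoiner(", ");
        groupingColumns.forEach(c -> groupBy.add(quote(c)));
        String sql = "SELECT " + select + " FROM " + quote(schema) + "." + quote(table);
        return groupingColumns.isEmpty() ? sql : sql + " GROUP BY " + groupBy;
    }

    public static void main(String[] args) {
        // Prints the same query text that appears in the TableScan node above.
        System.out.println(buildAggregationQuery("test", "stu_0", List.of("id"), List.of("count(*)")));
    }
}
```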

FishermanZzhang commented 3 years ago
    /*
     * Join pushdown is disabled by default as this is the safer option.
     * Pushing down a join which substantially increases the row count vs
     * sizes of left and right table separately, may incur huge cost both
     * in terms of performance and money due to an increased network traffic.
     */
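The concern in the comment above can be expressed as a simple cost guard: push the join down only when its estimated output is not much larger than the two inputs combined. This is a hypothetical sketch; the `shouldPushDownJoin` name and the `maxRatio` threshold are illustrative assumptions, not a Trino setting, and unknown estimates (like the `rows: ?` in the plans in this thread) are modeled as `NaN`.

```java
// Hypothetical cost guard for join pushdown; names and threshold are illustrative only.
public class JoinPushdownGuard {

    static boolean shouldPushDownJoin(double leftRows, double rightRows,
                                      double estimatedJoinRows, double maxRatio) {
        if (Double.isNaN(leftRows) || Double.isNaN(rightRows) || Double.isNaN(estimatedJoinRows)) {
            return false; // unknown statistics: stay on the safe side, like the disabled-by-default choice
        }
        // Reject joins whose output would be much larger than what scanning both tables would transfer.
        return estimatedJoinRows <= maxRatio * (leftRows + rightRows);
    }

    public static void main(String[] args) {
        System.out.println(shouldPushDownJoin(1000, 1000, 1500, 1.25));      // true: 1500 <= 2500
        System.out.println(shouldPushDownJoin(1000, 1000, 1_000_000, 1.25)); // false: the join explodes
    }
}
```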
FishermanZzhang commented 3 years ago

Why does MySqlClient refuse to push down a join as soon as a char or varchar column appears in the join condition?

    @Override
    protected boolean isSupportedJoinCondition(JdbcJoinCondition joinCondition)
    {
        if (joinCondition.getOperator() == JoinCondition.Operator.IS_DISTINCT_FROM) {
            // Not supported in MySQL
            return false;
        }

        // Remote database can be case insensitive.
        return Stream.of(joinCondition.getLeftColumn(), joinCondition.getRightColumn())
                .map(JdbcColumnHandle::getColumnType)
                .noneMatch(type -> type instanceof CharType || type instanceof VarcharType);
    }
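The comment "Remote database can be case insensitive" is the key. With a case-insensitive collation on the remote table (an assumption here; MySQL collations whose names end in `_ci` compare this way), `'Tom' = 'tom'` is true in MySQL, while Trino compares varchar values exactly. A pushed-down join could therefore return rows that Trino itself would not produce. The sketch below approximates a `_ci` collation with `equalsIgnoreCase`, which is a simplification of real collation rules.

```java
import java.util.List;

// Illustrates how exact vs case-insensitive string comparison changes join results.
public class CaseInsensitiveJoinDemo {

    // Count matching pairs with exact comparison, as Trino does for varchar.
    static long exactMatches(List<String> left, List<String> right) {
        return left.stream().mapToLong(l -> right.stream().filter(l::equals).count()).sum();
    }

    // Count matching pairs with a case-insensitive comparison, approximating a *_ci collation.
    static long caseInsensitiveMatches(List<String> left, List<String> right) {
        return left.stream().mapToLong(l -> right.stream().filter(l::equalsIgnoreCase).count()).sum();
    }

    public static void main(String[] args) {
        List<String> students = List.of("Tom", "Alice");
        List<String> other = List.of("tom", "Alice");
        System.out.println("exact: " + exactMatches(students, other));                      // exact: 1
        System.out.println("case-insensitive: " + caseInsensitiveMatches(students, other)); // case-insensitive: 2
    }
}
```

Because the two semantics disagree, rejecting char/varchar join conditions keeps results correct at the cost of not pushing those joins down.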
FishermanZzhang commented 3 years ago

ClickHouse join pushdown (join pushed down):

trino> explain (type distributed)select a.name from clickhouse.default.student  as a join clickhouse.default.xh_test as b on a.name=b.name  where a.id >10;
                                                                                                                                    Query Plan
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Fragment 0 [SINGLE]
     Output layout: [name]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     Output[name]
     │   Layout: [name:varbinary]
     │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: ?}
     └─ RemoteSource[1]
            Layout: [name:varbinary]

 Fragment 1 [SOURCE]
     Output layout: [name]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     TableScan[clickhouse:Query[SELECT l."name" AS "name_0", r."name" AS "name_1" FROM (SELECT "name" FROM "default"."student" WHERE "id" > ?) l INNER JOIN (SELECT "name" FROM "default"."xh_test") r ON l."
         Layout: [name:varbinary]
         Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
         name := name_0:varbinary:String

(1 row)

vs

Join not pushed down:

trino:default> explain (type distributed)select a.name from clickhouse.default.student  as a join clickhouse.default.xh_test as b on a.name=b.name  where a.id >10;
                                                                                                    Query Plan                       
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Fragment 0 [SINGLE]                                                                                                                 
     Output layout: [name]                                                                                                           
     Output partitioning: SINGLE []                                                                                                  
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                   
     Output[name]                                                                                                                    
     │   Layout: [name:varbinary]                                                                                                    
     │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}                                                                     
     └─ RemoteSource[1]                                                                                                              
            Layout: [name:varbinary]                                                                                                 

 Fragment 1 [HASH]                                                                                                                   
     Output layout: [name]                                                                                                           
     Output partitioning: SINGLE []                                                                                                  
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                   
     InnerJoin[("name" = "name_0")][$hashvalue, $hashvalue_2]                                                                        
     │   Layout: [name:varbinary]                                                                                                    
     │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}                                                                     
     │   Distribution: PARTITIONED                                                                                                   
     │   dynamicFilterAssignments = {name_0 -> #df_298}                                                                              
     ├─ RemoteSource[2]                                                                                                              
     │      Layout: [name:varbinary, $hashvalue:bigint]                                                                              
     └─ LocalExchange[HASH][$hashvalue_2] ("name_0")                                                                                 
        │   Layout: [name_0:varbinary, $hashvalue_2:bigint]                                                                          
        │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: ?}                                                                 
        └─ RemoteSource[3]                                                                                                           
               Layout: [name_0:varbinary, $hashvalue_3:bigint]                                                                       

 Fragment 2 [SOURCE]                                                                                                                 
     Output layout: [name, $hashvalue_1]                                                                                             
     Output partitioning: HASH [name][$hashvalue_1]                                                                                  
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                   
     ScanFilterProject[table = clickhouse:default.student default.default.student constraint on [id] columns=[name:varbinary:String], grouped = false, filterPredicate = true, dynamicFilter = {"name" = #df_298}
         Layout: [name:varbinary, $hashvalue_1:bigint]                                                                               
         Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
         $hashvalue_1 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("name"), 0))                                        
         name := name:varbinary:String                                                                                               

 Fragment 3 [SOURCE]                                                                                                                 
     Output layout: [name_0, $hashvalue_4]                                                                                           
     Output partitioning: HASH [name_0][$hashvalue_4]                                                                                
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                   
     ScanProject[table = clickhouse:default.xh_test default.default.xh_test columns=[name:varbinary:String], grouped = false]        
         Layout: [name_0:varbinary, $hashvalue_4:bigint]                                                                             
         Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}                    
         $hashvalue_4 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("name_0"), 0))                                      
         name_0 := name:varbinary:String
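In the plan without pushdown, Trino scans both ClickHouse tables, builds a hash table on `xh_test` (`name_0`), publishes the build-side keys as a dynamic filter (`#df_298`) to the `student` scan (`name`), and then probes. The following is a simplified single-process model of that flow, not Trino's implementation; the `Student` record and sample data are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified model of a partitioned hash join with a dynamic filter on the probe-side scan.
public class DynamicFilterHashJoin {

    record Student(int id, String name) {}

    static List<String> join(List<Student> students, List<String> xhTestNames) {
        // Build side: count each key so duplicate build rows still produce duplicate output rows.
        Map<String, Integer> buildTable = new HashMap<>();
        for (String name : xhTestNames) {
            buildTable.merge(name, 1, Integer::sum);
        }
        // Dynamic filter: the set of build-side keys, applied while scanning the probe side.
        Set<String> dynamicFilter = buildTable.keySet();

        List<String> output = new ArrayList<>();
        for (Student s : students) {
            // The "id > 10" constraint and the dynamic filter prune probe rows before the join.
            if (s.id() <= 10 || !dynamicFilter.contains(s.name())) {
                continue;
            }
            // Probe: emit one row per matching build row.
            for (int i = 0; i < buildTable.get(s.name()); i++) {
                output.add(s.name());
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<Student> students = List.of(new Student(5, "a"), new Student(11, "b"), new Student(12, "c"));
        List<String> xhTest = List.of("a", "b", "b");
        System.out.println(join(students, xhTest)); // [b, b]
    }
}
```

With pushdown, all of this work happens inside ClickHouse instead, which is why the first plan has a single SOURCE fragment.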
FishermanZzhang commented 3 years ago

Pushdown configuration

TODO: How should the related parameters be configured?

Set the corresponding properties in the catalog configuration.
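A sketch of reading a pushdown switch from a catalog properties file (for example `etc/catalog/mysql.properties`). The property name `join-pushdown.enabled` is an assumption based on Trino's JDBC connectors; check the connector documentation for your Trino version, especially for the ClickHouse connector used here. The default of `false` follows the comment above that join pushdown is disabled by default, and the connection URL is a placeholder.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper: parse catalog properties and read the (assumed) join pushdown switch.
public class CatalogPushdownConfig {

    static boolean isJoinPushdownEnabled(String catalogProperties) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(catalogProperties));
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory StringReader
        }
        // Missing property means disabled, matching the documented safe default.
        return Boolean.parseBoolean(props.getProperty("join-pushdown.enabled", "false").trim());
    }

    public static void main(String[] args) {
        String catalog = String.join("\n",
                "connector.name=mysql",
                "connection-url=jdbc:mysql://example.net:3306", // placeholder URL
                "join-pushdown.enabled=true");
        System.out.println(isJoinPushdownEnabled(catalog)); // true
    }
}
```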

FishermanZzhang commented 3 years ago

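ORDER BY is not pushed down: ClickHouse only serves a plain TableScan, and the sort runs in Trino (PartialSort, then LocalMerge and RemoteMerge).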

trino> explain (type distributed)select name from clickhouse.default.student  order by name desc;
                                                     Query Plan
--------------------------------------------------------------------------------------------------------------------
 Fragment 0 [SINGLE]
     Output layout: [name]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     Output[name]
     │   Layout: [name:varbinary]
     │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
     └─ RemoteMerge[1]
            Layout: [name:varbinary]

 Fragment 1 [ROUND_ROBIN]
     Output layout: [name]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     LocalMerge[name DESC_NULLS_LAST]
     │   Layout: [name:varbinary]
     │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
     └─ PartialSort[name DESC_NULLS_LAST]
        │   Layout: [name:varbinary]
        └─ RemoteSource[2]
               Layout: [name:varbinary]

 Fragment 2 [SOURCE]
     Output layout: [name]
     Output partitioning: ROUND_ROBIN []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     TableScan[clickhouse:default.student default.default.student columns=[name:varbinary:String], grouped = false]
         Layout: [name:varbinary]
         Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
         name := name:varbinary:String

(1 row)
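In this plan each task sorts its own rows (PartialSort), and LocalMerge and RemoteMerge combine the already-sorted streams without re-sorting everything. Below is a generic heap-based k-way merge illustrating that merge step; it is not Trino's operator code, and unlike `DESC_NULLS_LAST` it does not handle nulls.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Generic k-way merge of runs that are each sorted in descending order.
public class SortedStreamMerge {

    static List<String> mergeDescending(List<List<String>> sortedRuns) {
        Comparator<String> desc = Comparator.reverseOrder();
        // Heap entries are {runIndex, positionInRun}, ordered by the current value of that run.
        PriorityQueue<int[]> heap = new PriorityQueue<>(
                (a, b) -> desc.compare(sortedRuns.get(a[0]).get(a[1]), sortedRuns.get(b[0]).get(b[1])));
        for (int run = 0; run < sortedRuns.size(); run++) {
            if (!sortedRuns.get(run).isEmpty()) {
                heap.add(new int[] {run, 0});
            }
        }
        List<String> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] top = heap.poll();
            List<String> run = sortedRuns.get(top[0]);
            merged.add(run.get(top[1]));
            // Advance within the run that just produced a value.
            if (top[1] + 1 < run.size()) {
                heap.add(new int[] {top[0], top[1] + 1});
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<List<String>> runs = List.of(List.of("zoe", "bob"), List.of("tom", "amy"), List.of("kim"));
        System.out.println(mergeDescending(runs)); // [zoe, tom, kim, bob, amy]
    }
}
```

Each merge step costs O(log k) for k input streams, so the final merge stays cheap even though the full sort is not pushed to ClickHouse.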
FishermanZzhang commented 3 years ago

MySQL PushIntoTableScan

The corresponding JdbcClient interfaces cover: aggregate, limit, TopN, and join.
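A hypothetical sketch grouping the four pushdown hooks listed above into one capability interface, and showing how limit and TopN could be appended to the remote SQL only when the client supports them. The interface, the `Caps` record, and the method names are illustrative and are not the real `JdbcClient` signatures; when a hook declines, the corresponding node simply stays in the Trino plan.

```java
import java.util.Optional;

// Hypothetical capability model for the aggregate / limit / TopN / join pushdown hooks.
public class PushdownCapabilitiesSketch {

    interface PushdownCapabilities {
        boolean supportsAggregation();
        boolean supportsLimit();
        boolean supportsTopN();
        boolean supportsJoin();
    }

    // Record accessors implement the interface methods.
    record Caps(boolean supportsAggregation, boolean supportsLimit,
                boolean supportsTopN, boolean supportsJoin) implements PushdownCapabilities {}

    // Append LIMIT if supported; empty means Trino keeps its own Limit node.
    static Optional<String> applyLimit(PushdownCapabilities caps, String sql, long limit) {
        return caps.supportsLimit() ? Optional.of(sql + " LIMIT " + limit) : Optional.empty();
    }

    // Append ORDER BY ... LIMIT if supported; empty means Trino keeps its own TopN node.
    static Optional<String> applyTopN(PushdownCapabilities caps, String sql, String orderBy, long limit) {
        if (!caps.supportsTopN()) {
            return Optional.empty();
        }
        return Optional.of(sql + " ORDER BY " + orderBy + " LIMIT " + limit);
    }

    public static void main(String[] args) {
        Caps example = new Caps(true, true, true, false); // hypothetical flags, not a real connector's
        System.out.println(applyTopN(example, "SELECT `name` FROM `test`.`stu_0`", "`name` DESC", 10).orElse("not pushed"));
        System.out.println(applyLimit(new Caps(false, false, false, false), "SELECT 1", 5).orElse("not pushed"));
    }
}
```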

FishermanZzhang commented 3 years ago


FishermanZzhang commented 3 years ago

trino-sql