prestodb / presto

The official home of the Presto distributed SQL query engine for big data
http://prestodb.io
Apache License 2.0

Suboptimal plan for INSERT queries with UNION ALL #21252

Open arhimondr opened 1 year ago

arhimondr commented 1 year ago

Query:

SELECT orderkey * 2 orderkey
FROM ( 
  SELECT orderkey
  FROM orders
  UNION ALL
  SELECT orderkey
  FROM orders
)
UNION ALL
SELECT orderkey
FROM orders

Plan:

Fragment 1 [ROUND_ROBIN]
    Output layout: [rows_39, fragments_40, commitcontext_41]
    Output partitioning: SINGLE []
    Stage Execution Strategy: UNGROUPED_EXECUTION
    - RemoteSource[2,3] => [rows_39:bigint, fragments_40:varbinary, commitcontext_41:varbinary]
Fragment 2 [SOURCE]
    Output layout: [rows_45, fragments_46, commitcontext_47]
    Output partitioning: ROUND_ROBIN []
    Stage Execution Strategy: UNGROUPED_EXECUTION
    - TableWriter[PlanNodeId 557] => [rows_45:bigint, fragments_46:varbinary, commitcontext_47:varbinary]
            orderkey := multiply (1:77)
            Statistics collected: 0
        - LocalExchange[PlanNodeId 622][ROUND_ROBIN] () => [multiply:bigint]
            - ScanProject[PlanNodeId 0,344][table = TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpch, tableName=orders, analyzePartitionValues=Optional.empty}', layout='Optional[tpch.orders{}]'}, grouped = false, projectLocality = LOCAL] => [multiply:bigint]
                    multiply := (orderkey) * (BIGINT'2') (1:107)
                    LAYOUT: tpch.orders{}
                    orderkey := orderkey:bigint:0:REGULAR (1:122)
Fragment 3 [SOURCE]
    Output layout: [rows_48, fragments_49, commitcontext_50]
    Output partitioning: ROUND_ROBIN []
    Stage Execution Strategy: UNGROUPED_EXECUTION
    - TableWriter[PlanNodeId 558] => [rows_48:bigint, fragments_49:varbinary, commitcontext_50:varbinary]
            orderkey := multiply_38 (1:77)
            Statistics collected: 0
        - LocalExchange[PlanNodeId 623][ROUND_ROBIN] () => [multiply_38:bigint]
            - ScanProject[PlanNodeId 3,345][table = TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpch, tableName=orders, analyzePartitionValues=Optional.empty}', layout='Optional[tpch.orders{}]'}, grouped = false, projectLocality = LOCAL] => [multiply_38:bigint]
                    multiply_38 := (orderkey_2) * (BIGINT'2') (1:107)
                    LAYOUT: tpch.orders{}
                    orderkey_2 := orderkey:bigint:0:REGULAR (1:166)
Fragment 4 [SOURCE]
    Output layout: [rows_42, fragments_43, commitcontext_44]
    Output partitioning: SINGLE []
    Stage Execution Strategy: UNGROUPED_EXECUTION
    - TableWriter[PlanNodeId 551] => [rows_42:bigint, fragments_43:varbinary, commitcontext_44:varbinary]
            orderkey := orderkey_20 (1:77)
            Statistics collected: 0
        - LocalExchange[PlanNodeId 624][ROUND_ROBIN] () => [orderkey_20:bigint]
            - TableScan[PlanNodeId 11][TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpch, tableName=orders, analyzePartitionValues=Optional.empty}', layout='Optional[tpch.orders{}]'}, grouped = false] => [orderkey_20:bigint]
                    LAYOUT: tpch.orders{}
                    orderkey_20 := orderkey:bigint:0:REGULAR (1:206)

Your Environment

Expected Behavior

All table writer stages should be connected directly to the TableFinish stage.

Current Behavior

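There is an unnecessary ROUND_ROBIN stage (Fragment 1) between the table writers in Fragments 2 and 3 and the TableFinish stage. Only the writer in Fragment 4 outputs with SINGLE partitioning directly.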
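
For illustration, the nested query is logically equivalent to a flat three-branch UNION ALL with the multiplication pushed into the first two branches. The sketch below checks that equivalence on a tiny in-memory SQLite table. SQLite only stands in for Presto here, and the sample rows are made up. Whether writing the query this way avoids the extra exchange in Presto has not been verified in this issue.

```python
import sqlite3

# Assumption: a three-row stand-in for the tpch.orders table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (orderkey INTEGER)")
conn.executemany("INSERT INTO orders VALUES (?)", [(1,), (2,), (3,)])

# The query shape from the description: a projection over a nested UNION ALL.
nested = """
SELECT orderkey * 2 AS orderkey
FROM (
  SELECT orderkey FROM orders
  UNION ALL
  SELECT orderkey FROM orders
)
UNION ALL
SELECT orderkey FROM orders
"""

# Hand-flattened form: the projection is pushed into each inner branch.
flat = """
SELECT orderkey * 2 AS orderkey FROM orders
UNION ALL
SELECT orderkey * 2 AS orderkey FROM orders
UNION ALL
SELECT orderkey FROM orders
"""

# UNION ALL does not guarantee row order, so compare sorted results.
nested_rows = sorted(conn.execute(nested).fetchall())
flat_rows = sorted(conn.execute(flat).fetchall())
print(nested_rows == flat_rows)
```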

Possible Solution

Run PushProjectionThroughUnion before SetFlatteningOptimizer
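
Why the order matters can be sketched with a toy plan tree. The classes and functions below are hypothetical simplifications written for this illustration, not Presto's actual `PushProjectionThroughUnion` or `SetFlatteningOptimizer` implementations. The assumption is that flattening only merges a union whose direct child is another union, so a projection sitting between the two unions blocks it.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class Scan:
    table: str


@dataclass(frozen=True)
class Project:
    expr: str
    child: object


@dataclass(frozen=True)
class UnionAll:
    children: Tuple[object, ...]


def push_projection_through_union(node):
    """Toy rule: Project(UnionAll(a, b)) -> UnionAll(Project(a), Project(b))."""
    if isinstance(node, Project):
        child = push_projection_through_union(node.child)
        if isinstance(child, UnionAll):
            return UnionAll(tuple(Project(node.expr, c) for c in child.children))
        return Project(node.expr, child)
    if isinstance(node, UnionAll):
        return UnionAll(tuple(push_projection_through_union(c) for c in node.children))
    return node


def flatten_unions(node):
    """Toy rule: merge a UnionAll that is a direct child of another UnionAll."""
    if isinstance(node, Project):
        return Project(node.expr, flatten_unions(node.child))
    if isinstance(node, UnionAll):
        flat = []
        for child in node.children:
            child = flatten_unions(child)
            if isinstance(child, UnionAll):
                flat.extend(child.children)
            else:
                flat.append(child)
        return UnionAll(tuple(flat))
    return node


def union_depth(node):
    """Number of UnionAll nodes on the deepest path from this node."""
    if isinstance(node, UnionAll):
        return 1 + max(union_depth(c) for c in node.children)
    if isinstance(node, Project):
        return union_depth(node.child)
    return 0


orders = Scan("orders")
# Shape of the query from the description.
plan = UnionAll((Project("orderkey * 2", UnionAll((orders, orders))), orders))

# Flattening first finds nothing to merge, so the later push leaves a nested union.
flatten_then_push = push_projection_through_union(flatten_unions(plan))
# Pushing first removes the projection between the unions, so flattening succeeds.
push_then_flatten = flatten_unions(push_projection_through_union(plan))

print(union_depth(flatten_then_push), union_depth(push_then_flatten))
```

In this toy model, flattening before pushing leaves a union nested inside a union, which mirrors the plan above where Fragments 2 and 3 feed an intermediate stage while Fragment 4 does not. Pushing first yields a single union with three branches.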

Steps to Reproduce

  1. Run EXPLAIN (TYPE DISTRIBUTED) ... on a query from the description

jaystarshot commented 8 months ago

This unnecessary exchange also sometimes happens with CTE materialization.