apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

try to add local limit after the optimizer rule `join_selection` #10170

Open liukun4515 opened 7 months ago

liukun4515 commented 7 months ago

Is your feature request related to a problem or challenge?

In our case, we have `left_table` (multiple file groups) and `right_table` (a single file group), and we run the following SQL:

    select * from left_table join right_table on left_table.key = right_table.key limit 10

The final physical plan is not optimal.

The initial physical plan, with the join in auto mode, is shown below. The `join_selection` rule will swap the children of the join.

    GlobalLimitExec: skip=0, fetch=10
      ProjectionExec: ....
        HashJoinExec: mode=auto, join_type=Inner, on=....
          ProjectionExec: left_table
            ParquetExec: file_groups={37 groups: ....}
          ProjectionExec: right_table
            ParquetExec: file_groups={1 group: ....}

After the physical optimizer runs (the most important rule here is `join_selection`), the final plan looks like this:

    ....
    GlobalLimitExec: skip=0, fetch=10
      ProjectionExec: ....
        HashJoinExec: mode=CollectLeft, join_type=Inner, on=....
          ProjectionExec: right_table
            ParquetExec: file_groups={1 group: ....}
          ProjectionExec: left_table
            ParquetExec: file_groups={37 groups: ....}

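The optimal plan should have a LocalLimitExec inserted below the GlobalLimitExec, so that each output partition of the join (one per `left_table` file group) is cut off at 10 rows.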

Describe the solution you'd like

The optimal plan should look like this:

    ....
    GlobalLimitExec: skip=0, fetch=10
      LocalLimitExec: ....
        ProjectionExec: ....
          HashJoinExec: mode=CollectLeft, join_type=Inner, on=....
            ProjectionExec: right_table
              ParquetExec: file_groups={1 group: ....}
            ProjectionExec: left_table
              ParquetExec: file_groups={37 groups: ....}
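One possible direction, shown here only as a minimal std-only sketch under a toy model: a separate pass that runs after `join_selection`, revisits each GlobalLimit, and inserts a LocalLimit (keeping `skip + fetch` rows per partition) when its child now has more than one partition. The `Plan` enum and `add_local_limit` are hypothetical names, not DataFusion's `ExecutionPlan` or optimizer-rule API, and the sketch assumes the join's output partitions follow its right (probe) input, as in CollectLeft mode.

```rust
// Hypothetical pass that runs after `join_selection`. `Plan` and
// `add_local_limit` are illustrative names, not DataFusion APIs.

#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan { partitions: usize },
    Projection { input: Box<Plan> },
    /// ASSUMPTION: output partitions follow the right (probe) input, as in
    /// CollectLeft mode.
    HashJoin { left: Box<Plan>, right: Box<Plan> },
    LocalLimit { fetch: usize, input: Box<Plan> },
    GlobalLimit { skip: usize, fetch: usize, input: Box<Plan> },
}

fn partition_count(plan: &Plan) -> usize {
    match plan {
        Plan::Scan { partitions } => *partitions,
        Plan::Projection { input } | Plan::LocalLimit { input, .. } => partition_count(input),
        Plan::HashJoin { right, .. } => partition_count(right),
        // A global limit always produces a single partition.
        Plan::GlobalLimit { .. } => 1,
    }
}

/// Rewrites children first, then, under each GlobalLimit whose child has
/// more than one partition and is not already a LocalLimit, inserts a
/// LocalLimit that keeps skip + fetch rows per partition.
fn add_local_limit(plan: Plan) -> Plan {
    match plan {
        Plan::GlobalLimit { skip, fetch, input } => {
            let input = add_local_limit(*input);
            let needs_local_limit =
                !matches!(input, Plan::LocalLimit { .. }) && partition_count(&input) > 1;
            let input = if needs_local_limit {
                Plan::LocalLimit { fetch: skip + fetch, input: Box::new(input) }
            } else {
                input
            };
            Plan::GlobalLimit { skip, fetch, input: Box::new(input) }
        }
        Plan::Projection { input } => Plan::Projection {
            input: Box::new(add_local_limit(*input)),
        },
        Plan::HashJoin { left, right } => Plan::HashJoin {
            left: Box::new(add_local_limit(*left)),
            right: Box::new(add_local_limit(*right)),
        },
        Plan::LocalLimit { fetch, input } => Plan::LocalLimit {
            fetch,
            input: Box::new(add_local_limit(*input)),
        },
        scan @ Plan::Scan { .. } => scan,
    }
}

fn main() {
    // Shape of the plan after join_selection: right_table (1 file group) is
    // the build side, left_table (37 file groups) is the probe side.
    let after_join_selection = Plan::GlobalLimit {
        skip: 0,
        fetch: 10,
        input: Box::new(Plan::Projection {
            input: Box::new(Plan::HashJoin {
                left: Box::new(Plan::Scan { partitions: 1 }),
                right: Box::new(Plan::Scan { partitions: 37 }),
            }),
        }),
    };
    println!("{:#?}", add_local_limit(after_join_selection));
}
```

The pass leaves a child that is already a LocalLimit untouched, so running it more than once does not stack limits.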

Describe alternatives you've considered

No response

Additional context

No response

liukun4515 commented 7 months ago

In the current code, the LocalLimitExec is created during the initial physical planning phase: https://github.com/apache/datafusion/blob/70db5eab8996af4816958f798f6ee887dffb69ed/datafusion/core/src/physical_planner.rs#L1134.

Whether to add a local limit is decided by the number of output partitions of the child node.

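In the corner case above, no LocalLimitExec is added to the physical plan, because that decision is made before `join_selection` swaps the join's children and changes its output partitioning.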
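To make the ordering problem concrete, here is a minimal std-only Rust sketch. `Plan`, `plan_limit`, and `swap_join_inputs` are hypothetical stand-ins, not DataFusion APIs, and the model assumes (for illustration only) that the join reports the partition count of its right input both before and after the swap. `plan_limit` mirrors the partition-count check described above: it runs once, during initial planning, so the later swap leaves a 37-partition join with no LocalLimit above it.

```rust
// Toy model of the ordering problem in this issue. `Plan`, `plan_limit`
// and `swap_join_inputs` are hypothetical stand-ins, not DataFusion APIs.

#[derive(Debug, Clone, PartialEq)]
enum Plan {
    /// A scan whose output partition count equals its number of file groups.
    Scan { partitions: usize },
    /// ASSUMPTION (for illustration): the join reports the partition count
    /// of its right input, both before and after the swap.
    HashJoin { left: Box<Plan>, right: Box<Plan> },
    LocalLimit { fetch: usize, input: Box<Plan> },
    GlobalLimit { skip: usize, fetch: usize, input: Box<Plan> },
}

fn partition_count(plan: &Plan) -> usize {
    match plan {
        Plan::Scan { partitions } => *partitions,
        Plan::HashJoin { right, .. } => partition_count(right),
        Plan::LocalLimit { input, .. } => partition_count(input),
        // A global limit always produces a single partition.
        Plan::GlobalLimit { .. } => 1,
    }
}

/// Initial planning of LIMIT: the LocalLimit decision is made once, here,
/// from the child's partition count at this point in time.
fn plan_limit(input: Plan, skip: usize, fetch: usize) -> Plan {
    let input = if partition_count(&input) == 1 {
        input
    } else {
        // Each partition must keep skip + fetch rows for the global limit.
        Plan::LocalLimit { fetch: skip + fetch, input: Box::new(input) }
    };
    Plan::GlobalLimit { skip, fetch, input: Box::new(input) }
}

/// Stand-in for `join_selection`: swap the inputs of every hash join.
/// It never revisits the limits above the join.
fn swap_join_inputs(plan: Plan) -> Plan {
    match plan {
        Plan::HashJoin { left, right } => Plan::HashJoin {
            left: Box::new(swap_join_inputs(*right)),
            right: Box::new(swap_join_inputs(*left)),
        },
        Plan::LocalLimit { fetch, input } => Plan::LocalLimit {
            fetch,
            input: Box::new(swap_join_inputs(*input)),
        },
        Plan::GlobalLimit { skip, fetch, input } => Plan::GlobalLimit {
            skip,
            fetch,
            input: Box::new(swap_join_inputs(*input)),
        },
        scan @ Plan::Scan { .. } => scan,
    }
}

fn main() {
    // left_table has 37 file groups, right_table has 1.
    let join = Plan::HashJoin {
        left: Box::new(Plan::Scan { partitions: 37 }),
        right: Box::new(Plan::Scan { partitions: 1 }),
    };
    // Initial planning sees a single-partition join: no LocalLimit.
    let initial = plan_limit(join, 0, 10);
    // join_selection swaps the inputs afterwards; the join now has 37
    // partitions, but nothing re-evaluates the LocalLimit decision.
    let optimized = swap_join_inputs(initial);
    if let Plan::GlobalLimit { input, .. } = &optimized {
        println!("join partitions after swap: {}", partition_count(input));
        println!("LocalLimit present: {}", matches!(**input, Plan::LocalLimit { .. }));
    }
}
```

Running it reports 37 join partitions after the swap and no LocalLimit, which is the shape of the final plan in this issue.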

liukun4515 commented 7 months ago

I am trying to find a good solution to this issue.

Lordworms commented 7 months ago

Maybe related: https://github.com/apache/datafusion/issues/9792