apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

Avoid recompute CTEs (common table expressions) / share input plans #8777

Open alamb opened 9 months ago

alamb commented 9 months ago

Is your feature request related to a problem or challenge?

The core use case is:

with x as (
    <expensive full join of two large tables producing small-ish result>
)

select * from x where ...
union all
select * from x where ...
union all
select * from x where ...

DataFusion will effectively run the subquery x three times (it basically copies the LogicalPlan for x wherever it is used).

                                       ┌─────────────────────┐                                     
                                       │      UNION ALL      │                                     
                                       │                     │                                     
                                       └─────────────────────┘                                     
                                             ▲    ▲   ▲                                            
                                             │    │   │                                            
               ┌─────────────────────────────┘    │   └─────────────────────────────┐              
               │                                  │                                 │              
               │                                  │                                 │              
      ┌────────────────┐                 ┌────────────────┐                ┌────────────────┐      
      │    Filter 1    │                 │    Filter 2    │                │    Filter 3    │      
      └────────────────┘                 └────────────────┘                └────────────────┘      
               ▲                                  ▲                                 ▲              
               │                                  │                                 │              
               │                                  │                                 │              
 ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐      ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐     ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
      ┌────────────────┐                 ┌────────────────┐                ┌────────────────┐      
 │    │ Expensive Join │     │      │    │ Expensive Join │     │     │    │ Expensive Join │     │
      └────────────────┘                 └────────────────┘                └────────────────┘      
 │             ▲             │      │             ▲             │     │             ▲             │
        ┌──────┴──────┐                    ┌──────┴──────┐                   ┌──────┴──────┐       
 │      │             │      │      │      │             │      │     │      │             │      │
    .───────.     .───────.            .───────.     .───────.           .───────.     .───────.   
 │ ╱         ╲   ╱         ╲ │      │ ╱         ╲   ╱         ╲ │     │ ╱         ╲   ╱         ╲ │
  (  Input 1  ) (  Input 2  )        (  Input 1  ) (  Input 2  )       (  Input 1  ) (  Input 2  ) 
 │ `.       ,'   `.       ,' │      │ `.       ,'   `.       ,' │     │ `.       ,'   `.       ,' │
     `─────'       `─────'              `─────'       `─────'             `─────'       `─────'    
 │"x"                        │      │"x"                        │     │"x"                        │
  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─        ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─       ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ 

This design has certain benefits:

  1. It is straightforward to implement (which is probably why DataFusion works this way)
  2. If the different UNION ALL arms have different predicates, they could potentially be pushed down in one branch but not the others.

Describe the solution you'd like

However, in many cases it would likely be better to do the expensive join only once and reuse the results, like this:

                     ┌─────────────────────┐                     
                     │      UNION ALL      │                     
                     │                     │                     
                     └─────────────────────┘                     
                           ▲    ▲   ▲                            
                           │    │   │                            
         ┌─────────────────┘    │   └───────────────────┐        
         │                      │                       │        
         │                      │                       │        
         │                      │                       │        
┌────────────────┐     ┌────────────────┐      ┌────────────────┐
│    Filter 1    │     │    Filter 2    │      │    Filter 3    │
└────────────────┘     └────────────────┘      └────────────────┘
         ▲                      ▲                       ▲        
         │                      │                       │        
         │                      │                       │        
         └────────────────────┐ │ ┌─────────────────────┘        
                              │ │ │                              
                              │ │ │                              
                              │ │ │                              
                  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐                  
                       ┌────────────────┐                        
                  │    │ Expensive Join │     │                  
                       └────────────────┘                        
                  │             ▲             │                  
                         ┌──────┴──────┐                         
                  │      │             │      │                  
                     .───────.     .───────.                     
                  │ ╱         ╲   ╱         ╲ │                  
                   (  Input 1  ) (  Input 2  )                   
                  │ `.       ,'   `.       ,' │                  
                      `─────'       `─────'                      
                  │"x"                        │                  
                   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                   
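
The difference between copying the plan for x into every arm and sharing it can be sketched with a toy model. This is only an illustration in plain Python: `expensive_join`, `union_all_inlined`, `union_all_shared`, and the counter are hypothetical names, not DataFusion APIs, and the "shared" variant simply materializes the whole result in memory, which is one possible strategy rather than the proposed implementation.

```python
# Toy model only: every name here is hypothetical, none is a DataFusion API.
counter = {"join_runs": 0}

def expensive_join():
    """Stand-in for the CTE body `x`: joins two small in-memory inputs."""
    counter["join_runs"] += 1
    input_1 = [(1, "a"), (2, "b"), (3, "c")]
    input_2 = {1: 10, 2: 20, 3: 30}
    return [(k, v, input_2[k]) for k, v in input_1 if k in input_2]

# One predicate per UNION ALL arm (Filter 1, Filter 2, Filter 3).
FILTERS = [lambda r: r[0] == 1, lambda r: r[0] == 2, lambda r: r[0] == 3]

def union_all_inlined():
    """Each arm gets its own copy of the plan for `x`, so the join reruns."""
    rows = []
    for keep in FILTERS:
        rows.extend(r for r in expensive_join() if keep(r))
    return rows

def union_all_shared():
    """The join runs once; every arm filters the same materialized result."""
    x = expensive_join()
    rows = []
    for keep in FILTERS:
        rows.extend(r for r in x if keep(r))
    return rows

def count_join_runs(plan):
    """Run a plan and report (rows, number of times the join executed)."""
    counter["join_runs"] = 0
    rows = plan()
    return rows, counter["join_runs"]
```

Calling `count_join_runs(union_all_inlined)` and `count_join_runs(union_all_shared)` yields the same rows, with the join executed three times in the first case and once in the second.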

Describe alternatives you've considered

I think there are several considerations for this design. The biggest is that it produces a 'diamond' plan, where the same stream can be consumed at different rates. The plan may therefore need to buffer the entire intermediate result, or else it will deadlock.

For example, if both sides of a hash join read from the same shared plan:

                 ┌─────────────────────┐                     
                 │      Hash Join      │                     
                 │                     │                     
                 └─────────────────────┘                     
                       ▲          ▲                          
                       │          │                          
         ┌─────────────┘          └────────────┐             
         │                                     │             
         │                                     │             
         │                                     │             
    Build Side                            Probe Side         
 (read completely                (not read at all until Build
before probe side)                 Side is completely read)  
         ▲                                     ▲             
         │                                     │             
         │                                     │             
         └────────────────────┬────────────────┘             
                              │                              
                              │                              
                              │                              
                  ┌ ─ ─ ─ ─ ─ ┴ ─ ─ ─ ─ ─ ─ ─ ┐              
                       ┌────────────────┐                    
                  │    │ Expensive Join │     │              
                       └────────────────┘                    
                  │             ▲             │              
                         ┌──────┴──────┐                     
                  │      │             │      │              
                     .───────.     .───────.                 
                  │ ╱         ╲   ╱         ╲ │              
                   (  Input 1  ) (  Input 2  )               
                  │ `.       ,'   `.       ,' │              
                      `─────'       `─────'                  
                  │"x"                        │              
                   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─               
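
The buffering problem in this diagram can be sketched as a small single-threaded simulation. It is a simplified model under stated assumptions, not how DataFusion schedules streams: the shared producer must hand each batch to both consumers, the build side drains eagerly, the probe side reads nothing until the build side has seen every batch, and `run_diamond` and `probe_buffer_capacity` are hypothetical names.

```python
from collections import deque

def run_diamond(num_batches, probe_buffer_capacity):
    """Simulate the diamond plan; return True if it finishes, False if it stalls.

    Assumption: the build-side queue is drained eagerly, so only the
    probe-side buffer is given a capacity limit in this model.
    """
    build_q, probe_q = deque(), deque()
    produced = build_consumed = probe_consumed = 0

    while probe_consumed < num_batches:
        progressed = False

        # Shared producer: a batch goes to BOTH consumers, so it cannot
        # emit while the probe-side buffer is full.
        if produced < num_batches and len(probe_q) < probe_buffer_capacity:
            build_q.append(produced)
            probe_q.append(produced)
            produced += 1
            progressed = True

        # Build side: reads as soon as data is available.
        if build_q:
            build_q.popleft()
            build_consumed += 1
            progressed = True

        # Probe side: not read at all until the build side is complete.
        if build_consumed == num_batches and probe_q:
            probe_q.popleft()
            probe_consumed += 1
            progressed = True

        if not progressed:
            return False  # nobody can make progress: deadlock

    return True
```

In this model the plan completes only when the probe-side buffer can hold every batch: `run_diamond(10, 10)` finishes, while `run_diamond(10, 3)` stalls once three batches are waiting on the probe side.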

Additional context

This came from a Discord thread from @sergiimk

alamb commented 9 months ago

This also came up as potentially part of a solution for https://github.com/apache/arrow-datafusion/issues/8582

Dandandan commented 9 months ago

This is an interesting topic / improvement!

Two suggestions from my side

crepererum commented 9 months ago

where the same stream can be consumed at different rates potentially needing to buffer the entire intermediate result or else the plan will deadlock

I think this is very similar to our repartitioning code and the trade-offs and problems we see there. The reason is that a repartition is basically also a single input with multiple consumers. Just think of it like the same data but with a column "bool: belongs to this output" added.
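
One way to read this analogy, sketched in plain Python: tag every row of the single input with one boolean per consumer, then route the row to each output whose flag is set. This is an interpretation of the comment, not DataFusion's repartitioning code, and `tag_rows` and `route` are hypothetical helpers.

```python
def tag_rows(rows, predicates):
    """Attach one 'belongs to this output' boolean per consumer to each row."""
    return [(row, [pred(row) for pred in predicates]) for row in rows]

def route(tagged_rows, num_outputs):
    """Deliver each row to every output whose flag is set."""
    outputs = [[] for _ in range(num_outputs)]
    for row, flags in tagged_rows:
        for i, belongs in enumerate(flags):
            if belongs:
                outputs[i].append(row)
    return outputs

# Repartition-like case: each row belongs to exactly one output.
by_hash = [lambda r: r % 2 == 0, lambda r: r % 2 == 1]
# Shared-CTE-like case: every row belongs to every output.
broadcast = [lambda r: True, lambda r: True]

rows = [1, 2, 3, 4]
partitioned = route(tag_rows(rows, by_hash), 2)
shared = route(tag_rows(rows, broadcast), 2)
```

Here `partitioned` splits the rows between the two outputs, while `shared` delivers every row to both. Only the flags differ, which is why the same buffering trade-offs apply in both cases.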