apache / datafusion-ballista

Apache DataFusion Ballista Distributed Query Engine
https://datafusion.apache.org/ballista
Apache License 2.0
1.49k stars 190 forks source link

Support broadcast exchange #342

Open Dandandan opened 1 year ago

Dandandan commented 1 year ago

Is your feature request related to a problem or challenge? Please describe what you are trying to do. Broadcasting partitions helps for when joins on the build side are small. In that case we can transform partitioned joins to broadcast joins.

Describe the solution you'd like We should support broadcasts in the physical plan.

Broadcasting means copying the entire dataset to each worker.

This could be used in broadcast joins, i.e. by broadcasting smaller dataframes to every worker, which can provide big speedups as the other (big) side doesn't have to be shuffled.

Describe alternatives you've considered

Additional context

Probably we can reuse some heuristics from Spark for conditions when to perform broadcasting for joins.

mingmwang commented 1 year ago

One quick question regarding this, after those dataset are copied to each executor, should they kept in-memory or spilled to disk, if keep them in memory for a while, memory usage might be a concern.

Dandandan commented 1 year ago

@mingmwang I think for broadcasting exchange the same thing applies as normal exchanges, they are spilled to disk by default and might be maintained in memory if memory budget allows. I believe Ballista doesn't support the latter yet(?). A limit may be chosen, like 100MB, so it will fit in memory most often, but even when written to disk for joins it might give impressive speedups as the other side of the join could be way larger.

mingmwang commented 1 year ago

@Dandandan Sounds nice. There are couple of things need to do to support the broadcasting exchange.

  1. Rpc protocols to efficiently do broadcasting, something similar to Spark's TorrentBroadcast
  2. Executor memory management as you just mentioned.
  3. Broadcast lifecycle management, after SQL finish, those broadcasts should be cleaned.
  4. Planner need more accurate stats to choose broadcast join or partitioned hash join or even SortMergeJoin, build side selection etc. Need to enhance the stats collection.
  5. Broadcast exchange reuse within SQL?
  6. Broadcast reuse between SQLs ?
mingmwang commented 1 year ago

Today, For partitioned hash join, DataFusion already support CollectLeft model, I think it is similar to the Broadcast HashJoin. I do not get a chance to test it on Ballista yet, but I think it should work in the distribution model. The downside is the Left side might cause lots of duplicate re-computations.


  fn required_input_distribution(&self) -> Vec<Distribution> {
        match self.mode {
            PartitionMode::CollectLeft => vec![
                Distribution::SinglePartition,
                Distribution::UnspecifiedDistribution,
            ],
            PartitionMode::Partitioned => {
                let (left_expr, right_expr) = self
                    .on
                    .iter()
                    .map(|(l, r)| {
                        (
                            Arc::new(l.clone()) as Arc<dyn PhysicalExpr>,
                            Arc::new(r.clone()) as Arc<dyn PhysicalExpr>,
                        )
                    })
                    .unzip();
                vec![
                    Distribution::HashPartitioned(left_expr),
                    Distribution::HashPartitioned(right_expr),
                ]
            }
        }
    }
Dandandan commented 1 year ago

That's a good observation @mingmwang ! The difference with CollectLeft is that that mode collects the left side to one partition, whereas with broadcast we would broadcast the output of the left side to each worker.

Indeed, I think the trade off is that doing a bit more on the left side (i.e. building the hash table in each worker) we save the work on the right side (shuffle).

Dandandan commented 1 year ago

I added some details for implementing a broadcast join optimization rule here: https://github.com/apache/arrow-ballista/issues/348 .