apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0
6.16k stars 1.16k forks source link

Make repartitioning in `PhysicalPlan` output less confusing #9370

Open alamb opened 7 months ago

alamb commented 7 months ago

Is your feature request related to a problem or challenge?

The use of multiple RepartitionExec and CoalesceBatchesExec I think makes the explain plans in DataFusion hard to read. This causes users of DataFusion, especially new users, to ask about / wonder if they really need this and what it is doing (see discord thread, for example)

For example, consider this plan that is repartitioning the input to a HashJoin but that repartitioning requires three separate nodes

ProjectionExec: expr=[name@1 as schoolname, name@3 as teachername]
  CoalesceBatchesExec: target_batch_size=8192
    HashJoinExec: mode=Partitioned, join_type=Inner, on=[(id@0, class_id@0)]
      CoalesceBatchesExec: target_batch_size=8192
        RepartitionExec: partitioning=Hash([id@0], 8), input_partitions=8
          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
            VirtualExecutionPlan
      CoalesceBatchesExec: target_batch_size=8192
        RepartitionExec: partitioning=Hash([class_id@0], 8), input_partitions=8
          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
            ProjectionExec: expr=[class_id@1 as class_id, name@2 as name]
              VirtualExecutionPlan

Describe the solution you'd like

Ideally I think the plan would look like this:

ProjectionExec: expr=[name@1 as schoolname, name@3 as teachername]
  CoalesceBatchesExec: target_batch_size=8192
    HashJoinExec: mode=Partitioned, join_type=Inner, on=[(id@0, class_id@0)]
      RepartitionExec: partitioning=Hash([id@0], 8), input_partitions=8 <-- repartition
        VirtualExecutionPlan
      RepartitionExec: partitioning=Hash([class_id@0], 8), input_partitions=8
        ProjectionExec: expr=[class_id@1 as class_id, name@2 as name]
          VirtualExecutionPlan

Describe alternatives you've considered

I think we could do this in at least two steps:

  1. Combine the CoalesceBatchesExec into the RepartitionExec (and anything else that needs it)
  2. Using a single RepartitionExec -- I think this may require changes to the RepartitionExec to ensure that the inputs can be hashed in parallel

I think care needs to be taken to ensure that

Additional context

This came from a discussion in discord: https://discord.com/channels/885562378132000778/1206315256977035394/1212085214168490015

edmondop commented 7 months ago

Hello @alamb, I'd love to work on this

alamb commented 7 months ago

Hello @alamb, I'd love to work on this

Thanks @edmondop -- that would be great. I think this could get quite tricky if we are not careful so I would suggest taking it in phases.

Perhaps you can first try to remove CoalesceBatchesExec by refactoring its code into a struct like

struct BatchCoalscer {
  batches: Vec<RecordBatch> 
  target_batch_size: usize,
}

impl BatchCoalscer { 
  /// Buffers the specified record batch. If a more than `target_batch_size` rows are buffered, 
  /// clears the buffer and emits a RecordBatch with target_batch_size rows
 fn push(&mut self, batch: RecordBatch) -> Option<RecordBatch>{ .. }

 /// Completes this coalscer and emits any buffered rows
 fn finish(mut self) -> Option<RecordBatch> { ... }
}

And then using that struct directly in RepartitionExec and any other places that require CoalsceExec

ozankabak commented 7 months ago

We should definitely be careful when doing this, doing this gung-ho style has the potential to break many things. @mustafasrepo please watch any work on this carefully and discuss possible implications of proposals and guide any implementation efforts.

As a first step, can you please share your thoughts on @alamb's suggestion above? What are the possible implications of rolling batch-coalescing into a within-operator computation? One thing I can think of is that it turns any operator to a possibly delay introducing operator (where it wasn't otherwise), which could make DataFusion plans very hard to reason about in certain contexts. So maybe it is not a good idea after all -- but then maybe it is. We should be very careful in any case.

alamb commented 7 months ago

One thing I can think of is that it turns any operator to a possibly delay introducing operator (where it wasn't otherwise),

This is a good point. So if we introduced coalscing within the operator it may result in buffering that is not obvious from the plan. For streaming use cases this could be a substantial problem so perhaps we can not remove CoalesceBatchesExec

I think it would still be ok to extract BatchCoalscer as suggested by refactoring (not yet removing) the code in CoalesceBatchesStream, but maybe that wouldn't really improve things: https://github.com/apache/arrow-datafusion/blob/e62240969135e2236d100c8c0c01546a87950a80/datafusion/physical-plan/src/coalesce_batches.rs#L176-L280

mustafasrepo commented 7 months ago

I think, combining consecutive


RepartitionExec(hash) 
--RepartitionExec(round robin) 

into


RepartitionExec(hash) (where inputs hashed in parallel)

produces much more readable plans. And I presume it would be better in terms of execution speed. Hence we should have this support. However, combining CoalesceBatchesExec into RepartitionExec may introduce complications as suggested by @ozankabak . I think, one of the causes of this problem is that CoalesceBatches rule is to naive in its current state.

By refactoring CoalesceBatches rule, we can produce better plans as is. As an example, CoalesceBatchesExec in the build side of the HashJoinExec is unnecesary (given that HashJoinExec already buffers up all data at its input) in the following plan.

ProjectionExec: expr=[name@1 as schoolname, name@3 as teachername]
  CoalesceBatchesExec: target_batch_size=8192
    HashJoinExec: mode=Partitioned, join_type=Inner, on=[(id@0, class_id@0)]
      CoalesceBatchesExec: target_batch_size=8192
        RepartitionExec: partitioning=Hash([id@0], 8), input_partitions=8
          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
            VirtualExecutionPlan
      CoalesceBatchesExec: target_batch_size=8192
        RepartitionExec: partitioning=Hash([class_id@0], 8), input_partitions=8
          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
            ProjectionExec: expr=[class_id@1 as class_id, name@2 as name]
              VirtualExecutionPlan

However, this kind of analysis may require new APIs. We can also benefit from estimated selectivity in the Statistics for the CoalesceBatchesExec insertion decision.

Hence, I propose as a first step we should first do 2nd step in the @alamb s suggestion. In the meantime, I will explore how can we refactor CoalesceBatches rule for better plans. I presume these 2 steps together would be sufficient for most of the use cases.

edmondop commented 7 months ago

@mustafasrepo just to confirm, you think the first step would be to have a single RepartitionExec in the section of the query plan where we have two, is that right?

        RepartitionExec: partitioning=Hash([class_id@0], 8), input_partitions=8
          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
ozankabak commented 7 months ago

Yes, we can have three modes of partitioning: Hash, RoundRobin and a new RoundRobinHash which does both jointly. The RR + Hash cascade does hashing in parallel, so you would need to be careful with the implementation to make sure you don't lose parallelism, but other than this it should be relatively straightforward.

mustafasrepo commented 7 months ago

@mustafasrepo just to confirm, you think the first step would be to have a single RepartitionExec in the section of the query plan where we have two, is that right?

        RepartitionExec: partitioning=Hash([class_id@0], 8), input_partitions=8
          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1

Exactly.

alamb commented 7 months ago

@edmondop asked me in slack, but I am posting here to make sure we are all on the same page. I believe the suggestion is:

  1. Change the execute method so it also supports the third method described by @ozankabak in https://github.com/apache/arrow-datafusion/issues/9370#issuecomment-1981827316
  2. Change the planner so that instead of making two RepartitionExec it makes a single one (that uses the third-method) as explained by https://github.com/apache/arrow-datafusion/issues/9370#issuecomment-1982593783
alamb commented 7 months ago

@mustafasrepo also had some good thoughts from discord which I am copying here https://discord.com/channels/885562378132000778/1216380737410826270/1216684364805570560

"

I think, best approach would be Refactoring RepartitionExec such that it can parallelize hashing, when its input partition number is 1.

The point of

RepartitionExec(hash n_in=8, n_out=8)
--RepartitionExec(round robin n_in=1, n_out=8)

is to parallelizes hashing. Otherwise it is functionally equivalent to

RepartitionExec(hash n_in=1, n_out=8)

For this reason, if we can parallelize hashing algorithm when input partition number is 1. That would enable us to use plan above without worry. I think, we primarily need to change implementation of RepartitionExec. Once RepartitionExec has this capacity, we need to update EnforceDistribution rule to generate plan Repartition(Hash n_in=1, n_out=8) instead of Repartition(hash n_in=8, n_out=8), Repartition(round robin n_in=1, n_out=8) stack. However, this second stage is quite trivial. I am familiar with that part of the code.

"

edmondop commented 7 months ago

Kept doing some investigation but hit a roadblock. Is the change required within RepartitionExec or is in the AsExecutionPlan implementation for PhysicalPlanNode? I could find the three implementation of the Partitioning enum, but I couldn't find the number of input partitions

edmondop commented 6 months ago

I looked a little bit deeper, this is what I understood so far.

RepartitionExec pull_from_input is async, but different partitions are processed sequentially, with a for loop https://github.com/apache/datafusion/blob/main/datafusion/physical-plan/src/repartition/mod.rs#L788-L803 and internally it uses all sequential code, both to create the indices and to taking from the batch
https://github.com/apache/datafusion/blob/main/datafusion/physical-plan/src/repartition/mod.rs#L274-L788 according to the indexes.

@mustafasrepo :

mustafasrepo commented 6 months ago

As you say RepartitionExec::pull_from_input is async. However, it opens a work for each input partition. Hence, when the plan contains RepartitionExec: partitioning=Hash([exprs, ...], 8), input_partitions=1. It will open a single job to consume from input. Then will calculate hash values for each row, then will row to appropriate output partition according to its hash value. Since hash calculation may be intensive doing this in single partition may be suboptimal. Hence instead of RepartitionExec: partitioning=Hash([exprs, ...], 8), input_partitions=1 datafusion plans contain

RepartitionExec: partitioning=Hash([exprs, ...], 8), input_partitions=8
--RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1

where RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 consumes data from input (with a single job also, However, in this case there is no computation and works pretty fast.) Then re-routes them to output partitions. Then RepartitionExec: partitioning=Hash([exprs, ...], 8), input_partitions=8 opens 8 asyn pull_from_input jobs for hash calculation. What we like to have is accomplishing this parallelism in RepartitionExec: partitioning=Hash([exprs, ...], 8), input_partitions=1 (where input partition is 1.)

I think one possible way to support this feature will be, adding a method to the RepartitionExec which

edmondop commented 5 months ago

@mustafasrepo when writing split_input_data I haven't found a way to apply round robin without consuming the stream, is this expected?

mustafasrepo commented 5 months ago

@mustafasrepo when writing split_input_data I haven't found a way to apply round robin without consuming the stream, is this expected?

totally expected (at least to me)

edmondop commented 5 months ago

Still work in progress, @mustafasrepo you recommended the split to return a Vec<SendableStream>, however once I have a RecordBatch I got lost a bit. Should that part be implemented using a RepartitionStream too?

mustafasrepo commented 5 months ago

Still work in progress, @mustafasrepo you recommended the split to return a Vec<SendableStream>, however once I have a RecordBatch I got lost a bit. Should that part be implemented using a RepartitionStream too?

I think so, PerPartitionStream might work also. I think we need to consume input: Arc<dyn ExecutiionPlan>, which generates RecordBatch. Then these RecordBatches should be fed to the output channels, (as in pull_from_input method). Then once channels are fed with RecordBatches we can construct SendableRecordBatchStreams from them using either RepartitionStream or PerPartitionStream. I think the best way to proceed is to write a method (this method can assume input partition is always 1)

async fn pull_from_input_helper(
        input: Arc<dyn ExecutionPlan>,
        partition: usize,
        mut output_channels: HashMap<
            usize,
            (DistributionSender<MaybeBatch>, SharedMemoryReservation),
        >,
        metrics: RepartitionMetrics,
        context: Arc<TaskContext>,
    ) -> Result<()> 

similar to fn pull_from_input where partitioning is always roundrobin. And writing another method

fn generate_streams(necessary_data, receivers, context, etc) -> Vec<SendableRecordBatchStream>

I will try to experiment with these in my spare time (If you have a branch you already work on we can collaborate on that branch if that is OK for you).

edmondop commented 5 months ago

Still work in progress, @mustafasrepo you recommended the split to return a Vec<SendableStream>, however once I have a RecordBatch I got lost a bit. Should that part be implemented using a RepartitionStream too?

I think so, PerPartitionStream might work also. I think we need to consume input: Arc<dyn ExecutiionPlan>, which generates RecordBatch. Then these RecordBatches should be fed to the output channels, (as in pull_from_input method). Then once channels are fed with RecordBatches we can construct SendableRecordBatchStreams from them using either RepartitionStream or PerPartitionStream. I think the best way to proceed is to write a method (this method can assume input partition is always 1)

async fn pull_from_input_helper(
        input: Arc<dyn ExecutionPlan>,
        partition: usize,
        mut output_channels: HashMap<
            usize,
            (DistributionSender<MaybeBatch>, SharedMemoryReservation),
        >,
        metrics: RepartitionMetrics,
        context: Arc<TaskContext>,
    ) -> Result<()> 

similar to fn pull_from_input where partitioning is always roundrobin. And writing another method

fn generate_streams(necessary_data, receivers, context, etc) -> Vec<SendableRecordBatchStream>

I will try to experiment with these in my spare time (If you have a branch you already work on we can collaborate on that branch if that is OK for you).

There is a draft PR linked to the PR, but I have missed the part of the output_channels, I will do some more work and ping you again

edmondop commented 4 months ago

I spent a bit of extra time on this and I have some thoughts worth sharing.

pull_from_input is the task that pulls the record and send them to the ouput channels. However, it uses a mutable batch partitioner that computes the hash and publish results, see https://github.com/apache/datafusion/blob/main/datafusion/physical-plan/src/repartition/mod.rs#L766. Therefore, we cannot at the moment call partition_iter in parallel on the same input. We have two strategies:

  1. Refactor the BatchPartitioner so to we can invoke it in parallel (i.e. maybe using locking? haven't explored this much)
  2. Repartition the input that we feed in pull_from_input.

Our current setup does (2) but it does by creating multiple nodes in the plan, which is confusing. It is possible to create an internal RepartitionExec when the partitioning is Hash partitioning and the input partition is one, if we want to simplify the plan. I am not sure how much refactor is possible though, would appreciate help / thoughts

mustafasrepo commented 4 months ago

I think, both of the suggestion would work. However, second approach where we bury Internal RepartitionExec might be a bit confusing given existing code already complex enough. I think, first approach would be better and neater.