apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0
5.48k stars 1.01k forks source link

manual repartitioning overridden by physical optimizer #3629

Open kmitchener opened 1 year ago

kmitchener commented 1 year ago

Describe the bug

Using dataframe.repartition() function doesn't work as expected.

To Reproduce Using the tpch bin from benchmarks, convert the .tbl (csv) files to Parquet format using the "partitions" option:

cargo run --release --bin tpch -- convert --input ./tpch-data --output ./tpch-data-parquet --format parquet --partitions 4

That should have produced 4 parquet files per table, but instead created 20 (this laptop has 20 cores).

Expected behavior

Expected it to produce the specified number of partitions (4 in this case).

Additional context I added some debug and the physical plan produced is:

CoalesceBatchesExec: target_batch_size=4096
  RepartitionExec: partitioning=RoundRobinBatch(20)
    RepartitionExec: partitioning=RoundRobinBatch(4)
      RepartitionExec: partitioning=RoundRobinBatch(20)
        CsvExec: files=[home/kmitchener/dev/arrow-datafusion/benchmarks/tpch-data/region.tbl], has_header=false, limit=None, projection=[r_regionkey, r_name, r_comment]

The tpch bin repartitions the file using this bit of code:

        // optionally, repartition the file
        if opt.partitions > 1 {
            csv = csv.repartition(Partitioning::RoundRobinBatch(opt.partitions))?
        }
Dandandan commented 1 year ago

I am not sure if it's a bug in the planner.

After repartition a write is used - between those the planner is free to add changes as the write doesn't require any input partition - in fact, repartitioning (to 20) might improve the write performance (although now there are some unnecessary repartitions).

There are a couple of options I think to solve this:

kmitchener commented 1 year ago

In this particular case of the tpch binary, if you leave the default partitions setting at 1, the physical plan is just the CsvExec node and it results in 1 file. (it's coming from 1 CSV file, so output partitions is just 1, resulting in a single file being written by write_parquet())

Based on this usage of Dataframe.repartition() and Andy's suggestion to use repartition() in this same way in the Slack channel, I think that round-robin splitting any input into N partitions is the desired use of repartition(). It just doesn't actually work because the physical optimizer adds RepartitionExec nodes after it to undo the work.

  RepartitionExec: partitioning=RoundRobinBatch(20)
    RepartitionExec: partitioning=RoundRobinBatch(4)
      RepartitionExec: partitioning=RoundRobinBatch(20)

I was thinking to fix it by adding some logic to the Repartitioning bit of the physical optimizer so that it won't add 2 repartitions in a row.

Dandandan commented 1 year ago

Yeah so the current optimization pass will add the RepartitionExec whenever the target_partitions is bigger than the incoming partitions - that definitely is something to improve upon. Plus your observation that no RepartitionExec is added when you don't add a repartition.

I would argue a more sensical final result should look like this (regardless of adding a repartition but based on your cores or configured target_partitions ):

  RepartitionExec: partitioning=RoundRobinBatch(20)
    CsvExec ...

Because no part of the query requires to have exactly 4 partitions available, so the optimizer is free to increase / descrease it where it sees fit.

kmitchener commented 1 year ago

So the use case I'm thinking of is something like this:

In this case, we want to leave target_partitions to default so it uses all cores for processing the transformations, but I want to control the number of parquet files being written. If I used a df.repartition() right before df.write_parquet(), it would work fine in this case, if the optimizer wasn't messing with it. (I actually can't think of any other use case for repartition() -- if it's not for this, what is it for?)

Dandandan commented 1 year ago

I need some more time to write the ideas up - but the gist is that I indeed believe at some point repartition as a method on dataframes should be deprecated in favor of adding ways to control the partitioning/file number in the writers instead.

kmitchener commented 1 year ago

I'm not in any rush and your idea sounds right to me. Also, if we can have more options for creating partitioned directory structures, that would be even better. Not sure how that API should look though.