apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

Default datafusion.optimizer.prefer_existing_sort to true #8572

Closed: tustvold closed this issue 7 months ago

tustvold commented 7 months ago

Is your feature request related to a problem or challenge?

Whilst working on #8540 I was surprised to see that removing unbounded caused the DataFusion optimizer to not remove the SortExec from the plan below:

    "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
    "  SortExec: expr=[a@0 ASC NULLS LAST]",
    "    RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
    "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
    "        CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",

Doing some spelunking, this appears to be a regression introduced by https://github.com/apache/arrow-datafusion/pull/7671#discussion_r1341636365

Describe the solution you'd like

I can't see an obvious reason not to enable this by default: it seems like the more reasonable default, and it is also consistent with how I historically remember DataFusion behaving.
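
In the meantime the flag can be flipped per session. A minimal sketch of doing so, assuming the SessionConfig::set_bool / SessionContext::new_with_config APIs (the table name, file path and query are placeholders):

    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> datafusion::error::Result<()> {
        // Opt in to reusing an existing sort order instead of re-sorting after
        // repartitioning (the setting this issue proposes enabling by default).
        let config = SessionConfig::new()
            .set_bool("datafusion.optimizer.prefer_existing_sort", true);
        let ctx = SessionContext::new_with_config(config);

        // Placeholder table and query; for the flag to matter, the source must
        // declare an output ordering (as the CsvExec in the plan above does).
        ctx.register_csv("t", "file_path.csv", CsvReadOptions::new()).await?;
        ctx.sql("EXPLAIN SELECT a, c, d FROM t ORDER BY a")
            .await?
            .show()
            .await?;
        Ok(())
    }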

Describe alternatives you've considered

No response

Additional context

FYI @alamb @ozankabak

alamb commented 7 months ago

I believe it was a deliberate change. From the comment you reference, https://github.com/apache/arrow-datafusion/pull/7671#discussion_r1341636365:

For some detailed context: The flag basically lets the user choose whether they want SortPreservingMerges, or Repartition/Coalesce+Sort cascades. We ran some benchmarks and there is no clearly dominating strategy; each alternative comes out ahead in certain cases. In non-streaming cases, the first alternative typically came out ahead, so we set the default flag value to false.

In IOx it is better for our use case to use pre-existing sort orders, but I can see how for other use cases it may not be.

If there is a consensus that changing the default setting would be less surprising, it would be fine with me to change it.

mustafasrepo commented 7 months ago

At the time of the original PR, we ran a benchmark comparing the two, with the following results.

PLAN V1

"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
"  SortPreservingRepartitionExec: partitioning=Hash([b@0], 16), input_partitions=16",
"    RepartitionExec: partitioning=Hash([a@0], 16), input_partitions=1",
"      MemoryExec: partitions=1, partition_sizes=[<depends_on_batch_size>], output_ordering: [PhysicalSortExpr { expr: Column { name: \\"a\\", index: 0 }, options: SortOptions { descending: false, nulls_first: false } }]",

PLAN V2

"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
"  SortExec: expr=[a@0 ASC NULLS LAST]",
"    RepartitionExec: partitioning=Hash([b@0], 16), input_partitions=16",
"      RepartitionExec: partitioning=Hash([a@0], 16), input_partitions=1",
"        MemoryExec: partitions=1, partition_sizes=[(<depends_on_batch_size>)], output_ordering: [PhysicalSortExpr { expr: Column { name: \\"a\\", index: 0 }, options: SortOptions { descending: false, nulls_first: false } }]",

n_elem  n_trial  batch_size  elapsed_mean(v1)  elapsed_median(v1)  elapsed_mean(v2)  elapsed_median(v2)
100000  10       25          581.791054ms      581.777708ms        1.112292862s      1.117181625s
100000  10       50          373.18927ms       373.231708ms        385.533966ms      385.685291ms
100000  10       100         240.91572ms       240.570833ms        239.308708ms      239.303709ms
100000  10       1000        115.471541ms      113.817709ms        100.713324ms      100.647333ms

After these results (V1 wins at small batch sizes, V2 at larger ones), we decided to keep the existing default behavior (where V2 is preferred over V1) so as not to hurt performance for others. Frankly, changing the default would be better for our use cases. As @alamb suggests, if there is consensus we can change the default. I wonder what @ozankabak's opinion is regarding this change.

ozankabak commented 7 months ago

I believe it was a deliberate change. From the comment you reference #7671 (comment)

Right.

If there is a consensus that changing the default setting would be less surprising, it would be fine with me to change it.

Focusing on our use cases, we would be fine with changing the default (it fits our use cases much better). However, it may result in a somewhat noticeable OOTB performance hit for some groups of users that use (or will use) DF for batch compute jobs. It may also hurt OOTB performance in certain batch benchmarks.

I think it'd be a good idea to try to discuss this with a wider audience to better understand the implications.

tustvold commented 7 months ago

Perhaps I am missing something, but in the example plan in the issue, setting this option to true causes the optimiser to remove the SortExec, with no modification to the rest of the plan. I struggle to see how this would lead to a performance regression, and by extension why this would not be the default behaviour? Perhaps the setting is overly restrictive on the optimizer?

mustafasrepo commented 7 months ago

Actually, it sets the preserve_order flag to true for the second repartition in the plan (what I call SortPreservingRepartitionExec in Plan V1). In this mode the streaming_merge helper is used during repartitioning, so the input ordering is preserved across the repartition. This is slower than direct repartitioning (similar to the difference between CoalescePartitionsExec and SortPreservingMergeExec).
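
To illustrate where the extra cost comes from: an order-preserving exchange has to merge its already-sorted inputs row by row rather than simply forwarding whole batches. A self-contained sketch of that k-way merge idea (not DataFusion's actual streaming_merge code, just the mechanism it relies on):

    use std::cmp::Reverse;
    use std::collections::BinaryHeap;

    /// Merge several individually sorted inputs into one sorted output.
    /// An order-preserving merge pays this per-row comparison cost, whereas
    /// plain repartitioning just forwards data as it arrives.
    fn k_way_merge(inputs: Vec<Vec<i64>>) -> Vec<i64> {
        // Heap entries are (value, input index, position within that input),
        // wrapped in Reverse so the BinaryHeap acts as a min-heap.
        let mut heap = BinaryHeap::new();
        for (i, input) in inputs.iter().enumerate() {
            if let Some(&v) = input.first() {
                heap.push(Reverse((v, i, 0usize)));
            }
        }

        let mut out = Vec::new();
        while let Some(Reverse((v, i, pos))) = heap.pop() {
            out.push(v);
            if let Some(&next) = inputs[i].get(pos + 1) {
                heap.push(Reverse((next, i, pos + 1)));
            }
        }
        out
    }

    fn main() {
        // Three sorted "partitions", merged into one globally sorted stream.
        let partitions = vec![vec![1_i64, 4, 7], vec![2, 5, 8], vec![3, 6, 9]];
        assert_eq!(k_way_merge(partitions), vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
    }

A plain (non order-preserving) repartition skips the heap entirely and just routes incoming rows to output partitions as they arrive, which is why it is cheaper when the ordering is not needed downstream.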

tustvold commented 7 months ago

But it only has one input partition, why does it need a streaming merge to do order preserving repartition?

mustafasrepo commented 7 months ago

By second I mean counting from the bottom of the plan to the top. In your original example:

    "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
    "  SortExec: expr=[a@0 ASC NULLS LAST]",
    "    RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
    "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
    "        CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",

The RepartitionExec immediately below the SortExec has input_partitions=8. The other plan would be:

    "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
    "  RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true",
    "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
    "      CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",

In this case streaming_merge will be used inside the second RepartitionExec.

tustvold commented 7 months ago

Aah makes sense. 👍

Tbh in that case I'd hope DF would strip all the repartitioning and merges out, they're all unnecessary and will just make it slower, but I suspect that's a different issue.

Thanks for the discussion, closing this for now