Eventual-Inc / Daft

Distributed data engine for Python/SQL designed for the cloud, powered by Rust
https://getdaft.io
Apache License 2.0

[FEAT] Pre Shuffle Merge Strategy #3191

Closed colin-ho closed 2 weeks ago

colin-ho commented 3 weeks ago

Enables an experimental pre-shuffle merge strategy for shuffles.

Background

Our traditional shuffle strategy is a fully materializing map-reduce. Each of the M map partition tasks ends in a fanout that produces N output partitions, and then N reduce partition tasks are run, each merging M outputs (one from every map task).
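To make the baseline concrete, here is a toy in-memory sketch of a fully materializing map-reduce shuffle. It is not Daft's implementation; the function names (`map_fanout`, `reduce_merge`, `shuffle`) and the use of plain Python lists as partitions are assumptions made purely for illustration.

```python
def map_fanout(partition, num_reducers):
    """Map task: split one input partition into num_reducers buckets by hash."""
    buckets = [[] for _ in range(num_reducers)]
    for row in partition:
        buckets[hash(row) % num_reducers].append(row)
    return buckets


def reduce_merge(pieces):
    """Reduce task: concatenate the pieces destined for one output partition."""
    merged = []
    for piece in pieces:
        merged.extend(piece)
    return merged


def shuffle(map_partitions, num_reducers):
    # M map tasks each produce N buckets, so M * N intermediate objects exist
    # before any reduce task starts (the "fully materializing" part).
    fanouts = [map_fanout(p, num_reducers) for p in map_partitions]
    # N reduce tasks each merge M buckets, one from every map task.
    return [
        reduce_merge([fanout[i] for fanout in fanouts])
        for i in range(num_reducers)
    ]
```

With M = 3 input partitions and N = 2 reducers, the intermediate `fanouts` list holds 3 * 2 = 6 buckets, which is the object count the pre-shuffle merge is meant to reduce.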

Algorithm

The pre-shuffle merge strategy works by merging the outputs of P map partition tasks before doing a fanout. Each of the N reduce partition tasks then only has to merge M / P outputs. The benefit is that the total number of intermediate objects drops by a factor of P, i.e. from M * N to (M / P) * N.
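As a quick worked example of that reduction, the helper below counts intermediate objects for a given M, N, and P. The function name and the choice of P = 10 are hypothetical; the ceiling only covers the case where M is not an exact multiple of P.

```python
import math


def intermediate_objects(m: int, n: int, p: int = 1) -> int:
    """Number of fanout outputs when every p map outputs are merged first.

    p = 1 corresponds to the traditional shuffle (no pre-shuffle merge).
    """
    merged_map_tasks = math.ceil(m / p)  # groups left after merging
    return merged_map_tasks * n


# 1000 x 1000 shuffle: no merging vs. merging 10 map outputs at a time
baseline = intermediate_objects(1000, 1000)
merged = intermediate_objects(1000, 1000, p=10)
print(baseline, merged, baseline // merged)
```

For the 1000 x 1000 case this gives 1,000,000 objects without merging and 100,000 with P = 10, a 10x reduction.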

P can be decided dynamically or statically. I chose the dynamic route, together with a memory threshold of 1 GB to cap the maximum merged size. Essentially, the pre-shuffle merge algorithm in this PR greedily merges map outputs as they become ready, as long as the combined size of the merged partitions stays below 1 GB.
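The greedy, size-capped grouping described above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the code in this PR: the function name `greedy_merge_groups`, the `(name, size_bytes)` input format, and the choice to give a single oversized partition its own group are all hypothetical.

```python
from typing import Iterable, Iterator, List, Tuple

ONE_GB = 1024 ** 3  # assumed byte value for the 1 GB threshold


def greedy_merge_groups(
    ready: Iterable[Tuple[str, int]],
    threshold: int = ONE_GB,
) -> Iterator[List[str]]:
    """Group map outputs in arrival order while each group stays under threshold.

    `ready` yields (name, size_bytes) pairs as map outputs become available.
    """
    group: List[str] = []
    group_bytes = 0
    for name, size in ready:
        # Close the current group if adding this output would reach the cap.
        if group and group_bytes + size >= threshold:
            yield group
            group, group_bytes = [], 0
        group.append(name)
        group_bytes += size
    if group:
        yield group  # flush whatever is left once all map outputs are in


# 25 map outputs of 100 MB each
MB = 1024 ** 2
outputs = [(f"map-{i}", 100 * MB) for i in range(25)]
print([len(g) for g in greedy_merge_groups(outputs)])
```

Because the group size falls out of the observed partition sizes, P is not fixed up front: small partitions are merged in large batches, while partitions close to the threshold are left largely unmerged.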

Tests

1000 x 1000 shuffle of 100 MB partitions

2000 x 2000 shuffle of 100 MB partitions

Todos

codspeed-hq[bot] commented 3 weeks ago

CodSpeed Performance Report

Merging #3191 will degrade performances by 33.51%

Comparing colin/preshufflemerge (ab364ac) with main (5115c32)

Summary

❌ 2 regressions ✅ 15 untouched benchmarks

:warning: Please fix the performance issues or acknowledge them on CodSpeed.

Benchmarks breakdown

| Benchmark | `main` | `colin/preshufflemerge` | Change |
|---|---|---|---|
| `test_iter_rows_first_row[100 Small Files]` | 260.7 ms | 392 ms | -33.51% |
| `test_show[100 Small Files]` | 42.4 ms | 50.7 ms | -16.45% |

codecov[bot] commented 3 weeks ago

Codecov Report

Attention: Patch coverage is 74.03846% with 54 lines in your changes missing coverage. Please review.

Project coverage is 77.69%. Comparing base (5115c32) to head (ab364ac). Report is 1 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| src/daft-physical-plan/src/ops/shuffle_exchange.rs | 46.77% | 33 Missing :warning: |
| src/common/daft-config/src/python.rs | 52.63% | 9 Missing :warning: |
| ...n/src/optimization/rules/reorder_partition_keys.rs | 45.45% | 6 Missing :warning: |
| src/daft-scheduler/src/scheduler.rs | 83.78% | 6 Missing :warning: |

Additional details and impacted files

```diff
@@            Coverage Diff             @@
##             main    #3191      +/-   ##
==========================================
+ Coverage   77.65%   77.69%   +0.04%
==========================================
  Files         665      666       +1
  Lines       80960    81129     +169
==========================================
+ Hits        62867    63031     +164
- Misses      18093    18098       +5
```

| Files with missing lines | Coverage Δ | |
|---|---|---|
| daft/context.py | `90.85% <ø> (ø)` | |
| daft/execution/shuffles/pre_shuffle_merge.py | `100.00% <100.00%> (ø)` | |
| src/common/daft-config/src/lib.rs | `80.00% <100.00%> (+0.63%)` | :arrow_up: |
| ...al-plan/src/optimization/rules/drop_repartition.rs | `100.00% <100.00%> (ø)` | |
| ...ft-physical-plan/src/physical_planner/translate.rs | `95.33% <100.00%> (+0.06%)` | :arrow_up: |
| ...n/src/optimization/rules/reorder_partition_keys.rs | `77.36% <45.45%> (-2.31%)` | :arrow_down: |
| src/daft-scheduler/src/scheduler.rs | `92.67% <83.78%> (-0.51%)` | :arrow_down: |
| src/common/daft-config/src/python.rs | `75.84% <52.63%> (-2.35%)` | :arrow_down: |
| src/daft-physical-plan/src/ops/shuffle_exchange.rs | `50.00% <46.77%> (-2.18%)` | :arrow_down: |

... and [3 files with indirect coverage changes](https://app.codecov.io/gh/Eventual-Inc/Daft/pull/3191/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=Eventual-Inc)