Eventual-Inc / Daft

Distributed data engine for Python/SQL designed for the cloud, powered by Rust
https://getdaft.io
Apache License 2.0

[FEAT] Dynamically parallel local parquet reader #3310

Open colin-ho opened 6 days ago

colin-ho commented 6 days ago

Implement a dynamically parallel local streaming parquet reader.

Background

The current streaming local parquet reader, while fast, launches all of its deserialization tasks at once.

This leads to unnecessarily high memory usage and can starve downstream tasks.

Solution

Instead of launching all tasks at once, we can incrementally increase the number of parallel deserialization tasks, based on how long reads, deserialization, and sending data downstream each take:

If reads take much longer than deserialization, don't bother spawning more tasks. Conversely, if deserialization takes much longer than reads, spawn more tasks to get better pipelining. However, if the wait time to send data downstream is also long, don't spawn more tasks.
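The rule above can be sketched as a small decision function. This is an illustration only, not the code in this PR: `TaskTimings`, `should_spawn_more`, and the `ratio` threshold for "much longer" are all hypothetical, as is treating a send wait longer than the compute time as "long".

```python
from dataclasses import dataclass


@dataclass
class TaskTimings:
    """Average durations in seconds. All field names are hypothetical."""
    read: float       # time spent reading bytes from disk
    compute: float    # time spent deserializing those bytes
    send_wait: float  # time spent waiting to hand results downstream


def should_spawn_more(t: TaskTimings, ratio: float = 2.0) -> bool:
    """Return True if another deserialization task is likely to help.

    `ratio` is an assumed threshold for "much longer", not a value from the PR.
    """
    # Downstream is slow to accept data: extra tasks would only buffer more in memory.
    if t.send_wait > t.compute:
        return False
    # Spawn only when deserialization clearly dominates reads. Read-bound and
    # roughly balanced workloads both fall through to False here.
    return t.compute > ratio * t.read


print(should_spawn_more(TaskTimings(read=0.10, compute=0.01, send_wait=0.0)))  # False
print(should_spawn_more(TaskTimings(read=0.01, compute=0.10, send_wait=0.0)))  # True
print(should_spawn_more(TaskTimings(read=0.01, compute=0.10, send_wait=0.5)))  # False
```

The send-wait check comes first so that backpressure from a slow consumer always wins over the read/compute comparison.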

This is implemented with a dynamically updated semaphore. The read tasks and compute tasks update a semaphore handle, which decides whether to increase the number of permits. To spawn a new compute task, a semaphore permit must be acquired.
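To make the idea of a semaphore whose permit count grows over time concrete, here is a minimal Python sketch. The actual implementation lives in Rust (`src/daft-parquet/src/semaphore.rs`) and may look quite different; the class name `DynamicSemaphore` and its methods are hypothetical.

```python
import threading


class DynamicSemaphore:
    """Semaphore whose permit limit can grow at runtime (hypothetical sketch)."""

    def __init__(self, initial: int, maximum: int) -> None:
        self._cond = threading.Condition()
        self._limit = initial    # permits currently allowed
        self._maximum = maximum  # hard cap, assumed here to be fixed up front
        self._in_use = 0         # permits currently held by compute tasks

    def try_acquire(self) -> bool:
        """Take a permit if one is free under the current limit."""
        with self._cond:
            if self._in_use >= self._limit:
                return False
            self._in_use += 1
            return True

    def acquire(self) -> None:
        """Block until a permit is free, then take it."""
        with self._cond:
            while self._in_use >= self._limit:
                self._cond.wait()
            self._in_use += 1

    def release(self) -> None:
        """Return a permit and wake one waiter."""
        with self._cond:
            self._in_use -= 1
            self._cond.notify()

    def grow(self) -> bool:
        """Add one permit unless the cap is reached. Returns whether it grew."""
        with self._cond:
            if self._limit >= self._maximum:
                return False
            self._limit += 1
            self._cond.notify()
            return True


sem = DynamicSemaphore(initial=1, maximum=2)
print(sem.try_acquire())  # True: the single initial permit is free
print(sem.try_acquire())  # False: no permits left, so no new compute task
print(sem.grow())         # True: timings suggested more parallelism would help
print(sem.try_acquire())  # True: the new permit allows a second compute task
print(sem.grow())         # False: already at the cap
```

In this sketch the permit limit only ever increases, matching the "increase / or not increase" wording above; whether permits are ever taken back is not stated in the description.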

Results

The most noticeable benefit is in the memory usage of streaming queries, for example:

import daft
next(daft.read_parquet("data/tpch-dbgen/1_0/1/parquet/lineitem").iter_partitions())  # read only the first partition of TPC-H SF1 lineitem

The new implementation hits a peak of 300 MB, while the old one goes over 1 GB.


Another example, where we stream the entire file but consume it slowly:

import time
import daft
for _ in daft.read_parquet("/Users/colinho/Desktop/Daft/z/daft_tpch_100g_32part_64RG.parquet").iter_partitions():
    time.sleep(0.1)  # simulate a slow consumer

The new implementation hits a peak of 1.2 GB, while the old one goes over 3 GB.
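The slow-consumer case is where the "wait time to send data" signal matters. As a rough illustration of that signal, the sketch below uses a bounded queue from the Python standard library and measures how long a producer stays blocked while the consumer sleeps between items. The helper `measure_send_wait` and the queue capacity are assumptions made for the example and are unrelated to Daft's internals.

```python
import queue
import threading
import time


def measure_send_wait(n_items: int, consume_delay: float, capacity: int = 1) -> float:
    """Return total seconds the producer spent blocked on a full queue."""
    q: queue.Queue = queue.Queue(maxsize=capacity)

    def consumer() -> None:
        for _ in range(n_items):
            q.get()
            time.sleep(consume_delay)  # slow downstream, like the sleep in the loop above

    worker = threading.Thread(target=consumer)
    worker.start()

    waited = 0.0
    for i in range(n_items):
        start = time.perf_counter()
        q.put(i)  # blocks while the queue is full
        waited += time.perf_counter() - start
    worker.join()
    return waited


slow = measure_send_wait(n_items=4, consume_delay=0.05)
fast = measure_send_wait(n_items=4, consume_delay=0.0)
print(f"send wait with slow consumer: {slow:.3f}s, with fast consumer: {fast:.3f}s")
```

A large send wait means finished data is piling up faster than it is consumed, so adding compute tasks would raise memory usage without speeding up the query.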


To check performance parity, I also wrote some benchmarks for parquet files with differing numbers of rows, columns, and row groups. The results show that the new implementation is pretty much on par, with some slight differences.


When reading a TPC-H SF1 lineitem table, the results are pretty much the same (~0.2s).

codspeed-hq[bot] commented 6 days ago

CodSpeed Performance Report

Merging #3310 will degrade performances by 14.55%

Comparing colin/dynamic-parquet (460b060) with main (274f300)

Summary

⚡ 1 improvements ❌ 1 regressions ✅ 15 untouched benchmarks

:warning: Please fix the performance issues or acknowledge them on CodSpeed.

Benchmarks breakdown

| Benchmark | `main` | `colin/dynamic-parquet` | Change |
|---|---|---|---|
| `test_iter_rows_first_row[100 Small Files]` | 378.4 ms | 230.9 ms | +63.88% |
| `test_show[100 Small Files]` | 23.9 ms | 28 ms | -14.55% |

codecov[bot] commented 6 days ago

Codecov Report

Attention: Patch coverage is 90.36545% with 29 lines in your changes missing coverage. Please review.

Project coverage is 77.43%. Comparing base (84db665) to head (460b060). Report is 10 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| src/daft-parquet/src/stream_reader.rs | 88.12% | 26 Missing :warning: |
| src/daft-parquet/src/semaphore.rs | 97.29% | 2 Missing :warning: |
| src/daft-parquet/src/read.rs | 0.00% | 1 Missing :warning: |

Additional details and impacted files

[![Impacted file tree graph](https://app.codecov.io/gh/Eventual-Inc/Daft/pull/3310/graphs/tree.svg?width=650&height=150&src=pr&token=J430QVFE89&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=Eventual-Inc)](https://app.codecov.io/gh/Eventual-Inc/Daft/pull/3310?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=Eventual-Inc)

```diff
@@            Coverage Diff             @@
##             main    #3310      +/-   ##
==========================================
- Coverage   77.55%   77.43%   -0.13%
==========================================
  Files         668      677       +9
  Lines       82268    82803     +535
==========================================
+ Hits        63807    64115     +308
- Misses      18461    18688     +227
```

| [Files with missing lines](https://app.codecov.io/gh/Eventual-Inc/Daft/pull/3310?dropdown=coverage&src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=Eventual-Inc) | Coverage Δ | |
|---|---|---|
| [src/common/runtime/src/lib.rs](https://app.codecov.io/gh/Eventual-Inc/Daft/pull/3310?src=pr&el=tree&filepath=src%2Fcommon%2Fruntime%2Fsrc%2Flib.rs&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=Eventual-Inc#diff-c3JjL2NvbW1vbi9ydW50aW1lL3NyYy9saWIucnM=) | `91.21% <100.00%> (+0.43%)` | :arrow_up: |
| [src/daft-parquet/src/lib.rs](https://app.codecov.io/gh/Eventual-Inc/Daft/pull/3310?src=pr&el=tree&filepath=src%2Fdaft-parquet%2Fsrc%2Flib.rs&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=Eventual-Inc#diff-c3JjL2RhZnQtcGFycXVldC9zcmMvbGliLnJz) | `50.00% <ø> (ø)` | |
| [src/daft-parquet/src/read.rs](https://app.codecov.io/gh/Eventual-Inc/Daft/pull/3310?src=pr&el=tree&filepath=src%2Fdaft-parquet%2Fsrc%2Fread.rs&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=Eventual-Inc#diff-c3JjL2RhZnQtcGFycXVldC9zcmMvcmVhZC5ycw==) | `75.27% <0.00%> (-0.08%)` | :arrow_down: |
| [src/daft-parquet/src/semaphore.rs](https://app.codecov.io/gh/Eventual-Inc/Daft/pull/3310?src=pr&el=tree&filepath=src%2Fdaft-parquet%2Fsrc%2Fsemaphore.rs&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=Eventual-Inc#diff-c3JjL2RhZnQtcGFycXVldC9zcmMvc2VtYXBob3JlLnJz) | `97.29% <97.29%> (ø)` | |
| [src/daft-parquet/src/stream\_reader.rs](https://app.codecov.io/gh/Eventual-Inc/Daft/pull/3310?src=pr&el=tree&filepath=src%2Fdaft-parquet%2Fsrc%2Fstream_reader.rs&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=Eventual-Inc#diff-c3JjL2RhZnQtcGFycXVldC9zcmMvc3RyZWFtX3JlYWRlci5ycw==) | `89.96% <88.12%> (+1.68%)` | :arrow_up: |

... and [14 files with indirect coverage changes](https://app.codecov.io/gh/Eventual-Inc/Daft/pull/3310/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=Eventual-Inc)
