dask / dask-expr

BSD 3-Clause "New" or "Revised" License
79 stars 18 forks source link

Fix delayed in fusing with multipled dependencies #1038

Closed phofl closed 2 months ago

phofl commented 2 months ago

xref https://github.com/dask/dask/issues/11067

jtilly commented 2 months ago

Thank you for the PR!

I just wanted to flag that this PR doesn't (yet) change the behavior for the example that I provided in https://github.com/dask/dask/issues/11067:

(dask) ➜  ~/dask (main) ✗ pip install git+https://github.com/phofl/dask-expr@11067
...
(dask) ➜  ~/dask (main) ✗ python example.py                                             
Loading chunk 9
Loading chunk 8
Loading chunk 7
Loading chunk 6
Loading chunk 5
Loading chunk 4
Loading chunk 3
Loading chunk 2
Loading chunk 1
Loading chunk 0
Storing chunk 9
Storing chunk 8
Storing chunk 7
Storing chunk 6
Storing chunk 5
Storing chunk 4
Storing chunk 3
Storing chunk 2
Storing chunk 1
Storing chunk 0
(dask) ➜  ~/dask (main) ✗ DASK_DATAFRAME__QUERY_PLANNING=False python example.py                     
Loading chunk 9
Storing chunk 9
Loading chunk 8
Storing chunk 8
Loading chunk 7
Storing chunk 7
Loading chunk 6
Storing chunk 6
Loading chunk 5
Storing chunk 5
Loading chunk 4
Storing chunk 4
Loading chunk 3
Storing chunk 3
Loading chunk 2
Storing chunk 2
Loading chunk 1
Storing chunk 1
Loading chunk 0
Storing chunk 0
phofl commented 2 months ago

Yes we still have some overhead. It works mostly as expected if your delayed function runs longer (e.g. increase the size of your DataFrame), but this will still need a follow up on our end since the behaviour isn't as intended yet.

On a side note: You might want to use from_map if your delayed function is indeed only loading data from disk, that is better suited for your task anyway and doesn't suffer from this problem.