dask / dask-expr

We can't properly deal with delayed inputs #777

Open phofl opened 10 months ago

phofl commented 10 months ago

Every argument could theoretically be a delayed object, just as every argument could be a dask-expr collection. We can't deal with this yet because we never check for them.

I created a very naive implementation of a delayed expression _Delayed to capture some things, but we should think critically about it before moving ahead with this issue.

I suggest that we unpack delayed objects in the constructor of Expr, similar to what we do with collections.
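
A minimal sketch of what unpacking in the constructor could look like. This is not the dask-expr implementation: `ToyDelayed`, `ToyDelayedExpr`, and `ToyExpr` are hypothetical stand-ins so the snippet runs without dask installed. The only idea it shows is that every operand is checked once, at construction time, and wrapped if it is a delayed object.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyDelayed:
    """Hypothetical stand-in for a dask.delayed object."""
    key: str


@dataclass(frozen=True)
class ToyDelayedExpr:
    """Hypothetical expression node that wraps a delayed operand."""
    obj: ToyDelayed


class ToyExpr:
    """Hypothetical expression base class (not dask_expr's Expr)."""

    def __init__(self, *operands):
        # Check every operand once, at construction time, so that
        # downstream code never sees a raw delayed object.
        self.operands = [self._unpack(op) for op in operands]

    @staticmethod
    def _unpack(op):
        # Wrap delayed objects; pass everything else through untouched.
        if isinstance(op, ToyDelayed):
            return ToyDelayedExpr(op)
        return op

    def dependencies(self):
        # Wrapped delayed operands now show up as dependencies.
        return [
            op for op in self.operands
            if isinstance(op, (ToyExpr, ToyDelayedExpr))
        ]


expr = ToyExpr("column_a", ToyDelayed("load-0"), 42)
```

Here `expr.operands[1]` ends up as a `ToyDelayedExpr`, while the plain string and integer operands are left alone.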

delucchi-cmu commented 6 months ago

This introduced some big performance regressions in our framework that makes heavy use of @dask.delayed. We do some silly things like create 300k delayed objects so we have extreme control over joining dataframes from disparate sources.

We're getting around it for now by using the legacy dataframe directly.

phofl commented 6 months ago

Do you have a reproducer? I suspect that this is actually something different that we can address more easily

delucchi-cmu commented 6 months ago

Simple code snippet.py:

import numpy as np
import dask.dataframe as dd
import pandas as pd

if __name__ == "__main__":
    a = dd.from_dict({"a": np.arange(300_000)}, npartitions=30_000)
    parts = a.to_delayed()
    dd.from_delayed(
        parts[0], meta=pd.DataFrame.from_dict({"a": pd.Series(dtype=np.int64)})
    ).compute()

Starting from a fresh virtual environment:

>> pip install numpy pandas 'dask<=2024.2.0'
>> time python snippet.py

real    0m1.094s
user    0m1.435s
sys     0m1.383s

>> pip install 'dask>=2024.3.1'
>> time python snippet.py

real    0m1.127s
user    0m1.507s
sys     0m1.360s

>> pip install dask-expr
>> time python snippet.py

real    0m48.497s
user    0m48.845s
sys     0m1.381s

phofl commented 6 months ago

Thx, that's helpful

put up a fix here: https://github.com/dask/dask-expr/pull/1048

delucchi-cmu commented 6 months ago

I suspect there is more than one operation that is being slowed by the addition of dask-expr. Using a similar reproducer as before:

    a = dd.from_dict({"a": np.arange(300_000)}, npartitions=300_000)
    parts = a.to_delayed(optimize_graph=False)

3x slowdown, unchanged by #1048

```
>> pip install numpy pandas 'dask<=2024.2.0'
>> time python snippet.py

real    0m5.882s
user    0m7.840s
sys     0m1.521s

>> pip install 'dask>=2024.3.1'
>> time python snippet.py

real    0m5.936s
user    0m7.882s
sys     0m1.554s

>> pip install dask-expr
Successfully installed dask-expr-1.0.14
>> time python snippet.py

real    0m20.619s
user    0m22.504s
sys     0m1.365s

>> pip install
>> time python snippet.py

real    0m20.285s
user    0m22.208s
sys     0m1.568s
```

phofl commented 6 months ago

Yeah, this is a known limitation, unfortunately. Going to delayed objects round-trips through the legacy implementation, which materialises the graph. That causes the slowdown here. Improvements are certainly very welcome.
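
To make the cost of materialising concrete, here is a toy model of the difference. It is not dask's actual graph machinery: `LazyLayer` and `materialize` are hypothetical names. The lazy layer builds the task for a partition only when asked, while materialising builds one task per partition up front, even if only the first partition is ever computed.

```python
class LazyLayer:
    """Hypothetical lazy graph layer: tasks are built on demand."""

    def __init__(self, name, npartitions):
        self.name = name
        self.npartitions = npartitions
        self.tasks_built = 0  # counts how many tasks were constructed

    def task(self, i):
        if not 0 <= i < self.npartitions:
            raise KeyError(i)
        self.tasks_built += 1
        # Toy task in (callable, *args) form: ten rows per partition.
        return (range, i * 10, (i + 1) * 10)


def materialize(layer):
    """Build every task eagerly, as a plain dict keyed by (name, index)."""
    return {(layer.name, i): layer.task(i) for i in range(layer.npartitions)}


def run(task):
    """Execute a toy task and return its rows as a list."""
    func, *args = task
    return list(func(*args))


# Lazy path: only the task for partition 0 is ever built.
lazy = LazyLayer("from-dict", 30_000)
first_lazy = run(lazy.task(0))

# Materialised path: all 30_000 tasks are built before one is run.
eager = LazyLayer("from-dict", 30_000)
graph = materialize(eager)
first_eager = run(graph[("from-dict", 0)])
```

Both paths return the same rows for partition 0, but `lazy.tasks_built` is 1 while `eager.tasks_built` equals the number of partitions, so the eager cost grows with the partition count regardless of how many partitions are actually used.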