dask / dask-expr

`from_delayed` generated one task group per `Delayed` object #1077

Closed hendrikmakait closed 1 week ago

hendrikmakait commented 3 weeks ago

Problem

When using from_delayed, we generate a single expression per Delayed object. Each expression then generates a compound key consisting of the Delayed object's key and a hard-coded 0: (self.obj.key, 0)

Because of this compound key, distributed picks the Delayed's key as the task group name, which results in one task group per Delayed object.

import dask.dataframe as dd
import pandas as pd
import pprint

from dask import delayed
from distributed import Client

def foo(x):
    return pd.DataFrame({"x": [x]})

delayeds = [delayed(foo)(i) for i in range(10)]
df = dd.from_delayed(delayeds)

with Client(set_as_default=True) as client:
    result = df.compute()
    pprint.pprint(client.run_on_scheduler(lambda dask_scheduler: dask_scheduler.task_groups))
{'foo-02586e6a-ec9e-42cb-9eb6-46222420147f': <foo-02586e6a-ec9e-42cb-9eb6-46222420147f: released: 1>,
 'foo-0df08452-681d-4958-a8a2-f015b96ef065': <foo-0df08452-681d-4958-a8a2-f015b96ef065: released: 1>,
 'foo-181ac224-3232-468e-93b0-4a61cfbd940e': <foo-181ac224-3232-468e-93b0-4a61cfbd940e: released: 1>,
 'foo-27c00c23-3c74-49cf-8b4c-689677adff35': <foo-27c00c23-3c74-49cf-8b4c-689677adff35: released: 1>,
 'foo-44c676d9-7645-44c9-b43b-027faa4df51e': <foo-44c676d9-7645-44c9-b43b-027faa4df51e: released: 1>,
 'foo-7f820419-d973-452a-be1b-70cffef7c28a': <foo-7f820419-d973-452a-be1b-70cffef7c28a: released: 1>,
 'foo-972e911b-3a7c-4fbf-9d07-af00a0d4dc67': <foo-972e911b-3a7c-4fbf-9d07-af00a0d4dc67: released: 1>,
 'foo-bdaa64d5-28d2-4e48-a910-69207f5d2868': <foo-bdaa64d5-28d2-4e48-a910-69207f5d2868: released: 1>,
 'foo-c74a28ac-a96d-4e89-9009-a72d0bde8f9e': <foo-c74a28ac-a96d-4e89-9009-a72d0bde8f9e: released: 1>,
 'foo-dfb34fc8-f091-4a0f-8ec0-689d5c07ba0e': <foo-dfb34fc8-f091-4a0f-8ec0-689d5c07ba0e: released: 1>,
 'fromdelayed-7fdf8ad58b0d245e0432e9f55c3db378': <fromdelayed-7fdf8ad58b0d245e0432e9f55c3db378: released: 10>,
 'repartitiontofewer-cca5739f4d942ca896245ccc9060e0d3': <repartitiontofewer-cca5739f4d942ca896245ccc9060e0d3: memory: 1>}
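
The grouping seen in the output above can be illustrated without a cluster. The following is only a rough sketch: `task_group_name` is a hypothetical, simplified stand-in for whatever distributed does internally (it is assumed here that the first element of a tuple key becomes the group name), and the key names are made up for illustration.

```python
from collections import Counter


def task_group_name(key):
    """Hypothetical, simplified stand-in for deriving a task group from a key.

    Assumption: for a tuple key, the first element is used as the group name.
    This is not distributed's actual implementation.
    """
    if isinstance(key, tuple):
        return str(key[0])
    return str(key)


# Keys shaped like the ones described above: a unique name per Delayed
# object, each paired with a hard-coded 0.
per_delayed_keys = [(f"foo-{i:04d}", 0) for i in range(10)]

# For contrast: keys that share one name and vary only in the index.
# The name "fromdelayed-abc" is invented for this example.
shared_name_keys = [("fromdelayed-abc", i) for i in range(10)]

groups_per_delayed = Counter(task_group_name(k) for k in per_delayed_keys)
groups_shared = Counter(task_group_name(k) for k in shared_name_keys)

print(len(groups_per_delayed))  # 10 groups, one task each
print(len(groups_shared))       # 1 group holding 10 tasks
```

Under these assumptions, the number of groups grows linearly with the number of Delayed objects in the first case, while the second case stays at a single group regardless of partition count.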

Impact

As described in https://github.com/dask/distributed/issues/8677, this can overwhelm the scheduler of a distributed cluster.

fjetter commented 3 weeks ago

With https://github.com/dask/distributed/issues/8678, I hope that the performance implications of this problem are resolved. The UX is still awkward, and we should look into it.