dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

`Client.compute(df)` operates on larger task graphs than `df.compute()` #8833

Open hendrikmakait opened 2 months ago

hendrikmakait commented 2 months ago

Describe the issue:

Minimal Complete Verifiable Example:

from dask.distributed import Client
import dask

if __name__ == "__main__":
    client = Client()

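    # Number of tasks in each graph submitted to the scheduler, in order.
    task_counts = []

    def capture_task_counts(event):
        # Each event is a (timestamp, message) pair. Keep only the
        # "update_graph" messages, which carry the submitted task count.
        _, msg = event
        if not isinstance(msg, dict):
            return
        if msg.get("action") != "update_graph":
            return
        task_counts.append(msg["count"])

    client.subscribe_topic("all", capture_task_counts)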

    df = dask.datasets.timeseries(
        start="2000-01-01",
        end="2001-01-01",
        dtypes={"x": float, "y": float},
        freq="10 s",
    )
    total = df.x.sum()

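    # Both calls compute the same result, so the two submitted
    # graphs should contain the same number of tasks.
    total.compute()
    client.compute(total, sync=True)
    assert task_counts[0] == task_counts[1], task_counts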

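This raises `AssertionError: [367, 1466]`: `total.compute()` submits 367 tasks, while `client.compute(total, sync=True)` submits 1466.

It looks like `client.compute(...)` does not perform task fusion, or at least not all of it, so it sends a much larger graph to the scheduler for the same computation.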
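For readers unfamiliar with task fusion, here is a rough sketch of why skipping it inflates the task count. This is a toy illustration in plain Python, not Dask's optimizer: the `fuse_linear_chains` helper and the task names are hypothetical, and the graph is simply a dict mapping each task to its dependencies.

```python
# Toy illustration only: this is NOT Dask's fusion implementation.
# `fuse_linear_chains` and all task names below are hypothetical.

def fuse_linear_chains(deps):
    """Collapse linear chains in a dependency graph into single tasks.

    `deps` maps each task name to the list of task names it depends on.
    A task absorbs its dependency when it has exactly one dependency and
    that dependency has no other dependents.
    """
    deps = {task: list(parents) for task, parents in deps.items()}
    changed = True
    while changed:
        changed = False
        # Recompute the reverse mapping after every merge.
        dependents = {task: [] for task in deps}
        for task, parents in deps.items():
            for parent in parents:
                dependents[parent].append(task)
        for task, parents in list(deps.items()):
            if len(parents) == 1 and len(dependents[parents[0]]) == 1:
                # The child takes over the parent's dependencies and
                # the parent disappears from the graph.
                deps[task] = deps.pop(parents[0])
                changed = True
                break
    return deps


# Three partitions, each a chain make -> getitem -> sum, plus one reduction.
graph = {"total": ["sum-0", "sum-1", "sum-2"]}
for i in range(3):
    graph[f"make-{i}"] = []
    graph[f"getitem-{i}"] = [f"make-{i}"]
    graph[f"sum-{i}"] = [f"getitem-{i}"]

fused = fuse_linear_chains(graph)
print(len(graph), len(fused))  # prints "10 4"
```

Each per-partition chain collapses into one task, so the scheduler would see 4 tasks instead of 10. The gap between 367 and 1466 in the example above is consistent with this kind of per-partition chain being left unfused, though that is only a guess at this point.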

hendrikmakait commented 2 months ago

I suspect this is the culprit: https://github.com/dask/dask-expr/blob/c0b2b9a6ae15039fa3574a9a7960e899adc46b3f/dask_expr/_collection.py#L474-L478

hendrikmakait commented 2 months ago

Injecting `.optimize(fuse=True)`, i.e. `client.compute(total.optimize(fuse=True), sync=True)`, reduces the difference to a single task: `AssertionError: [367, 368]`.