dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Partial P2P rechunks cause explosion in the task group count #8656

Closed hendrikmakait closed 4 weeks ago

hendrikmakait commented 1 month ago

When rechunking data with P2P in a pattern where each output chunk depends only on a few input chunks, the task group count explodes: P2P splits the rechunk into many independent partial rechunks, each with its own set of task groups. This can put significant strain on the scheduler.

Reproducer:

import dask.array as da

from distributed.utils_test import async_poll_for, gen_cluster


@gen_cluster(client=True, nthreads=[], config={"optimization.fuse.active": False})
async def test_partial_rechunk_taskgroups(c, s):
    # Each output chunk depends on at most two neighboring input chunks.
    arr = da.random.random((10, 10, 10), chunks=((2, 2, 2, 2, 2),) * 3)
    arr = arr.rechunk(((1, 2, 2, 2, 2, 1),) * 3, method="p2p")

    _ = c.compute(arr)
    # Wait until the P2P shuffle tasks have reached the scheduler.
    await async_poll_for(
        lambda: any(
            isinstance(task, str) and task.startswith("shuffle-barrier")
            for task in s.tasks
        ),
        timeout=5,
    )
    assert len(s.task_groups) < 7

fails with AssertionError: assert 190 < 7. The count would be 6 if all independent partial rechunks shared their task groups.
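For context, one way to see where the extra task groups come from is to bucket them by task prefix. This is just an illustrative sketch, not part of the reproducer: it assumes the scheduler s from the test above and relies on Scheduler.task_groups and TaskGroup.prefix, whose internals may change between versions.

from collections import Counter


def task_groups_by_prefix(s):
    # Count scheduler task groups per task prefix. If the partial rechunks
    # shared task groups, we would expect roughly one group per prefix
    # (e.g. random_sample, rechunk-transfer, shuffle-barrier, rechunk-p2p);
    # currently, each independent partial rechunk contributes its own
    # transfer/barrier/unpack groups, inflating the total.
    return Counter(tg.prefix.name for tg in s.task_groups.values())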