dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Random access array take using P2P #8774

Open fjetter opened 1 month ago

fjetter commented 1 month ago

Slicing in dask array effectively generates a task per contiguous subslice per chunk.

In the worst case of random indexing, this generates a slice, and therefore a task, for every element of the index along the indexed dimension. Dask currently raises a PerformanceWarning when it detects this situation; see https://github.com/dask/dask/blob/b4b33caed8fc9cf77c9332442ab11cf00f90bb42/dask/array/slicing.py#L630-L641
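To illustrate the counting argument, here is a minimal pure-Python sketch. `count_slice_tasks` is a hypothetical helper, not part of dask, and it assumes exactly one task per maximal run of consecutive indices inside a single input chunk, which simplifies what the real slicing code does.

```python
import random


def count_slice_tasks(idx, chunk_size):
    """Count maximal runs of consecutive indices that stay inside one chunk.

    Hypothetical helper for illustration only; this is a simplified model,
    not dask's actual slicing implementation.
    """
    runs = 0
    prev = None
    for i in idx:
        same_run = (
            prev is not None
            and i == prev + 1
            and i // chunk_size == prev // chunk_size
        )
        if not same_run:
            # The index jumps or crosses a chunk boundary: a new subslice starts.
            runs += 1
        prev = i
    return runs


# A plain slice over 20 columns stored in chunks of 10: one run per chunk.
print(count_slice_tasks(list(range(20)), chunk_size=10))

# A random index of the same length usually breaks into many more runs.
rng = random.Random(0)
random_idx = [rng.randrange(20) for _ in range(20)]
print(count_slice_tasks(random_idx, chunk_size=10))
```

Under this model the count is bounded by the length of the index rather than by the number of chunks, which is why random access is the worst case.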

Worst case example

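import dask.array as da
import numpy as np

# 10 x 20 array split into two chunks along the second axis
x = da.random.random((10, 20), chunks=(10, 10))
# Random column indices, unsorted and possibly repeated
idx = np.random.randint(0, x.shape[1], x.shape[1])

# Fancy indexing along the chunked axis
y = x[:, idx]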

This random access pattern is another shuffle pattern, so we should be able to offer an efficient solution using our P2P infrastructure.
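One way to picture the shuffle view is the rough sketch below. `plan_take` and its return layout are hypothetical and are not the P2P API. It assumes a one-dimensional take with fixed-size input and output chunks, and groups every requested element by its (input chunk, output chunk) pair.

```python
from collections import defaultdict


def plan_take(idx, in_chunk, out_chunk):
    """Group the elements of a take by (input chunk, output chunk).

    Returns {(src, dst): [(local_src_pos, local_dst_pos), ...]}.
    Hypothetical helper; names and layout are assumptions for illustration.
    """
    transfers = defaultdict(list)
    for out_pos, in_pos in enumerate(idx):
        # Which input chunk holds the element, and where inside that chunk.
        src, src_local = divmod(in_pos, in_chunk)
        # Which output chunk receives it, and where inside that chunk.
        dst, dst_local = divmod(out_pos, out_chunk)
        transfers[(src, dst)].append((src_local, dst_local))
    return dict(transfers)


idx = [13, 2, 19, 2, 7, 11]
plan = plan_take(idx, in_chunk=10, out_chunk=3)
for (src, dst), pairs in sorted(plan.items()):
    print(f"input chunk {src} -> output chunk {dst}: {pairs}")
```

In this sketch the number of transfers is bounded by the number of input chunks times the number of output chunks, independent of how scattered the index is. That is the same shape of problem a shuffle or rechunk has to solve.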

See also https://github.com/pydata/xarray/issues/9220

fjetter commented 1 month ago

It might be necessary / helpful to first deal with https://github.com/dask/dask/issues/11234

fjetter commented 1 month ago

As a first step, I would like to understand how much of the P2P rechunk logic can be reused.