d-v-b opened this issue 1 year ago
Thanks for the feedback @d-v-b!
> I got an import error due to `pyarrow` missing from my python environment when I ran `cl.compute`. The error should probably happen earlier, e.g. as soon as I select `algorithm=p2p` in `da.rechunk`.
Yeah, I'm able to reproduce this behavior. I agree it'd be nice if we could raise at graph construction time.
> Any ideas what could be going wrong here?
Hrm, I'm not sure. cc @hendrikmakait @fjetter for visibility
Thanks for trying this and reporting back.
The pyarrow dependency for rechunking is accidental; pyarrow isn't actually required for rechunking, which is why we're not raising early.
I can also reproduce the other error. This one looks like there is some kind of fusion happening even though `dask.config.set({'optimization.fuse.active': False})` is set.
Original graph
Optimized graph
This is a bit unfortunate but possible to avoid. The problem here is that the optimization is actually triggered at compute time, not at graph construction time, i.e. you need to include the `client.compute()` call in the context manager of `dask.config.set({'optimization.fuse.active': False})`. We'll need to update our docs for this (and it should only be temporary).
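Concretely, with the reproducer from the original post (quoted at the bottom of this page) that means something like the following sketch, where `multi`, `dest_chunks`, and `cl` are the names from that script:

```python
import dask

# Build the p2p-rechunked graphs *and* call compute while the no-fusion
# setting is active, so the optimization pass that runs on compute still
# sees the config value.
with dask.config.set({'optimization.fuse.active': False}):
    rechunked_p2p = [m.rechunk(dest_chunks, algorithm='p2p') for m in multi]
    mean_p2p = [m.mean() for m in rechunked_p2p]
    cl.compute(mean_p2p, sync=True)
```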
I opened https://github.com/dask/distributed/issues/7602 for the pyarrow import which we can close quickly. Getting rid of the fusion is a bit more tricky
OK, I got things working on my end by putting the compute call under the no-fusion context manager. I'm happy to close this unless there's value in keeping it open for tracking purposes?
I have new broken code for you :)
Beyond the `RuntimeError`, it looks like there's an f-string that isn't getting prepended with the `f` properly:

```python
raise RuntimeError("rechunk_transfer failed during shuffle {id}") from e
```
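For reference, the fix is presumably just adding the `f` prefix so the shuffle id is interpolated (a sketch of the corrected line, not the actual patch in the PR linked below):

```python
raise RuntimeError(f"rechunk_transfer failed during shuffle {id}") from e
```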
> beyond the RuntimeError, it looks like there's an f-string that isn't getting prepended with the f properly
Thanks @d-v-b. That should be fixed by https://github.com/dask/distributed/pull/7600 (included in the patch release pushed out today https://github.com/dask/community/issues/309)
@d-v-b do you still encounter this error after upgrading? I can no longer reproduce this on the latest version.
If you are still running into this, can you please try to slim down your code example? The smaller the example, the easier it is for us to help.
@fjetter after upgrading to the latest version of distributed I still get the `KeyError: 'shuffle'` when running my second example. I switched out the LSF cluster for `LocalCluster`, but somehow the call to `da.store` seems crucial to generating the error -- I can't elicit an error by simply calling `.compute()` or `.mean()` on all the multiscale arrays directly.

edit: I can remove zarr and just use a fake target for `da.store`, and I get the error.
Thanks @d-v-b, this is helpful. I can reproduce with this, but I can't offer a straightforward solution and we'll need to investigate a bit further.
I've been able to reduce the example further:
```python
import dask.array as da
import dask
from distributed import Client, LocalCluster


class VoidStore():
    """
    A class that implements setitem as a no-op
    """
    def __setitem__(*args):
        pass


cluster = LocalCluster()
client = Client(cluster)

source = da.random.randint(0, 255, (4, ), chunks=(2, 2))
with dask.config.set({'optimization.fuse.active': False}):
    rechunked = source.rechunk((3, 1), algorithm='p2p')
    stored = da.store(rechunked, VoidStore(), lock=None)
```
The call to `da.store` causes the error (likely through unwanted optimization). I'll keep investigating.
@hendrikmakait reading the `store` code, I'm fairly certain we're losing annotations here: https://github.com/dask/dask/blob/c9a9edef0c7e996ef72cb474bf030778189fd737/dask/array/core.py#L1202-L1211, because we're switching from HLG to a low-level graph without carrying over any annotations.
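For illustration, here is a small sketch (not the `store` code path itself) of how annotations attach to `HighLevelGraph` layers and disappear once the graph is flattened into a plain low-level mapping:

```python
import dask
import dask.array as da

# Annotations set here are recorded on the HighLevelGraph *layers*
# created inside the context.
with dask.annotate(retries=2):
    x = da.ones((10,), chunks=5) + 1

hlg = x.__dask_graph__()  # HighLevelGraph
print({name: layer.annotations for name, layer in hlg.layers.items()})
# Layers built inside the annotate block typically report {'retries': 2}.

# Flattening to a plain {key: task} dict keeps the tasks but has no place
# for per-layer annotations, which is the kind of loss suspected above.
flat = dict(hlg)
```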
Dear all,
I'm getting a similar KeyError:
```
2024-01-25 09:41:18,926 - distributed.core - ERROR - Exception while handling op shuffle_get
Traceback (most recent call last):
  File "/home/du23yow/micromamba/envs/dev_sdc_env/lib/python3.11/site-packages/distributed/core.py", line 967, in _handle_comm
    result = handler(**msg)
             ^^^^^^^^^^^^^^
  File "/home/du23yow/micromamba/envs/dev_sdc_env/lib/python3.11/site-packages/distributed/shuffle/_scheduler_plugin.py", line 128, in get
    state = self.active_shuffles[id]
            ~~~~~~~~~~~~~~~~~~~~^^^^
KeyError: 'd6f8b2d9ed84fca0a899360d7ddf2b2b'
2024-01-25 09:41:18,937 - distributed.core - ERROR - Exception while handling op shuffle_get
Traceback (most recent call last):
  File "/home/du23yow/micromamba/envs/dev_sdc_env/lib/python3.11/site-packages/distributed/core.py", line 967, in _handle_comm
    result = handler(**msg)
             ^^^^^^^^^^^^^^
  File "/home/du23yow/micromamba/envs/dev_sdc_env/lib/python3.11/site-packages/distributed/shuffle/_scheduler_plugin.py", line 128, in get
    state = self.active_shuffles[id]
            ~~~~~~~~~~~~~~~~~~~~^^^^
KeyError: 'd6f8b2d9ed84fca0a899360d7ddf2b2b'
2024-01-25 09:45:08,520 - distributed.core - ERROR - Exception while handling op shuffle_get
Traceback (most recent call last):
  File "/home/du23yow/micromamba/envs/dev_sdc_env/lib/python3.11/site-packages/distributed/core.py", line 967, in _handle_comm
    result = handler(**msg)
             ^^^^^^^^^^^^^^
  File "/home/du23yow/micromamba/envs/dev_sdc_env/lib/python3.11/site-packages/distributed/shuffle/_scheduler_plugin.py", line 128, in get
    state = self.active_shuffles[id]
            ~~~~~~~~~~~~~~~~~~~~^^^^
KeyError: 'd6f8b2d9ed84fca0a899360d7ddf2b2b'
```
... which looks like this in the dashboard once it occurs:
https://github.com/dask/distributed/assets/56583917/68cdf886-910d-447f-9c5f-698ae8e6a172
Unfortunately, I've not been able to create a minimal example that is reproducible on a `LocalCluster`. I'm working with satellite imagery which is stored on a fileserver connected to the compute nodes of my university's HPC cluster. The imagery is mirrored from this data product and I'm using this code to start a `dask_jobqueue.SLURMCluster`.
I've been able to (kind of (*)) reproduce this using the following code, but still only on our HPC cluster and not locally. Instead of loading the satellite imagery from our fileserver, it streams a similar data product from a cloud resource using planetary-computer and odc-stac.

(*) kind of, because the error is slightly different and also includes something about `shuffle_restrict_task`.
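The snippet itself isn't included above, but purely as an illustration of this style of workflow (every parameter, collection, band, and chunk size below is a generic placeholder, not the reporter's actual configuration), a planetary-computer + odc-stac pipeline with a p2p rechunk might look roughly like:

```python
import dask
import planetary_computer
import pystac_client
from odc import stac as odc_stac

# Assumes a distributed Client (e.g. backed by a SLURMCluster) is already running,
# since the p2p rechunk algorithm requires the distributed scheduler.
catalog = pystac_client.Client.open(
    "https://planetarycomputer.microsoft.com/api/stac/v1",
    modifier=planetary_computer.sign_inplace,
)
items = catalog.search(
    collections=["sentinel-2-l2a"],          # placeholder collection
    bbox=[11.4, 50.8, 11.8, 51.1],            # placeholder area of interest
    datetime="2022-01-01/2022-12-31",         # placeholder date range
).item_collection()

# Lazily load one band as a chunked dask-backed xarray Dataset.
ds = odc_stac.load(items, bands=["B04"], chunks={"x": 2048, "y": 2048})

# Select the p2p rechunk algorithm via dask's 'array.rechunk.method' option,
# then rechunk along time and reduce.
with dask.config.set({"array.rechunk.method": "p2p"}):
    result = ds["B04"].chunk({"time": -1}).mean("time").compute()
```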
The error also happens at a different part in the workflow (after the shuffle barrier) as you can see in this dashboard recording:
https://github.com/dask/distributed/assets/56583917/2131df59-442c-43bd-836f-1d2234b1c673
Here is also the log output of a worker, which includes an additional warning that is related to P2P:
I hope all of this makes sense and someone is able to connect the dots here.
@maawoo: Thanks for reporting your problem. What versions of `dask` and `distributed` are you running? The output from `client.get_versions()` should suffice here.
Just looking at the provided output, it seems like your workers are running out of memory. I'll have to look deeper into your workload characteristics to make good recommendations on how to avoid that scenario, though.
Here is the output of `client.get_versions()` for the scheduler (workers use the same versions):
```python
{'python': '3.11.7.final.0', 'python-bits': 64, 'OS': 'Linux', 'OS-release': '4.18.0-425.13.1.el8_7.x86_64', 'machine': 'x86_64', 'processor': 'x86_64', 'byteorder': 'little', 'LC_ALL': 'None', 'LANG': 'en_US.UTF-8'},
'packages': {'python': '3.11.7.final.0', 'dask': '2024.1.0', 'distributed': '2024.1.0', 'msgpack': '1.0.7', 'cloudpickle': '3.0.0', 'tornado': '6.3.3', 'toolz': '0.12.0', 'numpy': '1.26.3', 'pandas': '2.2.0', 'lz4': None}
```
It looks like you're running on rather small workers. The dashboard looks like every worker has only about 6 GB of memory, which is a little small and doesn't give us a lot of room to move. I totally agree that this `KeyError` is bad UX, but I also recommend running on slightly larger machines.
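For reference, a sketch of how to ask dask-jobqueue for bigger workers (the values below are illustrative, not a tuned recommendation for this workload):

```python
from dask_jobqueue import SLURMCluster
from distributed import Client

# Illustrative sizing only: one worker process per job with ~10 GiB of memory,
# which is roughly the amount that resolved the issue later in this thread.
cluster = SLURMCluster(
    cores=4,
    processes=1,
    memory="10GiB",
    walltime="04:00:00",
)
cluster.scale(jobs=10)
client = Client(cluster)
```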
Thanks for your quick replies! You're right... I gave my workers more memory to work with (10 GiB each) and the process finished without issues. To me, the worker memory usage in the first dashboard recording looks like nothing compared to when using the default `tasks` rechunk method (thanks for p2p, btw!). That's why I was not considering it as the root cause of all of these errors and warnings I got.

Even though it works better with the increased memory per worker, I occasionally run into some form of this problem. Now the error message looks like this:
```
2024-01-26 12:29:58,292 - distributed.core - ERROR - Exception while handling op shuffle_restrict_task
Traceback (most recent call last):
  File "/home/du23yow/micromamba/envs/dev_sdc_env/lib/python3.11/site-packages/distributed/core.py", line 967, in _handle_comm
    result = handler(**msg)
             ^^^^^^^^^^^^^^
  File "/home/du23yow/micromamba/envs/dev_sdc_env/lib/python3.11/site-packages/distributed/shuffle/_scheduler_plugin.py", line 102, in restrict_task
    shuffle = self.active_shuffles[id]
              ~~~~~~~~~~~~~~~~~~~~^^^^
KeyError: 'd6f8b2d9ed84fca0a899360d7ddf2b2b'
2024-01-26 12:30:02,425 - distributed.core - ERROR - Exception while handling op shuffle_restrict_task
Traceback (most recent call last):
  File "/home/du23yow/micromamba/envs/dev_sdc_env/lib/python3.11/site-packages/distributed/core.py", line 967, in _handle_comm
    result = handler(**msg)
             ^^^^^^^^^^^^^^
  File "/home/du23yow/micromamba/envs/dev_sdc_env/lib/python3.11/site-packages/distributed/shuffle/_scheduler_plugin.py", line 102, in restrict_task
    shuffle = self.active_shuffles[id]
              ~~~~~~~~~~~~~~~~~~~~^^^^
KeyError: 'd6f8b2d9ed84fca0a899360d7ddf2b2b'
2024-01-26 12:30:02,431 - distributed.core - ERROR - Exception while handling op shuffle_restrict_task
Traceback (most recent call last):
  File "/home/du23yow/micromamba/envs/dev_sdc_env/lib/python3.11/site-packages/distributed/core.py", line 967, in _handle_comm
    result = handler(**msg)
             ^^^^^^^^^^^^^^
  File "/home/du23yow/micromamba/envs/dev_sdc_env/lib/python3.11/site-packages/distributed/shuffle/_scheduler_plugin.py", line 102, in restrict_task
    shuffle = self.active_shuffles[id]
              ~~~~~~~~~~~~~~~~~~~~^^^^
KeyError: 'd6f8b2d9ed84fca0a899360d7ddf2b2b'
```
As opposed to my initial report, this time I don't see any warnings regarding worker memory such as `distributed.worker.memory - WARNING - Worker is at xx% memory usage.` or `distributed.nanny.memory - WARNING - Worker [...] exceeded xx% memory budget. Restarting...`. The following information was logged by my workers at the time the above error occurred:
Signal 11 is a segmentation fault. This isn't great. Can you please share what kind of data schema you are using? My best guess is that this happens somewhere in pyarrow, so we'd also need the pyarrow version used.
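One quick way to collect that alongside the rest of the environment info is the `packages` argument of `Client.get_versions`:

```python
# Reports the pyarrow version on the client, scheduler, and every worker.
versions = client.get_versions(packages=["pyarrow"])
print(versions["scheduler"]["packages"]["pyarrow"])
```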
@hendrikmakait we should figure out a way to reraise exceptions like this with a clear error that indicates a worker died.
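A generic sketch of that kind of translation (a hypothetical helper for illustration, not distributed's actual internals or the eventual fix):

```python
# Translate the bare KeyError into an error message that tells the user the
# shuffle is gone, e.g. because a participating worker died or was restarted.
def get_active_shuffle(active_shuffles: dict, shuffle_id: str):
    try:
        return active_shuffles[shuffle_id]
    except KeyError as exc:
        raise RuntimeError(
            f"P2P shuffle {shuffle_id!r} is no longer registered on the "
            "scheduler. This typically means the shuffle was restarted or a "
            "participating worker died."
        ) from exc
```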
(edit: I'm using python 3.10.8 and both dask and distributed are version 2022.3.1) I have several issues running this code:
Downsampling + rechunking
```python
import dask.array as da
import numpy as np
import dask
import time
from distributed import Client
from dask_jobqueue import LSFCluster

shape = (2000,) * 3
source_chunks = (1024,) * 3
dest_chunks = (64,) * 3
data = da.random.randint(0, 255, shape, dtype='uint8', chunks=source_chunks)
levels = 8

multi = [data]
for level in range(levels):
    multi.append(da.coarsen(np.mean, multi[-1], {0: 2, 1: 2, 2: 2}, trim_excess=True))

rechunked_tasks = []
for m in multi:
    # only rechunk if the chunks are too small
    if any(c1 < c2 for c1, c2 in zip(m.chunksize, dest_chunks)):
        rechunked_tasks.append(m.rechunk(dest_chunks, algorithm='tasks'))
    else:
        rechunked_tasks.append(m)

mean_tasks = [m.mean() for m in rechunked_tasks]

with dask.config.set({'optimization.fuse.active': False}):
    rechunked_p2p = []
    for m in multi:
        # only rechunk if the chunks are too small
        if any(c1 < c2 for c1, c2 in zip(m.chunksize, dest_chunks)):
            rechunked_p2p.append(m.rechunk(dest_chunks, algorithm='p2p'))
        else:
            rechunked_p2p.append(m)
    mean_p2p = [m.mean() for m in rechunked_p2p]

if __name__ == '__main__':
    num_cores = 8
    cluster = LSFCluster(
        cores=num_cores,
        processes=1,
        memory=f"{15 * num_cores}GB",
        ncpus=num_cores,
        mem=15 * num_cores,
        walltime="72:00",
    )
    cluster.scale(10)
    cl = Client(cluster)
    print(f"Begin distributed operations. Dask dashboard url: {cl.dashboard_link}")
    start = time.time()
    cl.compute(mean_p2p, sync=True)
    print(f"Completed p2p rechunking -> mean after {time.time() - start} s")
    start = time.time()
    cl.compute(mean_tasks, sync=True)
    print(f"Completed tasks rechunking -> mean after {time.time() - start} s")
```

First, I got an import error due to `pyarrow` missing from my python environment when I ran `cl.compute`. The error should probably happen earlier, e.g. as soon as I select `algorithm=p2p` in `da.rechunk`. After installing `pyarrow`, I get a new error:

Traceback
```bash
distributed.core - ERROR - Exception while handling op shuffle_get_or_create
Traceback (most recent call last):
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 139, in get_or_create
    return self.get(id, worker)
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 127, in get
    state = self.states[id]
KeyError: '290a0eef76b7ab6e389634324025588a'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/core.py", line 818, in _handle_comm
    result = handler(**msg)
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 145, in get_or_create
    state = self._create_array_rechunk_state(id, spec)
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 204, in _create_array_rechunk_state
    for ts in self.scheduler.tasks[name].dependents:
KeyError: 'shuffle-barrier-290a0eef76b7ab6e389634324025588a'

[the same shuffle_get_or_create traceback is logged several more times]

Traceback (most recent call last):
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_rechunk.py", line 41, in rechunk_transfer
    return _get_worker_extension().add_partition(
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_worker_extension.py", line 628, in add_partition
    shuffle = self.get_or_create_shuffle(shuffle_id, type=type, **kwargs)
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_worker_extension.py", line 879, in get_or_create_shuffle
    return sync(
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/utils.py", line 405, in sync
    raise exc.with_traceback(tb)
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/utils.py", line 378, in f
    result = yield future
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/tornado/gen.py", line 769, in run
    value = future.result()
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_worker_extension.py", line 709, in _get_or_create_shuffle
    shuffle = await self._refresh_shuffle(
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_worker_extension.py", line 768, in _refresh_shuffle
    result = await self.worker.scheduler.shuffle_get_or_create(
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/core.py", line 1227, in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/core.py", line 1011, in send_recv
    raise exc.with_traceback(tb)
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/core.py", line 818, in _handle_comm
    result = handler(**msg)
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 145, in get_or_create
    state = self._create_array_rechunk_state(id, spec)
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 204, in _create_array_rechunk_state
    for ts in self.scheduler.tasks[name].dependents:
KeyError: 'shuffle-barrier-290a0eef76b7ab6e389634324025588a'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/groups/cellmap/home/bennettd/dev/cosem-flows/test_rechunking.py", line 54, in
```

Any ideas what could be going wrong here?