dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org

Gracefully handle unevenly distributed disk space during P2P #8433

Open zmbc opened 9 months ago

zmbc commented 9 months ago

Apologies if this has already been requested, or is clearly impossible for some reason. My Dask knowledge isn't super deep.

I know that OSErrors, which can occur due to a disk being full, are handled pretty gracefully when spilling:

https://github.com/dask/distributed/blob/81774d41cb2a0b4258b36b29f2448b27cf62c363/distributed/spill.py#L134-L137
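
For what it's worth, the pattern there looks roughly like the sketch below (an illustrative paraphrase with made-up names, not the actual spill.py code): the disk write is wrapped in a try/except, the OSError is logged, and the value just stays in memory instead of failing the task.

import logging

logger = logging.getLogger(__name__)

def spill_to_disk(path: str, payload: bytes, memory_store: dict, key: str) -> None:
    # Illustrative only: try to write the value to disk and drop it from memory;
    # if the write fails (e.g. the disk is full), log the error and keep the
    # value in memory rather than letting the OSError propagate into the task.
    try:
        with open(path, "wb") as f:
            f.write(payload)
    except OSError:
        logger.error("Spilling %r to disk failed; keeping it in memory", key, exc_info=True)
    else:
        memory_store.pop(key, None)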

However, I am frequently running into OSErrors during the shuffle operation, here:

https://github.com/dask/distributed/blob/81774d41cb2a0b4258b36b29f2448b27cf62c363/distributed/shuffle/_disk.py#L179-L180

These do not appear to be handled well: they are treated as errors in the task itself and surfaced to me, when really I would like the task to be rerun elsewhere, since the problem is local to one worker. Even killing the worker in question and allowing Dask to recompute the necessary data is more graceful.

This is a frequent annoyance for me when running large dataframe operations (dataframes with a few hundred million rows and ~15 string columns) on a cluster that has unpredictable disk capacity constraints (which is a separate issue, but not one I would expect to bubble up like this).

I can provide more details, such as a stack trace, if this is unexpected/should already work -- but I don't see any signs in the code of this being a bug, more like a missing feature.

hendrikmakait commented 9 months ago

@zmbc: Thanks for reporting your problem. Unfortunately, your deployment situation breaks a fundamental assumption the current P2P implementation holds: Workers have (roughly) equal disk space available.

Getting rid of this assumption is not straightforward if we want to ensure optimal performance: we don't know how large the shuffled data will be, yet P2P has to decide a priori where to send outputs. It should be possible to amend the allocation decisions later on, but that would add significant complexity to an already complex system. If this problem is common for other users as well, we could think about a way for the P2P system to take unevenly distributed disk space into account.

With that being said, let's dive into possible workarounds. There are a few different situations that we could tackle:

1. Your entire dataframe fits into memory

In this case, you could use the experimental in-memory P2P shuffle. Enable it by setting the distributed.p2p.disk config option to False, e.g.:

import dask
import dask.dataframe as dd

# Keep all shuffle data in memory instead of writing partitions to disk
with dask.config.set({"distributed.p2p.disk": False}):
    df = dd.shuffle(df, "x")

This will never write data to disk. On the flip side, this also means that if your dataframe is too large and you run out of memory, this will blow up as well. We are looking into gradually writing data to disk, but there is currently no ETA for this. Please note that this option is still experimental, so use it at your own risk. Let us know if this helps you; we'll take that into account for prioritization.
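
If you're unsure whether your dataframe fits, a quick way to gauge its in-memory footprint is sketched below. This assumes df is a dask.dataframe.DataFrame and client is your distributed.Client; the memory_limit field is what I'd expect to find in client.scheduler_info(), so double-check it on your version.

# Rough estimate of the dataframe's total in-memory size, in bytes.
# deep=True accounts for the actual size of object/string columns,
# which dominate a dataframe with ~15 string columns.
nbytes = df.memory_usage(deep=True).sum().compute()

# Compare against the aggregate memory limit of all workers.
cluster_memory = sum(
    w["memory_limit"] for w in client.scheduler_info()["workers"].values()
)
print(f"dataframe ~{nbytes / 2**30:.1f} GiB, cluster ~{cluster_memory / 2**30:.1f} GiB")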

2. Most of your workers have plenty of disk, but a few have too little

If you can figure out which workers you'd want to exclude ahead of time, you can use that to restrict P2P shuffling to a subset of your workers.

import dask

# Restrict P2P shuffling to workers that are known to have enough disk space
with dask.annotate(workers=["<address-1>", "<address-2>", ...]):
    df = dd.shuffle(df, "x")
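
If you don't have the addresses at hand, one way to build that list is from the scheduler's worker listing, sketched below. The bad_hosts set is hypothetical; you'd fill it with whatever hosts you already know to be disk-constrained, and the "host" field is what I'd expect in client.scheduler_info().

# All currently connected workers, keyed by address ("tcp://host:port")
workers = client.scheduler_info()["workers"]

# Exclude workers running on hosts known to have too little disk
bad_hosts = {"10.0.0.5", "10.0.0.6"}  # hypothetical example
shuffle_workers = [
    addr for addr, info in workers.items() if info["host"] not in bad_hosts
]

with dask.annotate(workers=shuffle_workers):
    df = dd.shuffle(df, "x")
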
zmbc commented 9 months ago

Thanks for the detailed response @hendrikmakait. It seems this is more complex than I realized. I may try the in-memory P2P shuffle.

I'd like to get your reaction to this part of my question:

Even killing the worker in question and allowing Dask to recompute the necessary data is more graceful.

Even if the P2P shuffle is fairly "broken" by unevenly distributed disk space, could there be an inefficient but functional stopgap to add to Dask that would at least prevent the overall computation from failing?

We are looking into gradually writing data to disk

This sounds like exactly the behavior I would want, and at that point it seems like OSErrors could be handled similarly to the spilling case.

hendrikmakait commented 8 months ago

Even if the P2P shuffle is fairly "broken" by unevenly distributed disk space, could there be an inefficient but functional stopgap to add to Dask that would at least prevent the overall computation from failing?

Currently, I don't know what that would look like, given that it would need to be general and should avoid retrying "hopeless" shuffles where there is simply not enough disk space even though it is evenly distributed.

You could try implementing one on your own that caters to your use case. Here's a rough sketch:

...
import shutil
import dask

def get_free_disk_space(dask_worker):
    # Runs on each worker; reports free bytes on the filesystem holding its local directory
    return shutil.disk_usage(dask_worker.local_directory).free

# client.run executes the function on every worker -> {worker address: free bytes}
free_disk_space = client.run(get_free_disk_space)

def pick_workers_with_enough_disk(disk_space_mapping, minimum_bytes=50 * 2**30):
    # The 50 GiB threshold is only an example; tune it to your workload
    return [addr for addr, free in disk_space_mapping.items() if free >= minimum_bytes]

shuffle_workers = pick_workers_with_enough_disk(free_disk_space)

with dask.annotate(workers=shuffle_workers):
    df = df.shuffle(...)
...