dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License
1.58k stars 720 forks source link

errors persisting to local storage #1693

Closed rabernat closed 6 years ago

rabernat commented 6 years ago

I am trying to run dask on TACC wrangler. I am getting errors related to local disk storage.

Here is the SLURM jobscript I'm using to launch the workers https://github.com/pangeo-data/pangeo/blob/master/utilities/wrangler/launch-dask-worker.sh

Note that wrangler has a fast global storage system, which I use for dask's local dir. Specifically, in my jobscript (slightly updated from the one linked above), I say

LDIR=/gpfs/flash/users/$USER/dask-local-worker
SCHEDULER=$HOME/scheduler.json
dask-worker --memory-limit 0.15 --nthreads 4 --nprocs 6 \
            --local-directory $LDIR \
            --scheduler-file=$SCHEDULER \
            --interface ib0 

I create a cluster with approximately 10 such workers. The total size of the final cluster is

Workers: 59 Cores: 236 Memory: 1192.79 GB

As a test, say I want to persist a dataset bigger than the amount of memory in my cluster. This should be possible with local storage. I do

import dask.array as da
big_data = da.zeros((160000, 1600000), chunks=(4000, 4000)).persist()
big_data

I start seeing the following errors in the worker logs

distributed.worker - INFO - Failed to put key in memory
Traceback (most recent call last):
  File "/home/02823/rabernat/miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/worker.py", line 1500, in transition_executing_done
    self.put_key_in_memory(key, value, transition=False)
  File "/home/02823/rabernat/miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/worker.py", line 1668, in put_key_in_memory
    self.data[key] = value
  File "/home/02823/rabernat/miniconda3/envs/pangeo/lib/python3.6/site-packages/zict/buffer.py", line 80, in __setitem__
    self.fast[key] = value
  File "/home/02823/rabernat/miniconda3/envs/pangeo/lib/python3.6/site-packages/zict/lru.py", line 71, in __setitem__
    self.evict()
  File "/home/02823/rabernat/miniconda3/envs/pangeo/lib/python3.6/site-packages/zict/lru.py", line 90, in evict
    cb(k, v)
  File "/home/02823/rabernat/miniconda3/envs/pangeo/lib/python3.6/site-packages/zict/buffer.py", line 52, in fast_to_slow
    self.slow[key] = value
  File "/home/02823/rabernat/miniconda3/envs/pangeo/lib/python3.6/site-packages/zict/func.py", line 42, in __setitem__
    self.d[key] = self.dump(value)
  File "/home/02823/rabernat/miniconda3/envs/pangeo/lib/python3.6/site-packages/zict/file.py", line 80, in __setitem__
    with open(os.path.join(self.directory, _safe_key(key)), 'wb') as f:
FileNotFoundError: [Errno 2] No such file or directory: '/gpfs/flash/users/rabernat/dask-local-worker/worker-apt4oj1w/storage/%28%27wrapped-705ff5e63aa6243f2bf991654044fcc5%27%2C%2032%2C%20190%29'

There are indeed some worker directories in the local directory

$ ls /gpfs/flash/users/rabernat/dask-local-worker/
global.lock              worker-2scofr3k.dirlock  worker-gy1idt5x
purge.lock               worker-b4hanong          worker-gy1idt5x.dirlock
worker-_08n42hv          worker-b4hanong.dirlock  worker-zzr628du
worker-_08n42hv.dirlock  worker-gqf0qa7k          worker-zzr628du.dirlock
worker-2scofr3k          worker-gqf0qa7k.dirlock

But worker-apt4oj1w is missing?

Why would this happen? What is going on here?

This problem with local storage is infecting everything else I try to do on Wrangler.

rabernat commented 6 years ago

Update: I was able to work around this with the following

LDIR=/gpfs/flash/users/$USER/dask
UUID=$(cat /dev/urandom | tr -dc 'a-zA-Z0-9' | fold -w 32 | head -n 1)
...
    --local-directory $LDIR/$UUID

I think the problem is that dask does not expect any other worker processes to be using local-directory.

mrocklin commented 6 years ago

In general I recommend avoiding pointing the --local-directory to network storage. Many distributed small writes is a good way to crash a network file system. I've personally seen this happen at a few large facilities.

You might want to read http://dask.pydata.org/en/latest/setup/hpc.html#no-local-storage

mrocklin commented 6 years ago

In general though it sounds like the file locking system may have issues on network file systems. It would be nice to fix this if/when people have the time.

rabernat commented 6 years ago

Keep in mind that Wrangler is highly specialized for big data analysis and is not a typical HPC system. I don't know what exactly the /flash storage system is, but it is amazingly performant and resilient.

I asked the sysadmins what partition to use for temporary data in the following way:

My application (dask) occasionally needs to spill temporary data to disk. The potential size of this temporary data is ~100 GB per node. This location does not have to be globally readable. The ideal solution would be a fast local drive, such as an SSD. But I can't find any info about local storage in the wrangler docs. There are also several global filesystems (/work, /data, flash, etc.), but I don't know if they are meant for this sort of thing. What filesystem do you suggest I use?

They responded as follows:

Keep in mind also that the /gpfs/flash file system, while global, is effectively local to each node on Wrangler due to the DSSD architecture. So if the latency of your IO is an issue here /gpfs/flash will be your fastest option. Either /data or /gpfs/flash will work, it's just a matter of how performance-sensitive these operations are.

So I think this issue is really about multiple workers using the same directory. Once I worked around that, it worked great.

mrocklin commented 6 years ago

Have you ever encountered such a problem with multiple workers using the same directory on a local file system?

On Mon, Jan 15, 2018 at 11:34 PM, Ryan Abernathey notifications@github.com wrote:

Keep in mind that Wrangler https://portal.tacc.utexas.edu/user-guides/wrangler is highly specialized for big data analysis and is not a typical HPC system. I don't know what exactly the /flash storage system is, but it is amazingly performant and resilient.

I asked the sysadmins what partition to use for temporary data in the following way:

My application (dask) occasionally needs to spill temporary data to disk. The potential size of this temporary data is ~100 GB per node. This location does not have to be globally readable. The ideal solution would be a fast local drive, such as an SSD. But I can't find any info about local storage in the wrangler docs. There are also several global filesystems (/work, /data, flash, etc.), but I don't know if they are meant for this sort of thing. What filesystem do you suggest I use?

They responded as follows:

Keep in mind also that the /gpfs/flash file system, while global, is effectively local to each node on Wrangler due to the DSSD architecture. So if the latency of your IO is an issue here /gpfs/flash will be your fastest option. Either /data or /gpfs/flash will work, it's just a matter of how performance-sensitive these operations are.

So I think this issue is really about multiple workers using the same directory. Once I worked around that, it worked great.

— You are receiving this because you commented. Reply to this email directly, view it on GitHub https://github.com/dask/distributed/issues/1693#issuecomment-357851383, or mute the thread https://github.com/notifications/unsubscribe-auth/AASszELb9-QSOBVysWc2iKuS-ktvikHGks5tLCbQgaJpZM4RfDL7 .

mrocklin commented 6 years ago

By the way it looks like if you add the following line to your ~/.dask/config.yaml file it will avoid file locking altogether

use-file-locking: False
mrocklin commented 6 years ago

If you have an opportunity to try your situation with the fix in https://github.com/dask/distributed/pull/1714 I would appreciate it