dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Randomly at the end of a program I get an error that some temporary directory cannot be removed #5322

Open guziy opened 2 years ago

guziy commented 2 years ago

What happened:

I randomly get the following error at the end of running a script that uses Dask (it does not happen every time):

distributed.scheduler - INFO - Receive client connection: Client-6ca31a72-1620-11ec-81a8-20000fa5fe80
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Run out-of-band function 'stop'
distributed.scheduler - INFO - Scheduler closing...
distributed.scheduler - INFO - Scheduler closing all comms
distributed.worker - INFO - Stopping worker at tcp....:36164
....
distributed.diskutils - ERROR - Failed to remove 'work/dask-worker-space/worker-acf10xla' (failed in <built-in function lstat>): [Errno 2] No such file or directory: 'work/dask-worker-space/worker-acf10xla'
distributed.wo

What you expected to happen: When a temporary directory is not found, I would expect Dask not to try to remove it, and not to fail...

Anything else we need to know?:

I am running the script in a job, so when it exits with an error, the job also fails.

I use the initialize function to start Dask workers from the Python script:

    if args.mpi:
        from dask_mpi import initialize
        initialize(
            nthreads=1,
            # dashboard_address=None,
        )

And then I create a client using a context manager:

    from dask.distributed import Client

    with Client() as client:
        ...  # do work
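
Put together, a minimal sketch of this pattern (assuming the script is launched under MPI, e.g. mpirun -np 4 python script.py, as in the dask-mpi docs):

    from dask_mpi import initialize
    from dask.distributed import Client

    # dask-mpi turns rank 0 into the scheduler, rank 1 into this
    # client code, and the remaining ranks into workers.
    initialize(nthreads=1)

    with Client() as client:  # connects to the scheduler started above
        futures = client.map(lambda x: x + 1, range(10))
        print(client.gather(futures))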

Environment:

guziy commented 2 years ago

Above in the logs, when the workers start, I see that another worker was deleting this folder:

distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://...:36217', name: 16, memory: 0, processing: 0>
distributed.diskutils - INFO - Found stale lock file and directory 'work/dask-worker-space/worker-acf10xla', purging

Not sure why the lock file and the directory were classified as stale...

guziy commented 2 years ago

I got the error one more time... For some reason a worker's lock file is declared stale, and then, when trying to remove it, there is a FileNotFoundError exception. I was running for the first time in this folder, so there should not be any interference from previous runs...

distributed.diskutils - INFO - Found stale lock file and directory '/fs/site3/eccc/cmd/e/olh001/maestro/cmde_postproc_prog/hub/work/20210920000000/main/cmde_postproc/timeseries/detide/compute_surge/work/dask-worker-space/worker-siojmj7o', purging
/fs/site3/eccc/cmd/e/olh001/ssm_setup_hub/ssm/surgepy/master/surgepy_1.0.7_all/lib/python3.8/site-packages/distributed/worker.py:513: UserWarning: The local_dir keyword has moved to local_directory
  warnings.warn("The local_dir keyword has moved to local_directory")
Traceback (most recent call last):
  File "/fs/site3/eccc/cmd/e/olh001/maestro/cmde_postproc_prog/hub/work/20210920000000/main/cmde_postproc/timeseries/detide/compute_surge/bin/compute_surge.py", line 528, in <module>
    main()
  File "/fs/site3/eccc/cmd/e/olh001/maestro/cmde_postproc_prog/hub/work/20210920000000/main/cmde_postproc/timeseries/detide/compute_surge/bin/compute_surge.py", line 466, in main
    initialize(nthreads=1, 
  File "/fs/site3/eccc/cmd/e/olh001/ssm_setup_hub/ssm/surgepy/master/surgepy_1.0.7_all/lib/python3.8/site-packages/dask_mpi/core.py", line 93, in initialize
    asyncio.get_event_loop().run_until_complete(run_worker())
  File "/fs/site3/eccc/cmd/e/olh001/ssm_setup_hub/ssm/surgepy/master/surgepy_1.0.7_all/lib/python3.8/asyncio/base_events.py", line 612, in run_until_complete
    return future.result()
  File "/fs/site3/eccc/cmd/e/olh001/ssm_setup_hub/ssm/surgepy/master/surgepy_1.0.7_all/lib/python3.8/site-packages/dask_mpi/core.py", line 83, in run_worker
    async with WorkerType(
  File "/fs/site3/eccc/cmd/e/olh001/ssm_setup_hub/ssm/surgepy/master/surgepy_1.0.7_all/lib/python3.8/site-packages/distributed/worker.py", line 623, in __init__
    self.data = SpillBuffer(
  File "/fs/site3/eccc/cmd/e/olh001/ssm_setup_hub/ssm/surgepy/master/surgepy_1.0.7_all/lib/python3.8/site-packages/distributed/spill.py", line 30, in __init__
    File(spill_directory),
  File "/fs/site3/eccc/cmd/e/olh001/ssm_setup_hub/ssm/surgepy/master/surgepy_1.0.7_all/lib/python3.8/site-packages/zict/file.py", line 58, in __init__
    os.makedirs(self.directory, exist_ok=True)
  File "/fs/site3/eccc/cmd/e/olh001/ssm_setup_hub/ssm/surgepy/master/surgepy_1.0.7_all/lib/python3.8/os.py", line 221, in makedirs
    mkdir(name, mode)
FileNotFoundError: [Errno 2] No such file or directory: '/fs/site3/eccc/cmd/e/olh001/maestro/cmde_postproc_prog/hub/work/20210920000000/main/cmde_postproc/timeseries/detide/compute_surge/work/dask-worker-space/worker-siojmj7o/storage'
distributed.diskutils - ERROR - Failed to remove '/fs/site3/eccc/cmd/e/olh001/maestro/cmde_postproc_prog/hub/work/20210920000000/main/cmde_postproc/timeseries/detide/compute_surge/work/dask-worker-space/worker-siojmj7o' (failed in <built-in function lstat>): [Errno 2] No such file or directory: '/fs/site3/eccc/cmd/e/olh001/maestro/cmde_postproc_prog/hub/work/20210920000000/main/cmde_postproc/timeseries/detide/compute_surge/work/dask-worker-space/worker-siojmj7o'
distributed.diskutils - INFO - Found stale lock file and directory '/fs/site3/eccc/cmd/e/olh001/maestro/cmde_postproc_prog/hub/work/20210920000000/main/cmde_postproc/timeseries/detide/compute_surge/work/dask-worker-space/worker-7ni51cxw', purging

guziy commented 2 years ago

Maybe the workers did not have a chance to lock their respective directories, and the WorkSpace tried to lock them, succeeded, and therefore declared them stale...

jacobtomlinson commented 2 years ago

Yeah, it sounds like the workers are not able to clean up their temporary directories. If Dask finds temp dirs that do not belong to a worker, it declares them stale and tries to clean them up.

Given that it looks like you are on an HPC system, are you provided with some dedicated temp space? If so, you can set the temporary-directory yourself as part of the worker config, which may help solve this problem for you.
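
For example, a minimal sketch using Dask's temporary-directory config key (the path is a placeholder for whatever scratch space your site provides):

    import dask

    # Point Dask's worker space at dedicated scratch; set this
    # before the workers start so they pick it up.
    dask.config.set({"temporary-directory": "/local/scratch"})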

guziy commented 2 years ago

Thanks a lot for your reply, Jacob.

I will try specifying the temp dir, although we don't have a dedicated scratch... But from the logs, it looks like the following happens (see the sketch after this list):

  1. The worker creates its temp dir.
  2. The WorkSpace removes it.
  3. The worker tries to create a storage directory inside the temp dir and fails, because the parent was purged by the WorkSpace.
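
A filesystem-level sketch of that race (illustrative only; the real path goes through os.makedirs, where the purge has to land between the parent-existence check and the final mkdir call):

    import os
    import shutil
    import tempfile

    # 1. The worker creates its temp dir.
    workdir = tempfile.mkdtemp(prefix="worker-")

    # 2. The WorkSpace purges it as "stale".
    shutil.rmtree(workdir)

    # 3. Creating the storage subdirectory now fails because the
    #    parent is gone, matching the traceback above.
    os.mkdir(os.path.join(workdir, "storage"))  # FileNotFoundError
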
guziy commented 2 years ago

I still get the purging message when specifying local_directory, but the outcome depends on whether the purge happens before or after the worker has created its storage directory: if it happens after, and there is no spilling, things seem to work...

Here is how I specify it:

        import os
        from dask_mpi import initialize

        initialize(
            nthreads=1,
            local_directory=os.environ.get("TMPDIR", None),
        )

Just curious: don't workers keep their work directories locked until they exit?

guziy commented 2 years ago

Now I get both the purging and the failed-to-remove messages from distributed.diskutils: first it purges a folder, and then at the end (when the computation is complete) it complains that it cannot delete that folder because it does not exist. But there is no exception, so things kind of work...

distributed.diskutils - INFO - Found stale lock file and directory '/fs/site3/eccc/cmd/e/olh001/maestro/cmde_postproc_pseudo/hub/work/20210920000000/main/cmde_postproc/timeseries/detide/compute_surge/work/tmpdireccc-ppp38701/dask-worker-space/worker-x70nwvut', purging 

.....

distributed.diskutils - ERROR - Failed to remove '/fs/site3/eccc/cmd/e/olh001/maestro/cmde_postproc_pseudo/hub/work/20210920000000/main/cmde_postproc/timeseries/detide/compute_surge/work/tmpdireccc-ppp38701/dask-worker-space/worker-x70nwvut' (failed in <built-in function lstat>): [Errno 2] No such file or directory: '/fs/site3/eccc/cmd/e/olh001/maestro/cmde_postproc_pseudo/hub/work/20210920000000/main/cmde_postproc/timeseries/detide/compute_surge/work/tmpdireccc-ppp38701/dask-worker-space/worker-x70nwvut'
d

guziy commented 2 years ago

Is it possible to disable this purging feature?

guziy commented 2 years ago

I've modified the initialize call; it seems to be working, but I am not sure about the implications:


        import os

        import dask
        from dask_mpi import initialize

        # Stop the WorkSpace from purging "stale" worker directories.
        dask.config.set({"distributed.worker.use-file-locking": False})
        initialize(
            nthreads=1,
            local_directory=os.environ.get("TMPDIR", None),
            # dashboard_address=None,
        )
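
An alternative that should achieve the same via Dask's documented environment-variable config mapping (it has to be set before dask is first imported, since config is read at import time):

    import os

    # Maps to the distributed.worker.use-file-locking config key.
    os.environ["DASK_DISTRIBUTED__WORKER__USE_FILE_LOCKING"] = "False"

    import dask  # reads DASK_* variables on first import
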
jrbourbeau commented 2 years ago

Just checking in, @guziy, it looks like you've got things working. Is it safe to close this issue?

guziy commented 2 years ago

Not sure if this fix is OK; I have just disabled file locking, so it does not go into the problematic code path... But if you think that no one will be able or interested to debug this, then yes, it is safe to close...