dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Locks Not Released on MemoryError #7371

Open jamesstidard opened 1 year ago

jamesstidard commented 1 year ago

Describe the issue: Hi, I'm currently using dask as a backend for a web service and trying to improve its stability. I'm using dask.distributed.Lock to coordinate access to certain files by the workers.

I've noticed that when a worker handles a task that results in a MemoryError and the worker is restarted, the dask.distributed.Locks the worker had acquired are never released. This results in other workers gradually hitting the same dask.distributed.Lock and being unable to acquire it, even though no worker is actually using it.

Because of the long-running nature of some of these tasks, workers can end up frozen trying to acquire a lock, unable to progress or do other work. A timeout is another option, but given how long these tasks can take, an appropriate timeout has essentially the same impact as hanging forever (especially as that timeout has to be waited out each time a worker re-encounters the same lock).
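For reference, this is roughly what the timeout approach would look like (just a sketch, not what I want; the timeout value is arbitrary and I'm assuming Lock.acquire returns False once the timeout expires):

from dask.distributed import Lock, get_client

def with_timeout():
    client = get_client()
    lock = Lock(name="my-lock", client=client)
    # Arbitrary timeout for illustration; every worker still blocks this long
    # each time it runs into the stale lock.
    if lock.acquire(timeout=600):
        try:
            ...  # work on the protected file
        finally:
            lock.release()
    else:
        ...  # lock appears held forever (e.g. by a dead worker); skip or retry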

Having the ability, built in or not, to release these locks when a worker is killed would be really useful for improving the stability of the service.

Additionally, other context managers never get a chance to clean up, as you would expect in native Python code. So if a MemoryError occurs while not only the dask.distributed.Lock is held but also an open() context manager is active, the __exit__ for these is never called, as it would be if an unhandled exception occurred in non-dask-distributed code. This is why my example uses multiple opens.

Thanks.

Minimal Complete Verifiable Example: This is the code I used to reproduce the issue, tweaking the GIGABYTES_TARGET variable to induce a memory error.

$ dask scheduler
$ dask worker 0.0.0.0:8786 --memory-limit '1GB'
$ python main.py
# main.py
import time

import numpy as np

from dask.delayed import delayed
from dask.distributed import Client, Lock, get_client

GIGABYTES_TARGET = 1

@delayed(pure=False)
def memory_hog():
    client = get_client()

    with open("log.txt", "w+") as fp:
        fp.write("entered function\n")

    with Lock(name="my-lock", client=client):
        with open("log.txt", "a") as fp:
            fp.write("entered lock\n")

        # Allocate roughly GIGABYTES_TARGET GB to push the worker over its
        # memory limit and trigger the MemoryError / worker restart.
        dtype = np.dtype("float16")
        n_required = int((GIGABYTES_TARGET * 1e9) / dtype.itemsize)
        _ = np.ones((n_required,), dtype=dtype)

        with open("log.txt", "a") as fp:
            fp.write("entered sleep\n")

        time.sleep(2)

    with open("log.txt", "a") as fp:
        fp.write("exit function\n")

def main():
    client = Client("0.0.0.0:8786")
    client.compute(memory_hog(), sync=True)
    print("done")

if __name__ == "__main__":
    main()

Environment:

fjetter commented 1 year ago

Thanks for reporting this. Lock is currently indeed not resilient to any kind of failure. This is a known issue.

However, there is also Semaphore, which can be used as a drop-in replacement if you set max_leases=1. The semaphore implementation is failure resilient and will release the lock again if a worker dies.

I.e. you can basically set Lock = Semaphore in your code and it will continue to work as expected. The semaphore's max_leases defaults to 1, i.e. it defaults to being simply a lock :tada:

The only reason this implementation was never ported to Lock is that there are some subtle differences in internal APIs and we haven't gotten around to cleaning this up yet.
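Roughly, applied to the reproducer above, the swap looks like this (a sketch, not a tested snippet; it assumes the Semaphore constructor takes name and max_leases keyword arguments and needs no explicit client):

import time

import numpy as np

from dask.delayed import delayed
from dask.distributed import Semaphore

GIGABYTES_TARGET = 1

@delayed(pure=False)
def memory_hog():
    # max_leases=1 makes the semaphore behave like a lock, but the lease is
    # released again by the scheduler if the holding worker dies.
    with Semaphore(max_leases=1, name="my-lock"):
        # Same memory pressure as the original reproducer.
        dtype = np.dtype("float16")
        n_required = int((GIGABYTES_TARGET * 1e9) / dtype.itemsize)
        _ = np.ones((n_required,), dtype=dtype)
        time.sleep(2)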

jamesstidard commented 1 year ago

@fjetter awesome, thank you very much for the quick response and workaround.

I'll leave it up to you if you want to close this ticket or keep it open to track this issue.

Thanks again.

fjetter commented 1 year ago

I'll keep it open. I still want this to be done for the Lock eventually.