dask / distributed


Failure in a task submitted by another task can leave an "erred" task permanently on the cluster #8789

Open · bcalvert-graft opened 2 months ago

bcalvert-graft commented 2 months ago

The issue

Note, as mentioned in #8790, I think this issue and that one are very similar, with the main difference being that #8790 is easier to reproduce and simpler to reason through, since it only involves a local Client. I'm fairly certain the discussion in #8790 will subsume this one, but not 100% certain, so I created that additional issue rather than editing this one. If I should have just pared this issue down to the simpler example, please share that feedback and I'll do that going forward.

Hello Dask Maintainers,

Thank you in advance for taking the time to read this bug report!

At my company, we are heavy users of Dask's ability to handle dynamic task submission from workers (i.e. the pattern discussed here). Some of the tasks we dynamically submit will transiently fail (for a concrete example, a model training sub-task that was spawned dynamically might fail because it can't connect to an MLFlow server to store training metrics). In a very minimal sense, you can think of it like this:

import random
from dask.distributed import Client, worker_client

def sometimes_fails():
    random_num = random.random()
    if random_num < 0.5:
        raise ValueError("Transiently failed!")

def submits_sometimes_failing_subtask(key_for_subtask: str):
    # worker_client() hands the running task a Client connected to the cluster,
    # which is how we dynamically submit sub-tasks from within a task
    with worker_client() as client:
        fut = client.submit(sometimes_fails, key=key_for_subtask)
        return fut.result()

client = Client()
# Try this line a few times until you see a failure
outer_future = client.submit(submits_sometimes_failing_subtask, "some_fixed_key")

If the sometimes_fails Future submitted inside submits_sometimes_failing_subtask fails, you'll end up with a task with key "some_fixed_key" "stuck" on the cluster, where "stuck" means that you won't have a direct Future handle to it and it won't fall out of scope trivially.

Reproducing the bug

For a deterministic MCVE (since the above example uses random.random), we need a slight tweak/extension of the above code:

from dask.distributed import Client, worker_client

def always_fails():
    raise ValueError("Always fails")

def submits_always_failing_subtask(key_for_subtask: str):
    with worker_client() as client:
        fut = client.submit(always_fails, key=key_for_subtask)
        return fut.result()

client = Client()
outer_future = client.submit(submits_always_failing_subtask, "some_fixed_key")
outer_future.exception()  # block until the outer task has finished erring, so the asserts below are deterministic
# We still hold the outer_future object, so its key is on the cluster, along with "some_fixed_key"
assert set(client.cluster.scheduler.tasks) == {"some_fixed_key", outer_future.key}
outer_future.cancel()
# After we cancel it, the outer task's key naturally goes away, but the key for the
# dynamically submitted sub-task is "stuck", so it doesn't
assert set(client.cluster.scheduler.tasks) == {"some_fixed_key"}

If you dig in, you'll see that the "some_fixed_key" task is "stuck" because it's wanted by the worker Client associated with the worker that submitted it:

>>> {client_state.client_key for client_state in client.cluster.scheduler.tasks["some_fixed_key"].who_wants}
{"Client-worker-<rest-of-id>"}

Impact

At a bare minimum, IIUC, this is a slight memory leak on both the scheduler and the worker that submitted the task, since both of them have to keep tracking metadata for the poisoned task. Relatedly, I haven't done a deep dive to see whether any additional state created during the execution of the sub-task sticks around, but if so, that would be additional memory leakage.

Beyond that, for our usage of Dask, we often leverage fixed keys for certain tasks to save expensive compute (e.g. for model inference tasks with large models, we use fixed keys to deduplicate work when multiple queries arrive at similar times with the same inputs). The issue then is that if one of these fixed-key tasks is submitted dynamically from another task and fails, the erred-state Future stuck on the cluster with that key blocks any further attempt to rerun the task. Put in the context of the MCVE above, assume we use the submits_sometimes_failing_subtask variant, so the sub-task doesn't always fail; the moment the inner sometimes_fails sub-task fails once, every subsequent submission of submits_sometimes_failing_subtask will fail as well (naturally, since an erred Future with key "some_fixed_key" is already on the cluster).
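For context on that fixed-key pattern, here is a minimal sketch of the deduplication idea, reusing the client from the MCVE above (the function and key names are hypothetical, not from our codebase): submissions that share a key refer to the same task on the scheduler, so the expensive work only runs once.

import time

def expensive_model_inference(x):
    # hypothetical stand-in for a costly model inference call
    time.sleep(10)
    return x * 2

# Both submissions use the same fixed key, so the scheduler treats them as the
# same task and the expensive computation only runs once
fut_a = client.submit(expensive_model_inference, 21, key="inference-input-21")
fut_b = client.submit(expensive_model_inference, 21, key="inference-input-21")
assert fut_a.key == fut_b.key
assert fut_a.result() == fut_b.result() == 42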

Resolving the issue

I'm not sure whether the behavior I'm describing is expected. Assuming it's not, I've been looking into fixes for it. One fairly straightforward option is to tweak the logic of the task that dynamically submits the sub-task by wrapping the result retrieval in a try/except:

def submits_always_failing_subtask(key_for_subtask: str):
    with worker_client() as client:
        fut = client.submit(always_fails, key=key_for_subtask)
        try:
            return fut.result()
        except BaseException:
            # Cancel the sub-task Future so its key doesn't linger on the cluster
            fut.cancel()
            raise

This seems to work (applying it to my MCVE above, I don't see "some_fixed_key" stuck on the cluster), but I was wondering if there was some better way to do this.
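If that approach turns out to be the recommended one, it could be factored out so that every dynamically submitting task doesn't need its own try/except. A minimal sketch, assuming the same imports and definitions as the MCVE above (the cancel_on_error helper is made up for illustration, not an existing distributed API):

from contextlib import contextmanager

@contextmanager
def cancel_on_error(future):
    # Hypothetical helper: if retrieving the result raises, cancel the
    # dynamically submitted Future so its key doesn't stay "stuck" on the cluster
    try:
        yield future
    except BaseException:
        future.cancel()
        raise

def submits_always_failing_subtask(key_for_subtask: str):
    with worker_client() as client:
        fut = client.submit(always_fails, key=key_for_subtask)
        with cancel_on_error(fut):
            return fut.result()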

Environment

bcalvert-graft commented 2 months ago

I mentioned this above in the issue description, but while expanding some tests on our side, I think I found a simpler variation with what might be the same root cause. I opened a new issue focused on the simpler variation in #8790.