dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

`get_worker` can return the wrong worker with an async local cluster #4959

Open · gjoseph92 opened this issue 3 years ago

gjoseph92 commented 3 years ago

Split out from https://github.com/dask/distributed/pull/4937#issuecomment-866234111.

What happened:

When using a local cluster in async mode, get_worker always returns the same Worker instance, no matter which worker it's being called within.

What you expected to happen:

get_worker to return the Worker instance it's actually being called from.

Minimal Complete Verifiable Example:

from distributed import get_worker
from distributed.utils_test import gen_cluster

@gen_cluster(client=True)
async def test_get_worker_async_local(c, s, *workers):
    assert len(workers) > 1

    def whoami():
        return get_worker().address

    results = await c.run(whoami)
    assert len(set(results.values())) == len(workers), results
E       AssertionError: {'tcp://127.0.0.1:59774': 'tcp://127.0.0.1:59776', 'tcp://127.0.0.1:59776': 'tcp://127.0.0.1:59776'}
E       assert 1 == 2
E         +1
E         -2

Anything else we need to know?:

This probably almost never affects users directly. But since most tests use an async local cluster via @gen_cluster, I'm concerned about what edge-case behavior we might be testing incorrectly.

Also, note that the same issue affects get_client. This feels a tiny bit less bad (at least it's always the right client, unlike get_worker), but it can still have some strange effects. In multiple places, worker code updates the default Client instance on the assumption that it's running in its own process. With multiple workers trampling the default client, I wonder if this affects tests around advanced secede/client-within-task workloads.

I feel like the proper solution here would be to set a contextvar for the current worker that's updated as we context-switch in and out of that worker. Identifying the points where those switches have to happen seems tricky though.
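A minimal sketch of that idea (all names below are hypothetical, not existing distributed APIs):

import contextvars
from contextlib import contextmanager

# Hypothetical context variable holding the worker that is "current" for the
# running task or coroutine.
_current_worker = contextvars.ContextVar("current_worker", default=None)

@contextmanager
def worker_context(worker):
    """Mark ``worker`` as current for the duration of the block."""
    token = _current_worker.set(worker)
    try:
        yield worker
    finally:
        _current_worker.reset(token)

def get_worker():
    worker = _current_worker.get()
    if worker is None:
        raise ValueError("No worker is active in the current context")
    return worker

The hard part, as noted above, is finding every entry point into a given worker (task execution, handlers, deserialization) and wrapping it in something like worker_context.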

I also think it would be reasonable for get_worker to error if len(Worker._instances) > 1.
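Roughly like this (a sketch of such a guard, not actual distributed code; it would sit just before the fallback that scans Worker._instances):

from distributed.worker import Worker

def _check_unambiguous_worker():
    # Hypothetical guard get_worker() could apply when it is not called from
    # within a task: refuse to guess if several workers share this process.
    instances = list(Worker._instances)
    if len(instances) > 1:
        raise ValueError(
            f"get_worker() is ambiguous: {len(instances)} Worker instances "
            "exist in this process (async local cluster?)"
        )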

Environment:

jrbourbeau commented 3 years ago

Thanks for the detailed description and example test @gjoseph92!

As mentioned offline, when get_worker is called from inside a task, thread_state.execution_state["worker"] should point to the worker that is running that task:

https://github.com/dask/distributed/blob/9f4165a4278bc67f628089b72001ae5d97f5146b/distributed/worker.py#L3561-L3567
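For reference, the lookup at those lines falls back to scanning Worker._instances when the call doesn't come from a task thread. Paraphrased below (not copied verbatim; thread_state, Worker, and Status are module-level names in distributed/worker.py), with comments on why the async local cluster trips it:

def get_worker():
    try:
        # Set by the worker's executor threads while a task is running.
        return thread_state.execution_state["worker"]
    except AttributeError:
        # Outside a task (e.g. functions invoked via Client.run, or __setstate__
        # during deserialization) there is no per-thread state, so fall back to
        # any running Worker in this process. Under @gen_cluster all workers
        # live in one process on one event loop, so this returns the same
        # instance no matter which worker the call was meant for.
        for w in Worker._instances:
            if w.status == Status.running:
                return w
        raise ValueError("No workers found")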

In the case that get_worker is called outside of a task, what should the "correct" worker be?

gjoseph92 commented 3 years ago

Just adding a note here since I didn't see it written down: setting thread_state.execution_state["worker"] while deserializing (and serializing) on a worker would probably alleviate most of the problems we see with this issue. It typically seems to come up with stateful things that interact with worker machinery like Actors, ShuffleService, etc. that define a custom __setstate__ which tries to store the current get_worker() in an instance variable when unpickled.
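For concreteness, the pattern that goes wrong looks roughly like this (hypothetical class, not code from distributed):

from distributed import get_worker

class WorkerBound:
    # Hypothetical stateful object that rebinds to the local worker on unpickle.
    def __init__(self, data):
        self.data = data
        self.worker = None

    def __getstate__(self):
        # Don't ship the worker reference over the wire.
        return {"data": self.data}

    def __setstate__(self, state):
        self.data = state["data"]
        # Runs while the receiving worker deserializes, outside any task, so on
        # an async local cluster this can grab a different Worker instance than
        # the one actually doing the deserializing.
        self.worker = get_worker()

Setting thread_state.execution_state["worker"] around (de)serialization would make that get_worker() call resolve to the worker doing the unpickling.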