dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Functional change for get_worker #7696

Open quasiben opened 1 year ago

quasiben commented 1 year ago

In https://github.com/dask/distributed/pull/7580/ we removed the ability for get_worker to return the worker from functions that are not running as tasks. Concretely, the following is no longer supported:

from dask.distributed import Client, get_worker

def main():
    c = Client()
    def foo():
        # foo() is invoked through client.run, so it does not execute as a
        # task and get_worker() no longer finds the worker here
        worker = get_worker()
        print(worker)

    c.run(foo)

if __name__ == "__main__":
    main()

client.run runs functions on each worker, but because there is no task, get_worker now fails (raising ValueError("No worker found") rather than returning the worker). This is functionality RAPIDS folks have been relying on, for example:

https://github.com/rapidsai/raft/blob/05d899b36b76545d2439dbe47e4659d644ced227/python/raft-dask/raft_dask/common/comms.py#L410-L414

We are fixing this by relying on the dask_worker arg for client.run functions:

If your function takes an input argument named dask_worker then that variable will be populated with the worker itself.
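For reference, here is a minimal sketch of the earlier example adapted to that injection mechanism (assuming the client.run behavior quoted above):

from dask.distributed import Client

def main():
    c = Client()

    def foo(dask_worker):
        # client.run sees the dask_worker argument and injects the Worker
        # instance, so no call to get_worker() is needed
        print(dask_worker)

    c.run(foo)

if __name__ == "__main__":
    main()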

We have a fix for this in Dask-CUDA and are working on one for RAFT. I think we should include a statement like the following to warn legacy users about the change to get_worker:

If you need to get the worker in a non-task function, such as one run through client.run, please accept a dask_worker argument in that function instead (xref: client.run)

quasiben commented 1 year ago

Another alternative is to add the functionality back behind an explicit flag:

diff --git a/distributed/worker.py b/distributed/worker.py
index a4d4e053..1427bd8f 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -2684,7 +2684,7 @@ class Worker(BaseWorker, ServerNode):
         return self.transfer_outgoing_count_limit

-def get_worker() -> Worker:
+def get_worker(ignore_tasks=False) -> Worker:
     """Get the worker currently running this task

     Examples
@@ -2705,6 +2705,10 @@ def get_worker() -> Worker:
     try:
         return thread_state.execution_state["worker"]
     except AttributeError:
+        if ignore_tasks:
+            from tlz import first
+            return first(w for w in Worker._instances if w.status in
+                    WORKER_ANY_RUNNING)
         raise ValueError("No worker found") from None
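For illustration, if this proposed ignore_tasks flag were added (it exists only in the diff above, not as a released parameter), the original example could stay almost unchanged:

from dask.distributed import Client, get_worker

def foo():
    # ignore_tasks=True is the hypothetical flag sketched in the diff above;
    # it would fall back to any running Worker instance when no task is executing
    worker = get_worker(ignore_tasks=True)
    print(worker)

if __name__ == "__main__":
    c = Client()
    c.run(foo)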

I think either approach should work. @fjetter, if you have time, can you weigh in?

fjetter commented 1 year ago

If you are using client.run and need the worker, the best way to get it is to add an argument named dask_worker to your function; we inject the worker for you, e.g.

# replace this
def foo():
    get_worker().address

# with this
def foo(dask_worker):
    dask_worker.address

fjetter commented 1 year ago

Looks like this is the approach you are taking in https://github.com/rapidsai/raft/pull/1365. This would be my preferred way of doing it. I'm very open to polishing this injection behavior.

If this is not good enough for you, we can look into making Worker.run context aware. I believe the only reason this isn't already the case is scope creep: execution_state is actually only set in a worker thread, but that doesn't necessarily need to be this way. We could also move this to ContextVars, I believe. That'd be a bit of work, and I figured the injection mechanism would be sufficient.
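For illustration only, a rough sketch of what a ContextVar-based lookup might look like (hypothetical, not the actual implementation in distributed):

from contextvars import ContextVar

# Hypothetical: the worker would set this before running any user code,
# whether in a task thread, a client.run function, or on the event loop
_current_worker: ContextVar = ContextVar("current_worker")

def set_current_worker(worker):
    # Called by the worker itself; returns a token that can be used to reset
    return _current_worker.set(worker)

def get_worker():
    try:
        return _current_worker.get()
    except LookupError:
        raise ValueError("No worker found") from None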

pentschev commented 1 year ago

I'm ok with that change, but I think this has caused some confusion, and I'm confused myself. For example, I can't run get_worker() from an async function that runs via client.submit, but I can if the function is non-async. Meanwhile, client.run can pass dask_worker to both async and non-async functions, which is how I had to work around it so far: https://github.com/rapidsai/dask-cuda/pull/1141/commits/884a59563c0daee3a13e8c8f1520b7928366c083

Is the above expected? Can we use get_worker() on an async function that is executed via client.submit somehow?

fjetter commented 1 year ago

For example, I can't run get_worker() from an async function that runs via client.submit,

The behavior of async functions is a bit odd nowadays. The intention is to run those in a dedicated thread as well, which would fix this problem for you; see https://github.com/dask/distributed/pull/7339. Nothing is blocking that PR other than me finding time. If this is important for you, we can push it over the finish line.

fjetter commented 1 year ago

I rebased https://github.com/dask/distributed/pull/7339 and if CI is green-ish we can merge this to fix your async submit problem.

fjetter commented 1 year ago

I looked at https://github.com/rapidsai/dask-cuda/commit/884a59563c0daee3a13e8c8f1520b7928366c083 and would argue this "workaround" is how it is supposed to be. client.submit is intended to run user functions (on a thread), while client.run is intended to run a function/coroutine on the worker event loop, in the main thread, outside of our task scheduling. If you are actually inspecting the worker in your test, that's how you should do it.
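A small sketch of that distinction (assuming a running cluster; the function names are made up for the example):

from dask.distributed import Client, get_worker

def task_fn(x):
    # Runs as a task in a worker thread, so get_worker() works here
    return (get_worker().address, x + 1)

def inspect_fn(dask_worker):
    # Runs via client.run on the worker event loop, outside task scheduling;
    # the injected dask_worker is the way to reach the Worker instance
    return dask_worker.address

if __name__ == "__main__":
    client = Client()
    print(client.submit(task_fn, 1).result())  # task path
    print(client.run(inspect_fn))              # per-worker inspection path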

pentschev commented 1 year ago

Thanks for the details @fjetter, that makes sense to me. It seems to me that #7339 is also a good improvement.

pentschev commented 1 year ago

Just one small request for #7339: it would be great if it could be left to merge after the upcoming release this week. We're getting into code freeze and we've managed to fix the issues raised by https://github.com/dask/distributed/pull/7580, so I'd prefer to play it safe and avoid any potential breakage in the very short term.

fjetter commented 1 year ago

it would be great if it could be left to merge after the upcoming release this week.

Not a problem. There are still some test failures anyhow, and this is not top priority, so this aligns well.