dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Workers should offer a ProcessPoolExecutor by default #8002

Open crusaderky opened 1 year ago

crusaderky commented 1 year ago

Worker by default runs all tasks on a ThreadPoolExecutor. Additionally, it accepts any concurrent.futures.Executor, which can then be selected through annotations:

from concurrent.futures import ProcessPoolExecutor
from distributed import Worker
import dask
w = await Worker(executors={"process": ProcessPoolExecutor()})
with dask.annotate(executor="process"):
    ...  # define GIL-intensive tasks

This feature, as far as I can tell, is for advanced users only as you'll need to call your own Worker constructor (or define a Worker subclass).

There's a somewhat common pattern among users of all levels, particularly when graph definition is very heavy, of performing that definition inside a task:

def f():
    a = ... # dask collection definition. Takes a long time and hogs the GIL
    distributed.secede()
    return a.compute()

client.submit(f).result()

or

@dask.delayed
def f():
    a = ... # dask collection definition. Takes a long time and hogs the GIL
    distributed.secede()
    return a.compute()

f().compute()

This pattern can be very problematic today, as the GIL-intensive work will hamstring everything else running on the worker. It would be good to offer users a simple way to work around the GIL.

Proposed design

  1. Write a ProcessPoolExecutor subclass or variant which is compatible with secede() / rejoin(), start it automatically (with 0 processes) when the Worker starts, and register it under executor="process" (a rough sketch of how registering such an executor could look today follows this list).
  2. Add an executor= parameter to Client.submit.
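
For context, registering an extra executor is already possible today for advanced users, e.g. through a WorkerPlugin. A minimal sketch of what that registration could look like (the plugin name and pool size are illustrative, and it assumes entries may be added to worker.executors during plugin setup; whether the pool actually works under a Nanny is the concern raised further down the thread):

from concurrent.futures import ProcessPoolExecutor

from distributed import WorkerPlugin


class ProcessExecutorPlugin(WorkerPlugin):
    """Illustrative plugin that registers a process pool under executor="process"."""

    name = "process-executor"

    def setup(self, worker):
        # Assumption: adding an entry to worker.executors at setup time is safe.
        worker.executors["process"] = ProcessPoolExecutor(max_workers=2)

    def teardown(self, worker):
        worker.executors["process"].shutdown()


# Hypothetical usage from a client session:
# client.register_worker_plugin(ProcessExecutorPlugin())
# with dask.annotate(executor="process"):
#     ...  # define GIL-intensive tasks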

With those two pieces in place, a user with the above use case will only need to add a single line to their workflow:

def f():
    a = ... # dask collection definition. Takes a long time and hogs the GIL
    distributed.secede()
    return a.compute()

client.submit(f, executor="process").result()

or

@dask.annotate(executor="process")
@dask.delayed
def f():
    a = ... # dask collection definition. Takes a long time and hogs the GIL
    distributed.secede()
    return a.compute()

f().compute()

Note that the latter form will break if you accidentally invert the order of @dask.annotate and @dask.delayed. I'm not sure how to warn the user about that, as both orderings are meaningful in certain use cases.

fjetter commented 1 year ago

I'm -1 on implementing a new executor with secede/rejoin semantics. I believe secede/rejoin should rather be removed from the thread pool as well (see https://github.com/dask/distributed/pull/6607 for more context).

The larger difficulties with process pools are around serialization, and the fact that the worker itself must then not be started as a daemon (daemonic processes are not allowed to spawn children), which could pose possibly large problems for the process lifecycle. If I'm not mistaken, your example above will not work, at least not if there is a Nanny around.
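
To illustrate the daemon constraint with a standalone (non-dask) sketch: a process started as a daemon is not allowed to create the child processes a ProcessPoolExecutor needs.

from concurrent.futures import ProcessPoolExecutor
import multiprocessing as mp


def use_pool():
    # This runs inside a daemon process; spawning pool workers fails with
    # "daemonic processes are not allowed to have children".
    with ProcessPoolExecutor(max_workers=1) as pool:
        print(pool.submit(sum, [1, 2, 3]).result())


if __name__ == "__main__":
    p = mp.Process(target=use_pool, daemon=True)
    p.start()
    p.join()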

If this is just about not blocking event loops during graph serialization/materialization, a much easier first step is https://github.com/dask/distributed/issues/7980

crusaderky commented 1 year ago

#7980 is well and good, but it's about the scheduler side. This issue is about the worker side.

Also, the event loop blockage happens during graph definition and optimization, which can be substantially more CPU-intensive than serialization/materialization.

The larger difficulties with processpools are around serialization

Why? Of course we would need to use dask's own pickle -> cloudpickle serialization instead of the plain pickle that concurrent.futures uses.
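
For illustration (assuming cloudpickle is installed, as it is wherever distributed runs): plain pickle cannot serialize a locally defined function, while cloudpickle serializes it by value, and the result can be loaded with ordinary pickle on the other side.

import pickle

import cloudpickle


def make_task():
    x = 10

    def task(y):
        return x + y  # closes over x

    return task


task = make_task()

try:
    pickle.dumps(task)  # what concurrent.futures would do
except Exception as exc:
    print("plain pickle failed:", exc)

restored = pickle.loads(cloudpickle.dumps(task))
print(restored(5))  # 15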

we must not start the worker itself as a daemon which would pose possibly large problems for the lifecycle. If I'm not mistaken, your example above will not work, at least not if there is a Nanny around.

I was not aware of this. Why are workers daemonized to begin with?

fjetter commented 1 year ago

Why? Of course we would need to use dask's own pickle -> cloudpickle instead of concurrent.futures's just pickle.

I tried this recently and ran immediately into non-trivial serialization errors but I didn't investigate further. We have the technology to make this robust but I believe right now it isn't. I suspect we'd need to do the task deserialization on the executor instead of on the worker main/offload thread.
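
A rough sketch of that idea, outside of distributed's actual code path (run_serialized and submit_serialized are made-up helpers): ship the serialized payload to the pool and only deserialize it inside the executor process.

from concurrent.futures import ProcessPoolExecutor

import cloudpickle


def run_serialized(payload):
    # Runs inside the pool process: deserialize there, then execute.
    func, args, kwargs = cloudpickle.loads(payload)
    return func(*args, **kwargs)


def submit_serialized(pool, func, *args, **kwargs):
    # The submitting thread only handles opaque bytes.
    payload = cloudpickle.dumps((func, args, kwargs))
    return pool.submit(run_serialized, payload)


if __name__ == "__main__":
    with ProcessPoolExecutor() as pool:
        print(submit_serialized(pool, pow, 2, 10).result())  # 1024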

I was not aware of this. Why are workers daemonized to begin with?

Well, when using a Nanny, the Nanny is basically the primary interface that the deployment system deals with. If anybody wants to terminate the worker, they target the nanny process and not the worker process. If the Nanny process terminates, the Worker should properly be torn down as well and not block the shutdown. Launching the worker process as a daemon ensures it can never block the shutdown of the Nanny. It is possible that the Nanny is implemented well enough for this not to be an issue (e.g. it closes the worker process quickly enough when handling various signals), but I'm not sure whether that is the case.

My guess is that this is mostly caution but the entire process management in the nanny is a bit messy.

In case my comment above was not clear: I'm not against making it easier to launch a process pool. I just don't want secede/rejoin semantics, because I feel they drive up complexity a lot and are actually not necessary.