dask / dask-jobqueue

Deploy Dask on job schedulers like PBS, SLURM, and SGE
https://jobqueue.dask.org
BSD 3-Clause "New" or "Revised" License

`JobQueueCluster` with local worker(s) #596

Open fnattino opened 2 years ago

fnattino commented 2 years ago

Thanks a lot for this fantastic library - it is really awesome!

I'd love to hear your opinion on the following use case. I have access to a SLURM cluster where I am not allowed (or at least discouraged) to run tasks such as a Jupyter server or a Dask scheduler on the login node, and the minimum partition size that I can request via a batch job is 32 cores.

I can start the Jupyter server and instantiate a SLURMCluster in a batch job, then scale up the cluster by adding resources via SLURM. However, the initial batch job where Jupyter and the Dask scheduler are running still occupies 32 cores - which is a bit wasteful. Right now I could "manually" create an additional local worker and connect it to the scheduler to fill the remaining allocated resources. But maybe it could be useful to have the option to add a local worker when instantiating a JobQueueCluster?
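
For concreteness, the manual workaround I have in mind looks roughly like the sketch below (the resource numbers and the extra worker's thread count are only illustrative, not my actual configuration):

import subprocess

from dask.distributed import Client
from dask_jobqueue import SLURMCluster

# The scheduler runs inside the 32-core batch job that also hosts Jupyter.
cluster = SLURMCluster(cores=32, memory="64GB")
cluster.scale(jobs=2)  # extra workers are submitted as regular SLURM jobs

# "Manually" start an extra worker on this node so the cores already
# allocated to the batch job (but unused by the scheduler) are not wasted.
local_worker = subprocess.Popen(
    ["dask-worker", cluster.scheduler_address, "--nthreads", "30"]
)

client = Client(cluster)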

jacobtomlinson commented 2 years ago

This is an interesting question.

I'm not sure that having the FooCluster object also spawn a local worker is the right approach here. It feels messy and like a bad separation of concerns.

However, I agree that manually creating a local worker right now isn't the best user experience. I had a quick go and it came out something like this:

import asyncio

# Using LocalCluster for the sake of the demo, but this could be any FooCluster
from dask.distributed import LocalCluster, Nanny, Client
from distributed.comm import get_address_host_port

async def main():
    async with LocalCluster() as cluster:
        async with Nanny(*get_address_host_port(cluster.scheduler_address), silence_logs=True) as nanny:
            async with Client(cluster) as client:
                print(cluster)

if __name__ == "__main__":
    asyncio.run(main())

It would be much nicer if there were some non-asyncio way of starting a worker by simply passing it the cluster object, the same way we can create a client from a cluster today.

Something like this:

from dask.distributed import LocalCluster, Nanny, Client

cluster = LocalCluster()
local_worker = Nanny(cluster)
client = Client(cluster)

guillaumeeb commented 2 years ago

Hi @fnattino, thanks for raising this. This is a problem we have already identified, but there is no good solution yet. There is some discussion about it in other issues (here, for example), and a solution (yet to be implemented) is proposed in https://github.com/dask/dask-jobqueue/issues/419.

Basically, the idea I had is to be able to create a LocalCluster and reuse its Scheduler object to build a JobQueueCluster on top of it.
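
Just to sketch what I mean (this is purely hypothetical, the scheduler= keyword below does not exist in dask-jobqueue today):

from dask.distributed import LocalCluster
from dask_jobqueue import SLURMCluster

# Scheduler plus a local worker running inside the batch job.
local = LocalCluster(n_workers=1, threads_per_worker=30)

# Hypothetical API: reuse the LocalCluster's scheduler and grow the cluster
# with SLURM-submitted workers. `scheduler=` is not a real keyword argument.
cluster = SLURMCluster(cores=32, memory="64GB", scheduler=local.scheduler)
cluster.scale(jobs=2)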

But I also like @jacobtomlinson's proposal!

@fnattino, if you'd like to try implementing one of these solutions, it would be very much appreciated :).

fnattino commented 2 years ago

@jacobtomlinson @guillaumeeb thank you both for your quick replies and for pointing out other relevant issues! I will try to experiment a bit with your suggestions and, should I get to something useful, I will keep you posted. Thanks again!