dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

`Client` shouldn't create `LocalCluster` #6792

Open jsignell opened 2 years ago

jsignell commented 2 years ago

This came out of the dev meeting today. The separation between Client and Cluster is currently opaque, and the hope is that making it clearer will help users understand what each object is for. The proposal here is that Client should not create a LocalCluster on the fly when no arguments are provided. Instead, we should recommend that people create the cluster explicitly.

current

from distributed import Client

client = Client()

proposed

from distributed import LocalCluster

client = LocalCluster().get_client()

The benefit of creating the cluster explicitly is that we can provide better help, and it makes it clearer to the user how they would go about using a different type of cluster.
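
For example, switching to a different cluster manager would then change only the construction line. A sketch, assuming dask-jobqueue is installed (cores and memory are its standard kwargs):

from dask_jobqueue import SLURMCluster

# Same pattern as the LocalCluster proposal above, different cluster manager
client = SLURMCluster(cores=8, memory="16 GiB").get_client()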

Related to #6791

ian-r-rose commented 2 years ago

I love this idea.

mrocklin commented 2 years ago

I'm not sure that I love this idea. Client() is magical, yes, but early on people seem to like magic.

I'm totally fine with LocalCluster().get_client(), but I would hesitate to remove Client()


jacobtomlinson commented 2 years ago

Some more thoughts...

I see a lot of confusion coming from **kwargs in Client being passed to LocalCluster. This is the magic that seems to harm understanding in the longer term.

I quite like how creating a bare client does this magically.

from dask.distributed import Client

client = Client()  # If DASK_SCHEDULER_ADDRESS is set then connect, otherwise create a LocalCluster

I just dislike how Client and LocalCluster are entwined in terms of kwargs.
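
For example, this pass-through is the entanglement in question:

from dask.distributed import Client

# n_workers and threads_per_worker are not Client parameters; they fall
# through **kwargs to the LocalCluster that Client implicitly creates
client = Client(n_workers=4, threads_per_worker=2)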

My personal preference is to be explicit but also concise. For example I like the following:

from dask.distributed import Client, LocalCluster

client = Client(LocalCluster(foo="bar"))

In fact I much prefer this to LocalCluster(foo="bar").get_client().

Alternatively, we could just stop passing **kwargs straight through and instead have a single kwarg that takes a dict. This is still concise but not confusing.

from dask.distributed import Client

client = Client(..., spawn_cluster_options={"foo": "bar"})

If we keep the magic it would be nice for this to be configurable. For example in a RAPIDS environment I would like to be able to set the default cluster type to LocalCUDACluster instead. But I think the poor kwarg separation makes that more challenging.
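
A rough sketch of what that configurability could look like, assuming a hypothetical distributed.client.default-cluster config key (not a real setting today):

import importlib

import dask.config


def default_cluster_class():
    # Hypothetical key; e.g. set to "dask_cuda.LocalCUDACluster" in a RAPIDS environment
    path = dask.config.get(
        "distributed.client.default-cluster", "distributed.LocalCluster"
    )
    module_name, _, class_name = path.rpartition(".")
    return getattr(importlib.import_module(module_name), class_name)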

gjoseph92 commented 1 year ago

Doesn't seem like we did anything here, but it's still an interesting discussion.

I have a feeling that users don't actually care very much about the Cluster object. What methods do you call on a Cluster? I can only think of scale and close.

Whereas the Client is the primary user-facing object. I'm not actually sure that the separation between Client and Cluster is something users need to know about more, because why do they need to know about Cluster? They need to know conceptually about clusters, and that they can run in many places, but the Cluster class doesn't feel very important to interact with once you've created it. Plus, you can always access it through client.cluster anyway.

So having to make a Cluster object, just to pass it into a Client and then not touch it again, feels boilerplate-y. Having Client() handle this for you is maybe part of the magic that @mrocklin likes.

Perhaps what we want here is actually a higher-level interface for creating both a Cluster and connecting a Client to it?

For example:

import distributed

client = distributed.launch()
client = distributed.launch(n_workers=4, memory_limit="2 GiB")

###

import dask_kubernetes

client = dask_kubernetes.operator.launch()
client = dask_kubernetes.operator.launch(custom_cluster_spec=make_cluster_spec(**config))

###

import coiled

client = coiled.launch()
client = coiled.launch(backend_options={...})

where launch is really just

def launch(**kwargs) -> distributed.Client:
    # ClusterClass = whatever cluster manager the package provides
    return ClusterClass(**kwargs).get_client()

A couple challenges:

  1. Naming. launch is probably not a good name? But I bet there's some name for this that's ok?

    Also, should it be like:

    import distributed
    client = distributed.launch()
    
    import dask_kubernetes
    client = dask_kubernetes.operator.launch()

    or:

    import distributed
    client = distributed.launch_local()
    
    import dask_kubernetes
    client = dask_kubernetes.operator.launch_k8s()

    (Should the name of the function always be the same, and you tell where it's launching by the import? Or should the name of the function reflect where the cluster is launching?)

  2. We'd need to make a slight change to the Client so that it can "own" the lifecycle of clusters besides LocalClusters. Right now, if you do Client(), client.close() will also call close on the LocalCluster that it created. But with Client(Cluster(...)), client.close() will not shut down the cluster.

    With this interface, users wouldn't have a reference to the Cluster object, so shutting down the Client should also shut down the cluster (see the sketch below). This is an easy change, and I think it would generally make things easier to understand anyway.
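
A minimal sketch of that lifecycle change, assuming a hypothetical owns_cluster flag on Client (not the current API):

class Client:
    def __init__(self, cluster=None, *, owns_cluster=False):
        self.cluster = cluster
        self._owns_cluster = owns_cluster  # launch() would pass True here

    def close(self):
        ...  # disconnect from the scheduler as usual
        if self._owns_cluster and self.cluster is not None:
            self.cluster.close()  # also tear down the cluster we own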

jacobtomlinson commented 1 year ago

Many cluster managers also have adapt and get_logs, which are useful too, and some have platform-specific methods like KubeCluster.add_worker_group, but in general I agree that the API of a cluster is small.

I'm not super excited about combining the Client and Cluster objects though; they serve very different purposes. The Client is a way of talking to the scheduler and submitting work to the workers. The Cluster is a way of interacting with the resource manager and overseeing the scheduler and worker processes.

If methods like scale, adapt, get_logs, etc. were moved to the Client, it wouldn't make much sense to me, given those are not interactions with the scheduler.

I'm a big fan of adding more syntactic sugar around things to make the user experience more pleasant though.

gjoseph92 commented 1 year ago

I'm not super excited about combining the Client and Cluster objects though, they serve very different purposes

Agreed—did the examples above make it seem like they were combined? They're certainly not intended to be combined. The launch API would be leaning into the fact that Client already contains a Cluster (so by just returning a Client, you actually get access to both objects), but they're still separate objects.

If methods like scale, adapt, get_logs, etc were moved to the Client it doesn't make much sense to me

Also agreed, and I hope they wouldn't be. You'd do client.cluster.scale, client.cluster.adapt, etc. to access those.
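
For example, with the hypothetical launch API from above:

client = distributed.launch(n_workers=2)
client.cluster.scale(10)                     # resource-manager operations stay
client.cluster.adapt(minimum=1, maximum=20)  # on the cluster, not the client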

jacobtomlinson commented 1 year ago

Yup yup sounds like we are on the same page. Thanks for confirming :).

jacobtomlinson commented 1 year ago

Some thoughts on the naming. I quite like launch(). It does what it says on the tin and is easy for folks to remember. Perhaps package namespacing can help make things more clear.

client = distributed.deploy.local.launch()
client = distributed.deploy.subprocess.launch()
client = distributed.deploy.ssh.launch()

# Although perhaps adding some nicer namespacing shims in the dask package makes more sense
client = dask.cluster.launch()  # To keep simple cases simple drop the `local` subpackage
client = dask.cluster.subprocess.launch()
client = dask.cluster.ssh.launch()

# Downstream projects could easily add a launch method to their existing subpackages

## dask-kubernetes
client = dask_kubernetes.operator.launch()
client = dask_kubernetes.helm.launch()
client = dask_kubernetes.classic.launch()

## dask-cloudprovider
client = dask_cloudprovider.aws.ec2.launch()
client = dask_cloudprovider.aws.ecs.launch()  # Although FargateCluster may need to be moved to a new submodule because it lives in here too
client = dask_cloudprovider.azure.azurevm.launch()
client = dask_cloudprovider.gcp.instances.launch()
client = dask_cloudprovider.hetzner.vserver.launch()
client = dask_cloudprovider.digitalocean.droplet.launch()

## dask-yarn
client = dask_yarn.launch()

## dask-jobqueue
client = dask_jobqueue.slurm.launch()
client = dask_jobqueue.pbs.launch()
client = dask_jobqueue.sge.launch()
client = dask_jobqueue.htcondor.launch()
client = dask_jobqueue.moab.launch()
client = dask_jobqueue.oar.launch()
client = dask_jobqueue.lsf.launch()

I'm not sure how easily dask-gateway could follow this because the Gateway object needs to be created first.

gjoseph92 commented 1 year ago

I think that looks pretty cool!

I'm not sure how easily dask-gateway could follow this because the Gateway object needs to be created first.

The Gateway object could have a launch method itself?

jacobtomlinson commented 1 year ago

Something like?

from dask_gateway import Gateway

gateway = Gateway("http://146.148.58.187")
client = gateway.launch()
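
where a hypothetical Gateway.launch would just be sugar over the existing new_cluster API:

class Gateway:
    ...

    def launch(self, **cluster_kwargs):
        # new_cluster() returns a GatewayCluster, which has get_client()
        return self.new_cluster(**cluster_kwargs).get_client()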

The other option could be to check whether the Gateway kwargs and GatewayCluster kwargs have any overlap. I don't think they do, so we could pull them apart and create all three objects.

import dask_gateway

client = dask_gateway.launch(**gateway_kwargs, **cluster_kwargs)

Where launch is:

def launch(**kwargs) -> distributed.Client:
    gateway_keys = [...]
    cluster_keys = [...]
    # Split the combined kwargs between the two constructors
    gateway_kwargs = {k: kwargs[k] for k in gateway_keys if k in kwargs}
    cluster_kwargs = {k: kwargs[k] for k in cluster_keys if k in kwargs}
    gateway = Gateway(**gateway_kwargs)
    return gateway.new_cluster(**cluster_kwargs).get_client()