dask / dask-cloudprovider

Cloud provider cluster managers for Dask. Supports AWS, Google Cloud, Azure and more...
https://cloudprovider.dask.org
BSD 3-Clause "New" or "Revised" License

Support multiple workers per VM #173

Open eric-czech opened 3 years ago

eric-czech commented 3 years ago

It may be more efficient to run many workers on a few larger VMs than to spread a larger number of worker processes across small VMs, presumably because this avoids inter-VM communication. This suggestion came out of this thread on optimizing some dask array workflows: https://github.com/pystatgen/sgkit/issues/390.

@quasiben mentioned that threads per worker can be controlled with worker_options={"nthreads": 2 }, but there appears to be no way to run more than one worker on a single VM.

quasiben commented 3 years ago

I think for this to work we need to change:

https://github.com/dask/dask-cloudprovider/blob/d5dfd99e0d29b1c6b03cb291633484fdd367ed4d/dask_cloudprovider/gcp/instances.py#L318-L330

To handle a spec which looks like the following:

workers_options = {
    'worker-1': {"cls": "dask.distributed.Nanny", "opts": {"nthreads": 1}},
    'worker-2': {"cls": "dask.distributed.Nanny", "opts": {"nthreads": 2}},
}

jacobtomlinson commented 3 years ago

When running on a local machine, LocalCluster optimises this for you, right? On my laptop with 12 cores it runs three workers with four threads each.

Perhaps we can reuse this logic?

quasiben commented 3 years ago

You mean, have dask-cloudprovider inspect the system and assign workers/threads accordingly?

jacobtomlinson commented 3 years ago

Yeah. We do this for LocalCluster here.

jacobtomlinson commented 3 years ago

dask/distributed#4377 allows us to detect and create worker/threads automatically.

dask-worker --nprocs=auto

Once a release is out we can update things here to use this new option.

eric-valente commented 3 years ago

any progress here?

jacobtomlinson commented 3 years ago

@eric-valente with the latest release of dask and distributed you should be able to set worker_options={"nprocs": "auto"}. Although I haven't managed to come back here and test this yet.

If you could give it a go and report back it would be much appreciated!

eric-valente commented 3 years ago

Hi @jacobtomlinson, I tried it with the packages below and had no luck. The workers start in EC2 but don't connect to the scheduler, and then die. When I remove nprocs it works fine, but again there is only 1 worker per VM with threads = # of cores.

worker_options={'nprocs':'auto'}

dask                2021.4.1  pyhd8ed1ab_0    conda-forge
dask-cloudprovider  2021.3.1  pyhd8ed1ab_0    conda-forge
dask-core           2021.4.1  pyhd8ed1ab_0    conda-forge
distributed         2021.4.1  py38h578d9bd_0  conda-forge

jacobtomlinson commented 3 years ago

Strange! Are you able to get any logs from the EC2 instances? They tend to get dumped in /var/log/cloud-init-output.log.

eric-valente commented 3 years ago

@jacobtomlinson

This error then worker dies: image

Here are my settings: image

Note, this works if I simply remove nprocs from the worker_options. Thanks for your help!

jacobtomlinson commented 3 years ago

Thanks @eric-valente. You also need to set the worker_class kwarg to "dask.distributed.Nanny".

The traceback you got above looks like a bug, I'll raise that back in distributed.

@quasiben what do you think about making dask.distributed.Nanny the default instead of dask.distributed.Worker?

eric-valente commented 3 years ago

Thanks again for your help here @jacobtomlinson

Tried setting the worker class to Nanny but still same issue:

image

image

eric-valente commented 3 years ago

https://github.com/dask/distributed/issues/4640 seems maybe related

eric-valente commented 3 years ago

Seems dask.distributed.Nanny does not accept nprocs either https://distributed.dask.org/en/latest/_modules/distributed/nanny.html#Nanny

Fails with above init error: --spec '{"cls": "dask.distributed.Nanny", "opts": {"nprocs": 4, "name": "dask-4ffc56fd-worker-dd6509e3"}}'

Works: --spec '{"cls": "dask.distributed.Nanny", "opts": {"ncores": 4, "name": "dask-4ffc56fd-worker-dd6509e3"}}'

eric-valente commented 3 years ago

Seems like the cloud-init to add a worker uses this style of starting a worker: python -m distributed.cli.dask_spec tcp://x.x.x.x:8786 --spec {"cls": "distributed.nanny.Nanny", "opts": {"nprocs": 4, "name": "dask-4ffc56fd-worker-dd6509e3"}}
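For illustration, a command of that shape could be assembled roughly as follows. This is a sketch, not the dask-cloudprovider source: the helper name build_spec_command is hypothetical, and only the `python -m distributed.cli.dask_spec ... --spec` form is taken from this thread. It shows that whatever is placed in worker_options ends up verbatim in "opts".

```python
import json
import shlex


def build_spec_command(scheduler_address, worker_class, worker_options, name):
    """Hypothetical helper: assemble a dask_spec command line for one worker."""
    spec = {
        "cls": worker_class,
        # Every user-supplied option is copied into "opts" unchanged.
        "opts": {**worker_options, "name": name},
    }
    # shlex.quote keeps the JSON intact when the command is run by a shell.
    return " ".join(
        [
            "python -m distributed.cli.dask_spec",
            scheduler_address,
            "--spec",
            shlex.quote(json.dumps(spec)),
        ]
    )


command = build_spec_command(
    "tcp://x.x.x.x:8786",
    "distributed.nanny.Nanny",
    {"nthreads": 4},
    "dask-4ffc56fd-worker-dd6509e3",
)
print(command)
```

Because the options are passed through as constructor keyword arguments, a CLI-only flag such as --nprocs has no equivalent key here.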

vs. this style: dask-worker --nprocs=auto

quasiben commented 3 years ago

+1 to making dask.distributed.Nanny the default.

eric-valente commented 3 years ago

@jacobtomlinson Yeah, it seems like EC2Cluster uses python -m distributed.cli.dask_spec and passes in worker_class

image

As suggested above, I think you might need to accept multiple workers defined in worker_options and skip nprocs?

workers_options = {
    'worker-1': {"cls": "dask.distributed.Nanny", "opts": {"nthreads": 1}},
    'worker-2': {"cls": "dask.distributed.Nanny", "opts": {"nthreads": 2}},
}
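
To make the proposal concrete: a spec of this shape is a mapping from worker name to a class path plus options, so one VM would start one worker process per entry. The sketch below walks such a mapping with a stand-in class. StandInNanny, REGISTRY and run_toy_spec are hypothetical names for illustration, and this is not how distributed resolves "cls".

```python
import json


class StandInNanny:
    """Stand-in for a worker process; records its name and thread count."""

    def __init__(self, name, nthreads=1):
        self.name = name
        self.nthreads = nthreads


# Hypothetical registry mapping class paths to stand-in classes.
REGISTRY = {"dask.distributed.Nanny": StandInNanny}


def run_toy_spec(spec_json):
    """Create one stand-in worker per entry of a JSON-encoded spec."""
    spec = json.loads(spec_json)
    return [
        REGISTRY[entry["cls"]](name=name, **entry.get("opts", {}))
        for name, entry in spec.items()
    ]


workers_options = {
    "worker-1": {"cls": "dask.distributed.Nanny", "opts": {"nthreads": 1}},
    "worker-2": {"cls": "dask.distributed.Nanny", "opts": {"nthreads": 2}},
}
workers = run_toy_spec(json.dumps(workers_options))
total_threads = sum(w.nthreads for w in workers)
print(len(workers), total_threads)
```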

I think I could use worker_module but it is not a valid parameter for EC2Cluster: image This would allow me to use dask-worker with --nprocs=x

jacobtomlinson commented 3 years ago

I think I could use worker_module but it is not a valid parameter for EC2Cluster

It should be, have you tried it?

shireenrao commented 2 years ago

Hello, is there an update on this issue? I am trying to do something similar. Currently I am creating my cluster manually: I start the scheduler on one EC2 instance and then create the workers on another EC2 instance like this:

dask-worker tcp://XX.X.X.XXX:8786 --nprocs n --nthreads 1

Where n is the number of CPUs on that EC2 instance.
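
The manual invocation above can be scripted. A minimal sketch, assuming the --nprocs and --nthreads flags exactly as used in this thread (other distributed releases may spell the process-count flag differently) and a hypothetical helper name dask_worker_argv:

```python
import os


def dask_worker_argv(scheduler_address, nthreads=1, nprocs=None):
    """Hypothetical helper: argv for one worker process per CPU by default."""
    if nprocs is None:
        # os.cpu_count() can return None, so fall back to a single process.
        nprocs = os.cpu_count() or 1
    return [
        "dask-worker",
        scheduler_address,
        "--nprocs",
        str(nprocs),
        "--nthreads",
        str(nthreads),
    ]


argv = dask_worker_argv("tcp://XX.X.X.XXX:8786")
print(" ".join(argv))
```

The resulting list can be handed to subprocess.run(argv) on the worker machine.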

It would be great if this could be done by passing nprocs and nthreads as worker_options to EC2Cluster. If I can help in any way, please let me know.

Thank you

jacobtomlinson commented 2 years ago

@shireenrao have you tried it?

I would expect the following to work.

cluster = EC2Cluster(..., worker_class="distributed.nanny.Nanny", worker_options={"nprocs": "n"})

shireenrao commented 2 years ago

@jacobtomlinson - I tried that and it fails. This is the stack trace I see

python -m distributed.cli.dask_spec tcp://XXX.X.X.XXX:8786 --spec '{"cls": "distributed.nanny.Nanny", "opts": {"nprocs":"4", "name": "dask-44bad6fe-worker-ecaffb24"}}'
no environment.yml
distributed.nanny - INFO -         Start Nanny at: 'tcp://XXX.17.0.X:41233'
distributed.nanny - ERROR - Failed to initialize Worker
Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/distributed/nanny.py", line 844, in _run
    worker = Worker(**worker_kwargs)
  File "/opt/conda/lib/python3.8/site-packages/distributed/worker.py", line 733, in __init__
    super().__init__(
TypeError: __init__() got an unexpected keyword argument 'nprocs'
distributed.nanny - ERROR - Failed while trying to start worker process: __init__() got an unexpected keyword argument 'nprocs'
distributed.nanny - INFO - Closing Nanny at 'tcp://XXX.17.0.X:41233'
Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/runpy.py", line 197, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/opt/conda/lib/python3.8/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/opt/conda/lib/python3.8/site-packages/distributed/cli/dask_spec.py", line 43, in <module>
    main()
  File "/opt/conda/lib/python3.8/site-packages/click/core.py", line 1128, in __call__
    return self.main(*args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/click/core.py", line 1053, in main
    rv = self.invoke(ctx)
  File "/opt/conda/lib/python3.8/site-packages/click/core.py", line 1395, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/opt/conda/lib/python3.8/site-packages/click/core.py", line 754, in invoke
    return __callback(*args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/distributed/cli/dask_spec.py", line 39, in main
    asyncio.get_event_loop().run_until_complete(run())
  File "/opt/conda/lib/python3.8/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "/opt/conda/lib/python3.8/site-packages/distributed/cli/dask_spec.py", line 33, in run
    servers = await run_spec(_spec, *args)
  File "/opt/conda/lib/python3.8/site-packages/distributed/deploy/spec.py", line 659, in run_spec
    await asyncio.gather(*workers.values())
  File "/opt/conda/lib/python3.8/asyncio/tasks.py", line 690, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/opt/conda/lib/python3.8/site-packages/distributed/core.py", line 283, in _
    await self.start()
  File "/opt/conda/lib/python3.8/site-packages/distributed/nanny.py", line 335, in start
    response = await self.instantiate()
  File "/opt/conda/lib/python3.8/site-packages/distributed/nanny.py", line 418, in instantiate
    result = await self.process.start()
  File "/opt/conda/lib/python3.8/site-packages/distributed/nanny.py", line 692, in start
    msg = await self._wait_until_connected(uid)
  File "/opt/conda/lib/python3.8/site-packages/distributed/nanny.py", line 811, in _wait_until_connected
    raise msg["exception"]
  File "/opt/conda/lib/python3.8/site-packages/distributed/nanny.py", line 844, in _run
    worker = Worker(**worker_kwargs)
  File "/opt/conda/lib/python3.8/site-packages/distributed/worker.py", line 733, in __init__
    super().__init__(
TypeError: __init__() got an unexpected keyword argument 'nprocs'

jacobtomlinson commented 2 years ago

Sorry, my bad. It looks like you should be setting nthreads and ncores.

https://github.com/dask/distributed/blob/3afc6703fb480b9fe5c983c84d6ea115810dd1ff/distributed/nanny.py#L88-L89
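
The traceback above is consistent with the nanny forwarding options it does not recognise to the worker constructor, which then rejects them. A toy reproduction of that pattern follows. ToyWorker and ToyNanny are hypothetical stand-ins, not the distributed implementations.

```python
class ToyWorker:
    """Stand-in for a worker that only understands nthreads and name."""

    def __init__(self, nthreads=1, name=None):
        self.nthreads = nthreads
        self.name = name


class ToyNanny:
    """Stand-in for a supervisor that forwards unknown options to the worker."""

    def __init__(self, nthreads=1, name=None, **worker_kwargs):
        self.worker_kwargs = {"nthreads": nthreads, "name": name, **worker_kwargs}

    def start(self):
        # Unknown keys such as "nprocs" only fail here, at worker creation.
        return ToyWorker(**self.worker_kwargs)


ok = ToyNanny(nthreads=4, name="w0").start()

try:
    ToyNanny(nprocs=4, name="w1").start()
except TypeError as exc:
    error_message = str(exc)
    print(error_message)
```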

shireenrao commented 2 years ago

@jacobtomlinson - I am trying this on an EC2 instance with 36 vCPUs. Setting ncores and nthreads only starts 1 Nanny process with 36 threads. Here are the logs from the worker

python -m distributed.cli.dask_spec tcp://XX.XXX.XX.XX:8786 --spec '{"cls": "distributed.nanny.Nanny", "opts": {"ncores":36, "nthreads":1, "name": "dask-44bad6fe-worker-ecaffb24"}}'
/opt/conda/lib/python3.8/site-packages/distributed/nanny.py:172: UserWarning: the ncores= parameter has moved to nthreads=
  warnings.warn("the ncores= parameter has moved to nthreads=")
distributed.nanny - INFO -         Start Nanny at: 'tcp://172.17.0.2:45419'
distributed.worker - INFO -       Start worker at:     tcp://172.17.0.2:42719
distributed.worker - INFO -          Listening to:     tcp://172.17.0.2:42719
distributed.worker - INFO -          dashboard at:           172.17.0.2:45887
distributed.worker - INFO - Waiting to connect to:    tcp://XX.XXX.XX.XX:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                         36
distributed.worker - INFO -                Memory:                  68.59 GiB
distributed.worker - INFO -       Local Directory: /dask-worker-space/worker-2ywv7v5h
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:    tcp://XX.XXX.XX.XX:8786
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection

From a jupyter notebook session this is what one sees for the cluster

<Client: 'tcp://172.17.0.2:8786' processes=1 threads=36, memory=68.59 GiB>

Whereas when I start the worker manually with

dask-worker tcp://XX.XXX.XX.XX:8786 --nprocs=36

the Jupyter notebook shows the cluster as

<Client: 'tcp://172.17.0.2:8786' processes=36 threads=36, memory=68.59 GiB>

With the single 36-thread worker, not all of the CPUs are being utilized.

jacobtomlinson commented 2 years ago

Right, I'm with you, sorry. It seems that dask_spec doesn't support that today. Implementing that would have to be done upstream in distributed.

ansar-sa commented 2 years ago

Could we please request or create an issue upstream in distributed to integrate this work?

For my use case it makes more sense to have a few very large machines than lots of small VMs.

jacobtomlinson commented 2 years ago

The spec in distributed now accepts a list of specs. This does assume that nodes are regular, as we would need to specify the number of workers and the cores/memory for each worker. But we should be able to implement this here now without needing any upstream changes.
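
Under that assumption of regular nodes, the per-VM spec could be generated rather than written by hand. A sketch with a hypothetical helper build_worker_specs follows. The {"cls": ..., "opts": ...} entry shape mirrors the examples earlier in this thread; whether dask-cloudprovider accepts such a mapping is the open question of this issue, so treat this as an illustration only.

```python
import json


def build_worker_specs(base_name, n_workers, nthreads,
                       worker_class="distributed.nanny.Nanny"):
    """Hypothetical helper: one identical spec entry per worker process."""
    return {
        f"{base_name}-{i}": {
            "cls": worker_class,
            "opts": {"nthreads": nthreads, "name": f"{base_name}-{i}"},
        }
        for i in range(n_workers)
    }


# Example: a 36-vCPU instance split into 36 single-threaded workers.
specs = build_worker_specs("dask-worker", n_workers=36, nthreads=1)
spec_json = json.dumps(specs)
print(len(specs), sum(s["opts"]["nthreads"] for s in specs.values()))
```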

cspagemarine commented 2 years ago

Has anybody been able to achieve this?

emkademy commented 1 year ago

Is there any news on this issue?

kumarprabhu1988 commented 1 year ago

@jacobtomlinson I tried this on EC2Cluster like this:

ec2_cluster = EC2Cluster(
    env_vars=env_vars,
    extra_bootstrap=EC2_BOOTSTRAP_COMMANDS,
    filesystem_size=cluster_config["volume_size"],
    instance_tags=cluster_config["ec2_instance_tags"],
    n_workers=cluster_config["n_workers"],
    #worker_class="distributed.nanny.Nanny", 
    worker_options={i: {"cls": Nanny, "options": {"nthreads": 2}} for i in range(4)},
    scheduler_instance_type=cluster_config["scheduler_instance_type"],
    auto_shutdown=False,
    shutdown_on_close=False,
    security=False,  # https://github.com/dask/dask-cloudprovider/issues/249,
    volume_tags=cluster_config["ec2_instance_tags"],
    worker_instance_type=cluster_config["worker_instance_type"],
)

but this errors out with a json encode error in VMCluster

~/miniconda3/envs/pricing/lib/python3.7/site-packages/dask_cloudprovider/generic/vmcluster.py in __init__(self, scheduler, worker_module, worker_class, worker_options, *args, **kwargs)
    138                             "opts": {
    139                                 **worker_options,
--> 140                                 "name": self.name,
    141                             },
    142                         }

~/miniconda3/envs/pricing/lib/python3.7/json/__init__.py in dumps(obj, skipkeys, ensure_ascii, check_circular, allow_nan, cls, indent, separators, default, sort_keys, **kw)
    229         cls is None and indent is None and separators is None and
    230         default is None and not sort_keys and not kw):
--> 231         return _default_encoder.encode(obj)
    232     if cls is None:
    233         cls = JSONEncoder

Any ideas? Will this require some changes in cloudprovider?

jacobtomlinson commented 1 year ago

@kumarprabhu1988 I think the cls needs to be a string, and it probably needs to be "distributed.nanny.Nanny".
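
The JSON encode error is what one would expect when a class object is placed in a structure that is later serialised with json.dumps. A small demonstration with a stand-in class (Nanny below is a local placeholder, not an import from distributed):

```python
import json


class Nanny:
    """Local placeholder standing in for the real Nanny class."""


# A class object is not JSON serialisable.
try:
    json.dumps({"cls": Nanny, "opts": {"nthreads": 2}})
    encoded_class = True
except TypeError:
    encoded_class = False

# The dotted path as a string encodes without trouble.
spec_json = json.dumps({"cls": "distributed.nanny.Nanny", "opts": {"nthreads": 2}})
print(encoded_class, spec_json)
```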

kumarprabhu1988 commented 1 year ago

@jacobtomlinson So I tried this, and now EC2 worker machine creation fails and no workers are created, but it is a silent failure with no errors.

jacobtomlinson commented 1 year ago

Could you set debug=True and grab the /var/log/cloud-init-output.log logs from the instances so we can see what went wrong?

kumarprabhu1988 commented 1 year ago

Sure, I can do that. In the meantime I looked at the code and tried to pass in the worker configuration with some code changes, and it worked. Here's the change I made. Let me know what you think.