related-sciences / ukb-gwas-pipeline-nealelab

Pipeline for reproduction of NealeLab 2018 UKB GWAS

Explore Dask Cloud Provider for deployment #19

Closed: eric-czech closed this issue 4 years ago

eric-czech commented 4 years ago

I already tried dask-kubernetes, which works fine, but Kubernetes isn't really essential for this. Support for GCE is on its way in dask-cloudprovider: https://github.com/dask/dask-cloudprovider/pull/131. Using Dataproc may also be an option given Dask's YARN integration. I have no idea how the latter would work, but it might mean much lower latency when spinning a cluster up/down and scaling it during an interactive analysis.
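
For reference, if the Dataproc/YARN route were pursued, a minimal sketch with dask-yarn might look like this (untested; the packaged environment name and worker sizing are hypothetical, and dask-yarn would need to be available on the Dataproc master node):

from dask_yarn import YarnCluster
from dask.distributed import Client

# Hypothetical: a conda environment packaged with conda-pack and
# shipped to the YARN containers alongside the job
cluster = YarnCluster(
    environment='environment.tar.gz',
    worker_vcores=2,
    worker_memory='8GiB',
)
cluster.scale(4)  # request 4 workers from YARN
client = Client(cluster)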

eric-czech commented 4 years ago

The dask-cloudprovider GCE support is ready for testing now: https://github.com/dask/dask-cloudprovider/pull/131#issuecomment-707806335

eric-czech commented 4 years ago

I was able to get this working after https://github.com/dask/dask-cloudprovider/pull/131 and https://github.com/dask/dask-cloudprovider/pull/164.

A session might look something like this within GCP:

from dask_cloudprovider.gcp.instances import GCPCluster
from dask.distributed import Client
import dask.array as da

# These properties can also be set as env vars
# e.g. export DASK_CLOUDPROVIDER__GCP__PUBLIC_INGRESS=false
!cat ~/.config/dask/cloudprovider.yaml | head -n 4
cloudprovider:
  gcp:
    projectid: "XXXXX"
    public_ingress: false

%%time
cluster = GCPCluster(
    name='dask-gcp-test-1', 
    zone='us-east1-c', 
    machine_type='n1-standard-8', 
    projectid='XXXXX',
    docker_image="daskdev/dask:latest",
    n_workers=0,
    ngpus=0
)
Launching cluster with the following configuration: 
  Source Image: projects/ubuntu-os-cloud/global/images/ubuntu-minimal-1804-bionic-v20201014 
  Docker Image: daskdev/dask:latest 
  Machine Type: n1-standard-8 
  Filesystem Size: 50 
  N-GPU Type:  
  Zone: us-east1-c 
Creating scheduler instance
dask-42c86823-scheduler
    Internal IP: 10.142.0.14
    External IP: 35.231.172.201
Waiting for scheduler to run
Scheduler is running
CPU times: user 512 ms, sys: 87.2 ms, total: 599 ms
Wall time: 1min 43s
/home/eczech/miniconda3/envs/dask-cp/lib/python3.8/contextlib.py:120: UserWarning: Creating your cluster is taking a surprisingly long time. This is likely due to pending resources. Hang tight! 

%%time
cluster.scale_up(3)
CPU times: user 124 µs, sys: 42 µs, total: 166 µs
Wall time: 132 µs
Creating worker instance
Creating worker instance
Creating worker instance

# Create client (will have 0 workers until the workers created above finish bootstrapping)
Client(cluster)

# Open port to Web UI
# gcloud beta compute ssh --zone "us-east1-c" dask-42c86823-scheduler --ssh-flag="-L 8799:localhost:8787"
# Access locally (outside of GCP) at localhost:8799

x = da.random.random((1000, 1000), chunks=100)
y = x.T @ x
r = y.sum().sum()
r.compute()
249743712.2621659
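
(Sanity check: y = x.T @ x implies r = Σ_k (Σ_i x_ki)², so with entries uniform on [0, 1), E[r] ≈ 1000 · 500² = 2.5e8, consistent with the value above.)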

%%time
cluster.close()
Closing Instance: dask-42c86823-worker-fff04b2f
Closing Instance: dask-42c86823-worker-8d9c7049
Closing Instance: dask-42c86823-worker-e241e40f
Closing Instance: dask-42c86823-scheduler
CPU times: user 38.1 ms, sys: 2.71 ms, total: 40.8 ms
Wall time: 2.31 s
eric-czech commented 4 years ago

Adaptive deployment example:

from dask_cloudprovider.gcp.instances import GCPCluster
from dask.distributed import Client
import dask.array as da

cluster = GCPCluster(
    name='dask-gcp-test-1', 
    zone='us-east1-c', 
    machine_type='n1-standard-8', 
    projectid='XXXXX',
    docker_image="daskdev/dask:latest",
    n_workers=0,
    ngpus=0
)

# See https://docs.dask.org/en/latest/setup/adaptive.html
# Make sure to set `interval` and/or `wait_count` to avoid nearly immediate termination
# of workers once they finish tasks for a graph (default is 3 seconds) 
# See https://github.com/dask/distributed/blob/f081ad0f51239bfeb27391fd72c62fbfa1bd7b0b/distributed/deploy/adaptive.py#L27
#     for details on those parameters
cluster.adapt(minimum=0, maximum=5, interval="300s") # Wait 5 minutes * 3 `wait_count` (by default) = 15 mins before killing a worker

client = Client(cluster)

# This will take ~5 minutes to run and cause 5 workers to be created
x = da.random.random((10000, 10000), chunks=100)
y = x.T @ x
r = y.sum().sum()
r.compute()
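
With minimum=0, the adaptive policy scales idle workers back down on its own, but the scheduler instance keeps running until the cluster is closed explicitly, e.g.:

# Idle workers are already reclaimed by the adaptive policy;
# this tears down the scheduler instance as well
client.close()
cluster.close()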