rapidsai / dask-cuda

Utilities for Dask and CUDA interactions
https://docs.rapids.ai/api/dask-cuda/stable/
Apache License 2.0

[ENH] Smarter portable defaults for CPU<>GPU dask interop #540

Open lmeyerov opened 3 years ago

lmeyerov commented 3 years ago

This got complicated enough that I wanted to move it over from Slack --

Configuring and using dask CPU alongside dask GPU has been confusing, and mixing them is a reality both wrt hardware (GPU hardware generally ships balanced against large CPU core counts) and software (a lot of code isn't GPU, despite our efforts ;-) ). It'd be good to get clarity in docs + defaults here:

Automating nthreads for cpu vs. gpu

We want a simple and portable config for our typical GPU case of launching on a single node with 1-8 GPUs and 4-128 cores, and to write size-agnostic dask + dask-cuda code over it. That means never hard-coding the above in the config nor the app, and still getting reasonable resource utilization. I suspect this is true of most users. It'd also be good to be able to predictably override this. I can't speak to multi-node clusters nor heterogeneous ones.
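For concreteness, a rough sketch of the kind of portable default we'd like (this is not an existing dask-cuda feature; counting GPUs via pynvml and splitting cores evenly are just one possible heuristic):

from multiprocessing import cpu_count

import pynvml
from dask.distributed import Client
from dask_cuda import LocalCUDACluster

# Count the GPUs on this node (any probe would do; pynvml is one option).
pynvml.nvmlInit()
n_gpus = pynvml.nvmlDeviceGetCount()

# Spread the host's cores evenly across the GPU workers instead of
# hard-coding nthreads per deployment.
threads_per_gpu_worker = max(1, cpu_count() // n_gpus)

cluster = LocalCUDACluster(threads_per_worker=threads_per_gpu_worker)
client = Client(cluster)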

"Just working" means:

Automating resource tagging

Separately, here's some experimental discussion of using logical resources & annotations, though PyTorch experiences may not carry over to Dask-CPU <> Dask-cuDF. One idea is to auto-tag GPU/CPU workers with # physical vs. # logical units and let app devs use those.

Ex:

# 8 x A100 node:
$ dask-cuda-worker
# => {'GPU_NODES': 1, 'GPUS_PHYSICAL': 8, 'GPUS_LOGICAL': 8000,
#     'CPU_NODES': 1, 'CPUS_PHYSICAL': 128, 'CPUS_LOGICAL': 128000}

From there, code can use annotations based either on hard-coded physical counts or on the more scale-free / agnostic logical style.
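As a sketch, app code against such (hypothetical) tags might look like the following; the resource names below don't exist today, they're the proposal:

import dask
from dask import delayed

@delayed
def clean(x):        # CPU-ish step
    return x + 1

@delayed
def gpu_kernel(x):   # GPU-ish step
    return x * 2

# Request ~1 logical CPU unit and ~half a logical GPU unit rather than
# hard-coding physical counts. This only builds the graph; running it
# would require workers that actually advertise these resources.
with dask.annotate(resources={"CPUS_LOGICAL": 1000}):
    a = clean(1)
with dask.annotate(resources={"GPUS_LOGICAL": 500}):
    b = gpu_kernel(a)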

quasiben commented 3 years ago

@lmeyerov thanks for moving the discussion here. If you have time, I think devs would also appreciate some motivating use cases or your thoughts on where mixing GPU/CPU workers is applicable. Pseudo-code would also be good.

lmeyerov commented 3 years ago

Some examples we see:

  1. Data ingestion: We get some annoying formats like Excel files and more niche ones, which we run dask jobs on to turn into dataframes, and then send to dask-cudf for actual use

  2. Data cleaning: We do some cleaning/enrichment tasks that sometimes end up being CPU, and they can be tricky to turn into dgdf because the whole point is to infer the metadata, while dgdf assumes it is known

  3. RAPIDS won't do sparse data for a while, so we need to do dask CPU for those steps

  4. We basically do interactive data dashboards, so viewers load data of different sizes and every interaction kicks off pipelines of tasks on them (filter, histogram, ML, ...). We're now switching to default-dask just to keep the web server responsive. Tasks can technically be sized for pandas vs. cudf vs. dask-cudf, but that can be a lot of work, so we often just run as dask-cudf with 1 partition, where maybe the kernel is cudf. (We mentally put bsql in the dask bucket of writing any-scale code.)

Maybe also helpful are some code patterns:
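For example, pattern 1 (CPU ingestion, then hand-off to dask-cudf) might look roughly like this; the paths and resource tags are illustrative only:

import dask
import dask.dataframe as dd
import dask_cudf
import pandas as pd

paths = ["part-0.xlsx", "part-1.xlsx"]   # illustrative inputs

# CPU-only parsing of an awkward format into a pandas-backed Dask dataframe
with dask.annotate(resources={"CPU": 1}):
    ddf = dd.from_delayed([dask.delayed(pd.read_excel)(p) for p in paths])

# Hand off to the GPU workers as a cudf-backed dask_cudf dataframe
with dask.annotate(resources={"GPU": 1}):
    gdf = dask_cudf.from_dask_dataframe(ddf).persist()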

quasiben commented 3 years ago

I don't have recommendations yet (this is my first attempt at this). Naively, I set up a cluster in the following manner:

Scheduler

dask-scheduler

CPU Workers

dask-worker tcp://...:8786 --nprocs 1 --resources CPU=1.0 --name cpu-worker

GPU Workers

dask-cuda-worker tcp://...:8786 --resources GPU=1 --name gpu-worker

Client

import dask
import dask.dataframe
import dask_cudf
import cudf
from dask.distributed import Client, wait

client = Client('localhost:8786')

# build and persist a pandas-backed dataframe on the CPU-tagged workers
with dask.annotate(resources={'CPU': 1}):
    ddf = dask.datasets.timeseries()
    ddf = ddf.persist()
    wait(ddf)

assert isinstance(ddf, dask.dataframe.core.DataFrame)

# convert each partition to cudf on the GPU-tagged workers
with dask.annotate(resources={'GPU': 1}):
    cdf = ddf.map_partitions(cudf.DataFrame)
    cdf = cdf.persist()

assert isinstance(cdf, dask_cudf.core.DataFrame)

The above worked, but I can see this being brittle in that a missed annotation can lead to problems. I agree that automatic resource tagging for dask-cuda-workers could make sense, especially in these scenarios. For plain dask-workers, I don't think we can ask for workers to be auto-tagged.

Are you asking for LocalCUDACluster to optionally spin up additional CPU workers with tags? I could see this being very convenient but also highly prone to mistakes. Probably need to think about this more (cc @pentschev)

I'm also curious what you mean by:

RAPIDS won't do sparse data for awhile, so we need to do dask cpu for those steps

Do you mean sparse dataframes or sparse arrays? If the latter, CuPy has grown somewhat recently to meet these needs.
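For example, a small snippet of CuPy's sparse support via cupyx.scipy.sparse:

import cupy as cp
from cupyx.scipy import sparse

dense = cp.random.random((1000, 1000))
dense[dense < 0.95] = 0            # mostly zeros
csr = sparse.csr_matrix(dense)     # CSR matrix living on the GPU
print(csr.nnz, csr.shape)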

lmeyerov commented 3 years ago

Thanks. I was starting to come to that conclusion as the current happy path based on the Slack conversation. Doing some reworking here to get closer to that; I'm guessing it'll take a few weeks to see how this phase ends up. There have been a lot of surprises up to this point (e.g., different exceptions when mixing different clients to the same worker), plus working out a sane dask resource model that's concurrency-friendly.

Re: sparse, sparse Series. Typically dense when indexed by another column. Ex: df['stick'] might be sparse, while df[ df['type'] == 'car' ]['stick'] is dense. Related, but there's a bit more to it: esp. when doing ML, we're starting to see many-column datasets. But that's all for another ticket..
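Roughly this shape, as a small pandas-level illustration (column names are from the example above; the data is made up):

import numpy as np
import pandas as pd

# 'stick' is mostly missing overall, but fully populated for one 'type'
df = pd.DataFrame({
    "type": ["car", "boat", "car", "boat"],
    "stick": [1.0, np.nan, 0.0, np.nan],
})
sparse_stick = df["stick"].astype(pd.SparseDtype("float64", np.nan))  # sparse Series
dense_subset = df.loc[df["type"] == "car", "stick"]                   # dense slice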

Will share feedback as it gets clearer.

lmeyerov commented 3 years ago

Interim update: blazingsql context creation is getting wedged when combining dask-worker --resources CPU=1 and dask-cuda-worker --resources GPU=1 (despite the happy case of single node / single GPU / sequential test working). Will help the bsql folks track it down, and once that's unblocked, return here..

chrisroat commented 3 years ago

I am also curious whether this could interact with cluster scaling (and things like dask-gateway). Would each type of worker need to be tracked by some key and scaled independently?

(In my case, I have a workflow with "big disk", "gpu" and "cpu" workers.)

github-actions[bot] commented 3 years ago

This issue has been labeled inactive-30d due to no recent activity in the past 30 days. Please close this issue if no further response or action is needed. Otherwise, please respond with a comment indicating any updates or changes to the original issue and/or confirm this issue still needs to be addressed. This issue will be labeled inactive-90d if there is no activity in the next 60 days.

quasiben commented 3 years ago

Hi folks, you might be interested in @madsbk's recent work in https://github.com/dask/distributed/pull/4869, which allows workers to have multiple executors.
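A rough sketch of how that could be used (the executors kwarg and the executor annotation come from that PR; details may differ across versions):

from concurrent.futures import ThreadPoolExecutor

import dask
from dask.distributed import Client, LocalCluster

# One in-process worker with an extra, single-threaded "gpu" executor
# alongside the default pool; tasks opt into it via annotation.
cluster = LocalCluster(
    processes=False,
    n_workers=1,
    threads_per_worker=4,
    executors={"gpu": ThreadPoolExecutor(max_workers=1)},
)
client = Client(cluster)

ddf = dask.datasets.timeseries()

with dask.annotate(executor="gpu"):
    # this layer is asked to run on the worker's "gpu" executor
    result = ddf.x.mean().persist()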

github-actions[bot] commented 2 years ago

This issue has been labeled inactive-90d due to no recent activity in the past 90 days. Please close this issue if no further response or action is needed. Otherwise, please respond with a comment indicating any updates or changes to the original issue and/or confirm this issue still needs to be addressed.

github-actions[bot] commented 2 years ago

This issue has been labeled inactive-30d due to no recent activity in the past 30 days. Please close this issue if no further response or action is needed. Otherwise, please respond with a comment indicating any updates or changes to the original issue and/or confirm this issue still needs to be addressed. This issue will be labeled inactive-90d if there is no activity in the next 60 days.