dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Better GPGPU integration #1226

Open crusaderky opened 7 years ago

crusaderky commented 7 years ago

Greetings,

I'm trying to figure out how dask can efficiently implement a hybrid GPGPU algorithm. Apologies in advance for the long post - I think it's best to share my analysis of the problem before getting to my proposed solution.

Memory management

GPGPU on dask falls into 4 broad use cases:

  1. discrete graphics card with seamless unified memory (e.g. NVidia Pascal and later).
  2. integrated GPU with shared memory - e.g. Intel / AMD CPUs with an OpenCL-capable GPU, NVidia Tegra, etc.
  3. discrete graphics card without seamless unified memory, but running algorithms where copying all the data from host to GPU and back every single time you run a kernel causes only a minor performance loss. Here too, the dask kernels always take plain numpy arrays as input and return plain numpy arrays.
  4. discrete graphics card without seamless unified memory, where copying memory back and forth has a major cost, so it's necessary to find a way to explicitly keep the data in GPU space.

In use cases 1, 2, and 3, the input and output of every dask kernel are plain numpy arrays, whose underlying memory may live in GPU or host space.

In the discussion below, I'm going to deliberately ignore use case 4, as it would add a massive amount of complication both in terms of handling non-numpy data between kernels and in preventing out-of-memory errors in GPU space.

Limits of the current dask design

To my knowledge, the only support that dask offers for GPGPU is provided by http://distributed.readthedocs.io/en/latest/resources.html.

This has several limitations. Consider, for example, a two-stage algorithm where both stages are CPU-bound:

C C C C C C C C
^ ^ ^ ^ ^ ^ ^ ^
C C C C C C C C

C = dask kernel that consumes 100% of 1 CPU core

Now you want to convert only the second stage to GPGPU, as converting the first stage would have a poor cost/benefit ratio - e.g., 80% of the complexity of the algorithm is in stage 1, but 80% of the time is spent in stage 2.

G G G G G G G G
^ ^ ^ ^ ^ ^ ^ ^
C C C C C C C C

G = dask GPU kernel

Let's say you run the above on a machine with 4 CPU cores and 1 GPU. In theory, one could run 4 stage-1 kernels on the CPUs, and then run the remaining 4 stage-1 kernels in parallel with the first batch of stage-2 GPU kernels. However, there's no way to tell this to dask: as far as I understand (please correct me if I'm wrong), if you run a task with resources={GPU: 1}, it will still lock 1 CPU. You could, of course, work around it by starting your worker with --resources CPU=4 --threads 999 and then explicitly setting resources={CPU: 1} on every CPU-based kernel - but when working with legacy code and/or higher-level libraries (e.g. xarray, or even just dask.array), this can become incredibly hard.
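
A minimal sketch of the workaround just described, using the existing distributed resources API (the kernel functions, scheduler address, and chunking are all illustrative):

import numpy
from dask.distributed import Client

def cpu_kernel(x):
    return x * 2          # stage 1: CPU-bound work (placeholder)

def gpu_kernel(x):
    return x + 1          # stage 2: would launch a GPU kernel (placeholder)

# Worker started as described above, e.g. with resources "CPU=4,GPU=1" and a
# thread pool large enough that threads are never the bottleneck.
client = Client("tcp://scheduler:8786")   # illustrative address

chunks = [numpy.ones(1000) for _ in range(8)]

# Each CPU-bound stage-1 task reserves one CPU slot...
stage1 = [client.submit(cpu_kernel, c, resources={"CPU": 1}) for c in chunks]

# ...while each GPU-bound stage-2 task reserves only the GPU, so the
# remaining stage-1 tasks can keep all 4 CPU slots busy in the meantime.
stage2 = [client.submit(gpu_kernel, f, resources={"GPU": 1}) for f in stage1]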

This problem is also present in non-GPU problems where you have a slow I/O task - there's no way to tell dask that the CPU consumption of that task is going to be negligible.

Proposed design - first iteration

  1. Move non-CPU resource management to plain dask (i.e. not just distributed) - see below.
  2. Write a @dask.resources(res1=quantity, res2=quantity...) decorator, which can then be applied to dask kernels. The implementation would be to simply add a dask_resources attribute to the decorated function, in order to retain seamless compatibility with dask dicts.
  3. Treat "CPU" as a special, hardcoded resource, which defaults to 1 if omitted and can be set to 0.
  4. When using dask.threaded.get, the available resources will default to CPU=<number of threads in the pool>. When using dask.multiprocessing.get, the available resources will default to CPU=1. Add extra resources with dask.set_options(resources={"GPU": 1}).
# Don't lock a CPU core, as this kernel is going to be 99% GPU
@dask.resources(GPU=1, CPU=0)
def my_dask_kernel(my_input):
    my_output = numpy.empty_like(my_input)
    # Use numba.cuda, PyCUDA, PyOpenCL, or ctypes/cffi linking to a .cu library
    my_gpu_kernel(my_output, my_input)
    return my_output
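
For what it's worth, a minimal sketch of what the proposed decorator itself could look like - hypothetical API, not existing dask code:

def resources(**requirements):
    """Hypothetical @dask.resources decorator (point 2 of the proposal)."""
    def decorator(func):
        # Attach the requirements where a scheduler could look them up.
        # "CPU" is the special, hardcoded resource and defaults to 1 (point 3).
        func.dask_resources = {"CPU": 1, **requirements}
        return func
    return decorator

Because the requirements live in a plain function attribute, decorated functions still drop straight into a dask graph dict unchanged, which is the seamless compatibility mentioned in point 2.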

Second iteration

This second batch of changes is specifically aimed at multi-GPU systems. Without it, multi-GPU systems running single-GPU problems should probably start 1 independent worker per GPU - although that costs needless pickling/unpickling and IPC, and makes it impossible to use NVLink to transfer memory across GPUs.

  1. In the @dask.resources decorator, accept an optional parameter affinity=["GPU"] which indicates that the task should (if possible) run on the same GPU Resource ID as the task that produced its inputs.

  2. Offer a function, dask.resource_id(resource_name) which returns a thread-local resource ID. e.g.

    # In multi-GPU systems, it's preferred to run on the same GPU that produced my_input in order to avoid a costly memory transfer.
    @dask.resources(GPU=1, CPU=0, affinity=["GPU"])
    def my_dask_cuda_kernel(my_input):
        gpuid = dask.resource_id("GPU")     # e.g. 0 or 1
        my_output = numpy.empty_like(my_input)
        my_cuda_kernel(my_output, my_input, gpuid)
        return my_output

    The resource ID is (a usage sketch follows this list):

    • an integer counting from zero, for resources requested with quantity 1; e.g. @dask.resources(GPU=1) -> dask.resource_id("GPU") may return 0, 1, 2, or 3 on a quad-GPU host
    • a tuple, for resources requested with an integer quantity greater than 1; e.g. @dask.resources(GPU=2) -> dask.resource_id("GPU") may return (0, 3) on a quad-GPU host
    • unavailable for float resources (e.g. RAM)
  3. When defining which resources are available, either from the command line or with dask.set_options, allow grouping resource IDs into affinity groups. The idea is that, if you have a 2-GPU system, you really do want to have 4 single-GPU tasks running at the same time (2 per physical GPU), in order to obtain optimal GPU usage and minimize the overhead.

e.g. dask.set_options(resources={"GPU": ((0, 1), (2, 3))}) describes a 2-GPU system with 4 GPU task slots. When a task specifies @dask.resources(GPU=1, affinity=["GPU"]) and its input ran on GPU slot 0, it means it's going to be optimal to run the task on either slot 0 or slot 1 (the same physical GPU).

  4. When affinity groups are defined and @dask.resources requests more than 1 GPU, it should always mean that the GPUs must be within the same affinity group if there is a group large enough. e.g. in the configuration above, @dask.resources(GPU=2) means that the task will run on one whole graphics card, whereas @dask.resources(GPU=4) will run on both graphics cards.
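
Tying points 2 and 3 above together, here is a sketch of how a kernel could map the resource ID onto a physical device. dask.resources and dask.resource_id are the hypothetical API proposed above; cuda.select_device is real numba API; my_cuda_kernel stands for the user's actual kernel launch:

import dask
import numpy
from numba import cuda

@dask.resources(GPU=1, CPU=0, affinity=["GPU"])
def my_dask_cuda_kernel(my_input):
    # With resources={"GPU": ((0, 1), (2, 3))}, task slots 0-1 would live on
    # physical device 0 and slots 2-3 on physical device 1 (assumed mapping).
    slot = cast_slot = dask.resource_id("GPU")   # e.g. 0, 1, 2 or 3
    cuda.select_device(slot // 2)                # 2 task slots per physical GPU
    my_output = numpy.empty_like(my_input)
    my_cuda_kernel(my_output, my_input)          # user-provided CUDA kernel launch
    return my_output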

Opinions? Did I miss major designs already existing or in the works? Can somebody come up with simpler ideas to tackle the problem?

Thanks

mrocklin commented 7 years ago

@crusaderky I haven't yet read through this entirely, but a couple of small comments:

  1. Are you talking about strictly dask.array? or dask generally? If you're talking about dask generally then presumably you can represent data however you like, not just with numpy arrays.
  2. The distributed scheduler is a bit of a misnomer. It works just fine on a single machine. In your case you probably want to do something like the following:
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(processes=False, n_workers=0)
cluster.start_worker(ncores=..., resources={'GPU': ...})

client = Client(cluster)

All of the resource management things will be there. Data moved between workers will move along tornado queues and not be serialized in any way.
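
Continuing the snippet above, a GPU task can then be submitted against the declared resource (my_gpu_task and x are placeholders):

future = client.submit(my_gpu_task, x, resources={'GPU': 1})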

mrocklin commented 7 years ago

A couple more comments:

  1. You might consider the secede function added in #1201 and described in https://github.com/dask/dask/pull/2501 to mark a task as not really needing a CPU
  2. It is unlikely that the first-generation scheduler will be significantly extended.
  3. Dask does support in-memory inter-worker communication channels with queues. I suspect that some of your data transfer concerns could be met that way.
  4. Alternatively you could bundle up your on-device-memory objects with Python objects that serialize themselves by just including the in-memory pointer. You might also want to look at the previous issue https://github.com/dask/distributed/issues/400
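
A minimal sketch of that last idea, assuming data only ever moves within a single process - the class name and fields are illustrative, and with separate worker processes the received pointer would be dangling:

import pickle

class DevicePointer:
    """Wraps a raw GPU allocation so it can flow through dask's machinery.

    "Serialization" only carries the integer device pointer plus metadata,
    which stays valid as long as everything lives in the same process
    (e.g. a LocalCluster with processes=False).
    """
    def __init__(self, ptr, shape, dtype):
        self.ptr = ptr        # integer device address
        self.shape = shape
        self.dtype = dtype

    def __reduce__(self):
        return (DevicePointer, (self.ptr, self.shape, self.dtype))

# Round-tripping through pickle preserves the pointer value, nothing more:
obj = DevicePointer(ptr=0xdeadbeef, shape=(1024,), dtype="float32")
assert pickle.loads(pickle.dumps(obj)).ptr == obj.ptr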

Were any of the previous comments helpful?

crusaderky commented 7 years ago

Hi @mrocklin, sorry for the delayed answer.

I'm talking about both dask in general and dask.array specifically. The beauty of NVidia Pascal is that it opens the door to holding device arrays in plain numpy - hence removing one of the biggest hurdles to integrating CUDA into the dask.array paradigm.

I see how secede() can avoid locking a CPU for a pure-GPU task. However dask.array can't make any use of it as far as I understand? Same for dask queues.

I understand that dask already tries to avoid pickling/unpickling and sending data over the network as much as possible, and #400 is a good idea. However, in the scenario of a worker with 2 GPUs, how does dask prevent a needless RAM transfer between the two GPUs? AFAIK today dask thinks "if a task passes its outputs to another task and they both run within the same process, the memory transfer cost is zero". This is false for multi-GPU - you need a way to inform dask of a "hierarchy of transfer costs", if we want to call it that:

same GPU < transfer between GPUs in the same process, or between GPU and host < transfer between processes < transfer over network.

It is unlikely that the first-generation scheduler will be significantly extended.

Ok, fair enough.

mrocklin commented 7 years ago

I believe that in the past people with multiple GPUs have used multiple processes per machine, with one GPU pinned to each process. This suffers from needless serialization costs when moving data between host processes but does match locality on the GPU.

jakirkham commented 4 years ago

@crusaderky I wonder if what you want to do is possible today with Dask + RAPIDS + UCX. Have you tried this? If so, does that solve the problem or are there things you are still missing?

crusaderky commented 4 years ago

@jakirkham I confess I'm not familiar with either tech stack, but from a cursory reading I don't get it - how do they address the problems of (1) minimizing inter-GPU memory transfer costs and (2) optimizing occupancy in hybrid CPU+GPU algorithms?