coiled / feedback

A place to provide Coiled feedback
14 stars 3 forks source link

GPU annotation to limit number of jobs per worker with multiple threads #222

Open PranjalSahu opened 1 year ago

PranjalSahu commented 1 year ago

Describe the bug We want to limit one GPU job per worker. Currently, I am using just one thread in each worker so that only one job can run at a time. This avoids running multiple jobs on the same GPU. However, this is not optimal as we have specific jobs that can utilize multiple CPU cores. So I wanted to know how I can annotate one GPU per worker.

I did try using the GPU annotation but it didn't work for me. Also wanted to add that in our use-case the GPU performs some Pytorch computation.

Reference Links, optional Docker image Link: https://hub.docker.com/repository/docker/pranjalsahu/oaiimageanalysis

Issue Priority, optional

Environment, optional

Diagnostics, optional Please run coiled.diagnostics locally and paste the result below:

Coiled Diagnostics Output:

Additional context Add any other context about the problem here.

jrbourbeau commented 1 year ago

Thanks for the issue @PranjalSahu.

We want to limit one GPU job per worker.

Stepping back a bit, I'm curious why you want to do this in the first place. Could you provide a bit more context?

I would recommend using worker resources in this case https://distributed.dask.org/en/stable/resources.html. Two things need to happen in order for them to work. (1) workers need to be told how many resources (in this case gpus) they have and (2) you need to tell Dask how many resources each task needs.

For (1) I'm not sure if Coiled automatically handles setting gpu worker resources or not. cc @ntabris

For (2) there are two ways you can set resources. If you're using the client.submit / client.map API, then you can use the resources= keyword. There's an example of that here https://distributed.dask.org/en/stable/resources.html#resources-are-applied-separately-to-each-worker-process. If you're using Dask collections like arrays, DataFrame, delayed, etc. then you should use dask.annotate(...) like the example here https://distributed.dask.org/en/stable/resources.html#resources-with-collections

I did try using the GPU annotation but it didn't work for me

Not sure if this is the root cause, but there is a known issue with loosing annotation, when using dask.annotate(...), during task graph optimization. If that's what you're encountering, setting optimize_graph=False when calling compute(...) would ensure the GPU annotation isn't lost https://distributed.dask.org/en/stable/resources.html#resources-with-collections

ntabris commented 1 year ago

For (1) I'm not sure if Coiled automatically handles setting gpu worker resources or not.

We do not, nor (as far as I can tell) does dask-cuda. Should we?

ntabris commented 1 year ago

Maybe you already know this but if you want to set resources for your workers on Coiled, you'd do something like

coiled.Cluster(..., worker_options={"resources":{"GPU":2}})

(which would effectively be the same as --resources GPU=2 on each worker).

PranjalSahu commented 1 year ago

How would I define which tasks require that resource for ex. GPU. I submit jobs using compute method. I could not find an example where we can do that with delayed methods.

mrocklin commented 1 year ago

Hi @PranjalSahu sorry for the delay. There are a few ways to annotate tasks. The easiest is by using the with dask.annotate context manager. This is described a little bit here: https://distributed.dask.org/en/stable/resources.html#resources-with-collections

Here is a trivial example:

In [2]: import dask

In [3]: with dask.annotate(resources={"GPU": 1}):
   ...:     x = dask.delayed(lambda x: x + 1)(10)
   ...:

In [4]: x.__dask_graph__()  # you don't have to do this, just showing internals to be explicit
Out[4]:
HighLevelGraph with 1 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x10fc7bc70>
 0. lambda-7a8636f8-4158-4e1a-b580-1cf40e519ad6

In [5]: x.__dask_graph__().layers  # you don't have to do this, just showing internals to be explicit
Out[5]: {'lambda-7a8636f8-4158-4e1a-b580-1cf40e519ad6': <dask.highlevelgraph.MaterializedLayer at 0x10fd5e4f0>}

In [6]: x.__dask_graph__().layers["lambda-7a8636f8-4158-4e1a-b580-1cf40e519ad6"]  # you don't have to do this, just showing internals to be explicit
   ...: .annotations
Out[6]: {'resources': {'GPU': 1}}

In [7]: x.compute()
Out[7]: 11
PranjalSahu commented 1 year ago

@mrocklin Thanks I will try the code snippet that you shared with our pipeline.

fjetter commented 1 year ago

Just checking in. Did the above solution work for you @PranjalSahu?

PranjalSahu commented 1 year ago

Hello, not exactly. But I have not tried it for some time. Last time I tried it didn't give me exactly what I wanted. Essentially what I am looking for is to use multiple CPU cores and simultaneously put restrictions that GPU should only get one job (GPU job) at a time due to memory restriction.

If I was simply allowing Dask to schedule jobs then it would get stuck sometimes and the worker would never recover. Even if the worker recovers automatically by detecting the deadlock then it would be great.

Is there any option where we can put some time limit that if the job does not finishes within some time limit then consider it failed and re-spawn the worker?

fjetter commented 1 year ago

But I have not tried it for some time. Last time I tried it didn't give me exactly what I wanted.

That's fine. I just wanted to check in if this is still something that is hurting you. It sounds like it is not urgent or no longer relevant for you.

ssentially what I am looking for is to use multiple CPU cores and simultaneously put restrictions that GPU should only get one job (GPU job) at a time due to memory restriction.

This is how worker restrictions should be working or at least very closely. Consider you are having a worker with four threads and a GPU resource of 1. This would allow simultaneous execution of one GPU task and three ordinary (CPU) tasks. If you want to prohibit other tasks running while the GPU task is running, this is indeed a bit more challenging. Technically this is possible with more complex annotations but comes with caveats.

If I was simply allowing Dask to schedule jobs then it would get stuck sometimes and the worker would never recover.

This sounds more concerning and should not happen. There was a time (12-18 month ago) where we were struggling with deadlocks but this should all be fixed now. We take deadlocking issues very serious and I'd be glad if you could reach out to us with more information if you ever encounter this again. We do not implement any "recovery" possibility for this and also no timeout.

PranjalSahu commented 1 year ago

Ok Thanks for this clarification. I will check and inform if I encounter the deadlock issue again. Also wanted to mention that it never happened if restricted the threads to 1 per machine i.e. only one task at a time. However, this approach was wasting the resources as many CPU jobs could have used the additional cores.