dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License
1.58k stars 719 forks source link

[Feature] Add worker capabilities/labels #2616

Open cromefire opened 5 years ago

cromefire commented 5 years ago

It would be really nice if you could add something like capabilities/labels (however you want to call them) to dask

It would be something like:

dask-worker scheduler:8786 --capabilities "vaapi,nodejs"
dask-worker scheduler:8786

Where you could schedule a task like:

add_t = client.submit(add, 1, 4)
video_t = client.submit(process, "/path/to/file", capabilities=["vaapi"])

So that certain tasks only get scheduled on certain workers, which support that kind of task

Additionally another feature to consider would be enum-like capabilities:

dask-worker scheduler:8786 --capabilities "some_feature,arch=amd64"
dask-worker scheduler:8786 --capabilities "arch=armv8"

With a python equivalent of:

result_t = client.submit(process, some_data, capabilities={"some_feature": True, "arch": "amd64"})
other_result_t = client.submit(process, other_data, capabilities=["some_feature"])

The current equivalent of capabilities would be something like:

dask-worker scheduler:8786 --resources "vaapi=9999"
mrocklin commented 5 years ago

No disagreement from me. The only thing I see as a potential issue is that every new thing we add like this (resources, capabilities, plugins, extensions) do make the system more complex. Adding in things like this has to be done carefully and with a mind to future maintenance costs. With that in mind any contribution that added something like this would be great to see.

dhirschfeld commented 5 years ago

Similar to:

I like capabilities as it's general enough to include any type of capability/feature. Another option would be tags but I think capabilities is a bit more descriptive

cromefire commented 5 years ago

The only thing I see as a potential issue is that every new thing we add like this (resources, capabilities, plugins, extensions) do make the system more complex.

Maybe this can be partially prevented, because you can already filter by worker, so maybe there could just be some kind of preprocessing that automatically filters workers. Writing that filter code should really easy, I can do that If you wish, it shouldn't be more than 10-20 lines

dhirschfeld commented 5 years ago

There's work going on in https://github.com/dask/distributed/pull/2675 which may help enable this use-case.

I'm very interested in having this capability (pun intended! 😆 ) so you (@cromefire) might be interested to see if the new design will work to enable this use-case

dhirschfeld commented 5 years ago

@guillaumeeb - you might be interested in this issue as you seemed to like my idea of worker "pools"

If capabilities were added you might not need the concept of worker pools as you could filter the workers by capability directly rather than pool membership - e.g.

>>> pool_specs = {
...     'gpu': {
...         'worker': {
...             "cls": Worker, "options": {"ncores": 1}},
...             "capabilities": {"gpu": True}
...     },
...     'cpu': {
...         'worker': {"cls": Worker, "options": {"ncores": 1}},
...     },
... }
>>> worker_specs = {
...     'worker1': {
...         "pool": "gpu",
...         "capabilities": {  # extra capabilities
...             "memory": "8GB",
...         },
...     },
...     'worker2': {
...         "pool": "cpu",
...         "capabilities": {  # extra capabilities
...             "memory": "16GB",
...         },
...     },
... }
>>> cluster = SpecCluster(workers=worker_specs, pools=pool_specs)

e.g. the below two calls would run on the same set of workers:

>>> res = cluster.submit(func, *args, capabilities={"memory": "16GB"})
>>> res = cluster.submit(func, *args, pool="cpu")

...so the concept of a "pool" is just to specify a certain set of (minimum?) capabilities. As such I think it adds value (usability) to the underlying concept of capabilities.

In the workser_spec above I've allow for the possibility for a worker to belong to a specific pool but to also provide capabilities over and above those required by the pool.

cromefire commented 5 years ago
>>> worker_specs = {
...     'worker1': {
...         "pool": "gpu",
...         "capabilities": {  # extra capabilities
...             "memory": "8GB",
...         },
...     },
...     'worker2': {
...         "pool": "cpu",
...         "capabilities": {  # extra capabilities
...             "memory": "16GB",
...         },
...     },
... }

It seems like it's determined by the scheduler in that case, am I right? In my case the worker determines the capabilities by it self

cromefire commented 5 years ago

But the concept is good