dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Specify resources for dask builtin functions #2127

Open kkraus14 opened 6 years ago

kkraus14 commented 6 years ago

I'm trying to specify resources for built-in Dask functions such as dd.read_csv, with the end goal of running certain functions on "CPU workers" and other functions on "GPU workers". Here's a minimal example of trying to force dd.read_csv to run only on my "CPU worker":

from dask.distributed import Client, LocalCluster
import pandas as pd
import dask.dataframe as dd

cluster = LocalCluster(processes=False)
cpu_worker = cluster.workers[0]
cpu_worker.name = 'cpu'
cpu_worker.set_resources(CPU=80)
client = Client(cluster)
pdf = pd.DataFrame({"a": [1,2,3], "b": [4,5,6]})
test_df = dd.from_pandas(pdf, npartitions=2)
test_df.compute(resources = {tuple(test_df.__dask_keys__()): {'CPU': 1}})

This returns the following:

distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/protocol/core.py", line 96, in loads
    msg = loads_msgpack(small_header, small_payload)
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/protocol/core.py", line 173, in loads_msgpack
    return msgpack.loads(payload, encoding='utf8')
  File "msgpack/_unpacker.pyx", line 200, in msgpack._unpacker.unpackb
TypeError: unhashable type: 'list'
distributed.scheduler - ERROR - unhashable type: 'list'
Traceback (most recent call last):
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/scheduler.py", line 1929, in handle_client
    msgs = yield comm.read()
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/tornado/gen.py", line 1099, in run
    value = future.result()
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/asyncio/futures.py", line 294, in result
    raise self._exception
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/tornado/gen.py", line 1107, in run
    yielded = self.gen.throw(*exc_info)
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/comm/tcp.py", line 203, in read
    msg = yield from_frames(frames, deserialize=self.deserialize)
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/tornado/gen.py", line 1099, in run
    value = future.result()
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/asyncio/futures.py", line 294, in result
    raise self._exception
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/tornado/gen.py", line 315, in wrapper
    yielded = next(result)
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/comm/utils.py", line 75, in from_frames
    res = _from_frames()
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/comm/utils.py", line 61, in _from_frames
    return protocol.loads(frames, deserialize=deserialize)
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/protocol/core.py", line 96, in loads
    msg = loads_msgpack(small_header, small_payload)
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/protocol/core.py", line 173, in loads_msgpack
    return msgpack.loads(payload, encoding='utf8')
  File "msgpack/_unpacker.pyx", line 200, in msgpack._unpacker.unpackb
TypeError: unhashable type: 'list'
distributed.utils - ERROR - unhashable type: 'list'
Traceback (most recent call last):
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/utils.py", line 622, in log_errors
    yield
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/client.py", line 921, in _handle_report
    six.reraise(*clean_exception(**msg))
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/six.py", line 692, in reraise
    raise value.with_traceback(tb)
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/asyncio/futures.py", line 294, in result
    raise self._exception
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/tornado/gen.py", line 1107, in run
    yielded = self.gen.throw(*exc_info)
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/comm/tcp.py", line 203, in read
    msg = yield from_frames(frames, deserialize=self.deserialize)
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/tornado/gen.py", line 1099, in run
    value = future.result()
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/asyncio/futures.py", line 294, in result
    raise self._exception
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/tornado/gen.py", line 315, in wrapper
    yielded = next(result)
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/comm/utils.py", line 75, in from_frames
    res = _from_frames()
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/comm/utils.py", line 61, in _from_frames
    return protocol.loads(frames, deserialize=deserialize)
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/protocol/core.py", line 96, in loads
    msg = loads_msgpack(small_header, small_payload)
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/protocol/core.py", line 173, in loads_msgpack
    return msgpack.loads(payload, encoding='utf8')
  File "msgpack/_unpacker.pyx", line 200, in msgpack._unpacker.unpackb
TypeError: unhashable type: 'list'

It would be great if you could specify resources when creating tasks rather than when computing them, similar to what client.submit allows, e.g.:

test_df = dd.from_pandas(pdf, npartitions=2, resources={'CPU': 1})

mrocklin commented 6 years ago

Hrm, so this works fine for me on both master and the latest release:

In [1]: from dask.distributed import Client, LocalCluster
   ...: import pandas as pd
   ...: import dask.dataframe as dd
   ...: cluster = LocalCluster(processes=False)
   ...: cpu_worker = cluster.workers[0]
   ...: cpu_worker.name = 'cpu'
   ...: cpu_worker.set_resources(CPU=80)
   ...: client = Client(cluster)
   ...: pdf = pd.DataFrame({"a": [1,2,3], "b": [4,5,6]})
   ...: test_df = dd.from_pandas(pdf, npartitions=2)
   ...: test_df.compute(resources = {tuple(test_df.__dask_keys__()): {'CPU': 1}})
   ...: 
Out[1]: 
   a  b
0  1  4
1  2  5
2  3  6

I might also suggest the following test, which sets up resources and names when creating the workers and verifies that tasks are allocated appropriately by checking the structured log.

from dask.distributed import Client, LocalCluster
import pandas as pd
import dask.dataframe as dd

cluster = LocalCluster(n_workers=0, processes=False)
client = Client(cluster)
alice = cluster.start_worker(resources={'CPU': 80}, name='alice')
bob = cluster.start_worker(name='bob')

pdf = pd.DataFrame({"a": [1,2,3], "b": [4,5,6]})
ddf = dd.from_pandas(pdf, npartitions=2)
ddf.compute(resources = {tuple(ddf.__dask_keys__()): {'CPU': 1}})

assert alice.log
assert not bob.log

mrocklin commented 6 years ago

The exception is odd. If you were using something other than LocalCluster, I would guess that you had a version mismatch between your workers or between your workers and client, but given that everything is local I don't see how this could be. How did you install Dask? I don't suppose you can provide a conda environment.yml or something similar that reproduces the problem? (My guess would be that this is challenging, but I thought I'd ask anyway.)
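As a side note on the error message itself: a `TypeError: unhashable type: 'list'` raised inside `msgpack.loads` is what one would expect when a dict keyed by a tuple is decoded, because msgpack has no tuple type and by default decodes arrays as lists, which cannot be dict keys. The sketch below imitates that round trip in plain Python. `fake_roundtrip` and the key names are hypothetical stand-ins, not distributed's actual code path, and whether this is exactly what happened in the report above is an assumption.

```python
def fake_roundtrip(obj):
    """Hypothetical stand-in for a wire format without a tuple type:
    every tuple comes back as a list, as msgpack does by default."""
    if isinstance(obj, (tuple, list)):
        return [fake_roundtrip(item) for item in obj]
    return obj


# Keys shaped like those of a two-partition dataframe (names are made up).
keys = (("from_pandas-abc", 0), ("from_pandas-abc", 1))
resources = {keys: {"CPU": 1}}  # fine on the sending side: tuples are hashable

decoded_key = fake_roundtrip(keys)  # now a list of lists

try:
    # Rebuilding the mapping on the receiving side needs a hashable key.
    rebuilt = {decoded_key: {"CPU": 1}}
    error = None
except TypeError as exc:
    error = str(exc)

print(error)
```

The dict is valid where it is built and only fails once the key has lost its tuple type, which matches an error that surfaces during deserialization rather than in the calling code.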

kkraus14 commented 6 years ago

I was on Dask 0.17.2 and just confirmed the exception issue is resolved when I upgraded to Dask 0.18.1. Thanks!

I'm planning on chaining together a number of functions. Is there any way to specify the resources when calling the functions, as opposed to when calling .compute?

mrocklin commented 6 years ago

I agree that would be valuable, but currently no. Resources are specific to the distributed scheduler, while collections like dask.delayed and dask.dataframe are scheduler-agnostic. This is something that could be improved, though. I don't know how at the moment, but there is likely a better way around this.
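Until something like that exists, one way to handle a chain of functions is to collect each stage's keys as you go and merge them into a single resources dict for the final compute call. The sketch below is plain Python with made-up keys. `build_resources` is a hypothetical helper, and whether a given version of distributed accepts several tuple-keyed entries in one dict should be checked before relying on it.

```python
def build_resources(stages):
    """Merge per-stage requirements into one dict for a single compute call.

    `stages` is a list of (keys, requirement) pairs; each group of keys
    becomes one tuple-valued dict key, as in the examples in this thread.
    """
    resources = {}
    for keys, requirement in stages:
        resources[tuple(keys)] = dict(requirement)
    return resources


# Made-up keys standing in for what each stage's collection would report.
load_keys = [("read-csv-111", 0), ("read-csv-111", 1)]
model_keys = [("predict-222", 0), ("predict-222", 1)]

resources = build_resources([
    (load_keys, {"CPU": 1}),   # stage meant for the "CPU workers"
    (model_keys, {"GPU": 1}),  # stage meant for the "GPU workers"
])

for key_group, requirement in resources.items():
    print(key_group, "->", requirement)
```

The merged dict would then be passed once, at the end of the chain, as the `resources=` argument to compute.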

kkraus14 commented 6 years ago

So for dd.read_csv, calling __dask_keys__() returns only the from-delayed tasks, but it looks like there are also pandas_read_text and read-block tasks, which end up getting scheduled on the GPU workers. Is there a different function, or a snippet, that given an object returns every key we need to define resources for? E.g.:

test = dd.read_csv("/path/to/some/file")
# getallkeys() is hypothetical; it stands for the function being asked about
resources = {tuple(test.getallkeys()): {'CPU': 1}}
test.compute(resources=resources)

mrocklin commented 6 years ago

Hrm, short term, list(test.dask) would probably serve your needs. This would include all keys that are used to create this dataset.
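To make the difference concrete, here is a toy task graph in plain Python, written in Dask's dict style of key -> (function, arguments). The key names echo the layers mentioned above, but the tasks and the graph itself are made up for illustration. Only the final layer counts as output keys, while iterating over the whole graph yields every key.

```python
# Toy task functions; they only exist so the graph has something to point at.
def read_block(path, offset):
    return f"{path}@{offset}"


def parse(block):
    return block.upper()


def wrap(parsed):
    return [parsed]


# key -> (function, *arguments); arguments may be other keys in the graph.
graph = {
    ("read-block", 0): (read_block, "file.csv", 0),
    ("pandas_read_text", 0): (parse, ("read-block", 0)),
    ("from-delayed", 0): (wrap, ("pandas_read_text", 0)),
}

output_keys = [("from-delayed", 0)]  # analogous to __dask_keys__()
all_keys = list(graph)               # analogous to list(test.dask)

# Keys that receive no constraint if only the output keys are annotated.
unconstrained = [key for key in all_keys if key not in output_keys]
print(unconstrained)
```

Any key left in `unconstrained` carries no resource requirement, so nothing prevents it from being scheduled on a worker without that resource.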

kkraus14 commented 6 years ago

Hmm, I'd expect the following to work but it's still scheduling tasks on the GPU workers including the from-delayed tasks as well:

test = dd.read_csv("/path/to/some/file")
resources = {tuple(test.dask): {'CPU': 1}}
test.compute()

mrocklin commented 6 years ago

Hrm, can you try passing compute(optimize_graph=False)?
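One reason graph optimization can matter here, offered as an assumption about the mechanism rather than something stated in the thread: if an optimizer fuses a chain of tasks into one task under a new name, resource requirements keyed on the original names no longer match anything. The toy below uses a simplified graph format (key -> the key it depends on) and a hypothetical `fuse_linear_chain` function. It is not Dask's optimizer.

```python
def fuse_linear_chain(graph, output_key):
    """Toy fusion: collapse a strictly linear graph into a single task.

    Each value in `graph` is the key that task depends on (None for the
    root). Returns a new graph holding one fused task with a new name.
    """
    chain = []
    key = output_key
    while key is not None:
        chain.append(key)
        key = graph[key]
    fused_name = "-".join(reversed(chain))
    return {fused_name: None}


graph = {
    "read-block": None,
    "pandas_read_text": "read-block",
    "from-delayed": "pandas_read_text",
}
resources = {key: {"CPU": 1} for key in graph}  # annotate every original key

optimized = fuse_linear_chain(graph, "from-delayed")
matched = [key for key in optimized if key in resources]

print(list(optimized))
print(matched)
```

With optimization skipped, the keys that run are the same keys that were annotated, which is the point of trying optimize_graph=False.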

kkraus14 commented 6 years ago

Still the same behavior. (Note that my example above forgot to pass resources to the compute call, but I am in fact setting it while testing.)

mrocklin commented 6 years ago

I'll take a look sometime today.

mrocklin commented 6 years ago

OK, it looks like this is failing to support tuple-based keys in the .get path. Should be an easy fix.

Short term you could do this as a workaround:

result = client.compute(df).result()

My apologies for the dust here. Most users of resources historically have been doing more custom computations (delayed, futures) and have been using the client API. The code paths around using them with the standard collections (array, dataframe) have not been as well travelled. I'll push a fix for this in a bit.

mrocklin commented 6 years ago

If you use optimize_graph=False then https://github.com/dask/distributed/pull/2131 should solve your immediate issue. There is still a bit of work to clear up this situation generally though and make it more usable.

kkraus14 commented 6 years ago

@mrocklin Unfortunately I have some pretty hard time constraints on what I'm working on, and creating 8 dask workers, each with a single GPU visible, is working well enough for my needs currently. I'll hopefully have time to revisit this late next week and continue troubleshooting with you towards a solution. Apologies for the delay!

mrocklin commented 6 years ago

It's just fine. This has been a useful exercise to flush out some bugs, both technical bugs and usability bugs, in using resources with collections.

Good luck!