dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Incorrect documentation about `workers` keyword in Data Locality article #5629

Open RRRajput opened 2 years ago

RRRajput commented 2 years ago

The Specify Workers in Compute/Persist section of the Data Locality article seems to indicate that the `workers` keyword of `Client.compute()` accepts a dictionary.

What happened:

In practice, the workers keyword does not accept a dictionary.

What you expected to happen:

I expected the `workers` keyword to accept the dictionary and the tasks to execute on the workers specified in it.

Minimal Complete Verifiable Example:

import dask
dask.__version__
OUT: 
'2021.11.2'
from dask.distributed import Client

client = Client()
worker_addresses = [x for x in client.cluster.scheduler_info['workers']]
worker_addresses
OUT: 
['tcp://127.0.0.1:35297',
 'tcp://127.0.0.1:36365',
 'tcp://127.0.0.1:41759',
 'tcp://127.0.0.1:38571']
from time import sleep
def add(x, y):
    sleep(1)
    return x + y

def subtract(x, y):
    sleep(1)
    return x - y

def multiply(x, y):
    sleep(2)
    return x * y
from dask import delayed
x = 12
y = 8
add_result = delayed(add)(x, y)
subtract_result = delayed(subtract)(x, y)
multiply_result = delayed(multiply)(add_result, subtract_result)

multiply_result.visualize()
OUT: 

(task graph image: add and subtract feeding into multiply)

futures = client.compute(multiply_result, workers={multiply_result: worker_addresses[0], add_result: worker_addresses[1]})
futures
OUT: 

Future: multiply, status: pending, type: NoneType, key: multiply-4c43bba2-8553-4a09-a7af-63d78db6cf05

distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
  File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/protocol/core.py", line 76, in dumps
    frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
  File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/msgpack/__init__.py", line 35, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 298, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 295, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 229, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer._pack
TypeError: can not serialize 'Delayed' object
distributed.comm.utils - ERROR - can not serialize 'Delayed' object
Traceback (most recent call last):
  File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/comm/utils.py", line 33, in _to_frames
    return list(protocol.dumps(msg, **kwargs))
  File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/protocol/core.py", line 76, in dumps
    frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
  File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/msgpack/__init__.py", line 35, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 298, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 295, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 229, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer._pack
TypeError: can not serialize 'Delayed' object
distributed.batched - ERROR - Error in batched write
Traceback (most recent call last):
  File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/batched.py", line 93, in _background_send
    nbytes = yield self.comm.write(
  File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/comm/tcp.py", line 250, in write
    frames = await to_frames(
  File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/comm/utils.py", line 50, in to_frames
    return _to_frames()
  File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/comm/utils.py", line 33, in _to_frames
    return list(protocol.dumps(msg, **kwargs))
  File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/protocol/core.py", line 76, in dumps
    frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
  File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/msgpack/__init__.py", line 35, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 298, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 295, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 229, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer._pack
TypeError: can not serialize 'Delayed' object
futures = client.compute(multiply_result, workers={'multiply_result': worker_addresses[0], 'add_result': worker_addresses[1]})
futures
OUT: 

Future: multiply, status: cancelled, type: NoneType, key: multiply-4c43bba2-8553-4a09-a7af-63d78db6cf05

distributed.core - ERROR - unhashable type: 'dict'
Traceback (most recent call last):
  File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/core.py", line 596, in handle_stream
    handler(**merge(extra, msg))
  File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/scheduler.py", line 4528, in update_graph_hlg
    return self.update_graph(
  File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/scheduler.py", line 4765, in update_graph
    w = self.coerce_address(w)
  File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/scheduler.py", line 7353, in coerce_address
    if addr in parent._aliases:
TypeError: unhashable type: 'dict'
distributed.core - ERROR - Exception while handling op register-client
Traceback (most recent call last):
  File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/core.py", line 530, in handle_comm
    result = await result
  File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/scheduler.py", line 5359, in add_client
    await self.handle_stream(comm=comm, extra={"client": client})
  File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/core.py", line 596, in handle_stream
    handler(**merge(extra, msg))
  File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/scheduler.py", line 4528, in update_graph_hlg
    return self.update_graph(
  File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/scheduler.py", line 4765, in update_graph
    w = self.coerce_address(w)
  File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/scheduler.py", line 7353, in coerce_address
    if addr in parent._aliases:
TypeError: unhashable type: 'dict'
tornado.application - ERROR - Exception in callback functools.partial(<function TCPServer._handle_connection.<locals>.<lambda> at 0x7f49d0c5a790>, <Task finished name='Task-101' coro=<BaseTCPListener._handle_stream() done, defined at /home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/comm/tcp.py:502> exception=TypeError("unhashable type: 'dict'")>)
Traceback (most recent call last):
  File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/tornado/tcpserver.py", line 331, in <lambda>
    gen.convert_yielded(future), lambda f: f.result()
  File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/comm/tcp.py", line 519, in _handle_stream
    await self.comm_handler(comm)
  File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/core.py", line 530, in handle_comm
    result = await result
  File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/scheduler.py", line 5359, in add_client
    await self.handle_stream(comm=comm, extra={"client": client})
  File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/core.py", line 596, in handle_stream
    handler(**merge(extra, msg))
  File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/scheduler.py", line 4528, in update_graph_hlg
    return self.update_graph(
  File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/scheduler.py", line 4765, in update_graph
    w = self.coerce_address(w)
  File "/home/rehan/notebook_testing/jupyter-env/lib/python3.8/site-packages/distributed/scheduler.py", line 7353, in coerce_address
    if addr in parent._aliases:
TypeError: unhashable type: 'dict'
futures = client.compute(multiply_result, workers=[worker_addresses[0]])
futures
OUT: 

Future: multiply, status: pending, type: NoneType, key: multiply-4c43bba2-8553-4a09-a7af-63d78db6cf05

Anything else we need to know?:

Apart from fixing the documentation, is there an alternative way of calling the compute method that specifies which worker the intermediate tasks should execute on?
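
One approach that might cover this (a sketch only, not verified on this exact version): annotate individual delayed calls with a workers annotation via dask.annotate, and pass optimize_graph=False to compute so the annotations are not dropped during graph optimization. Reusing the functions and worker addresses from the example above:

import dask
from dask import delayed

# Sketch: pin individual tasks with a "workers" annotation instead of the
# (unsupported) dict form of the workers keyword. optimize_graph=False keeps
# the per-layer annotations from being dropped by graph optimization.
with dask.annotate(workers=[worker_addresses[1]]):
    add_result = delayed(add)(x, y)

subtract_result = delayed(subtract)(x, y)

with dask.annotate(workers=[worker_addresses[0]]):
    multiply_result = delayed(multiply)(add_result, subtract_result)

future = client.compute(multiply_result, optimize_graph=False)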

Environment:

Dask version: 2021.11.2 (from the output above)
Python version: 3.8 (from the tracebacks above)

fjetter commented 2 years ago

Thank you for raising this issue. The documentation of Client.compute itself seems to be correct; it appears that the Data Locality documentation is outdated.
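
For per-future placement, Client.submit also accepts a workers= address; a minimal sketch reusing the functions and addresses from the report above:

# Sketch: submit each task as a future and pin it to a specific worker.
# Futures passed as arguments are resolved on the cluster before the call runs.
add_future = client.submit(add, 12, 8, workers=worker_addresses[1])
subtract_future = client.submit(subtract, 12, 8, workers=worker_addresses[2])
multiply_future = client.submit(multiply, add_future, subtract_future,
                                workers=worker_addresses[0])
multiply_future.result()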