dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Inconsistent use of workers parameter between client submit and scatter. #1944

Open darindf opened 6 years ago

darindf commented 6 years ago

It appears that the workers parameter is not consistent between the client's submit and scatter methods. submit can take IP addresses, hostnames (and perhaps other formats, like the tcp:// prefix?), while scatter requires a worker port to be included. The documentation states this needs to be a pair, but I found that the tcp://127.0.0.1:3454 syntax works as well.

Ideally the format for specifying workers should be the same across the client API.

Here is an example of the failure:

import dask
from distributed import Client

def doSomething(x):
    print(x)

if __name__ == '__main__':
    client = Client()
    workers = ['127.0.0.1']
    dictionary = {1: 2}

    # submit accepts the bare IP address without complaint
    client.gather(client.submit(doSomething, dictionary,
                                pure=False, workers=workers))
    # scatter rejects the same value with "missing port number"
    future = client.scatter(dictionary, broadcast=True, workers=workers)
    client.gather(client.submit(doSomething, dictionary,
                                pure=False, workers=workers))

The client.scatter line fails with the following error:

  File "d:\mpi\dask-scatter.py", line 18, in <module>
    future = client.scatter(dictionary, broadcast=True, workers=workers)
  File "c:\users\ddeforest\documents\distributed\distributed\client.py", line 1717, in scatter
    asynchronous=asynchronous, hash=hash)
  File "c:\users\ddeforest\documents\distributed\distributed\client.py", line 615, in sync
    return sync(self.loop, func, *args, **kwargs)
  File "c:\users\ddeforest\documents\distributed\distributed\utils.py", line 253, in sync
    six.reraise(*error[0])
  File "c:\ProgramData\Anaconda3\envs\build\lib\site-packages\six.py", line 693, in reraise
    raise value
  File "c:\users\ddeforest\documents\distributed\distributed\utils.py", line 238, in f
    result[0] = yield make_coro()
  File "c:\ProgramData\Anaconda3\envs\build\lib\site-packages\tornado\gen.py", line 1099, in run
    value = future.result()
  File "c:\ProgramData\Anaconda3\envs\build\lib\site-packages\tornado\gen.py", line 1107, in run
    yielded = self.gen.throw(*exc_info)
  File "c:\users\ddeforest\documents\distributed\distributed\client.py", line 1518, in _scatter
    d = yield self._scatter(keymap(tokey, data), workers, broadcast)
  File "c:\ProgramData\Anaconda3\envs\build\lib\site-packages\tornado\gen.py", line 1099, in run
    value = future.result()
  File "c:\ProgramData\Anaconda3\envs\build\lib\site-packages\tornado\gen.py", line 1107, in run
    yielded = self.gen.throw(*exc_info)
  File "c:\users\ddeforest\documents\distributed\distributed\client.py", line 1585, in _scatter
    broadcast=broadcast)
  File "c:\ProgramData\Anaconda3\envs\build\lib\site-packages\tornado\gen.py", line 1099, in run
    value = future.result()
  File "c:\ProgramData\Anaconda3\envs\build\lib\site-packages\tornado\gen.py", line 1107, in run
    yielded = self.gen.throw(*exc_info)
  File "c:\users\ddeforest\documents\distributed\distributed\core.py", line 475, in send_recv_from_rpc
    result = yield send_recv(comm=comm, op=key, **kwargs)
  File "c:\ProgramData\Anaconda3\envs\build\lib\site-packages\tornado\gen.py", line 1099, in run
    value = future.result()
  File "c:\ProgramData\Anaconda3\envs\build\lib\site-packages\tornado\gen.py", line 1113, in run
    yielded = self.gen.send(value)
  File "c:\users\ddeforest\documents\distributed\distributed\core.py", line 375, in send_recv
    six.reraise(*clean_exception(**response))
  File "c:\ProgramData\Anaconda3\envs\build\lib\site-packages\six.py", line 692, in reraise
    raise value.with_traceback(tb)
  File "c:\users\ddeforest\documents\distributed\distributed\core.py", line 297, in handle_comm
    result = yield result
  File "c:\ProgramData\Anaconda3\envs\build\lib\site-packages\tornado\gen.py", line 1099, in run
    value = future.result()
  File "c:\ProgramData\Anaconda3\envs\build\lib\site-packages\tornado\gen.py", line 315, in wrapper
    yielded = next(result)
  File "c:\users\ddeforest\documents\distributed\distributed\scheduler.py", line 2227, in scatter
    workers = [self.coerce_address(w) for w in workers]
  File "c:\users\ddeforest\documents\distributed\distributed\scheduler.py", line 2227, in <listcomp>
    workers = [self.coerce_address(w) for w in workers]
  File "c:\users\ddeforest\documents\distributed\distributed\scheduler.py", line 3987, in coerce_address
    addr = resolve_address(addr)
  File "c:\users\ddeforest\documents\distributed\distributed\comm\addressing.py", line 170, in resolve_address
    return unparse_address(scheme, backend.resolve_address(loc))
  File "c:\users\ddeforest\documents\distributed\distributed\comm\tcp.py", line 493, in resolve_address
    host, port = parse_host_port(loc)
  File "c:\users\ddeforest\documents\distributed\distributed\comm\addressing.py", line 87, in parse_host_port
    port = _default()
  File "c:\users\ddeforest\documents\distributed\distributed\comm\addressing.py", line 68, in _default
    raise ValueError("missing port number in address %r" % (address,))
ValueError: missing port number in address '127.0.0.1'

I have tested this using dask-worker --name mel and changing workers = ['mel'], and this works, which contradicts the scatter documentation:

workers: list of tuples (optional)
Optionally constrain locations of data. Specify workers as hostname/port pairs, e.g. ('127.0.0.1', 8787).
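For reference, here is roughly how the different forms behave with scatter on my machine (the port number below is just whatever my local worker happened to bind; treat it as illustrative):

workers = ['127.0.0.1']              # accepted by submit, but scatter raises "missing port number"
workers = [('127.0.0.1', 3454)]      # host/port pair, as the docs describe
workers = ['tcp://127.0.0.1:3454']   # full address string also works, despite the docs
future = client.scatter(dictionary, broadcast=True, workers=workers)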

mrocklin commented 6 years ago

First, let me apologize for the long delay here. This is an excellent issue. Thank you for taking the time to write it up well and diagnose the problem.

Yes, I believe that you're right. I think that ideally we would centralize the logic for handling the different cases of workers= in the scheduler and use it consistently everywhere it is needed. Currently the best implementation is probably split between Scheduler.update_graph, which manipulates the restrictions parameter, and Scheduler.valid_workers, which interprets those restrictions against the set of current workers.

Refactoring this so that it is usable both in its present form for submitting tasks and also for scatter would be quite welcome. I'll try to address this at some point, but may not get around to it personally for some time.

darindf commented 6 years ago

Perhaps I could look at it. Would the logic that submit uses for handling workers be the best version to port?

mrocklin commented 6 years ago

The logic for this isn't actually in Client.submit or Client.scatter; instead it's in scheduler.py, in Scheduler.update_graph and Scheduler.valid_workers. This is because, in cases like Client.submit, new workers might arrive between the client asking for something and the scheduler deciding where it should happen. Scheduler.update_graph handles worker restrictions here when Client.submit is called:

https://github.com/dask/distributed/blob/fefbaa38ad7ca9e07c1a19c83945209e5e626ac5/distributed/scheduler.py#L1409-L1427
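The gist of that block (a loose paraphrase, not the actual source) is that each entry is coerced to a full worker address when possible, and anything that can't be resolved to an address, such as a bare hostname or IP, becomes a host restriction instead:

# Loose paraphrase of the restriction handling in update_graph -- not the real code
for key, requested in restrictions.items():
    ts = tasks[key]
    ts.host_restrictions = set()
    ts.worker_restrictions = set()
    for w in requested:
        try:
            addr = scheduler.coerce_address(w)   # full addresses and (host, port) pairs resolve here
        except ValueError:
            ts.host_restrictions.add(w)          # bare hostname/IP: restrict by host instead
        else:
            ts.worker_restrictions.add(addr)     # concrete worker address

This is why workers=['127.0.0.1'] is fine for submit: the bare IP simply ends up as a host restriction rather than an error.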

Then, when it comes time to assess which workers fit the constraints, it calls Scheduler.valid_workers:

https://github.com/dask/distributed/blob/fefbaa38ad7ca9e07c1a19c83945209e5e626ac5/distributed/scheduler.py#L3919-L3962
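valid_workers then turns those restrictions into a concrete set of addresses by intersecting them with whatever workers are currently connected, roughly along these lines (again a simplified sketch, with workers_by_host standing in for the scheduler's host-to-addresses index):

# Simplified sketch of the idea behind valid_workers -- not the real code
def valid_workers(ts, current_workers, workers_by_host):
    s = None                                      # None means "no restriction"

    if ts.worker_restrictions:
        s = ts.worker_restrictions & current_workers

    if ts.host_restrictions:
        expanded = set()
        for host in ts.host_restrictions:
            expanded |= workers_by_host.get(host, set())
        s = expanded if s is None else s & expanded

    return s                                      # caller treats None as "any worker"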

Scatter, on the other hand, is much simpler, as you've discovered:

https://github.com/dask/distributed/blob/fefbaa38ad7ca9e07c1a19c83945209e5e626ac5/distributed/scheduler.py#L2248-L2250

So ideally we would find a way to capture most of the logic in the first two sections of code, but without tying it to the scheduler state, so that scatter can just take the workers= keyword it was given (which is more-or-less equivalent to the restrictions= keyword passed to update_graph) and get the results immediately, without touching scheduler state like ts.host_restrictions, etc.

The challenge here is refactoring this code to get the desired functionality without damaging other use cases.
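
One possible shape for that shared piece (purely a sketch with a hypothetical name, and without the worker-name/alias handling or hostname resolution the real thing would need) is a standalone function that resolves a workers= specification against a snapshot of the current worker addresses, so both update_graph and scatter could call it:

# Hypothetical helper -- not part of distributed's API
def normalize_worker_spec(spec, current_workers):
    """Map a workers= spec (full addresses, (host, port) pairs, or bare
    hostnames/IPs) onto concrete addresses drawn from current_workers."""
    out = set()
    for w in spec:
        if isinstance(w, tuple):                             # ('127.0.0.1', 3454)
            w = '%s:%s' % w
        w = str(w)
        hostport = w.split('//')[-1]                         # strip any tcp:// prefix
        if ':' in hostport:                                  # explicit port: match that exact worker
            out.update(a for a in current_workers if a.endswith(hostport))
        else:                                                # bare host: match every worker on that host
            out.update(a for a in current_workers
                       if a.split('//')[-1].rsplit(':', 1)[0] == hostport)
    return out

normalize_worker_spec(['127.0.0.1'], {'tcp://127.0.0.1:3454', 'tcp://10.0.0.2:4567'})
# -> {'tcp://127.0.0.1:3454'}

With something like this, scatter could resolve its workers= argument eagerly against the current workers, while update_graph could keep storing unresolved host restrictions for workers that have not arrived yet.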