dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

various errors when invoking `dask.distributed.LocalCluster` with custom host (and port) #5550

Open harrisliuwk opened 2 years ago

harrisliuwk commented 2 years ago

What happened: Instantiating LocalCluster after updating dask to the newest version raises errors.

Code that used to work:

from dask import distributed as dis
clu = dis.LocalCluster(host='localhost:9000')  # host given as host:port in one string
cli = dis.Client(clu)                          # connect a client to the cluster

What you expected to happen: The code runs without error.

Minimal Complete Verifiable Example:

Case 1:

clu = dis.LocalCluster(host='localhost:9000')
clu = dis.LocalCluster(host='localhost', port=9000)

reports "OSError: [Errno 98] Address already in use"

Case 2:

clu = dis.LocalCluster(host='localhost:9000', processes=False)
clu = dis.LocalCluster(host='localhost', port=9000, processes=False)

reports "ValueError: not enough values to unpack (expected 3, got 1)"

In case 2, I tried tracing the call back and found that at least dis.comm.addressing.addresses_from_user_args parses my input correctly.
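For reference, here is roughly the check I ran (a sketch; the exact keyword signature of addresses_from_user_args is assumed from the installed version and may differ across releases):

from distributed.comm.addressing import addresses_from_user_args

# Assumed keywords: host and port, as used internally by LocalCluster.
addrs = addresses_from_user_args(host='localhost', port=9000)
print(addrs)  # expected to be something like ['tcp://localhost:9000']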

Anything else we need to know?: This bug was introduced in version 2021.10.0. The CLI tool dask-scheduler --host localhost --port 9000 works fine with the latest version.

Environment:

log of case 1:

---------------------------------------------------------------------------
OSError                                   Traceback (most recent call last)
/tmp/ipykernel_5274/1355496435.py in <module>
----> 1 clu = dis.LocalCluster(host='localhost', port=9000)

~/miniconda3/envs/DataFlow/lib/python3.7/site-packages/distributed/deploy/local.py in __init__(self, name, n_workers, threads_per_worker, processes, loop, start, host, ip, scheduler_port, silence_logs, dashboard_address, worker_dashboard_address, diagnostics_port, services, worker_services, service_kwargs, asynchronous, security, protocol, blocked_handlers, interface, worker_class, scheduler_kwargs, scheduler_sync_interval, **worker_kwargs)
    242             silence_logs=silence_logs,
    243             security=security,
--> 244             scheduler_sync_interval=scheduler_sync_interval,
    245         )
    246 

~/miniconda3/envs/DataFlow/lib/python3.7/site-packages/distributed/deploy/spec.py in __init__(self, workers, scheduler, worker, asynchronous, loop, security, silence_logs, name, shutdown_on_close, scheduler_sync_interval)
    274             self._loop_runner.start()
    275             self.sync(self._start)
--> 276             self.sync(self._correct_state)
    277 
    278     async def _start(self):

~/miniconda3/envs/DataFlow/lib/python3.7/site-packages/distributed/deploy/cluster.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    256             return future
    257         else:
--> 258             return sync(self.loop, func, *args, **kwargs)
    259 
    260     def _log(self, log):

~/miniconda3/envs/DataFlow/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    330     if error[0]:
    331         typ, exc, tb = error[0]
--> 332         raise exc.with_traceback(tb)
    333     else:
    334         return result[0]

~/miniconda3/envs/DataFlow/lib/python3.7/site-packages/distributed/utils.py in f()
    313             if callback_timeout is not None:
    314                 future = asyncio.wait_for(future, callback_timeout)
--> 315             result[0] = yield future
    316         except Exception:
    317             error[0] = sys.exc_info()

~/miniconda3/envs/DataFlow/lib/python3.7/site-packages/tornado/gen.py in run(self)
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

~/miniconda3/envs/DataFlow/lib/python3.7/site-packages/distributed/deploy/spec.py in _correct_state_internal(self)
    361                 for w in workers:
    362                     w._cluster = weakref.ref(self)
--> 363                     await w  # for tornado gen.coroutine support
    364             self.workers.update(dict(zip(to_open, workers)))
    365 

~/miniconda3/envs/DataFlow/lib/python3.7/site-packages/distributed/core.py in _()
    281                         )
    282                 else:
--> 283                     await self.start()
    284                     self.status = Status.running
    285             return self

~/miniconda3/envs/DataFlow/lib/python3.7/site-packages/distributed/nanny.py in start(self)
    310             try:
    311                 await self.listen(
--> 312                     start_address, **self.security.get_listen_args("worker")
    313                 )
    314             except OSError as e:

~/miniconda3/envs/DataFlow/lib/python3.7/site-packages/distributed/core.py in listen(self, port_or_addr, allow_offload, **kwargs)
    432             deserialize=self.deserialize,
    433             allow_offload=allow_offload,
--> 434             **kwargs,
    435         )
    436         self.listeners.append(listener)

~/miniconda3/envs/DataFlow/lib/python3.7/site-packages/distributed/comm/core.py in _()
    205     def __await__(self):
    206         async def _():
--> 207             await self.start()
    208             return self
    209 

~/miniconda3/envs/DataFlow/lib/python3.7/site-packages/distributed/comm/tcp.py in start(self)
    476                 # is large enough not to lose any.
    477                 sockets = netutil.bind_sockets(
--> 478                     self.port, address=self.ip, backlog=backlog
    479                 )
    480             except OSError as e:

~/miniconda3/envs/DataFlow/lib/python3.7/site-packages/tornado/netutil.py in bind_sockets(port, address, family, backlog, flags, reuse_port)
    159         sock.setblocking(False)
    160         try:
--> 161             sock.bind(sockaddr)
    162         except OSError as e:
    163             if (

OSError: [Errno 98] Address already in use

log of case 2:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
/tmp/ipykernel_5140/1306321815.py in <module>
----> 1 clu = dis.LocalCluster(host='localhost:9000', processes=False)

~/miniconda3/envs/DataFlow/lib/python3.7/site-packages/distributed/deploy/local.py in __init__(self, name, n_workers, threads_per_worker, processes, loop, start, host, ip, scheduler_port, silence_logs, dashboard_address, worker_dashboard_address, diagnostics_port, services, worker_services, service_kwargs, asynchronous, security, protocol, blocked_handlers, interface, worker_class, scheduler_kwargs, scheduler_sync_interval, **worker_kwargs)
    242             silence_logs=silence_logs,
    243             security=security,
--> 244             scheduler_sync_interval=scheduler_sync_interval,
    245         )
    246 

~/miniconda3/envs/DataFlow/lib/python3.7/site-packages/distributed/deploy/spec.py in __init__(self, workers, scheduler, worker, asynchronous, loop, security, silence_logs, name, shutdown_on_close, scheduler_sync_interval)
    273         if not self.asynchronous:
    274             self._loop_runner.start()
--> 275             self.sync(self._start)
    276             self.sync(self._correct_state)
    277 

~/miniconda3/envs/DataFlow/lib/python3.7/site-packages/distributed/deploy/cluster.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    256             return future
    257         else:
--> 258             return sync(self.loop, func, *args, **kwargs)
    259 
    260     def _log(self, log):

~/miniconda3/envs/DataFlow/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    330     if error[0]:
    331         typ, exc, tb = error[0]
--> 332         raise exc.with_traceback(tb)
    333     else:
    334         return result[0]

~/miniconda3/envs/DataFlow/lib/python3.7/site-packages/distributed/utils.py in f()
    313             if callback_timeout is not None:
    314                 future = asyncio.wait_for(future, callback_timeout)
--> 315             result[0] = yield future
    316         except Exception:
    317             error[0] = sys.exc_info()

~/miniconda3/envs/DataFlow/lib/python3.7/site-packages/tornado/gen.py in run(self)
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

~/miniconda3/envs/DataFlow/lib/python3.7/site-packages/distributed/deploy/spec.py in _start(self)
    302                 cls = import_term(cls)
    303             self.scheduler = cls(**self.scheduler_spec.get("options", {}))
--> 304             self.scheduler = await self.scheduler
    305         self.scheduler_comm = rpc(
    306             getattr(self.scheduler, "external_address", None) or self.scheduler.address,

~/miniconda3/envs/DataFlow/lib/python3.7/site-packages/distributed/core.py in _()
    281                         )
    282                 else:
--> 283                     await self.start()
    284                     self.status = Status.running
    285             return self

~/miniconda3/envs/DataFlow/lib/python3.7/site-packages/distributed/scheduler.py in start(self)
   4058                 **self.security.get_listen_args("scheduler"),
   4059             )
-> 4060             self.ip = get_address_host(self.listen_address)
   4061             listen_ip = self.ip
   4062 

~/miniconda3/envs/DataFlow/lib/python3.7/site-packages/distributed/comm/addressing.py in get_address_host(addr)
    147     scheme, loc = parse_address(addr)
    148     backend = registry.get_backend(scheme)
--> 149     return backend.get_address_host(loc)
    150 
    151 

~/miniconda3/envs/DataFlow/lib/python3.7/site-packages/distributed/comm/inproc.py in get_address_host(self, loc)
    343 
    344     def get_address_host(self, loc):
--> 345         self.manager.validate_address(loc)
    346         return self.manager.ip
    347 

~/miniconda3/envs/DataFlow/lib/python3.7/site-packages/distributed/comm/inproc.py in validate_address(self, addr)
     66         Validate the address' IP and pid.
     67         """
---> 68         ip, pid, suffix = addr.split("/")
     69         if ip != self.ip or int(pid) != os.getpid():
     70             raise ValueError(

ValueError: not enough values to unpack (expected 3, got 1)
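
For what it's worth, the tail of this traceback suggests the cause: the inproc comm backend expects addresses of the form {ip}/{pid}/{suffix} (see the addr.split("/") line above), so a TCP-style host string splits into one field instead of three. A minimal illustration (the second address is made up for demonstration):

'localhost:9000'.split('/')      # ['localhost:9000'] -> 1 value, hence the ValueError
'192.168.1.2/5140/1'.split('/')  # ['192.168.1.2', '5140', '1'] -> unpacks cleanly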
jcrist commented 2 years ago

Part of this may be a bug, but I'm pretty sure the fact that this worked at all before was an accident. Going through your examples:

clu = dis.LocalCluster(host='localhost:9000')

The host kwarg should take a hostname, not a host + port pair. I think the fact that this worked before was accidental, but we could certainly give a better error message here.

clu = dis.LocalCluster(host='localhost', port=9000)

Since port isn't an explicitly named kwarg of LocalCluster, it's forwarded directly to all the workers (which then all try to start on port 9000). I'd be surprised if this ever worked - you probably meant to use scheduler_port=9000 here, which does work:

# This does work
clu = dis.LocalCluster(host="localhost", scheduler_port=9000)

The same applies to each of your cases with processes=False: the host kwarg should just be 'localhost', and you meant to specify scheduler_port instead.
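
For completeness, a sketch of the corrected threaded variant (assuming scheduler_port behaves the same way with processes=False):

from dask import distributed as dis

# Scheduler pinned to port 9000; workers run as threads in this process.
clu = dis.LocalCluster(host='localhost', scheduler_port=9000, processes=False)
cli = dis.Client(clu)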

Some TODOs before closing this:

harrisliuwk commented 2 years ago

I can confirm that @jcrist's solution works, thanks!

Nonetheless, as jcrist said, this raises further questions, and I would like to add some on top of that. Since host='{host}:{port}' and (host='{host}', port={port}) have worked for so long, many users may have grown used to treating the host argument as the scheduler host and port as the scheduler port, so I suggest making that the supported behavior going forward. Also, it does not make sense for port to set the workers' port: as jcrist found, workers' ports should be managed by the scheduler in the common use cases. If users really need to set workers' ports, maybe we could provide something like worker_ports=[1234, 5678].
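
For illustration, the proposed keyword might look like this (hypothetical API; worker_ports does not exist in distributed today, and the name and semantics are only a suggestion):

# Hypothetical: pin each worker to an explicit port, one entry per worker.
clu = dis.LocalCluster(
    host='localhost',
    scheduler_port=9000,
    n_workers=2,
    worker_ports=[1234, 5678],  # proposed keyword, not part of the current API
)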