dask / helm-chart

Helm charts for Dask
https://helm.dask.org/

client = cluster.get_client() : Timed out #431

Closed NicholasCote closed 7 months ago

NicholasCote commented 9 months ago

Describe the issue: We installed the latest DaskHub chart (2024.1.0) on a fresh Kubernetes cluster with no network policies or proxies in place, using the following command:

helm upgrade --wait --install --render-subchart-notes dhub dask/daskhub --create-namespace --namespace=dhub

It installs without issue. The services proxy-public and traefik-daskgw-dask-gateway both get external IP addresses assigned.

We are able to log in to JupyterHub via the IP address assigned to proxy-public. We then open a notebook using the default Python 3 (ipykernel) kernel. In the first cell we launch a Dask cluster with the following:

from dask_gateway import GatewayCluster

cluster = GatewayCluster()  # connect to Gateway

cluster.adapt(minimum=2, maximum=10)  # scale cluster

cluster


The Dashboard link works. We can see the Scheduler and Worker pods come up on the k8s cluster.

The issue is that calling .get_client(), or importing Client from distributed and connecting with it directly, times out. We can't connect to the cluster that has been provisioned, so we can't use its resources. After creating the cluster we run the following:

client = cluster.get_client()

client

Full Exception message:

---------------------------------------------------------------------------
CancelledError                            Traceback (most recent call last)
File /srv/conda/envs/notebook/lib/python3.11/site-packages/distributed/utils.py:1935, in wait_for(fut, timeout)
   1934 async with asyncio.timeout(timeout):
-> 1935     return await fut

File /srv/conda/envs/notebook/lib/python3.11/site-packages/dask_gateway/comm.py:45, in GatewayConnector.connect(self, address, deserialize, **connection_args)
     44 try:
---> 45     plain_stream = await self.client.connect(
     46         ip, port, max_buffer_size=MAX_BUFFER_SIZE
     47     )
     48     stream = await plain_stream.start_tls(
     49         False, ssl_options=ctx, server_hostname=sni
     50     )

File /srv/conda/envs/notebook/lib/python3.11/site-packages/tornado/tcpclient.py:279, in TCPClient.connect(self, host, port, af, ssl_options, max_buffer_size, source_ip, source_port, timeout)
    270 connector = _Connector(
    271     addrinfo,
    272     functools.partial(
   (...)
    277     ),
    278 )
--> 279 af, addr, stream = await connector.start(connect_timeout=timeout)
    280 # TODO: For better performance we could cache the (af, addr)
    281 # information here and re-use it on subsequent connections to
    282 # the same host. (http://tools.ietf.org/html/rfc6555#section-4.2)

CancelledError: 

The above exception was the direct cause of the following exception:

TimeoutError                              Traceback (most recent call last)
File /srv/conda/envs/notebook/lib/python3.11/site-packages/distributed/comm/core.py:342, in connect(addr, timeout, deserialize, handshake_overrides, **connection_args)
    341 try:
--> 342     comm = await wait_for(
    343         connector.connect(loc, deserialize=deserialize, **connection_args),
    344         timeout=min(intermediate_cap, time_left()),
    345     )
    346     break

File /srv/conda/envs/notebook/lib/python3.11/site-packages/distributed/utils.py:1934, in wait_for(fut, timeout)
   1933 async def wait_for(fut: Awaitable[T], timeout: float) -> T:
-> 1934     async with asyncio.timeout(timeout):
   1935         return await fut

File /srv/conda/envs/notebook/lib/python3.11/asyncio/timeouts.py:115, in Timeout.__aexit__(self, exc_type, exc_val, exc_tb)
    112     if self._task.uncancel() <= self._cancelling and exc_type is exceptions.CancelledError:
    113         # Since there are no new cancel requests, we're
    114         # handling this.
--> 115         raise TimeoutError from exc_val
    116 elif self._state is _State.ENTERED:

TimeoutError: 

The above exception was the direct cause of the following exception:

OSError                                   Traceback (most recent call last)
Cell In[5], line 1
----> 1 client = cluster.get_client()

File /srv/conda/envs/notebook/lib/python3.11/site-packages/dask_gateway/client.py:1080, in GatewayCluster.get_client(self, set_as_default)
   1073 def get_client(self, set_as_default=True):
   1074     """Get a ``Client`` for this cluster.
   1075 
   1076     Returns
   1077     -------
   1078     client : dask.distributed.Client
   1079     """
-> 1080     client = Client(
   1081         self,
   1082         security=self.security,
   1083         set_as_default=set_as_default,
   1084         asynchronous=self.asynchronous,
   1085         loop=self.loop,
   1086     )
   1087     if not self.asynchronous:
   1088         self._clients.add(client)

File /srv/conda/envs/notebook/lib/python3.11/site-packages/distributed/client.py:1014, in Client.__init__(self, address, loop, timeout, set_as_default, scheduler_file, security, asynchronous, name, heartbeat_interval, serializers, deserializers, extensions, direct_to_workers, connection_limit, **kwargs)
   1011 preload_argv = dask.config.get("distributed.client.preload-argv")
   1012 self.preloads = preloading.process_preloads(self, preload, preload_argv)
-> 1014 self.start(timeout=timeout)
   1015 Client._instances.add(self)
   1017 from distributed.recreate_tasks import ReplayTaskClient

File /srv/conda/envs/notebook/lib/python3.11/site-packages/distributed/client.py:1216, in Client.start(self, **kwargs)
   1214     self._started = asyncio.ensure_future(self._start(**kwargs))
   1215 else:
-> 1216     sync(self.loop, self._start, **kwargs)

File /srv/conda/envs/notebook/lib/python3.11/site-packages/distributed/utils.py:434, in sync(loop, func, callback_timeout, *args, **kwargs)
    431         wait(10)
    433 if error is not None:
--> 434     raise error
    435 else:
    436     return result

File /srv/conda/envs/notebook/lib/python3.11/site-packages/distributed/utils.py:408, in sync.<locals>.f()
    406         awaitable = wait_for(awaitable, timeout)
    407     future = asyncio.ensure_future(awaitable)
--> 408     result = yield future
    409 except Exception as exception:
    410     error = exception

File /srv/conda/envs/notebook/lib/python3.11/site-packages/tornado/gen.py:767, in Runner.run(self)
    765 try:
    766     try:
--> 767         value = future.result()
    768     except Exception as e:
    769         # Save the exception for later. It's important that
    770         # gen.throw() not be called inside this try/except block
    771         # because that makes sys.exc_info behave unexpectedly.
    772         exc: Optional[Exception] = e

File /srv/conda/envs/notebook/lib/python3.11/site-packages/distributed/client.py:1296, in Client._start(self, timeout, **kwargs)
   1293 self.scheduler_comm = None
   1295 try:
-> 1296     await self._ensure_connected(timeout=timeout)
   1297 except (OSError, ImportError):
   1298     await self._close()

File /srv/conda/envs/notebook/lib/python3.11/site-packages/distributed/client.py:1358, in Client._ensure_connected(self, timeout)
   1355 self._connecting_to_scheduler = True
   1357 try:
-> 1358     comm = await connect(
   1359         self.scheduler.address, timeout=timeout, **self.connection_args
   1360     )
   1361     comm.name = "Client->Scheduler"
   1362     if timeout is not None:

File /srv/conda/envs/notebook/lib/python3.11/site-packages/distributed/comm/core.py:368, in connect(addr, timeout, deserialize, handshake_overrides, **connection_args)
    366         await asyncio.sleep(backoff)
    367 else:
--> 368     raise OSError(
    369         f"Timed out trying to connect to {addr} after {timeout} s"
    370     ) from active_exception
    372 local_info = {
    373     **comm.handshake_info(),
    374     **(handshake_overrides or {}),
    375 }
    376 await comm.write(local_info)

OSError: Timed out trying to connect to gateway://traefik-dhub-dask-gateway.dhub:80/dhub.9f3cb45b017c448897a6537906349113 after 30 s

The Kubernetes pod logs for Traefik show no connection attempts from the client. The JupyterHub user pod logs show only the same Python error. We see messages about the cluster being created successfully, but nothing else.
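
Since Traefik logs nothing, one way to narrow this down is to check from the notebook whether a plain TCP connection to the host and port in the error message can be opened at all. The following is only a minimal sketch using the Python standard library. The helper names `parse_gateway_address` and `can_connect` are hypothetical, not part of dask-gateway, and the check says nothing about TLS or the gateway's routing; it only separates "cannot reach the proxy" from "reached the proxy but the handshake stalls".

```python
import socket
from urllib.parse import urlsplit


def parse_gateway_address(addr: str) -> tuple[str, int]:
    """Split a gateway://host:port/cluster-name address into (host, port).

    Hypothetical helper for illustration; not a dask-gateway API.
    """
    parts = urlsplit(addr)
    if parts.scheme != "gateway" or parts.hostname is None or parts.port is None:
        raise ValueError(f"not a gateway:// address with host and port: {addr!r}")
    return parts.hostname, parts.port


def can_connect(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to (host, port) opens within timeout.

    DNS failures, refused connections, and timeouts are all reported as False,
    because each of them is raised as a subclass of OSError.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Example, run from a notebook cell in the JupyterHub user pod:
#   host, port = parse_gateway_address(
#       "gateway://traefik-dhub-dask-gateway.dhub:80/"
#       "dhub.9f3cb45b017c448897a6537906349113"
#   )
#   print(host, port, can_connect(host, port))
```

If `can_connect` returns False, the problem is likely below Dask (DNS resolution of the service name, or pod-to-service networking); if it returns True, the TCP path is open and the timeout happens later in the connection setup.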

How do we fix/troubleshoot this timeout issue on a fresh DaskHub installation?

Anything else we need to know?:

We had a working environment at one point; then this issue appeared, and we haven't been able to figure out why. We have since rebuilt everything from scratch (fresh install and deployment) to eliminate as many variables as we can, and the issue still reproduces with the steps described above.

Environment:

Kubernetes was deployed via Kubespray onto AlmaLinux.

NicholasCote commented 7 months ago

This issue is better covered here: https://github.com/dask/helm-chart/issues/445