dask / dask-kubernetes

Native Kubernetes integration for Dask
https://kubernetes.dask.org
BSD 3-Clause "New" or "Revised" License

ConnectionClosedError during Dask Cluster Creation with k8s #878

Open terrykong opened 7 months ago

terrykong commented 7 months ago

Describe the issue:

When creating a cluster with dask_kubernetes where kr8s is pinned to 0.9.0, I see a ConnectionClosedError. The cluster still gets created when I check with kubectl, so I presume this happens after the manifest is submitted to the API server.
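Since the behaviour depends on which kr8s release ends up next to dask_kubernetes, it can help to print the installed versions before reproducing. This is only a minimal sketch using the standard library; the helper name `installed_versions` is made up for illustration and is not part of either package.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_versions(packages):
    """Map each distribution name to its installed version, or None if absent."""
    found = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            # The distribution is not installed in this environment.
            found[name] = None
    return found


if __name__ == "__main__":
    # Distribution names as used on PyPI; run this inside the venv under test.
    for name, ver in installed_versions(["dask-kubernetes", "kr8s"]).items():
        print(f"{name}: {ver or 'not installed'}")
```

Running this inside the venv shows which kr8s release pip actually resolved.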

Minimal Complete Verifiable Example:

Setting up my environment:

python3 -m venv venv
source venv/bin/activate
pip install dask_kubernetes==2024.4.0

This will error:

python -c 'from dask_kubernetes.operator.kubecluster import KubeCluster as K; K()'
Task exception was never retrieved
future: <Task finished name='Task-23' coro=<PortForward._sync_sockets() done, defined at /tmp/venv/lib/python3.10/site-packages/kr8s/_portforward.py:167> exception=ExceptionGroup('unhandled errors in a TaskGroup', [ConnectionClosedError('TCP socket closed')])>
  + Exception Group Traceback (most recent call last):
  |   File "/tmp/venv/lib/python3.10/site-packages/kr8s/_portforward.py", line 171, in _sync_sockets
  |     async with anyio.create_task_group() as tg:
  |   File "/tmp/venv/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 678, in __aexit__
  |     raise BaseExceptionGroup(
  | exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "/tmp/venv/lib/python3.10/site-packages/kr8s/_portforward.py", line 183, in _tcp_to_ws
    |     raise ConnectionClosedError("TCP socket closed")
    | kr8s._exceptions.ConnectionClosedError: TCP socket closed
    +------------------------------------
Task exception was never retrieved
future: <Task finished name='Task-21' coro=<PortForward._sync_sockets() done, defined at /tmp/venv/lib/python3.10/site-packages/kr8s/_portforward.py:167> exception=ExceptionGroup('unhandled errors in a TaskGroup', [ConnectionClosedError('TCP socket closed')])>
  + Exception Group Traceback (most recent call last):
  |   File "/tmp/venv/lib/python3.10/site-packages/kr8s/_portforward.py", line 171, in _sync_sockets
  |     async with anyio.create_task_group() as tg:
  |   File "/tmp/venv/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 678, in __aexit__
  |     raise BaseExceptionGroup(
  | exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "/tmp/venv/lib/python3.10/site-packages/kr8s/_portforward.py", line 183, in _tcp_to_ws
    |     raise ConnectionClosedError("TCP socket closed")
    | kr8s._exceptions.ConnectionClosedError: TCP socket closed
╭─────────────── Creating KubeCluster 'dask-terryk-e81a4dc3-a' ────────────────╮
│                                                                              │
│   DaskCluster                                                      Running   │
│   Scheduler Pod                                                    Running   │
│   Scheduler Service                                                Created   │
│   Default Worker Group                                             Created   │
│                                                                              │
│ ⠴ Waiting for scheduler service                                              │
╰──────────────────────────────────────────────────────────────────────────────╯
Task exception was never retrieved
future: <Task finished name='Task-56' coro=<PortForward._sync_sockets() done, defined at /tmp/venv/lib/python3.10/site-packages/kr8s/_portforward.py:167> exception=ExceptionGroup('unhandled errors in a TaskGroup', [ConnectionClosedError('TCP socket closed')])>
  + Exception Group Traceback (most recent call last):
  |   File "/tmp/venv/lib/python3.10/site-packages/kr8s/_portforward.py", line 171, in _sync_sockets
  |     async with anyio.create_task_group() as tg:
  |   File "/tmp/venv/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 678, in __aexit__
  |     raise BaseExceptionGroup(
  | exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "/tmp/venv/lib/python3.10/site-packages/kr8s/_portforward.py", line 183, in _tcp_to_ws
    |     raise ConnectionClosedError("TCP socket closed")
    | kr8s._exceptions.ConnectionClosedError: TCP socket closed
    +------------------------------------
Exception ignored in atexit callback: <function reap_clusters at 0x7f4406bb7250>
Traceback (most recent call last):
  File "/tmp/venv/lib/python3.10/site-packages/dask_kubernetes/operator/kubecluster/kubecluster.py", line 1033, in reap_clusters
    asyncio.run(_reap_clusters())
  File "/home/terryk/.pyenv/versions/3.10.12/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/home/terryk/.pyenv/versions/3.10.12/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
    return future.result()
  File "/tmp/venv/lib/python3.10/site-packages/dask_kubernetes/operator/kubecluster/kubecluster.py", line 1031, in _reap_clusters
    cluster.close(timeout=10)
  File "/tmp/venv/lib/python3.10/site-packages/dask_kubernetes/operator/kubecluster/kubecluster.py", line 700, in close
    return self.sync(self._close, timeout=timeout)
  File "/tmp/venv/lib/python3.10/site-packages/distributed/utils.py", line 358, in sync
    return sync(
  File "/tmp/venv/lib/python3.10/site-packages/distributed/utils.py", line 434, in sync
    raise error
  File "/tmp/venv/lib/python3.10/site-packages/distributed/utils.py", line 408, in f
    result = yield future
  File "/tmp/venv/lib/python3.10/site-packages/tornado/gen.py", line 767, in run
    value = future.result()
  File "/tmp/venv/lib/python3.10/site-packages/dask_kubernetes/operator/kubecluster/kubecluster.py", line 706, in _close
    cluster = await DaskCluster.get(self.name, namespace=self.namespace)
  File "/tmp/venv/lib/python3.10/site-packages/kr8s/_objects.py", line 177, in get
    resources = await api._get(
  File "/tmp/venv/lib/python3.10/site-packages/kr8s/_api.py", line 332, in _get
    async with self._get_kind(
  File "/home/terryk/.pyenv/versions/3.10.12/lib/python3.10/contextlib.py", line 199, in __aenter__
    return await anext(self.gen)
  File "/tmp/venv/lib/python3.10/site-packages/kr8s/_api.py", line 261, in _get_kind
    async with self.call_api(
  File "/home/terryk/.pyenv/versions/3.10.12/lib/python3.10/contextlib.py", line 199, in __aenter__
    return await anext(self.gen)
  File "/tmp/venv/lib/python3.10/site-packages/kr8s/_api.py", line 132, in call_api
    response = await self._session.request(**kwargs)
  File "/tmp/venv/lib/python3.10/site-packages/httpx/_client.py", line 1574, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/tmp/venv/lib/python3.10/site-packages/httpx/_client.py", line 1661, in send
    response = await self._send_handling_auth(
  File "/tmp/venv/lib/python3.10/site-packages/httpx/_client.py", line 1689, in _send_handling_auth
    response = await self._send_handling_redirects(
  File "/tmp/venv/lib/python3.10/site-packages/httpx/_client.py", line 1726, in _send_handling_redirects
    response = await self._send_single_request(request)
  File "/tmp/venv/lib/python3.10/site-packages/httpx/_client.py", line 1763, in _send_single_request
    response = await transport.handle_async_request(request)
  File "/tmp/venv/lib/python3.10/site-packages/httpx/_transports/default.py", line 373, in handle_async_request
    resp = await self._pool.handle_async_request(req)
  File "/tmp/venv/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 216, in handle_async_request
    raise exc from None
  File "/tmp/venv/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 196, in handle_async_request
    response = await connection.handle_async_request(
  File "/tmp/venv/lib/python3.10/site-packages/httpcore/_async/connection.py", line 99, in handle_async_request
    raise exc
  File "/tmp/venv/lib/python3.10/site-packages/httpcore/_async/connection.py", line 76, in handle_async_request
    stream = await self._connect(request)
  File "/tmp/venv/lib/python3.10/site-packages/httpcore/_async/connection.py", line 122, in _connect
    stream = await self._network_backend.connect_tcp(**kwargs)
  File "/tmp/venv/lib/python3.10/site-packages/httpcore/_backends/auto.py", line 30, in connect_tcp
    return await self._backend.connect_tcp(
  File "/tmp/venv/lib/python3.10/site-packages/httpcore/_backends/anyio.py", line 116, in connect_tcp
    stream: anyio.abc.ByteStream = await anyio.connect_tcp(
  File "/tmp/venv/lib/python3.10/site-packages/anyio/_core/_sockets.py", line 195, in connect_tcp
    gai_res = await getaddrinfo(
  File "/tmp/venv/lib/python3.10/site-packages/anyio/_core/_sockets.py", line 573, in getaddrinfo
    gai_res = await get_async_backend().getaddrinfo(
  File "/tmp/venv/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 2370, in getaddrinfo
    return await get_running_loop().getaddrinfo(
  File "/home/terryk/.pyenv/versions/3.10.12/lib/python3.10/asyncio/base_events.py", line 863, in getaddrinfo
    return await self.run_in_executor(
  File "/home/terryk/.pyenv/versions/3.10.12/lib/python3.10/asyncio/base_events.py", line 821, in run_in_executor
    executor.submit(func, *args), loop=self)
  File "/home/terryk/.pyenv/versions/3.10.12/lib/python3.10/concurrent/futures/thread.py", line 167, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown
Task exception was never retrieved
future: <Task finished name='Task-44' coro=<PortForward._sync_sockets() done, defined at /tmp/venv/lib/python3.10/site-packages/kr8s/_portforward.py:167> exception=ExceptionGroup('unhandled errors in a TaskGroup', [ConnectionClosedError('TCP socket closed')])>
  + Exception Group Traceback (most recent call last):
  |   File "/tmp/venv/lib/python3.10/site-packages/kr8s/_portforward.py", line 171, in _sync_sockets
  |     async with anyio.create_task_group() as tg:
  |   File "/tmp/venv/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 678, in __aexit__
  |     raise BaseExceptionGroup(
  | exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "/tmp/venv/lib/python3.10/site-packages/kr8s/_portforward.py", line 183, in _tcp_to_ws
    |     raise ConnectionClosedError("TCP socket closed")
    | kr8s._exceptions.ConnectionClosedError: TCP socket closed
    +------------------------------------
Task exception was never retrieved
future: <Task finished name='Task-46' coro=<PortForward._sync_sockets() done, defined at /tmp/venv/lib/python3.10/site-packages/kr8s/_portforward.py:167> exception=ExceptionGroup('unhandled errors in a TaskGroup', [ConnectionClosedError('TCP socket closed')])>
  + Exception Group Traceback (most recent call last):
  |   File "/tmp/venv/lib/python3.10/site-packages/kr8s/_portforward.py", line 171, in _sync_sockets
  |     async with anyio.create_task_group() as tg:
  |   File "/tmp/venv/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 678, in __aexit__
  |     raise BaseExceptionGroup(
  | exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "/tmp/venv/lib/python3.10/site-packages/kr8s/_portforward.py", line 183, in _tcp_to_ws
    |     raise ConnectionClosedError("TCP socket closed")
    | kr8s._exceptions.ConnectionClosedError: TCP socket closed
    +------------------------------------

But if I make the following modifications to my environment:

pip install -U kr8s==0.14.1
ls -1 venv/lib/python*/site-packages/dask_kubernetes/operator/_objects.py | xargs sed -i 's/self._refresh()/self.refresh()/g'

Then everything is fine:

python -c 'from dask_kubernetes.operator.kubecluster import KubeCluster as K; K()'
╭─────────────── Creating KubeCluster 'dask-terryk-c760868d-e' ────────────────╮
│                                                                              │
│   DaskCluster                                                      Running   │
│   Scheduler Pod                                                    Running   │
│   Scheduler Service                                                Created   │
│   Default Worker Group                                             Created   │
│                                                                              │
│ ⠼ Getting dashboard URL                                                      │
╰──────────────────────────────────────────────────────────────────────────────╯
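For anyone without GNU sed (for example on macOS, where `sed -i` behaves differently), the same local patch can be sketched in Python. This is just an illustration of the workaround above, not an official fix; the function name `patch_refresh_calls` is hypothetical, and the file path has to be supplied by the caller.

```python
import sys
from pathlib import Path


def patch_refresh_calls(path):
    """Replace `self._refresh()` with `self.refresh()` in one file.

    Returns the number of calls rewritten; the file is left untouched
    when there is nothing to replace.
    """
    target = Path(path)
    source = target.read_text()
    count = source.count("self._refresh()")
    if count:
        target.write_text(source.replace("self._refresh()", "self.refresh()"))
    return count


if __name__ == "__main__":
    # Pass the path(s) to dask_kubernetes/operator/_objects.py in site-packages.
    for arg in sys.argv[1:]:
        print(f"{arg}: {patch_refresh_calls(arg)} call(s) rewritten")
```

Like the sed command, this edits the installed package in place, so reinstalling dask_kubernetes will undo it.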

Anything else we need to know?:

Environment:

jacobtomlinson commented 7 months ago

Yeah, this is a known noisy warning that was fixed in kr8s. This will be closed by #853.