dask / dask-kubernetes

Native Kubernetes integration for Dask
https://kubernetes.dask.org
BSD 3-Clause "New" or "Revised" License

Running dask from within k8s cluster #498

Closed jadeidev closed 2 years ago

jadeidev commented 2 years ago

What happened: I am trying to run my Dask application from within the k8s cluster. I have created a service account, a role (based on Role-Based Access Control), and a role binding. My container has a Python script that runs the script below (run.py) with the corresponding worker-spec.yml. The application runs just fine when I trigger it from my machine against this cluster. However, when I create a pod within k8s that runs this script, I get the following error:
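For context, the RBAC objects look roughly like this (a sketch only; the Role and RoleBinding names are placeholders, and the rules follow the RBAC guidance in the dask-kubernetes docs rather than my exact manifests):

apiVersion: v1
kind: ServiceAccount
metadata:
  name: dask-sa
  namespace: myns
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: dask-cluster-role  # placeholder name
  namespace: myns
rules:
  # permissions KubeCluster needs to manage scheduler/worker pods and read their logs
  - apiGroups: [""]
    resources: ["pods"]
    verbs: ["get", "list", "watch", "create", "delete"]
  - apiGroups: [""]
    resources: ["pods/log"]
    verbs: ["get", "list"]
  - apiGroups: [""]
    resources: ["services"]
    verbs: ["get", "list", "watch", "create", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: dask-cluster-role-binding  # placeholder name
  namespace: myns
subjects:
  - kind: ServiceAccount
    name: dask-sa
    namespace: myns
roleRef:
  kind: Role
  name: dask-cluster-role
  apiGroup: rbac.authorization.k8s.io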

Creating scheduler pod on cluster. This may take some time.
2022-05-17 21:32:17,305 - distributed.deploy.spec - WARNING - Cluster closed without starting up
Traceback (most recent call last):
File "/usr/local/lib/python3.10/dist-packages/distributed/deploy/spec.py", line 294, in _start
self.scheduler = await self.scheduler
File "/usr/local/lib/python3.10/dist-packages/distributed/deploy/spec.py", line 59, in _
await self.start()
File "/usr/local/lib/python3.10/dist-packages/dask_kubernetes/classic/kubecluster.py", line 198, in start
logs = await self.logs()
File "/usr/local/lib/python3.10/dist-packages/dask_kubernetes/classic/kubecluster.py", line 118, in logs
raise e
File "/usr/local/lib/python3.10/dist-packages/dask_kubernetes/classic/kubecluster.py", line 109, in logs
log = await self.core_api.read_namespaced_pod_log(
File "/usr/local/lib/python3.10/dist-packages/kubernetes_asyncio/client/api_client.py", line 192, in __call_api
raise e
File "/usr/local/lib/python3.10/dist-packages/kubernetes_asyncio/client/api_client.py", line 185, in __call_api
response_data = await self.request(
File "/usr/local/lib/python3.10/dist-packages/kubernetes_asyncio/client/rest.py", line 193, in GET
return (await self.request("GET", url,
File "/usr/local/lib/python3.10/dist-packages/kubernetes_asyncio/client/rest.py", line 187, in request
raise ApiException(http_resp=r)
kubernetes_asyncio.client.exceptions.ApiException: (400)
Reason: Bad Request
HTTP response headers: <CIMultiDictProxy('Audit-Id': '7cea313d-78bb-4d52-aada-2a56b256ef0a', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'Date': 'Tue, 17 May 2022 21:32:17 GMT', 'Content-Length': '183')>
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"container dask-worker is not valid for pod dask-root-5c1cad7b-18sbnn","reason":"BadRequest","code":400}
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/opt/app/run.py", line 6, in <module>
cluster = KubeCluster(pod_template="worker-spec.yml", namespace="myns")
File "/usr/local/lib/python3.10/dist-packages/dask_kubernetes/classic/kubecluster.py", line 496, in __init__
super().__init__(**self.kwargs)
File "/usr/local/lib/python3.10/dist-packages/distributed/deploy/spec.py", line 260, in __init__
self.sync(self._start)
File "/usr/local/lib/python3.10/dist-packages/distributed/utils.py", line 318, in sync
return sync(
File "/usr/local/lib/python3.10/dist-packages/distributed/utils.py", line 385, in sync
raise exc.with_traceback(tb)
File "/usr/local/lib/python3.10/dist-packages/distributed/utils.py", line 358, in f
result = yield future
File "/usr/local/lib/python3.10/dist-packages/tornado/gen.py", line 762, in run
value = future.result()
File "/usr/local/lib/python3.10/dist-packages/dask_kubernetes/classic/kubecluster.py", line 627, in _start
await super()._start()
File "/usr/local/lib/python3.10/dist-packages/distributed/deploy/spec.py", line 304, in _start
raise RuntimeError(f"Cluster failed to start: {e}") from e
RuntimeError: Cluster failed to start: (400)
Reason: Bad Request
HTTP response headers: <CIMultiDictProxy('Audit-Id': '7cea313d-78bb-4d52-aada-2a56b256ef0a', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'Date': 'Tue, 17 May 2022 21:32:17 GMT', 'Content-Length': '183')>
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"container dask-worker is not valid for pod dask-root-5c1cad7b-18sbnn","reason":"BadRequest","code":400}
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7fb63f596da0>
Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x7fb63f578e80>, 619154.475991593)]']
connector: <aiohttp.connector.TCPConnector object at 0x7fb63f596c20>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7fb63f597100>

What you expected to happen:

Minimal Complete Verifiable Example: Step 1: create a container image that has Python, Dask, and the files run.py and worker-spec.yml (see below), with CMD ["python","run.py"]. Step 2: create the CronJob below, which runs a pod using the container from step 1.

apiVersion: batch/v1
kind: CronJob
metadata:
  name: test
  namespace: myns
spec:
  schedule: "*/2 * * * *"
  jobTemplate:
    spec:
      template:
        spec:
          serviceAccountName: dask-sa
          restartPolicy: Never  # Job pod templates require Never or OnFailure
          containers:
            - name: run-dask  # a container name is required; any name works here
              image: <my container with the run.py and worker-spec.yml files on it>
              imagePullPolicy: IfNotPresent

run.py

from dask_kubernetes import KubeCluster
from dask.distributed import Client
import dask.array as da

if __name__ == "__main__":
    cluster = KubeCluster(pod_template="worker-spec.yml", namespace="myns" )
    cluster.adapt(minimum=0, maximum=10)
    client = Client(cluster)
    array = da.ones((10000, 100000, 1000))
    print(array.mean().compute())  # Should print 1.0
    client.shutdown()

worker-spec.yml

kind: Pod
spec:
  restartPolicy: Never
  containers:
    - image: ghcr.io/dask/dask:latest
      imagePullPolicy: Always
      args:
        [
          dask-worker,
          --nthreads,
          "6",
          --no-dashboard,
          --memory-limit,
          16GB,
          --death-timeout,
          "60",
        ]
      name: dask
      resources:
        limits:
          cpu: "6"
          memory: 16G
        requests:
          cpu: "6"
          memory: 16G

Anything else we need to know?:

Environment:

Cluster Dump State:
jadeidev commented 2 years ago

Ok, after reading some other issues I was able to find the problem and make it work. #466 was definitely helpful. It turns out the worker-spec.yml file was causing the issue: the container name needs to be changed to dask-worker.

kind: Pod
spec:
  restartPolicy: Never
  containers:
    - image: ghcr.io/dask/dask:latest
      imagePullPolicy: Always
      args:
        [
          dask-worker,
          --nthreads,
          "6",
          --no-dashboard,
          --memory-limit,
          16GB,
          --death-timeout,
          "60",
        ]
      name: dask-worker
      resources:
        limits:
          cpu: "6"
          memory: 16G
        requests:
          cpu: "6"
          memory: 16G
jacobtomlinson commented 2 years ago

Sorry you're having trouble here. You are right that the container is expected to have a certain name.

I checked our docs to see what the example shows, and it was indeed wrong, so I've raised #500 to fix that.

Given that you've found a solution I'm going to close this out. But please feel free to follow up if you have any more issues.