dask / dask-kubernetes

Native Kubernetes integration for Dask
https://kubernetes.dask.org
BSD 3-Clause "New" or "Revised" License
311 stars 143 forks source link

Improper scaling of dask workers #677

Open naveenrb98 opened 1 year ago

naveenrb98 commented 1 year ago

So when using dask cluster on kubernetes with adaptive scaling there are two issues I noticed.

One is repeated scale up or scale down request happening but no scaling happens as result of it even though cluster scaling permission is given to the kubernetes cluster. image This image shows the issue. Scale down request was fired every 5 seconds but no scale down happened. Same goes for scale up.

Second issue i came across was regarding kopf.

@kopf.timer("daskautoscaler.kubernetes.dask.org", interval=5.0)
async def daskautoscaler_adapt(spec, name, namespace, logger, **kwargs):
    async with kubernetes.client.api_client.ApiClient() as api_client:
        coreapi = kubernetes.client.CoreV1Api(api_client)

        pod_ready = False
        try:
            scheduler_pod = await coreapi.read_namespaced_pod(
                f"{spec['cluster']}-scheduler", namespace

This is the function that gets called for adaptive scaling which does a replica scale.

While this has happened and cluster is trying to scale to maximum possible workers another function mentioned below pitches in and gives a scale down command because of which cluster never scales the way we want.

@kopf.on.field("daskworkergroup.kubernetes.dask.org", field="spec.worker.replicas")
async def daskworkergroup_replica_update(
        name, namespace, meta, spec, new, body, logger, initial=False, **kwargs
):
    if not initial:
        return
    cluster_name = spec["cluster"]

    # Replica updates can come in quick succession and the changes must be applied atomically to ensure
    # the number of workers ends in the correct state
    async with worker_group_scale_locks[f"{namespace}/{name}"]:
        async with kubernetes.client.api_client.ApiClient() as api_client:
            customobjectsapi = kubernetes.client.CustomObjectsApi(api_client)
            corev1api = kubernetes.client.CoreV1Api(api_client)
jacobtomlinson commented 1 year ago

My guess is that the daskautoscaler_adapt timer is asking the scheduler what it wants to do and it is trying to scale the worker group. However daskworkergroup_replica_update is locked for a long time for some reason.

@jmif would you mind having a look at this as I expect it is related to the recent changes in #649