ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[Core] Ray Worker stuck in launching state - Azure AKS #45775

Open pinduzera opened 5 months ago

pinduzera commented 5 months ago

What happened + What you expected to happen

I'm not sure whether this should be filed here or under the KubeRay project.

What should happen: When using the autoscaler on Azure Kubernetes Service (AKS), it should create new worker pods in the Pending state (waiting for the nodes to be provisioned). I've documented this in detail in this Ray post.

What happened: Usually 1 or 2 worker pods don't spawn.

After all the nodes finish running (cancelled/killed), ray status gives me the following output; notice that it still shows 1 worker as launching even after the job finished. The request seems to have never reached the Kubernetes cluster, or Ray doesn't know that it didn't reach it.

kubectl get pods doesn't show any worker pod in the Pending state, which is expected after the job has finished.

======== Autoscaler status: 2024-05-24 13:41:20.153528 ========
Node status
---------------------------------------------------------------
Active:
 1 headgroup
Idle:
 (no idle nodes)
Pending:
 spotworker, 1 launching
Recent failures:
 spotworker: NodeTerminated (ip: 10.224.1.35)
 spotworker: NodeTerminated (ip: 10.224.10.86)
 spotworker: NodeTerminated (ip: 10.224.1.163)
 spotworker: NodeTerminated (ip: 10.224.0.244)
 spotworker: NodeTerminated (ip: 10.224.1.9)
 spotworker: NodeTerminated (ip: 10.224.10.160)
 spotworker: NodeTerminated (ip: 10.224.11.125)

Resources
---------------------------------------------------------------
Usage:
 0B/18.44GiB memory
 0B/9.22GiB object_store_memory

Demands:
 (no resource demands)
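
For reference, the mismatch between the two views can be cross-checked with a quick script. This is only a minimal sketch, assuming a machine that has both kubectl access and a connected Ray driver; the podtype=worker label is the one set on the worker template below.

import subprocess

import ray

ray.init(address="auto")  # or ray.init("ray://<head-svc>:10001") when running outside the cluster

# Kubernetes' view: worker pods carry the podtype=worker label from the RayCluster spec.
pods = subprocess.run(
    ["kubectl", "get", "pods", "-l", "podtype=worker", "--no-headers"],
    capture_output=True, text=True,
).stdout.strip().splitlines()

# Ray's view: nodes currently registered as alive with the GCS (head + workers).
alive_nodes = [n for n in ray.nodes() if n["Alive"]]

print("kubectl sees {} worker pods, Ray sees {} alive nodes (including the head)".format(
    len(pods), len(alive_nodes)))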

Versions / Dependencies

Ray 2.21.0, Python 3.10.14 (the versions used in the example); also tested with Ray 2.23.0, Python 3.11.7.

This is my YAML file, which is a variation of the one found in the KubeRay project, with some podAntiAffinity added to guarantee that workers never share the same node.

(You probably don't need the volume claim and mount, but I will leave them in since that is how I was using it.)

apiVersion: ray.io/v1
kind: RayCluster
metadata:
  name: raycluster-autoscaler
spec:
  # The version of Ray you are using. Make sure all Ray containers are running this version of Ray.
  # Use the Ray nightly or Ray version >= 2.21.0 and KubeRay 1.1.0 or later for autoscaler v2.
  rayVersion: '2.21.0'
  enableInTreeAutoscaling: true
  autoscalerOptions:
    upscalingMode: Default
    idleTimeoutSeconds: 60
    imagePullPolicy: IfNotPresent
    # Optionally specify the Autoscaler container's securityContext.
    securityContext: {}
    env: []
    envFrom: []
    # resources:
    #   limits:
    #     cpu: "500m"
    #     memory: "512Mi"
    #   requests:
    #     cpu: "500m"
    #     memory: "512Mi"
  # Ray head pod template
  headGroupSpec:
    rayStartParams:
      # Setting "num-cpus: 0" to avoid any Ray actors or tasks being scheduled on the Ray head Pod.
      num-cpus: "0"
    # Pod template
    template:
      spec:
        nodeSelector:
          raymachine: rayhead
        containers:
        # The Ray head container
        - name: ray-head
          image: rayacr.azurecr.io/ray:2.21.0-py310
          ports:
          - containerPort: 6379
            name: gcs
          - containerPort: 8265
            name: dashboard
          - containerPort: 10001
            name: client
          lifecycle:
            preStop:
              exec:
                command: ["/bin/sh","-c","ray stop"]
          # resources:
          #   limits:
          #     cpu: "4"
          #     memory: "16G"
          #   requests:
          #     cpu: "2"
          #     memory: "8G"
          env:
            - name: RAY_enable_autoscaler_v2 # Pass env var for the autoscaler v2.
              value: "1"
          volumeMounts:
            - mountPath: /home/ray/samples
              name: ray-example-configmap
            - mountPath: "/mnt/azure"
              name: azurepvc
              readOnly: false
        volumes:
          - name: ray-example-configmap
            configMap:
              name: ray-example
              defaultMode: 0777
              items:
                - key: detached_actor.py
                  path: detached_actor.py
                - key: terminate_detached_actor.py
                  path: terminate_detached_actor.py
          - name: azurepvc
            persistentVolumeClaim:
              claimName: azure-file-disk
        restartPolicy: Never # No restart to avoid reuse of pod for different ray nodes.
  workerGroupSpecs:
  # the Pod replicas in this worker group
  - replicas: 0
    minReplicas: 0
    maxReplicas: 40
    groupName: spotworker
    rayStartParams:
      num-cpus: "8"
    # Pod template
    template:
      metadata:
        labels:
          podtype: worker
      spec:
        nodeSelector:
          raymachine: spotworker
        containers:
        - name: ray-worker
          image: rayacr.azurecr.io/ray:2.21.0-py310
          # resources:
          #   limits:
          #     cpu: "8"
          #     memory: "32G"
          #   requests:
          #     cpu: "4"
          #     memory: "16G"
          volumeMounts:
            - mountPath: /home/ray/samples
              name: ray-example-configmap
            - mountPath: "/mnt/azure"
              name: azurepvc
              readOnly: false
        restartPolicy: Never # Never restart a pod to avoid pod reuse
        tolerations:
        - key: "kubernetes.azure.com/scalesetpriority"
          operator: "Equal"
          value: "spot"
          effect: "NoSchedule"
        volumes:
          - name: azurepvc
            persistentVolumeClaim:
              claimName: azure-file-disk
        # Anti-affinity, to guarantee only 1 worker per machine
        affinity:
          podAntiAffinity:
            requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                - key: podtype
                  operator: In
                  values:
                  - worker
              topologyKey: "kubernetes.io/hostname"      
---
kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
  name: azure-file-disk
provisioner: file.csi.azure.com # replace with "kubernetes.io/azure-file" if aks version is less than 1.21
allowVolumeExpansion: true
mountOptions:
 - dir_mode=0777
 - file_mode=0777
 - uid=0
 - gid=0
 - mfsymlinks
 - cache=strict
 - actimeo=30
parameters:
  skuName: Standard_LRS
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: azure-file-disk
spec:
  accessModes:
    - ReadWriteMany
  storageClassName: azure-file-disk
  resources:
    requests:
      storage: 5Gi
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: ray-example
data:
  detached_actor.py: |
    import ray
    import sys

    @ray.remote(num_cpus=1)
    class Actor:
      pass

    ray.init(namespace="default_namespace")
    Actor.options(name=sys.argv[1], lifetime="detached").remote()

  terminate_detached_actor.py: |
    import ray
    import sys

    ray.init(namespace="default_namespace")
    detached_actor = ray.get_actor(sys.argv[1])
    ray.kill(detached_actor)
---
kind: Service
apiVersion: v1
metadata:
  name: raycluster-autoscaler-head-svc
spec:
  ports:
    - name: client
      protocol: TCP
      appProtocol: tcp
      port: 10001
      targetPort: 10001
    - name: dashboard
      protocol: TCP
      appProtocol: tcp
      port: 8265
      targetPort: 8265
    - name: gcs
      protocol: TCP
      appProtocol: tcp
      port: 6379
      targetPort: 6379
    - name: metrics
      protocol: TCP
      appProtocol: tcp
      port: 8080
      targetPort: 8080
  selector:
    app.kubernetes.io/created-by: kuberay-operator
    app.kubernetes.io/name: kuberay
    ray.io/cluster: raycluster-autoscaler
    ray.io/identifier: raycluster-autoscaler-head
    ray.io/node-type: head
  type: LoadBalancer
  loadBalancerSourceRanges:
  - 192.168.0.0/32          # restrict to company IP

Reproduction script

The goal was to spawn some actors (or placement groups) and "hot start" all the nodes before doing any computation. But if I start the actors too quickly, what happens is that one or two workers are never spawned (and can't be seen under kubectl get pods). So I've added a workaround: waiting 3 seconds before creating the next actor, which seems to keep the Ray autoscaler from failing.

Each node has 8 CPUs, so asking for 8 CPUs fills a single worker (and a single node).

import ray
from time import sleep

@ray.remote(num_cpus=8)
class MyActor:
    def __init__(self, value):
        self.value = value
    def is_ready(self):
        if (self.value % 2 > 0): sleep(5)
        return "Actor {} is ready".format(self.value)

ray.init()

## number of machines to spawn
spawn = 8

## workaround: create the actors slowly; 3 seconds between requests keeps the cluster/autoscaler happy:
## actor_handles = [MyActor.remote(k) for k in range(spawn) if sleep(3) is None]
actor_handles = [MyActor.remote(k) for k in range(spawn)]
not_initiated_id = [handle.is_ready.remote() for handle in actor_handles]

ready_ids = 0
results = []
timeout = 0
while not_initiated_id:
    ## The timeout doesn't differentiate between tasks that are running and those still being scheduled
    initiated_id, not_initiated_id = ray.wait(not_initiated_id, timeout=60)
    timeout += 60
    if len(initiated_id) > 0:
        ready_ids += len(initiated_id)
        results.append(ray.get(initiated_id[0]))
    print("initiated: {} | Not initiated: {}".format(ready_ids, len(not_initiated_id)))

    if (timeout >= 600):
        for task in not_initiated_id:
            ray.cancel(task)
        break

for result in results:
    print(result)

for handle in actor_handles:
    ray.kill(handle)
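
As an aside, an alternative way to "hot start" the nodes without relying on actor-creation timing is to ask the autoscaler for the capacity up front via the autoscaler SDK. This is only a minimal sketch, assuming the SDK request goes through the same scale-up path as actor demands; whether it sidesteps the same launching race is untested.

import ray
from ray.autoscaler.sdk import request_resources

ray.init()

# Ask the autoscaler for 8 workers' worth of CPUs up front (8 CPUs per node),
# instead of letting the first wave of actor creations trigger the scale-up.
request_resources(num_cpus=8 * 8)

# ... create the actors / placement groups once the nodes are up ...

# Clear the explicit request afterwards so idle nodes can scale back down.
request_resources(num_cpus=0)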

Issue Severity

Medium: It is a significant difficulty but I can work around it.

pinduzera commented 4 months ago

An additional note: it seems that if you try to launch new pods while the previous ones are still in the Terminating state, the newer ones hang as well and never spawn.
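
A guard one could sketch for this (assuming the submitting machine has kubectl access; the helper below and its timings are only illustrative) is to wait until kubectl no longer lists any worker pods, Terminating or otherwise, before launching the next batch:

import subprocess
import time

def wait_for_workers_gone(timeout_s=600, poll_s=10):
    # Block until kubectl reports no worker pods (Running or Terminating) left.
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        out = subprocess.run(
            ["kubectl", "get", "pods", "-l", "podtype=worker", "--no-headers"],
            capture_output=True, text=True,
        ).stdout.strip()
        if not out:
            return True
        time.sleep(poll_s)
    return False

wait_for_workers_gone()
# ... only now submit the next batch of actors ...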

jjyao commented 4 months ago

@pinduzera,

Could you try to remove

env:
            - name: RAY_enable_autoscaler_v2 # Pass env var for the autoscaler v2.
              value: "1"

This enables the experimental autoscaler v2 which might have some bugs. Removing this will use the default autoscaler v1.

pinduzera commented 4 months ago

> @pinduzera,
>
> Could you try to remove
>
> env:
>             - name: RAY_enable_autoscaler_v2 # Pass env var for the autoscaler v2.
>               value: "1"
>
> This enables the experimental autoscaler v2 which might have some bugs. Removing this will use the default autoscaler v1.

That worked, thanks. I had copied the example from the repo and missed that it was using an alpha autoscaler. I thought v2 was standard since it was in the example.

Thanks :)