ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

num_cpus not handled correctly when function has a Queue argument #14863

Open birgerbr opened 3 years ago

birgerbr commented 3 years ago

What is the problem?

Too many tasks are put on the same node; the CPU requirements are not handled correctly. In the test below, a node is set to have 7 CPUs and each task has num_cpus=3, so there should be no more than two tasks plus one queue actor per node. Instead, 4 tasks are placed on one node together with the queue actor. (Screenshot from 2021-03-23 09-43-07 attached.)
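For reference, a minimal sketch of the expected packing arithmetic (whether the Queue's backing actor holds 0 or 1 CPU while running is an assumption about its defaults); either way, at most two 3-CPU tasks fit on a 7-CPU node:

node_cpus = 7   # per-node CPU resource declared in the config below
task_cpus = 3   # num_cpus for each f.remote(...)
for queue_actor_cpus in (0, 1):  # assumption: the queue actor holds either 0 or 1 CPU
    print((node_cpus - queue_actor_cpus) // task_cpus)  # -> 2 in both cases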

Reproduction (REQUIRED)

import ray
import ray.util.queue

import time

ray.init(address="auto")

@ray.remote(num_cpus=3.0, memory=1024 ** 3)
def f(input_queue):
    # Drain the queue, sleeping 30 s per item so each task stays busy long
    # enough to observe where it was scheduled.
    while not input_queue.empty():
        try:
            input_queue.get()
        except ray.util.queue.Empty:
            break
        time.sleep(30)

# Work-around to trigger scale-up.
# See https://github.com/ray-project/ray/issues/14862
ray.get(ray.remote(num_cpus=2.0)(lambda: time.sleep(1)).remote())

input_queue = ray.util.queue.Queue()
for i in range(10):
    input_queue.put(i)

ray.get([f.remote(input_queue) for i in range(6)])
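A minimal diagnostic sketch (not part of the original report) that records which node each task lands on, so the over-packing can be confirmed without the dashboard; it assumes ray.util.get_node_ip_address() is available inside tasks and that ray.init(address="auto") has already been called as above (where_am_i is an illustrative name):

import collections

import ray
import ray.util

@ray.remote(num_cpus=3.0)
def where_am_i():
    # Report the IP of the node this task was scheduled on.
    return ray.util.get_node_ip_address()

node_ips = ray.get([where_am_i.remote() for _ in range(6)])
# With 7 CPUs per node and num_cpus=3, no IP should appear more than twice.
print(collections.Counter(node_ips))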

Ray config:

# A unique identifier for the head node and workers of this cluster.
cluster_name: default

# The minimum number of worker nodes to launch in addition to the head
# node. This number should be >= 0.
min_workers: 0

# The maximum number of worker nodes to launch in addition to the head
# node. This takes precedence over min_workers.
max_workers: 50

# The autoscaler will scale up the cluster faster with higher upscaling speed.
# E.g., if the task requires adding more nodes, then the autoscaler will gradually
# scale up the cluster in chunks of upscaling_speed * currently_running_nodes.
# This number should be > 0.
upscaling_speed: 10.0

# If a node is idle for this many minutes, it will be removed.
idle_timeout_minutes: 1

# Kubernetes resources that need to be configured for the autoscaler to be
# able to manage the Ray cluster. If any of the provided resources don't
# exist, the autoscaler will attempt to create them. If this fails, you may
# not have the required permissions and will have to request them to be
# created by your cluster administrator.
provider:
    type: kubernetes

    # Exposing external IP addresses for ray pods isn't currently supported.
    use_internal_ips: true

    # Namespace to use for all resources created.
    namespace: ray

    # ServiceAccount created by the autoscaler for the head node pod that it
    # runs in. If this field isn't provided, the head pod config below must
    # contain a user-created service account with the proper permissions.
    autoscaler_service_account:
        apiVersion: v1
        kind: ServiceAccount
        metadata:
            name: autoscaler

    # Role created by the autoscaler for the head node pod that it runs in.
    # If this field isn't provided, the role referenced in
    # autoscaler_role_binding must exist and have at least these permissions.
    autoscaler_role:
        kind: Role
        apiVersion: rbac.authorization.k8s.io/v1
        metadata:
            name: autoscaler
        rules:
        - apiGroups: [""]
          resources: ["pods", "pods/status", "pods/exec"]
          verbs: ["get", "watch", "list", "create", "delete", "patch"]

    # RoleBinding created by the autoscaler for the head node pod that it runs
    # in. If this field isn't provided, the head pod config below must contain
    # a user-created service account with the proper permissions.
    autoscaler_role_binding:
        apiVersion: rbac.authorization.k8s.io/v1
        kind: RoleBinding
        metadata:
            name: autoscaler
        subjects:
        - kind: ServiceAccount
          name: autoscaler
        roleRef:
            kind: Role
            name: autoscaler
            apiGroup: rbac.authorization.k8s.io

    services:
      # Service that maps to the head node of the Ray cluster.
      - apiVersion: v1
        kind: Service
        metadata:
            # NOTE: If you're running multiple Ray clusters with services
            # on one Kubernetes cluster, they must have unique service
            # names.
            name: ray-head
        spec:
            # This selector must match the head node pod's selector below.
            selector:
                component: ray-head
            ports:
                - protocol: TCP
                  port: 8000
                  targetPort: 8000
                  name: 8k
                - protocol: TCP
                  port: 6379
                  name: redis-primary
                  targetPort: 6379
                - protocol: TCP
                  port: 6380
                  targetPort: 6380
                  name: redis-shard-0
                - protocol: TCP
                  port: 6381
                  targetPort: 6381
                  name: redis-shard-1
                - protocol: TCP
                  port: 12345
                  targetPort: 12345
                  name: object-manager
                - protocol: TCP
                  port: 12346
                  targetPort: 12346
                  name: node-manager

      # Service that maps to the worker nodes of the Ray cluster.
      - apiVersion: v1
        kind: Service
        metadata:
            # NOTE: If you're running multiple Ray clusters with services
            # on one Kubernetes cluster, they must have unique service
            # names.
            name: ray-workers
        spec:
            # This selector must match the worker node pods' selector below.
            selector:
                component: ray-worker
            ports:
                - protocol: TCP
                  port: 8000
                  targetPort: 8000

available_node_types:
    head_node:
        node_config:
            metadata:
                # Automatically generates a name for the pod with this prefix.
                generateName: ray-head-

                # Must match the head node service selector above if a head node
                # service is required.
                labels:
                    component: ray-head
                tolerations:
                - key: ray-head
                  operator: Equal
                  value: "true"
                  effect: NoSchedule
        resources: {}
        min_workers: 0
        max_workers: 0
    node_pool_cpu:
        node_config:
            apiVersion: v1
            kind: Pod
            metadata:
                # Automatically generates a name for the pod with this prefix.
                generateName: ray-worker-cpu-

                # Must match the worker node service selector above if a worker node
                # service is required.
                labels:
                    component: ray-worker
            spec:
                tolerations:
                - key: cloud.google.com/gke-preemptible
                  operator: Equal
                  value: "true"
                  effect: NoSchedule
                - key: imerso-ray-worker
                  operator: Equal
                  value: "true"
                  effect: NoSchedule

                serviceAccountName: ray-prod

                # Worker nodes will be managed automatically by the head node, so
                # do not change the restart policy.
                restartPolicy: Never

                # This volume allocates shared memory for Ray to use for its plasma
                # object store. If you do not provide this, Ray will fall back to
                # /tmp, which causes slowdowns if it is not a shared memory volume.
                volumes:
                - name: dshm
                  emptyDir:
                      medium: Memory
                - name: filestore-ray
                  persistentVolumeClaim:
                    claimName: fileserver-ray-claim
                    readOnly: false

                containers:
                - name: ray-node
                  imagePullPolicy: Always
                  # You are free (and encouraged) to use your own container image,
                  # but it should have the following installed:
                  #   - rsync (used for `ray rsync` commands and file mounts)
                  image: rayproject/ray:latest
                  # Do not change this command - it keeps the pod alive until it is
                  # explicitly killed.
                  command: ["/bin/bash", "-c", "--"]
                  args: ["trap : TERM INT; sudo /usr/sbin/ldconfig.real; sleep infinity & wait;"]
                  ports:
                      - containerPort: 12345 # Ray internal communication.
                      - containerPort: 12346 # Ray internal communication.

                  # This volume allocates shared memory for Ray to use for its plasma
                  # object store. If you do not provide this, Ray will fall back to
                  # /tmp, which causes slowdowns if it is not a shared memory volume.
                  volumeMounts:
                      - mountPath: /dev/shm
                        name: dshm
                      - mountPath: /filestore
                        name: filestore-ray
                  resources:
                      requests:
                          cpu: 7
                          memory: 25Gi
                  env:
                      # This is used in the head_start_ray_commands below so that
                      # Ray can spawn the correct number of processes. Omitting this
                      # may lead to degraded performance.
                      - name: MY_CPU_REQUEST
                        valueFrom:
                            resourceFieldRef:
                                resource: requests.cpu
        resources: {"CPU": 7, "memory": 26843545600} # Memory-unit ~= 52 MB
        min_workers: 0
        max_workers: 50
    node_pool_gpu:
        node_config:
            apiVersion: v1
            kind: Pod
            metadata:
                # Automatically generates a name for the pod with this prefix.
                generateName: ray-worker-gpu-

                # Must match the worker node service selector above if a worker node
                # service is required.
                labels:
                    component: ray-worker
            spec:
                tolerations:
                - key: cloud.google.com/gke-preemptible
                  operator: Equal
                  value: "true"
                  effect: NoSchedule
                - key: imerso-ray-worker
                  operator: Equal
                  value: "true"
                  effect: NoSchedule

                serviceAccountName: ray-prod

                # Worker nodes will be managed automatically by the head node, so
                # do not change the restart policy.
                restartPolicy: Never

                # This volume allocates shared memory for Ray to use for its plasma
                # object store. If you do not provide this, Ray will fall back to
                # /tmp, which causes slowdowns if it is not a shared memory volume.
                volumes:
                - name: dshm
                  emptyDir:
                      medium: Memory
                - name: filestore-ray
                  persistentVolumeClaim:
                    claimName: fileserver-ray-claim
                    readOnly: false

                containers:
                - name: ray-node
                  imagePullPolicy: Always
                  # You are free (and encouraged) to use your own container image,
                  # but it should have the following installed:
                  #   - rsync (used for `ray rsync` commands and file mounts)
                  image: rayproject/ray:latest
                  # Do not change this command - it keeps the pod alive until it is
                  # explicitly killed.
                  command: ["/bin/bash", "-c", "--"]
                  args: ["trap : TERM INT; sudo /usr/sbin/ldconfig.real; sleep infinity & wait;"]
                  ports:
                      - containerPort: 12345 # Ray internal communication.
                      - containerPort: 12346 # Ray internal communication.

                  # This volume allocates shared memory for Ray to use for its plasma
                  # object store. If you do not provide this, Ray will fall back to
                  # /tmp, which causes slowdowns if it is not a shared memory volume.
                  volumeMounts:
                      - mountPath: /dev/shm
                        name: dshm
                      - mountPath: /filestore
                        name: filestore-ray
                  resources:
                      requests:
                          cpu: 7
                          memory: 25Gi
                      limits:
                          nvidia.com/gpu: 1
                  env:
                      # This is used in the head_start_ray_commands below so that
                      # Ray can spawn the correct number of processes. Omitting this
                      # may lead to degraded performance.
                      - name: MY_CPU_REQUEST
                        valueFrom:
                            resourceFieldRef:
                                resource: requests.cpu
        resources: {"CPU": 7, "memory": 26843545600, "GPU": 1, "accelerator_type:T4": 1} # Memory-unit ~= 52 MB
        min_workers: 0
        max_workers: 20

head_node_type: head_node     
worker_default_node_type: node_pool_cpu

# Kubernetes pod config for the head node pod.
head_node:
    apiVersion: v1
    kind: Pod
    spec:
        # Change this if you altered the autoscaler_service_account above
        # or want to provide your own.
        serviceAccountName: autoscaler

        # Restarting the head node automatically is not currently supported.
        # If the head node goes down, `ray up` must be run again.
        restartPolicy: Never

        # This volume allocates shared memory for Ray to use for its plasma
        # object store. If you do not provide this, Ray will fall back to
        # /tmp, which causes slowdowns if it is not a shared memory volume.
        volumes:
        - name: dshm
          emptyDir:
              medium: Memory
        - name: filestore-ray
          persistentVolumeClaim:
            claimName: fileserver-ray-claim
            readOnly: false

        containers:
        - name: ray-node
          imagePullPolicy: Always
          # You are free (and encouraged) to use your own container image,
          # but it should have the following installed:
          #   - rsync (used for `ray rsync` commands and file mounts)
          #   - screen (used for `ray attach`)
          #   - kubectl (used by the autoscaler to manage worker pods)
          image: rayproject/ray:latest
          # Do not change this command - it keeps the pod alive until it is
          # explicitly killed.
          command: ["/bin/bash", "-c", "--"]
          args: ["trap : TERM INT; sleep infinity & wait;"]
          ports:
              - containerPort: 6379 # Redis port.
              - containerPort: 6380 # Redis port.
              - containerPort: 6381 # Redis port.
              - containerPort: 12345 # Ray internal communication.
              - containerPort: 12346 # Ray internal communication.

          # This volume allocates shared memory for Ray to use for its plasma
          # object store. If you do not provide this, Ray will fall back to
          # /tmp, which causes slowdowns if it is not a shared memory volume.
          volumeMounts:
              - mountPath: /dev/shm
                name: dshm
              - mountPath: /filestore
                name: filestore-ray
          resources:
              requests:
                  cpu: 500m
                  memory: 5Gi
              limits:
                  memory: 5Gi
          env:
              # This is used in the head_start_ray_commands below so that
              # Ray can spawn the correct number of processes. Omitting this
              # may lead to degraded performance.
              - name: MY_CPU_REQUEST
                valueFrom:
                    resourceFieldRef:
                        resource: requests.cpu

# Kubernetes pod config for worker node pods.
worker_nodes: {}

# Files or directories to copy to the head and worker nodes. The format is a
# dictionary from REMOTE_PATH: LOCAL_PATH, e.g.
file_mounts: {
#    "~/path1/on/remote/machine": "/path1/on/local/machine",
#    "~/path2/on/remote/machine": "/path2/on/local/machine",
}
# Note that the container images in this example have a non-root user.
# To avoid permissions issues, we recommend mounting into a subdirectory of home (~).

# Files or directories to copy from the head node to the worker nodes. The format is a
# list of paths. The same path on the head node will be copied to the worker node.
# This behavior is a subset of the file_mounts behavior. In the vast majority of cases
# you should just use file_mounts. Only use this if you know what you're doing!
cluster_synced_files: []

# Whether changes to directories in file_mounts or cluster_synced_files in the head node
# should sync to the worker node continuously
file_mounts_sync_continuously: False

# Patterns for files to exclude when running rsync up or rsync down.
# This is not supported on kubernetes.
# rsync_exclude: []

# Pattern files to use for filtering out files when running rsync up or rsync down. The file is searched for
# in the source directory and recursively through all subdirectories. For example, if .gitignore is provided
# as a value, the behavior will match git's behavior for finding and using .gitignore files.
# This is not supported on kubernetes.
# rsync_filter: []

# List of commands that will be run before `setup_commands`. If docker is
# enabled, these commands will run outside the container and before docker
# is set up.
initialization_commands: []

# List of shell commands to run to set up nodes.
setup_commands: []

# Custom commands that will be run on the head node after common setup.
head_setup_commands: []

# Custom commands that will be run on worker nodes after common setup.
worker_setup_commands: []

# Command to start ray on the head node. You don't need to change this.
# Note that --dashboard-host is set to 0.0.0.0 so that Kubernetes can port forward.
head_start_ray_commands:
    - ray stop
    - ulimit -n 65536; ray start --head --num-cpus=0 --object-store-memory 1073741824 --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml --dashboard-host 0.0.0.0

# Command to start ray on worker nodes. You don't need to change this.
worker_start_ray_commands:
    - ray stop
    - ulimit -n 65536; ray start --address=ray-head.ray.svc.cluster.local:6379 --object-store-memory 104857600 --object-manager-port=8076
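
As a quick sanity check on the per-node resources the raylets actually register (not part of the original report): ray.cluster_resources() and ray.available_resources() are standard Ray APIs, and the expected numbers assume a head node started with --num-cpus=0 plus the 7-CPU workers declared above.

import ray

ray.init(address="auto")

# Aggregate resources reported by every node in the cluster; each CPU worker
# should contribute CPU: 7 and the head node should contribute CPU: 0.
print(ray.cluster_resources())

# Resources not currently held by running tasks or actors.
print(ray.available_resources())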

richardliaw commented 3 years ago

This may be fixed on the latest master, or the latest release?

richardliaw commented 3 years ago

@architkulkarni can you take a look at this?