ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[xgboost_ray] Model training hangs/freezes when multiple actors are used #26599

Closed Anando304 closed 1 year ago

Anando304 commented 2 years ago

Description

Background/Context

I was following the guide here for running a sample XGBoost model using xgboost_ray.

Problem

I have enough memory allocated (8 GB) and the program runs, but training seems to hang indefinitely.

Ray dashboard doesn't seem to show any errors in the logs.

(Screenshot: Ray dashboard, 2022-07-15 2:44 PM; the job is running with no errors in the logs)

I ran the job with the following RayParams configuration:
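
For reference, this is the same ray_params value passed to train() in the script further below:

from xgboost_ray import RayParams

# Configuration used for the hanging run: 2 actors, 1 CPU each.
ray_params = RayParams(num_actors=2, cpus_per_actor=1)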

However, if I change num_actors from 2 to 1, the job completes successfully and does NOT get stuck.

I'm not sure why it fails when more than one actor is used, even though the guide linked above says to use 2 actors. 🤔

Steps to reproduce

Create a Ray cluster using the no-Helm approach: kubectl -n <namespace> create -f ./example_cluster.yaml

example_cluster.yaml file:

apiVersion: cluster.ray.io/v1
kind: RayCluster
metadata:
  name: example-cluster
spec:
  # The maximum number of workers nodes to launch in addition to the head node.
  maxWorkers: 3
  # The autoscaler will scale up the cluster faster with higher upscaling speed.
  # E.g., if the task requires adding more nodes then autoscaler will gradually
  # scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
  # This number should be > 0.
  upscalingSpeed: 1.0
  # If a node is idle for this many minutes, it will be removed.
  idleTimeoutMinutes: 5
  # Specify the pod type for the ray head node (as configured below).
  headPodType: head-node
  # Optionally, configure ports for the Ray head service.
  # The ports specified below are the defaults.
  headServicePorts:
    - name: client
      port: 10001
      targetPort: 10001
    - name: dashboard
      port: 8265
      targetPort: 8265
    - name: ray-serve
      port: 8000
      targetPort: 8000
  # Specify the allowed pod types for this ray cluster and the resources they provide.
  podTypes:
    - name: head-node
      podConfig:
        apiVersion: v1
        kind: Pod
        metadata:
          # The operator automatically prepends the cluster name to this field.
          generateName: ray-head-
          annotations:
            sidecar.istio.io/inject: "false"
        spec:
          restartPolicy: Never
          serviceAccount: default-editor
          # This volume allocates shared memory for Ray to use for its plasma
          # object store. If you do not provide this, Ray will fall back to
          # /tmp, which can cause slowdowns if it is not a shared memory volume.
          volumes:
            - name: dshm
              emptyDir:
                medium: Memory
          containers:
            - name: ray-node
              imagePullPolicy: Always
              image: rayproject/ray:latest
              # Do not change this command - it keeps the pod alive until it is
              # explicitly killed.
              command: ["/bin/bash", "-c", "--"]
              args: ["trap : TERM INT; touch /tmp/raylogs; tail -f /tmp/raylogs; sleep infinity & wait;"]
              ports:
                - containerPort: 6379  # Redis port for Ray <= 1.10.0. GCS server port for Ray >= 1.11.0.
                - containerPort: 10001  # Used by Ray Client
                - containerPort: 8265  # Used by Ray Dashboard
                - containerPort: 8000 # Used by Ray Serve

              # This volume allocates shared memory for Ray to use for its plasma
              # object store. If you do not provide this, Ray will fall back to
              # /tmp, which can cause slowdowns if it is not a shared memory volume.
              volumeMounts:
                - mountPath: /dev/shm
                  name: dshm
              resources:
                requests:
                  cpu: 4
                  memory: 8Gi
                  ephemeral-storage: 8Gi
                limits:
                  cpu: 4
                  # The maximum memory that this pod is allowed to use. The
                  # limit will be detected by ray and split to use 10% for
                  # redis, 30% for the shared memory object store, and the
                  # rest for application memory. If this limit is not set and
                  # the object store size is not set manually, ray will
                  # allocate a very large object store in each pod that may
                  # cause problems for other pods.
                  memory: 8Gi
    - name: worker-node
      # Minimum number of Ray workers of this Pod type.
      minWorkers: 2
      # Maximum number of Ray workers of this Pod type. Takes precedence over minWorkers.
      maxWorkers: 3
      # User-specified custom resources for use by Ray.
      # (Ray detects CPU and GPU from pod spec resource requests and limits, so no need to fill those here.)
      rayResources: {"example-resource-a": 1, "example-resource-b": 1}
      podConfig:
        apiVersion: v1
        kind: Pod
        metadata:
          # The operator automatically prepends the cluster name to this field.
          generateName: ray-worker-
          annotations:
            sidecar.istio.io/inject: "false"
        spec:
          serviceAccount: default-editor
          restartPolicy: Never
          volumes:
            - name: dshm
              emptyDir:
                medium: Memory
          containers:
            - name: ray-node
              imagePullPolicy: Always
              image: rayproject/ray:latest
              command: ["/bin/bash", "-c", "--"]
              args: ["trap : TERM INT; touch /tmp/raylogs; tail -f /tmp/raylogs; sleep infinity & wait;"]
              # This volume allocates shared memory for Ray to use for its plasma
              # object store. If you do not provide this, Ray will fall back to
              # /tmp, which can cause slowdowns if it is not a shared memory volume.
              volumeMounts:
                - mountPath: /dev/shm
                  name: dshm
              resources:
                requests:
                  cpu: 4
                  memory: 8Gi
                  ephemeral-storage: 8Gi
                limits:
                  cpu: 4
                  # The maximum memory that this pod is allowed to use. The
                  # limit will be detected by ray and split to use 10% for
                  # redis, 30% for the shared memory object store, and the
                  # rest for application memory. If this limit is not set and
                  # the object store size is not set manually, ray will
                  # allocate a very large object store in each pod that may
                  # cause problems for other pods.
                  memory: 8Gi
  # Commands to start Ray on the head node. You don't need to change this.
  # Note dashboard-host is set to 0.0.0.0 so that Kubernetes can port forward.
  headStartRayCommands:
    - ray stop
    - ulimit -n 65536; ray start --disable-usage-stats --head --port=6379 --no-monitor --dashboard-host 0.0.0.0 &> /tmp/raylogs
  # Commands to start Ray on worker nodes. You don't need to change this.
  workerStartRayCommands:
    - ray stop
    - ulimit -n 65536; ray start --disable-usage-stats --address=$RAY_HEAD_IP:6379 &> /tmp/raylogs

Ray App Script:

import ray
from xgboost_ray import RayDMatrix, RayParams, train, predict
from sklearn.datasets import load_breast_cancer
import xgboost as xgb

# Initialize connection to remote Ray cluster on K8s and install dependencies
runtime_env = {"pip": ["pyarrow==8.0.0", "xgboost_ray", "gcsfs", "sklearn"]}

ray.init("ray://example-cluster-ray-head:10001", runtime_env=runtime_env)

# XG Boost Model Training
train_x, train_y = load_breast_cancer(return_X_y=True)
train_set = RayDMatrix(train_x, train_y)

evals_result = {}
bst = train({"objective": "binary:logistic","eval_metric": ["logloss", "error"],},train_set,evals_result=evals_result,evals=[(train_set, "train")],verbose_eval=False,
            ray_params=RayParams(num_actors=2, cpus_per_actor=1) #num_actors=2
           )

bst.save_model("model.xgb")
print("Final training error: {:.4f}".format(
    evals_result["train"]["error"][-1]))

# Model Prediction
data, labels = load_breast_cancer(return_X_y=True)

dpred = RayDMatrix(data, labels)

bst = xgb.Booster(model_file="model.xgb")
pred_ray = predict(bst, dpred, ray_params=RayParams(num_actors=1))

print(pred_ray)

Link

https://docs.ray.io/en/latest/ray-more-libs/xgboost-ray.html#usage

richardliaw commented 2 years ago

Can you try putting the entire script into ray.remote?

@ray.remote
def run_remote():
      # XG Boost Model Training
      train_x, train_y = load_breast_cancer(return_X_y=True)
      train_set = RayDMatrix(train_x, train_y)

      evals_result = {}
      bst = train({"objective": "binary:logistic","eval_metric": ["logloss", "error"],},train_set,evals_result=evals_result,evals=[(train_set, "train")],verbose_eval=False,
                  ray_params=RayParams(num_actors=2, cpus_per_actor=1) #num_actors=2
                 )

      bst.save_model("model.xgb")
      print("Final training error: {:.4f}".format(
          evals_result["train"]["error"][-1]))

      # Model Prediction
      data, labels = load_breast_cancer(return_X_y=True)

      dpred = RayDMatrix(data, labels)

      bst = xgb.Booster(model_file="model.xgb")
      pred_ray = predict(bst, dpred, ray_params=RayParams(num_actors=1))

      return pred_ray

ray.get(run_remote.remote())
Anando304 commented 2 years ago

@richardliaw it's the same result as before: training runs indefinitely/forever.

Result:

(run_remote pid=461) 2022-07-15 11:41:33,105    INFO main.py:980 -- [RayXGBoost] Created 2 new actors (2 total actors). Waiting until actors are ready for training.
(run_remote pid=461) 2022-07-15 11:41:36,032    INFO main.py:1025 -- [RayXGBoost] Starting XGBoost training.
(_RemoteRayXGBoostActor pid=306, ip=7.255.241.229) [11:41:36] task [xgboost.ray]:140009536953040 got new rank 1
(_RemoteRayXGBoostActor pid=344, ip=7.255.240.222) [11:41:36] task [xgboost.ray]:139868628590352 got new rank 0
(run_remote pid=461) 2022-07-15 11:42:06,148    INFO main.py:1109 -- Training in progress (30 seconds since last restart).
(run_remote pid=461) 2022-07-15 11:42:36,247    INFO main.py:1109 -- Training in progress (60 seconds since last restart).
(run_remote pid=461) 2022-07-15 11:43:06,344    INFO main.py:1109 -- Training in progress (90 seconds since last restart).
richardliaw commented 2 years ago

that's really odd ok... @krfricke or @Yard1 any thoughts here?

richardliaw commented 2 years ago

Can you post logs @Anando304 (/tmp/raylogs)?

Anando304 commented 2 years ago

Here are the logs from /tmp/raylogs:

2022-07-15 11:55:55,975 INFO services.py:1476 -- View the Ray dashboard at http://7.255.242.165:8265
2022-07-15 11:55:53,596 INFO usage_lib.py:297 -- Usage stats collection is disabled.
2022-07-15 11:55:53,596 INFO scripts.py:715 -- Local node IP: 7.255.242.165
2022-07-15 11:55:56,302 SUCC scripts.py:757 -- --------------------
2022-07-15 11:55:56,302 SUCC scripts.py:758 -- Ray runtime started.
2022-07-15 11:55:56,302 SUCC scripts.py:759 -- --------------------
2022-07-15 11:55:56,303 INFO scripts.py:761 -- Next steps
2022-07-15 11:55:56,303 INFO scripts.py:762 -- To connect to this Ray runtime from another node, run
2022-07-15 11:55:56,303 INFO scripts.py:767 --   ray start --address='7.255.242.165:6379'
2022-07-15 11:55:56,303 INFO scripts.py:770 -- Alternatively, use the following Python code:
2022-07-15 11:55:56,303 INFO scripts.py:772 -- import ray
2022-07-15 11:55:56,303 INFO scripts.py:785 -- ray.init(address='auto')
2022-07-15 11:55:56,303 INFO scripts.py:789 -- To connect to this Ray runtime from outside of the cluster, for example to
2022-07-15 11:55:56,303 INFO scripts.py:793 -- connect to a remote cluster from your laptop directly, use the following
2022-07-15 11:55:56,303 INFO scripts.py:796 -- Python code:
2022-07-15 11:55:56,303 INFO scripts.py:798 -- import ray
2022-07-15 11:55:56,303 INFO scripts.py:804 -- ray.init(address='ray://<head_node_ip_address>:10001')
2022-07-15 11:55:56,303 INFO scripts.py:810 -- If connection fails, check your firewall settings and network configuration.
2022-07-15 11:55:56,303 INFO scripts.py:816 -- To terminate the Ray runtime, run
2022-07-15 11:55:56,303 INFO scripts.py:817 --   ray stop
richardliaw commented 2 years ago

how about in /tmp/ray/session_latest/*?

Anando304 commented 2 years ago

cat: /tmp/ray/session_latest/logs: Is a directory
{"7.255.242.165:/tmp/ray/session_2022-07-15_11-55-53_598472_140/sockets/plasma_store": {"metrics_agent_port": 52101, "metrics_export_port": 57843}}
cat: /tmp/ray/session_latest/runtime_resources: Is a directory
cat: /tmp/ray/session_latest/sockets: Is a directory

richardliaw commented 2 years ago

(check logs inside directory?)
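
For reference, a generic sketch (not specific to this cluster) for dumping the tail of every error log under the latest Ray session directory from a shell on the head pod:

import glob

# Print the last 20 lines of each .err log under the current Ray session.
for path in sorted(glob.glob("/tmp/ray/session_latest/logs/*.err")):
    with open(path) as f:
        tail = f.readlines()[-20:]
    if tail:
        print(f"==> {path} <==")
        print("".join(tail), end="")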

Anando304 commented 2 years ago

There are quite a lot of different logs here, would you like to hop onto a quick call instead?

Anando304 commented 2 years ago

dashboard.log
dashboard_agent.log
debug_state.txt
debug_state_gcs.txt
events
gcs_server.err
gcs_server.out
log_monitor.log
old
python-core-driver-01000000ffffffffffffffffffffffffffffffffffffffffffffffff_509.log
python-core-worker-6ccd38995718e210c30664fb9872c2b684374297e8450be25eea4baa_620.log
python-core-worker-81e22ef72e4962490e7f88ac48d73cb3a1f422ab04a5fe6aac89b49a_626.log
python-core-worker-9febc5919a9aa7a4a4bb79386707f9acb3b9b654e96a3774b4c5b690_572.log
python-core-worker-df522f2abb151e095ace1a949d91a069f35ea39b82f1968051156eb8_728.log
python-core-worker-e0c03fca5fd86b65f5994ff0bc3708db856ad3aa39d7c960af68a4df_619.log
ray_client_server.err
ray_client_server.out
ray_client_server_23000.err
ray_client_server_23000.out
raylet.err
raylet.out
runtime_env_setup-ray_client_server_23000.log
worker-6ccd38995718e210c30664fb9872c2b684374297e8450be25eea4baa-01000000-620.err
worker-6ccd38995718e210c30664fb9872c2b684374297e8450be25eea4baa-01000000-620.out
worker-81e22ef72e4962490e7f88ac48d73cb3a1f422ab04a5fe6aac89b49a-01000000-626.err
worker-81e22ef72e4962490e7f88ac48d73cb3a1f422ab04a5fe6aac89b49a-01000000-626.out
worker-9febc5919a9aa7a4a4bb79386707f9acb3b9b654e96a3774b4c5b690-01000000-572.err
worker-9febc5919a9aa7a4a4bb79386707f9acb3b9b654e96a3774b4c5b690-01000000-572.out
worker-df522f2abb151e095ace1a949d91a069f35ea39b82f1968051156eb8-01000000-728.err
worker-df522f2abb151e095ace1a949d91a069f35ea39b82f1968051156eb8-01000000-728.out
worker-e0c03fca5fd86b65f5994ff0bc3708db856ad3aa39d7c960af68a4df-01000000-619.err
worker-e0c03fca5fd86b65f5994ff0bc3708db856ad3aa39d7c960af68a4df-01000000-619.out

casassg commented 2 years ago

Let's try to load the ones for the xgboost_ray actors.

Anando304 commented 2 years ago

Hmm, so I think the python-core-worker ones relate to the actors, but looking inside, the information doesn't seem particularly useful; it's mostly just how long each has been running and things of that sort.

However, one of the log files gave this result

[2022-07-15 13:54:37,369 I 688 778] accessor.cc:599: Received notification for node id = 4f11fb61bd70eda1799d22789a270bec0ef5e11c81808eec1a4bc651, IsAlive = 1
[2022-07-15 13:54:37,369 I 688 778] accessor.cc:599: Received notification for node id = 948949140e766faf253352c416b99e4e16903b1f69322050608d2712, IsAlive = 1
[2022-07-15 13:54:37,369 I 688 778] accessor.cc:599: Received notification for node id = 09548b23ccc19bf204d6794e5ebbf608fc2668ea84fb8be422f8217d, IsAlive = 1
[2022-07-15 13:54:39,017 I 688 778] core_worker.cc:603: Exit signal received, this process will exit after all outstanding tasks have finished, exit_type=IDLE_EXIT
[2022-07-15 13:54:39,017 I 688 688] core_worker.cc:591: Disconnecting to the raylet.
[2022-07-15 13:54:39,017 I 688 688] raylet_client.cc:162: RayletClient::Disconnect, exit_type=IDLE_EXIT, has creation_task_exception_pb_bytes=0
[2022-07-15 13:54:39,017 I 688 688] core_worker.cc:539: Shutting down a core worker.
[2022-07-15 13:54:39,017 I 688 688] core_worker.cc:563: Disconnecting a GCS client.
[2022-07-15 13:54:39,017 I 688 688] core_worker.cc:567: Waiting for joining a core worker io thread. If it hangs here, there might be deadlock or a high load in the core worker io service.
[2022-07-15 13:54:39,017 I 688 778] core_worker.cc:679: Core worker main io service stopped.
[2022-07-15 13:54:39,018 I 688 688] core_worker.cc:576: Core worker ready to be deallocated.
[2022-07-15 13:54:39,018 I 688 688] core_worker_process.cc:314: Task execution loop terminated. Removing the global worker.
[2022-07-15 13:54:39,018 I 688 688] core_worker_process.cc:298: Removed worker ed4121f63622b51c6e692c81a48dd026ce8dcc0495bce205c5d90b09
[2022-07-15 13:54:39,018 I 688 688] core_worker.cc:530: Core worker is destructed
[2022-07-15 13:54:39,124 I 688 688] core_worker_process.cc:154: Destructing CoreWorkerProcessImpl. pid: 688
[2022-07-15 13:54:39,124 I 688 688] io_service_pool.cc:47: IOServicePool is stopped.

Maybe some sort of deadlock in a worker?

krfricke commented 2 years ago

We just came off a debugging session.

Results:

  1. The script does work on AWS EC2 instances (distributed workers)
  2. The script does not work on the k8s cluster when the Ray actors are distributed across nodes
  3. The script does work on the same k8s cluster when we force all Ray actors onto the same node

This therefore seems to be a communication issue between Kubernetes pods.

We'll try to repro this on our side to see if we can resolve this. One thing to try out would be to open up all ports for cross-node communication.
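
For reference, one way to force both training actors onto a single node (the setup that worked in point 3 above) is to request a small slice of that node's built-in node:<ip> resource via RayParams.resources_per_actor. A sketch, assuming the client connection from the original script:

import ray
from xgboost_ray import RayParams

ray.init("ray://example-cluster-ray-head:10001")

# Every Ray node advertises a "node:<ip>" resource with capacity 1.0;
# requesting a small fraction of one node's resource pins both actors to it.
target_ip = ray.nodes()[0]["NodeManagerAddress"]
ray_params = RayParams(
    num_actors=2,
    cpus_per_actor=1,
    resources_per_actor={f"node:{target_ip}": 0.01},
)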

cc @DmitriGekhtman can you help me on how to set this up/deploy on our infrastructure to repro?

DmitriGekhtman commented 2 years ago

open up all ports for cross-node communication

By default everything is open within a K8s cluster. @Anando304 do you know of any network restrictions that might be relevant here?

@krfricke sure, could you grab a time on my calendar? We can go through it together.

Anando304 commented 2 years ago

Hey @DmitriGekhtman !

I don't believe there are any network restrictions.

I've run Ray Train for a sample TensorFlow model (found here) on the same Kubernetes Ray configuration that I'm currently using. That one runs perfectly both with 1 actor AND with more than 1 actor (distributed mode). So I don't think the pods are blocked in any way; otherwise that should've failed too?

Maybe it's something specific to xgboost_ray on K8s?

cc: @casassg @calvinleungyk

Anando304 commented 2 years ago

Hey all (@DmitriGekhtman @krfricke )!!

Just curious, have you had a chance to reproduce the error on K8s? Totally understand if you've had your hands full with the latest release of Ray.

DmitriGekhtman commented 2 years ago

Hey @Anando304! Your intuition about the Ray release is most astute :) :) I'll take a look into reproducing this issue today.

Besides unblocking you, we have selfish ulterior motives: In the context of the Ray release, we want Ray Tune on K8s to be a great experience :)

DmitriGekhtman commented 2 years ago

We just came off a debugging session.

Results:

  1. The script does work on AWS EC2 instances (distributed workers)
  2. The script does not work on the k8s cluster when the Ray actors are distributed across nodes
  3. The script does work on the same k8s cluster when we force all Ray actors onto the same node

This therefore seems to be a communication issue between Kubernetes pods.

We'll try to repro this on our side to see if we can resolve this. One thing to try out would be to open up all ports for cross-node communication.

cc @DmitriGekhtman can you help me on how to set this up/deploy on our infrastructure to repro?

@krfricke what size were the EC2 nodes you used? I see the K8s setup uses nodes with 4 CPUs and 8Gi each: 1 head and 2 to 3 workers.

DmitriGekhtman commented 2 years ago

I am surprised that the actors in the example are getting scheduled across different Ray nodes, when the head node has 4 CPUs.
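
For reference, a quick generic sketch for checking how many CPUs each node exposes (and hence whether both 1-CPU actors could have fit on a single node), using the cluster address from the script above:

import ray

ray.init("ray://example-cluster-ray-head:10001")

# Total and currently free resources across the whole cluster.
print(ray.cluster_resources())
print(ray.available_resources())

# Per-node CPU capacity, to see where the two 1-CPU actors could land.
for node in ray.nodes():
    if node["Alive"]:
        print(node["NodeManagerAddress"], node["Resources"].get("CPU", 0))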

DmitriGekhtman commented 2 years ago

I applied the configuration and ran the scripts from the issue description and was not able to reproduce the issue. @Anando304, looks like you and I could use some pair debugging.

Anando304 commented 2 years ago

Sounds good, I'll DM you through Slack and we can set up a short pair-programming session to debug! Appreciate your time once again!!

Anando304 commented 2 years ago

So a few updates:

  1. Running the xgboost_ray notebook on a local minikube instance. xgboost_ray works fine on a local minikube instance; it doesn't hang indefinitely in the training step with more than one actor. I had to adjust the example_cluster.yaml file for the minikube instance to use 3 CPUs and 2Gi for each worker/head node, which ensures workers are not dying due to memory constraints. This experiment leads me to believe that the hanging problem from before may be something specific to my company's K8s environment, potentially some sort of port issue that @casassg and @krfricke alluded to before.

  2. Running the xgboost_ray notebook on a local cluster auto-deployed by Ray using ray.init(). I wanted to test whether xgboost_ray would work with multiple actors on a single Ray cluster auto-deployed and managed by the Ray API. It works quite well without any hanging issues, and the performance gains are good: roughly 4x faster than vanilla XGBoost with scikit-learn. This is shown at the bottom of the notebook linked below, and a rough sketch of the comparison follows the link.

Notebook
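
For reference, a rough sketch of that kind of timing comparison (assumptions: the breast-cancer dataset is tiled to make it larger, 100 boosting rounds, and a Ray cluster connection has already been established with ray.init()):

import time

import numpy as np
import xgboost as xgb
from sklearn.datasets import load_breast_cancer
from xgboost_ray import RayDMatrix, RayParams, train

X, y = load_breast_cancer(return_X_y=True)
X, y = np.tile(X, (200, 1)), np.tile(y, 200)  # scale the dataset up

params = {"objective": "binary:logistic", "eval_metric": ["logloss", "error"]}

start = time.perf_counter()
xgb.train(params, xgb.DMatrix(X, label=y), num_boost_round=100)
print(f"vanilla xgboost: {time.perf_counter() - start:.1f}s")

start = time.perf_counter()
train(params, RayDMatrix(X, y), num_boost_round=100,
      ray_params=RayParams(num_actors=2, cpus_per_actor=1))
print(f"xgboost_ray, 2 actors: {time.perf_counter() - start:.1f}s")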

Anando304 commented 2 years ago

One thing I found interesting is that running that same notebook (posted in the previous comment) on minikube with a multi-node Ray cluster was slower than running vanilla XGBoost on its own. This was with 2 actors.

I'm guessing there may be overhead from the actors communicating with each other across nodes, since it ran quite fast on a single-node Ray cluster managed by Ray itself via ray.init().

Any thoughts @DmitriGekhtman on why this may be? Was there any performance increase for you when you ran it on a multi-node cluster vs a single-node ray cluster? The dataset in the above notebook was scaled up so that it would be big enough to see the performance differences.

DmitriGekhtman commented 2 years ago

There is no benefit to parallelizing on a single machine using minikube. The idea is that, for large datasets, you can train quickly by parallelizing across several machines.

stale[bot] commented 1 year ago

Hi, I'm a bot from the Ray team :)

To help human contributors to focus on more relevant issues, I will automatically add the stale label to issues that have had no activity for more than 4 months.

If there is no further activity in the next 14 days, the issue will be closed!

You can always ask for help on our discussion forum or Ray's public slack channel.

stale[bot] commented 1 year ago

Hi again! The issue will be closed because there has been no more activity in the 14 days since the last message.

Please feel free to reopen or open a new issue if you'd still like it to be addressed.

Again, you can always ask for help on our discussion forum or Ray's public slack channel.

Thanks again for opening the issue!