ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

Ray Tune + ray xgboost running out of disk space #34118

Open amohar2 opened 1 year ago

amohar2 commented 1 year ago

What happened + What you expected to happen

I have a script where Ray Tune is used to tune xgboost_ray on a very large distributed dataset on a 16-node cluster. Somewhere in the middle of the run I start seeing out-of-disk errors like the following:

(raylet) file_system_monitor.cc:105: /dev/shm/ray/spill is over 95% full, available space: 0; capacity: 270374129664. Object creation will fail if spilling is required.

Meanwhile, when I check /dev/shm on some of the nodes, they are actually at 100% of their capacity:

$ df -h /dev/shm/
Filesystem      Size  Used Avail Use% Mounted on
tmpfs           252G  252G     0 100% /dev/shm

I don't see this issue when I run a single xgboost_ray training job on the same setup. So is it possible that Ray Tune does not properly clean up the memory from one trial to the next, so that at some point the tmpfs folder runs out of space? If so, is there any way to avoid this and somehow clean up the tmpfs in between trials?

Some information on the setup:

- 16-node cluster, each node running 4 Ray jobs
- Each XGBoost training job uses 16 actors and 3 CPUs per actor
- The dataset has around 4B samples, with approximately 800 GB in-memory size
- The data is a Ray Dataset made up of 16 equal partitions (each partition is a dummy random pandas DataFrame of shape 250M x 28)

The Ray cluster is started manually as follows.

Head node:

ray start --head --port=6379 --num-cpus=4 --object-store-memory=270000000000 --disable-usage-stats --temp-dir=/dev/shm/ray --system-config='{"object_spilling_config":"{\"type\":\"filesystem\",\"params\":{\"directory_path\":\"/dev/shm/ray/spill\"}}"}' --include-dashboard True

Worker nodes: ray start --address='head_node_ip:6379' --num-cpus=4 --object-store-memory=270000000000 --disable-usage-stats --temp-dir=/dev/shm/ray

So basically, I set the config such that the object store can use all the space on tmpfs, and the spill/ directory is in the same folder (due to some constraints on my system).
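For reference, here is a rough programmatic equivalent of the head-node command above, in case it is easier to read than the escaped JSON in the CLI flag (just a sketch of the same settings, not something I ran separately):

import json
import ray

# Same idea as the `ray start --head ... --system-config=...` command above:
# the object store and the spill directory both live on the /dev/shm tmpfs.
ray.init(
    num_cpus=4,
    object_store_memory=270_000_000_000,
    _temp_dir="/dev/shm/ray",
    _system_config={
        "object_spilling_config": json.dumps(
            {"type": "filesystem", "params": {"directory_path": "/dev/shm/ray/spill"}}
        )
    },
)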

Versions / Dependencies

Ray 2.3.0
xgboost_ray 0.1.15
xgboost 1.6.0

Reproduction script

import ray
import pandas as pd
from sklearn.datasets import make_classification
from xgboost_ray import RayDMatrix, RayParams, train
from ray import tune
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

def ray_tune(X, y, n_jobs, cpus_per_actor):

    ray_params = RayParams(
        num_actors=n_jobs,
        cpus_per_actor=cpus_per_actor,
        )

    train_set = RayDMatrix(X, y)
    def train_model(config):

        evals_result = {}
        bst = train(
            params=config,
            dtrain=train_set,
            evals_result=evals_result,
            evals=[(train_set, "train")],
            verbose_eval=False,
            ray_params=ray_params)

    config = {
        "eval_metric": ["logloss", "error"],
        "learning_rate": tune.uniform(0.0001, 1.0),
        "min_child_weight": tune.randint(0, 21),
        "max_depth": tune.randint(2, 11),
        "reg_alpha": tune.uniform(0, 4.0),
        "booster": tune.grid_search(["gbtree", "dart"]),
        "reg_lambda": tune.uniform(0, 100),
        "n_estimators": tune.randint(50, 100),
        }

    analysis = tune.run(
        train_model,
        config=config,
        metric="train-error",
        mode="min",
        num_samples=4,
        verbose=2,
        resources_per_trial=ray_params.get_tune_resources())
    print("Best hyperparameters", analysis.best_config)

@ray.remote
def create_pandas_df(n_samples, n_features, partition_id):
    X, y = make_classification(n_samples=n_samples, n_features=n_features, n_informative=28, n_redundant=0, n_classes=2, random_state=partition_id)
    df_X, df_y = pd.DataFrame(X, columns=[f"col_{i}" for i in range(n_features)]), pd.DataFrame(y, columns=['label'])
    df_all = pd.concat([df_X, df_y], axis=1)
    return df_all

if __name__ == "__main__":
    ray.init(_temp_dir="/dev/shm/ray",
             address='head_node_ip:6379')
    all_nodes_info = ray.nodes()
    object_refs = []
    n_samples, n_features = 250_000_000, 28
    for partition_id, node_info in enumerate(all_nodes_info):
        object_ref = create_pandas_df.options(
            scheduling_strategy=NodeAffinitySchedulingStrategy(
                node_id=node_info['NodeID'],
                soft=False,
            )
        ).remote(n_samples, n_features, partition_id)
        object_refs.append(object_ref)
    print(f"Submitted All the jobs to create partitions on each node")

    train_data = ray.data.from_pandas_refs(object_refs)
    n_jobs = len(all_nodes_info) # one xgboost actor per node (each actor works on one partition)
    cpus_per_actor = 3
    ray_tune(train_data, 'label', n_jobs, cpus_per_actor)

Issue Severity

High: It blocks me from completing my task.

xwjiang2010 commented 1 year ago

Hi, I think there may be several aspects that contribute to this. For starters, could you maybe try something like from_parquet instead of doing the splitting yourself with create_pandas_df on each node? I am not sure that the manual approach results in the same "splitting" effect as needed by the xgboost-ray actors. Splitting should be handled automatically for you in xgboost-ray, and you should not need to do any manual work.
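For example, something along these lines (the parquet path is just a placeholder):

import ray

# Let Ray Data read and partition the files itself instead of building
# one pandas DataFrame per node by hand. The path below is a placeholder.
train_data = ray.data.read_parquet("s3://your-bucket/your-dataset/")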

Second, we seem to be running 4 trials, each with 16 actors. I suspect that the train_data is copied for each trial instance, so we have 4 times the overall data compared to the case of only 1 trial running. This can contribute to excessive disk spilling as well.

xwjiang2010 commented 1 year ago

I am not super familiar with the internal impl of ray dataset. cc @amogkam to provide more insights here.

amohar2 commented 1 year ago

@xwjiang2010 Thanks for the response

For starter, could you may be try things like from_parquet, instead of doing splitting already by yourself with create_pandas_df on each node.

The dataset is larger than a single node's memory, so the only way is to create the partitions on different nodes of the cluster. I can see a way of using read_parquet() from different sources; however, due to some other limitations on my setup, using from_pandas_refs is my only way of creating this large dataset from in-memory partitions.

Second, we seem to be running 4 trials each with 16 actors. I am suspecting that train_data amount of data is copied for each trial instance. So we have 4 times the overall data as would be in the case of only 1 trial running. This can contribute to excessive disk spill as well.

If this is the case, is there any way to avoid it? For example, by running some manual cleanup in between trials? I ran another experiment where I ran the xgboost trials in a for loop (to somewhat simulate what Ray Tune does), and my system ran out of memory on the 2nd iteration.
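One thing I am considering trying is passing the dataset to the trainable via tune.with_parameters, so that it is put into the object store once and shared across trials instead of being captured in each trial's closure. A rough, untested sketch against my script above:

from ray import tune
from xgboost_ray import train

# Untested idea: share train_set and ray_params across trials via the
# object store instead of capturing them in the train_model closure.
def train_model(config, train_set=None, ray_params=None):
    evals_result = {}
    train(
        params=config,
        dtrain=train_set,
        evals_result=evals_result,
        evals=[(train_set, "train")],
        verbose_eval=False,
        ray_params=ray_params)

analysis = tune.run(
    tune.with_parameters(train_model, train_set=train_set, ray_params=ray_params),
    config=config,
    metric="train-error",
    mode="min",
    num_samples=4,
    verbose=2,
    resources_per_trial=ray_params.get_tune_resources())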