ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

Blas GEMM launch failed when running ray tune 2.1 with horovod + gpu #31754

Closed ckyuto closed 1 year ago

ckyuto commented 1 year ago

What happened + What you expected to happen

Hi team,

I recently tried to upgrade our Ray Tune version from 1.2 to 2.1. However, when I run Horovod with GPUs on a Ray cluster, it fails with the error below.

Is there any change that causes this issue?

(TunerInternal pid=4780) 2023-01-18 20:42:24,186        ERROR trial_runner.py:993 -- Trial HorovodTrainer_750ba_00002: Error processing event.
(TunerInternal pid=4780) ray.exceptions.RayTaskError(InternalError): ray::_Inner.train() (pid=588, ip=100.96.197.49, repr=HorovodTrainer)
(TunerInternal pid=4780)   File "/home/jobuser/.shiv/cloudflow-dsl-example.pyz_5671c8d0e260c4ebf25144901e349542aa0e77068788380bed00dbf2496d6542/site-packages/ray/tune/trainable/trainable.py", line 355, in train
(TunerInternal pid=4780)     raise skipped from exception_cause(skipped)
(TunerInternal pid=4780)   File "/home/jobuser/.shiv/cloudflow-dsl-example.pyz_5671c8d0e260c4ebf25144901e349542aa0e77068788380bed00dbf2496d6542/site-packages/ray/train/_internal/utils.py", line 54, in check_for_failure
(TunerInternal pid=4780)     ray.get(object_ref)
(TunerInternal pid=4780) ray.exceptions.RayTaskError(InternalError): ray::RayTrainWorker._RayTrainWorker__execute() (pid=462, ip=100.96.205.88, repr=<ray.train._internal.worker_group.RayTrainWorker object at 0x7f5520779f10>)
(TunerInternal pid=4780)   File "/home/jobuser/.shiv/cloudflow-dsl-example.pyz_5671c8d0e260c4ebf25144901e349542aa0e77068788380bed00dbf2496d6542/site-packages/ray/train/_internal/worker_group.py", line 31, in __execute
(TunerInternal pid=4780)     raise skipped from exception_cause(skipped)
(TunerInternal pid=4780)   File "/home/jobuser/.shiv/cloudflow-dsl-example.pyz_5671c8d0e260c4ebf25144901e349542aa0e77068788380bed00dbf2496d6542/site-packages/ray/train/_internal/utils.py", line 129, in discard_return_wrapper
(TunerInternal pid=4780)     train_func(*args, **kwargs)
(TunerInternal pid=4780)   File "/home/wyen/workspace/kingkong-starter-kit_trunk/cloudflow-dsl-example/src/linkedin/tensorflow/mnist/mnist_horovod_raytune.py", line 104, in train_func
(TunerInternal pid=4780)   File "/home/jobuser/.local/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py", line 1100, in fit
(TunerInternal pid=4780)     tmp_logs = self.train_function(iterator)
(TunerInternal pid=4780)   File "/home/jobuser/.local/lib/python3.7/site-packages/tensorflow/python/eager/def_function.py", line 828, in __call__
(TunerInternal pid=4780)     result = self._call(*args, **kwds)
(TunerInternal pid=4780)   File "/home/jobuser/.local/lib/python3.7/site-packages/tensorflow/python/eager/def_function.py", line 888, in _call
(TunerInternal pid=4780)     return self._stateless_fn(*args, **kwds)
(TunerInternal pid=4780)   File "/home/jobuser/.local/lib/python3.7/site-packages/tensorflow/python/eager/function.py", line 2943, in __call__
(TunerInternal pid=4780)     filtered_flat_args, captured_inputs=graph_function.captured_inputs)  # pylint: disable=protected-access
(TunerInternal pid=4780)   File "/home/jobuser/.local/lib/python3.7/site-packages/tensorflow/python/eager/function.py", line 1919, in _call_flat
(TunerInternal pid=4780)     ctx, args, cancellation_manager=cancellation_manager))
(TunerInternal pid=4780)   File "/home/jobuser/.local/lib/python3.7/site-packages/tensorflow/python/eager/function.py", line 560, in call
(TunerInternal pid=4780)     ctx=ctx)
(TunerInternal pid=4780)   File "/home/jobuser/.local/lib/python3.7/site-packages/tensorflow/python/eager/execute.py", line 60, in quick_execute
(TunerInternal pid=4780)     inputs, attrs, num_outputs)
(TunerInternal pid=4780) tensorflow.python.framework.errors_impl.InternalError:  Blas GEMM launch failed : a.shape=(64, 784), b.shape=(784, 64), m=64, n=64, k=784
(TunerInternal pid=4780)         [[node sequential/dense/MatMul (defined at home/wyen/workspace/kingkong-starter-kit_trunk/cloudflow-dsl-example/src/linkedin/tensorflow/mnist/mnist_horovod_raytune.py:104) ]] [Op:__inference_train_function_515]
(TunerInternal pid=4780) 
(TunerInternal pid=4780) Function call stack:

Versions / Dependencies

Ray: 2.1.0
CUDA: 11
TensorFlow: 2.4.0.39
Horovod: 0.23.0.7
Python: 3.7.10

Reproduction script

This is my code. You can run it with:

python -m 'linkedin.tensorflow.mnist.mnist_horovod_raytune' --download_data --use_gpu

import argparse
import numpy as np
import os
import socket

import horovod.tensorflow.keras as hvd
import ray
import tensorflow as tf

from ray import air, tune
from ray.air.callbacks.keras import Callback as TrainCheckpointReportCallback
from ray.air.config import ScalingConfig
from ray.train.horovod import HorovodTrainer
from ray.tune.tune_config import TuneConfig

def build_and_compile_cnn_model(optimizer):
    model = tf.keras.Sequential([
        tf.keras.layers.Flatten(input_shape=(28, 28)),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dense(10)
    ])
    model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        optimizer=optimizer,
        metrics=['accuracy'])
    return model

def train_func(config):
    os.environ["TF_GPU_CUPTI_USE_ACTIVITY_API"] = "false"
    hostname = socket.gethostname().replace(".", "_")

    per_worker_batch_size = config["batch_size"]

    # Horovod: initialize Horovod.
    hvd.init()

    global_batch_size = per_worker_batch_size * hvd.size()
    log_dir = os.path.join(config["working_dir"], hostname, f"worker_{hvd.rank()}")
    learning_rate = config.get("lr", 0.001) * hvd.size()
    steps_per_epoch = config["steps"] // config["epochs"] // hvd.size()
    ckpt_full_path = os.path.join(config["working_dir"], hostname, 'model.ckpt-{epoch:04d}')
    opt = tf.keras.optimizers.SGD(learning_rate=learning_rate)
    opt = hvd.DistributedOptimizer(opt)
    callbacks = [
        # Horovod: broadcast initial variable states from rank 0 to all other processes.
        # This is necessary to ensure consistent initialization of all workers when
        # training is started with random weights or restored from a checkpoint.
        hvd.callbacks.BroadcastGlobalVariablesCallback(0),

        # Horovod: average metrics among workers at the end of every epoch.
        #
        # Note: This callback must be in the list before the ReduceLROnPlateau,
        # TensorBoard or other metrics-based callbacks.
        hvd.callbacks.MetricAverageCallback(),

        # Horovod: using `lr = 1.0 * hvd.size()` from the very beginning leads to worse final
        # accuracy. Scale the learning rate `lr = 1.0` ---> `lr = 1.0 * hvd.size()` during
        # the first three epochs. See https://arxiv.org/abs/1706.02677 for details.
        hvd.callbacks.LearningRateWarmupCallback(initial_lr=learning_rate, warmup_epochs=3, verbose=1),

        tf.keras.callbacks.TensorBoard(log_dir=log_dir, profile_batch=config["profile_batch"], histogram_freq=2),
        TrainCheckpointReportCallback(),
    ]

    # Horovod: save checkpoints only on worker 0 to prevent other workers from corrupting them.
    if hvd.rank() == 0:
        callbacks.append(tf.keras.callbacks.ModelCheckpoint(ckpt_full_path, save_weights_only=True, verbose=1, save_freq=50))

    # Horovod: pin GPU to be used to process local rank (one GPU per process)
    gpus = tf.config.experimental.list_physical_devices('GPU')
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
    if gpus:
        tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')

    model = build_and_compile_cnn_model(opt)

    (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    # The `x` arrays are in uint8 and have values in the [0, 255] range.
    # You need to convert them to float32 with values in the [0, 1] range.
    x_train = x_train / np.float32(255)
    y_train = y_train.astype(np.int64)
    train_datasets = (
        tf.data.Dataset.from_tensor_slices((x_train, y_train))
        .shuffle(60000)
        .repeat()
        .batch(global_batch_size)
    )

    # By default the sharding is done using files, however mnist dataset
    # only has 1 file so auto sharding fails for multiworker strategy.
    options = tf.data.Options()
    options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF
    train_datasets_no_auto_shard = train_datasets.with_options(options)

    model.fit(
        x=train_datasets_no_auto_shard,
        epochs=config["epochs"],
        steps_per_epoch=steps_per_epoch,
        callbacks=callbacks,
        verbose=1
    )

def tune_tensorflow_mnist(config):
    num_workers = 1
    num_cpus_per_worker = 4
    num_gpus_per_worker = 1 if config["use_gpu"] else 0
    resources_per_worker = {"CPU": num_cpus_per_worker, "GPU": num_gpus_per_worker}
    hp = {"lr": tune.loguniform(1e-4, 1e-1)}

    trainable = HorovodTrainer(train_loop_per_worker=train_func,
                               train_loop_config=config,
                               scaling_config=ScalingConfig(trainer_resources={"CPU": 0}, num_workers=num_workers, use_gpu=config["use_gpu"], resources_per_worker=resources_per_worker))
    hp = {"train_loop_config": hp}

    tuner = tune.Tuner(
        trainable,
        run_config=air.RunConfig(
            failure_config=air.FailureConfig(max_failures=3)
        ),
        tune_config=TuneConfig(mode="max", metric="accuracy", num_samples=config["num_samples"], max_concurrent_trials=3),
        param_space=hp,
    )

    results = tuner.fit()
    print("Best hyperparameters found were: ", results.get_best_result().config)

def main(kwargs):
    # Avoid using FLAGS inside train_func; otherwise ray.cloudpickle.dumps raises a TypeError.
    config = {
        "epochs": kwargs.get("epochs"),
        "data_dir": kwargs.get("data_dir"),
        "download_data": kwargs.get("download_data"),
        "working_dir": kwargs.get("working_dir"),
        "steps": kwargs.get("steps"),
        "batch_size": kwargs.get("batch_size"),
        "profile_batch": kwargs.get("profile_batch"),
        "num_samples": kwargs.get("num_samples"),
        "use_gpu": kwargs.get("use_gpu"),
    }

    if kwargs.get("is_local_run"):
        # when running locally, use ray.init()
        ray.init()
    else:
        ray.init("ray://127.0.0.1:10001")

    tune_tensorflow_mnist(config)
    ray.shutdown()

if __name__ == "__main__":
    # Training settings
    parser = argparse.ArgumentParser(
        description="Tensorflow MNIST Example",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    # To run this locally:
    #   $ python cloudflow-dsl-example/src/linkedin/tensorflow/mnist/mnist_horovod_raytune.py \
    #       --epochs 1 --data_dir=~/ --download_data --profile_batch=0,0 --is_local_run
    # The boolean options are store_true flags, so they take no value.
    # steps and working_dir are not command-line options; they are fixed in kwargs below.
    parser.add_argument(
        "--batch_size",
        type=int,
        default=64,
        metavar="N",
        help="input batch size for training (default: 64)",
    )
    parser.add_argument(
        "--epochs",
        type=int,
        default=5,
        metavar="N",
        help="number of epochs to train (default: 5)",
    )

    parser.add_argument(
        "--download_data", action="store_true", default=False, help="download data"
    )

    parser.add_argument(
        "--use_gpu", action="store_true", default=False, help="enables CUDA training"
    )

    parser.add_argument(
        "--is_local_run", action="store_true", default=False, help="is local run"
    )

    parser.add_argument(
        "--profile_batch",
        default="0,0",
        help="profile batch",
    )

    parser.add_argument(
        "--data_dir",
        help="location of the training dataset in the local filesystem ("
        "will be downloaded if needed)",
    )

    args = parser.parse_args()

    kwargs = {
        "data_dir": args.data_dir,
        "batch_size": args.batch_size,
        "epochs": args.epochs,
        "download_data": args.download_data,
        "is_local_run": args.is_local_run,
        "profile_batch": args.profile_batch,
        "use_gpu": args.use_gpu,
        "num_samples": 3,
        "working_dir": "~/keras_raytune_test",
        "steps": 200,
    }

    main(kwargs=kwargs)
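
For context on the failing op, the per-worker values that train_func derives from the config can be reproduced without Horovod or a GPU. The sketch below is illustrative only: `derive_worker_settings` and `world_size` are hypothetical names, with `world_size` standing in for `hvd.size()`. With the defaults above (batch size 64, 5 epochs, 200 steps, one worker), the resulting batch size of 64 matches the `a.shape=(64, 784)` operand in the error, that is, one batch of flattened 28x28 images multiplied by the kernel of the first Dense(64) layer.

```python
def derive_worker_settings(config, world_size):
    """Mirror the arithmetic in train_func; world_size stands in for hvd.size()."""
    per_worker_batch_size = config["batch_size"]
    return {
        # Batch size passed to Dataset.batch() in the script.
        "global_batch_size": per_worker_batch_size * world_size,
        # Learning rate is scaled linearly with the number of workers.
        "learning_rate": config.get("lr", 0.001) * world_size,
        # Total steps are split across epochs, then across workers.
        "steps_per_epoch": config["steps"] // config["epochs"] // world_size,
    }


# Defaults taken from the script: batch_size=64, epochs=5, steps=200, num_workers=1.
settings = derive_worker_settings(
    {"batch_size": 64, "epochs": 5, "steps": 200}, world_size=1
)
print(settings)
```

Calling it with `world_size=2` shows how the same config would change if num_workers were raised.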

Issue Severity

High: It blocks me from completing my task.

justinvyu commented 1 year ago

I ran this script with the latest Ray to see if that fixes things, and it seems to work for me.

The following setup (upgraded ray to nightly, as well as newer tf and horovod versions) worked:

ray==nightly
tensorflow==2.9.0
horovod==0.26.1

Python 3.7.13
cuda 11

Single node with 16 CPUs, 1 GPU, 64gb memory.

I ran the script with only a few modifications:

python horovod_debug.py --is_local_run --download_data --epochs=10 --use_gpu

Now I'm trying to replicate your env setup more exactly to make sure that I am able to reproduce the original error.

@ckyuto Could you check whether upgrading to the above versions works for you as well?
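
To make the environment comparison easier, a small script along these lines could print the installed versions on each node. This is a minimal sketch, not something from the Ray docs: `installed_versions` is a hypothetical helper, and it assumes Python 3.8+ for `importlib.metadata` (on Python 3.7 the `importlib_metadata` backport provides the same API).

```python
import platform
from importlib import metadata  # Python 3.8+; assumption noted above


def installed_versions(packages):
    """Return {package: version or None} for each distribution name."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            # Package is not installed in this environment.
            versions[name] = None
    return versions


report = installed_versions(["ray", "tensorflow", "horovod"])
report["python"] = platform.python_version()
for name, version in sorted(report.items()):
    print(f"{name}=={version}")
```

Running it on both the head node and the GPU workers would show whether all nodes actually share the same package versions.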

woshiyyya commented 1 year ago

Hi @ckyuto, as we discussed in the Slack channel, we reran your code with the same environment settings you mentioned, but I still cannot reproduce the error on my side.

python==3.7
tensorflow==2.4.0
horovod==0.23.0
ray==2.1
cuda version: 11.0
GPU version: V100

So I don't think this is a bug in Ray. I searched for the error message "Blas GEMM launch failed" and found that most answers can be attributed to the following 3 reasons:

You can try these solutions; I hope they help!
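
One commonly reported trigger for this error, offered here as an assumption rather than a confirmed diagnosis for this issue, is that cuBLAS cannot allocate GPU memory because another process already holds most of it. A rough way to check is to look at per-GPU memory usage on the worker node just before training starts. The sketch below shells out to `nvidia-smi`; `parse_gpu_memory` and `gpu_memory_snapshot` are hypothetical helper names, and the snapshot is simply empty when `nvidia-smi` is not available.

```python
import shutil
import subprocess

QUERY = [
    "nvidia-smi",
    "--query-gpu=index,memory.used,memory.total",
    "--format=csv,noheader,nounits",
]


def parse_gpu_memory(csv_text):
    """Parse 'index, used, total' rows (values in MiB) into a list of dicts."""
    rows = []
    for line in csv_text.strip().splitlines():
        fields = [field.strip() for field in line.split(",")]
        try:
            index, used, total = (int(field) for field in fields)
        except ValueError:
            # Skip rows with missing or non-numeric values.
            continue
        rows.append({"index": index, "used_mib": used, "total_mib": total})
    return rows


def gpu_memory_snapshot():
    """Return per-GPU memory usage, or [] if nvidia-smi is missing or fails."""
    if shutil.which("nvidia-smi") is None:
        return []
    result = subprocess.run(QUERY, capture_output=True, text=True, check=False)
    if result.returncode != 0:
        return []
    return parse_gpu_memory(result.stdout)


for gpu in gpu_memory_snapshot():
    print(f"GPU {gpu['index']}: {gpu['used_mib']} / {gpu['total_mib']} MiB in use")
```

If a GPU is already nearly full before the trial starts, another process on that node is a likely suspect.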

I'll close this issue for now.