ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[client][runtime_env] Inconsistent runs on ray client #30518

Open darwinharianto opened 1 year ago

darwinharianto commented 1 year ago

What happened + What you expected to happen

I created a Ray cluster with Docker and wanted to run a script on it from a client machine. Following the docs, I ran:

python tensorflow_mnist_example.py --address=ray://172.21.36.145:10001 --num-workers=1 --use-gpu

The first run failed with the following error:

2022-11-21 10:36:18.714017: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
Traceback (most recent call last):
  File "tensorflow_mnist_example.py", line 122, in <module>
    ray.init(address=args.address, runtime_env=runtime_env)
  File "/Users/darwinharianto/anaconda3/envs/py-3.7.13/lib/python3.7/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "/Users/darwinharianto/anaconda3/envs/py-3.7.13/lib/python3.7/site-packages/ray/_private/worker.py", line 1255, in init
    ctx = builder.connect()
  File "/Users/darwinharianto/anaconda3/envs/py-3.7.13/lib/python3.7/site-packages/ray/client_builder.py", line 187, in connect
    metadata=self._metadata,
  File "/Users/darwinharianto/anaconda3/envs/py-3.7.13/lib/python3.7/site-packages/ray/util/client_connect.py", line 66, in connect
    ray_init_kwargs=ray_init_kwargs,
  File "/Users/darwinharianto/anaconda3/envs/py-3.7.13/lib/python3.7/site-packages/ray/util/client/__init__.py", line 252, in connect
    conn = self.get_context().connect(*args, **kw_args)
  File "/Users/darwinharianto/anaconda3/envs/py-3.7.13/lib/python3.7/site-packages/ray/util/client/__init__.py", line 102, in connect
    self.client_worker._server_init(job_config, ray_init_kwargs)
  File "/Users/darwinharianto/anaconda3/envs/py-3.7.13/lib/python3.7/site-packages/ray/util/client/worker.py", line 850, in _server_init
    f"Initialization failure from server:\n{response.msg}"
ConnectionAbortedError: Initialization failure from server:
Traceback (most recent call last):
  File "/home/darwinharianto/anaconda3/envs/ray_cluster/lib/python3.9/site-packages/ray/util/client/server/proxier.py", line 678, in Datapath
    if not self.proxy_manager.start_specific_server(
  File "/home/darwinharianto/anaconda3/envs/ray_cluster/lib/python3.9/site-packages/ray/util/client/server/proxier.py", line 304, in start_specific_server
    serialized_runtime_env_context = self._create_runtime_env(
  File "/home/darwinharianto/anaconda3/envs/ray_cluster/lib/python3.9/site-packages/ray/util/client/server/proxier.py", line 281, in _create_runtime_env
    raise TimeoutError(
TimeoutError: GetOrCreateRuntimeEnv request failed after 5 attempts.

I then checked the Ray Client server log inside the Docker container to see what was causing the error:

cat /tmp/ray/session_latest/logs/ray_client_server.err 
2022-11-20 16:17:24,364 INFO server.py:884 -- Starting Ray Client server on 0.0.0.0:10001
2022-11-20 16:18:15,007 INFO proxier.py:670 -- New data connection from client defabd9092b0495e88f301e6703b5c3e: 
2022-11-20 16:18:16,043 INFO proxier.py:341 -- SpecificServer started on port: 23000 with PID: 750 for client: defabd9092b0495e88f301e6703b5c3e
2022-11-20 16:18:19,526 INFO proxier.py:743 -- defabd9092b0495e88f301e6703b5c3e last started stream at 1668989895.0059516. Current stream started at 1668989895.0059516.
2022-11-20 16:18:54,485 INFO proxier.py:391 -- Specific server defabd9092b0495e88f301e6703b5c3e is no longer running, freeing its port 23000
2022-11-20 16:19:52,832 INFO proxier.py:670 -- New data connection from client b7bfedc62cf6487e95f4d2efd135aa76: 
2022-11-20 16:19:53,860 INFO proxier.py:341 -- SpecificServer started on port: 23001 with PID: 978 for client: b7bfedc62cf6487e95f4d2efd135aa76
2022-11-20 16:19:57,305 INFO proxier.py:743 -- b7bfedc62cf6487e95f4d2efd135aa76 last started stream at 1668989992.8317022. Current stream started at 1668989992.8317022.
2022-11-20 16:20:06,652 INFO proxier.py:670 -- New data connection from client e61806d9194840bbb8cb706b45e35d5f: 
2022-11-20 16:20:07,676 INFO proxier.py:341 -- SpecificServer started on port: 23002 with PID: 1126 for client: e61806d9194840bbb8cb706b45e35d5f
2022-11-20 16:20:11,121 INFO proxier.py:743 -- e61806d9194840bbb8cb706b45e35d5f last started stream at 1668990006.651533. Current stream started at 1668990006.651533.
2022-11-20 16:20:54,602 INFO proxier.py:391 -- Specific server b7bfedc62cf6487e95f4d2efd135aa76 is no longer running, freeing its port 23001
2022-11-20 16:20:54,602 INFO proxier.py:391 -- Specific server e61806d9194840bbb8cb706b45e35d5f is no longer running, freeing its port 23002
2022-11-20 16:31:17,900 INFO proxier.py:670 -- New data connection from client 4e3534e4fec049f8ba34731f169f6757: 
2022-11-20 16:31:18,924 INFO proxier.py:341 -- SpecificServer started on port: 23003 with PID: 1309 for client: 4e3534e4fec049f8ba34731f169f6757
2022-11-20 16:31:22,371 INFO proxier.py:743 -- 4e3534e4fec049f8ba34731f169f6757 last started stream at 1668990677.8991323. Current stream started at 1668990677.8991323.
2022-11-20 16:31:28,821 INFO proxier.py:670 -- New data connection from client 5c599b20aab24d9aad6d3dd181496f9e: 
2022-11-20 16:31:29,842 INFO proxier.py:341 -- SpecificServer started on port: 23004 with PID: 1444 for client: 5c599b20aab24d9aad6d3dd181496f9e
2022-11-20 16:31:33,287 INFO proxier.py:743 -- 5c599b20aab24d9aad6d3dd181496f9e last started stream at 1668990688.8201573. Current stream started at 1668990688.8201573.
2022-11-20 16:31:55,082 INFO proxier.py:391 -- Specific server 4e3534e4fec049f8ba34731f169f6757 is no longer running, freeing its port 23003
2022-11-20 16:32:25,113 INFO proxier.py:391 -- Specific server 5c599b20aab24d9aad6d3dd181496f9e is no longer running, freeing its port 23004
2022-11-20 16:41:24,318 INFO proxier.py:670 -- New data connection from client 323a2bef0604477393597feb4fabaeae: 
2022-11-20 16:41:24,356 INFO proxier.py:230 -- Increasing runtime env reference for ray_client_server_23005.Serialized runtime env is {"env_vars": {"NCCL_SOCKET_IFNAME": "en,eth,wl,bond"}}.
2022-11-20 16:41:25,379 INFO proxier.py:341 -- SpecificServer started on port: 23005 with PID: 1598 for client: 323a2bef0604477393597feb4fabaeae
2022-11-20 16:42:07,965 INFO proxier.py:743 -- 323a2bef0604477393597feb4fabaeae last started stream at 1668991284.3170214. Current stream started at 1668991284.3170214.
2022-11-20 16:42:36,081 INFO proxier.py:670 -- New data connection from client fa24f8f93b0f4ad6bf8824bfcebd695f: 
2022-11-20 16:42:36,088 INFO proxier.py:230 -- Increasing runtime env reference for ray_client_server_23006.Serialized runtime env is {"env_vars": {"NCCL_SOCKET_IFNAME": "en,eth,wl,bond"}}.
2022-11-20 16:42:37,109 INFO proxier.py:341 -- SpecificServer started on port: 23006 with PID: 1784 for client: fa24f8f93b0f4ad6bf8824bfcebd695f
2022-11-20 16:42:45,358 INFO proxier.py:743 -- fa24f8f93b0f4ad6bf8824bfcebd695f last started stream at 1668991356.0806959. Current stream started at 1668991356.0806959.
2022-11-20 16:42:55,622 INFO proxier.py:391 -- Specific server 323a2bef0604477393597feb4fabaeae is no longer running, freeing its port 23005
2022-11-20 16:43:25,641 INFO proxier.py:391 -- Specific server fa24f8f93b0f4ad6bf8824bfcebd695f is no longer running, freeing its port 23006
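
To compare attempts in a log like the one above, here is a minimal sketch (not part of Ray) that measures, per client ID, how long it took from "New data connection" to "last started stream". The function name `connection_latencies` and the regular expressions are assumptions based only on the message formats visible in this log; other Ray versions may log differently.

```python
import re
from datetime import datetime

# Patterns assume the proxier.py message formats seen in the log above.
LINE_RE = re.compile(
    r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) INFO proxier\.py:\d+ -- (.*)$"
)
NEW_CONN_RE = re.compile(r"^New data connection from client ([0-9a-f]{32})")
STREAM_RE = re.compile(r"^([0-9a-f]{32}) last started stream at")


def connection_latencies(log_text):
    """Return {client_id: seconds from new data connection to first stream}."""
    connected_at = {}
    latencies = {}
    for raw_line in log_text.splitlines():
        match = LINE_RE.match(raw_line.strip())
        if match is None:
            continue  # skip lines from other modules or in other formats
        timestamp = datetime.strptime(match.group(1), "%Y-%m-%d %H:%M:%S,%f")
        message = match.group(2)

        new_conn = NEW_CONN_RE.match(message)
        if new_conn:
            connected_at[new_conn.group(1)] = timestamp
            continue

        stream = STREAM_RE.match(message)
        if stream and stream.group(1) in connected_at:
            client_id = stream.group(1)
            delta = timestamp - connected_at[client_id]
            # Keep only the first stream start seen for each client.
            latencies.setdefault(client_id, delta.total_seconds())
    return latencies
```

Passing the contents of /tmp/ray/session_latest/logs/ray_client_server.err to this function gives about 4.5 seconds for client defabd9092b0495e88f301e6703b5c3e and about 43.6 seconds for client 323a2bef0604477393597feb4fabaeae, the first connection that carried a runtime env.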

Not sure what was happening, I ran the same command again:

python tensorflow_mnist_example.py --address=ray://172.21.36.145:10001 --num-workers=1 --use-gpu

This time it ran without any problems.

Why does the first run fail while the second one succeeds?
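
Since rerunning the command succeeds, one possible stopgap is to retry the connection when it fails with the ConnectionAbortedError shown in the traceback. The sketch below is only a workaround under that assumption and does not explain the root cause. The helper `connect_with_retry` and its parameters are hypothetical names, not part of Ray, and whether ray.init can safely be called again after a failed attempt has not been verified here.

```python
import time


def connect_with_retry(connect, attempts=3, delay_s=5.0,
                       retry_on=(ConnectionAbortedError,)):
    """Call connect() until it succeeds or the attempts are used up.

    connect is any zero-argument callable, for example (assumed usage):
        lambda: ray.init(address=args.address, runtime_env=runtime_env)
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return connect()
        except retry_on as err:
            last_error = err
            if attempt < attempts:
                time.sleep(delay_s)  # give the server time before retrying
    # Every attempt failed: re-raise the last error for the caller.
    raise last_error
```

In the reproduction script, this would wrap the ray.init call in the non-smoke-test branch; the retry count and delay are arbitrary starting values.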

Versions / Dependencies

ray 2.1.0

Reproduction script

# This example showcases how to use Tensorflow with Ray Train.
# Original code:
# https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras
import argparse
import json
import os

import numpy as np
import tensorflow as tf
from ray.air.callbacks.keras import Callback as TrainReportCallback

from ray.train.tensorflow import TensorflowTrainer
from ray.air.config import ScalingConfig

def mnist_dataset(batch_size):
    (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    # The `x` arrays are in uint8 and have values in the [0, 255] range.
    # You need to convert them to float32 with values in the [0, 1] range.
    x_train = x_train / np.float32(255)
    y_train = y_train.astype(np.int64)
    train_dataset = (
        tf.data.Dataset.from_tensor_slices((x_train, y_train))
        .shuffle(60000)
        .repeat()
        .batch(batch_size)
    )
    return train_dataset

def build_and_compile_cnn_model(config):
    learning_rate = config.get("lr", 0.001)
    model = tf.keras.Sequential(
        [
            tf.keras.Input(shape=(28, 28)),
            tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
            tf.keras.layers.Conv2D(32, 3, activation="relu"),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(128, activation="relu"),
            tf.keras.layers.Dense(10),
        ]
    )
    model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        optimizer=tf.keras.optimizers.SGD(learning_rate=learning_rate),
        metrics=["accuracy"],
    )
    return model

def train_func(config):
    per_worker_batch_size = config.get("batch_size", 64)
    epochs = config.get("epochs", 3)
    steps_per_epoch = config.get("steps_per_epoch", 70)

    tf_config = json.loads(os.environ["TF_CONFIG"])
    num_workers = len(tf_config["cluster"]["worker"])

    strategy = tf.distribute.MultiWorkerMirroredStrategy()

    global_batch_size = per_worker_batch_size * num_workers
    multi_worker_dataset = mnist_dataset(global_batch_size)

    with strategy.scope():
        # Model building/compiling need to be within `strategy.scope()`.
        multi_worker_model = build_and_compile_cnn_model(config)

    history = multi_worker_model.fit(
        multi_worker_dataset,
        epochs=epochs,
        steps_per_epoch=steps_per_epoch,
        callbacks=[TrainReportCallback()],
    )
    results = history.history
    return results

def train_tensorflow_mnist(num_workers=2, use_gpu=False, epochs=4):
    trainer = TensorflowTrainer(
        train_func,
        train_loop_config={"lr": 1e-3, "batch_size": 64, "epochs": epochs},
        scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),
    )
    results = trainer.fit()
    print(f"Results: {results.metrics}")

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--address", required=False, type=str, help="the address to use for Ray"
    )
    parser.add_argument(
        "--num-workers",
        "-n",
        type=int,
        default=2,
        help="Sets number of workers for training.",
    )
    parser.add_argument(
        "--use-gpu", action="store_true", default=False, help="Enables GPU training"
    )
    parser.add_argument(
        "--epochs", type=int, default=3, help="Number of epochs to train for."
    )
    parser.add_argument(
        "--smoke-test",
        action="store_true",
        default=False,
        help="Finish quickly for testing.",
    )

    args, _ = parser.parse_known_args()

    import ray
    # Add this at the top of your Ray application.
    runtime_env = {"env_vars": {"NCCL_SOCKET_IFNAME": "en,eth,wl,bond"}}
    if args.smoke_test:
        ray.init(num_cpus=4, runtime_env=runtime_env)
        train_tensorflow_mnist()
    else:
        ray.init(address=args.address, runtime_env=runtime_env)
        train_tensorflow_mnist(
            num_workers=args.num_workers, use_gpu=args.use_gpu, epochs=args.epochs
        )

Issue Severity

Low: It annoys or frustrates me.

wuisawesome commented 1 year ago

@architkulkarni @ckw017 can y'all take a look?