ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0
33.28k stars 5.63k forks source link

tensorflow.python.framework.errors_impl.NotFoundError: ./multi_worker_model/variables/variables_temp/part-00000-of-00001.index; No such file or directory [Op:MergeV2Checkpoints] #27938

Open Alxe1 opened 2 years ago

Alxe1 commented 2 years ago

What happened + What you expected to happen

Saving tensorflow multi-worker model as 'tf' format get error:

(BaseWorkerMixin pid=14886) 2022-08-17 11:29:04.701227: W tensorflow/python/util/util.cc:348] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
(BaseWorkerMixin pid=14885) 2022-08-17 11:29:04.695715: W tensorflow/python/util/util.cc:348] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
(BaseWorkerMixin pid=14885) 2022-08-17 11:29:05.160647: W tensorflow/core/framework/op_kernel.cc:1767] OP_REQUIRES failed at save_restore_v2_ops.cc:241 : Not found: ./multi_worker_model/variables/variables_temp/part-00000-of-00001.index; No such file or directory
Traceback (most recent call last):
  File "train_test.py", line 102, in <module>
    train_tf_mnist()
  File "train_test.py", line 83, in train_tf_mnist
    config={"lr": 1e-3, "batch_size": 64, "epochs": epochs}
  File "/mnt/softwares/hvd_env/lib/python3.7/site-packages/ray/train/trainer.py", line 356, in run
    for intermediate_result in iterator:
  File "/mnt/softwares/hvd_env/lib/python3.7/site-packages/ray/train/trainer.py", line 753, in __next__
    self._finish_training
  File "/mnt/softwares/hvd_env/lib/python3.7/site-packages/ray/train/trainer.py", line 713, in _run_with_error_handling
    return func()
  File "/mnt/softwares/hvd_env/lib/python3.7/site-packages/ray/train/trainer.py", line 824, in _finish_training
    return self._backend_executor.finish_training()
  File "/mnt/softwares/hvd_env/lib/python3.7/site-packages/ray/train/utils.py", line 168, in <lambda>
    return lambda *args, **kwargs: ray.get(actor_method.remote(*args, **kwargs))
  File "/mnt/softwares/hvd_env/lib/python3.7/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "/mnt/softwares/hvd_env/lib/python3.7/site-packages/ray/worker.py", line 1831, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(NotFoundError): ray::BackendExecutor.finish_training() (pid=14772, ip=10.0.5.145, repr=<ray.train.backend.BackendExecutor object at 0x7f9b81626190>)
  File "/mnt/softwares/hvd_env/lib/python3.7/site-packages/ray/train/backend.py", line 498, in finish_training
    results = self.get_with_failure_handling(futures)
  File "/mnt/softwares/hvd_env/lib/python3.7/site-packages/ray/train/backend.py", line 517, in get_with_failure_handling
    success = check_for_failure(remote_values)
  File "/mnt/softwares/hvd_env/lib/python3.7/site-packages/ray/train/utils.py", line 50, in check_for_failure
    ray.get(object_ref)
ray.exceptions.RayTaskError(NotFoundError): ray::BaseWorkerMixin._BaseWorkerMixin__execute() (pid=14885, ip=10.0.5.145, repr=<ray.train.worker_group.BaseWorkerMixin object at 0x7fe73e797d90>)
  File "/mnt/softwares/hvd_env/lib/python3.7/site-packages/ray/train/worker_group.py", line 26, in __execute
    return func(*args, **kwargs)
  File "/mnt/softwares/hvd_env/lib/python3.7/site-packages/ray/train/backend.py", line 489, in end_training
    output = session.finish()
  File "/mnt/softwares/hvd_env/lib/python3.7/site-packages/ray/train/session.py", line 118, in finish
    func_output = self.training_thread.join()
  File "/mnt/softwares/hvd_env/lib/python3.7/site-packages/ray/train/utils.py", line 96, in join
    raise self.exc
  File "/mnt/softwares/hvd_env/lib/python3.7/site-packages/ray/train/utils.py", line 89, in run
    self.ret = self._target(*self._args, **self._kwargs)
  File "/mnt/softwares/hvd_env/lib/python3.7/site-packages/ray/train/utils.py", line 138, in <lambda>
    return lambda: train_func(config)
  File "train_test.py", line 73, in train_func
    multi_worker_model.save("./multi_worker_model", save_format="tf")
  File "/mnt/softwares/hvd_env/lib64/python3.7/site-packages/tensorflow/python/keras/engine/training.py", line 2112, in save
    signatures, options, save_traces)
  File "/mnt/softwares/hvd_env/lib64/python3.7/site-packages/tensorflow/python/keras/saving/save.py", line 151, in save_model
    signatures, options, save_traces)
  File "/mnt/softwares/hvd_env/lib64/python3.7/site-packages/tensorflow/python/keras/saving/saved_model/save.py", line 90, in save
    model, filepath, signatures, options)
  File "/mnt/softwares/hvd_env/lib64/python3.7/site-packages/tensorflow/python/saved_model/save.py", line 1114, in save_and_return_nodes
    utils_impl.get_variables_path(export_dir), options=ckpt_options)
  File "/mnt/softwares/hvd_env/lib64/python3.7/site-packages/tensorflow/python/training/tracking/util.py", line 1219, in save
    file_prefix_tensor, object_graph_tensor, options)
  File "/mnt/softwares/hvd_env/lib64/python3.7/site-packages/tensorflow/python/training/tracking/util.py", line 1164, in _save_cached_when_graph_building
    save_op = saver.save(file_prefix, options=options)
  File "/mnt/softwares/hvd_env/lib64/python3.7/site-packages/tensorflow/python/training/saving/functional_saver.py", line 300, in save
    return save_fn()
  File "/mnt/softwares/hvd_env/lib64/python3.7/site-packages/tensorflow/python/training/saving/functional_saver.py", line 287, in save_fn
    sharded_prefixes, file_prefix, delete_old_dirs=True)
  File "/mnt/softwares/hvd_env/lib64/python3.7/site-packages/tensorflow/python/ops/gen_io_ops.py", line 504, in merge_v2_checkpoints
    delete_old_dirs=delete_old_dirs, name=name, ctx=_ctx)
  File "/mnt/softwares/hvd_env/lib64/python3.7/site-packages/tensorflow/python/ops/gen_io_ops.py", line 528, in merge_v2_checkpoints_eager_fallback
    attrs=_attrs, ctx=ctx, name=name)
  File "/mnt/softwares/hvd_env/lib64/python3.7/site-packages/tensorflow/python/eager/execute.py", line 60, in quick_execute
    inputs, attrs, num_outputs)
tensorflow.python.framework.errors_impl.NotFoundError: ./multi_worker_model/variables/variables_temp/part-00000-of-00001.index; No such file or directory [Op:MergeV2Checkpoints]

Versions / Dependencies

python 3.7.10
ray                     1.13.0
tensorflow              2.5.0

Reproduction script

import argparse
import json
import os

import numpy as np
import tensorflow as tf
from tensorflow.keras.callbacks import Callback

import ray.train as train
from ray.train import Trainer

class TrainReportCallback(Callback):
    def on_epoch_end(self, epoch, logs=None):
        train.report(**logs)

def mnist_dataset(batch_size):
    (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    x_train = x_train / np.float32(255)
    y_train = y_train.astype(np.int64)
    train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
    return train_dataset

def build_and_compile_cnn_model(config):
    learning_rate = config.get("lr", 0.001)
    model = tf.keras.Sequential(
        [
            tf.keras.Input(shape=(28, 28)),
            tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
            tf.keras.layers.Conv2D(32, 3, activation="relu"),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(128, activation="relu"),
            tf.keras.layers.Dense(10)
        ]
    )
    model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        optimizer=tf.keras.optimizers.SGD(learning_rate=learning_rate),
        metrics=["accuracy"]
    )
    return model

def train_func(config):
    per_work_batch_size = config.get("batch_size", 64)
    epochs = config.get("epochs", 3)
    steps_per_epoch = config.get("steps_per_epoch", 70)

    tf_config = json.loads(os.environ["TF_CONFIG"])
    num_works = len(tf_config["cluster"]["worker"])

    strategy = tf.distribute.MultiWorkerMirroredStrategy()

    global_batch_size = per_work_batch_size * num_works
    multi_worker_dataset = mnist_dataset(global_batch_size)

    with strategy.scope():
        multi_worker_model = build_and_compile_cnn_model(config)

    history = multi_worker_model.fit(
        multi_worker_dataset,
        epochs=epochs,
        steps_per_epoch=steps_per_epoch,
        callbacks=[TrainReportCallback()]
    )
    multi_worker_model.save("./multi_worker_model", save_format="tf")  # tf format occured error
    result = history.history
    return result

def train_tf_mnist(num_workers=2, use_gpu=False, epochs=4):
    trainer = Trainer(backend="tensorflow", num_workers=num_workers, use_gpu=use_gpu)
    trainer.start()
    results = trainer.run(
        train_func=train_func,
        config={"lr": 1e-3, "batch_size": 64, "epochs": epochs}
    )
    trainer.shutdown()
    print(f"Results: {results[0]}")

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument("--address", required=False, type=str, help="The address to use for ray")
    parser.add_argument("--num_workers", "-n", type=int, default=4, help="Sets number of workers for training")
    parser.add_argument("--use_gpu", action="store_true", default=False, help="Enable GPU training")
    parser.add_argument("--epochs", type=int, default=3, help="Number of epochs to train for")
    parser.add_argument("--smoke-test", action="store_true", default=False, help="Finish quickly for testing")

    args, _ = parser.parse_known_args()

    import ray
    if args.smoke_test:
        ray.init(num_cpus=2)
        train_tf_mnist()
    else:
        ray.init(address=args.address)
        train_tf_mnist(
            num_workers=args.num_workers,
            use_gpu=args.use_gpu,
            epochs=args.epochs
        )

Issue Severity

No response

xwjiang2010 commented 2 years ago

Hi @Alxe1 Thanks for filing the issue! I tried your script - this is indeed a bug. We have multiple workers on the same node in this case, and they are all accessing the same directory (the directory under which you run the script) by the time model.save() is called, thus causing contention. It doesn't matter whether it is h5 or tf. The way to fix this is to have separate directories for each worker. This is exactly what is being done for AIR trainer. Take a look here. I also want to mention that you need to use Session API to have your saved checkpoint synced to driver or cloud in a multi-node set up. Otherwise, the saved checkpoint will only show up in whichever node that workers are running on. It may or may not be the head node.

The following is how I have modified your original script to use the new AIR API. PTAL. This needs to run with our ray 2.0.0 wheels (rc wheels are fine).

import argparse
import json
import os

import numpy as np
import tensorflow as tf
from tensorflow.keras.callbacks import Callback

import ray.train as train
from ray.train import Trainer

from ray.train.tensorflow import TensorflowTrainer
from ray.air import session
from ray.air.config import ScalingConfig
from ray.air.checkpoint import Checkpoint

class TrainReportCallback(Callback):
    def on_epoch_end(self, epoch, logs=None):
        train.report(**logs)

def mnist_dataset(batch_size):
    (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    x_train = x_train / np.float32(255)
    y_train = y_train.astype(np.int64)
    train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
    return train_dataset

def build_and_compile_cnn_model(config):
    learning_rate = config.get("lr", 0.001)
    model = tf.keras.Sequential(
        [
            tf.keras.Input(shape=(28, 28)),
            tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
            tf.keras.layers.Conv2D(32, 3, activation="relu"),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(128, activation="relu"),
            tf.keras.layers.Dense(10)
        ]
    )
    model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        optimizer=tf.keras.optimizers.SGD(learning_rate=learning_rate),
        metrics=["accuracy"]
    )
    return model

def train_func(config):
    per_work_batch_size = config.get("batch_size", 64)
    epochs = config.get("epochs", 1)
    steps_per_epoch = config.get("steps_per_epoch", 70)

    tf_config = json.loads(os.environ["TF_CONFIG"])
    num_works = len(tf_config["cluster"]["worker"])

    strategy = tf.distribute.MultiWorkerMirroredStrategy()

    global_batch_size = per_work_batch_size * num_works
    multi_worker_dataset = mnist_dataset(global_batch_size)

    with strategy.scope():
        multi_worker_model = build_and_compile_cnn_model(config)

    history = multi_worker_model.fit(
        multi_worker_dataset,
        epochs=1,
        steps_per_epoch=steps_per_epoch,
        callbacks=[TrainReportCallback()]
    )
    multi_worker_model.save("./multi_worker_model", save_format="tf")
    ckpt = Checkpoint.from_directory("./multi_worker_model")
    result = history.history
    session.report(result, checkpoint=ckpt)
    # feel free to delete "./multi_worker_model" at this point.
    return result

def train_tf_mnist(num_workers=2, use_gpu=False, epochs=1):
    # trainer = Trainer(backend="tensorflow", num_workers=num_workers, use_gpu=use_gpu)
    # trainer.start()
    # results = trainer.run(
    #     train_func=train_func,
    #     config={"lr": 1e-3, "batch_size": 64, "epochs": epochs}
    # )
    # trainer.shutdown()
    # print(f"Results: {results[0]}")
    trainer = TensorflowTrainer(train_loop_per_worker=train_func, train_loop_config={"lr": 1e-3, "batch_size": 64, "epochs": epochs},
        scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu))
    trainer.fit()

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument("--address", required=False, type=str, help="The address to use for ray")
    parser.add_argument("--num_workers", "-n", type=int, default=4, help="Sets number of workers for training")
    parser.add_argument("--use_gpu", action="store_true", default=False, help="Enable GPU training")
    parser.add_argument("--epochs", type=int, default=3, help="Number of epochs to train for")
    parser.add_argument("--smoke-test", action="store_true", default=False, help="Finish quickly for testing")

    args, _ = parser.parse_known_args()

    import ray
    if args.smoke_test:
        ray.init(num_cpus=2)
        train_tf_mnist()
    else:
        ray.init(address=args.address)
        train_tf_mnist(
            num_workers=args.num_workers,
            use_gpu=args.use_gpu,
            epochs=args.epochs
        )

cc @amogkam for visibility

Alxe1 commented 2 years ago

Hi @Alxe1 Thanks for filing the issue! I tried your script - this is indeed a bug. We have multiple workers on the same node in this case, and they are all accessing the same directory (the directory under which you run the script) by the time model.save() is called, thus causing contention. It doesn't matter whether it is h5 or tf. The way to fix this is to have separate directories for each worker. This is exactly what is being done for AIR trainer. Take a look here. I also want to mention that you need to use Session API to have your saved checkpoint synced to driver or cloud in a multi-node set up. Otherwise, the saved checkpoint will only show up in whichever node that workers are running on. It may or may not be the head node.

The following is how I have modified your original script to use the new AIR API. PTAL. This needs to run with our ray 2.0.0 wheels (rc wheels are fine).

import argparse
import json
import os

import numpy as np
import tensorflow as tf
from tensorflow.keras.callbacks import Callback

import ray.train as train
from ray.train import Trainer

from ray.train.tensorflow import TensorflowTrainer
from ray.air import session
from ray.air.config import ScalingConfig
from ray.air.checkpoint import Checkpoint

class TrainReportCallback(Callback):
    def on_epoch_end(self, epoch, logs=None):
        train.report(**logs)

def mnist_dataset(batch_size):
    (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    x_train = x_train / np.float32(255)
    y_train = y_train.astype(np.int64)
    train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
    return train_dataset

def build_and_compile_cnn_model(config):
    learning_rate = config.get("lr", 0.001)
    model = tf.keras.Sequential(
        [
            tf.keras.Input(shape=(28, 28)),
            tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
            tf.keras.layers.Conv2D(32, 3, activation="relu"),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(128, activation="relu"),
            tf.keras.layers.Dense(10)
        ]
    )
    model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        optimizer=tf.keras.optimizers.SGD(learning_rate=learning_rate),
        metrics=["accuracy"]
    )
    return model

def train_func(config):
    per_work_batch_size = config.get("batch_size", 64)
    epochs = config.get("epochs", 1)
    steps_per_epoch = config.get("steps_per_epoch", 70)

    tf_config = json.loads(os.environ["TF_CONFIG"])
    num_works = len(tf_config["cluster"]["worker"])

    strategy = tf.distribute.MultiWorkerMirroredStrategy()

    global_batch_size = per_work_batch_size * num_works
    multi_worker_dataset = mnist_dataset(global_batch_size)

    with strategy.scope():
        multi_worker_model = build_and_compile_cnn_model(config)

    history = multi_worker_model.fit(
        multi_worker_dataset,
        epochs=1,
        steps_per_epoch=steps_per_epoch,
        callbacks=[TrainReportCallback()]
    )
    multi_worker_model.save("./multi_worker_model", save_format="tf")
    ckpt = Checkpoint.from_directory("./multi_worker_model")
    result = history.history
    session.report(result, checkpoint=ckpt)
    # feel free to delete "./multi_worker_model" at this point.
    return result

def train_tf_mnist(num_workers=2, use_gpu=False, epochs=1):
    # trainer = Trainer(backend="tensorflow", num_workers=num_workers, use_gpu=use_gpu)
    # trainer.start()
    # results = trainer.run(
    #     train_func=train_func,
    #     config={"lr": 1e-3, "batch_size": 64, "epochs": epochs}
    # )
    # trainer.shutdown()
    # print(f"Results: {results[0]}")
    trainer = TensorflowTrainer(train_loop_per_worker=train_func, train_loop_config={"lr": 1e-3, "batch_size": 64, "epochs": epochs},
        scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu))
    trainer.fit()

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument("--address", required=False, type=str, help="The address to use for ray")
    parser.add_argument("--num_workers", "-n", type=int, default=4, help="Sets number of workers for training")
    parser.add_argument("--use_gpu", action="store_true", default=False, help="Enable GPU training")
    parser.add_argument("--epochs", type=int, default=3, help="Number of epochs to train for")
    parser.add_argument("--smoke-test", action="store_true", default=False, help="Finish quickly for testing")

    args, _ = parser.parse_known_args()

    import ray
    if args.smoke_test:
        ray.init(num_cpus=2)
        train_tf_mnist()
    else:
        ray.init(address=args.address)
        train_tf_mnist(
            num_workers=args.num_workers,
            use_gpu=args.use_gpu,
            epochs=args.epochs
        )

cc @amogkam for visibility

Thank you, I will try it.