horovod / horovod

Distributed training framework for TensorFlow, Keras, PyTorch, and Apache MXNet.
http://horovod.ai

Horovod doesn't seem to work with tf.estimator.train_and_evaluate API #182

Open · smurching opened this issue 6 years ago

smurching commented 6 years ago

Hi all,

I've been trying to run a modified version of the MNIST estimator example (link to gist) using tf.estimator.train_and_evaluate to intersperse training and evaluation. I'm hitting the following error when training resumes after the first round of evaluation:

INFO:tensorflow:Evaluation [70/100]
INFO:tensorflow:Finished evaluation at 2018-02-28-00:51:56
INFO:tensorflow:Saving dict for global step 807: accuracy = 0.9359, global_step = 807, loss = 0.210769
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Restoring parameters from ./mnist_convnet_model/model.ckpt-807
Traceback (most recent call last):
  File "/Users/sid/miniconda3/lib/python3.6/site-packages/tensorflow/python/client/session.py", line 277, in __init__
    fetch, allow_tensor=True, allow_operation=True))
  File "/Users/sid/miniconda3/lib/python3.6/site-packages/tensorflow/python/framework/ops.py", line 3320, in as_graph_element
    return self._as_graph_element_locked(obj, allow_tensor, allow_operation)
  File "/Users/sid/miniconda3/lib/python3.6/site-packages/tensorflow/python/framework/ops.py", line 3407, in _as_graph_element_locked
    raise ValueError("Operation %s is not an element of this graph." % obj)
ValueError: Operation name: "group_deps"
op: "NoOp"
input: "^Assign"
input: "^Assign_1"
input: "^Assign_2"
input: "^Assign_3"
input: "^Assign_4"
input: "^Assign_5"
input: "^Assign_6"
input: "^Assign_7"
input: "^Assign_8"
input: "^Assign_9"
input: "^Assign_10"
input: "^Assign_11"
input: "^Assign_12"
input: "^Assign_13"
input: "^Assign_14"
input: "^Assign_15"
input: "^Assign_16"
 is not an element of this graph.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "examples/tensorflow_mnist_estimator.py", line 192, in <module>
    tf.app.run()
  File "/Users/sid/miniconda3/lib/python3.6/site-packages/tensorflow/python/platform/app.py", line 124, in run
    _sys.exit(main(argv))
  File "examples/tensorflow_mnist_estimator.py", line 186, in main
    tf.estimator.train_and_evaluate(mnist_classifier, tf_train_spec, tf_eval_spec)
  File "/Users/sid/miniconda3/lib/python3.6/site-packages/tensorflow/python/estimator/training.py", line 432, in train_and_evaluate
    executor.run_local()
  File "/Users/sid/miniconda3/lib/python3.6/site-packages/tensorflow/python/estimator/training.py", line 611, in run_local
    hooks=train_hooks)
  File "/Users/sid/miniconda3/lib/python3.6/site-packages/tensorflow/python/estimator/estimator.py", line 314, in train
    loss = self._train_model(input_fn, hooks, saving_listeners)
  File "/Users/sid/miniconda3/lib/python3.6/site-packages/tensorflow/python/estimator/estimator.py", line 812, in _train_model
    log_step_count_steps=self._config.log_step_count_steps) as mon_sess:
  File "/Users/sid/miniconda3/lib/python3.6/site-packages/tensorflow/python/training/monitored_session.py", line 380, in MonitoredTrainingSession
    stop_grace_period_secs=stop_grace_period_secs)
  File "/Users/sid/miniconda3/lib/python3.6/site-packages/tensorflow/python/training/monitored_session.py", line 787, in __init__
    stop_grace_period_secs=stop_grace_period_secs)
  File "/Users/sid/miniconda3/lib/python3.6/site-packages/tensorflow/python/training/monitored_session.py", line 511, in __init__
    self._sess = _RecoverableSession(self._coordinated_creator)
  File "/Users/sid/miniconda3/lib/python3.6/site-packages/tensorflow/python/training/monitored_session.py", line 972, in __init__
    _WrappedSession.__init__(self, self._create_session())
  File "/Users/sid/miniconda3/lib/python3.6/site-packages/tensorflow/python/training/monitored_session.py", line 977, in _create_session
    return self._sess_creator.create_session()
  File "/Users/sid/miniconda3/lib/python3.6/site-packages/tensorflow/python/training/monitored_session.py", line 674, in create_session
    hook.after_create_session(self.tf_sess, self.coord)
  File "/Users/sid/miniconda3/lib/python3.6/site-packages/horovod/tensorflow/__init__.py", line 125, in after_create_session
    session.run(self.bcast_op)
  File "/Users/sid/miniconda3/lib/python3.6/site-packages/tensorflow/python/client/session.py", line 898, in run
    run_metadata_ptr)
  File "/Users/sid/miniconda3/lib/python3.6/site-packages/tensorflow/python/client/session.py", line 1116, in _run
    self._graph, fetches, feed_dict_tensor, feed_handles=feed_handles)
  File "/Users/sid/miniconda3/lib/python3.6/site-packages/tensorflow/python/client/session.py", line 420, in __init__
    self._fetch_mapper = _FetchMapper.for_fetch(fetches)
  File "/Users/sid/miniconda3/lib/python3.6/site-packages/tensorflow/python/client/session.py", line 248, in for_fetch
    return _ElementFetchMapper(fetches, contraction_fn)
  File "/Users/sid/miniconda3/lib/python3.6/site-packages/tensorflow/python/client/session.py", line 284, in __init__
    'Tensor. (%s)' % (fetch, str(e)))
ValueError: Fetch argument <tf.Operation 'group_deps' type=NoOp> cannot be interpreted as a Tensor. (Operation name: "group_deps"
op: "NoOp"
input: "^Assign"
input: "^Assign_1"
input: "^Assign_2"
input: "^Assign_3"
input: "^Assign_4"
input: "^Assign_5"
input: "^Assign_6"
input: "^Assign_7"
input: "^Assign_8"
input: "^Assign_9"
input: "^Assign_10"
input: "^Assign_11"
input: "^Assign_12"
input: "^Assign_13"
input: "^Assign_14"
input: "^Assign_15"
input: "^Assign_16"
 is not an element of this graph.)

I'm using the following environment:

smurching commented 6 years ago

I have a potential fix for this in #183 but I'm open to any suggestions! Thanks in advance :)

alsrgv commented 6 years ago

Hi @smurching, thanks for your PR. I tried it on 2 GPUs with your MNIST example, but it got stuck like this:

WARNING: One or more tensors were submitted to be reduced, gathered or broadcasted by subset of ranks and are waiting for remainder of ranks for more than 60 seconds. This may indicate that different ranks are trying to submit different tensors or that only subset of ranks is submitting tensors, which will cause deadlock. Stalled ops: HorovodBroadcast_dense_1_kernel_0 [ready ranks: 1], HorovodBroadcast_dense_bias_0 [ready ranks: 1], HorovodBroadcast_conv2d_1_bias_0 [ready ranks: 1], DistributedMomentumOptimizer_Allreduce/HorovodAllreduce_gradients_dense_BiasAdd_grad_tuple_control_dependency_1_0 [ready ranks: 0], DistributedMomentumOptimizer_Allreduce/HorovodAllreduce_gradients_dense_MatMul_grad_tuple_control_dependency_1_0 [ready ranks: 0], HorovodBroadcast_conv2d_bias_Momentum_0 [ready ranks: 1], HorovodBroadcast_dense_kernel_0 [ready ranks: 1], DistributedMomentumOptimizer_Allreduce/HorovodAllreduce_gradients_dense_2_MatMul_grad_tuple_control_dependency_1_0 [ready ranks: 0], DistributedMomentumOptimizer_Allreduce/HorovodAllreduce_gradients_conv2d_2_BiasAdd_grad_tuple_control_dependency_1_0 [ready ranks: 0], HorovodBroadcast_dense_1_bias_0 [ready ranks: 1], DistributedMomentumOptimizer_Allreduce/HorovodAllreduce_gradients_conv2d_2_Conv2D_grad_tuple_control_dependency_1_0 [ready ranks: 0], HorovodBroadcast_dense_1_bias_Momentum_0 [ready ranks: 1], DistributedMomentumOptimizer_Allreduce/HorovodAllreduce_gradients_conv2d_BiasAdd_grad_tuple_control_dependency_1_0 [ready ranks: 0], HorovodBroadcast_conv2d_1_bias_Momentum_0 [ready ranks: 1], DistributedMomentumOptimizer_Allreduce/HorovodAllreduce_gradients_conv2d_Conv2D_grad_tuple_control_dependency_1_0 [ready ranks: 0], DistributedMomentumOptimizer_Allreduce/HorovodAllreduce_gradients_dense_2_BiasAdd_grad_tuple_control_dependency_1_0 [ready ranks: 0], HorovodBroadcast_dense_1_kernel_Momentum_0 [ready ranks: 1], HorovodBroadcast_dense_bias_Momentum_0 [ready ranks: 1], HorovodBroadcast_global_step_0 [ready ranks: 1], HorovodBroadcast_conv2d_1_kernel_Momentum_0 [ready ranks: 1], HorovodBroadcast_dense_kernel_Momentum_0 [ready ranks: 1], HorovodBroadcast_conv2d_1_kernel_0 [ready ranks: 1], HorovodBroadcast_conv2d_bias_0 [ready ranks: 1], HorovodBroadcast_conv2d_kernel_0 [ready ranks: 1], HorovodBroadcast_conv2d_kernel_Momentum_0 [ready ranks: 1]

The issue seems to be caused by the workers falling out of sync. Since time is used as the criterion for switching to evaluation, one worker decided it was time to evaluate while another decided to train one more batch. As a result, both workers got stuck.

Do you have any suggestions for how this could be solved?

maorzalt commented 6 years ago

In my opinion, this is more of a bug than an enhancement.

Try calling estimator.train(...) twice and you'll get the same error message.

To reproduce, use the example code at https://github.com/uber/horovod/blob/master/examples/tensorflow_mnist_estimator.py

and add a second identical call to mnist_classifier.train():

(https://github.com/uber/horovod/blob/73a5f72f8d1bbf4e57e4dc31be346b066a0cdc19/examples/tensorflow_mnist_estimator.py#L175)

    mnist_classifier.train(
            input_fn=train_input_fn,
            steps=20000 // hvd.size(),
            hooks=[logging_hook, bcast_hook])

You will get the same issue. Running the same example without hvd.DistributedOptimizer(...) and hvd.BroadcastGlobalVariablesHook(...) works with no problems.

alsrgv commented 6 years ago

Good point. Indeed, a loop that repeatedly does training followed by evaluation makes perfect sense.

I have merged the PR. Thanks again, @smurching, for submitting it.

It does not fully solve the original issue in this thread, though, since tf.estimator.train_and_evaluate relies on wall-clock time. One way to solve this would be to send Google a PR adding periodic evaluation based on a number of steps rather than a number of seconds.

maorzalt commented 6 years ago

@alsrgv @smurching

I am experiencing the same issue (deadlock). Should Horovod block the whole training process if one of the workers is gone?

P.S. my current workaround for the issue is having N parallel workers that:

alsrgv commented 6 years ago

@maorzalt, you could do the following (similar to what we do with Keras; a rough sketch follows below):

  1. estimator.train(...)
  2. estimator.evaluate(...) for a subset of data (1/N if you can deterministically partition, or 3/N if you randomly sample - should give good enough results).
  3. hvd.allreduce() the metrics from the evaluation in (2).
  4. goto (1)

That's what the Keras ImageNet example does.
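
A minimal sketch of that train/evaluate/allreduce loop might look like the following; hvd.init() is assumed to have been called already, and model_fn, train_input_fn, eval_input_fn, the step counts, and the 'accuracy' metric name are illustrative assumptions rather than anything mandated by Horovod:

    import horovod.tensorflow as hvd
    import tensorflow as tf

    estimator = tf.estimator.Estimator(
        model_fn=model_fn,
        model_dir='./checkpoints' if hvd.rank() == 0 else None)
    bcast_hook = hvd.BroadcastGlobalVariablesHook(0)

    for _ in range(10):  # number of train/evaluate rounds is illustrative
        # (1) Train for a fixed number of steps on every worker.
        estimator.train(input_fn=train_input_fn, steps=1000, hooks=[bcast_hook])

        # (2) Evaluate each worker on its own 1/N slice of the evaluation data.
        metrics = estimator.evaluate(input_fn=eval_input_fn, steps=100)

        # (3) Average the local metric across workers (hvd.allreduce averages by default).
        with tf.Graph().as_default(), tf.Session() as sess:
            avg_accuracy = sess.run(hvd.allreduce(tf.constant(metrics['accuracy'])))
        if hvd.rank() == 0:
            print('accuracy averaged across workers: %f' % avg_accuracy)
        # (4) Loop back to (1).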

maorzalt commented 6 years ago

@alsrgv Step 4 would be impossible, since it requires calling estimator.train(...) more than once :(

alsrgv commented 6 years ago

@maorzalt, it should be possible now; the new version 0.12.0 includes @smurching's fix for that :-)

smurching commented 6 years ago

Apologies for being MIA - thanks so much @alsrgv and @maorzalt for the super-quick responses & suggestions!

FWIW, another thing I've tried (similar to what @maorzalt described) is running training on all N workers and an evaluate/sleep loop in a separate process on one of the workers, since I tend to run eval pretty infrequently and I want to fully utilize the workers for training. Performance-wise I'm not sure whether one approach is better; there may not be a big difference :)
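
A rough sketch of that separate evaluation process, assuming the training workers write checkpoints to a shared model_dir; model_fn, eval_input_fn, the path, and the sleep interval are illustrative:

    import time
    import tensorflow as tf

    # Standalone evaluator (no Horovod), run as one extra process alongside the
    # N training workers.
    estimator = tf.estimator.Estimator(model_fn=model_fn, model_dir='./checkpoints')

    last_ckpt = None
    while True:
        ckpt = estimator.latest_checkpoint()
        if ckpt is not None and ckpt != last_ckpt:
            # Evaluate the newest checkpoint written by the training workers.
            results = estimator.evaluate(input_fn=eval_input_fn, steps=None)
            print('evaluated %s: %s' % (ckpt, results))
            last_ckpt = ckpt
        time.sleep(600)  # eval runs infrequently, as described above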

maorzalt commented 6 years ago

@smurching - thank you for the feedback.
@alsrgv - thanks for the PR!

I will test it in the coming days and let you know whether it works on my setup as well. In the meantime, keep up the good work! The Horovod approach to scaling the optimization process is amazing.

maorzalt commented 6 years ago

I tested the MNIST example with 2 consecutive estimator.train(...) calls on a machine with 4 GPUs, and it works like a charm. The synchronization issues mentioned above were also resolved.

Great work @alsrgv & @smurching !

maorzalt commented 6 years ago

@alsrgv I apologize for reviving this... but I did experience the synchronization issue above when running my full model and pipeline with tf.estimator.train_and_evaluate(...). Training gets stuck after the first evaluation and emits errors like this:

... optimizer/DistributedMomentumOptimizer_Allreduce/HorovodAllreduce_optimizer_gradients_model_dense_1_BiasAdd_grad_tuple_control_dependency_1_0 [ready ranks: 2, 3, 1]

I have been trying to reproduce this with a short example, without success so far. Any suggestions?

alsrgv commented 6 years ago

@maorzalt, as I mentioned in https://github.com/uber/horovod/issues/182#issuecomment-369823019, I don't think there's a way to fix tf.estimator.train_and_evaluate(...), but you can instead just do estimator.train(...) followed by estimator.evaluate(...) in a loop.

maorzalt commented 6 years ago

@alsrgv

Thank you. I changed it as you suggested and it worked: tf.estimator.train_and_evaluate(...) => estimator.train(...) + estimator.evaluate(...)

I'm starting to get the larger Horovod picture. Workers are synced using the global step, which could potentially cause many additional issues.

Examples:

Are these examples correct by design? If so, how can we help improve them?

P.S. The TensorFlow patch should probably start here, by changing _StopAtSecsHook to something new like _StopEveryNStepsHook.

alsrgv commented 6 years ago

@maorzalt, yes, that's the nature of synchronous SGD. One caveat: if one of the workers crashes, the other workers will fail rather than get stuck.

mdagost commented 6 years ago

@maorzalt Can you share the code that you ultimately got working? I've been trying something like

    for n in range(100):
        model_estimator.train(
                input_fn=lambda: get_inputs("train_*.tfrecords"),
                steps=100,
                hooks = [bcast_hook])

        eval_results = model_estimator.evaluate(
                    input_fn=lambda: get_inputs("validation.tfrecords"),
                    steps=None)

but it seems slow and oddly synchronized. And I haven't been able to make things work using N-1 GPUs for training and 1 for evaluation with something like:

    for n in range(100):
        if hvd.rank() != 0:
            model_estimator.train(
                    input_fn=lambda: get_inputs("train_*.tfrecords"),
                    steps=100,
                    hooks = [bcast_hook])
        else:
            eval_results = model_estimator.evaluate(
                        input_fn=lambda: get_inputs("validation.tfrecords"),
                        steps=None)

mdagost commented 6 years ago

And relatedly, that first code snippet results in 4 Python dicts for eval_results, one per GPU. Is there a way to all-reduce them into a single dict?

smurching commented 6 years ago

@mdagost I'm also curious about distributed evaluation (the original motivation behind this question). As @alsrgv suggested, you could try wrapping the tensors containing your evaluation metrics in an hvd.allreduce() before returning from your estimator's model_fn. I believe you'd still end up with separate Python dictionaries (one per training process / GPU), but they'd be identical and contain the average of your eval metrics across the processes. This seems most useful for cases where the average of the metric on n distinct batches equals the metric computed over the concatenation of the n batches (e.g. 0/1 loss), but could also be useful for metrics like RMSE where the above doesn't hold.
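
A sketch of what that might look like in the EVAL branch of a model_fn; hvd.init() is assumed to have been called, every worker is assumed to evaluate the same number of batches (otherwise the allreduce can stall), and build_model is a hypothetical helper:

    import horovod.tensorflow as hvd
    import tensorflow as tf

    def model_fn(features, labels, mode):
        logits = build_model(features)
        loss = tf.losses.sparse_softmax_cross_entropy(labels=labels, logits=logits)

        if mode == tf.estimator.ModeKeys.EVAL:
            predictions = tf.argmax(logits, axis=1)
            value, update_op = tf.metrics.accuracy(labels=labels, predictions=predictions)
            # Average the locally computed metric value across all Horovod processes.
            avg_value = hvd.allreduce(value)
            return tf.estimator.EstimatorSpec(
                mode, loss=loss, eval_metric_ops={'accuracy': (avg_value, update_op)})

        # TRAIN / PREDICT branches omitted for brevity.
        raise NotImplementedError(mode)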

alsrgv commented 6 years ago

@smurching, @mdagost, the simple version is to use allreduce. However, for metrics that cannot be simply averaged across workers, I recommend using mpi4py to allgather intermediate evaluation results and compute the final metric value from the pieces.
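
For example, a sketch of the allgather approach with mpi4py for RMSE, where local_squared_error and local_count are assumed to have been computed by each worker on its own shard of the evaluation data:

    import math
    from mpi4py import MPI

    comm = MPI.COMM_WORLD
    # Gather every worker's (sum of squared errors, example count) pair on every rank.
    pieces = comm.allgather((local_squared_error, local_count))

    total_squared_error = sum(sq for sq, _ in pieces)
    total_count = sum(n for _, n in pieces)
    rmse = math.sqrt(total_squared_error / total_count)  # identical on every rank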

shoubhik commented 5 years ago

Any final solutions to this issue?

shoubhik commented 5 years ago

@alsrgv

I have a query about the comment above in this issue (#182). As far as I know, calling estimator.train(...) and estimator.evaluate(...) in a loop resets the dataset iterator for training every time evaluate is called before the epoch has finished. This is problematic for larger datasets, where evaluate is called more frequently.

This seems to be the way TensorFlow has implemented it. A detailed use case is described on Stack Overflow.
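
One common partial mitigation (offered here as a suggestion, not a fix from this thread) is to make training step-based rather than epoch-based, so that each estimator.train(...) call between evaluations draws a freshly shuffled stream instead of replaying the same leading examples; train_files, parse_example, and the buffer/batch sizes are illustrative:

    import horovod.tensorflow as hvd
    import tensorflow as tf

    def train_input_fn():
        dataset = tf.data.TFRecordDataset(train_files)
        dataset = dataset.shard(hvd.size(), hvd.rank())   # one shard per worker
        dataset = dataset.shuffle(buffer_size=10000)      # unseeded: new order on each call
        dataset = dataset.repeat()                        # rely on `steps`, not epoch ends
        dataset = dataset.map(parse_example)
        dataset = dataset.batch(128)
        return dataset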

chychen commented 5 years ago

Hi all, if I use train_and_evaluate(), then I have to set RunConfig.save_checkpoints_secs for all Horovod workers in the estimator, or I get the error below. Would this be a bug?

ValueError: There should be a CheckpointSaverHook to use saving_listeners. Please set one of the RunConfig.save_checkpoints_steps or RunConfig.save_checkpoints_secs.

pranavladkat commented 5 years ago

I came across another deadlock scenario, where a checkpoint has already been saved after max_steps iterations. In this case rank 0 exits the train call before executing the broadcast hook, and the other processes keep waiting for the matching call.

    estimator = tf.estimator.Estimator(...)

    # check if training is already done
    steps_done = 0
    if hvd.rank() == 0 and estimator.latest_checkpoint() is not None:
        # since only rank 0 writes checkpoint
        checkpoint_reader = tf.train.NewCheckpointReader(estimator.latest_checkpoint())
        steps_done = checkpoint_reader.get_tensor(tf.GraphKeys.GLOBAL_STEP)

    steps_done = tf.convert_to_tensor(steps_done, dtype=tf.int64)
    with tf.Session() as sess:
        # broadcast steps_done to all the processes
        steps_done = hvd.broadcast(steps_done, 0)
        steps_done = sess.run(steps_done)

    if max_steps <= steps_done:
        print("Skipping training since max_steps has already saved.")
    else:
        # do the training and eval
        estimator.train(...)

alsrgv commented 5 years ago

@chychen, I don't think train_and_evaluate will work, since it uses time to decide when to run evaluation, which may cause different workers to execute a different number of batches. We may be able to resolve it once #1058 is implemented.

@pranavladkat, that's a very interesting use case, thanks for the workaround! It's similar to what's happening in Keras to determine the epoch to start from: https://github.com/horovod/horovod/blob/master/examples/keras_imagenet_resnet50.py#L73

chychen commented 5 years ago

@alsrgv For now, I only use one worker to run evaluation; it seems OK.

leimao commented 5 years ago

Correct me if I am wrong, but the conclusion seems to be that train_and_evaluate will not work with Horovod, and the Uber Horovod team is not seeking any TensorFlow integration for this. We have to iteratively call estimator.train() and estimator.evaluate().

alsrgv commented 5 years ago

@leimao, at this point that's correct.

nmatare commented 5 years ago

Following up on the previous two comments from @alsrgv and @leimao, and addressing the graph re-initialization challenges mentioned at https://github.com/horovod/horovod/issues/182#issuecomment-441830583: TensorFlow estimators do support evaluation on a per-step basis.

Specifically, commit https://github.com/tensorflow/tensorflow/commit/3edb609926f2521c726737fc1efeae1572dc6581 addressed this shortcoming; the relevant discussion took place at https://github.com/tensorflow/tensorflow/issues/17650.

Thus, to train with train_and_evaluate, set the throttle_secs parameter of the EvalSpec you pass in to 0. This forces evaluation to run whenever a checkpoint is saved, and checkpointing can be configured to happen every desired number of steps. Horovod will then stay synced across ranks as it performs validation.
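
A minimal sketch of that configuration, assuming hvd.init() has been called and that model_fn, train_input_fn, and eval_input_fn are defined elsewhere; the step counts and model_dir layout are illustrative:

    import horovod.tensorflow as hvd
    import tensorflow as tf

    config = tf.estimator.RunConfig(
        model_dir='./checkpoints',       # per-rank directory handling is setup-specific
        save_checkpoints_steps=1000)     # checkpoint (and therefore evaluate) every 1000 steps

    estimator = tf.estimator.Estimator(model_fn=model_fn, config=config)

    train_spec = tf.estimator.TrainSpec(
        input_fn=train_input_fn,
        max_steps=20000 // hvd.size(),
        hooks=[hvd.BroadcastGlobalVariablesHook(0)])

    eval_spec = tf.estimator.EvalSpec(
        input_fn=eval_input_fn,
        steps=100,
        throttle_secs=0)                 # re-evaluate as soon as each new checkpoint appears

    tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)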