NVIDIA-Merlin / HugeCTR

HugeCTR is a high-efficiency GPU framework designed for Click-Through-Rate (CTR) estimation training
Apache License 2.0

[Question] Having a hard time running demo with tensorflow2 mirrorredstrategy #385

Closed: Nov11 closed this issue 1 year ago

Nov11 commented 1 year ago

I'm new to Sparse Operation Kit and want to use it in my project. I tried to write a small demo after reading 'sparse_operation_kit_demo' under the notebooks folder. Basically it calls the embedding layer inside a MirroredStrategy scope. Unfortunately it doesn't work and I can't figure out what's missing. Please help me fix this.

Code :

import os
import sparse_operation_kit as sok
import tensorflow as tf

os.environ["CUDA_VISIBLE_DEVICES"] = "0,1"

class SOKDemo(tf.keras.models.Model):
    def __init__(self):
        super(SOKDemo, self).__init__()

        self.embedding_layer = sok.DistributedEmbedding(combiner='sum',
                                                        max_vocabulary_size_per_gpu=20,
                                                        embedding_vec_size=4,
                                                        slot_num=1,
                                                        max_nnz=3)

    def call(self, inputs, training=True):
        embedding_vector = self.embedding_layer(inputs, training=training)
        embedding_vector = tf.reshape(embedding_vector, shape=[-1, 4])
        return embedding_vector

strategy = tf.distribute.MirroredStrategy()

with strategy.scope():
    result = sok.Init(global_batch_size=10)

    plugin_demo = SOKDemo()

p = tf.sparse.SparseTensor(indices=tf.constant([[0, 0], [1, 0], [1, 1]], dtype=tf.int64),
                           values=tf.constant([1, 1, 1], dtype=tf.int64),
                           dense_shape=tf.constant([2, 3], dtype=tf.int64))

def work(_p):
    return plugin_demo(_p)

strategy.run(work, args=(p,))

Output from the merlin-tensorflow container:

You are using the plugin with MirroredStrategy.
2023-04-06 15:01:34.793294: I sparse_operation_kit/kit_cc/kit_cc_infra/src/resources/manager.cc:107] Mapping from local_replica_id to device_id:
2023-04-06 15:01:34.793294: I sparse_operation_kit/kit_cc/kit_cc_infra/src/resources/manager.cc:109] 0 -> 0
2023-04-06 15:01:34.793294: I sparse_operation_kit/kit_cc/kit_cc_infra/src/resources/manager.cc:109] 1 -> 1
2023-04-06 15:01:34.793294: I sparse_operation_kit/kit_cc/kit_cc_infra/src/resources/manager.cc:84] Global seed is 3111454230
2023-04-06 15:01:34.793294: I sparse_operation_kit/kit_cc/kit_cc_infra/src/resources/manager.cc:85] Local GPU Count: 2
2023-04-06 15:01:34.793294: I sparse_operation_kit/kit_cc/kit_cc_infra/src/resources/manager.cc:86] Global GPU Count: 2
2023-04-06 15:01:34.793294: I sparse_operation_kit/kit_cc/kit_cc_infra/src/resources/manager.cc:127] Global Replica Id: 0; Local Replica Id: 0
2023-04-06 15:01:34.793294: I sparse_operation_kit/kit_cc/kit_cc_infra/src/resources/manager.cc:127] Global Replica Id: 1; Local Replica Id: 1
2023-04-06 15:01:35.793295: I sparse_operation_kit/kit_cc/kit_cc_infra/src/resources/manager.cc:198] All peer to peer access enabled.
2023-04-06 15:01:35.793295: I sparse_operation_kit/kit_cc/kit_cc_infra/src/parameters/raw_manager.cc:132] Created embedding variable whose name is EmbeddingVariable
2023-04-06 15:01:35.793295: I sparse_operation_kit/kit_cc/kit_cc_infra/src/parameters/raw_manager.cc:132] Created embedding variable whose name is EmbeddingVariable/replica_1/
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
2023-04-06 15:01:35.793295: I sparse_operation_kit/kit_cc/kit_cc_infra/src/parameters/raw_param.cc:121] Variable: EmbeddingVariable on global_replica_id: 0 start initialization
2023-04-06 15:01:35.793295: I sparse_operation_kit/kit_cc/kit_cc_infra/src/parameters/raw_param.cc:138] Variable: EmbeddingVariable on global_replica_id: 0 initialization done.
INFO:tensorflow:Error reported to Coordinator: Exception encountered when calling layer "distributed_embedding" "                 f"(type DistributedEmbedding).

{{function_node __wrapped__PluginSparseFprop_device_/job:localhost/replica:0/task:0/device:GPU:0}} /hugectr/sparse_operation_kit/kit_cc/kit_cc_infra/include/resources/cpu_resource.h:91 BlockingCallOnce time out.
     [[{{node PluginSparseFprop}}]] [Op:PluginSparseFprop]

Call arguments received by layer "distributed_embedding" "                 f"(type DistributedEmbedding):
  • inputs=<tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f322c2d8ac0>
  • training=True
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/tensorflow/python/training/coordinator.py", line 293, in stop_on_exception
    yield
  File "/usr/local/lib/python3.8/dist-packages/tensorflow/python/distribute/mirrored_run.py", line 386, in run
    self.main_result = self.main_fn(*self.main_args, **self.main_kwargs)
  File "/usr/local/lib/python3.8/dist-packages/tensorflow/python/autograph/impl/api.py", line 595, in wrapper
    return func(*args, **kwargs)
  File "/tmp/ipykernel_2099/958539416.py", line 37, in work
    return plugin_demo(_p)
  File "/usr/local/lib/python3.8/dist-packages/keras/utils/traceback_utils.py", line 70, in error_handler
    raise e.with_traceback(filtered_tb) from None
  File "/tmp/ipykernel_2099/958539416.py", line 19, in call
    embedding_vector = self.embedding_layer(inputs, training=training)
  File "/usr/local/lib/python3.8/dist-packages/merlin_sok-1.1.4-py3.8-linux-x86_64.egg/sparse_operation_kit/embeddings/distributed_embedding.py", line 171, in call
    emb_vector = embedding_ops.embedding_lookup_sparse(
  File "/usr/local/lib/python3.8/dist-packages/merlin_sok-1.1.4-py3.8-linux-x86_64.egg/sparse_operation_kit/embeddings/embedding_ops.py", line 144, in embedding_lookup_sparse
    vector, _ = kit_lib.plugin_sparse_fprop(
  File "<string>", line 1952, in plugin_sparse_fprop
  File "<string>", line 2038, in plugin_sparse_fprop_eager_fallback
tensorflow.python.framework.errors_impl.AbortedError: Exception encountered when calling layer "distributed_embedding" "                 f"(type DistributedEmbedding).

{{function_node __wrapped__PluginSparseFprop_device_/job:localhost/replica:0/task:0/device:GPU:0}} /hugectr/sparse_operation_kit/kit_cc/kit_cc_infra/include/resources/cpu_resource.h:91 BlockingCallOnce time out.
     [[{{node PluginSparseFprop}}]] [Op:PluginSparseFprop]

Call arguments received by layer "distributed_embedding" "                 f"(type DistributedEmbedding):
  • inputs=<tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f322c2d8ac0>
  • training=True
---------------------------------------------------------------------------
AbortedError                              Traceback (most recent call last)
Cell In[1], line 40
     36 def work(_p):
     37     return plugin_demo(_p)
---> 40 strategy.run(work, args=(p,))

File /usr/local/lib/python3.8/dist-packages/tensorflow/python/distribute/distribute_lib.py:1315, in StrategyBase.run(***failed resolving arguments***)
   1310 with self.scope():
   1311   # tf.distribute supports Eager functions, so AutoGraph should not be
   1312   # applied when the caller is also in Eager mode.
   1313   fn = autograph.tf_convert(
   1314       fn, autograph_ctx.control_status_ctx(), convert_by_default=False)
-> 1315   return self._extended.call_for_each_replica(fn, args=args, kwargs=kwargs)

File /usr/local/lib/python3.8/dist-packages/tensorflow/python/distribute/distribute_lib.py:2891, in StrategyExtendedV1.call_for_each_replica(self, fn, args, kwargs)
   2889   kwargs = {}
   2890 with self._container_strategy().scope():
-> 2891   return self._call_for_each_replica(fn, args, kwargs)

File /usr/local/lib/python3.8/dist-packages/tensorflow/python/distribute/mirrored_strategy.py:676, in MirroredExtended._call_for_each_replica(self, fn, args, kwargs)
    675 def _call_for_each_replica(self, fn, args, kwargs):
--> 676   return mirrored_run.call_for_each_replica(
    677       self._container_strategy(), fn, args, kwargs)

File /usr/local/lib/python3.8/dist-packages/tensorflow/python/distribute/mirrored_run.py:101, in call_for_each_replica(strategy, fn, args, kwargs)
     93 else:
     94   # When a tf.function is wrapped to trigger _call_for_each_replica (see
     95   # the other branch above), AutoGraph stops conversion at
     96   # _call_for_each_replica itself (TF library functions are allowlisted).
     97   # This makes sure that the Python function that originally passed to
     98   # the tf.function is still converted.
     99   fn = autograph.tf_convert(fn, autograph_ctx.control_status_ctx())
--> 101 return _call_for_each_replica(strategy, fn, args, kwargs)

File /usr/local/lib/python3.8/dist-packages/tensorflow/python/distribute/mirrored_run.py:283, in _call_for_each_replica(distribution, fn, args, kwargs)
    281   for t in threads:
    282     t.should_run.set()
--> 283   coord.join(threads)
    285 return distribute_utils.regroup(tuple(t.main_result for t in threads))

File /usr/local/lib/python3.8/dist-packages/tensorflow/python/training/coordinator.py:385, in Coordinator.join(self, threads, stop_grace_period_secs, ignore_live_threads)
    383 self._registered_threads = set()
    384 if self._exc_info_to_raise:
--> 385   six.reraise(*self._exc_info_to_raise)
    386 elif stragglers:
    387   if ignore_live_threads:

File /usr/lib/python3/dist-packages/six.py:703, in reraise(tp, value, tb)
    701     if value.__traceback__ is not tb:
    702         raise value.with_traceback(tb)
--> 703     raise value
    704 finally:
    705     value = None

File /usr/local/lib/python3.8/dist-packages/tensorflow/python/training/coordinator.py:293, in Coordinator.stop_on_exception(self)
    263 """Context manager to request stop when an Exception is raised.
    264 
    265 Code that uses a coordinator must catch exceptions and pass
   (...)
    290   nothing.
    291 """
    292 try:
--> 293   yield
    294 except:  # pylint: disable=bare-except
    295   self.request_stop(ex=sys.exc_info())

File /usr/local/lib/python3.8/dist-packages/tensorflow/python/distribute/mirrored_run.py:386, in _MirroredReplicaThread.run(self)
    373   # TODO(josh11b): Use current logical device instead of 0 here.
    374   with self.coord.stop_on_exception(), \
    375       _enter_graph(self._init_graph, self._init_in_eager), \
    376       _enter_graph(self.graph, self.in_eager,
   (...)
    384           self._var_scope, reuse=self.replica_id > 0), \
    385       variable_scope.variable_creator_scope(self.variable_creator_fn):
--> 386     self.main_result = self.main_fn(*self.main_args, **self.main_kwargs)
    387     self.done = True
    388 finally:

File /usr/local/lib/python3.8/dist-packages/tensorflow/python/autograph/impl/api.py:595, in call_with_unspecified_conversion_status.<locals>.wrapper(*args, **kwargs)
    593 def wrapper(*args, **kwargs):
    594   with ag_ctx.ControlStatusCtx(status=ag_ctx.Status.UNSPECIFIED):
--> 595     return func(*args, **kwargs)

Cell In[1], line 37, in work(_p)
     36 def work(_p):
---> 37     return plugin_demo(_p)

File /usr/local/lib/python3.8/dist-packages/keras/utils/traceback_utils.py:70, in filter_traceback.<locals>.error_handler(*args, **kwargs)
     67     filtered_tb = _process_traceback_frames(e.__traceback__)
     68     # To get the full stack trace, call:
     69     # `tf.debugging.disable_traceback_filtering()`
---> 70     raise e.with_traceback(filtered_tb) from None
     71 finally:
     72     del filtered_tb

Cell In[1], line 19, in SOKDemo.call(self, inputs, training)
     18 def call(self, inputs, training=True):
---> 19     embedding_vector = self.embedding_layer(inputs, training=training)
     20     embedding_vector = tf.reshape(embedding_vector, shape=[-1, 4])
     21     return embedding_vector

File /usr/local/lib/python3.8/dist-packages/merlin_sok-1.1.4-py3.8-linux-x86_64.egg/sparse_operation_kit/embeddings/distributed_embedding.py:171, in DistributedEmbedding.call(self, inputs, training)
    150 def call(self, inputs, training=True):
    151     """
    152     The forward logic of this wrapper class.
    153 
   (...)
    169             *[batchsize, slot_num, embedding_vec_size]*
    170     """
--> 171     emb_vector = embedding_ops.embedding_lookup_sparse(
    172         embedding_variable=self.var, sp_ids=inputs, slot_num=self.slot_num, training=training
    173     )
    174     return emb_vector

File /usr/local/lib/python3.8/dist-packages/merlin_sok-1.1.4-py3.8-linux-x86_64.egg/sparse_operation_kit/embeddings/embedding_ops.py:144, in embedding_lookup_sparse(embedding_variable, sp_ids, slot_num, training)
    140 resource_variable_ops.variable_accessed(embedding_variable)
    142 global_replica_id = get_global_replica_id(_get_comm_tool(), embedding_variable)
--> 144 vector, _ = kit_lib.plugin_sparse_fprop(
    145     embedding_variable._handle,
    146     embedding_layer.handle,
    147     values,
    148     row_indices,
    149     global_replica_id,
    150     slot_num=slot_num,
    151     training=training,
    152     unique_op_name=embedding_variable.name,
    153     dtype=embedding_layer.compute_dtype,
    154 )
    155 return vector

File <string>:1952, in plugin_sparse_fprop(emb_var_handle, emb_handle, values, indices, global_replica_id, slot_num, training, dtype, unique_op_name, name)

File <string>:2038, in plugin_sparse_fprop_eager_fallback(emb_var_handle, emb_handle, values, indices, global_replica_id, slot_num, training, dtype, unique_op_name, name, ctx)

AbortedError: Exception encountered when calling layer "distributed_embedding" "                 f"(type DistributedEmbedding).

{{function_node __wrapped__PluginSparseFprop_device_/job:localhost/replica:0/task:0/device:GPU:0}} /hugectr/sparse_operation_kit/kit_cc/kit_cc_infra/include/resources/cpu_resource.h:91 BlockingCallOnce time out.
     [[{{node PluginSparseFprop}}]] [Op:PluginSparseFprop]

Call arguments received by layer "distributed_embedding" "                 f"(type DistributedEmbedding):
  • inputs=<tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f322c2d8ac0>
  • training=True

I think it's saying that something isn't ready and then times out. I just can't figure out what is missing.

kanghui0204 commented 1 year ago

Hi @Nov11, thank you for using SOK. In your example, I think you need to run it in graph mode to use the TF MirroredStrategy, like this:

@tf.function
def work(_p):
    return plugin_demo(_p)

I already tried your example locally, and it shows that not using eager mode solves your problem. You can give it a try on your own machine.
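
For reference, here is the demo from the report with that single change applied. This is just a sketch, assuming the same merlin-tensorflow container and SOK 1.1.4 shown in the traceback above:

import os
import sparse_operation_kit as sok
import tensorflow as tf

os.environ["CUDA_VISIBLE_DEVICES"] = "0,1"

class SOKDemo(tf.keras.models.Model):
    def __init__(self):
        super(SOKDemo, self).__init__()
        # Same layer configuration as in the original report.
        self.embedding_layer = sok.DistributedEmbedding(combiner='sum',
                                                        max_vocabulary_size_per_gpu=20,
                                                        embedding_vec_size=4,
                                                        slot_num=1,
                                                        max_nnz=3)

    def call(self, inputs, training=True):
        embedding_vector = self.embedding_layer(inputs, training=training)
        embedding_vector = tf.reshape(embedding_vector, shape=[-1, 4])
        return embedding_vector

strategy = tf.distribute.MirroredStrategy()

with strategy.scope():
    result = sok.Init(global_batch_size=10)
    plugin_demo = SOKDemo()

p = tf.sparse.SparseTensor(indices=tf.constant([[0, 0], [1, 0], [1, 1]], dtype=tf.int64),
                           values=tf.constant([1, 1, 1], dtype=tf.int64),
                           dense_shape=tf.constant([2, 3], dtype=tf.int64))

# The fix: run the replica function as a graph so all replica threads
# launch the SOK op together instead of serially in eager mode.
@tf.function
def work(_p):
    return plugin_demo(_p)

out = strategy.run(work, args=(p,))
print(out)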

Why does the problem happen? Because SOK is a model-parallel embedding, every card has to launch the op together, so between the per-card threads (MirroredStrategy is a single process with multiple threads) we synchronize using a std::condition_variable. But in eager mode TensorFlow does not launch all the threads concurrently; it launches them serially, which creates a deadlock. If the deadlock lasts long enough, std::condition_variable::wait_for gives up and SOK raises an error (BlockingCallOnce time out). In graph mode TensorFlow launches all the threads concurrently, so the problem is solved.
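
As a rough illustration of that failure mode in plain Python (this is illustrative only, not SOK's actual C++ synchronization code, and the names are made up):

import threading

# Two "replica threads" must reach a shared synchronization point together,
# similar in spirit to SOK's BlockingCallOnce. Give up after 5 seconds.
barrier = threading.Barrier(parties=2, timeout=5)

def replica(replica_id):
    try:
        barrier.wait()  # blocks until both replicas arrive, or times out
        print(f"replica {replica_id}: launched together")
    except threading.BrokenBarrierError:
        print(f"replica {replica_id}: timed out waiting for the other replica")

# Eager-style: replicas run one after another, so the first one waits
# for a partner that never arrives and the barrier times out.
t = threading.Thread(target=replica, args=(0,))
t.start()
t.join()

# Graph-style: both replicas are launched concurrently, so they meet
# at the barrier and proceed.
barrier.reset()
threads = [threading.Thread(target=replica, args=(i,)) for i in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()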

A recommendation for you: we are now planning a new SOK implementation, and it already has some features you can use. The new SOK is called sok.experiment; you can find how to use it under SOK experiment. We recommend users adopt the new SOK, and in May 2023 we will move SOK experiment into the official SOK and abandon the old implementation.

Nov11 commented 1 year ago

Yes, graph mode works! Great explanation of the mechanism. I'll move on to sok.experiment. Thank you!