alibaba / euler

A distributed graph deep learning framework.
Apache License 2.0
2.89k stars, 559 forks

Code built from the latest master still cannot evaluate or save embedding in distributed mode #218

Open xuetf opened 4 years ago

xuetf commented 4 years ago

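I ran the example script run_dist_euler.sh. Node 0 log: [image] Node 1 log: [image]

Both nodes print "1 workers have finished ..." endlessly.

What could be causing this? Is _num_finished_workers in SyncExitHook shared across workers? I suspect that is where the problem lies. Thanks for your guidance!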

cozilla commented 4 years ago

The issue is related to SyncExitHook. Saving embeddings reloads the checkpoint, so self._num_finished_workers is polluted (it does not start from 0). https://github.com/alibaba/euler/blob/8132bc81e827e5c6661c60b7eb2b700d30ddf3c8/tf_euler/python/utils/hooks.py#L25

xuetf commented 4 years ago

Thanks. How can this bug be fixed?

cozilla commented 4 years ago

Just implement your own synchronized-exit mechanism across the workers. The simplest way is to have each worker write a done.{id} file.
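A minimal sketch of the done.{id} idea suggested above, assuming every worker can read and write one shared directory (for example an NFS mount). The names mark_done, count_done, wait_for_all and the sync_dir parameter are hypothetical and are not part of Euler.

```python
import os
import time


def mark_done(sync_dir, task_index):
    """Create an empty marker file done.<task_index> in the shared directory."""
    os.makedirs(sync_dir, exist_ok=True)
    path = os.path.join(sync_dir, "done.{}".format(task_index))
    with open(path, "w"):
        pass  # the file's existence is the signal; no content is needed
    return path


def count_done(sync_dir, num_workers):
    """Count how many of the expected marker files exist."""
    return sum(
        os.path.exists(os.path.join(sync_dir, "done.{}".format(i)))
        for i in range(num_workers)
    )


def wait_for_all(sync_dir, num_workers, poll_secs=1.0, timeout_secs=None):
    """Poll until all workers have written their marker.

    Returns True once every marker exists, or False if timeout_secs elapses.
    """
    deadline = None if timeout_secs is None else time.time() + timeout_secs
    while True:
        if count_done(sync_dir, num_workers) >= num_workers:
            return True
        if deadline is not None and time.time() >= deadline:
            return False
        time.sleep(poll_secs)
```

Each worker would call mark_done with its own task index when it finishes and then call wait_for_all before exiting. Marker files left over from an earlier run would make workers exit too early, so the directory should be fresh or cleared before each job.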

MeliaLin commented 4 years ago

Have you solved this problem?

MeliaLin commented 4 years ago

Sorry to bother you during the holiday. Could you share how to implement synchronized exit across multiple workers?

xuetf commented 4 years ago

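Sorry, I only just saw your message. Because of the holiday I have not looked into this bug yet; I will probably find some time after the holiday ends. If I solve it, I will leave a comment here on GitHub. If you solve it before I do, please let me know as well.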

On January 31, 2020, at 22:35, MeliaLin notifications@github.com wrote:

> Sorry to bother you during the holiday. Could you share how to implement synchronized exit across multiple workers?

MeliaLin commented 4 years ago

OK. Could you share your email address? Mine is melia1030@foxmail.com

WChCh commented 4 years ago

@xuetf I ran into the same problem. Have you solved it?

WChCh commented 4 years ago

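I modified the synchronized-exit code; it is for reference only. In run_loop.py, where SyncExitHook is called, just pass the additional arguments: SyncExitHook(len(flags_obj.worker_hosts), flags_obj.task_index, is_chief)

The changes are as follows: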

import time

import tensorflow as tf  # TF 1.x API (tf.train.SessionRunHook, tf.assign)


class SyncExitHook(tf.train.SessionRunHook):
  """Blocks in end() until every worker has set its own "finished" flag."""

  def __init__(self, num_workers, task_index, is_chief):
    self._task_index = task_index
    self._is_chief = is_chief
    self._num_workers = num_workers
    self._counter_vars = []
    self._counter_add_ops = []
    # One 0/1 flag per worker. Local variables are not written to checkpoints
    # by default, so a reloaded checkpoint should not overwrite them.
    for i in range(self._num_workers):
      counter_var = tf.Variable(0, name="num_finished_workers-{}".format(i), collections=[tf.GraphKeys.LOCAL_VARIABLES])
      self._counter_vars.append(counter_var)
      counter_var_ops = tf.assign(counter_var, 1, use_locking=True)
      self._counter_add_ops.append(counter_var_ops)

  def end(self, session):
    # Mark this worker as finished, then poll until all flags are set.
    session.run(self._counter_add_ops[self._task_index])
    while True:
      num_finished_workers = 0
      for i in range(self._num_workers):
        state = session.run(self._counter_vars[i])
        # Set our own flag again if it reads 0 (for example after a re-init).
        if i == self._task_index and state == 0:
          session.run(self._counter_add_ops[i])
          state = session.run(self._counter_vars[i])

        num_finished_workers = num_finished_workers + state

      tf.logging.info("%d workers have finished ...", num_finished_workers)
      if num_finished_workers >= self._num_workers:
        break

      time.sleep(1)