DeepRec-AI / DeepRec

DeepRec is a high-performance recommendation deep learning framework based on TensorFlow. It is hosted in incubation in LF AI & Data Foundation.

【WorkQueue coredump】Coredump when WorkQueue is used with 1 PS / 2 workers and contains only 2 files #436

Open huangmiumang opened 2 years ago

huangmiumang commented 2 years ago

When WorkQueue is used with 1 PS / 2 workers and the WorkQueue contains only 2 files, a coredump occurs. Reproduction code below:

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import tensorflow as tf
from multiprocessing import Process, Manager
import os
import random
from six import string_types

from tensorflow.core.protobuf import config_pb2
from tensorflow.python.framework import errors_impl
from tensorflow.python.framework import ops
from tensorflow.python.framework import test_util
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import resources
from tensorflow.python.ops import variables
from tensorflow.python.ops import variable_scope as vs
from tensorflow.python.platform import test as test_lib
from tensorflow.python.platform import tf_logging as logging
from tensorflow.python.training import device_setter
from tensorflow.python.training import monitored_session
from tensorflow.python.training import queue_runner_impl
from tensorflow.python.training import saver
from tensorflow.python.training import server_lib
from tensorflow.python.training import training_util

from tensorflow.python.ops.work_queue import WorkQueue
from tensorflow.python.ops.work_queue import LocalWorkMgr
from tensorflow.python.training.saver import PartialRestoreSaverBuilder

class WorkQueueTest(test_lib.TestCase):
  def __init__(self, method_name='runTest'):
    super(WorkQueueTest, self).__init__(method_name)
    self._config = config_pb2.ConfigProto(
        log_device_placement=False,
        allow_soft_placement=True,
        gpu_options=config_pb2.GPUOptions(
            allow_growth=True,
            force_gpu_compatible=True))

  def _get_workers(
      self, num_workers, workers,
      works, num_epochs=1, shuffle=True,
      restore_works_dir=None,
      prefix=None):
    sessions = []
    graphs = []
    train_ops = []
    for worker_id in range(num_workers):
      graph = ops.Graph()
      is_chief = (worker_id == 0)
      with graph.as_default():
        worker_device = "/job:worker/task:%d/cpu:0" % (worker_id)
        with ops.device(device_setter.replica_device_setter(
            worker_device=worker_device,
            ps_device="/job:ps/task:0/cpu:0",
            ps_tasks=1)):

          # Work queue shared by the two workers; each worker takes filenames from it.
          work_queue = WorkQueue(works, num_epochs=1, shuffle=False)
          dataset = work_queue.input_dataset()
          dataset = dataset.apply(tf.data.experimental.parallel_interleave(
                      lambda file: tf.data.TextLineDataset(file), cycle_length=4, block_length=1,
                      sloppy=False, buffer_output_elements=1, prefetch_input_elements=1)
                    )
          iterator = dataset.make_initializable_iterator()
          ops.add_to_collection(ops.GraphKeys.TABLE_INITIALIZERS,
                                iterator.initializer)
          data = iterator.get_next()
        # Creates MonitoredSession
        hooks = []
        sess = monitored_session.MonitoredTrainingSession(
            workers[worker_id].target,
            is_chief=is_chief,
            hooks=hooks)

      sessions.append(sess)
      graphs.append(graph)
      train_ops.append(data)

    return sessions, graphs, train_ops

  def test_multi_worker_dataset(self):
    def _run(train_op, sess, result):
      while True:
        try:
          result.append(sess.run(train_op))
        except errors_impl.OutOfRangeError:
          break

    num_ps = 1
    num_workers = 2
    workers, _ = test_util.create_local_cluster(
        num_workers=num_workers, num_ps=num_ps)
    works = ['./0.txt', './0.txt']  # coredump
    # works = ['./0.txt', './0.txt', './0.txt']  # no coredump
    num_epochs = 1
    sess, _, train_ops = self._get_workers(
        num_workers, workers, works, num_epochs)
    manager = Manager()
    result = manager.list()
    threads = []
    threads.append(
        self.checkedThread(
            target=_run, args=(train_ops[0], sess[0], result)))
    threads.append(
        self.checkedThread(
            target=_run, args=(train_ops[1], sess[1], result)))

    # The two workers start to execute the train op.
    for thread in threads:
      thread.start()
    for thread in threads:
      thread.join()

    with self.assertRaises(errors_impl.OutOfRangeError):
      sess[0].run(train_ops[0])

    with self.assertRaises(errors_impl.OutOfRangeError):
      sess[1].run(train_ops[1])

if __name__ == "__main__":
  with open('./0.txt', 'w') as f:
    f.write('0,0\n')
    f.write('1,1\n')
  logging.set_verbosity(logging.INFO)
  test_lib.main()

(coredump screenshot attached)

welsonzhang commented 1 year ago

I have a problem with WorkQueue. When I run WorkQueue in a single process, it takes all of the filenames up front and then trains. I expect it to take one filename, train on it, then take the next filename and train again (see the sketch after the logs below).

Logs: before training starts, it returns "Out of range: All works in work queue work_queue are taken".

Add epoch of 2 elements: ["./data/eval.csv" "./data/eval.csv"]
Add epoch of 2 elements: ["./data/train.csv" "./data/train.csv"]
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
Take work: "./data/train.csv"
Take work: "./data/eval.csv"
Take work: "./data/train.csv"
Take work: "./data/eval.csv"
INFO:tensorflow:Saving checkpoints for 0 into ./result/model_WIDE_AND_DEEP_1672921107/model.ckpt.
INFO:tensorflow:Create incremental timer, incremental_save:False, incremental_save_secs:None
2023-01-05 12:19:03.144568: W tensorflow/core/framework/op_kernel.cc:1651] OP_REQUIRES failed at work_queue_ops.cc:320 : Out of range: All works in work queue work_queue are taken.
INFO:tensorflow:loss = 0.6893319, steps = 1
2023-01-05 12:19:06.699829: I tensorflow/core/common_runtime/tensorpool_allocator.cc:146] TensorPoolAllocator enabled
INFO:tensorflow:global_step/sec: 19.8375
INFO:tensorflow:loss = 0.5995521, steps = 101 (5.042 sec)
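
A minimal sketch of the behavior I expect, based on the test code earlier in this issue. The file names come from the logs above; num_epochs, cycle_length, and the assumption that input_dataset() only takes a new filename once the previous file is exhausted are mine, not confirmed WorkQueue behavior:

import tensorflow as tf
from tensorflow.python.ops.work_queue import WorkQueue

# Sketch only: cycle_length=1 keeps a single file open at a time, so (assuming
# the work queue op takes work lazily) the next filename should only be taken
# after the current file is fully consumed.
work_queue = WorkQueue(['./data/train.csv', './data/eval.csv'],
                       num_epochs=1, shuffle=False)
filenames = work_queue.input_dataset()        # dataset of filenames taken from the queue
dataset = filenames.interleave(
    lambda f: tf.data.TextLineDataset(f),     # read the taken file to completion
    cycle_length=1,
    block_length=1)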
JackMoriarty commented 1 year ago

tf.data.experimental.parallel_interleave is deprecated. Please use tf.data.Dataset.interleave instead.
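
For reference, the pipeline from the repro above could be rewritten with the non-deprecated API roughly as follows; num_parallel_calls is an illustrative choice, and the exact sloppiness/buffering semantics of parallel_interleave are not reproduced:

dataset = work_queue.input_dataset()
dataset = dataset.interleave(
    lambda file: tf.data.TextLineDataset(file),  # same per-file reader as the repro
    cycle_length=4,
    block_length=1,
    num_parallel_calls=tf.data.experimental.AUTOTUNE)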