yahoo / TensorFlowOnSpark

TensorFlowOnSpark brings TensorFlow programs to Apache Spark clusters.
Apache License 2.0

ParameterServerStrategy stuck in "Feeding partition" #425

Closed markromedia closed 5 years ago

markromedia commented 5 years ago

Environment:

Describe the bug: Using the provided examples, I have been attempting to port one of our existing spark-ml jobs to TFoS. These jobs all run in a dedicated YARN cluster (CPU-only).

As part of my POC, I am attempting to create a 2-worker, 1-ps, 1-master TensorFlow cluster that trains a simple Keras model (converted to a TF estimator), using ParameterServerStrategy as the distribution strategy.

When I start this up, the cluster "establishes" itself, but the master gets stuck trying to feed the queue. I have run the same example in non-distributed mode (multiple instances doing the same thing), and it worked fine.

Thanks in advance for any help.

Here is the relevant snippet of the code being run:

    model = Sequential()
    model.add(Dense(64, input_dim=num_features, activation='sigmoid'))
    model.add(Dropout(0.2))
    model.add(Dense(64, activation='sigmoid'))
    model.add(Dropout(0.2))
    model.add(Dense(1, activation='sigmoid'))
    model.compile(loss='binary_crossentropy', optimizer=tf.train.AdamOptimizer(), metrics=['accuracy'])
    model.summary()

    distribution_strategy = tf.contrib.distribute.ParameterServerStrategy()
    config = tf.estimator.RunConfig(
        train_distribute=distribution_strategy, eval_distribute=distribution_strategy)
    estimator = tf.keras.estimator.model_to_estimator(model, model_dir=model_dir, config=config)

    def generate_rdd_data(tf_feed):
        while not tf_feed.should_stop():
            batch = tf_feed.next_batch(1)
            if len(batch) > 0:
                record = batch[0]
                features = numpy.array(record[0]).astype(numpy.float32)
                label = numpy.array([record[1]]).astype(numpy.float32)

                yield (features, label)
            else:
                return

    def train_input_fn():
        ds = tf.data.Dataset.from_generator(generator,
                                            (tf.float32, tf.float32),
                                            (tf.TensorShape([num_features]), tf.TensorShape([1])))
        ds = ds.batch(args.batch_size)
        return ds

    # add a hook to terminate the RDD data feed when the session ends
    hooks = [StopFeedHook(tf_feed)]

    # train model
    estimator.train(input_fn=train_input_fn, max_steps=steps_per_epoch, hooks=hooks)
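For reference, `generator` in `train_input_fn` above is wired to the RDD feed along these lines (a minimal sketch; the `TFNode.DataFeed` setup shown here is an assumption about code outside the snippet):

    from tensorflowonspark import TFNode

    # Obtain the Spark data feed for this executor (assumed setup) and
    # bind the callable consumed by train_input_fn's from_generator.
    tf_feed = TFNode.DataFeed(ctx.mgr)
    generator = lambda: generate_rdd_data(tf_feed)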

Logs: Master Logs: 2019-05-23 18:49:54,434 INFO (MainThread-53285) 1: ======== master:0 ======== 2019-05-23 18:49:54,434 INFO (MainThread-53285) 1: Cluster spec: {'worker': ['10.90.28.232:34493', '10.90.28.252:38762'], 'ps': ['10.90.28.222:45450'], 'master': ['10.90.28.230:42065']} 2019-05-23 18:49:54,435 INFO (MainThread-53285) 1: Using CPU 19/05/23 18:49:54 INFO TorrentBroadcast: Started reading broadcast variable 110 19/05/23 18:49:54 INFO MemoryStore: Block broadcast_110_piece0 stored as bytes in memory (estimated size 658.9 KB, free 1970.6 MB) 19/05/23 18:49:54 INFO TorrentBroadcast: Reading broadcast variable 110 took 28 ms 19/05/23 18:49:54 INFO MemoryStore: Block broadcast_110 stored as values in memory (estimated size 1434.9 KB, free 1969.2 MB) 2019-05-23 18:49:54.479917: I tensorflow/core/platform/profile_utils/cpu_utils.cc:94] CPU Frequency: 2300070000 Hz 2019-05-23 18:49:54.481187: I tensorflow/compiler/xla/service/service.cc:150] XLA service 0x44295a0 executing computations on platform Host. Devices: 2019-05-23 18:49:54.481225: I tensorflow/compiler/xla/service/service.cc:158] StreamExecutor device (0): <undefined>, <undefined> 2019-05-23 18:49:54.483937: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:252] Initialize GrpcChannelCache for job master -> {0 -> localhost:42065} 2019-05-23 18:49:54.483963: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:252] Initialize GrpcChannelCache for job ps -> {0 -> 10.90.28.222:45450} 2019-05-23 18:49:54.483990: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:252] Initialize GrpcChannelCache for job worker -> {0 -> 10.90.28.232:34493, 1 -> 10.90.28.252:38762} 2019-05-23 18:49:54.486650: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:391] Started server with target: grpc://localhost:42065 19/05/23 18:49:54 INFO BlockManager: Found block rdd_412_3 locally 19/05/23 18:49:54 INFO CodeGenerator: Code generated in 21.755654 ms 19/05/23 18:49:54 INFO Executor: Finished task 3.0 in stage 73.0 (TID 777). 2805 bytes result sent to driver 19/05/23 18:49:54 INFO CoarseGrainedExecutorBackend: Got assigned task 780 19/05/23 18:49:54 INFO Executor: Running task 6.0 in stage 73.0 (TID 780) 19/05/23 18:49:54 INFO BlockManager: Found block rdd_412_6 locally 19/05/23 18:49:55 INFO Executor: Finished task 6.0 in stage 73.0 (TID 780). 2762 bytes result sent to driver 19/05/23 18:49:55 INFO CoarseGrainedExecutorBackend: Got assigned task 782 19/05/23 18:49:55 INFO Executor: Running task 10.0 in stage 73.0 (TID 782) 19/05/23 18:49:55 INFO BlockManager: Found block rdd_412_10 locally 19/05/23 18:49:55 INFO Executor: Finished task 10.0 in stage 73.0 (TID 782). 2762 bytes result sent to driver 19/05/23 18:50:01 INFO CoarseGrainedExecutorBackend: Got assigned task 784 19/05/23 18:50:01 INFO Executor: Running task 1.0 in stage 73.0 (TID 784) 19/05/23 18:50:01 INFO BlockManager: Found block rdd_412_1 remotely 19/05/23 18:50:01 INFO Executor: Finished task 1.0 in stage 73.0 (TID 784). 
2762 bytes result sent to driver 19/05/23 18:50:02 INFO CoarseGrainedExecutorBackend: Got assigned task 787 19/05/23 18:50:02 INFO Executor: Running task 0.0 in stage 74.0 (TID 787) 19/05/23 18:50:02 INFO MapOutputTrackerWorker: Updating epoch to 37 and clearing cache 19/05/23 18:50:02 INFO TorrentBroadcast: Started reading broadcast variable 111 19/05/23 18:50:02 INFO MemoryStore: Block broadcast_111_piece0 stored as bytes in memory (estimated size 10.5 KB, free 1969.2 MB) 19/05/23 18:50:02 INFO TorrentBroadcast: Reading broadcast variable 111 took 5 ms 19/05/23 18:50:02 INFO MemoryStore: Block broadcast_111 stored as values in memory (estimated size 23.4 KB, free 1969.2 MB) 19/05/23 18:50:02 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 36, fetching them 19/05/23 18:50:02 INFO MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://MapOutputTracker@srv-01-11-b09.iad1.trmr.io:33383) 19/05/23 18:50:02 INFO MapOutputTrackerWorker: Got the output locations 19/05/23 18:50:02 INFO ShuffleBlockFetcherIterator: Getting 12 non-empty blocks out of 12 blocks 19/05/23 18:50:02 INFO ShuffleBlockFetcherIterator: Started 2 remote fetches in 1 ms 2019-05-23 18:50:02,187 INFO (MainThread-52742) Connected to TFSparkNode.mgr on 10.90.28.230, executor=1, state='running' 2019-05-23 18:50:02,194 INFO (MainThread-52742) mgr.state='running' 2019-05-23 18:50:02,194 INFO (MainThread-52742) Feeding partition <itertools.chain object at 0x7f97ea46c2d0> into input queue <multiprocessing.queues.JoinableQueue object at 0x7f97dae9d2d0> 19/05/23 19:00:03 ERROR Executor: Exception in task 0.0 in stage 74.0 (TID 787) org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/hadoop/yarn/local/usercache/pipeline/appcache/application_1557769783296_1143/container_e21_1557769783296_1143_01_000005/pyspark.zip/pyspark/worker.py", line 253, in main process() File "/hadoop/yarn/local/usercache/pipeline/appcache/application_1557769783296_1143/container_e21_1557769783296_1143_01_000005/pyspark.zip/pyspark/worker.py", line 248, in process serializer.dump_stream(func(split_index, iterator), outfile) File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/rdd.py", line 2440, in pipeline_func File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/rdd.py", line 2440, in pipeline_func File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/rdd.py", line 2440, in pipeline_func File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/rdd.py", line 350, in func File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/rdd.py", line 799, in func File "/usr/lib/python2.7/site-packages/tensorflowonspark/TFSparkNode.py", line 420, in _train raise Exception("Timeout while feeding partition") Exception: Timeout while feeding partition

Worker logs: 019-05-23 18:49:53,977 INFO (MainThread-118610) Starting TensorFlow worker:0 as worker on cluster node 2 on background process 19/05/23 18:49:53 INFO PythonRunner: Times: total = 7808, boot = -36392, init = 43165, finish = 1035 19/05/23 18:49:53 INFO Executor: Finished task 2.0 in stage 72.0 (TID 773). 1418 bytes result sent to driver 2019-05-23 18:49:53,985 INFO (MainThread-121583) 2: ======== worker:0 ======== 2019-05-23 18:49:53,986 INFO (MainThread-121583) 2: Cluster spec: {'worker': ['10.90.28.232:34493', '10.90.28.252:38762'], 'ps': ['10.90.28.222:45450'], 'master': ['10.90.28.230:42065']} 2019-05-23 18:49:53,986 INFO (MainThread-121583) 2: Using CPU 19/05/23 18:49:53 INFO CoarseGrainedExecutorBackend: Got assigned task 775 19/05/23 18:49:53 INFO Executor: Running task 0.0 in stage 73.0 (TID 775) 19/05/23 18:49:53 INFO TorrentBroadcast: Started reading broadcast variable 110 19/05/23 18:49:54 INFO MemoryStore: Block broadcast_110_piece0 stored as bytes in memory (estimated size 658.9 KB, free 1970.6 MB) 19/05/23 18:49:54 INFO TorrentBroadcast: Reading broadcast variable 110 took 50 ms 19/05/23 18:49:54 INFO MemoryStore: Block broadcast_110 stored as values in memory (estimated size 1434.9 KB, free 1969.2 MB) 19/05/23 18:49:54 INFO BlockManager: Found block rdd_412_0 locally 2019-05-23 18:49:54.084395: I tensorflow/core/platform/profile_utils/cpu_utils.cc:94] CPU Frequency: 2599985000 Hz 2019-05-23 18:49:54.085463: I tensorflow/compiler/xla/service/service.cc:150] XLA service 0x4554d90 executing computations on platform Host. Devices: 2019-05-23 18:49:54.085503: I tensorflow/compiler/xla/service/service.cc:158] StreamExecutor device (0): , 19/05/23 18:49:54 INFO CodeGenerator: Code generated in 24.400731 ms 2019-05-23 18:49:54.095435: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:252] Initialize GrpcChannelCache for job master -> {0 -> 10.90.28.230:42065} 2019-05-23 18:49:54.095469: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:252] Initialize GrpcChannelCache for job ps -> {0 -> 10.90.28.222:45450} 2019-05-23 18:49:54.095481: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:252] Initialize GrpcChannelCache for job worker -> {0 -> localhost:34493, 1 -> 10.90.28.252:38762} 2019-05-23 18:49:54.097460: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:391] Started server with target: grpc://localhost:34493 2019-05-23 18:49:54,175 WARNING (MainThread-121583) From /usr/lib/python2.7/site-packages/tensorflow/python/ops/resource_variable_ops.py:435: colocate_with (from tensorflow.python.framework.ops) is deprecated and will be removed in a future version. Instructions for updating: Colocations handled automatically by placer. 2019-05-23 18:49:54,238 WARNING (MainThread-121583) From /usr/lib/python2.7/site-packages/tensorflow/python/keras/layers/core.py:143: calling dropout (from tensorflow.python.ops.nn_ops) with keep_prob is deprecated and will be removed in a future version. Instructions for updating: Please use rate instead of keep_prob. Rate should be set to rate = 1 - keep_prob. 19/05/23 18:49:54 INFO Executor: Finished task 0.0 in stage 73.0 (TID 775). 2805 bytes result sent to driver 19/05/23 18:49:54 INFO CoarseGrainedExecutorBackend: Got assigned task 776 19/05/23 18:49:54 INFO Executor: Running task 4.0 in stage 73.0 (TID 776) 19/05/23 18:49:54 INFO BlockManager: Found block rdd_412_4 locally 19/05/23 18:49:54 INFO Executor: Finished task 4.0 in stage 73.0 (TID 776). 
2762 bytes result sent to driver 19/05/23 18:49:54 INFO CoarseGrainedExecutorBackend: Got assigned task 778 19/05/23 18:49:54 INFO Executor: Running task 8.0 in stage 73.0 (TID 778) 19/05/23 18:49:54 INFO BlockManager: Found block rdd_412_8 locally 19/05/23 18:49:54 INFO Executor: Finished task 8.0 in stage 73.0 (TID 778). 2805 bytes result sent to driver


Total params: 15,425
Trainable params: 15,425
Non-trainable params: 0

num_features: 174, num_records: 240000, batch_size: 1953, epochs: 3, steps_per_epoch: 128

WARNING: The TensorFlow contrib module will not be included in TensorFlow 2.0. For more information, please see:

2019-05-23 18:49:57,229 INFO (MainThread-121583) ParameterServerStrategy with compute_devices = ('/replica:0/task:0/device:CPU:0',), variable_device = '/device:CPU:0' 2019-05-23 18:49:57,229 INFO (MainThread-121583) TF_CONFIG environment variable: {u'environment': u'cloud', u'cluster': {u'ps': [u'10.90.28.222:45450'], u'worker': [u'10.90.28.232:34493', u'10.90.28.252:38762'], u'master': [u'10.90.28.230:42065']}, u'task': {u'index': 0, u'type': u'worker'}} 2019-05-23 18:49:57,229 INFO (MainThread-121583) Initializing RunConfig with distribution strategies. 2019-05-23 18:49:57,230 INFO (MainThread-121583) Not using Distribute Coordinator. 2019-05-23 18:49:57,230 INFO (MainThread-121583) Using the Keras model provided. 2019-05-23 18:49:57,754 WARNING (MainThread-121583) From /usr/lib/python2.7/site-packages/tensorflow/python/ops/math_ops.py:3066: to_int32 (from tensorflow.python.ops.math_ops) is deprecated and will be removed in a future version. Instructions for updating: Use tf.cast instead. 2019-05-23 18:49:58,405 INFO (MainThread-121583) Using config: {'_save_checkpoints_secs': 600, '_session_config': device_filters: "/job:ps" device_filters: "/job:worker/task:0" allow_soft_placement: true graph_options { rewrite_options { meta_optimizer_iterations: ONE } } , '_keep_checkpoint_max': 5, '_task_type': u'worker', '_train_distribute': <tensorflow.contrib.distribute.python.parameter_server_strategy.ParameterServerStrategy object at 0x7ff44bbdf1d0>, '_is_chief': False, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7ff44bbdf050>, '_model_dir': '/tmp/model-20190523184945', '_protocol': None, '_save_checkpoints_steps': None, '_keep_checkpoint_every_n_hours': 10000, '_service': None, '_num_ps_replicas': 1, '_tf_random_seed': None, '_save_summary_steps': 100, '_device_fn': None, '_experimental_distribute': None, '_num_worker_replicas': 3, '_task_id': 0, '_log_step_count_steps': 100, '_evaluation_master': '', '_eval_distribute': <tensorflow.contrib.distribute.python.parameter_server_strategy.ParameterServerStrategy object at 0x7ff44bbdf1d0>, '_global_id_in_cluster': 1, '_master': u'grpc://10.90.28.232:34493', '_distribute_coordinator_mode': None} 2019-05-23 18:49:58,413 WARNING (MainThread-121583) From /usr/lib/python2.7/site-packages/tensorflow/python/data/ops/dataset_ops.py:429: py_func (from tensorflow.python.ops.script_ops) is deprecated and will be removed in a future version. Instructions for updating: tf.py_func is deprecated in TF V2. Instead, use tf.py_function, which takes a python function which manipulates tf eager tensors instead of numpy arrays. It's easy to convert a tf eager tensor to an ndarray (just call tensor.numpy()) but having access to eager tensors means tf.py_functions can use accelerators such as GPUs as well as being differentiable using a gradient tape.

2019-05-23 18:49:58,545 INFO (Thread-1-121583) Calling model_fn. 2019-05-23 18:49:59,510 INFO (Thread-1-121583) Done calling model_fn. 2019-05-23 18:49:59,554 INFO (MainThread-121583) Warm-starting with WarmStartSettings: WarmStartSettings(ckpt_to_initialize_from='/tmp/model-20190523184945/keras/keras_model.ckpt', vars_to_warm_start='.*', var_name_to_vocab_info={}, var_name_to_prev_var_name={}) 2019-05-23 18:49:59,554 INFO (MainThread-121583) Warm-starting from: ('/tmp/model-20190523184945/keras/keras_model.ckpt',) 2019-05-23 18:49:59,555 INFO (MainThread-121583) Warm-starting variable: dense_2/kernel; prev_var_name: Unchanged 2019-05-23 18:49:59,555 INFO (MainThread-121583) Warm-starting variable: dense/bias; prev_var_name: Unchanged 2019-05-23 18:49:59,555 INFO (MainThread-121583) Warm-starting variable: dense_2/bias; prev_var_name: Unchanged 2019-05-23 18:49:59,555 INFO (MainThread-121583) Warm-starting variable: dense_1/kernel; prev_var_name: Unchanged 2019-05-23 18:49:59,555 INFO (MainThread-121583) Warm-starting variable: dense_1/bias; prev_var_name: Unchanged 2019-05-23 18:49:59,555 INFO (MainThread-121583) Warm-starting variable: dense/kernel; prev_var_name: Unchanged 2019-05-23 18:49:59,586 INFO (MainThread-121583) Create CheckpointSaverHook. 2019-05-23 18:49:59,856 INFO (MainThread-121583) Graph was finalized. 2019-05-23 18:49:59.897664: I tensorflow/core/distributed_runtime/master_session.cc:1192] Start master session c7382e2c43bf9a42 with config: device_filters: "/job:ps" device_filters: "/job:worker/task:0" allow_soft_placement: true graph_options { rewrite_options { meta_optimizer_iterations: ONE } } isolate_session_state: true 2019-05-23 18:50:00,001 INFO (MainThread-121583) Waiting for model to be ready. Ready_for_local_init_op: Variables not initialized: global_step, dense/kernel, dense/bias, dense_1/kernel, dense_1/bias, dense_2/kernel, dense_2/bias, training/TFOptimizer/beta1_power, training/TFOptimizer/beta2_power, dense/kernel/Adam, dense/kernel/Adam_1, dense/bias/Adam, dense/bias/Adam_1, dense_1/kernel/Adam, dense_1/kernel/Adam_1, dense_1/bias/Adam, dense_1/bias/Adam_1, dense_2/kernel/Adam, dense_2/kernel/Adam_1, dense_2/bias/Adam, dense_2/bias/Adam_1, ready: None 19/05/23 18:50:01 INFO CoarseGrainedExecutorBackend: Got assigned task 785 19/05/23 18:50:01 INFO Executor: Running task 5.0 in stage 73.0 (TID 785) 19/05/23 18:50:01 INFO BlockManager: Found block rdd_412_5 remotely 19/05/23 18:50:01 INFO Executor: Finished task 5.0 in stage 73.0 (TID 785). 
2762 bytes result sent to driver 19/05/23 18:50:02 INFO CoarseGrainedExecutorBackend: Got assigned task 789 19/05/23 18:50:02 INFO Executor: Running task 2.0 in stage 74.0 (TID 789) 19/05/23 18:50:02 INFO MapOutputTrackerWorker: Updating epoch to 37 and clearing cache 19/05/23 18:50:02 INFO TorrentBroadcast: Started reading broadcast variable 111 19/05/23 18:50:02 INFO MemoryStore: Block broadcast_111_piece0 stored as bytes in memory (estimated size 10.5 KB, free 1969.2 MB) 19/05/23 18:50:02 INFO TorrentBroadcast: Reading broadcast variable 111 took 5 ms 19/05/23 18:50:02 INFO MemoryStore: Block broadcast_111 stored as values in memory (estimated size 23.4 KB, free 1969.2 MB) 19/05/23 18:50:02 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 36, fetching them 19/05/23 18:50:02 INFO MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://MapOutputTracker@srv-01-11-b09.iad1.trmr.io:33383) 19/05/23 18:50:02 INFO MapOutputTrackerWorker: Got the output locations 19/05/23 18:50:02 INFO ShuffleBlockFetcherIterator: Getting 12 non-empty blocks out of 12 blocks 19/05/23 18:50:02 INFO ShuffleBlockFetcherIterator: Started 2 remote fetches in 1 ms 2019-05-23 18:50:02,149 INFO (MainThread-118615) Connected to TFSparkNode.mgr on 10.90.28.232, executor=2, state='running' 2019-05-23 18:50:02,160 INFO (MainThread-118615) mgr.state='running' 2019-05-23 18:50:02,160 INFO (MainThread-118615) Feeding partition <itertools.chain object at 0x7ff49de182d0> into input queue <multiprocessing.queues.JoinableQueue object at 0x7ff48e5f22d0> 2019-05-23 18:50:30.041061: I tensorflow/core/distributed_runtime/master_session.cc:1192] Start master session 90084f8fef3f0cd9 with config: device_filters: "/job:ps" device_filters: "/job:worker/task:0" allow_soft_placement: true graph_options { rewrite_options { meta_optimizer_iterations: ONE } } isolate_session_state: true 2019-05-23 18:50:30,098 INFO (MainThread-121583) Waiting for model to be ready. Ready_for_local_init_op: Variables not initialized: global_step, dense/kernel, dense/bias, dense_1/kernel, dense_1/bias, dense_2/kernel, dense_2/bias, training/TFOptimizer/beta1_power, training/TFOptimizer/beta2_power, dense/kernel/Adam, dense/kernel/Adam_1, dense/bias/Adam, dense/bias/Adam_1, dense_1/kernel/Adam, dense_1/kernel/Adam_1, dense_1/bias/Adam, dense_1/bias/Adam_1, dense_2/kernel/Adam, dense_2/kernel/Adam_1, dense_2/bias/Adam, dense_2/bias/Adam_1, ready: None 2019-05-23 18:51:00.121986: I tensorflow/core/distributed_runtime/master_session.cc:1192] Start master session 55f754381ba9f6c4 with config: device_filters: "/job:ps" device_filters: "/job:worker/task:0" allow_soft_placement: true graph_options { rewrite_options { meta_optimizer_iterations: ONE } } isolate_session_state: true 2019-05-23 18:51:00,176 INFO (MainThread-121583) Waiting for model to be ready. 
Ready_for_local_init_op: Variables not initialized: global_step, dense/kernel, dense/bias, dense_1/kernel, dense_1/bias, dense_2/kernel, dense_2/bias, training/TFOptimizer/beta1_power, training/TFOptimizer/beta2_power, dense/kernel/Adam, dense/kernel/Adam_1, dense/bias/Adam, dense/bias/Adam_1, dense_1/kernel/Adam, dense_1/kernel/Adam_1, dense_1/bias/Adam, dense_1/bias/Adam_1, dense_2/kernel/Adam, dense_2/kernel/Adam_1, dense_2/bias/Adam, dense_2/bias/Adam_1, ready: None 2019-05-23 18:51:30.194164: I tensorflow/core/distributed_runtime/master_session.cc:1192] Start master session 920fab950aa69a50 with config: device_filters: "/job:ps" device_filters: "/job:worker/task:0" allow_soft_placement: true graph_options { rewrite_options { meta_optimizer_iterations: ONE } } isolate_session_state: true 2019-05-23 18:51:30,244 INFO (MainThread-121583) Waiting for model to be ready. Ready_for_local_init_op: Variables not initialized: global_step, dense/kernel, dense/bias, dense_1/kernel, dense_1/bias, dense_2/kernel, dense_2/bias, training/TFOptimizer/beta1_power, training/TFOptimizer/beta2_power, dense/kernel/Adam, dense/kernel/Adam_1, dense/bias/Adam, dense/bias/Adam_1, dense_1/kernel/Adam, dense_1/kernel/Adam_1, dense_1/bias/Adam, dense_1/bias/Adam_1, dense_2/kernel/Adam, dense_2/kernel/Adam_1, dense_2/bias/Adam, dense_2/bias/Adam_1, ready: None

PS Logs: `019-05-23 18:49:55,059 INFO (MainThread-39243) 0: ======== ps:0 ======== 2019-05-23 18:49:55,060 INFO (MainThread-39243) 0: Cluster spec: {'worker': ['10.90.28.232:34493', '10.90.28.252:38762'], 'ps': ['10.90.28.222:45450'], 'master': ['10.90.28.230:42065']} 2019-05-23 18:49:55,060 INFO (MainThread-39243) 0: Using CPU 2019-05-23 18:49:55.163831: I tensorflow/core/platform/profile_utils/cpu_utils.cc:94] CPU Frequency: 2200225000 Hz 2019-05-23 18:49:55.167500: I tensorflow/compiler/xla/service/service.cc:150] XLA service 0x7529890 executing computations on platform Host. Devices: 2019-05-23 18:49:55.167542: I tensorflow/compiler/xla/service/service.cc:158] StreamExecutor device (0): , 2019-05-23 18:49:55.185042: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:252] Initialize GrpcChannelCache for job master -> {0 -> 10.90.28.230:42065} 2019-05-23 18:49:55.185069: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:252] Initialize GrpcChannelCache for job ps -> {0 -> localhost:45450} 2019-05-23 18:49:55.185084: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:252] Initialize GrpcChannelCache for job worker -> {0 -> 10.90.28.232:34493, 1 -> 10.90.28.252:38762} 2019-05-23 18:49:55.193529: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:391] Started server with target: grpc://localhost:45450

Spark Submit Command Line:

    --master yarn --queue default \
    --conf spark.sql.autoBroadcastJoinThreshold=-1 \
    --conf spark.yarn.executor.memoryOverhead=1g \
    --conf spark.storage.memoryFraction=0.2 \
    --conf spark.executor.memory=4g \
    --conf spark.driver.memory=2g \
    --conf spark.executor.instances=4 \
    --conf spark.executor.cores=4 \
    --conf spark.dynamicAllocation.enabled=false \
    --conf spark.yarn.maxAppAttempts=1 \
    --conf spark.task.cpus=4

hdarabi commented 5 years ago

Thanks for posting. I am facing the same issue.

leewyang commented 5 years ago

@markromedia The model checkpoint file looks suspicious: /tmp/model-20190523184945/keras/keras_model.ckpt. This needs to be a path on a distributed filesystem, e.g. HDFS. Also, you will need to set up the LD_LIBRARY_PATH, similar to this example, to include the paths to libhdfs.so and libjvm.so.
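For illustration, a change along these lines (the namenode address and base path below are placeholders, not actual values from this job):

    from datetime import datetime

    # Keep checkpoints on HDFS so every task in the cluster sees the same
    # model directory, instead of a node-local /tmp path.
    model_dir = "hdfs://namenode:8020/user/pipeline/tf/model-{}".format(
        datetime.now().strftime("%Y%m%d%H%M%S"))

The LD_LIBRARY_PATH change is made on the spark-submit side (e.g. via spark.executorEnv.LD_LIBRARY_PATH and spark.yarn.appMasterEnv.LD_LIBRARY_PATH), as in the linked example.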

markromedia commented 5 years ago

@leewyang thanks for the quick response.

I have fixed the model directory to point to HDFS and ensured the relevant dependencies are set up. I can now see the checkpoint data being written to HDFS.

[Screenshot "Screen Shot 2019-05-23 at 8 51 44 PM": checkpoint files written to HDFS]

However, I am still seeing the feeding partitions issue.

I did notice that I was only seeing the following on worker:0:

2019-05-24 02:23:19,063 INFO (Thread-1-58711) Calling model_fn. 2019-05-24 02:23:19,966 INFO (Thread-1-58711) Done calling model_fn. 2019-05-24 02:23:20,010 INFO (MainThread-58711) Warm-starting with WarmStartSettings: WarmStartSettings(ckpt_to_initialize_from='hdfs://srv-3b-01-a05.iad1.trmr.io:8020/user/pipeline/tf/model-20190524022311/keras/keras_model.ckpt', vars_to_warm_start='.*', var_name_to_vocab_info={}, var_name_to_prev_var_name={}) 2019-05-24 02:23:20,010 INFO (MainThread-58711) Warm-starting from: ('hdfs://srv-3b-01-a05.iad1.trmr.io:8020/user/pipeline/tf/model-20190524022311/keras/keras_model.ckpt',) 2019-05-24 02:23:20,011 INFO (MainThread-58711) Warm-starting variable: dense_2/kernel; prev_var_name: Unchanged 2019-05-24 02:23:20,011 INFO (MainThread-58711) Warm-starting variable: dense/bias; prev_var_name: Unchanged 2019-05-24 02:23:20,011 INFO (MainThread-58711) Warm-starting variable: dense_2/bias; prev_var_name: Unchanged 2019-05-24 02:23:20,011 INFO (MainThread-58711) Warm-starting variable: dense_1/kernel; prev_var_name: Unchanged 2019-05-24 02:23:20,011 INFO (MainThread-58711) Warm-starting variable: dense_1/bias; prev_var_name: Unchanged 2019-05-24 02:23:20,011 INFO (MainThread-58711) Warm-starting variable: dense/kernel; prev_var_name: Unchanged 2019-05-24 02:23:20,176 INFO (MainThread-58711) Create CheckpointSaverHook. 2019-05-24 02:23:20,680 INFO (MainThread-58711) Graph was finalized. 2019-05-24 02:23:20.695314: I tensorflow/core/distributed_runtime/master_session.cc:1192] Start master session 0b1b43bf04aefbd1 with config: device_filters: "/job:ps" device_filters: "/job:worker/task:0" allow_soft_placement: true graph_options { rewrite_options { meta_optimizer_iterations: ONE } } isolate_session_state: true 2019-05-24 02:23:20,736 INFO (MainThread-58711) Waiting for model to be ready. Ready_for_local_init_op: Variables not initialized: global_step, dense/kernel, dense/bias, dense_1/kernel, dense_1/bias, dense_2/kernel, dense_2/bias, training/TFOptimizer/beta1_power, training/TFOptimizer/beta2_power, dense/kernel/Adam, dense/kernel/Adam_1, dense/bias/Adam, dense/bias/Adam_1, dense_1/kernel/Adam, dense_1/kernel/Adam_1, dense_1/bias/Adam, dense_1/bias/Adam_1, dense_2/kernel/Adam, dense_2/kernel/Adam_1, dense_2/bias/Adam, dense_2/bias/Adam_1, ready: None

On all the other workers, the last line of output is 2019-05-24 02:23:20,680 INFO (MainThread-58711) Graph was finalized.

This is after I added a sleep between cluster.run and cluster.train.
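For context, the driver-side calls in question look roughly like this (a hedged sketch; the exact arguments in my job differ):

    import time
    from tensorflowonspark import TFCluster

    # Launch the TF processes on the Spark executors, then feed the RDD to them.
    cluster = TFCluster.run(sc, main_fun, args, num_executors=4, num_ps=1,
                            tensorboard=False,
                            input_mode=TFCluster.InputMode.SPARK,
                            master_node='master')
    time.sleep(20)  # the "warmup" sleep mentioned above
    cluster.train(dataRDD, args.epochs)
    cluster.shutdown()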

leewyang commented 5 years ago

Missed this earlier:

    # train model
    estimator.train(input_fn=train_input_fn, max_steps=steps_per_epoch, hooks=hooks)

For distributed clusters, there is a dedicated train_and_evaluate method that handles the cluster coordination. Not sure if this will resolve your issue, but it definitely needs to be used.
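A minimal sketch of that change (eval_input_fn and the step counts here are placeholders, not taken from your code):

    # Replace estimator.train(...) with tf.estimator.train_and_evaluate(),
    # which coordinates the chief/worker/ps roles in a distributed cluster.
    train_spec = tf.estimator.TrainSpec(input_fn=train_input_fn,
                                        max_steps=steps_per_epoch,
                                        hooks=hooks)
    eval_spec = tf.estimator.EvalSpec(input_fn=eval_input_fn, steps=None)
    tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)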

markromedia commented 5 years ago

@leewyang thanks for the pointer. I originally had it using train_and_evaluate but switched it as part of a test; I have now reverted it. The workers are now getting the error below. The root cause seems obvious, but I am not sure what the solution is.

    File "/usr/lib/python2.7/site-packages/tensorflowonspark/TFSparkNode.py", line 329, in wrapper_fn_background
      wrapper_fn(args, context)
    File "/usr/lib/python2.7/site-packages/tensorflowonspark/TFSparkNode.py", line 323, in wrapper_fn
      fn(args, context)
    File "/home/pipeline/src/tensor_on_spark.py", line 223, in tensor_main
    File "/usr/lib/python2.7/site-packages/tensorflow_estimator/python/estimator/training.py", line 471, in train_and_evaluate
      return executor.run()
    File "/usr/lib/python2.7/site-packages/tensorflow_estimator/python/estimator/training.py", line 638, in run
      getattr(self, task_to_run)()
    File "/usr/lib/python2.7/site-packages/tensorflow_estimator/python/estimator/training.py", line 648, in run_worker
      return self._start_distributed_training()
    File "/usr/lib/python2.7/site-packages/tensorflow_estimator/python/estimator/training.py", line 769, in _start_distributed_training
      self._start_std_server(config)
    File "/usr/lib/python2.7/site-packages/tensorflow_estimator/python/estimator/training.py", line 757, in _start_std_server
      protocol=config.protocol)
    File "/usr/lib/python2.7/site-packages/tensorflow/python/training/server_lib.py", line 148, in __init__
      self._server = c_api.TF_NewServer(self._server_def.SerializeToString())
    UnknownError: Could not start gRPC server

The server debug { "created":"@1558715126.277406418", "description":"No address added out of total 1 resolved", "file":"external/grpc/src/core/ext/transport/chttp2/server/chttp2_server.cc", "file_line":349, "referenced_errors":[ { "created":"@1558715126.277403818", "description":"Failed to add any wildcard listeners", "file":"external/grpc/src/core/lib/iomgr/tcp_server_posix.cc", "file_line":324, "referenced_errors":[ { "created":"@1558715126.277386284", "description":"Unable to configure socket", "fd":308, "file":"external/grpc/src/core/lib/iomgr/tcp_server_utils_posix_common.cc", "file_line":217, "referenced_errors":[ { "created":"@1558715126.277378899", "description":"OS Error", "errno":98, "file":"external/grpc/src/core/lib/iomgr/tcp_server_utils_posix_common.cc", "file_line":190, "os_error":"Address already in use", "syscall":"bind" } ] }, { "created":"@1558715126.277403095", "description":"Unable to configure socket", "fd":308, "file":"external/grpc/src/core/lib/iomgr/tcp_server_utils_posix_common.cc", "file_line":217, "referenced_errors":[ { "created":"@1558715126.277399992", "description":"OS Error", "errno":98, "file":"external/grpc/src/core/lib/iomgr/tcp_server_utils_posix_common.cc", "file_line":190, "os_error":"Address already in use", "syscall":"bind" } ] } ] } ] }

leewyang commented 5 years ago

That seems strange. Are there any errors before that (check all executors)? If not, can you find the equivalent of this line in your latest logs?

2019-05-23 18:49:57,229 INFO (MainThread-121583) TF_CONFIG environment variable: {u'environment': u'cloud', u'cluster': {u'ps': [u'10.90.28.222:45450'], u'worker': [u'10.90.28.232:34493', u'10.90.28.252:38762'], u'master': [u'10.90.28.230:42065']}, u'task': {u'index': 0, u'type': u'worker'}}

Also, does this happen consistently?

markromedia commented 5 years ago

Hey @leewyang, this is happening consistently after the switch to train_and_evaluate. It only happens on worker nodes. PS and master are fine.

Last line of the PS server log: 2019-05-24 18:28:07.105077: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:391] Started server with target: grpc://localhost:41595

Last line of the master log (prior to the feeding-partition timeout): 2019-05-24 18:28:07.033971: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:391] Started server with target: grpc://localhost:43409

A worker log: 2019-05-24 18:28:05,258 INFO (MainThread-14329) connected to server at ('10.90.28.229', 41772) 2019-05-24 18:28:05,261 INFO (MainThread-14329) TFSparkNode.reserve: {'port': 38045, 'authkey': '\xd9\x92~\xb3\xbbgG\xc6\x99\xd9\xea\xe9)|#w', 'executor_id': 4, 'addr': '/tmp/pymp-64Fxnt/listener-nQSz2Q', 'tb_port': 0, 'task_index': 2, 'host': '10.90.28.218', 'tb_pid': 0, 'job_name': 'worker'} 2019-05-24 18:28:07,267 INFO (MainThread-14329) node: {'executor_id': 0, 'addr': ('10.90.28.223', 42840), 'task_index': 0, 'port': 41595, 'authkey': 'wk\xda\xd8\mI%\xbf\x04\xfd\t\xd2\xceE\t', 'host': '10.90.28.223', 'job_name': 'ps', 'tb_pid': 0, 'tb_port': 0} 2019-05-24 18:28:07,268 INFO (MainThread-14329) node: {'executor_id': 1, 'addr': '/tmp/pymp-mt8Bzx/listener-bJqnz0', 'task_index': 0, 'port': 43409, 'authkey': '\x16.<3W\xecM.\xbc\xd3\xf2\x91^\xcb\xef!', 'host': '10.90.28.232', 'job_name': 'master', 'tb_pid': 0, 'tb_port': 0} 2019-05-24 18:28:07,268 INFO (MainThread-14329) node: {'executor_id': 2, 'addr': '/tmp/pymp-86T28K/listener-mmvppQ', 'task_index': 0, 'port': 38230, 'authkey': '\xb6\x01\x8f\x90\xf1)E\x01\xafd\x83\x8e\xc8\xf7', 'host': '10.90.28.222', 'job_name': 'worker', 'tb_pid': 48272, 'tb_port': 37891} 2019-05-24 18:28:07,268 INFO (MainThread-14329) node: {'executor_id': 3, 'addr': '/tmp/pymp-bajb7y/listener-wQAq_S', 'task_index': 1, 'port': 40710, 'authkey': '\x8f\x91i\xff\xa6FF\xb3\xb8\x89\x8akj\x9a\xf7\xd5', 'host': '10.90.28.230', 'job_name': 'worker', 'tb_pid': 0, 'tb_port': 0} 2019-05-24 18:28:07,268 INFO (MainThread-14329) node: {'executor_id': 4, 'addr': '/tmp/pymp-64Fxnt/listener-nQSz2Q', 'task_index': 2, 'port': 38045, 'authkey': '\xd9\x92~\xb3\xbbgG\xc6\x99\xd9\xea\xe9)|#w', 'host': '10.90.28.218', 'job_name': 'worker', 'tb_pid': 0, 'tb_port': 0} 2019-05-24 18:28:07,268 INFO (MainThread-14329) node: {'executor_id': 5, 'addr': '/tmp/pymp-a4Ecu4/listener-mJYsKh', 'task_index': 3, 'port': 33337, 'authkey': '\xbcJ{b\x06\xe3J{\x92P\x1e\xd0:\xab\xfc\xac', 'host': '10.90.28.199', 'job_name': 'worker', 'tb_pid': 0, 'tb_port': 0} 2019-05-24 18:28:07,269 INFO (MainThread-14329) export TF_CONFIG: {"environment": "cloud", "cluster": {"worker": ["10.90.28.222:38230", "10.90.28.230:40710", "10.90.28.218:38045", "10.90.28.199:33337"], "ps": ["10.90.28.223:41595"], "master": ["10.90.28.232:43409"]}, "task": {"index": 2, "type": "worker"}} 2019-05-24 18:28:07,269 INFO (MainThread-14329) Starting TensorFlow worker:2 as worker on cluster node 4 on background process 19/05/24 18:28:07 INFO PythonRunner: Times: total = 3440, boot = -33179, init = 34590, finish = 2029 19/05/24 18:28:07 INFO Executor: Finished task 4.0 in stage 72.0 (TID 1159). 1461 bytes result sent to driver 2019-05-24 18:28:07,281 INFO (MainThread-15957) 4: ======== worker:2 ======== 2019-05-24 18:28:07,282 INFO (MainThread-15957) 4: Cluster spec: {'worker': ['10.90.28.222:38230', '10.90.28.230:40710', '10.90.28.218:38045', '10.90.28.199:33337'], 'ps': ['10.90.28.223:41595'], 'master': ['10.90.28.232:43409']} 2019-05-24 18:28:07,282 INFO (MainThread-15957) 4: Using CPU 2019-05-24 18:28:07.307769: I tensorflow/core/platform/profile_utils/cpu_utils.cc:94] CPU Frequency: 2200145000 Hz 2019-05-24 18:28:07.308582: I tensorflow/compiler/xla/service/service.cc:150] XLA service 0x6ccfaf0 executing computations on platform Host. 
Devices: 2019-05-24 18:28:07.308610: I tensorflow/compiler/xla/service/service.cc:158] StreamExecutor device (0): , 2019-05-24 18:28:07.310340: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:252] Initialize GrpcChannelCache for job master -> {0 -> 10.90.28.232:43409} 2019-05-24 18:28:07.310358: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:252] Initialize GrpcChannelCache for job ps -> {0 -> 10.90.28.223:41595} 2019-05-24 18:28:07.310370: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:252] Initialize GrpcChannelCache for job worker -> {0 -> 10.90.28.222:38230, 1 -> 10.90.28.230:40710, 2 -> localhost:38045, 3 -> 10.90.28.199:33337} 2019-05-24 18:28:07.312116: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:391] Started server with target: grpc://localhost:38045 2019-05-24 18:28:07,361 WARNING (MainThread-15957) From /usr/lib/python2.7/site-packages/tensorflow/python/ops/resource_variable_ops.py:435: colocate_with (from tensorflow.python.framework.ops) is deprecated and will be removed in a future version. Instructions for updating: Colocations handled automatically by placer. 2019-05-24 18:28:07,380 WARNING (MainThread-15957) From /usr/lib/python2.7/site-packages/tensorflow/python/keras/layers/core.py:143: calling dropout (from tensorflow.python.ops.nn_ops) with keep_prob is deprecated and will be removed in a future version. Instructions for updating: Please use rate instead of keep_prob. Rate should be set to rate = 1 - keep_prob.


    Layer (type)                 Output Shape              Param #
    =================================================================
    dense (Dense)                (None, 64)                11200
    _________________________________________________________________
    dropout (Dropout)            (None, 64)                0
    _________________________________________________________________
    dense_1 (Dense)              (None, 64)                4160
    _________________________________________________________________
    dropout_1 (Dropout)          (None, 64)                0
    _________________________________________________________________
    dense_2 (Dense)              (None, 1)                 65
    =================================================================
    Total params: 15,425
    Trainable params: 15,425
    Non-trainable params: 0


WARNING: The TensorFlow contrib module will not be included in TensorFlow 2.0. For more information, please see:

2019-05-24 18:28:08,512 INFO (MainThread-15957) ParameterServerStrategy with compute_devices = ('/replica:0/task:0/device:CPU:0',), variable_device = '/device:CPU:0' 2019-05-24 18:28:08,512 INFO (MainThread-15957) TF_CONFIG environment variable: {u'environment': u'cloud', u'cluster': {u'ps': [u'10.90.28.223:41595'], u'worker': [u'10.90.28.222:38230', u'10.90.28.230:40710', u'10.90.28.218:38045', u'10.90.28.199:33337'], u'master': [u'10.90.28.232:43409']}, u'task': {u'index': 2, u'type': u'worker'}} 2019-05-24 18:28:08,512 INFO (MainThread-15957) Initializing RunConfig with distribution strategies. 2019-05-24 18:28:08,513 INFO (MainThread-15957) Not using Distribute Coordinator. 2019-05-24 18:28:08,514 INFO (MainThread-15957) Using the Keras model provided. 19/05/24 18:28:09 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 19/05/24 18:28:10 WARN shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded. 2019-05-24 18:28:10,993 WARNING (MainThread-15957) From /usr/lib/python2.7/site-packages/tensorflow/python/ops/math_ops.py:3066: to_int32 (from tensorflow.python.ops.math_ops) is deprecated and will be removed in a future version. Instructions for updating: Use tf.cast instead. 2019-05-24 18:28:12,610 INFO (MainThread-15957) Using config: {'_save_checkpoints_secs': 600, '_session_config': device_filters: "/job:ps" device_filters: "/job:worker/task:2" allow_soft_placement: true graph_options { rewrite_options { meta_optimizer_iterations: ONE } } , '_keep_checkpoint_max': 5, '_task_type': u'worker', '_train_distribute': <tensorflow.contrib.distribute.python.parameter_server_strategy.ParameterServerStrategy object at 0x9b227d0>, '_is_chief': False, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x9b22610>, '_model_dir': 'hdfs://srv-3b-01-a05.iad1.trmr.io:8020/user/pipeline/tf/model-20190524182803', '_protocol': None, '_save_checkpoints_steps': None, '_keep_checkpoint_every_n_hours': 10000, '_service': None, '_num_ps_replicas': 1, '_tf_random_seed': None, '_save_summary_steps': 100, '_device_fn': None, '_experimental_distribute': None, '_num_worker_replicas': 5, '_task_id': 2, '_log_step_count_steps': 100, '_evaluation_master': '', '_eval_distribute': <tensorflow.contrib.distribute.python.parameter_server_strategy.ParameterServerStrategy object at 0x9b227d0>, '_global_id_in_cluster': 3, '_master': u'grpc://10.90.28.218:38045', '_distribute_coordinator_mode': None} ----start: generating test data----- 19/05/24 18:28:25 INFO CoarseGrainedExecutorBackend: Got assigned task 1165 19/05/24 18:28:25 INFO Executor: Running task 0.0 in stage 73.0 (TID 1165) 19/05/24 18:28:25 INFO TorrentBroadcast: Started reading broadcast variable 110 19/05/24 18:28:25 INFO MemoryStore: Block broadcast_110_piece0 stored as bytes in memory (estimated size 934.7 KB, free 1971.7 MB) 19/05/24 18:28:25 INFO TorrentBroadcast: Reading broadcast variable 110 took 32 ms 19/05/24 18:28:25 INFO MemoryStore: Block broadcast_110 stored as values in memory (estimated size 6.9 MB, free 1964.9 MB) 19/05/24 18:28:25 INFO BlockManager: Found block rdd_412_0 locally 19/05/24 18:28:25 INFO CodeGenerator: Code generated in 19.735624 ms 19/05/24 18:28:26 INFO Executor: Finished task 0.0 in stage 73.0 (TID 1165). 
3195 bytes result sent to driver 19/05/24 18:28:26 INFO CoarseGrainedExecutorBackend: Got assigned task 1167 19/05/24 18:28:26 INFO Executor: Running task 6.0 in stage 73.0 (TID 1167) 19/05/24 18:28:26 INFO BlockManager: Found block rdd_412_6 locally 19/05/24 18:28:26 INFO Executor: Finished task 6.0 in stage 73.0 (TID 1167). 3238 bytes result sent to driver 19/05/24 18:28:39 INFO CoarseGrainedExecutorBackend: Got assigned task 1177 19/05/24 18:28:39 INFO Executor: Running task 4.0 in stage 74.0 (TID 1177) 19/05/24 18:28:39 INFO MapOutputTrackerWorker: Updating epoch to 37 and clearing cache 19/05/24 18:28:39 INFO TorrentBroadcast: Started reading broadcast variable 111 19/05/24 18:28:39 INFO MemoryStore: Block broadcast_111_piece0 stored as bytes in memory (estimated size 17.0 KB, free 1964.8 MB) 19/05/24 18:28:39 INFO TorrentBroadcast: Reading broadcast variable 111 took 7 ms 19/05/24 18:28:39 INFO MemoryStore: Block broadcast_111 stored as values in memory (estimated size 52.4 KB, free 1964.8 MB) 19/05/24 18:28:39 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 36, fetching them 19/05/24 18:28:39 INFO MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://MapOutputTracker@srv-01-11-b09.iad1.trmr.io:39097) 19/05/24 18:28:39 INFO MapOutputTrackerWorker: Got the output locations 19/05/24 18:28:39 INFO ShuffleBlockFetcherIterator: Getting 12 non-empty blocks out of 12 blocks 19/05/24 18:28:39 INFO ShuffleBlockFetcherIterator: Started 4 remote fetches in 1 ms 2019-05-24 18:28:39,854 INFO (MainThread-14334) Connected to TFSparkNode.mgr on 10.90.28.218, executor=4, state='running' 2019-05-24 18:28:39,859 INFO (MainThread-14334) mgr.state='running' 2019-05-24 18:28:39,859 INFO (MainThread-14334) Feeding partition <itertools.chain object at 0x21f29d0> into input queue <multiprocessing.queues.JoinableQueue object at 0x692fa10> 2019-05-24 18:28:41,098 INFO (MainThread-14334) Processed 486 items in partition 19/05/24 18:28:41 INFO PythonRunner: Times: total = 1368, boot = -59392, init = 59504, finish = 1256 19/05/24 18:28:41 INFO Executor: Finished task 4.0 in stage 74.0 (TID 1177). 1960 bytes result sent to driver 19/05/24 18:28:41 INFO CoarseGrainedExecutorBackend: Got assigned task 1180 19/05/24 18:28:41 INFO Executor: Running task 7.0 in stage 74.0 (TID 1180) 19/05/24 18:28:41 INFO ShuffleBlockFetcherIterator: Getting 12 non-empty blocks out of 12 blocks 19/05/24 18:28:41 INFO ShuffleBlockFetcherIterator: Started 4 remote fetches in 0 ms 2019-05-24 18:28:41,158 INFO (MainThread-15611) Connected to TFSparkNode.mgr on 10.90.28.218, executor=4, state='running' 2019-05-24 18:28:41,164 INFO (MainThread-15611) mgr.state='running' 2019-05-24 18:28:41,164 INFO (MainThread-15611) Feeding partition <itertools.chain object at 0x21f29d0> into input queue <multiprocessing.queues.JoinableQueue object at 0x692fa10> 2019-05-24 18:28:42,412 INFO (MainThread-15611) Processed 486 items in partition 19/05/24 18:28:42 INFO PythonRunner: Times: total = 1299, boot = -50174, init = 50211, finish = 1262 19/05/24 18:28:42 INFO Executor: Finished task 7.0 in stage 74.0 (TID 1180). 
1960 bytes result sent to driver 19/05/24 18:28:42 INFO CoarseGrainedExecutorBackend: Got assigned task 1183 19/05/24 18:28:42 INFO Executor: Running task 10.0 in stage 74.0 (TID 1183) 19/05/24 18:28:42 INFO ShuffleBlockFetcherIterator: Getting 12 non-empty blocks out of 12 blocks 19/05/24 18:28:42 INFO ShuffleBlockFetcherIterator: Started 4 remote fetches in 1 ms 2019-05-24 18:28:42,441 INFO (MainThread-14329) Connected to TFSparkNode.mgr on 10.90.28.218, executor=4, state='running' 2019-05-24 18:28:42,447 INFO (MainThread-14329) mgr.state='running' 2019-05-24 18:28:42,448 INFO (MainThread-14329) Feeding partition <itertools.chain object at 0x5b49fd0> into input queue <multiprocessing.queues.JoinableQueue object at 0x692fa10> ----finish: generating test data----- 2019-05-24 18:28:48,987 INFO (MainThread-15957) Not using Distribute Coordinator. 2019-05-24 18:28:48,988 INFO (MainThread-15957) Start Tensorflow server. E0524 18:28:48.990061198 15957 server_chttp2.cc:40] {"created":"@1558722528.989990794","description":"No address added out of total 1 resolved","file":"external/grpc/src/core/ext/transport/chttp2/server/chttp2_server.cc","file_line":349,"referenced_errors":[{"created":"@1558722528.989986968","description":"Failed to add any wildcard listeners","file":"external/grpc/src/core/lib/iomgr/tcp_server_posix.cc","file_line":324,"referenced_errors":[{"created":"@1558722528.989968573","description":"Unable to configure socket","fd":354,"file":"external/grpc/src/core/lib/iomgr/tcp_server_utils_posix_common.cc","file_line":217,"referenced_errors":[{"created":"@1558722528.989955983","description":"OS Error","errno":98,"file":"external/grpc/src/core/lib/iomgr/tcp_server_utils_posix_common.cc","file_line":190,"os_error":"Address already in use","syscall":"bind"}]},{"created":"@1558722528.989986472","description":"Unable to configure socket","fd":354,"file":"external/grpc/src/core/lib/iomgr/tcp_server_utils_posix_common.cc","file_line":217,"referenced_errors":[{"created":"@1558722528.989982267","description":"OS Error","errno":98,"file":"external/grpc/src/core/lib/iomgr/tcp_server_utils_posix_common.cc","file_line":190,"os_error":"Address already in use","syscall":"bind"}]}]}]} 2019-05-24 18:28:48.990134: E tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:466] Unknown: Could not start gRPC server

The cluster spec in the logs is Cluster spec: {'worker': ['10.90.28.222:38230', '10.90.28.230:40710', '10.90.28.218:38045', '10.90.28.199:33337'], 'ps': ['10.90.28.223:41595'], 'master': ['10.90.28.232:43409']}

leewyang commented 5 years ago

I don't see anything obvious that would explain why you're seeing that error. I see about a 20-second gap between generating data and starting the TensorFlow server. If there are a lot of other jobs on your cluster, I wonder if they're binding to the socket that we've "reserved" for TensorFlow. If you can minimize/eliminate this gap, it might resolve that problem.

Otherwise, have you been able to successfully run any of the other (non-DistributionStrategy) examples on your cluster?

markromedia commented 5 years ago

@leewyang I introduced the 20-second sleep myself, to see if the cluster needed some "warmup" time before starting to train. I have taken it out.

I noticed this part of the logs (after changing master -> chief):

2019-05-28 20:27:07,886 INFO (MainThread-37335) Runningtrain_and_evaluatewith Distribute Coordinator. 2019-05-28 20:27:07,888 INFO (MainThread-37335) Running Distribute Coordinator with mode = 'independent_worker', cluster_spec = {u'ps': [u'10.90.28.223:42563'], u'chief': [u'10.90.28.222:37941'], u'worker': [u'10.90.28.199:33546', u'10.90.28.252:41471', u'10.90.28.230:46375', u'10.90.28.253:41595']}, task_type = u'worker', task_id = 2, environment = u'cloud', rpc_layer = 'grpc' 2019-05-28 20:27:07,890 INFO (MainThread-37335) Multi-worker ParameterServerStrategy with cluster_spec = {u'ps': [u'10.90.28.223:42563'], u'chief': [u'10.90.28.222:37941'], u'worker': [u'10.90.28.199:33546', u'10.90.28.252:41471', u'10.90.28.230:46375', u'10.90.28.253:41595']}, task_type = u'worker', task_id = 2, num_ps_replicas = 1, is_chief = False, compute_devices = ('/job:worker/replica:0/task:2/device:CPU:0',), variable_device = <bound method _ReplicaDeviceChooser.device_function of <tensorflow.python.training.device_setter._ReplicaDeviceChooser object at 0x7f1277cb7cd0>> 2019-05-28 20:27:07,892 INFO (MainThread-37335) Starting standard TensorFlow server, target = u'grpc://10.90.28.230:46375', session_config= allow_soft_placement: true

It seems like TF doesn't see that the cluster is already up. It reads the TF_CONFIG variable correctly, but then attempts to start its own TF server using the values from TF_CONFIG, which I believe leads to the "address already in use" issue.
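Roughly what I mean (a hedged illustration of how TF_CONFIG is consumed, not the exact TF internals):

    import json
    import os

    # TensorFlowOnSpark exports TF_CONFIG; the Estimator / distribute
    # coordinator parses it and starts a gRPC server on this task's
    # advertised address. If something is already listening there, the
    # bind fails with "Address already in use".
    tf_config = json.loads(os.environ["TF_CONFIG"])
    task = tf_config["task"]
    my_addr = tf_config["cluster"][task["type"]][task["index"]]
    print("this task will try to bind grpc://%s" % my_addr)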

I verified that there is an instance running on that port prior to TF attempting to start one.

Is it possible I am missing a config that tells TF not to run the distribute_coordinator.run_standard_tensorflow_server() bit of code?

leewyang commented 5 years ago

That's strange that you're seeing a TF instance already bound to the port. Are you instantiating a tf.train.Server() or invoking TFNode.start_cluster_server() in your code?

markromedia commented 5 years ago

@leewyang Thanks to all of your pointers, I think I finally have this sorted. There was indeed a TFNode.start_cluster_server(ctx) call inside my main_func. Once I removed it, I saw that TF itself started the servers using the provided TF_CONFIG.

One thing I did notice, though, was that the parameter server was not started, so I had to add this bit of code to get it to work:

    if job_name == "ps":
        # parameter server: start a TF server and block until shutdown
        cluster, server = TFNode.start_cluster_server(ctx)
        server.join()
    elif job_name == "worker" or job_name == "chief":
        # workers/chief: let the Estimator start its own server from TF_CONFIG
        worker_main(args, ctx)

Not sure if that's the correct way to do it, but it seems to work.

Thanks again for all the help. One last thing, how do I get access to https://groups.google.com/forum/#!forum/TensorFlowOnSpark-users?

leewyang commented 5 years ago

Glad you got it working. You shouldn't need to start the ps node yourself... you might just need to set this parameter. As for the Google group, there should be a button to apply to join.