yahoo / TensorFlowOnSpark

TensorFlowOnSpark brings TensorFlow programs to Apache Spark clusters.
Apache License 2.0
3.87k stars 937 forks source link

Couldn't able to save the model file while using Keras example with a local dataset. #398

Closed vamsinimmala1992 closed 5 years ago

vamsinimmala1992 commented 5 years ago

I am using a keras example with TensorflowOnSpark. I am trying to save the model file to the hdfs location I specify as args.

With no exception or errors in the Yarn Log the process have been completed, BUT I DONT SEE ANY MODEL FILE SAVED IN THE HDFS.

below is the code and the log. Please check.

Also I am not inputting any validation data in the fit_generator method. is it mandatory?

from __future__ import print_function

def main_fun(args, ctx):
    import numpy
    import os
    import tensorflow as tf
    from tensorflow.python import keras
    from tensorflow.python.keras import backend as K
    from tensorflow.python.keras.datasets import mnist
    from tensorflow.python.keras.models import Sequential, load_model, save_model
    from tensorflow.python.keras.layers import Dense, Dropout
    from tensorflow.python.keras.optimizers import RMSprop
    from tensorflow.python.keras.callbacks import LambdaCallback, TensorBoard
    from tensorflow.python.saved_model import builder as saved_model_builder
    from tensorflow.python.saved_model import tag_constants
    from tensorflow.python.saved_model.signature_def_utils_impl import predict_signature_def
    from tensorflowonspark import TFNode

    cluster, server = TFNode.start_cluster_server(ctx)

    if ctx.job_name == "ps":
        server.join()
    elif ctx.job_name == "worker":

        def generate_rdd_data(tf_feed, batch_size):
            print("generate_rdd_data invoked")
            while True:
                batch = tf_feed.next_batch(batch_size)
                feature_vector = []
                lbls = []
                for item in batch:
                    feature_vector.append(item[0])
                    lbls.append(item[1])
                features = numpy.array(feature_vector).astype('float32')
                labels = numpy.stack(lbls).astype('float32')
                yield (features, labels)

        with tf.device(tf.train.replica_device_setter(
          worker_device="/job:worker/task:%d" % ctx.task_index,
          cluster=cluster)):

            batch_size = 100
            num_classes = 14
            # args.mode == 'spark':
            x_train = tf.placeholder(tf.float32, [None, 28047], name="x_train")
            y_train = tf.placeholder(tf.float32, [None, 14], name="y_train")

            model = Sequential()
            model.add(Dense(512, activation='relu', input_shape=(28047,)))
            model.add(Dropout(0.2))
            model.add(Dense(512, activation='relu'))
            model.add(Dropout(0.2))
            model.add(Dense(14, activation='softmax'))

            model.summary()

            model.compile(loss='categorical_crossentropy',
                          optimizer=tf.train.RMSPropOptimizer(learning_rate=0.001),
                          metrics=['accuracy'])

        saver = tf.train.Saver()

        with tf.Session(server.target) as sess:
            K.set_session(sess)

            def save_checkpoint(epoch, logs=None):
                if epoch == 1:
                    tf.train.write_graph(sess.graph.as_graph_def(), args.model_dir, 'graph.pbtxt')
                saver.save(sess, os.path.join(args.model_dir, 'model.ckpt'), global_step=epoch * args.steps_per_epoch)

            #ckpt_callback = LambdaCallback(on_epoch_end=save_checkpoint)
            #tb_callback = TensorBoard(log_dir=args.model_dir, histogram_freq=1, write_graph=True, write_images=True)

            # Add callbacks to save model checkpoint and tensorboard events (on worker:0 only)
            #callbacks = [ckpt_callback, tb_callback] if ctx.task_index == 0 else None

            # args.input_mode == 'spark':
            #  train on data read from a generator which is producing data from a Spark RDD
            tf_feed = TFNode.DataFeed(ctx.mgr)
            model.fit_generator(generator=generate_rdd_data(tf_feed, batch_size),
                                steps_per_epoch=args.steps_per_epoch,
                                epochs=args.epochs,
                                verbose=1,
                                callbacks=None)

            if args.export_dir and ctx.job_name == 'worker' and ctx.task_index == 0:
                # save a local Keras model, so we can reload it with an inferencing learning_phase
                save_model(model, "tmp_model")

                # reload the model
                K.set_learning_phase(False)
                new_model = load_model("tmp_model")

                # export a saved_model for inferencing
                builder = saved_model_builder.SavedModelBuilder(args.export_dir)
                signature = predict_signature_def(inputs={'fetures': new_model.input},
                                                  outputs={'scores': new_model.output})
                builder.add_meta_graph_and_variables(sess=sess,
                                                     tags=[tag_constants.SERVING],
                                                     signature_def_map={'predict': signature},
                                                     clear_devices=True)
                builder.save()

            if args.input_mode == 'spark':
                tf_feed.terminate()

if __name__ == '__main__':
    import argparse
    from pyspark.context import SparkContext
    from pyspark.conf import SparkConf
    from tensorflowonspark import TFCluster
    import keras

    sc = SparkContext(conf=SparkConf().setAppName("PhaseOneModelling"))
    executors = sc._conf.get("spark.executor.instances")
    num_executors = int(executors) if executors is not None else 1
    num_ps = 1

    parser = argparse.ArgumentParser()
    parser.add_argument("--cluster_size", help="number of nodes in the cluster", type=int, default=num_executors)
    parser.add_argument("--epochs", help="number of epochs of training data", type=int, default=20)
    parser.add_argument("--export_dir", help="directory to export saved_model")
    parser.add_argument("--data", help="HDFS path to data in parallelized CSV format")
    # parser.add_argument("--input_mode", help="input mode (tf|spark)", default="tf")
    parser.add_argument("--labels", help="HDFS path to MNIST labels in parallelized CSV format")
    parser.add_argument("--model_dir", help="directory to write model checkpoints")
    parser.add_argument("--num_ps", help="number of ps nodes", type=int, default=1)
    parser.add_argument("--steps_per_epoch", help="number of steps per epoch", type=int, default=100)
    parser.add_argument("--tensorboard", help="launch tensorboard process", action="store_true")

    args = parser.parse_args()
    print("args:", args)

    data = sc.textFile(args.data)
    data = data.map(lambda l: l.encode("UTF8", "ignore").split('\t'))

    labels = data.map(lambda x: x[1])
    data = data.map(lambda x: x[19:28066])

    header = data.first()
    data = data.filter(lambda line: line != header)
    label_header = labels.first()
    labels = labels.filter(lambda line: line != label_header)

    # convert values to float
    convertToFloat = lambda data: [float(str(x)) for x in data]
    dataset = data.map(convertToFloat)
    labels = labels.map(lambda x: float(x))
    labels = labels.map(lambda x: keras.utils.to_categorical(x, num_classes=14))

    # Split the data for train and validation
    #testRDD, trainRDD = data.randomSplit(weights=[0.001, 0.999], seed=42)
    #testlabelRDD, trainlabelRDD = labels.randomSplit(weights=[0.001, 0.999], seed=42)

    dataRDD = dataset.zip(labels)

    #dataRDD = dataRDD.sample(False, 0.01, 42)
    #trainRDD = trainRDD.zip(trainlabelRDD)

    cluster = TFCluster.run(sc, main_fun, args, args.cluster_size, args.num_ps, args.tensorboard,
                            TFCluster.InputMode.SPARK, log_dir=args.model_dir)
    cluster.train(dataRDD, args.epochs)

    cluster.shutdown()

And the log as follows:

Using TensorFlow backend.
19/03/03 22:48:13 INFO SparkContext: Running Spark version 2.3.0.2.6.5.0-292
19/03/03 22:48:13 INFO SparkContext: Submitted application: PhaseOneModelling
19/03/03 22:48:13 INFO SecurityManager: Changing view acls to: Surya@..com
19/03/03 22:48:13 INFO SecurityManager: Changing modify acls to: Surya@..com
19/03/03 22:48:13 INFO SecurityManager: Changing view acls groups to:
19/03/03 22:48:13 INFO SecurityManager: Changing modify acls groups to:
19/03/03 22:48:13 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(Surya@..com); groups with view permissions: Se                                                t(); users  with modify permissions: Set(Surya@..com); groups with modify permissions: Set()
19/03/03 22:48:13 INFO Utils: Successfully started service 'sparkDriver' on port 44164.
19/03/03 22:48:13 INFO SparkEnv: Registering MapOutputTracker
19/03/03 22:48:13 INFO SparkEnv: Registering BlockManagerMaster
19/03/03 22:48:13 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
19/03/03 22:48:13 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
19/03/03 22:48:13 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-966d6dac-7f6b-4411-91e2-7b4c07185c8d
19/03/03 22:48:13 INFO MemoryStore: MemoryStore started with capacity 153.4 GB
19/03/03 22:48:13 INFO SparkEnv: Registering OutputCommitCoordinator
19/03/03 22:48:14 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
19/03/03 22:48:14 INFO Utils: Successfully started service 'SparkUI' on port 4041.
19/03/03 22:48:14 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://instance-2026033.ipa.ba..com:4041
19/03/03 22:48:15 INFO RMProxy: Connecting to ResourceManager at instance-2026030.ipa.ba..com/10.28.26.30:8050
19/03/03 22:48:15 INFO Client: Requesting a new application from cluster with 14 NodeManagers
19/03/03 22:48:15 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (202752 MB per container)
19/03/03 22:48:15 INFO Client: Will allocate AM container, with 896 MB memory including 384 MB overhead
19/03/03 22:48:15 INFO Client: Setting up container launch context for our AM
19/03/03 22:48:15 INFO Client: Setting up the launch environment for our AM container
19/03/03 22:48:15 INFO Client: Preparing resources for our AM container
19/03/03 22:48:15 INFO HadoopFSDelegationTokenProvider: getting token for: DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-438946188_31, ugi=Surya@..com (auth:KERBEROS)]]
19/03/03 22:48:16 INFO KerberosName: Non-simple name Surya@..com after auth_to_local rule RULE:[1:$1@$0](.*@..com)/L
19/03/03 22:48:16 INFO DFSClient: Created HDFS_DELEGATION_TOKEN token 6651 for Surya@..com on 10.28.26.29:8020
19/03/03 22:48:19 INFO Client: Use hdfs cache file as spark.yarn.archive for HDP, hdfsCacheFile:hdfs://instance-2026029.ipa.ba..com:8020/hdp/apps/2.6.5.0-292/spark2/spark2-hdp-yarn-archive                                                .tar.gz
19/03/03 22:48:19 INFO Client: Source and destination file systems are the same. Not copying hdfs://instance-2026029.ipa.ba..com:8020/hdp/apps/2.6.5.0-292/spark2/spark2-hdp-yarn-archive.ta                                                r.gz
19/03/03 22:48:19 INFO Client: Uploading resource file:/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip -> hdfs://instance-2026029.ipa.ba..com:8020/user/Surya@..com/.sparkStagin                                                g/application_1551114784635_0177/pyspark.zip
19/03/03 22:48:19 INFO Client: Uploading resource file:/usr/hdp/2.6.5.0-292/spark2/python/lib/py4j-0.10.6-src.zip -> hdfs://instance-2026029.ipa.ba..com:8020/user/Surya@..com/.spa                                                rkStaging/application_1551114784635_0177/py4j-0.10.6-src.zip
19/03/03 22:48:19 INFO Client: Uploading resource file:/tmp/spark-2c81098d-4f91-4a1d-87ea-71ecf1c72204/__spark_conf__626918427791303293.zip -> hdfs://instance-2026029.ipa.ba..com:8020/user                                                /Surya@..com/.sparkStaging/application_1551114784635_0177/__spark_conf__.zip
19/03/03 22:48:19 INFO SecurityManager: Changing view acls to: Surya@..com
19/03/03 22:48:19 INFO SecurityManager: Changing modify acls to: Surya@..com
19/03/03 22:48:19 INFO SecurityManager: Changing view acls groups to:
19/03/03 22:48:19 INFO SecurityManager: Changing modify acls groups to:
19/03/03 22:48:19 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(Surya@..com); groups with view permissions: Se                                                t(); users  with modify permissions: Set(Surya@..com); groups with modify permissions: Set()
19/03/03 22:48:19 INFO Client: Submitting application application_1551114784635_0177 to ResourceManager
19/03/03 22:48:22 INFO YarnClientImpl: Submitted application application_1551114784635_0177
19/03/03 22:48:22 INFO SchedulerExtensionServices: Starting Yarn extension services with app application_1551114784635_0177 and attemptId None
19/03/03 22:48:23 INFO Client: Application report for application_1551114784635_0177 (state: ACCEPTED)
19/03/03 22:48:23 INFO Client:
         client token: Token { kind: YARN_CLIENT_TOKEN, service:  }
         diagnostics: AM container is launched, waiting for AM container to Register with RM
         ApplicationMaster host: N/A
         ApplicationMaster RPC port: -1
         queue: production
         start time: 1551671302538
         final status: UNDEFINED
         tracking URL: https://instance-2026030.ipa.ba..com:8090/proxy/application_1551114784635_0177/
         user: Surya@..com
19/03/03 22:48:24 INFO Client: Application report for application_1551114784635_0177 (state: ACCEPTED)
19/03/03 22:48:25 INFO Client: Application report for application_1551114784635_0177 (state: ACCEPTED)
19/03/03 22:48:26 INFO Client: Application report for application_1551114784635_0177 (state: ACCEPTED)
19/03/03 22:48:27 INFO Client: Application report for application_1551114784635_0177 (state: ACCEPTED)
19/03/03 22:48:28 INFO Client: Application report for application_1551114784635_0177 (state: ACCEPTED)
19/03/03 22:48:29 INFO Client: Application report for application_1551114784635_0177 (state: ACCEPTED)
19/03/03 22:48:30 INFO Client: Application report for application_1551114784635_0177 (state: ACCEPTED)
19/03/03 22:48:31 INFO YarnClientSchedulerBackend: Add WebUI Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS -> instance-2026030.ipa.ba..com, PROXY_URI_                                                BASES -> https://instance-2026030.ipa.ba..com:8090/proxy/application_1551114784635_0177), /proxy/application_1551114784635_0177
19/03/03 22:48:31 INFO JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
19/03/03 22:48:31 INFO Client: Application report for application_1551114784635_0177 (state: ACCEPTED)
19/03/03 22:48:31 INFO YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster registered as NettyRpcEndpointRef(spark-client://YarnAM)
19/03/03 22:48:32 INFO Client: Application report for application_1551114784635_0177 (state: RUNNING)
19/03/03 22:48:32 INFO Client:
         client token: Token { kind: YARN_CLIENT_TOKEN, service:  }
         diagnostics: N/A
         ApplicationMaster host: 10.28.26.40
         ApplicationMaster RPC port: 0
         queue: production
         start time: 1551671302538
         final status: UNDEFINED
         tracking URL: https://instance-2026030.ipa.ba..com:8090/proxy/application_1551114784635_0177/
         user: Surya@..com
19/03/03 22:48:32 INFO YarnClientSchedulerBackend: Application application_1551114784635_0177 has started running.
19/03/03 22:48:32 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 38256.
19/03/03 22:48:32 INFO NettyBlockTransferService: Server created on instance-2026033.ipa.ba..com:38256
19/03/03 22:48:32 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
19/03/03 22:48:32 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, instance-2026033.ipa.ba..com, 38256, None)
19/03/03 22:48:32 INFO BlockManagerMasterEndpoint: Registering block manager instance-2026033.ipa.ba..com:38256 with 153.4 GB RAM, BlockManagerId(driver, instance-2026033.ipa.ba..com, 382                                                56, None)
19/03/03 22:48:32 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, instance-2026033.ipa.ba..com, 38256, None)
19/03/03 22:48:32 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, instance-2026033.ipa.ba..com, 38256, None)
19/03/03 22:48:33 INFO EventLoggingListener: Logging events to hdfs:/spark2-history/application_1551114784635_0177
19/03/03 22:48:38 INFO YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (10.28.26.39:36672) with ID 3
19/03/03 22:48:38 INFO BlockManagerMasterEndpoint: Registering block manager instance-2026039.ipa.ba..com:33711 with 153.4 GB RAM, BlockManagerId(3, instance-2026039.ipa.ba..com, 33711, N                                                one)
19/03/03 22:48:39 INFO YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (10.28.26.38:49710) with ID 2
19/03/03 22:48:39 INFO BlockManagerMasterEndpoint: Registering block manager instance-2026038.ipa.ba..com:45476 with 153.4 GB RAM, BlockManagerId(2, instance-2026038.ipa.ba..com, 45476, N                                                one)
19/03/03 22:48:39 INFO YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (10.28.26.45:36844) with ID 1
19/03/03 22:48:39 INFO BlockManagerMasterEndpoint: Registering block manager instance-2026045.ipa.ba..com:45485 with 153.4 GB RAM, BlockManagerId(1, instance-2026045.ipa.ba..com, 45485, N                                                one)
19/03/03 22:48:39 INFO YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (10.28.26.46:58368) with ID 4
19/03/03 22:48:39 INFO YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (10.28.26.43:54760) with ID 6
19/03/03 22:48:39 INFO BlockManagerMasterEndpoint: Registering block manager instance-2026046.ipa.ba..com:41346 with 153.4 GB RAM, BlockManagerId(4, instance-2026046.ipa.ba..com, 41346, N                                                one)
19/03/03 22:48:40 INFO YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.8
19/03/03 22:48:40 INFO BlockManagerMasterEndpoint: Registering block manager instance-2026043.ipa.ba..com:33242 with 153.4 GB RAM, BlockManagerId(6, instance-2026043.ipa.ba..com, 33242, N                                                one)
args: Namespace(cluster_size=6, data='/user/imagen.admins/NormalizedAugustData/Wide/ReducedFeatures/wide_august_tf_idf_normalized_with_col_ReducedFeatures.tsv', epochs=5, export_dir='/tmp/m                                                ss/TensorflowOnSpark/export_dir/', labels=None, model_dir='/tmp/mss/TensorflowOnSpark/model_dir', num_ps=1, steps_per_epoch=2622, tensorboard=False)
19/03/03 22:48:40 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 466.7 KB, free 153.4 GB)
19/03/03 22:48:40 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 34.0 KB, free 153.4 GB)
19/03/03 22:48:40 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on instance-2026033.ipa.ba..com:38256 (size: 34.0 KB, free: 153.4 GB)
19/03/03 22:48:40 INFO SparkContext: Created broadcast 0 from textFile at NativeMethodAccessorImpl.java:0
19/03/03 22:48:40 INFO KerberosName: Non-simple name Surya@..com after auth_to_local rule RULE:[1:$1@$0](.*@..com)/L
19/03/03 22:48:40 INFO DFSClient: Created HDFS_DELEGATION_TOKEN token 6652 for Surya@..com on 10.28.26.29:8020
19/03/03 22:48:40 INFO TokenCache: Got dt for hdfs://instance-2026029.ipa.ba..com:8020; Kind: HDFS_DELEGATION_TOKEN, Service: 10.28.26.29:8020, Ident: (HDFS_DELEGATION_TOKEN token 6652 fo                                                r Surya@..com)
19/03/03 22:48:40 INFO FileInputFormat: Total input paths to process : 1
19/03/03 22:48:40 INFO SparkContext: Starting job: runJob at PythonRDD.scala:141
19/03/03 22:48:40 INFO DAGScheduler: Got job 0 (runJob at PythonRDD.scala:141) with 1 output partitions
19/03/03 22:48:40 INFO DAGScheduler: Final stage: ResultStage 0 (runJob at PythonRDD.scala:141)
19/03/03 22:48:40 INFO DAGScheduler: Parents of final stage: List()
19/03/03 22:48:40 INFO DAGScheduler: Missing parents: List()
19/03/03 22:48:40 INFO DAGScheduler: Submitting ResultStage 0 (PythonRDD[2] at RDD at PythonRDD.scala:48), which has no missing parents
19/03/03 22:48:40 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 6.4 KB, free 153.4 GB)
19/03/03 22:48:40 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 4.0 KB, free 153.4 GB)
19/03/03 22:48:40 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on instance-2026033.ipa.ba..com:38256 (size: 4.0 KB, free: 153.4 GB)
19/03/03 22:48:40 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1039
19/03/03 22:48:40 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (PythonRDD[2] at RDD at PythonRDD.scala:48) (first 15 tasks are for partitions Vector(0))
19/03/03 22:48:40 INFO YarnScheduler: Adding task set 0.0 with 1 tasks
19/03/03 22:48:40 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, instance-2026046.ipa.ba..com, executor 4, partition 0, RACK_LOCAL, 8020 bytes)
19/03/03 22:48:41 INFO YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (10.28.26.37:34390) with ID 5
19/03/03 22:48:41 INFO BlockManagerMasterEndpoint: Registering block manager instance-2026037.ipa.ba..com:45022 with 153.4 GB RAM, BlockManagerId(5, instance-2026037.ipa.ba..com, 45022, N                                                one)
19/03/03 22:48:41 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on instance-2026046.ipa.ba..com:41346 (size: 4.0 KB, free: 153.4 GB)
19/03/03 22:48:41 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on instance-2026046.ipa.ba..com:41346 (size: 34.0 KB, free: 153.4 GB)
19/03/03 22:48:43 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 2887 ms on instance-2026046.ipa.ba..com (executor 4) (1/1)
19/03/03 22:48:43 INFO YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool
19/03/03 22:48:43 INFO DAGScheduler: ResultStage 0 (runJob at PythonRDD.scala:141) finished in 3.100 s
19/03/03 22:48:43 INFO DAGScheduler: Job 0 finished: runJob at PythonRDD.scala:141, took 3.163148 s
19/03/03 22:48:43 INFO SparkContext: Starting job: runJob at PythonRDD.scala:141
19/03/03 22:48:43 INFO DAGScheduler: Got job 1 (runJob at PythonRDD.scala:141) with 1 output partitions
19/03/03 22:48:43 INFO DAGScheduler: Final stage: ResultStage 1 (runJob at PythonRDD.scala:141)
19/03/03 22:48:43 INFO DAGScheduler: Parents of final stage: List()
19/03/03 22:48:43 INFO DAGScheduler: Missing parents: List()
19/03/03 22:48:43 INFO DAGScheduler: Submitting ResultStage 1 (PythonRDD[3] at RDD at PythonRDD.scala:48), which has no missing parents
19/03/03 22:48:43 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 6.3 KB, free 153.4 GB)
19/03/03 22:48:43 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 4.0 KB, free 153.4 GB)
19/03/03 22:48:43 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on instance-2026033.ipa.ba..com:38256 (size: 4.0 KB, free: 153.4 GB)
19/03/03 22:48:43 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1039
19/03/03 22:48:43 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (PythonRDD[3] at RDD at PythonRDD.scala:48) (first 15 tasks are for partitions Vector(0))
19/03/03 22:48:43 INFO YarnScheduler: Adding task set 1.0 with 1 tasks
19/03/03 22:48:43 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, instance-2026037.ipa.ba..com, executor 5, partition 0, NODE_LOCAL, 8020 bytes)
19/03/03 22:48:44 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on instance-2026037.ipa.ba..com:45022 (size: 4.0 KB, free: 153.4 GB)
19/03/03 22:48:44 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on instance-2026037.ipa.ba..com:45022 (size: 34.0 KB, free: 153.4 GB)
19/03/03 22:48:45 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 2121 ms on instance-2026037.ipa.ba..com (executor 5) (1/1)
19/03/03 22:48:45 INFO YarnScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool
19/03/03 22:48:45 INFO DAGScheduler: ResultStage 1 (runJob at PythonRDD.scala:141) finished in 2.129 s
19/03/03 22:48:45 INFO DAGScheduler: Job 1 finished: runJob at PythonRDD.scala:141, took 2.132856 s
2019-03-03 22:48:46,218 INFO (MainThread-75262) Reserving TFSparkNodes
2019-03-03 22:48:46,218 INFO (MainThread-75262) cluster_template: {'ps': [0], 'worker': [1, 2, 3, 4, 5]}
2019-03-03 22:48:46,219 INFO (MainThread-75262) listening for reservations at ('10.28.26.33', 42213)
2019-03-03 22:48:46,219 INFO (MainThread-75262) Starting TensorFlow on executors
2019-03-03 22:48:46,224 INFO (MainThread-75262) Waiting for TFSparkNodes to start
2019-03-03 22:48:46,224 INFO (MainThread-75262) waiting for 6 reservations
19/03/03 22:48:46 INFO SparkContext: Starting job: foreachPartition at /usr/lib/python2.7/site-packages/tensorflowonspark/TFCluster.py:301
19/03/03 22:48:46 INFO DAGScheduler: Got job 2 (foreachPartition at /usr/lib/python2.7/site-packages/tensorflowonspark/TFCluster.py:301) with 6 output partitions
19/03/03 22:48:46 INFO DAGScheduler: Final stage: ResultStage 2 (foreachPartition at /usr/lib/python2.7/site-packages/tensorflowonspark/TFCluster.py:301)
19/03/03 22:48:46 INFO DAGScheduler: Parents of final stage: List()
19/03/03 22:48:46 INFO DAGScheduler: Missing parents: List()
19/03/03 22:48:46 INFO DAGScheduler: Submitting ResultStage 2 (PythonRDD[8] at foreachPartition at /usr/lib/python2.7/site-packages/tensorflowonspark/TFCluster.py:301), which has no missing                                                 parents
19/03/03 22:48:46 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 17.4 KB, free 153.4 GB)
19/03/03 22:48:46 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 12.5 KB, free 153.4 GB)
19/03/03 22:48:46 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on instance-2026033.ipa.ba..com:38256 (size: 12.5 KB, free: 153.4 GB)
19/03/03 22:48:46 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1039
19/03/03 22:48:46 INFO DAGScheduler: Submitting 6 missing tasks from ResultStage 2 (PythonRDD[8] at foreachPartition at /usr/lib/python2.7/site-packages/tensorflowonspark/TFCluster.py:301)                                                 (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5))
19/03/03 22:48:46 INFO YarnScheduler: Adding task set 2.0 with 6 tasks
19/03/03 22:48:46 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2, instance-2026039.ipa.ba..com, executor 3, partition 0, PROCESS_LOCAL, 7869 bytes)
19/03/03 22:48:46 INFO TaskSetManager: Starting task 1.0 in stage 2.0 (TID 3, instance-2026046.ipa.ba..com, executor 4, partition 1, PROCESS_LOCAL, 7869 bytes)
19/03/03 22:48:46 INFO TaskSetManager: Starting task 2.0 in stage 2.0 (TID 4, instance-2026038.ipa.ba..com, executor 2, partition 2, PROCESS_LOCAL, 7869 bytes)
19/03/03 22:48:46 INFO TaskSetManager: Starting task 3.0 in stage 2.0 (TID 5, instance-2026037.ipa.ba..com, executor 5, partition 3, PROCESS_LOCAL, 7869 bytes)
19/03/03 22:48:46 INFO TaskSetManager: Starting task 4.0 in stage 2.0 (TID 6, instance-2026043.ipa.ba..com, executor 6, partition 4, PROCESS_LOCAL, 7869 bytes)
19/03/03 22:48:46 INFO TaskSetManager: Starting task 5.0 in stage 2.0 (TID 7, instance-2026045.ipa.ba..com, executor 1, partition 5, PROCESS_LOCAL, 7869 bytes)
19/03/03 22:48:46 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on instance-2026037.ipa.ba..com:45022 (size: 12.5 KB, free: 153.4 GB)
19/03/03 22:48:46 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on instance-2026046.ipa.ba..com:41346 (size: 12.5 KB, free: 153.4 GB)
19/03/03 22:48:46 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on instance-2026039.ipa.ba..com:33711 (size: 12.5 KB, free: 153.4 GB)
19/03/03 22:48:46 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on instance-2026038.ipa.ba..com:45476 (size: 12.5 KB, free: 153.4 GB)
19/03/03 22:48:46 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on instance-2026045.ipa.ba..com:45485 (size: 12.5 KB, free: 153.4 GB)
19/03/03 22:48:46 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on instance-2026043.ipa.ba..com:33242 (size: 12.5 KB, free: 153.4 GB)
2019-03-03 22:48:47,226 INFO (MainThread-75262) waiting for 6 reservations
2019-03-03 22:48:48,226 INFO (MainThread-75262) waiting for 4 reservations
2019-03-03 22:48:49,228 INFO (MainThread-75262) all reservations completed
2019-03-03 22:48:49,228 INFO (MainThread-75262) All TFSparkNodes started
2019-03-03 22:48:49,228 INFO (MainThread-75262) {'executor_id': 3, 'addr': '/tmp/pymp-Nm3GK_/listener-psN9ka', 'task_index': 2, 'job_name': 'worker', 'authkey': '\x19\x83\xec\x1e\xacNM\x16\                                                x89\xc0\xa0,\x9b\x87$\xd3', 'host': '10.28.26.37', 'port': 34616, 'tb_pid': 0, 'tb_port': 0}
2019-03-03 22:48:49,228 INFO (MainThread-75262) {'executor_id': 1, 'addr': '/tmp/pymp-anFXbv/listener-gYfcyl', 'task_index': 0, 'job_name': 'worker', 'authkey': '\xca\xfe\xf8+k\xbfC\n\xbfI\                                                x19\xc7=\xefR\x17', 'host': '10.28.26.46', 'port': 35676, 'tb_pid': 0, 'tb_port': 0}
2019-03-03 22:48:49,228 INFO (MainThread-75262) {'executor_id': 2, 'addr': '/tmp/pymp-Ws00Ww/listener-J06992', 'task_index': 1, 'job_name': 'worker', 'authkey': '?\xc8\xef\xde\x98\xb3EB\x8f                                                O\x80\x89\xeb\xff\x83\x91', 'host': '10.28.26.38', 'port': 34594, 'tb_pid': 0, 'tb_port': 0}
2019-03-03 22:48:49,228 INFO (MainThread-75262) {'executor_id': 0, 'addr': ('10.28.26.39', 45967), 'task_index': 0, 'job_name': 'ps', 'authkey': '\xce\x1b$\xbeg6@T\xb0q\xb8I\x04\xc5\x1a\r'                                                , 'host': '10.28.26.39', 'port': 39537, 'tb_pid': 0, 'tb_port': 0}
2019-03-03 22:48:49,229 INFO (MainThread-75262) {'executor_id': 4, 'addr': '/tmp/pymp-9cxRDb/listener-Vfnobx', 'task_index': 3, 'job_name': 'worker', 'authkey': '\xad\xf6\x10\xb6\x9f\x13O\x                                                c1\xbb\xb0\xd6\x85\xb3e\x16\xee', 'host': '10.28.26.43', 'port': 35727, 'tb_pid': 0, 'tb_port': 0}
2019-03-03 22:48:49,229 INFO (MainThread-75262) {'executor_id': 5, 'addr': '/tmp/pymp-C7cs51/listener-KKXXmn', 'task_index': 4, 'job_name': 'worker', 'authkey': ';\xbe"\xd73\xcdLD\xa559Z\xc                                                b{\x92\x92', 'host': '10.28.26.45', 'port': 46460, 'tb_pid': 0, 'tb_port': 0}
2019-03-03 22:48:49,229 INFO (MainThread-75262) Feeding training data
19/03/03 22:48:49 INFO SparkContext: Starting job: collect at PythonRDD.scala:153
19/03/03 22:48:49 INFO DAGScheduler: Got job 3 (collect at PythonRDD.scala:153) with 600 output partitions
19/03/03 22:48:49 INFO DAGScheduler: Final stage: ResultStage 3 (collect at PythonRDD.scala:153)
19/03/03 22:48:49 INFO DAGScheduler: Parents of final stage: List()
19/03/03 22:48:49 INFO DAGScheduler: Missing parents: List()
19/03/03 22:48:49 INFO DAGScheduler: Submitting ResultStage 3 (PythonRDD[10] at RDD at PythonRDD.scala:48), which has no missing parents
19/03/03 22:48:49 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 389.2 KB, free 153.4 GB)
19/03/03 22:48:49 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 242.0 KB, free 153.4 GB)
19/03/03 22:48:49 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on instance-2026033.ipa.ba..com:38256 (size: 242.0 KB, free: 153.4 GB)
19/03/03 22:48:49 INFO SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:1039
19/03/03 22:48:49 INFO DAGScheduler: Submitting 600 missing tasks from ResultStage 3 (PythonRDD[10] at RDD at PythonRDD.scala:48) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5,                                                 6, 7, 8, 9, 10, 11, 12, 13, 14))
19/03/03 22:48:49 INFO YarnScheduler: Adding task set 3.0 with 600 tasks
19/03/03 22:48:49 INFO TaskSetManager: Finished task 5.0 in stage 2.0 (TID 7) in 3490 ms on instance-2026045.ipa.ba..com (executor 1) (1/6)
19/03/03 22:48:49 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 8, instance-2026037.ipa.ba..com, executor 5, partition 0, NODE_LOCAL, 8440 bytes)
19/03/03 22:48:49 INFO TaskSetManager: Finished task 3.0 in stage 2.0 (TID 5) in 3569 ms on instance-2026037.ipa.ba..com (executor 5) (2/6)
19/03/03 22:48:49 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on instance-2026037.ipa.ba..com:45022 (size: 242.0 KB, free: 153.4 GB)
19/03/03 22:48:50 INFO TaskSetManager: Finished task 1.0 in stage 2.0 (TID 3) in 3867 ms on instance-2026046.ipa.ba..com (executor 4) (3/6)
19/03/03 22:48:50 INFO TaskSetManager: Starting task 3.0 in stage 3.0 (TID 9, instance-2026038.ipa.ba..com, executor 2, partition 3, NODE_LOCAL, 8440 bytes)
19/03/03 22:48:50 INFO TaskSetManager: Finished task 2.0 in stage 2.0 (TID 4) in 4240 ms on instance-2026038.ipa.ba..com (executor 2) (4/6)
19/03/03 22:48:50 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on instance-2026038.ipa.ba..com:45476 (size: 242.0 KB, free: 153.4 GB)
19/03/03 22:48:50 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on instance-2026038.ipa.ba..com:45476 (size: 34.0 KB, free: 153.4 GB)
19/03/03 22:48:50 INFO TaskSetManager: Starting task 1.0 in stage 3.0 (TID 10, instance-2026043.ipa.ba..com, executor 6, partition 1, NODE_LOCAL, 8440 bytes)
19/03/03 22:48:50 INFO TaskSetManager: Finished task 4.0 in stage 2.0 (TID 6) in 4423 ms on instance-2026043.ipa.ba..com (executor 6) (5/6)
19/03/03 22:48:50 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on instance-2026043.ipa.ba..com:33242 (size: 242.0 KB, free: 153.4 GB)
19/03/03 22:48:50 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on instance-2026043.ipa.ba..com:33242 (size: 34.0 KB, free: 153.4 GB)
19/03/03 22:48:54 INFO TaskSetManager: Starting task 2.0 in stage 3.0 (TID 11, instance-2026046.ipa.ba..com, executor 4, partition 2, RACK_LOCAL, 8440 bytes)
19/03/03 22:48:54 INFO TaskSetManager: Starting task 4.0 in stage 3.0 (TID 12, instance-2026045.ipa.ba..com, executor 1, partition 4, RACK_LOCAL, 8440 bytes)
19/03/03 22:48:54 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on instance-2026045.ipa.ba..com:45485 (size: 242.0 KB, free: 153.4 GB)
19/03/03 22:48:54 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on instance-2026046.ipa.ba..com:41346 (size: 242.0 KB, free: 153.4 GB)
19/03/03 22:48:54 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on instance-2026045.ipa.ba..com:45485 (size: 34.0 KB, free: 153.4 GB)
19/03/03 22:49:35 INFO TaskSetManager: Starting task 5.0 in stage 3.0 (TID 13, instance-2026038.ipa.ba..com, executor 2, partition 5, NODE_LOCAL, 8440 bytes)
19/03/03 22:49:35 INFO TaskSetManager: Finished task 3.0 in stage 3.0 (TID 9) in 45462 ms on instance-2026038.ipa.ba..com (executor 2) (1/600)
19/03/03 22:49:36 INFO TaskSetManager: Starting task 6.0 in stage 3.0 (TID 14, instance-2026037.ipa.ba..com, executor 5, partition 6, NODE_LOCAL, 8440 bytes)
19/03/03 22:49:36 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 8) in 46239 ms on instance-2026037.ipa.ba..com (executor 5) (2/600)
19/03/03 22:49:36 INFO TaskSetManager: Starting task 7.0 in stage 3.0 (TID 15, instance-2026043.ipa.ba..com, executor 6, partition 7, NODE_LOCAL, 8440 bytes)
19/03/03 22:49:36 INFO TaskSetManager: Finished task 1.0 in stage 3.0 (TID 10) in 45790 ms on instance-2026043.ipa.ba..com (executor 6) (3/600)
19/03/03 22:49:39 INFO TaskSetManager: Finished task 2.0 in stage 3.0 (TID 11) in 44513 ms on instance-2026046.ipa.ba..com (executor 4) (4/600)
19/03/03 22:49:39 INFO TaskSetManager: Starting task 8.0 in stage 3.0 (TID 16, instance-2026046.ipa.ba..com, executor 4, partition 8, RACK_LOCAL, 8440 bytes)
19/03/03 22:49:40 INFO TaskSetManager: Starting task 9.0 in stage 3.0 (TID 17, instance-2026045.ipa.ba..com, executor 1, partition 9, RACK_LOCAL, 8440 bytes)
19/03/03 22:49:40 INFO TaskSetManager: Finished task 4.0 in stage 3.0 (TID 12) in 45791 ms on instance-2026045.ipa.ba..com (executor 1) (5/600)
19/03/03 22:50:15 INFO TaskSetManager: Starting task 21.0 in stage 3.0 (TID 18, instance-2026038.ipa.ba..com, executor 2, partition 21, NODE_LOCAL, 8440 bytes)
19/03/03 22:50:15 INFO TaskSetManager: Finished task 5.0 in stage 3.0 (TID 13) in 39597 ms on instance-2026038.ipa.ba..com (executor 2) (6/600)
19/03/03 22:50:16 INFO TaskSetManager: Starting task 11.0 in stage 3.0 (TID 19, instance-2026043.ipa.ba..com, executor 6, partition 11, NODE_LOCAL, 8440 bytes)
19/03/03 22:50:16 INFO TaskSetManager: Finished task 7.0 in stage 3.0 (TID 15) in 39619 ms on instance-2026043.ipa.ba..com (executor 6) (7/600)
19/03/03 22:50:17 INFO TaskSetManager: Starting task 10.0 in stage 3.0 (TID 20, instance-2026037.ipa.ba..com, executor 5, partition 10, NODE_LOCAL, 8440 bytes)
19/03/03 22:50:17 INFO TaskSetManager: Finished task 6.0 in stage 3.0 (TID 14) in 41346 ms on instance-2026037.ipa.ba..com (executor 5) (8/600)
19/03/03 22:50:19 INFO TaskSetManager: Finished task 8.0 in stage 3.0 (TID 16) in 39872 ms on instance-2026046.ipa.ba..com (executor 4) (9/600)
19/03/03 22:50:20 INFO TaskSetManager: Starting task 12.0 in stage 3.0 (TID 21, instance-2026046.ipa.ba..com, executor 4, partition 12, RACK_LOCAL, 8440 bytes)
19/03/03 22:50:24 INFO TaskSetManager: Starting task 13.0 in stage 3.0 (TID 22, instance-2026045.ipa.ba..com, executor 1, partition 13, RACK_LOCAL, 8440 bytes)
19/03/03 22:50:24 INFO TaskSetManager: Finished task 9.0 in stage 3.0 (TID 17) in 43885 ms on instance-2026045.ipa.ba..com (executor 1) (10/600)
19/03/03 22:50:54 INFO TaskSetManager: Starting task 26.0 in stage 3.0 (TID 23, instance-2026038.ipa.ba..com, executor 2, partition 26, NODE_LOCAL, 8440 bytes)
19/03/03 22:50:54 INFO TaskSetManager: Finished task 21.0 in stage 3.0 (TID 18) in 39348 ms on instance-2026038.ipa.ba..com (executor 2) (11/600)
19/03/03 22:50:55 INFO TaskSetManager: Starting task 14.0 in stage 3.0 (TID 24, instance-2026043.ipa.ba..com, executor 6, partition 14, NODE_LOCAL, 8440 bytes)
19/03/03 22:50:55 INFO TaskSetManager: Finished task 11.0 in stage 3.0 (TID 19) in 39112 ms on instance-2026043.ipa.ba..com (executor 6) (12/600)
19/03/03 22:50:58 INFO TaskSetManager: Starting task 24.0 in stage 3.0 (TID 25, instance-2026037.ipa.ba..com, executor 5, partition 24, NODE_LOCAL, 8440 bytes)
19/03/03 22:50:58 INFO TaskSetManager: Finished task 10.0 in stage 3.0 (TID 20) in 41423 ms on instance-2026037.ipa.ba..com (executor 5) (13/600)
19/03/03 22:51:00 INFO TaskSetManager: Finished task 12.0 in stage 3.0 (TID 21) in 39971 ms on instance-2026046.ipa.ba..com (executor 4) (14/600)
19/03/03 22:51:02 INFO TaskSetManager: Starting task 15.0 in stage 3.0 (TID 26, instance-2026046.ipa.ba..com, executor 4, partition 15, RACK_LOCAL, 8440 bytes)
19/03/03 22:51:03 INFO TaskSetManager: Starting task 16.0 in stage 3.0 (TID 27, instance-2026045.ipa.ba..com, executor 1, partition 16, RACK_LOCAL, 8440 bytes)
19/03/03 22:51:03 INFO TaskSetManager: Finished task 13.0 in stage 3.0 (TID 22) in 39738 ms on instance-2026045.ipa.ba..com (executor 1) (15/600)
19/03/03 22:51:34 INFO TaskSetManager: Starting task 32.0 in stage 3.0 (TID 28, instance-2026043.ipa.ba..com, executor 6, partition 32, NODE_LOCAL, 8440 bytes)
19/03/03 22:51:34 INFO TaskSetManager: Finished task 14.0 in stage 3.0 (TID 24) in 39273 ms on instance-2026043.ipa.ba..com (executor 6) (16/600)
19/03/03 22:51:34 INFO TaskSetManager: Starting task 31.0 in stage 3.0 (TID 29, instance-2026038.ipa.ba..com, executor 2, partition 31, NODE_LOCAL, 8440 bytes)
19/03/03 22:51:34 INFO TaskSetManager: Finished task 26.0 in stage 3.0 (TID 23) in 39756 ms on instance-2026038.ipa.ba..com (executor 2) (17/600)
19/03/03 22:51:40 INFO TaskSetManager: Starting task 25.0 in stage 3.0 (TID 30, instance-2026037.ipa.ba..com, executor 5, partition 25, NODE_LOCAL, 8440 bytes)
19/03/03 22:51:40 INFO TaskSetManager: Finished task 24.0 in stage 3.0 (TID 25) in 41287 ms on instance-2026037.ipa.ba..com (executor 5) (18/600)
19/03/03 22:51:42 INFO TaskSetManager: Finished task 15.0 in stage 3.0 (TID 26) in 39507 ms on instance-2026046.ipa.ba..com (executor 4) (19/600)
19/03/03 22:51:43 INFO TaskSetManager: Starting task 17.0 in stage 3.0 (TID 31, instance-2026045.ipa.ba..com, executor 1, partition 17, RACK_LOCAL, 8440 bytes)
19/03/03 22:51:43 INFO TaskSetManager: Finished task 16.0 in stage 3.0 (TID 27) in 39474 ms on instance-2026045.ipa.ba..com (executor 1) (20/600)
19/03/04 00:07:54 INFO TaskSetManager: Starting task 541.0 in stage 3.0 (TID 583, instance-2026038.ipa.ba..com, executor 2, partition 541, RACK_LOCAL, 8440 bytes)
19/03/04 00:07:54 INFO TaskSetManager: Finished task 533.0 in stage 3.0 (TID 579) in 40271 ms on instance-2026038.ipa.ba..com (executor 2) (571/600)
19/03/04 00:07:57 INFO TaskSetManager: Starting task 543.0 in stage 3.0 (TID 584, instance-2026037.ipa.ba..com, executor 5, partition 543, RACK_LOCAL, 8440 bytes)
19/03/04 00:07:57 INFO TaskSetManager: Finished task 532.0 in stage 3.0 (TID 578) in 44489 ms on instance-2026037.ipa.ba..com (executor 5) (572/600)
19/03/04 00:08:12 INFO TaskSetManager: Starting task 544.0 in stage 3.0 (TID 585, instance-2026043.ipa.ba..com, executor 6, partition 544, RACK_LOCAL, 8440 bytes)
19/03/04 00:08:12 INFO TaskSetManager: Finished task 534.0 in stage 3.0 (TID 580) in 40645 ms on instance-2026043.ipa.ba..com (executor 6) (573/600)
19/03/04 00:08:17 INFO TaskSetManager: Starting task 545.0 in stage 3.0 (TID 586, instance-2026045.ipa.ba..com, executor 1, partition 545, RACK_LOCAL, 8440 bytes)
19/03/04 00:08:17 INFO TaskSetManager: Finished task 535.0 in stage 3.0 (TID 581) in 40536 ms on instance-2026045.ipa.ba..com (executor 1) (574/600)
19/03/04 00:08:31 INFO TaskSetManager: Starting task 548.0 in stage 3.0 (TID 587, instance-2026046.ipa.ba..com, executor 4, partition 548, RACK_LOCAL, 8440 bytes)
19/03/04 00:08:31 INFO TaskSetManager: Finished task 540.0 in stage 3.0 (TID 582) in 38886 ms on instance-2026046.ipa.ba..com (executor 4) (575/600)
19/03/04 00:08:36 INFO TaskSetManager: Starting task 551.0 in stage 3.0 (TID 588, instance-2026038.ipa.ba..com, executor 2, partition 551, RACK_LOCAL, 8440 bytes)
19/03/04 00:08:36 INFO TaskSetManager: Finished task 541.0 in stage 3.0 (TID 583) in 41700 ms on instance-2026038.ipa.ba..com (executor 2) (576/600)
19/03/04 00:08:41 INFO TaskSetManager: Starting task 554.0 in stage 3.0 (TID 589, instance-2026037.ipa.ba..com, executor 5, partition 554, RACK_LOCAL, 8440 bytes)
19/03/04 00:08:41 INFO TaskSetManager: Finished task 543.0 in stage 3.0 (TID 584) in 44824 ms on instance-2026037.ipa.ba..com (executor 5) (577/600)
19/03/04 00:08:54 INFO TaskSetManager: Starting task 555.0 in stage 3.0 (TID 590, instance-2026043.ipa.ba..com, executor 6, partition 555, RACK_LOCAL, 8440 bytes)
19/03/04 00:08:54 INFO TaskSetManager: Finished task 544.0 in stage 3.0 (TID 585) in 41618 ms on instance-2026043.ipa.ba..com (executor 6) (578/600)
19/03/04 00:08:58 INFO TaskSetManager: Starting task 558.0 in stage 3.0 (TID 591, instance-2026045.ipa.ba..com, executor 1, partition 558, RACK_LOCAL, 8440 bytes)
19/03/04 00:08:58 INFO TaskSetManager: Finished task 545.0 in stage 3.0 (TID 586) in 40567 ms on instance-2026045.ipa.ba..com (executor 1) (579/600)
19/03/04 00:09:10 INFO TaskSetManager: Starting task 559.0 in stage 3.0 (TID 592, instance-2026046.ipa.ba..com, executor 4, partition 559, RACK_LOCAL, 8440 bytes)
19/03/04 00:09:10 INFO TaskSetManager: Finished task 548.0 in stage 3.0 (TID 587) in 39338 ms on instance-2026046.ipa.ba..com (executor 4) (580/600)
19/03/04 00:09:17 INFO TaskSetManager: Starting task 562.0 in stage 3.0 (TID 593, instance-2026038.ipa.ba..com, executor 2, partition 562, RACK_LOCAL, 8440 bytes)
19/03/04 00:09:17 INFO TaskSetManager: Finished task 551.0 in stage 3.0 (TID 588) in 41027 ms on instance-2026038.ipa.ba..com (executor 2) (581/600)
19/03/04 00:09:27 INFO TaskSetManager: Starting task 563.0 in stage 3.0 (TID 594, instance-2026037.ipa.ba..com, executor 5, partition 563, RACK_LOCAL, 8440 bytes)
19/03/04 00:09:27 INFO TaskSetManager: Finished task 554.0 in stage 3.0 (TID 589) in 45716 ms on instance-2026037.ipa.ba..com (executor 5) (582/600)
19/03/04 00:09:35 INFO TaskSetManager: Starting task 565.0 in stage 3.0 (TID 595, instance-2026043.ipa.ba..com, executor 6, partition 565, RACK_LOCAL, 8440 bytes)
19/03/04 00:09:35 INFO TaskSetManager: Finished task 555.0 in stage 3.0 (TID 590) in 41138 ms on instance-2026043.ipa.ba..com (executor 6) (583/600)
19/03/04 00:09:40 INFO TaskSetManager: Starting task 568.0 in stage 3.0 (TID 596, instance-2026045.ipa.ba..com, executor 1, partition 568, RACK_LOCAL, 8440 bytes)
19/03/04 00:09:40 INFO TaskSetManager: Finished task 558.0 in stage 3.0 (TID 591) in 42608 ms on instance-2026045.ipa.ba..com (executor 1) (584/600)
19/03/04 00:09:50 INFO TaskSetManager: Starting task 574.0 in stage 3.0 (TID 597, instance-2026046.ipa.ba..com, executor 4, partition 574, RACK_LOCAL, 8440 bytes)
19/03/04 00:09:50 INFO TaskSetManager: Finished task 559.0 in stage 3.0 (TID 592) in 39112 ms on instance-2026046.ipa.ba..com (executor 4) (585/600)
19/03/04 00:09:58 INFO TaskSetManager: Starting task 578.0 in stage 3.0 (TID 598, instance-2026038.ipa.ba..com, executor 2, partition 578, RACK_LOCAL, 8440 bytes)
19/03/04 00:09:58 INFO TaskSetManager: Finished task 562.0 in stage 3.0 (TID 593) in 41681 ms on instance-2026038.ipa.ba..com (executor 2) (586/600)
19/03/04 00:10:08 INFO TaskSetManager: Starting task 579.0 in stage 3.0 (TID 599, instance-2026037.ipa.ba..com, executor 5, partition 579, RACK_LOCAL, 8440 bytes)
19/03/04 00:10:08 INFO TaskSetManager: Finished task 563.0 in stage 3.0 (TID 594) in 41401 ms on instance-2026037.ipa.ba..com (executor 5) (587/600)
19/03/04 00:10:16 INFO TaskSetManager: Starting task 582.0 in stage 3.0 (TID 600, instance-2026043.ipa.ba..com, executor 6, partition 582, RACK_LOCAL, 8440 bytes)
19/03/04 00:10:16 INFO TaskSetManager: Finished task 565.0 in stage 3.0 (TID 595) in 41009 ms on instance-2026043.ipa.ba..com (executor 6) (588/600)
19/03/04 00:10:22 INFO TaskSetManager: Starting task 583.0 in stage 3.0 (TID 601, instance-2026045.ipa.ba..com, executor 1, partition 583, RACK_LOCAL, 8440 bytes)
19/03/04 00:10:22 INFO TaskSetManager: Finished task 568.0 in stage 3.0 (TID 596) in 41692 ms on instance-2026045.ipa.ba..com (executor 1) (589/600)
19/03/04 00:10:29 INFO TaskSetManager: Starting task 584.0 in stage 3.0 (TID 602, instance-2026046.ipa.ba..com, executor 4, partition 584, RACK_LOCAL, 8440 bytes)
19/03/04 00:10:29 INFO TaskSetManager: Finished task 574.0 in stage 3.0 (TID 597) in 39804 ms on instance-2026046.ipa.ba..com (executor 4) (590/600)
19/03/04 00:10:41 INFO TaskSetManager: Starting task 588.0 in stage 3.0 (TID 603, instance-2026038.ipa.ba..com, executor 2, partition 588, RACK_LOCAL, 8440 bytes)
19/03/04 00:10:41 INFO TaskSetManager: Finished task 578.0 in stage 3.0 (TID 598) in 42417 ms on instance-2026038.ipa.ba..com (executor 2) (591/600)
19/03/04 00:10:50 INFO TaskSetManager: Starting task 590.0 in stage 3.0 (TID 604, instance-2026037.ipa.ba..com, executor 5, partition 590, RACK_LOCAL, 8440 bytes)
19/03/04 00:10:50 INFO TaskSetManager: Finished task 579.0 in stage 3.0 (TID 599) in 41431 ms on instance-2026037.ipa.ba..com (executor 5) (592/600)
19/03/04 00:10:59 INFO TaskSetManager: Starting task 591.0 in stage 3.0 (TID 605, instance-2026043.ipa.ba..com, executor 6, partition 591, RACK_LOCAL, 8440 bytes)
19/03/04 00:10:59 INFO TaskSetManager: Finished task 582.0 in stage 3.0 (TID 600) in 42926 ms on instance-2026043.ipa.ba..com (executor 6) (593/600)
19/03/04 00:11:04 INFO TaskSetManager: Starting task 595.0 in stage 3.0 (TID 606, instance-2026045.ipa.ba..com, executor 1, partition 595, RACK_LOCAL, 8440 bytes)
19/03/04 00:11:04 INFO TaskSetManager: Finished task 583.0 in stage 3.0 (TID 601) in 42264 ms on instance-2026045.ipa.ba..com (executor 1) (594/600)
19/03/04 00:11:10 INFO TaskSetManager: Starting task 597.0 in stage 3.0 (TID 607, instance-2026046.ipa.ba..com, executor 4, partition 597, RACK_LOCAL, 8440 bytes)
19/03/04 00:11:10 INFO TaskSetManager: Finished task 584.0 in stage 3.0 (TID 602) in 40587 ms on instance-2026046.ipa.ba..com (executor 4) (595/600)
19/03/04 00:11:25 INFO TaskSetManager: Finished task 588.0 in stage 3.0 (TID 603) in 44005 ms on instance-2026038.ipa.ba..com (executor 2) (596/600)
19/03/04 00:11:32 INFO TaskSetManager: Finished task 590.0 in stage 3.0 (TID 604) in 42027 ms on instance-2026037.ipa.ba..com (executor 5) (597/600)
19/03/04 00:11:42 INFO TaskSetManager: Finished task 591.0 in stage 3.0 (TID 605) in 43176 ms on instance-2026043.ipa.ba..com (executor 6) (598/600)
19/03/04 00:11:47 INFO TaskSetManager: Finished task 595.0 in stage 3.0 (TID 606) in 43036 ms on instance-2026045.ipa.ba..com (executor 1) (599/600)
19/03/04 00:11:51 INFO TaskSetManager: Finished task 597.0 in stage 3.0 (TID 607) in 41030 ms on instance-2026046.ipa.ba..com (executor 4) (600/600)
19/03/04 00:11:51 INFO YarnScheduler: Removed TaskSet 3.0, whose tasks have all completed, from pool
19/03/04 00:11:51 INFO DAGScheduler: ResultStage 3 (collect at PythonRDD.scala:153) finished in 4982.168 s
19/03/04 00:11:51 INFO DAGScheduler: Job 3 finished: collect at PythonRDD.scala:153, took 4982.215393 s
2019-03-04 00:11:51,504 INFO (MainThread-75262) Stopping TensorFlow nodes
19/03/04 00:11:51 INFO SparkContext: Starting job: collect at PythonRDD.scala:153
19/03/04 00:11:51 INFO DAGScheduler: Got job 4 (collect at PythonRDD.scala:153) with 5 output partitions
19/03/04 00:11:51 INFO DAGScheduler: Final stage: ResultStage 4 (collect at PythonRDD.scala:153)
19/03/04 00:11:51 INFO DAGScheduler: Parents of final stage: List()
19/03/04 00:11:51 INFO DAGScheduler: Missing parents: List()
19/03/04 00:11:51 INFO DAGScheduler: Submitting ResultStage 4 (PythonRDD[12] at RDD at PythonRDD.scala:48), which has no missing parents
19/03/04 00:11:51 INFO MemoryStore: Block broadcast_5 stored as values in memory (estimated size 7.4 KB, free 153.4 GB)
19/03/04 00:11:51 INFO MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 4.9 KB, free 153.4 GB)
19/03/04 00:11:51 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on instance-2026033.ipa.ba..com:38256 (size: 4.9 KB, free: 153.4 GB)
19/03/04 00:11:51 INFO SparkContext: Created broadcast 5 from broadcast at DAGScheduler.scala:1039
19/03/04 00:11:51 INFO DAGScheduler: Submitting 5 missing tasks from ResultStage 4 (PythonRDD[12] at RDD at PythonRDD.scala:48) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4))
19/03/04 00:11:51 INFO YarnScheduler: Adding task set 4.0 with 5 tasks
19/03/04 00:11:51 INFO TaskSetManager: Starting task 0.0 in stage 4.0 (TID 608, instance-2026045.ipa.ba..com, executor 1, partition 0, PROCESS_LOCAL, 7869 bytes)
19/03/04 00:11:51 INFO TaskSetManager: Starting task 1.0 in stage 4.0 (TID 609, instance-2026037.ipa.ba..com, executor 5, partition 1, PROCESS_LOCAL, 7869 bytes)
19/03/04 00:11:51 INFO TaskSetManager: Starting task 2.0 in stage 4.0 (TID 610, instance-2026043.ipa.ba..com, executor 6, partition 2, PROCESS_LOCAL, 7869 bytes)
19/03/04 00:11:51 INFO TaskSetManager: Starting task 3.0 in stage 4.0 (TID 611, instance-2026046.ipa.ba..com, executor 4, partition 3, PROCESS_LOCAL, 7869 bytes)
19/03/04 00:11:51 INFO TaskSetManager: Starting task 4.0 in stage 4.0 (TID 612, instance-2026038.ipa.ba..com, executor 2, partition 4, PROCESS_LOCAL, 7869 bytes)
19/03/04 00:11:51 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on instance-2026038.ipa.ba..com:45476 (size: 4.9 KB, free: 153.4 GB)
19/03/04 00:11:51 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on instance-2026046.ipa.ba..com:41346 (size: 4.9 KB, free: 153.4 GB)
19/03/04 00:11:51 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on instance-2026043.ipa.ba..com:33242 (size: 4.9 KB, free: 153.4 GB)
19/03/04 00:11:51 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on instance-2026037.ipa.ba..com:45022 (size: 4.9 KB, free: 153.4 GB)
19/03/04 00:11:51 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on instance-2026045.ipa.ba..com:45485 (size: 4.9 KB, free: 153.4 GB)
19/03/04 00:11:51 INFO TaskSetManager: Finished task 4.0 in stage 4.0 (TID 612) in 37 ms on instance-2026038.ipa.ba..com (executor 2) (1/5)
19/03/04 00:11:51 INFO TaskSetManager: Finished task 3.0 in stage 4.0 (TID 611) in 40 ms on instance-2026046.ipa.ba..com (executor 4) (2/5)
19/03/04 00:11:51 INFO TaskSetManager: Finished task 2.0 in stage 4.0 (TID 610) in 41 ms on instance-2026043.ipa.ba..com (executor 6) (3/5)
19/03/04 00:11:51 INFO TaskSetManager: Finished task 1.0 in stage 4.0 (TID 609) in 42 ms on instance-2026037.ipa.ba..com (executor 5) (4/5)
19/03/04 00:11:51 INFO TaskSetManager: Finished task 0.0 in stage 4.0 (TID 608) in 44 ms on instance-2026045.ipa.ba..com (executor 1) (5/5)
19/03/04 00:11:51 INFO YarnScheduler: Removed TaskSet 4.0, whose tasks have all completed, from pool
19/03/04 00:11:51 INFO DAGScheduler: ResultStage 4 (collect at PythonRDD.scala:153) finished in 0.052 s
19/03/04 00:11:51 INFO DAGScheduler: Job 4 finished: collect at PythonRDD.scala:153, took 0.054342 s
2019-03-04 00:11:51,584 INFO (MainThread-75262) Shutting down cluster
19/03/04 00:11:54 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 4988571 ms on instance-2026039.ipa.ba..com (executor 3) (6/6)
19/03/04 00:11:54 INFO YarnScheduler: Removed TaskSet 2.0, whose tasks have all completed, from pool
19/03/04 00:11:54 INFO DAGScheduler: ResultStage 2 (foreachPartition at /usr/lib/python2.7/site-packages/tensorflowonspark/TFCluster.py:301) finished in 4988.578 s
19/03/04 00:11:54 INFO DAGScheduler: Job 2 finished: foreachPartition at /usr/lib/python2.7/site-packages/tensorflowonspark/TFCluster.py:301, took 4988.581628 s
19/03/04 00:11:57 INFO SparkContext: Invoking stop() from shutdown hook
19/03/04 00:11:57 INFO SparkUI: Stopped Spark web UI at http://instance-2026033.ipa.ba..com:4041
19/03/04 00:11:57 INFO YarnClientSchedulerBackend: Interrupting monitor thread
19/03/04 00:11:58 INFO YarnClientSchedulerBackend: Shutting down all executors
19/03/04 00:11:58 INFO YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
19/03/04 00:11:58 INFO SchedulerExtensionServices: Stopping SchedulerExtensionServices
(serviceOption=None,
 services=List(),
 started=false)
19/03/04 00:11:58 INFO YarnClientSchedulerBackend: Stopped
19/03/04 00:11:58 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/03/04 00:11:58 INFO MemoryStore: MemoryStore cleared
19/03/04 00:11:58 INFO BlockManager: BlockManager stopped
19/03/04 00:11:58 INFO BlockManagerMaster: BlockManagerMaster stopped
19/03/04 00:11:58 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/03/04 00:11:58 INFO SparkContext: Successfully stopped SparkContext
19/03/04 00:11:58 INFO ShutdownHookManager: Shutdown hook called
19/03/04 00:11:58 INFO ShutdownHookManager: Deleting directory /tmp/spark-2c81098d-4f91-4a1d-87ea-71ecf1c72204
19/03/04 00:11:58 INFO ShutdownHookManager: Deleting directory /tmp/spark-2c81098d-4f91-4a1d-87ea-71ecf1c72204/pyspark-6a95ff4e-93cf-4c31-ac1a-002432e73cd1
19/03/04 00:11:58 INFO ShutdownHookManager: Deleting directory /tmp/spark-3d2e5cc9-f5d1-45a4-a387-50197a773614
[Surya@..com@instance-2026033 ~]$
vamsinimmala1992 commented 5 years ago

The Start parameters i specify is as follows

/usr/bin/spark-submit --master yarn --queue production --conf spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=128G --driver-memory 48G --executor-memory 48G --num-executors 6 --executor-cores 1 --conf spark.network.timeout=10000000 --conf spark.executor.heartbeatInterval=100000 --conf spark.executor.memoryOverhead=64G --conf spark.sql.tungsten.enabled=true --conf spark.sql.pivotMaxValues=500000 keras_model.py --cluster_size 6 --epochs 5 --steps_per_epoch 300 --export_dir "/tmp/mss/TensorflowOnSpark/export_dir/" --data "/tmp/mss/tf_idf_normalized_with_col_ReducedFeatures.tsv" --model_dir "/tmp/mss/TensorflowOnSpark/model_dir/"

leewyang commented 5 years ago

You're most likely missing the path to libhdfs.so per this example

leewyang commented 5 years ago

Also, try using the following absolute paths:

--export_dir "hdfs://default/tmp/mss/TensorflowOnSpark/export_dir/"
--model_dir "hdfs://default/tmp/mss/TensorflowOnSpark/model_dir/"

... and ensure that you have write permissions to the directory. Note that default is a Tensorflow-specific shortcut to the namenode, so you don't have to explicitly specify it.

vamsinimmala1992 commented 5 years ago

Sorry this was accidentally closed.

I have the libhdfs.so path setup to environmnet variable, Also I am using the absoulte paths with full write permissions. Still I could see any files written to the directory. Is the above code and log correct?

vamsinimmala1992 commented 5 years ago

Hello Lee,

While setting up the libhdfs.so path on the spark cluster, Should i need to do it on every node?

Also is this naming convention specific LIB_HDFS, LIB_JVM? to tensorflowOnspark or tensorflow?

leewyang commented 5 years ago

The LD_LIBRARY_PATH needs to include paths to libhdfs.so and libjvm.so (and the CUDA libraries if using GPUs). This either needs to be set on ALL nodes in your cluster OR you can pass it into the Spark job via the --conf spark.executorEnv.LD_LIBRARY_PATH setting per the examples.

The LIB_HDFS and LIB_JVM were just used to differentiate the different required libraries (in case your environment had different paths)... they are not required to be explicitly set with those names, as long as you have those libraries in your LD_LIBRARY_PATH.

vamsinimmala1992 commented 5 years ago

Thank you so much for the clarification.

Now I get Timed out Exceptions from all the executors. Below is the log showing the Timed out exceptions. Could you please explain me the issue and let me know where am I going wrong.

/usr/bin/spark-submit 
--master yarn 
--queue production 
--conf spark.memory.offHeap.enabled=true 
--conf spark.memory.offHeap.size=128G 
--driver-memory 48G 
--executor-memory 48G 
--num-executors 6 
--executor-cores 1 
--conf spark.network.timeout=10000000 
--conf spark.executor.heartbeatInterval=100000 
--conf spark.executor.memoryOverhead=64G 
--conf spark.sql.tungsten.enabled=true 
--conf spark.sql.pivotMaxValues=500000 
--conf spark.executorEnv.LD_LIBRARY_PATH="/usr/hdp/2.6.5.0-292/usr/lib/libhdfs.so:/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.151-1.b12.el7_4.x86_64/jre/lib/amd64/server/libjvm.so" 
new_keras_model.py 
--cluster_size 6 
--epochs 5 
--steps_per_epoch 300 
--input_mode spark 
--export_dir "hdfs://default/tmp/mss/TensorflowOnSpark/export_dir/" 
--data "/tmp/mss/tf_idf_normalized_with_col_ReducedFeatures.tsv" 
--model_dir "hdfs://default/tmp/mss/TensorflowOnSpark/model_dir/"

And the Log as shown:

19/03/06 18:01:52 INFO TaskSetManager: Finished task 597.0 in stage 3.0 (TID 582) in 29622 ms on instance-2026040.ipa.ba.aaa.com (executor 1) (572/600)
19/03/06 18:01:58 INFO TaskSetManager: Starting task 535.0 in stage 3.0 (TID 587, instance-2026037.ipa.ba.aaa.com, executor 3, partition 535, RACK_LOCAL, 8440 bytes)
19/03/06 18:01:58 INFO TaskSetManager: Finished task 526.0 in stage 3.0 (TID 583) in 32846 ms on instance-2026037.ipa.ba.aaa.com (executor 3) (573/600)
19/03/06 18:01:58 INFO TaskSetManager: Starting task 542.0 in stage 3.0 (TID 588, instance-2026036.ipa.ba.aaa.com, executor 6, partition 542, RACK_LOCAL, 8440 bytes)
19/03/06 18:01:58 INFO TaskSetManager: Finished task 528.0 in stage 3.0 (TID 584) in 29519 ms on instance-2026036.ipa.ba.aaa.com (executor 6) (574/600)
19/03/06 18:02:16 INFO TaskSetManager: Starting task 545.0 in stage 3.0 (TID 589, instance-2026045.ipa.ba.aaa.com, executor 2, partition 545, RACK_LOCAL, 8440 bytes)
19/03/06 18:02:16 INFO TaskSetManager: Finished task 532.0 in stage 3.0 (TID 585) in 29252 ms on instance-2026045.ipa.ba.aaa.com (executor 2) (575/600)
19/03/06 18:02:21 INFO TaskSetManager: Starting task 546.0 in stage 3.0 (TID 590, instance-2026040.ipa.ba.aaa.com, executor 1, partition 546, RACK_LOCAL, 8440 bytes)
19/03/06 18:02:21 INFO TaskSetManager: Finished task 533.0 in stage 3.0 (TID 586) in 29573 ms on instance-2026040.ipa.ba.aaa.com (executor 1) (576/600)
19/03/06 18:02:28 INFO TaskSetManager: Starting task 547.0 in stage 3.0 (TID 591, instance-2026036.ipa.ba.aaa.com, executor 6, partition 547, RACK_LOCAL, 8440 bytes)
19/03/06 18:02:28 INFO TaskSetManager: Finished task 542.0 in stage 3.0 (TID 588) in 29311 ms on instance-2026036.ipa.ba.aaa.com (executor 6) (577/600)
19/03/06 18:02:31 INFO TaskSetManager: Starting task 551.0 in stage 3.0 (TID 592, instance-2026037.ipa.ba.aaa.com, executor 3, partition 551, RACK_LOCAL, 8440 bytes)
19/03/06 18:02:31 INFO TaskSetManager: Finished task 535.0 in stage 3.0 (TID 587) in 32400 ms on instance-2026037.ipa.ba.aaa.com (executor 3) (578/600)
19/03/06 18:02:46 INFO TaskSetManager: Starting task 553.0 in stage 3.0 (TID 593, instance-2026045.ipa.ba.aaa.com, executor 2, partition 553, RACK_LOCAL, 8440 bytes)
19/03/06 18:02:46 INFO TaskSetManager: Finished task 545.0 in stage 3.0 (TID 589) in 30436 ms on instance-2026045.ipa.ba.aaa.com (executor 2) (579/600)
19/03/06 18:02:51 INFO TaskSetManager: Starting task 559.0 in stage 3.0 (TID 594, instance-2026040.ipa.ba.aaa.com, executor 1, partition 559, RACK_LOCAL, 8440 bytes)
19/03/06 18:02:51 INFO TaskSetManager: Finished task 546.0 in stage 3.0 (TID 590) in 29440 ms on instance-2026040.ipa.ba.aaa.com (executor 1) (580/600)
19/03/06 18:02:58 INFO TaskSetManager: Starting task 560.0 in stage 3.0 (TID 595, instance-2026036.ipa.ba.aaa.com, executor 6, partition 560, RACK_LOCAL, 8440 bytes)
19/03/06 18:02:58 INFO TaskSetManager: Finished task 547.0 in stage 3.0 (TID 591) in 29924 ms on instance-2026036.ipa.ba.aaa.com (executor 6) (581/600)
19/03/06 18:03:03 INFO TaskSetManager: Starting task 561.0 in stage 3.0 (TID 596, instance-2026037.ipa.ba.aaa.com, executor 3, partition 561, RACK_LOCAL, 8440 bytes)
19/03/06 18:03:03 INFO TaskSetManager: Finished task 551.0 in stage 3.0 (TID 592) in 32520 ms on instance-2026037.ipa.ba.aaa.com (executor 3) (582/600)
19/03/06 18:03:16 INFO TaskSetManager: Starting task 566.0 in stage 3.0 (TID 597, instance-2026045.ipa.ba.aaa.com, executor 2, partition 566, RACK_LOCAL, 8440 bytes)
19/03/06 18:03:16 INFO TaskSetManager: Finished task 553.0 in stage 3.0 (TID 593) in 29597 ms on instance-2026045.ipa.ba.aaa.com (executor 2) (583/600)
19/03/06 18:03:20 INFO TaskSetManager: Starting task 567.0 in stage 3.0 (TID 598, instance-2026040.ipa.ba.aaa.com, executor 1, partition 567, RACK_LOCAL, 8440 bytes)
19/03/06 18:03:20 INFO TaskSetManager: Finished task 559.0 in stage 3.0 (TID 594) in 29327 ms on instance-2026040.ipa.ba.aaa.com (executor 1) (584/600)
19/03/06 18:03:27 INFO TaskSetManager: Starting task 569.0 in stage 3.0 (TID 599, instance-2026036.ipa.ba.aaa.com, executor 6, partition 569, RACK_LOCAL, 8440 bytes)
19/03/06 18:03:27 INFO TaskSetManager: Finished task 560.0 in stage 3.0 (TID 595) in 29476 ms on instance-2026036.ipa.ba.aaa.com (executor 6) (585/600)
19/03/06 18:03:35 INFO TaskSetManager: Starting task 571.0 in stage 3.0 (TID 600, instance-2026037.ipa.ba.aaa.com, executor 3, partition 571, RACK_LOCAL, 8440 bytes)
19/03/06 18:03:35 INFO TaskSetManager: Finished task 561.0 in stage 3.0 (TID 596) in 32318 ms on instance-2026037.ipa.ba.aaa.com (executor 3) (586/600)
19/03/06 18:03:45 INFO TaskSetManager: Starting task 572.0 in stage 3.0 (TID 601, instance-2026045.ipa.ba.aaa.com, executor 2, partition 572, RACK_LOCAL, 8440 bytes)
19/03/06 18:03:45 INFO TaskSetManager: Finished task 566.0 in stage 3.0 (TID 597) in 29116 ms on instance-2026045.ipa.ba.aaa.com (executor 2) (587/600)
19/03/06 18:03:49 INFO TaskSetManager: Starting task 576.0 in stage 3.0 (TID 602, instance-2026038.ipa.ba.aaa.com, executor 5, partition 576, NODE_LOCAL, 8440 bytes)
19/03/06 18:03:49 WARN TaskSetManager: Lost task 441.0 in stage 3.0 (TID 516, instance-2026038.ipa.ba.aaa.com, executor 5): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/hadoop2/yarn/local/usercache/Surya@ba.ad.aaa.com/appcache/application_1551114784635_0348/container_e51_1551114784635_0348_01_000008/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/hadoop2/yarn/local/usercache/Surya@ba.ad.aaa.com/appcache/application_1551114784635_0348/container_e51_1551114784635_0348_01_000008/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2438, in pipeline_func
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2438, in pipeline_func
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2438, in pipeline_func
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 362, in func
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 809, in func
  File "/usr/lib/python2.7/site-packages/tensorflowonspark/TFSparkNode.py", line 420, in _train
    raise Exception("Timeout while feeding partition")
Exception: Timeout while feeding partition

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
        at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
        at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
        at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

19/03/06 18:03:49 INFO TaskSetManager: Finished task 567.0 in stage 3.0 (TID 598) in 29091 ms on instance-2026040.ipa.ba.aaa.com (executor 1) (588/600)
19/03/06 18:03:52 INFO TaskSetManager: Starting task 441.1 in stage 3.0 (TID 603, instance-2026040.ipa.ba.aaa.com, executor 1, partition 441, RACK_LOCAL, 8440 bytes)
19/03/06 18:03:57 INFO TaskSetManager: Starting task 573.0 in stage 3.0 (TID 604, instance-2026036.ipa.ba.aaa.com, executor 6, partition 573, RACK_LOCAL, 8440 bytes)
19/03/06 18:03:57 INFO TaskSetManager: Finished task 569.0 in stage 3.0 (TID 599) in 29560 ms on instance-2026036.ipa.ba.aaa.com (executor 6) (589/600)
19/03/06 18:04:08 INFO TaskSetManager: Starting task 577.0 in stage 3.0 (TID 605, instance-2026037.ipa.ba.aaa.com, executor 3, partition 577, RACK_LOCAL, 8440 bytes)
19/03/06 18:04:08 INFO TaskSetManager: Finished task 571.0 in stage 3.0 (TID 600) in 32705 ms on instance-2026037.ipa.ba.aaa.com (executor 3) (590/600)
19/03/06 18:04:14 INFO TaskSetManager: Starting task 580.0 in stage 3.0 (TID 606, instance-2026045.ipa.ba.aaa.com, executor 2, partition 580, RACK_LOCAL, 8440 bytes)
19/03/06 18:04:14 INFO TaskSetManager: Finished task 572.0 in stage 3.0 (TID 601) in 29184 ms on instance-2026045.ipa.ba.aaa.com (executor 2) (591/600)
19/03/06 18:04:21 INFO TaskSetManager: Starting task 581.0 in stage 3.0 (TID 607, instance-2026040.ipa.ba.aaa.com, executor 1, partition 581, RACK_LOCAL, 8440 bytes)
19/03/06 18:04:21 INFO TaskSetManager: Finished task 441.1 in stage 3.0 (TID 603) in 29100 ms on instance-2026040.ipa.ba.aaa.com (executor 1) (592/600)
19/03/06 18:04:26 INFO TaskSetManager: Starting task 582.0 in stage 3.0 (TID 608, instance-2026036.ipa.ba.aaa.com, executor 6, partition 582, RACK_LOCAL, 8440 bytes)
19/03/06 18:04:26 INFO TaskSetManager: Finished task 573.0 in stage 3.0 (TID 604) in 28977 ms on instance-2026036.ipa.ba.aaa.com (executor 6) (593/600)
19/03/06 18:04:41 INFO TaskSetManager: Starting task 583.0 in stage 3.0 (TID 609, instance-2026037.ipa.ba.aaa.com, executor 3, partition 583, RACK_LOCAL, 8440 bytes)
19/03/06 18:04:41 INFO TaskSetManager: Finished task 577.0 in stage 3.0 (TID 605) in 32607 ms on instance-2026037.ipa.ba.aaa.com (executor 3) (594/600)
19/03/06 18:04:44 INFO TaskSetManager: Starting task 590.0 in stage 3.0 (TID 610, instance-2026045.ipa.ba.aaa.com, executor 2, partition 590, RACK_LOCAL, 8440 bytes)
19/03/06 18:04:44 INFO TaskSetManager: Finished task 580.0 in stage 3.0 (TID 606) in 29184 ms on instance-2026045.ipa.ba.aaa.com (executor 2) (595/600)
19/03/06 18:04:51 INFO TaskSetManager: Finished task 581.0 in stage 3.0 (TID 607) in 29169 ms on instance-2026040.ipa.ba.aaa.com (executor 1) (596/600)
19/03/06 18:04:55 INFO TaskSetManager: Finished task 582.0 in stage 3.0 (TID 608) in 29093 ms on instance-2026036.ipa.ba.aaa.com (executor 6) (597/600)
19/03/06 18:05:12 INFO TaskSetManager: Finished task 590.0 in stage 3.0 (TID 610) in 28952 ms on instance-2026045.ipa.ba.aaa.com (executor 2) (598/600)
19/03/06 18:05:13 INFO TaskSetManager: Finished task 583.0 in stage 3.0 (TID 609) in 32429 ms on instance-2026037.ipa.ba.aaa.com (executor 3) (599/600)
19/03/06 18:14:29 WARN TaskSetManager: Lost task 576.0 in stage 3.0 (TID 602, instance-2026038.ipa.ba.aaa.com, executor 5): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/hadoop2/yarn/local/usercache/Surya@ba.ad.aaa.com/appcache/application_1551114784635_0348/container_e51_1551114784635_0348_01_000008/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/hadoop2/yarn/local/usercache/Surya@ba.ad.aaa.com/appcache/application_1551114784635_0348/container_e51_1551114784635_0348_01_000008/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2438, in pipeline_func
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2438, in pipeline_func
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2438, in pipeline_func
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 362, in func
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 809, in func
  File "/usr/lib/python2.7/site-packages/tensorflowonspark/TFSparkNode.py", line 420, in _train
    raise Exception("Timeout while feeding partition")
Exception: Timeout while feeding partition

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
        at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
        at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
        at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

19/03/06 18:14:29 INFO TaskSetManager: Starting task 576.1 in stage 3.0 (TID 611, instance-2026038.ipa.ba.aaa.com, executor 5, partition 576, NODE_LOCAL, 8440 bytes)
19/03/06 18:25:19 WARN TaskSetManager: Lost task 576.1 in stage 3.0 (TID 611, instance-2026038.ipa.ba.aaa.com, executor 5): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/hadoop2/yarn/local/usercache/Surya@ba.ad.aaa.com/appcache/application_1551114784635_0348/container_e51_1551114784635_0348_01_000008/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/hadoop2/yarn/local/usercache/Surya@ba.ad.aaa.com/appcache/application_1551114784635_0348/container_e51_1551114784635_0348_01_000008/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2438, in pipeline_func
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2438, in pipeline_func
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2438, in pipeline_func
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 362, in func
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 809, in func
  File "/usr/lib/python2.7/site-packages/tensorflowonspark/TFSparkNode.py", line 420, in _train
    raise Exception("Timeout while feeding partition")
Exception: Timeout while feeding partition

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
        at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
        at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
        at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

19/03/06 18:25:19 INFO TaskSetManager: Starting task 576.2 in stage 3.0 (TID 612, instance-2026038.ipa.ba.aaa.com, executor 5, partition 576, NODE_LOCAL, 8440 bytes)
19/03/06 18:35:59 WARN TaskSetManager: Lost task 576.2 in stage 3.0 (TID 612, instance-2026038.ipa.ba.aaa.com, executor 5): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/hadoop2/yarn/local/usercache/Surya@ba.ad.aaa.com/appcache/application_1551114784635_0348/container_e51_1551114784635_0348_01_000008/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/hadoop2/yarn/local/usercache/Surya@ba.ad.aaa.com/appcache/application_1551114784635_0348/container_e51_1551114784635_0348_01_000008/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2438, in pipeline_func
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2438, in pipeline_func
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2438, in pipeline_func
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 362, in func
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 809, in func
  File "/usr/lib/python2.7/site-packages/tensorflowonspark/TFSparkNode.py", line 420, in _train
    raise Exception("Timeout while feeding partition")
Exception: Timeout while feeding partition

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
        at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
        at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
        at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

19/03/06 18:35:59 INFO TaskSetManager: Starting task 576.3 in stage 3.0 (TID 613, instance-2026038.ipa.ba.aaa.com, executor 5, partition 576, NODE_LOCAL, 8440 bytes)
19/03/06 18:46:46 WARN TaskSetManager: Lost task 576.3 in stage 3.0 (TID 613, instance-2026038.ipa.ba.aaa.com, executor 5): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/hadoop2/yarn/local/usercache/Surya@ba.ad.aaa.com/appcache/application_1551114784635_0348/container_e51_1551114784635_0348_01_000008/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/hadoop2/yarn/local/usercache/Surya@ba.ad.aaa.com/appcache/application_1551114784635_0348/container_e51_1551114784635_0348_01_000008/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2438, in pipeline_func
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2438, in pipeline_func
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2438, in pipeline_func
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 362, in func
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 809, in func
  File "/usr/lib/python2.7/site-packages/tensorflowonspark/TFSparkNode.py", line 420, in _train
    raise Exception("Timeout while feeding partition")
Exception: Timeout while feeding partition

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
        at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
        at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
        at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

19/03/06 18:46:46 ERROR TaskSetManager: Task 576 in stage 3.0 failed 4 times; aborting job
19/03/06 18:46:46 INFO YarnScheduler: Removed TaskSet 3.0, whose tasks have all completed, from pool
19/03/06 18:46:46 INFO YarnScheduler: Cancelling stage 3
19/03/06 18:46:46 INFO DAGScheduler: ResultStage 3 (collect at PythonRDD.scala:153) failed in 7265.599 s due to Job aborted due to stage failure: Task 576 in stage 3.0 failed 4 times, most recent failure: Lost task 576.3 in stage 3.0 (TID 613, instance-2026038.ipa.ba.aaa.com, executor 5): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/hadoop2/yarn/local/usercache/Surya@ba.ad.aaa.com/appcache/application_1551114784635_0348/container_e51_1551114784635_0348_01_000008/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/hadoop2/yarn/local/usercache/Surya@ba.ad.aaa.com/appcache/application_1551114784635_0348/container_e51_1551114784635_0348_01_000008/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2438, in pipeline_func
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2438, in pipeline_func
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2438, in pipeline_func
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 362, in func
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 809, in func
  File "/usr/lib/python2.7/site-packages/tensorflowonspark/TFSparkNode.py", line 420, in _train
    raise Exception("Timeout while feeding partition")
Exception: Timeout while feeding partition

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
        at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
        at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
        at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
19/03/06 18:46:46 INFO DAGScheduler: Job 3 failed: collect at PythonRDD.scala:153, took 7265.655212 s
Traceback (most recent call last):
  File "/home/ba.ad.aaa.com/Surya/new_keras_model.py", line 164, in <module>
    cluster.train(dataRDD, args.epochs)
  File "/usr/lib/python2.7/site-packages/tensorflowonspark/TFCluster.py", line 92, in train
    unionRDD.foreachPartition(TFSparkNode.train(self.cluster_info, self.cluster_meta, feed_timeout=feed_timeout, qname=qname))
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 814, in foreachPartition
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 1056, in count
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 1047, in sum
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 921, in fold
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 824, in collect
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 576 in stage 3.0 failed 4 times, most recent failure: Lost task 576.3 in stage 3.0 (TID 613, instance-2026038.ipa.ba.aaa.com, executor 5): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/hadoop2/yarn/local/usercache/Surya@ba.ad.aaa.com/appcache/application_1551114784635_0348/container_e51_1551114784635_0348_01_000008/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/hadoop2/yarn/local/usercache/Surya@ba.ad.aaa.com/appcache/application_1551114784635_0348/container_e51_1551114784635_0348_01_000008/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2438, in pipeline_func
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2438, in pipeline_func
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2438, in pipeline_func
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 362, in func
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 809, in func
  File "/usr/lib/python2.7/site-packages/tensorflowonspark/TFSparkNode.py", line 420, in _train
    raise Exception("Timeout while feeding partition")
Exception: Timeout while feeding partition

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
        at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
        at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
        at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:938)
        at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:153)
        at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/hadoop2/yarn/local/usercache/Surya@ba.ad.aaa.com/appcache/application_1551114784635_0348/container_e51_1551114784635_0348_01_000008/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/hadoop2/yarn/local/usercache/Surya@ba.ad.aaa.com/appcache/application_1551114784635_0348/container_e51_1551114784635_0348_01_000008/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2438, in pipeline_func
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2438, in pipeline_func
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2438, in pipeline_func
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 362, in func
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 809, in func
  File "/usr/lib/python2.7/site-packages/tensorflowonspark/TFSparkNode.py", line 420, in _train
    raise Exception("Timeout while feeding partition")
Exception: Timeout while feeding partition

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
        at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
        at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
        at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more

19/03/06 18:46:46 INFO SparkContext: Invoking stop() from shutdown hook
19/03/06 18:46:46 INFO SparkUI: Stopped Spark web UI at http://instance-2026033.ipa.ba.aaa.com:4041
19/03/06 18:46:47 INFO DAGScheduler: Job 2 failed: foreachPartition at /usr/lib/python2.7/site-packages/tensorflowonspark/TFCluster.py:301, took 7270.008469 s
19/03/06 18:46:47 INFO DAGScheduler: ResultStage 2 (foreachPartition at /usr/lib/python2.7/site-packages/tensorflowonspark/TFCluster.py:301) failed in 7270.006 s due to Stage cancelled because SparkContext was shut down
19/03/06 18:46:47 INFO YarnClientSchedulerBackend: Interrupting monitor thread
19/03/06 18:46:50 INFO YarnClientSchedulerBackend: Shutting down all executors
19/03/06 18:46:50 INFO YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
19/03/06 18:46:50 INFO SchedulerExtensionServices: Stopping SchedulerExtensionServices
(serviceOption=None,
 services=List(),
 started=false)
19/03/06 18:46:50 INFO YarnClientSchedulerBackend: Stopped
19/03/06 18:46:50 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/03/06 18:46:50 INFO MemoryStore: MemoryStore cleared
19/03/06 18:46:50 INFO BlockManager: BlockManager stopped
19/03/06 18:46:50 INFO BlockManagerMaster: BlockManagerMaster stopped
19/03/06 18:46:50 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/03/06 18:46:50 INFO SparkContext: Successfully stopped SparkContext
19/03/06 18:46:50 INFO ShutdownHookManager: Shutdown hook called
19/03/06 18:46:50 INFO ShutdownHookManager: Deleting directory /tmp/spark-ea3f442e-3983-425d-94c1-fd677334611d
19/03/06 18:46:50 INFO ShutdownHookManager: Deleting directory /tmp/spark-f7632ae7-5c64-4a57-87a0-7faa297932ea
19/03/06 18:46:50 INFO ShutdownHookManager: Deleting directory /tmp/spark-ea3f442e-3983-425d-94c1-fd677334611d/pyspark-bfeef1f2-ad09-418b-bb37-ad850d493eb1
Vamsinimmala commented 5 years ago

Do you think, Is this because I am commenting out the model checkpoint callbacks and tensorboard callbacks?

leewyang commented 5 years ago

In your logs it looks like several executors are "working" fine, but 2026038.ipa.ba.aaa.com seems to always time out... perhaps it's not configured correctly? (If you can view the logs for that one executor while it's failing, it should highlight the root cause).

vamsinimmala1992 commented 5 years ago

The Specific failure in the log. I don't see any clear definition of exception here, How to debug this?

19/03/12 12:41:20 ERROR Executor: Exception in task 76.0 in stage 3.0 (TID 77)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/hadoop2/yarn/local/usercache/Surya@ba.ad.aaa.com/appcache/application_1551114784635_0463/container_e51_1551114784635_0463_01_000009/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/hadoop2/yarn/local/usercache/Surya@ba.ad.aaa.com/appcache/application_1551114784635_0463/container_e51_1551114784635_0463_01_000009/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2438, in pipeline_func
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2438, in pipeline_func
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2438, in pipeline_func
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 362, in func
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 809, in func
  File "/usr/lib/python2.7/site-packages/tensorflowonspark/TFSparkNode.py", line 420, in _train
    raise Exception("Timeout while feeding partition")
Exception: Timeout while feeding partition
leewyang commented 5 years ago

400 might be related to your issue.

vamsinimmala1992 commented 5 years ago

Thanks for creating a issue immediately.

I am trying to use this amazing open source library in my organization. I am in a phase of testing it on my environment to proceed further and use it for model development. So my questions are

  1. Currently I am using TensorflowOnSpark '1.4.2'. If the issue #400 solved, Can I save the model file into HDFS with out any exceptions?

  2. I have seen there are no big changes in the commit except the pyspark version changes in #400, can i change my travis.yml file on my environment?

Or should I wait until the next release.

leewyang commented 5 years ago

400 only updates the example code to terminate the feed (using an API that has existed for a while now).

Basically, InputMode.SPARK "emulates" epochs by repeating the RDD via the RDD.union() API. Unfortunately, Spark RDDs must be consumed/processed in their entirety in order to mark a stage/job as a "success". So, if the TF app exits BEFORE the entire RDD has been processed (e.g. stopping at max_steps), you will get the timeout while feeding partition error. To handle this, the TFDataFeed.terminate() API will essentially process all remaining data/partitions with a no-op.

So for this reason, if you're using InputMode.SPARK (esp. with epochs), you should try to align any exit condition with the number of epochs... and you should terminate the feed when your training loop exits for any reason other than end of data.

So, specifically, to your questions:

  1. You need to add a terminate command (see the example in #400).
  2. You don't need to change your dependencies.
vamsinimmala1992 commented 5 years ago

Thank you Lee, The example you mentioned #400 is completely coded in pyspark without importing Keras. But as you see the code mentioned in this conversation which I am using uses keras library to build model example which is in your examples.

should the below code work as you said? I am using:

if args.input_mode == 'spark':
    tf_feed.terminate()

Or you are saying me to use the spark example shown without Keras?

leewyang commented 5 years ago

Sorry, forgot that you were already based on that example... if that's the case, can you add some debug statements here and here?

Basically, it looks like a termination condition issue. If you look at your logs, you can see that keras finishes 300 steps, but then you get a feed timeout error exactly 10 minutes after that. This is due to the fact that the spark side has fed data into the queue, but the tensorflow side is "done" and no longer consuming/processing the data.

To simplify the problem further, I'd also go down to one epoch and make sure that you've calculated the steps_per_epoch to match the size of your training set, e.g. steps_per_epoch = int(num_records / batch_size). That, plus the debug lines should help isolate the problem...

leewyang commented 5 years ago

Actually, for now, let's further simplify your setup to use only 2 executors (1 worker + 1 ps). If you'll note, your dataset size is 262,143 records, so with a batch size of 100, you end up with 2622 batches. However, since you have 5 workers and 1 ps, these 2621 batches are being split amongst the 5 workers, so each worker is processing 2621/5 => 524 steps (on average).

Regardless, try running a simple test of your python setup. If you have access to your grid nodes, just ssh into one of the nodes and try to write a file into HDFS using the python shell with the TF apis, e.g.

import tensorflow as tf
print(tf.__version__)

with tf.gfile.Open("hdfs://default/tmp/foo.txt", "w") as f:
    f.write("hello, world")

If that doesn't work, try following this example.

vamsinimmala1992 commented 5 years ago

Hi Lee,

I ssh into 2 nodes and did the following,

Exported all the variables

export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/hdp/2.6.5.0-292/usr/lib 
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.151-1.b12.el7_4.x86_64/jre/lib/amd64/server
export CLASSPATH=$(hadoop classpath --glob)

Started a Pyspark shell and ran the sample code to write the text file. It wrote the text to the file in HDFS. And now I ran the keras_model and I did not found any file written to HDFS. Also I have added some debug lines to the keras model code as follows in the following lines.

Checking1......Printing this at line of code 119 **before** save_model(model, 'tmp_model')...............Checking1
Checking2......Printing this at line of code 121 **after** save_model(model, 'tmp_model')...............Checking2

Now the second case is I started the shell without export the variables, instead I passed them as parameters as follows

pyspark --master yarn --queue development -- conf spark.executorEnv.LD_LIBRARY_PATH=”/usr/hdp/2.6.5.0-292/usr/lib:/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.151-1.b12.el7_4.x86_64/jre/lib/amd64/server” --conf spark.executorEnv.CLASSPATH=”$(hadoop classpath –glob)” It throws me the following exception:

>>> import tensorflow as tf
>>> with tf.gfile.Open("hdfs://default/tmp/mss/foo.txt","w") as f:
...     f.write("Write this to text file in HDFS")
...
Traceback (most recent call last):
  File "<stdin>", line 2, in <module>
  File "/usr/lib/python2.7/site-packages/tensorflow/python/lib/io/file_io.py", line 108, in write
    self._prewrite_check()
  File "/usr/lib/python2.7/site-packages/tensorflow/python/lib/io/file_io.py", line 94, in _prewrite_check
    compat.as_bytes(self.__name), compat.as_bytes(self.__mode), status)
  File "/usr/lib/python2.7/site-packages/tensorflow/python/framework/errors_impl.py", line 528, in __exit__
    c_api.TF_GetCode(self.status.status))
tensorflow.python.framework.errors_impl.NotFoundError: libhdfs.so: cannot open shared object file: No such file or directory

What are your thoughts?

vamsinimmala1992 commented 5 years ago

It successfully ran the step Save_model object, but did not save the model. Do you think environment variables were not set on all nodes of the cluster?

leewyang commented 5 years ago

So, the LD_LIBRARY_PATH should be consistent across the nodes of your cluster in order for the executorEnv.LD_LIBRARY_PATH to be used. Otherwise, each node must set the appropriate path.

For your first test, make sure that the path that you're saving to is prefixed with hdfs://default/. You only had tmp_model in your log statements, so I'm not sure if you were using the fully-qualified HDFS path. If you don't provide this, TF/keras will happily save the model to the local hard disk of your grid node (check to see if it was written there instead of HDFS).

For your second test, your command line shows -- conf (with a space) instead of --conf (without a space... I'm not sure if that was just a copy-paste error or not). Anyhow, you can add the following code to your TF map_fun (at the top):

import os
print(os.environ['LD_LIBRARY_PATH'])

... and if this doesn't reflect what you're setting, then there's something more fundamentally broken. Also, if you can ssh into your grid node, you can just try the simple write test in a python shell (not a pyspark shell)... just to eliminate pyspark from the possible culprits.

vamsinimmala1992 commented 5 years ago

Hi Lee, After a long time, I am able to save the models and every file into hdfs after debugging a lot. Thanks for your help.

The code saves all the files (shown below) when using only 2 executors (--num_executors 2)

[Vamsi@awse-p-bdp ~]$ hdfs dfs -ls /user/TFOS/model_dir/
Found 9 items
-rw-r--r--   3 Vamsi clust01-users        128 2019-07-03 12:57 /user/TFOS/model_dir/checkpoint
-rw-r--r--   3 Vamsi clust01-users     310247 2019-07-03 12:57 /user/TFOS/model_dir/events.out.tfevents.1562172736.awse-p-bdp-prdclust01dncompute2.aci.is.cl.ssa.gov
-rw-r--r--   3 Vamsi clust01-users     246645 2019-07-03 12:57 /user/TFOS/model_dir/graph.pbtxt
-rw-r--r--   3 Vamsi clust01-users    1797132 2019-07-03 12:54 /user/TFOS/model_dir/model.ckpt-0.data-00000-of-00001
-rw-r--r--   3 Vamsi clust01-users        380 2019-07-03 12:54 /user/TFOS/model_dir/model.ckpt-0.index
-rw-r--r--   3 Vamsi clust01-users     168958 2019-07-03 12:54 /user/TFOS/model_dir/model.ckpt-0.meta
-rw-r--r--   3 Vamsi clust01-users    1797132 2019-07-03 12:57 /user/TFOS/model_dir/model.ckpt-128.data-00000-of-00001
-rw-r--r--   3 Vamsi clust01-users        380 2019-07-03 12:57 /user/TFOS/model_dir/model.ckpt-128.index
-rw-r--r--   3 Vamsi clust01-users     168958 2019-07-03 12:57 /user/TFOS/model_dir/model.ckpt-128.meta
[Vamsi@awse-p-bdp ~]$
[Vamsi@awse-p-bdp ~]$
[Vamsi@awse-p-bdp ~]$ hdfs dfs -ls /user/TFOS/export_dir/
Found 2 items
-rw-r--r--   3 Vamsi clust01-users     305117 2019-07-03 12:57 /user/TFOS/export_dir/saved_model.pb
drwxr-xr-x   - Vamsi clust01-users          0 2019-07-03 12:57 /user/TFOS/export_dir/variables

BUT When I use multiple executors (workers as per Tensorflow) like 3 or 6 (-- num_executors 3), it only saves model_dir (only checkpoints), it fails to save export_dir

The command i use to start the training:

/usr/local/share/imagen/py3spark-submit 
--master yarn 
--queue production 
--conf spark.executorEnv.CLASSPATH=$(hadoop classpath --glob) 
--driver-memory 32G 
--executor-memory 32G 
--num-executors 3 
--executor-cores 1 
--conf spark.executor.heartbeatInterval=100000 
--conf spark.executor.memoryOverhead=64G 
--conf spark.sql.tungsten.enabled=true 
--conf spark.sql.pivotMaxValues=500000 
--conf spark.network.timeout=10000000 
--conf spark.executorEnv.LD_LIBRARY_PATH="/usr/hdp/3.1.0.0-78/usr/lib:/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.161-2.b14.el7_4.x86_64/jre/lib/amd64/server" 
keras_model_tfos.py 
--cluster_size 3 
--epochs 2 
--steps_per_epoch 128 
--input_mode spark 
--export_dir "hdfs://default/user/TFOS/export_dir/" 
--data "/user/data/chopped.tsv" 
--model_dir  "hdfs://default/user/imagen.admins/TFOS/model_dir/"

I added print statements to debug after fit_generator method to check why the files not saved when using 3 executors. I checked the container logs of 2 worker nodes and I don't see those print commands executed.

container_2 log:

2019-07-03 13:10:08,914 INFO (MainThread-18142) Stopping all queues
2019-07-03 13:10:08,917 INFO (MainThread-18142) Feeding None into input queue
2019-07-03 13:10:08,919 INFO (Thread-2-17541) next_batch() got None
2019-07-03 13:10:08,921 INFO (MainThread-18142) Feeding None into output queue
2019-07-03 13:10:08,924 INFO (MainThread-18142) Feeding None into error queue
2019-07-03 13:10:08,925 INFO (MainThread-18142) Setting mgr.state to 'stopped'
19/07/03 13:10:08 INFO PythonRunner: Times: total = 91, boot = -2025, init = 2095, finish = 21
127/128 [============================>.] - ETA: 1s - loss: 2.1851 - acc: 0.335419/07/03 13:10:14 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
19/07/03 13:10:14 INFO CoarseGrainedExecutorBackend: Driver from awse-p-bdp-prdclust01dncompute1.aci.is.cl.ssa.gov:37259 disconnected during shutdown
19/07/03 13:10:14 INFO CoarseGrainedExecutorBackend: Driver from awse-p-bdp-prdclust01dncompute1.aci.is.cl.ssa.gov:37259 disconnected during shutdown
19/07/03 13:10:14 INFO MemoryStore: MemoryStore cleared
19/07/03 13:10:14 INFO BlockManager: BlockManager stopped
19/07/03 13:10:14 INFO ShutdownHookManager: Shutdown hook called

container_3 log:


2019-07-03 13:09:35,455 INFO (MainThread-58337) mgr.state='running'
2019-07-03 13:09:35,455 INFO (MainThread-58337) Feeding partition <itertools.chain object at 0x7fcc0175c588> into input queue <multiprocessing.queues.JoinableQueue obje128/128 [==============================] - 154s 1s/step - loss: 2.2277 - acc: 0.2739
Epoch 2/2
  2/128 [..............................] - ETA: 2:13 - loss: 1.9684 - acc: 0.345019/07/03 13:09:58 INFO PythonRunner: Times: total = 24238, boot = -1954, init = 1955, finish = 24237
  3/128 [..............................] - ETA: 1:53 - loss: 2.0245 - acc: 0.33802019-07-03 13:10:00,541 INFO (MainThread-58337) Processed 2192 items in partition
19/07/03 13:10:00 INFO PythonRunner: Times: total = 26177, boot = -21, init = 1095, finish = 25103
19/07/03 13:10:00 INFO Executor: Finished task 10.0 in stage 1.0 (TID 13). 1504 bytes result sent to driver
19/07/03 13:10:08 INFO CoarseGrainedExecutorBackend: Got assigned task 15
19/07/03 13:10:08 INFO Executor: Running task 0.0 in stage 2.0 (TID 15)
19/07/03 13:10:08 INFO TorrentBroadcast: Started reading broadcast variable 3
19/07/03 13:10:08 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 10.7 KB, free 16.9 GB)
19/07/03 13:10:08 INFO TorrentBroadcast: Reading broadcast variable 3 took 6 ms
19/07/03 13:10:08 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 35.5 KB, free 16.9 GB)
2019-07-03 13:10:08,856 INFO (MainThread-58331) Connected to TFSparkNode.mgr on 172.34.36.98, executor=2, state='running'
2019-07-03 13:10:08,856 INFO (MainThread-58331) Stopping all queues
2019-07-03 13:10:08,857 INFO (MainThread-58331) Feeding None into input queue
2019-07-03 13:10:08,858 INFO (Thread-2-58175) next_batch() got None
2019-07-03 13:10:08,860 INFO (MainThread-58331) Feeding None into output queue
2019-07-03 13:10:08,861 INFO (MainThread-58331) Feeding None into error queue
2019-07-03 13:10:08,862 INFO (MainThread-58331) Setting mgr.state to 'stopped'
19/07/03 13:10:08 INFO PythonRunner: Times: total = 31, boot = -10225, init = 10245, finish = 11
  4/128 [..............................] - ETA: 6:15 - loss: 2.0022 - acc: 0.335619/07/03 13:10:14 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
19/07/03 13:10:14 INFO MemoryStore: MemoryStore cleared
19/07/03 13:10:14 INFO BlockManager: BlockManager stopped
19/07/03 13:10:14 INFO ShutdownHookManager: Shutdown hook called

container_4 log:

19/07/03 13:07:17 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 19.7 KB, free 16.9 GB)
19/07/03 13:07:17 INFO TorrentBroadcast: Reading broadcast variable 1 took 129 ms
19/07/03 13:07:17 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 48.0 KB, free 16.9 GB)
2019-07-03 13:07:19,788 INFO (MainThread-125174) connected to server at ('172.22.26.17', 33472)
2019-07-03 13:07:19,789 INFO (MainThread-125174) TFSparkNode.reserve: {'executor_id': 0, 'host': '172.34.36.98', 'job_name': 'ps', 'task_index': 0, 'port': 35223, 'tb_pid': 0, 'tb_port': 0, 'addr': ('172.22.26.17', 36192), 'authkey': b'\x8d8\x1c\xcd\x93\xa7E*\x93\xb8!\x04\xb3\x91X\x9d'}
2019-07-03 13:07:21,791 INFO (MainThread-125174) node: {'executor_id': 0, 'host': '172.32.36.17', 'job_name': 'ps', 'task_index': 0, 'port': 35223, 'tb_pid': 0, 'tb_port': 0, 'addr': ('172.22.26.17', 36192), 'authkey': b'\x8d8\x1c\xcd\x93\xa7E*\x93\xb8!\x04\xb3\x91X\x9d'}
2019-07-03 13:07:21,791 INFO (MainThread-125174) node: {'executor_id': 1, 'host': '172.32.36.57', 'job_name': 'worker', 'task_index': 0, 'port': 40955, 'tb_pid': 0, 'tb_port': 0, 'addr': '/tmp/pymp-wmn5ohda/listener-8yvgys3z', 'authkey': b'~\xac\x8c\xd9\xcd\x83O\x9e\xb8*\xd5D~\x1c:C'}
2019-07-03 13:07:21,791 INFO (MainThread-125174) node: {'executor_id': 2, 'host': '172.32.36.49', 'job_name': 'worker', 'task_index': 1, 'port': 32956, 'tb_pid': 0, 'tb_port': 0, 'addr': '/tmp/pymp-mcmpwyvt/listener-jf5hxfir', 'authkey': b'\xc5\xd9)-\xcc\xfbN\x0b\x9d\xb9i\x95\x9a\x8a\xee\xa5'}
2019-07-03 13:07:21,791 INFO (MainThread-125174) Starting TensorFlow ps:0 as ps on cluster node 0 on background process
2019-07-03 13:07:21,917 INFO (MainThread-125525) 0: ======== ps:0 ========
2019-07-03 13:07:21,917 INFO (MainThread-125525) 0: Cluster spec: {'ps': ['172.32.36.17:35223'], 'worker': ['172.32.36.57:40955', '172.32.36.49:32956']}
2019-07-03 13:07:21,917 INFO (MainThread-125525) 0: Using CPU
2019-07-03 13:07:21.919366: I tensorflow/core/platform/cpu_feature_guard.cc:141] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2 FMA
2019-07-03 13:07:21.940802: I tensorflow/core/platform/profile_utils/cpu_utils.cc:94] CPU Frequency: 2300040000 Hz
2019-07-03 13:07:21.941955: I tensorflow/compiler/xla/service/service.cc:150] XLA service 0x3e46970 executing computations on platform Host. Devices:
2019-07-03 13:07:21.941989: I tensorflow/compiler/xla/service/service.cc:158]   StreamExecutor device (0): <undefined>, <undefined>
2019-07-03 13:07:21.943695: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:252] Initialize GrpcChannelCache for job ps -> {0 -> localhost:35223}
2019-07-03 13:07:21.943722: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:252] Initialize GrpcChannelCache for job worker -> {0 -> 172.32.36.57:40955, 1 -> 172.22.26.49:32956}
2019-07-03 13:07:21.945257: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:391] Started server with target: grpc://localhost:35223
2019-07-03 13:10:09,313 INFO (MainThread-125174) Got msg: None
2019-07-03 13:10:09,313 INFO (MainThread-125174) Terminating ps
19/07/03 13:10:09 INFO PythonRunner: Times: total = 171816, boot = 458, init = 71, finish = 171287
19/07/03 13:10:09 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1418 bytes result sent to driver
19/07/03 13:10:14 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
19/07/03 13:10:14 INFO MemoryStore: MemoryStore cleared
19/07/03 13:10:14 INFO BlockManager: BlockManager stopped
19/07/03 13:10:14 INFO ShutdownHookManager: Shutdown hook called

The worker log file when I use 2 executors where files are written and also you can see the print statements in the log below:

2019-07-03 12:57:02,219 INFO (MainThread-51548) Connected to TFSparkNode.mgr on 172.32.36.81, executor=1, state='running'
2019-07-03 12:57:02,222 INFO (MainThread-51548) mgr.state='running'
2019-07-03 12:57:02,223 INFO (MainThread-51548) Feeding partition <itertools.chain object at 0x7f5c06b4d588> into input queue <multiprocessing.queues.JoinableQueue obje127/128 [============================>.] - ETA: 1s - loss: 1.9674 - acc: 0.3424Saving the model check point number 1 to args.model_dir
19/07/03 12:57:22 INFO PythonRunner: Times: total = 21593, boot = -1986, init = 1988, finish = 21591
128/128 [==============================] - 152s 1s/step - loss: 1.9665 - acc: 0.3427, finish = 834
printing this at line 101 to debug
printing this in worker to debug
Failed saving the model with exception 'utf-8' codec can't decode byte 0x89 in position 0: invalid start byte
2019-07-03 12:57:24,411 WARNING (MainThread-51301) From /hadoop2/yarn/local/usercache/851519/appcache/application_1561135715175_0103/container_e31_1561135715175_0103_01_000003/MYENV/Python-3.6.8/lib/python3.6/site-packages/tensorflow/python/saved_model/signature_def_utils_impl.py:205: build_tensor_info (from tensorflow.python.saved_model.utils_impl) is deprecated and will be removed in a future version.
Instructions for updating:
This function will only be available through the v1 compatibility library as tf.compat.v1.saved_model.utils.build_tensor_info or tf.compat.v1.saved_model.build_tensor_info.
2019-07-03 12:57:24,412 INFO (MainThread-51301) No assets to save.
2019-07-03 12:57:24,412 INFO (MainThread-51301) No assets to write.
2019-07-03 12:57:25,102 INFO (MainThread-51301) SavedModel written to: hdfs://default/user//TFOS/export_dir/saved_model.pb
2019-07-03 12:57:25,102 INFO (MainThread-51301) terminate() invoked
2019-07-03 12:57:25,557 INFO (MainThread-51548) Processed 1921 items in partition
2019-07-03 12:57:25,560 INFO (MainThread-51548) TFSparkNode: requesting stop
2019-07-03 12:57:25,560 INFO (MainThread-51548) connected to server at ('172.32.36.71', 42936)
19/07/03 12:57:25 INFO PythonRunner: Times: total = 24432, boot = -13, init = 1093, finish = 23352
19/07/03 12:57:25 INFO Executor: Finished task 11.0 in stage 1.0 (TID 13). 1504 bytes result sent to driver
19/07/03 12:57:25 INFO CoarseGrainedExecutorBackend: Got assigned task 14
19/07/03 12:57:25 INFO Executor: Running task 0.0 in stage 2.0 (TID 14)
19/07/03 12:57:25 INFO TorrentBroadcast: Started reading broadcast variable 3
19/07/03 12:57:25 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 10.6 KB, free 16.9 GB)
19/07/03 12:57:25 INFO TorrentBroadcast: Reading broadcast variable 3 took 6 ms
19/07/03 12:57:25 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 35.3 KB, free 16.9 GB)
2019-07-03 12:57:25,651 INFO (MainThread-51542) Connected to TFSparkNode.mgr on 172.32.36.81, executor=1, state='terminating'
2019-07-03 12:57:25,651 INFO (MainThread-51542) Stopping all queues
2019-07-03 12:57:25,652 INFO (MainThread-51542) Feeding None into input queue
2019-07-03 12:57:25,654 INFO (MainThread-51542) Feeding None into output queue
2019-07-03 12:57:25,656 INFO (MainThread-51542) Feeding None into error queue
2019-07-03 12:57:25,656 INFO (MainThread-51542) Setting mgr.state to 'stopped'
19/07/03 12:57:25 INFO PythonRunner: Times: total = 31, boot = -2898, init = 2918, finish = 11
19/07/03 12:57:25 INFO Executor: Finished task 0.0 in stage 2.0 (TID 14). 1418 bytes result sent to driver
2019-07-03 12:57:30,659 INFO (MainThread-51301) dropped 53 items from queue
19/07/03 12:57:31 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
19/07/03 12:57:31 INFO MemoryStore: MemoryStore cleared
19/07/03 12:57:31 INFO BlockManager: BlockManager stopped
19/07/03 12:57:31 INFO ShutdownHookManager: Shutdown hook called

So, Why is not not working when the I give 3 or more executors?

vamsinimmala1992 commented 5 years ago

The updated code i use:

from __future__ import print_function

def main_fun(args, ctx):
    import numpy
    import os
    import tensorflow as tf
    from tensorflow.python import keras
    from tensorflow.python.keras import backend as K
    from tensorflow.python.keras.datasets import mnist
    from tensorflow.python.keras.models import Sequential, load_model, save_model
    from tensorflow.python.keras.layers import Dense, Dropout
    from tensorflow.python.keras.optimizers import RMSprop
    from tensorflow.python.keras.callbacks import LambdaCallback, TensorBoard
    from tensorflow.python.saved_model import builder as saved_model_builder
    from tensorflow.python.saved_model import tag_constants
    from tensorflow.python.saved_model.signature_def_utils_impl import predict_signature_def
    from tensorflowonspark import TFNode

    cluster, server = TFNode.start_cluster_server(ctx)

    if ctx.job_name == "ps":
        server.join()
    elif ctx.job_name == "worker":

        def generate_rdd_data(tf_feed, batch_size):
            print("generate_rdd_data invoked")
            while True:
                batch = tf_feed.next_batch(batch_size)
                feature_vector = []
                lbls = []
                for item in batch:
                    feature_vector.append(item[0])
                    lbls.append(item[1])
                features = numpy.array(feature_vector).astype('float32')
                labels = numpy.stack(lbls).astype('float32')
                yield (features, labels)

        with tf.device(tf.train.replica_device_setter(
          worker_device="/job:worker/task:%d" % ctx.task_index,
          cluster=cluster)):

            batch_size = 100
            num_classes = 14

            x_train = tf.placeholder(tf.float32, [None, 28047], name="x_train")
            y_train = tf.placeholder(tf.float32, [None, 14], name="y_train")

            model = Sequential()
            model.add(Dense(16, activation='relu', input_shape=(28047,)))
            model.add(Dropout(0.2))
            model.add(Dense(16, activation='relu'))
            model.add(Dropout(0.2))
            model.add(Dense(14, activation='softmax'))

            model.summary()

            model.compile(loss='categorical_crossentropy',
                          optimizer=tf.keras.optimizers.RMSprop(lr=0.001),
                          metrics=['accuracy'])

        saver = tf.train.Saver()

        with tf.Session(server.target) as sess:
            K.set_session(sess)

            def save_checkpoint(epoch, logs=None):
                if epoch == 1:
                    print("Saving the model check point number", epoch, "to args.model_dir")
                    tf.train.write_graph(sess.graph.as_graph_def(), args.model_dir, 'graph.pbtxt')
                saver.save(sess, os.path.join(args.model_dir, 'model.ckpt'), global_step=epoch * args.steps_per_epoch)

            ckpt_callback = LambdaCallback(on_epoch_end=save_checkpoint)
            tb_callback = TensorBoard(log_dir=args.model_dir, histogram_freq=0, write_graph=True, write_images=True)

            # Add callbacks to save model checkpoint and tensorboard events (on worker:0 only)
            callbacks = [ckpt_callback, tb_callback] if ctx.task_index == 0 else None

            #  train on data read from a generator which is producing data from a Spark RDD
            tf_feed = TFNode.DataFeed(ctx.mgr)
            model.fit_generator(generator=generate_rdd_data(tf_feed, batch_size),
                                steps_per_epoch=args.steps_per_epoch,
                                epochs=args.epochs,
                                verbose=1,
                                callbacks=callbacks)
            print("printing this at line 101 to debug")

            if args.export_dir and ctx.job_name == 'worker' and ctx.task_index == 0:
                # save a local Keras model, so we can reload it with an inferencing learning_phase

                print("printing this in worker to debug")
                model_json = model.to_json()
                try:
                    # Save the model
                    model.save("new_model_01_model.h5")

                    # Collect the model saved in job cache and save it to HDFS location
                    with open("new_model_01_model.h5") as model_file:
                        model_file_content = model_file.read()
                        tf.gfile.GFile(args.export_dir+"/new_model_01_neuralnet.h5",'wb').write(model_file_content)

                    # Interested in saving the model config into a json format
                    with tf.gfile.Open(args.export_dir+"/new_model_01_neuralnet.json","w") as json_file:
                        json_file.write(model_json)

                except Exception as e:
                    print("Failed saving the model with exception", e)

                # reload the model
                K.set_learning_phase(False)
                new_model = load_model("new_model_01_model.h5")

                # export a saved_model for inferencing
                builder = saved_model_builder.SavedModelBuilder(args.export_dir)
                signature = predict_signature_def(inputs={'fetures': new_model.input},
                                                  outputs={'scores': new_model.output})
                builder.add_meta_graph_and_variables(sess=sess,
                                                     tags=[tag_constants.SERVING],
                                                     signature_def_map={'predict': signature},
                                                     clear_devices=True)
                builder.save()

            if args.input_mode == "spark" :
                tf_feed.terminate()

if __name__ == '__main__':
    import argparse
    from pyspark.context import SparkContext
    from pyspark.conf import SparkConf
    from tensorflowonspark import TFCluster
    import keras

    sc = SparkContext(conf=SparkConf().setAppName("TFoS_TEST"))
    executors = sc._conf.get("spark.executor.instances")
    num_executors = int(executors) if executors is not None else 1
    num_ps = 1

    parser = argparse.ArgumentParser()
    parser.add_argument("--cluster_size", help="number of nodes in the cluster", type=int, default=num_executors)
    parser.add_argument("--epochs", help="number of epochs of training data", type=int, default=20)
    parser.add_argument("--export_dir", help="directory to export saved_model")
    parser.add_argument("--data", help="HDFS path to data in parallelized CSV format")
    parser.add_argument("--input_mode", help="input mode (tf|spark)", default="tf")
    parser.add_argument("--labels", help="HDFS path to MNIST labels in parallelized CSV format")
    parser.add_argument("--model_dir", help="directory to write model checkpoints")
    parser.add_argument("--num_ps", help="number of ps nodes", type=int, default=1)
    parser.add_argument("--steps_per_epoch", help="number of steps per epoch", type=int, default=100)
    parser.add_argument("--tensorboard", help="launch tensorboard process", action="store_true")

    args = parser.parse_args()
    print("args: {}".format(args))

    data = sc.textFile(args.data)
    data = data.map(lambda l: l.split('\t'))

    labels = data.map(lambda x: x[1])
    data = data.map(lambda x: x[19:28066])

    # header = data.first()
    # data = data.filter(lambda line: line != header)
    # label_header = labels.first()
    # labels = labels.filter(lambda line: line != label_header)

    # convert values to float
    convertToFloat = lambda data: [float(str(x)) for x in data]
    dataset = data.map(convertToFloat)
    labels = labels.map(lambda x: float(x))
    labels = labels.map(lambda x: keras.utils.to_categorical(x, num_classes=14))

    dataRDD = dataset.zip(labels)
    cluster = TFCluster.run(sc, main_fun, args, args.cluster_size, args.num_ps, args.tensorboard,
                            TFCluster.InputMode.SPARK, log_dir=args.model_dir)
    cluster.train(dataRDD, args.epochs)

    cluster.shutdown()
leewyang commented 5 years ago

Can you try adding grace_secs=600 to the cluster.shutdown() call per this?

Note: I've picked an extremely large value of 10min here (just to confirm this is the issue/solution). If it solves the issue, you can tune it down to a minimal wait period that succeeds.

vamsinimmala1992 commented 5 years ago

I don't see any change when even though I increase the grace_secs. The code used to save the model file is not invoked. Any other changes do you recommend?

leewyang commented 5 years ago

Missed this earlier:

model.save("new_model_01_model.h5")

Note that saving as h5py files only works on local filesystems (it doesn't support HDFS). Look for an API to export a saved_model format. Unfortunately, these APIs have been experimental and moving around a bit lately, e.g. for TF 1.14 you can use tf.keras.experimental.export_saved_model().

vamsinimmala1992 commented 5 years ago

I don't think model.save("new_model_01_model.h5") is the issue here, because I can't see the PRINT statements written into the container log when I specify 3 executors. I can see them printed into container logs only when I specify 2 executors.

print("printing this at line 101 to debug")
if args.export_dir and ctx.job_name == 'worker' and ctx.task_index == 0:
    # save a local Keras model, so we can reload it with an inferencing learning_phase

    print("printing this in worker to debug")
    model_json = model.to_json()
    try:
    # Save the model
    model.save("new_model_01_model.h5")

And as you can see when model.save("new_model_01_model.h5") code saves the hdf5 file to local, then using tf.GFile it is saved into HDFS. so I don't think this is a problem here.

leewyang commented 5 years ago

@vamsinimmala1992 right, missed that... looking at your logs again, I see:

19/07/03 13:10:08 INFO PythonRunner: Times: total = 31, boot = -10225, init = 10245, finish = 11
  4/128 [..............................] - ETA: 6:15 - loss: 2.0022 - acc: 0.3356
19/07/03 13:10:14 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
19/07/03 13:10:14 INFO MemoryStore: MemoryStore cleared

This usually means that the job terminated early (before it got a chance to print the debug statement and export the model). You didn't post the container_1 logs, so maybe something happened there?

Another way to simplify this test case (while debugging) is to go down to a single epoch. This will make it easier to see the relationship between epochs, steps, and batch_size... especially as you change the size of your cluster.

vamsinimmala1992 commented 5 years ago

I have an update, My friend tried the following and it worked for 3 executors. He reduced the Batch Size from 100 to 50. It successfully saved all the files. The command and the files stored is as shown

spark-submit 
--master yarn 
--queue production --conf spark.executorEnv.CLASSPATH=$(hadoop classpath --glob) 
--driver-memory 4G 
--executor-memory 4G 
--num-executors 3 
--executor-cores 1 
--conf spark.executorEnv.LD_LIBRARY_PATH="/usr/hdp/3.1.0.0-78/usr/lib:/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.161-2.b14.el7_4.x86_64/jre/lib/amd64/server" 
keras_model_tfos.py 
--cluster_size 3 
--epochs 1 
--steps_per_epoch 128
--batch_size 50 
--input_mode spark 
--export_dir "hdfs://default/user/TFOS/export_dir/" 
--data "/user/chopped.tsv" 
--model_dir  "hdfs://default/user/TFOS/model_dir/"
[Vamsi@awse-compute1 ~]$ hdfs dfs -ls /user/TFOS/model_dir/
Found 5 items
-rw-r--r--   3 Vamsi prd-clust01--users         81 2019-07-12 00:29 /user/TFOS/model_dir/checkpoint
-rw-r--r--   3 Vamsi prd-clust01--users     310152 2019-07-12 00:29 /user/TFOS/model_dir/events.out.tfeve                                      nts.1562905694.awse-compute11.aci.is.cl.ssa.gov
-rw-r--r--   3 Vamsi prd-clust01--users    1797132 2019-07-12 00:29 /user/TFOS/model_dir/model.ckpt-0.data-00000-of-00001
-rw-r--r--   3 Vamsi prd-clust01--users        380 2019-07-12 00:29 /user/TFOS/model_dir/model.ckpt-0.index
-rw-r--r--   3 Vamsi prd-clust01--users     168958 2019-07-12 00:29 /user/TFOS/model_dir/model.ckpt-0.meta
[Vamsi@awse-compute1 ~]$
[Vamsi@awse-compute1 ~]$
[Vamsi@awse-compute1 ~]$ hdfs dfs -ls /user/TFOS/export_dir/
Found 2 items
-rw-r--r--   3 Vamsi prd-clust01--users     305117 2019-07-12 00:29 /user/TFOS/export_dir/saved_model.pb
drwxr-xr-x   - Vamsi prd-clust01--users          0 2019-07-12 00:29 /user/TFOS/export_dir/variables

Questions:

  1. Why is it not saving the files when there is a mismatch in the parameters like batch size, steps_per_epoch and cluster_size?
  2. How do I determine what is the optimal steps_per_epoch, batch_size and cluster_size for everything to run successfully and save the models?

PS: I used these formula to compute those params

                    steps = epochs * num_records_per_epoch / batch_size
                    steps_per_epoch = steps / num_workers
leewyang commented 5 years ago

@vamsinimmala1992 I think I finally have a root cause for you... I was able to reproduce the issue with the mnist_mlp.py example (which appears to be what you based your code on). Basically, it boils down to a conflict between the "inexact" behavior of distributed systems and the "exact" behavior of the Keras API.

I used a single-node Spark Standalone cluster w/ three Spark workers (2 TF workers + 1 TF PS). The key bit is that my MNIST CSV data was created (via Spark) with uneven partition sizes, as follows:

$ wc -l *
    5120 part-00000
    6144 part-00001
    6144 part-00002
    6144 part-00003
    6144 part-00004
    6144 part-00005
    6144 part-00006
    6144 part-00007
    6144 part-00008
    5728 part-00009
   60000 total

Note that in most typical datasets, you may see even more variability due to upstream processing steps in your data pipelines.

Now, the formulas from above work fine for a single worker setup. For example, for one epoch of MNIST, you'll end up with:

steps = 1 * 60000 / 100 => 600
steps_per_epoch = 600 / 1 => 600

... so that single worker will process all of the data and then export the model successfully.

Now, when you move to a two worker setup, you'll end up with:

steps = 1 * 60000 / 100 => 600
steps_per_epoch = 600 / 2 => 300

... and IF the partitions were exactly 6000 records in length (and IF each worker processed exactly the same number of partitions), everything would be fine.

However, given the data splits above, one worker actually sees 6144*4 + 5120 = 29,696 records (or 296 steps), while the other worker sees 6144*4 + 5728 = 30,304 records (or 303 steps).

Now, if the second worker is doing the exporting, it will actually complete the Keras fit_generator call locally at 300 steps, and then successfully export the model. But, if the first worker is responsible for exporting, it will actually be "stuck" in the fit_generator call, waiting for the last few records to arrive to complete its 300 steps. Meanwhile, everything else will be shutting down around it (because Spark already "finished" sending all of RDD data partitions).

So, a quick mitigation is to reduce the steps_per_epoch to something more "guaranteed" to finish (e.g. 290 in this example). However, please note though that Spark will assign partitions/tasks to the "next available" executor, so each worker can process different numbers of partitions/tasks. And this will only be more exaggerated as you scale up the cluster.

If you need to avoid this variability, you can use tf.data APIs, which generally shards datasets by filenames, so it should give you a more even distribution of data/tasks. However, this also means that your overall job will be as slow as the slowest executor/worker.

leewyang commented 5 years ago

Closing due to inactivity. Feel free to reopen if still an issue.