yahoo / TensorFlowOnSpark

TensorFlowOnSpark brings TensorFlow programs to Apache Spark clusters.
Apache License 2.0

error when using Embedding #477

Closed · xiaomajia700 closed this issue 4 years ago

xiaomajia700 commented 4 years ago

Environment: Spark 2.4.0-cdh6.1.0 on YARN, Python 3.7 (Anaconda), TensorFlow 2.x (per the logs below).

Describe the bug: I get an error when using tf.keras.layers.Embedding; if I remove the embedding layer, training works fine. What is the right way to use an embedding layer?
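The layer itself behaves as expected in a plain single-process Keras model; a minimal sketch of that usage (assuming a toy vocabulary of size 2 and integer token IDs as input):

import numpy as np
import tensorflow as tf

# Two integer token IDs per example; Embedding maps each ID to a 32-dim vector.
inp = tf.keras.layers.Input(shape=(2,), dtype=tf.int32)
emb = tf.keras.layers.Embedding(input_dim=2, output_dim=32)(inp)   # -> (None, 2, 32)
out = tf.keras.layers.Dense(1, activation='relu')(tf.keras.layers.Flatten()(emb))
model = tf.keras.Model(inp, out)
model.compile(optimizer='adam', loss='mse')

x = np.array([[1, 1], [0, 1]], dtype=np.int32)
y = np.array([1.0, 0.0], dtype=np.float32)
model.fit(x, y, epochs=1, verbose=0)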

Code to reproduce:

from pyspark.sql import SparkSession

import argparse
import json

BATCH_SIZE = 10
LEARNING_RATE = 0.01
EMBEDDING_DIM = 32

def main_fun(args, ctx):
    import numpy as np
    import tensorflow as tf
    from tensorflowonspark import TFNode

    strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

    def build_and_compile_model():
        col = "v_type"
        input_layer = tf.keras.layers.Input(shape=(2, 1), name=col, dtype=tf.int32)
        layer = input_layer
        layer = tf.keras.layers.Embedding(2, EMBEDDING_DIM, name="embed_" + col)(layer)
        layer = tf.keras.layers.Flatten(name="flat_" + col)(layer)
        score = tf.keras.layers.Dense(1, activation='relu')(layer)
        model = tf.keras.Model(inputs=input_layer, outputs=score)
        model.compile(optimizer=tf.keras.optimizers.Adam(), loss=tf.keras.losses.MSE)
        return model

    tf_feed = TFNode.DataFeed(ctx.mgr, False)

    def rdd_generator():
        while not tf_feed.should_stop():
            batch = tf_feed.next_batch(1)
            if len(batch) > 0:
                # batch contents are ignored here; dummy inputs/labels are enough to reproduce the error
                inputs = np.array([1, 1]).reshape((2, 1))
                outputs = np.array([1.0]).reshape((1,))
                yield (inputs, outputs)
            else:
                return

    output_types = (tf.int32, tf.float32)
    output_shapes = (tf.TensorShape([2, 1]), tf.TensorShape([1]))

    ds = tf.data.Dataset.from_generator(rdd_generator, output_types, output_shapes)
    ds = ds.batch(args.batch_size)

    with strategy.scope():
        multi_worker_model = build_and_compile_model()

    multi_worker_model.summary()

    steps_per_epoch = args.num_records / args.batch_size
    steps_per_epoch_per_worker = steps_per_epoch / ctx.num_workers
    max_steps_per_worker = int(steps_per_epoch_per_worker * 0.9)

    multi_worker_model.fit(x=ds, epochs=args.epochs, steps_per_epoch=max_steps_per_worker)

    # terminating the feed tells Spark to skip processing further partitions
    tf_feed.terminate()

if __name__ == '__main__':
    from tensorflowonspark.pipeline import TFEstimator, TFModel

    spark = SparkSession.builder.enableHiveSupport().getOrCreate()
    sc = spark.sparkContext

    executors = sc._conf.get("spark.executor.instances")
    num_executors = int(executors) if executors is not None else 1

    parser = argparse.ArgumentParser()
    parser.add_argument("--batch_size", help="number of records per batch", type=int, default=BATCH_SIZE)
    parser.add_argument("--cluster_size", help="number of nodes in the cluster", type=int, default=num_executors)
    parser.add_argument("--epochs", help="number of epochs", type=int, default=3)
    parser.add_argument("--model_dir", help="(ignore) path to save checkpoint", default="tmp_model")
    parser.add_argument("--export_dir", help="(ignore) path to export saved_model", default="tmp_export")

    args = parser.parse_args()
    print("args:", args)

    args.num_records = 100
    df = spark.createDataFrame([(x,) for x in range(args.num_records)], ["a"])

    estimator = TFEstimator(main_fun, args) \
            .setInputMapping({"a": "a"}) \
            .setModelDir(args.model_dir) \
            .setExportDir(args.export_dir) \
            .setClusterSize(args.cluster_size) \
            .setTensorboard(False) \
            .setEpochs(args.epochs) \
            .setBatchSize(args.batch_size) \
            .setGraceSecs(60)
    model = estimator.fit(df)

Spark Logs:

19/11/25 10:35:41 INFO spark.SparkContext: Running Spark version 2.4.0-cdh6.1.0
19/11/25 10:35:41 INFO spark.SparkContext: Submitted application: spark_test.py
19/11/25 10:35:41 INFO spark.SecurityManager: Changing view acls to: gzitv
19/11/25 10:35:41 INFO spark.SecurityManager: Changing modify acls to: gzitv
19/11/25 10:35:41 INFO spark.SecurityManager: Changing view acls groups to: 
19/11/25 10:35:41 INFO spark.SecurityManager: Changing modify acls groups to: 
19/11/25 10:35:41 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(gzitv); groups with view permissions: Set(); users  with modify permissions: Set(gzitv); groups with modify permissions: Set()
19/11/25 10:35:42 INFO util.Utils: Successfully started service 'sparkDriver' on port 60248.
19/11/25 10:35:42 INFO spark.SparkEnv: Registering MapOutputTracker
19/11/25 10:35:42 INFO spark.SparkEnv: Registering BlockManagerMaster
19/11/25 10:35:42 INFO storage.BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
19/11/25 10:35:42 INFO storage.BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
19/11/25 10:35:42 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-a370d02a-9918-4df2-8793-02e6a8fcf028
19/11/25 10:35:42 INFO memory.MemoryStore: MemoryStore started with capacity 366.3 MB
19/11/25 10:35:42 INFO spark.SparkEnv: Registering OutputCommitCoordinator
19/11/25 10:35:42 INFO util.log: Logging initialized @7105ms
19/11/25 10:35:42 INFO server.Server: jetty-9.3.z-SNAPSHOT
19/11/25 10:35:42 INFO server.Server: Started @7222ms
19/11/25 10:35:42 INFO server.AbstractConnector: Started ServerConnector@49d5937e{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
19/11/25 10:35:42 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
19/11/25 10:35:42 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@7309813{/jobs,null,AVAILABLE,@Spark}
19/11/25 10:35:42 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@4e4e37bf{/jobs/json,null,AVAILABLE,@Spark}
19/11/25 10:35:42 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@4fb72cd{/jobs/job,null,AVAILABLE,@Spark}
19/11/25 10:35:42 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1bbe820d{/jobs/job/json,null,AVAILABLE,@Spark}
19/11/25 10:35:42 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@24f2bd0a{/stages,null,AVAILABLE,@Spark}
19/11/25 10:35:42 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@7b40de43{/stages/json,null,AVAILABLE,@Spark}
19/11/25 10:35:42 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@4108816d{/stages/stage,null,AVAILABLE,@Spark}
19/11/25 10:35:42 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@62ad58ec{/stages/stage/json,null,AVAILABLE,@Spark}
19/11/25 10:35:42 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@672bbe3d{/stages/pool,null,AVAILABLE,@Spark}
19/11/25 10:35:42 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@7ef83f6{/stages/pool/json,null,AVAILABLE,@Spark}
19/11/25 10:35:42 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@9ff8a89{/storage,null,AVAILABLE,@Spark}
19/11/25 10:35:42 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@19dc962{/storage/json,null,AVAILABLE,@Spark}
19/11/25 10:35:42 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@3bfd553a{/storage/rdd,null,AVAILABLE,@Spark}
19/11/25 10:35:42 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@40f69180{/storage/rdd/json,null,AVAILABLE,@Spark}
19/11/25 10:35:42 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@3528e0d5{/environment,null,AVAILABLE,@Spark}
19/11/25 10:35:42 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@5a91f9cb{/environment/json,null,AVAILABLE,@Spark}
19/11/25 10:35:42 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@b143326{/executors,null,AVAILABLE,@Spark}
19/11/25 10:35:42 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@7f1ad955{/executors/json,null,AVAILABLE,@Spark}
19/11/25 10:35:42 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@dc6b02b{/executors/threadDump,null,AVAILABLE,@Spark}
19/11/25 10:35:42 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@58cc5ff9{/executors/threadDump/json,null,AVAILABLE,@Spark}
19/11/25 10:35:42 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@72ebcac5{/static,null,AVAILABLE,@Spark}
19/11/25 10:35:42 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@36da411e{/,null,AVAILABLE,@Spark}
19/11/25 10:35:42 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1b3a845e{/api,null,AVAILABLE,@Spark}
19/11/25 10:35:42 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@43392ceb{/jobs/job/kill,null,AVAILABLE,@Spark}
19/11/25 10:35:42 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@47556815{/stages/stage/kill,null,AVAILABLE,@Spark}
19/11/25 10:35:42 INFO ui.SparkUI: Bound SparkUI to 0.0.0.0, and started at http://node-04:4040
19/11/25 10:35:43 INFO client.RMProxy: Connecting to ResourceManager at node-03/192.168.200.4:8032
19/11/25 10:35:43 INFO yarn.Client: Requesting a new application from cluster with 9 NodeManagers
19/11/25 10:35:43 INFO conf.Configuration: resource-types.xml not found
19/11/25 10:35:43 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
19/11/25 10:35:43 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (18432 MB per container)
19/11/25 10:35:43 INFO yarn.Client: Will allocate AM container, with 896 MB memory including 384 MB overhead
19/11/25 10:35:43 INFO yarn.Client: Setting up container launch context for our AM
19/11/25 10:35:43 INFO yarn.Client: Setting up the launch environment for our AM container
19/11/25 10:35:43 INFO yarn.Client: Preparing resources for our AM container
19/11/25 10:35:44 INFO yarn.Client: Uploading resource file:/home/gzitv/ccw/recommend/tensorflow-hadoop-1.0-SNAPSHOT.jar -> hdfs://node-03:8020/user/gzitv/.sparkStaging/application_1574297549802_0182/tensorflow-hadoop-1.0-SNAPSHOT.jar
19/11/25 10:35:44 INFO yarn.Client: Uploading resource file:/tmp/spark-80626013-5ed5-47a7-8987-5b6152988869/__spark_conf__4528364305729155661.zip -> hdfs://node-03:8020/user/gzitv/.sparkStaging/application_1574297549802_0182/__spark_conf__.zip
19/11/25 10:35:44 INFO spark.SecurityManager: Changing view acls to: gzitv
19/11/25 10:35:44 INFO spark.SecurityManager: Changing modify acls to: gzitv
19/11/25 10:35:44 INFO spark.SecurityManager: Changing view acls groups to: 
19/11/25 10:35:44 INFO spark.SecurityManager: Changing modify acls groups to: 
19/11/25 10:35:44 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(gzitv); groups with view permissions: Set(); users  with modify permissions: Set(gzitv); groups with modify permissions: Set()
19/11/25 10:35:44 INFO conf.HiveConf: Found configuration file file:/etc/hive/conf.cloudera.hive/hive-site.xml
19/11/25 10:35:44 INFO yarn.Client: Submitting application application_1574297549802_0182 to ResourceManager
19/11/25 10:35:44 INFO impl.YarnClientImpl: Submitted application application_1574297549802_0182
19/11/25 10:35:44 INFO cluster.SchedulerExtensionServices: Starting Yarn extension services with app application_1574297549802_0182 and attemptId None
19/11/25 10:35:45 INFO yarn.Client: Application report for application_1574297549802_0182 (state: ACCEPTED)
19/11/25 10:35:45 INFO yarn.Client: 
        client token: N/A
        diagnostics: AM container is launched, waiting for AM container to Register with RM
        ApplicationMaster host: N/A
        ApplicationMaster RPC port: -1
        queue: root.users.gzitv
        start time: 1574649344858
        final status: UNDEFINED
        tracking URL: http://node-03:8088/proxy/application_1574297549802_0182/
        user: gzitv
19/11/25 10:35:46 INFO yarn.Client: Application report for application_1574297549802_0182 (state: ACCEPTED)
19/11/25 10:35:47 INFO yarn.Client: Application report for application_1574297549802_0182 (state: ACCEPTED)
19/11/25 10:35:48 INFO cluster.YarnClientSchedulerBackend: Add WebUI Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS -> node-03, PROXY_URI_BASES -> http://node-03:8088/proxy/application_1574297549802_0182), /proxy/application_1574297549802_0182
19/11/25 10:35:48 INFO ui.JettyUtils: Adding filter org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter to /jobs, /jobs/json, /jobs/job, /jobs/job/json, /stages, /stages/json, /stages/stage, /stages/stage/json, /stages/pool, /stages/pool/json, /storage, /storage/json, /storage/rdd, /storage/rdd/json, /environment, /environment/json, /executors, /executors/json, /executors/threadDump, /executors/threadDump/json, /static, /, /api, /jobs/job/kill, /stages/stage/kill.
19/11/25 10:35:48 INFO yarn.Client: Application report for application_1574297549802_0182 (state: RUNNING)
19/11/25 10:35:48 INFO yarn.Client: 
        client token: N/A
        diagnostics: N/A
        ApplicationMaster host: 192.168.200.4
        ApplicationMaster RPC port: -1
        queue: root.users.gzitv
        start time: 1574649344858
        final status: UNDEFINED
        tracking URL: http://node-03:8088/proxy/application_1574297549802_0182/
        user: gzitv
19/11/25 10:35:48 INFO cluster.YarnClientSchedulerBackend: Application application_1574297549802_0182 has started running.
19/11/25 10:35:48 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 58711.
19/11/25 10:35:48 INFO netty.NettyBlockTransferService: Server created on node-04:58711
19/11/25 10:35:48 INFO storage.BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
19/11/25 10:35:49 INFO storage.BlockManagerMaster: Registering BlockManager BlockManagerId(driver, node-04, 58711, None)
19/11/25 10:35:49 INFO storage.BlockManagerMasterEndpoint: Registering block manager node-04:58711 with 366.3 MB RAM, BlockManagerId(driver, node-04, 58711, None)
19/11/25 10:35:49 INFO storage.BlockManagerMaster: Registered BlockManager BlockManagerId(driver, node-04, 58711, None)
19/11/25 10:35:49 INFO storage.BlockManager: external shuffle service port = 7337
19/11/25 10:35:49 INFO storage.BlockManager: Initialized BlockManager: BlockManagerId(driver, node-04, 58711, None)
19/11/25 10:35:49 INFO cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster registered as NettyRpcEndpointRef(spark-client://YarnAM)
19/11/25 10:35:49 INFO ui.JettyUtils: Adding filter org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter to /metrics/json.
19/11/25 10:35:49 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@619c4e31{/metrics/json,null,AVAILABLE,@Spark}
19/11/25 10:35:49 INFO scheduler.EventLoggingListener: Logging events to hdfs://node-03:8020/user/spark/applicationHistory/application_1574297549802_0182
19/11/25 10:35:49 WARN lineage.LineageWriter: Lineage directory /var/log/spark/lineage doesn't exist or is not writable. Lineage for this application will be disabled.
19/11/25 10:35:49 INFO util.Utils: Extension com.cloudera.spark.lineage.NavigatorAppListener not being initialized.
19/11/25 10:35:52 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (192.168.200.7:50598) with ID 3
19/11/25 10:35:52 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (192.168.200.12:50698) with ID 2
19/11/25 10:35:52 INFO storage.BlockManagerMasterEndpoint: Registering block manager node-06:36587 with 5.2 GB RAM, BlockManagerId(3, node-06, 36587, None)
19/11/25 10:35:52 INFO storage.BlockManagerMasterEndpoint: Registering block manager node-11:58531 with 5.2 GB RAM, BlockManagerId(2, node-11, 58531, None)
19/11/25 10:35:52 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (192.168.200.5:44178) with ID 1
19/11/25 10:35:52 INFO cluster.YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.8
19/11/25 10:35:52 INFO storage.BlockManagerMasterEndpoint: Registering block manager node-04:54800 with 5.2 GB RAM, BlockManagerId(1, node-04, 54800, None)
19/11/25 10:35:53 INFO internal.SharedState: loading hive config file: file:/etc/hive/conf.cloudera.hive/hive-site.xml
19/11/25 10:35:53 INFO internal.SharedState: spark.sql.warehouse.dir is not set, but hive.metastore.warehouse.dir is set. Setting spark.sql.warehouse.dir to the value of hive.metastore.warehouse.dir ('/user/hive/warehouse').
19/11/25 10:35:53 INFO internal.SharedState: Warehouse path is '/user/hive/warehouse'.
19/11/25 10:35:53 INFO ui.JettyUtils: Adding filter org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter to /SQL.
19/11/25 10:35:53 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@2b76358d{/SQL,null,AVAILABLE,@Spark}
19/11/25 10:35:53 INFO ui.JettyUtils: Adding filter org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter to /SQL/json.
19/11/25 10:35:53 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@5b71a21a{/SQL/json,null,AVAILABLE,@Spark}
19/11/25 10:35:53 INFO ui.JettyUtils: Adding filter org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter to /SQL/execution.
19/11/25 10:35:53 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@2ed4f367{/SQL/execution,null,AVAILABLE,@Spark}
19/11/25 10:35:53 INFO ui.JettyUtils: Adding filter org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter to /SQL/execution/json.
19/11/25 10:35:53 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@51a7f70c{/SQL/execution/json,null,AVAILABLE,@Spark}
19/11/25 10:35:53 INFO ui.JettyUtils: Adding filter org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter to /static/sql.
19/11/25 10:35:53 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@43e6fc32{/static/sql,null,AVAILABLE,@Spark}
19/11/25 10:35:54 INFO state.StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
19/11/25 10:35:54 WARN lineage.LineageWriter: Lineage directory /var/log/spark/lineage doesn't exist or is not writable. Lineage for this application will be disabled.
19/11/25 10:35:54 INFO util.Utils: Extension com.cloudera.spark.lineage.NavigatorQueryListener not being initialized.
args: Namespace(batch_size=10, cluster_size=3, epochs=3, export_dir='tmp_export', model_dir='tmp_model')
2019-11-25 10:35:56,438 INFO (MainThread-1506) ===== 1. train args: Namespace(batch_size=10, cluster_size=3, epochs=3, export_dir='tmp_export', model_dir='tmp_model', num_records=100)
2019-11-25 10:35:56,438 INFO (MainThread-1506) ===== 2. train params: {Param(parent='TFEstimator_aa93e7f12f01', name='input_mapping', doc='Mapping of input DataFrame column to input tensor'): {'a': 'a'}, Param(parent='TFEstimator_aa93e7f12f01', name='model_dir', doc='Path to save/load model checkpoints'): 'tmp_model', Param(parent='TFEstimator_aa93e7f12f01', name='export_dir', doc='Directory to export saved_model'): 'tmp_export', Param(parent='TFEstimator_aa93e7f12f01', name='cluster_size', doc='Number of nodes in the cluster'): 3, Param(parent='TFEstimator_aa93e7f12f01', name='tensorboard', doc='Launch tensorboard process'): False, Param(parent='TFEstimator_aa93e7f12f01', name='epochs', doc='Number of epochs to train'): 3, Param(parent='TFEstimator_aa93e7f12f01', name='batch_size', doc='Number of records per batch'): 10, Param(parent='TFEstimator_aa93e7f12f01', name='grace_secs', doc='Number of seconds to wait after feeding data (for final tasks like exporting a saved_model)'): 60}
2019-11-25 10:35:56,439 INFO (MainThread-1506) ===== 3. train args + params: Namespace(batch_size=10, cluster_size=3, driver_ps_nodes=False, epochs=3, export_dir='tmp_export', grace_secs=60, input_mapping={'a': 'a'}, master_node='chief', model_dir='tmp_model', num_ps=0, num_records=100, protocol='grpc', readers=1, steps=1000, tensorboard=False, tfrecord_dir=None)
2019-11-25 10:35:56,440 INFO (MainThread-1506) Reserving TFSparkNodes 
2019-11-25 10:35:56,440 INFO (MainThread-1506) cluster_template: {'chief': [0], 'worker': [1, 2]}
2019-11-25 10:35:56,444 INFO (MainThread-1506) listening for reservations at ('192.168.200.5', 55855)
2019-11-25 10:35:56,445 INFO (MainThread-1506) Starting TensorFlow on executors
2019-11-25 10:35:56,451 INFO (MainThread-1506) Waiting for TFSparkNodes to start
2019-11-25 10:35:56,451 INFO (MainThread-1506) waiting for 3 reservations
19/11/25 10:35:56 INFO spark.SparkContext: Starting job: foreachPartition at /opt/anaconda3/lib/python3.7/site-packages/tensorflowonspark/TFCluster.py:320
19/11/25 10:35:56 INFO scheduler.DAGScheduler: Got job 0 (foreachPartition at /opt/anaconda3/lib/python3.7/site-packages/tensorflowonspark/TFCluster.py:320) with 3 output partitions
19/11/25 10:35:56 INFO scheduler.DAGScheduler: Final stage: ResultStage 0 (foreachPartition at /opt/anaconda3/lib/python3.7/site-packages/tensorflowonspark/TFCluster.py:320)
19/11/25 10:35:56 INFO scheduler.DAGScheduler: Parents of final stage: List()
19/11/25 10:35:56 INFO scheduler.DAGScheduler: Missing parents: List()
19/11/25 10:35:56 INFO scheduler.DAGScheduler: Submitting ResultStage 0 (PythonRDD[6] at foreachPartition at /opt/anaconda3/lib/python3.7/site-packages/tensorflowonspark/TFCluster.py:320), which has no missing parents
19/11/25 10:35:56 INFO memory.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 19.1 KB, free 366.3 MB)
19/11/25 10:35:57 INFO memory.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 13.5 KB, free 366.3 MB)
19/11/25 10:35:57 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on node-04:58711 (size: 13.5 KB, free: 366.3 MB)
19/11/25 10:35:57 INFO spark.SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1164
19/11/25 10:35:57 INFO scheduler.DAGScheduler: Submitting 3 missing tasks from ResultStage 0 (PythonRDD[6] at foreachPartition at /opt/anaconda3/lib/python3.7/site-packages/tensorflowonspark/TFCluster.py:320) (first 15 tasks are for partitions Vector(0, 1, 2))
19/11/25 10:35:57 INFO cluster.YarnScheduler: Adding task set 0.0 with 3 tasks
19/11/25 10:35:57 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, node-06, executor 3, partition 0, PROCESS_LOCAL, 7736 bytes)
19/11/25 10:35:57 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, node-04, executor 1, partition 1, PROCESS_LOCAL, 7736 bytes)
19/11/25 10:35:57 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, node-11, executor 2, partition 2, PROCESS_LOCAL, 7736 bytes)
2019-11-25 10:35:57,451 INFO (MainThread-1506) waiting for 3 reservations
19/11/25 10:35:57 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on node-06:36587 (size: 13.5 KB, free: 5.2 GB)
19/11/25 10:35:57 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on node-11:58531 (size: 13.5 KB, free: 5.2 GB)
19/11/25 10:35:57 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on node-04:54800 (size: 13.5 KB, free: 5.2 GB)
2019-11-25 10:35:58,453 INFO (MainThread-1506) waiting for 3 reservations
2019-11-25 10:35:59,454 INFO (MainThread-1506) waiting for 3 reservations
2019-11-25 10:36:00,455 INFO (MainThread-1506) waiting for 3 reservations
2019-11-25 10:36:01,457 INFO (MainThread-1506) waiting for 3 reservations
2019-11-25 10:36:02,458 INFO (MainThread-1506) all reservations completed
2019-11-25 10:36:02,458 INFO (MainThread-1506) All TFSparkNodes started
2019-11-25 10:36:02,459 INFO (MainThread-1506) {'executor_id': 0, 'host': '192.168.200.7', 'job_name': 'chief', 'task_index': 0, 'port': 46465, 'tb_pid': 0, 'tb_port': 0, 'addr': '/tmp/pymp-mt48obxw/listener-q9fbj0re', 'authkey': b'\xc8\xc8YMBUH\x0c\xb5\xe3&\xae\xbe\xf6,<'}
2019-11-25 10:36:02,459 INFO (MainThread-1506) {'executor_id': 2, 'host': '192.168.200.12', 'job_name': 'worker', 'task_index': 1, 'port': 45418, 'tb_pid': 0, 'tb_port': 0, 'addr': '/tmp/pymp-5smf0kkb/listener-jolq8ogz', 'authkey': b'\xa4\x0f\x84\xea\x19(L\xf7\x94\x84\x1d\xe7\x1d^\x8d\x18'}
2019-11-25 10:36:02,459 INFO (MainThread-1506) {'executor_id': 1, 'host': '192.168.200.5', 'job_name': 'worker', 'task_index': 0, 'port': 45948, 'tb_pid': 0, 'tb_port': 0, 'addr': '/tmp/pymp-ja9tmgvi/listener-29sqsi8n', 'authkey': b'\xaa\x9e\x9c$z\xac@o\xaf\x11\xdf\x8esR\xb7 '}
2019-11-25 10:36:02,922 INFO (MainThread-1506) Feeding training data
19/11/25 10:36:03 INFO spark.SparkContext: Starting job: collect at PythonRDD.scala:166
19/11/25 10:36:03 INFO scheduler.DAGScheduler: Got job 1 (collect at PythonRDD.scala:166) with 9 output partitions
19/11/25 10:36:03 INFO scheduler.DAGScheduler: Final stage: ResultStage 1 (collect at PythonRDD.scala:166)
19/11/25 10:36:03 INFO scheduler.DAGScheduler: Parents of final stage: List()
19/11/25 10:36:03 INFO scheduler.DAGScheduler: Missing parents: List()
19/11/25 10:36:03 INFO scheduler.DAGScheduler: Submitting ResultStage 1 (PythonRDD[11] at RDD at PythonRDD.scala:53), which has no missing parents
19/11/25 10:36:03 INFO memory.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 16.0 KB, free 366.3 MB)
19/11/25 10:36:03 INFO memory.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 9.6 KB, free 366.2 MB)
19/11/25 10:36:03 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on node-04:58711 (size: 9.6 KB, free: 366.3 MB)
19/11/25 10:36:03 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1164
19/11/25 10:36:03 INFO scheduler.DAGScheduler: Submitting 9 missing tasks from ResultStage 1 (PythonRDD[11] at RDD at PythonRDD.scala:53) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8))
19/11/25 10:36:03 INFO cluster.YarnScheduler: Adding task set 1.0 with 9 tasks
19/11/25 10:36:03 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 3, node-04, executor 1, partition 0, PROCESS_LOCAL, 8021 bytes)
19/11/25 10:36:03 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 6156 ms on node-04 (executor 1) (1/3)
19/11/25 10:36:03 INFO python.PythonAccumulatorV2: Connected to AccumulatorServer at host: 127.0.0.1 port: 53374
19/11/25 10:36:03 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on node-04:54800 (size: 9.6 KB, free: 5.2 GB)
19/11/25 10:36:03 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0 (TID 4, node-06, executor 3, partition 1, PROCESS_LOCAL, 8021 bytes)
19/11/25 10:36:03 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 1.0 (TID 5, node-11, executor 2, partition 2, PROCESS_LOCAL, 8035 bytes)
19/11/25 10:36:03 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 6465 ms on node-06 (executor 3) (2/3)
19/11/25 10:36:03 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 6415 ms on node-11 (executor 2) (3/3)
19/11/25 10:36:03 INFO cluster.YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool 
19/11/25 10:36:03 INFO scheduler.DAGScheduler: ResultStage 0 (foreachPartition at /opt/anaconda3/lib/python3.7/site-packages/tensorflowonspark/TFCluster.py:320) finished in 6.942 s
19/11/25 10:36:03 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on node-06:36587 (size: 9.6 KB, free: 5.2 GB)
19/11/25 10:36:03 INFO scheduler.DAGScheduler: Job 0 finished: foreachPartition at /opt/anaconda3/lib/python3.7/site-packages/tensorflowonspark/TFCluster.py:320, took 7.018314 s
19/11/25 10:36:03 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on node-11:58531 (size: 9.6 KB, free: 5.2 GB)
19/11/25 10:36:07 INFO scheduler.TaskSetManager: Starting task 3.0 in stage 1.0 (TID 6, node-06, executor 3, partition 3, PROCESS_LOCAL, 8021 bytes)
19/11/25 10:36:07 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 1.0 (TID 4, node-06, executor 3): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/opt/cloudera/parcels/CDH-6.1.0-1.cdh6.1.0.p0.770702/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in main
    process()
File "/opt/cloudera/parcels/CDH-6.1.0-1.cdh6.1.0.p0.770702/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 367, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
File "/opt/cloudera/parcels/CDH-6.1.0-1.cdh6.1.0.p0.770702/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2499, in pipeline_func
File "/opt/cloudera/parcels/CDH-6.1.0-1.cdh6.1.0.p0.770702/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2499, in pipeline_func
File "/opt/cloudera/parcels/CDH-6.1.0-1.cdh6.1.0.p0.770702/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2499, in pipeline_func
File "/opt/cloudera/parcels/CDH-6.1.0-1.cdh6.1.0.p0.770702/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 352, in func
File "/opt/cloudera/parcels/CDH-6.1.0-1.cdh6.1.0.p0.770702/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 801, in func
File "/opt/anaconda3/lib/python3.7/site-packages/tensorflowonspark/TFSparkNode.py", line 414, in _train
    raise Exception("Exception in worker:\n" + e_str)
Exception: Exception in worker:
Traceback (most recent call last):
File "/opt/anaconda3/lib/python3.7/site-packages/tensorflowonspark/TFSparkNode.py", line 330, in wrapper_fn_background
    wrapper_fn(args, context)
File "/opt/anaconda3/lib/python3.7/site-packages/tensorflowonspark/TFSparkNode.py", line 324, in wrapper_fn
    fn(args, context)
File "/home/gzitv/ccw/recommend/spark_test.py", line 72, in main_fun
File "/opt/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training.py", line 728, in fit
    use_multiprocessing=use_multiprocessing)
File "/opt/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_distributed.py", line 789, in fit
    *args, **kwargs)
File "/opt/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_distributed.py", line 776, in wrapper
    mode=dc.CoordinatorMode.INDEPENDENT_WORKER)
File "/opt/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/distribute/distribute_coordinator.py", line 853, in run_distribute_coordinator
    task_id, session_config, rpc_layer)
File "/opt/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/distribute/distribute_coordinator.py", line 360, in _run_single_worker
    return worker_fn(strategy)
File "/opt/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_distributed.py", line 771, in _worker_fn
    return method(model, **kwargs)
File "/opt/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2.py", line 324, in fit
    total_epochs=epochs)
File "/opt/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2.py", line 123, in run_one_epoch
    batch_outs = execution_function(iterator)
File "/opt/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2_utils.py", line 86, in execution_function
    distributed_function(input_fn))
File "/opt/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/def_function.py", line 457, in __call__
    result = self._call(*args, **kwds)
File "/opt/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/def_function.py", line 520, in _call
    return self._stateless_fn(*args, **kwds)
File "/opt/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/function.py", line 1823, in __call__
    return graph_function._filtered_call(args, kwargs)  # pylint: disable=protected-access
File "/opt/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/function.py", line 1141, in _filtered_call
    self.captured_inputs)
File "/opt/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/function.py", line 1224, in _call_flat
    ctx, args, cancellation_manager=cancellation_manager)
File "/opt/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/function.py", line 511, in call
    ctx=ctx)
File "/opt/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/execute.py", line 67, in quick_execute
    six.raise_from(core._status_to_exception(e.code, message), None)
File "<string>", line 3, in raise_from
tensorflow.python.framework.errors_impl.InternalError:  RecvBufResponse returned 16 bytes where to_tensor expected 32
        [[node allreduce_1/CollectiveReduce (defined at opt/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/framework/ops.py:1751) ]] [Op:__inference_distributed_function_720]

Function call stack:
distributed_function

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
        at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
        at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
        at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2113)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2113)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Executor Logs:

19/11/25 10:35:50 INFO executor.CoarseGrainedExecutorBackend: Started daemon with process name: 89573@node-06
19/11/25 10:35:50 INFO util.SignalUtils: Registered signal handler for TERM
19/11/25 10:35:50 INFO util.SignalUtils: Registered signal handler for HUP
19/11/25 10:35:50 INFO util.SignalUtils: Registered signal handler for INT
19/11/25 10:35:51 INFO spark.SecurityManager: Changing view acls to: yarn,gzitv
19/11/25 10:35:51 INFO spark.SecurityManager: Changing modify acls to: yarn,gzitv
19/11/25 10:35:51 INFO spark.SecurityManager: Changing view acls groups to: 
19/11/25 10:35:51 INFO spark.SecurityManager: Changing modify acls groups to: 
19/11/25 10:35:51 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(yarn, gzitv); groups with view permissions: Set(); users  with modify permissions: Set(yarn, gzitv); groups with modify permissions: Set()
19/11/25 10:35:51 INFO client.TransportClientFactory: Successfully created connection to node-04/192.168.200.5:60248 after 114 ms (0 ms spent in bootstraps)
19/11/25 10:35:51 INFO spark.SecurityManager: Changing view acls to: yarn,gzitv
19/11/25 10:35:51 INFO spark.SecurityManager: Changing modify acls to: yarn,gzitv
19/11/25 10:35:51 INFO spark.SecurityManager: Changing view acls groups to: 
19/11/25 10:35:51 INFO spark.SecurityManager: Changing modify acls groups to: 
19/11/25 10:35:51 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(yarn, gzitv); groups with view permissions: Set(); users  with modify permissions: Set(yarn, gzitv); groups with modify permissions: Set()
19/11/25 10:35:51 INFO client.TransportClientFactory: Successfully created connection to node-04/192.168.200.5:60248 after 1 ms (0 ms spent in bootstraps)
19/11/25 10:35:52 INFO storage.DiskBlockManager: Created local directory at /home/data/yarn/nm/usercache/gzitv/appcache/application_1574297549802_0182/blockmgr-e46e4653-8476-4645-8994-d1d6887de7ef
19/11/25 10:35:52 INFO memory.MemoryStore: MemoryStore started with capacity 5.2 GB
19/11/25 10:35:52 INFO executor.CoarseGrainedExecutorBackend: Connecting to driver: spark://CoarseGrainedScheduler@node-04:60248
19/11/25 10:35:52 INFO executor.CoarseGrainedExecutorBackend: Successfully registered with driver
19/11/25 10:35:52 INFO executor.Executor: Starting executor ID 3 on host node-06
19/11/25 10:35:52 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 36587.
19/11/25 10:35:52 INFO netty.NettyBlockTransferService: Server created on node-06:36587
19/11/25 10:35:52 INFO storage.BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
19/11/25 10:35:52 INFO storage.BlockManagerMaster: Registering BlockManager BlockManagerId(3, node-06, 36587, None)
19/11/25 10:35:52 INFO storage.BlockManagerMaster: Registered BlockManager BlockManagerId(3, node-06, 36587, None)
19/11/25 10:35:52 INFO storage.BlockManager: external shuffle service port = 7337
19/11/25 10:35:52 INFO storage.BlockManager: Registering executor with local external shuffle service.
19/11/25 10:35:52 INFO client.TransportClientFactory: Successfully created connection to node-06/192.168.200.7:7337 after 1 ms (0 ms spent in bootstraps)
19/11/25 10:35:52 INFO storage.BlockManager: Initialized BlockManager: BlockManagerId(3, node-06, 36587, None)
19/11/25 10:35:57 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 0
19/11/25 10:35:57 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 0)
19/11/25 10:35:57 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 0
19/11/25 10:35:57 INFO client.TransportClientFactory: Successfully created connection to node-04/192.168.200.5:58711 after 2 ms (0 ms spent in bootstraps)
19/11/25 10:35:57 INFO memory.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 13.5 KB, free 5.2 GB)
19/11/25 10:35:57 INFO broadcast.TorrentBroadcast: Reading broadcast variable 0 took 171 ms
19/11/25 10:35:57 INFO memory.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 19.1 KB, free 5.2 GB)
2019-11-25 10:36:01,491 INFO (MainThread-89693) connected to server at ('192.168.200.5', 55855)
2019-11-25 10:36:01,492 INFO (MainThread-89693) TFSparkNode.reserve: {'executor_id': 0, 'host': '192.168.200.7', 'job_name': 'chief', 'task_index': 0, 'port': 46465, 'tb_pid': 0, 'tb_port': 0, 'addr': '/tmp/pymp-mt48obxw/listener-q9fbj0re', 'authkey': b'\xc8\xc8YMBUH\x0c\xb5\xe3&\xae\xbe\xf6,<'}
2019-11-25 10:36:03,496 INFO (MainThread-89693) node: {'executor_id': 0, 'host': '192.168.200.7', 'job_name': 'chief', 'task_index': 0, 'port': 46465, 'tb_pid': 0, 'tb_port': 0, 'addr': '/tmp/pymp-mt48obxw/listener-q9fbj0re', 'authkey': b'\xc8\xc8YMBUH\x0c\xb5\xe3&\xae\xbe\xf6,<'}
2019-11-25 10:36:03,496 INFO (MainThread-89693) node: {'executor_id': 1, 'host': '192.168.200.5', 'job_name': 'worker', 'task_index': 0, 'port': 45948, 'tb_pid': 0, 'tb_port': 0, 'addr': '/tmp/pymp-ja9tmgvi/listener-29sqsi8n', 'authkey': b'\xaa\x9e\x9c$z\xac@o\xaf\x11\xdf\x8esR\xb7 '}
2019-11-25 10:36:03,496 INFO (MainThread-89693) node: {'executor_id': 2, 'host': '192.168.200.12', 'job_name': 'worker', 'task_index': 1, 'port': 45418, 'tb_pid': 0, 'tb_port': 0, 'addr': '/tmp/pymp-5smf0kkb/listener-jolq8ogz', 'authkey': b'\xa4\x0f\x84\xea\x19(L\xf7\x94\x84\x1d\xe7\x1d^\x8d\x18'}
2019-11-25 10:36:03,497 INFO (MainThread-89693) export TF_CONFIG: {"cluster": {"chief": ["192.168.200.7:46465"], "worker": ["192.168.200.5:45948", "192.168.200.12:45418"]}, "task": {"type": "chief", "index": 0}, "environment": "cloud"}
2019-11-25 10:36:03,497 INFO (MainThread-89693) Starting TensorFlow chief:0 as chief on cluster node 0 on background process
19/11/25 10:36:03 INFO python.PythonRunner: Times: total = 5611, boot = 607, init = 2961, finish = 2043
2019-11-25 10:36:03.531691: I tensorflow/core/platform/profile_utils/cpu_utils.cc:94] CPU Frequency: 2194735000 Hz
2019-11-25 10:36:03.532858: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x7f01e0004ae0 executing computations on platform Host. Devices:
2019-11-25 10:36:03.532895: I tensorflow/compiler/xla/service/service.cc:175]   StreamExecutor device (0): Host, Default Version
2019-11-25 10:36:03.537483: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:258] Initialize GrpcChannelCache for job chief -> {0 -> localhost:46465}
2019-11-25 10:36:03.537518: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:258] Initialize GrpcChannelCache for job worker -> {0 -> 192.168.200.5:45948, 1 -> 192.168.200.12:45418}
2019-11-25 10:36:03.539686: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:365] Started server with target: grpc://localhost:46465
19/11/25 10:36:03 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 1462 bytes result sent to driver
2019-11-25 10:36:03,554 INFO (MainThread-89806) Enabled multi-worker collective ops with available devices: ['/job:chief/replica:0/task:0/device:CPU:0', '/job:chief/replica:0/task:0/device:XLA_CPU:0']
19/11/25 10:36:03 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 4
19/11/25 10:36:03 INFO executor.Executor: Running task 1.0 in stage 1.0 (TID 4)
2019-11-25 10:36:03,574 INFO (MainThread-89806) Multi-worker CollectiveAllReduceStrategy with cluster_spec = {'chief': ['192.168.200.7:46465'], 'worker': ['192.168.200.5:45948', '192.168.200.12:45418']}, task_type = 'chief', task_id = 0, num_workers = 3, local_devices = ('/job:chief/task:0',), communication = CollectiveCommunication.AUTO
19/11/25 10:36:03 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 1
19/11/25 10:36:03 INFO memory.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 9.6 KB, free 5.2 GB)
19/11/25 10:36:03 INFO broadcast.TorrentBroadcast: Reading broadcast variable 1 took 18 ms
19/11/25 10:36:03 INFO memory.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 16.0 KB, free 5.2 GB)
Model: "model"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
v_type (InputLayer)          [(None, 2, 1)]            0         
_________________________________________________________________
embed_v_type (Embedding)     (None, 2, 1, 32)          64        
_________________________________________________________________
flat_v_type (Flatten)        (None, 64)                0         
_________________________________________________________________
dense (Dense)                (None, 1)                 65        
=================================================================
Total params: 129
Trainable params: 129
Non-trainable params: 0
_________________________________________________________________
2019-11-25 10:36:04,075 INFO (MainThread-89806) Running Distribute Coordinator with mode = 'independent_worker', cluster_spec = {'chief': ['192.168.200.7:46465'], 'worker': ['192.168.200.5:45948', '192.168.200.12:45418']}, task_type = 'chief', task_id = 0, environment = 'cloud', rpc_layer = 'grpc'
2019-11-25 10:36:04,075 WARNING (MainThread-89806) `eval_fn` is not passed in. The `worker_fn` will be used if an "evaluator" task exists in the cluster.
2019-11-25 10:36:04,075 WARNING (MainThread-89806) `eval_strategy` is not passed in. No distribution strategy will be used for evaluation.
2019-11-25 10:36:04,077 INFO (MainThread-89806) Multi-worker CollectiveAllReduceStrategy with cluster_spec = {'chief': ['192.168.200.7:46465'], 'worker': ['192.168.200.5:45948', '192.168.200.12:45418']}, task_type = 'chief', task_id = 0, num_workers = 3, local_devices = ('/job:chief/task:0',), communication = CollectiveCommunication.AUTO
2019-11-25 10:36:04,078 INFO (MainThread-89806) Multi-worker CollectiveAllReduceStrategy with cluster_spec = {'chief': ['192.168.200.7:46465'], 'worker': ['192.168.200.5:45948', '192.168.200.12:45418']}, task_type = 'chief', task_id = 0, num_workers = 3, local_devices = ('/job:chief/task:0',), communication = CollectiveCommunication.AUTO
2019-11-25 10:36:04,078 WARNING (MainThread-89806) ModelCheckpoint callback is not provided. Workers will need to restart training if any fails.
2019-11-25 10:36:04.091124: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:400] Cannot find shardable dataset, adding a shard node at the end of the dataset instead. This may have performance implications.
Train for 3 steps
Epoch 1/3
19/11/25 10:36:04 INFO codegen.CodeGenerator: Code generated in 265.403394 ms
19/11/25 10:36:04 INFO python.PythonRunner: Times: total = 51, boot = 6, init = 45, finish = 0
2019-11-25 10:36:04,452 INFO (MainThread-90072) Connected to TFSparkNode.mgr on 192.168.200.7, executor=0, state='running'
2019-11-25 10:36:04,462 INFO (MainThread-90072) mgr.state='running'
2019-11-25 10:36:04,463 INFO (MainThread-90072) Feeding partition <itertools.chain object at 0x7f01cce3aa20> into input queue <multiprocessing.queues.JoinableQueue object at 0x7f01cc4b59e8>
2019-11-25 10:36:04,480 INFO (MainThread-89806) Collective batch_all_reduce: 2 all-reduces, num_workers = 3
2019-11-25 10:36:04,483 INFO (MainThread-89806) Collective batch_all_reduce for IndexedSlices: 1 all-reduces, num_workers = 3
2019-11-25 10:36:05,529 INFO (MainThread-89806) Collective batch_all_reduce: 1 all-reduces, num_workers = 3
2019-11-25 10:36:05,533 INFO (MainThread-89806) Collective batch_all_reduce: 1 all-reduces, num_workers = 3
2019-11-25 10:36:06,041 INFO (MainThread-89806) Collective batch_all_reduce: 2 all-reduces, num_workers = 3
2019-11-25 10:36:06,044 INFO (MainThread-89806) Collective batch_all_reduce for IndexedSlices: 1 all-reduces, num_workers = 3
2019-11-25 10:36:07,087 INFO (MainThread-89806) Collective batch_all_reduce: 1 all-reduces, num_workers = 3
2019-11-25 10:36:07,092 INFO (MainThread-89806) Collective batch_all_reduce: 1 all-reduces, num_workers = 3
2019-11-25 10:36:07.261999: E tensorflow/core/common_runtime/ring_alg.cc:279] Aborting RingGather with Internal: RecvBufResponse returned 16 bytes where to_tensor expected 32
2019-11-25 10:36:07.262071: W tensorflow/core/common_runtime/base_collective_executor.cc:216] BaseCollectiveExecutor::StartAbort Internal: RecvBufResponse returned 16 bytes where to_tensor expected 32
2019-11-25 10:36:07.262090: E tensorflow/core/common_runtime/ring_alg.cc:279] Aborting RingReduce with Internal: RecvBufResponse returned 16 bytes where to_tensor expected 32
2019-11-25 10:36:07.262100: W tensorflow/core/common_runtime/base_collective_executor.cc:216] BaseCollectiveExecutor::StartAbort Internal: RecvBufResponse returned 16 bytes where to_tensor expected 32
2019-11-25 10:36:07.262335: W tensorflow/core/framework/op_kernel.cc:1622] OP_REQUIRES failed at collective_ops.cc:234 : Internal: RecvBufResponse returned 16 bytes where to_tensor expected 32
2019-11-25 10:36:07.262350: W tensorflow/core/framework/op_kernel.cc:1622] OP_REQUIRES failed at collective_ops.cc:125 : Internal: RecvBufResponse returned 16 bytes where to_tensor expected 32
2019-11-25 10:36:07.262401: W tensorflow/core/common_runtime/base_collective_executor.cc:216] BaseCollectiveExecutor::StartAbort Internal: RecvBufResponse returned 16 bytes where to_tensor expected 32
    [[{{node allreduce_1/CollectiveReduce}}]]
2019-11-25 10:36:07.268305: E tensorflow/core/common_runtime/ring_alg.cc:279] Aborting RingReduce with Internal: [_Derived_]RecvBufResponse returned 16 bytes where to_tensor expected 32
2019-11-25 10:36:07.268357: W tensorflow/core/common_runtime/base_collective_executor.cc:216] BaseCollectiveExecutor::StartAbort Internal: [_Derived_]RecvBufResponse returned 16 bytes where to_tensor expected 32
2019-11-25 10:36:07.268450: W tensorflow/core/framework/op_kernel.cc:1622] OP_REQUIRES failed at collective_ops.cc:234 : Internal: [_Derived_]RecvBufResponse returned 16 bytes where to_tensor expected 32
2019-11-25 10:36:07.268554: E tensorflow/core/common_runtime/ring_alg.cc:279] Aborting RingReduce with Internal: [_Derived_]RecvBufResponse returned 16 bytes where to_tensor expected 32
2019-11-25 10:36:07.268597: E tensorflow/core/common_runtime/ring_alg.cc:279] Aborting RingGather with Internal: [_Derived_]RecvBufResponse returned 16 bytes where to_tensor expected 32
2019-11-25 10:36:07.268605: W tensorflow/core/common_runtime/base_collective_executor.cc:216] BaseCollectiveExecutor::StartAbort Internal: [_Derived_]RecvBufResponse returned 16 bytes where to_tensor expected 32
2019-11-25 10:36:07.268630: W tensorflow/core/common_runtime/base_collective_executor.cc:216] BaseCollectiveExecutor::StartAbort Internal: [_Derived_]RecvBufResponse returned 16 bytes where to_tensor expected 32
2019-11-25 10:36:07.268753: W tensorflow/core/framework/op_kernel.cc:1622] OP_REQUIRES failed at collective_ops.cc:234 : Internal: [_Derived_]RecvBufResponse returned 16 bytes where to_tensor expected 32
2019-11-25 10:36:07.268783: W tensorflow/core/framework/op_kernel.cc:1622] OP_REQUIRES failed at collective_ops.cc:125 : Internal: [_Derived_]RecvBufResponse returned 16 bytes where to_tensor expected 32

1/3 [=========>....................] - ETA: 6s19/11/25 10:36:07 ERROR executor.Executor: Exception in task 1.0 in stage 1.0 (TID 4)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/opt/cloudera/parcels/CDH-6.1.0-1.cdh6.1.0.p0.770702/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in main
    process()
File "/opt/cloudera/parcels/CDH-6.1.0-1.cdh6.1.0.p0.770702/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 367, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
File "/opt/cloudera/parcels/CDH-6.1.0-1.cdh6.1.0.p0.770702/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2499, in pipeline_func
File "/opt/cloudera/parcels/CDH-6.1.0-1.cdh6.1.0.p0.770702/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2499, in pipeline_func
File "/opt/cloudera/parcels/CDH-6.1.0-1.cdh6.1.0.p0.770702/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2499, in pipeline_func
File "/opt/cloudera/parcels/CDH-6.1.0-1.cdh6.1.0.p0.770702/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 352, in func
File "/opt/cloudera/parcels/CDH-6.1.0-1.cdh6.1.0.p0.770702/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 801, in func
File "/opt/anaconda3/lib/python3.7/site-packages/tensorflowonspark/TFSparkNode.py", line 414, in _train
    raise Exception("Exception in worker:\n" + e_str)
Exception: Exception in worker:
Traceback (most recent call last):
File "/opt/anaconda3/lib/python3.7/site-packages/tensorflowonspark/TFSparkNode.py", line 330, in wrapper_fn_background
    wrapper_fn(args, context)
File "/opt/anaconda3/lib/python3.7/site-packages/tensorflowonspark/TFSparkNode.py", line 324, in wrapper_fn
    fn(args, context)
File "/home/gzitv/ccw/recommend/spark_test.py", line 72, in main_fun
File "/opt/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training.py", line 728, in fit
    use_multiprocessing=use_multiprocessing)
File "/opt/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_distributed.py", line 789, in fit
    *args, **kwargs)
File "/opt/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_distributed.py", line 776, in wrapper
    mode=dc.CoordinatorMode.INDEPENDENT_WORKER)
File "/opt/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/distribute/distribute_coordinator.py", line 853, in run_distribute_coordinator
    task_id, session_config, rpc_layer)
File "/opt/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/distribute/distribute_coordinator.py", line 360, in _run_single_worker
    return worker_fn(strategy)
File "/opt/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_distributed.py", line 771, in _worker_fn
    return method(model, **kwargs)
File "/opt/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2.py", line 324, in fit
    total_epochs=epochs)
File "/opt/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2.py", line 123, in run_one_epoch
    batch_outs = execution_function(iterator)
File "/opt/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2_utils.py", line 86, in execution_function
    distributed_function(input_fn))
File "/opt/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/def_function.py", line 457, in __call__
    result = self._call(*args, **kwds)
File "/opt/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/def_function.py", line 520, in _call
    return self._stateless_fn(*args, **kwds)
File "/opt/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/function.py", line 1823, in __call__
    return graph_function._filtered_call(args, kwargs)  # pylint: disable=protected-access
File "/opt/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/function.py", line 1141, in _filtered_call
    self.captured_inputs)
File "/opt/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/function.py", line 1224, in _call_flat
    ctx, args, cancellation_manager=cancellation_manager)
File "/opt/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/function.py", line 511, in call
    ctx=ctx)
File "/opt/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/execute.py", line 67, in quick_execute
    six.raise_from(core._status_to_exception(e.code, message), None)
File "<string>", line 3, in raise_from
tensorflow.python.framework.errors_impl.InternalError:  RecvBufResponse returned 16 bytes where to_tensor expected 32
    [[node allreduce_1/CollectiveReduce (defined at opt/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/framework/ops.py:1751) ]] [Op:__inference_distributed_function_720]

Function call stack:
distributed_function

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2113)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2113)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
19/11/25 10:36:07 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 6
19/11/25 10:36:07 INFO executor.Executor: Running task 3.0 in stage 1.0 (TID 6)
19/11/25 10:36:07 INFO python.PythonRunner: Times: total = 43, boot = -3745, init = 3787, finish = 1
2019-11-25 10:36:07,631 INFO (MainThread-90216) Connected to TFSparkNode.mgr on 192.168.200.7, executor=0, state='running'
2019-11-25 10:36:07,641 INFO (MainThread-90216) mgr.state='running'
2019-11-25 10:36:07,641 INFO (MainThread-90216) Feeding partition <itertools.chain object at 0x7f01cce3aa20> into input queue <multiprocessing.queues.JoinableQueue object at 0x7f01cc4b59e8>
19/11/25 10:36:21 INFO executor.Executor: Executor is trying to kill task 3.0 in stage 1.0 (TID 6), reason: Stage cancelled
19/11/25 10:36:21 INFO executor.CoarseGrainedExecutorBackend: Driver commanded a shutdown
19/11/25 10:36:21 INFO memory.MemoryStore: MemoryStore cleared
19/11/25 10:36:21 INFO storage.BlockManager: BlockManager stopped
19/11/25 10:36:21 INFO util.ShutdownHookManager: Shutdown hook called
19/11/25 10:36:21 INFO executor.Executor: Executor killed task 3.0 in stage 1.0 (TID 6), reason: Stage cancelled

Spark Submit Command Line:

spark-submit --master yarn --conf spark.executorEnv.JAVA_HOME="$JAVA_HOME" --jars tensorflow-hadoop-1.0-SNAPSHOT.jar --num-executors 3 --executor-memory 10g --conf spark.dynamicAllocation.enabled=false spark_test.py --epochs 3
leewyang commented 4 years ago

Unfortunately, I'm not aware of any errors/resolutions related to this... that said:

  1. Are you able to run this code as a single-node TF app? (if not, I'd start there).
  2. Then, once you're happy w/ the single-node TF app, you should try to run a "simple" manual cluster, using two processes on the same machine w/ separate TF_CONFIG variables (see this example for details, and the sketch after this list).
  3. Then, if that all works, you can move to an equivalent TFoS cluster of two nodes and compare w/ the behavior from 2.
  4. Gradually scale out your cluster, adjusting various hyperparameters and Spark settings as needed.
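For step 2, a minimal sketch of what a manual two-process MultiWorkerMirroredStrategy test could look like (assumptions: TF 2.0 APIs, ports 2222/2223 are free, and TASK_INDEX is just a hypothetical env var used to pick the task index; run one copy per terminal with TASK_INDEX=0 and TASK_INDEX=1):

import json, os
import numpy as np
import tensorflow as tf

# TF_CONFIG must be set before the strategy is created.
task_index = int(os.environ.get("TASK_INDEX", "0"))
os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {"worker": ["localhost:2222", "localhost:2223"]},
    "task": {"type": "worker", "index": task_index},
})

strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

with strategy.scope():
    inp = tf.keras.layers.Input(shape=(2,), dtype=tf.int32)
    emb = tf.keras.layers.Embedding(2, 32)(inp)
    out = tf.keras.layers.Dense(1, activation='relu')(tf.keras.layers.Flatten()(emb))
    model = tf.keras.Model(inp, out)
    model.compile(optimizer='adam', loss='mse')

# Dummy data, just to exercise the collective all-reduce across the two workers.
x = np.ones((100, 2), dtype=np.int32)
y = np.ones((100,), dtype=np.float32)
model.fit(x, y, batch_size=10, epochs=1)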

Also, please be aware that many of the DistributionStrategies in TF 2.0 are still marked as "experimental" or "limited support" by the TF team, so you may just be encountering "bleeding edge" issues.

xiaomajia700 commented 4 years ago

The model works fine as a single-node TF app. I will try multi-process.

leewyang commented 4 years ago

Closing due to inactivity. Feel free to re-open if this is still an issue.