cerndb / dist-keras

Distributed Deep Learning, with a focus on distributed training, using Keras and Apache Spark.
http://joerihermans.com/work/distributed-keras/
GNU General Public License v3.0

Issues when initializing SparkSession and KafkaUtils.createStream #58

Open anishsharma opened 6 years ago

anishsharma commented 6 years ago

Hi,

I am running your ml_pipeline example with the following configuration:

Python: 3.6
Keras: 2
Spark: 2
Kafka: 2.11
Java: 8

'D:\spark\bin\spark-submit2.cmd" --conf "spark.app.name' is not recognized as an internal or external command,
operable program or batch file.
Traceback (most recent call last):
  File "ml_pipeline.py", line 69, in <module>
    sc = SparkSession.builder.config(conf=conf).appName(application_name).getOrCreate()
  File "d:\Anaconda3\lib\site-packages\pyspark\sql\session.py", line 173, in getOrCreate
    sc = SparkContext.getOrCreate(sparkConf)
  File "d:\Anaconda3\lib\site-packages\pyspark\context.py", line 334, in getOrCreate
    SparkContext(conf=conf or SparkConf())
  File "d:\Anaconda3\lib\site-packages\pyspark\context.py", line 115, in __init__
    SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
  File "d:\Anaconda3\lib\site-packages\pyspark\context.py", line 283, in _ensure_initialized
    SparkContext._gateway = gateway or launch_gateway(conf)
  File "d:\Anaconda3\lib\site-packages\pyspark\java_gateway.py", line 95, in launch_gateway
    raise Exception("Java gateway process exited before sending the driver its port number")
Exception: Java gateway process exited before sending the driver its port number

application_name = "Distributed Keras Kafka Pipeline"

I overcame this error by removing the spaces from the application name "Distributed Keras Kafka Pipeline". That seemed strange to me, but it worked. I am not sure why.
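The launcher message quoted in the error ('...spark-submit2.cmd" --conf "spark.app.name' is not recognized) suggests that the quoting broke when the spark-submit command line was assembled on Windows. That is an inference from the error text, not a confirmed cause. A minimal workaround sketch, assuming a whitespace-free name is acceptable; `safe_app_name` is a hypothetical helper and not part of dist-keras or PySpark:

```python
import re


def safe_app_name(name):
    """Collapse each run of whitespace into one underscore.

    Hypothetical helper: keeps the application name free of spaces so it
    cannot split a quoted command-line argument.
    """
    return re.sub(r"\s+", "_", name.strip())


application_name = safe_app_name("Distributed Keras Kafka Pipeline")
print(application_name)
```

The resulting string can then be passed to `appName(...)` in place of the original title.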

Now I am stuck on this error: "AttributeError: 'SparkSession' object has no attribute '_getJavaStorageLevel'". I did some research and found that the default storage level has changed to MEMORY_AND_DISK, so I don't think I need to provide any such value in the configuration. I would appreciate your help.
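One possible source of this AttributeError, offered as an assumption since the relevant part of ml_pipeline.py is not shown here: in PySpark 2.x, `KafkaUtils.createStream` looks up `_getJavaStorageLevel` on the object that was handed to `StreamingContext`, and that method lives on `SparkContext`, not on `SparkSession`. If the `sc` returned by `SparkSession.builder...getOrCreate()` is passed to `StreamingContext` directly, the lookup would fail in this way. A small sketch of unwrapping the session first; `as_spark_context` is a hypothetical helper:

```python
def as_spark_context(session_or_context):
    """Return the underlying SparkContext when given a SparkSession.

    SparkSession exposes its context as the `sparkContext` attribute;
    a plain SparkContext has no such attribute and is returned unchanged.
    """
    return getattr(session_or_context, "sparkContext", session_or_context)


# Intended use (not executed here; needs PySpark and a running Spark setup).
# `batch_duration` is a placeholder for the interval used in the pipeline.
#   from pyspark.streaming import StreamingContext
#   ssc = StreamingContext(as_spark_context(sc), batch_duration)
```

The helper accepts either object, so the same line would work for both the Spark 1 and Spark 2 code paths.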

EDIT:

I have resolved the above issue by using the relevant spark-streaming-kafka-assembly jar file, and the streaming context is now created successfully. However, I am now facing another issue.

On the first iteration, as control enters the predict(df) method, I get the following errors:

============================PREPARED DATAFRAME===========================
DataFrame[DER_deltaeta_jet_jet: double, DER_deltar_tau_lep: double, DER_lep_eta_centrality: double, DER_mass_MMC: double, DER_mass_jet_jet: double, DER_mass_transverse_met_lep: double, DER_mass_vis: double, DER_met_phi_centrality: double, DER_prodeta_jet_jet: double, DER_pt_h: double, DER_pt_ratio_lep_tau: double, DER_pt_tot: double, DER_sum_pt: double, EventId: double, PRI_jet_all_pt: double, PRI_jet_leading_eta: double, PRI_jet_leading_phi: double, PRI_jet_leading_pt: double, PRI_jet_num: double, PRI_jet_subleading_eta: double, PRI_jet_subleading_phi: double, PRI_jet_subleading_pt: double, PRI_lep_eta: double, PRI_lep_phi: double, PRI_lep_pt: double, PRI_met: double, PRI_met_phi: double, PRI_met_sumet: double, PRI_tau_eta: double, PRI_tau_phi: double, PRI_tau_pt: double, features: vector, features_normalized: vector]
18/03/01 15:04:51 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
18/03/01 15:04:51 WARN BlockManager: Block input-0-1519896890400 replicated to only 0 peer(s) instead of 1 peers
18/03/01 15:04:51 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
18/03/01 15:04:51 WARN BlockManager: Block input-0-1519896890600 replicated to only 0 peer(s) instead of 1 peers
18/03/01 15:04:51 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
18/03/01 15:04:51 WARN BlockManager: Block input-0-1519896890800 replicated to only 0 peer(s) instead of 1 peers
2018-03-01 15:04:51.914911: W c:\l\tensorflow_1501918863922\work\tensorflow-1.2.1\tensorflow\core\platform\cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE instructions, but these are available on your machine and could speed up CPU computations.
18/03/01 15:04:51 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
18/03/01 15:04:51 WARN BlockManager: Block input-0-1519896891600 replicated to only 0 peer(s) instead of 1 peers
2018-03-01 15:04:51.923321: W c:\l\tensorflow_1501918863922\work\tensorflow-1.2.1\tensorflow\core\platform\cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE2 instructions, but these are available on your machine and could speed up CPU computations.
2018-03-01 15:04:51.940857: W c:\l\tensorflow_1501918863922\work\tensorflow-1.2.1\tensorflow\core\platform\cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE3 instructions, but these are available on your machine and could speed up CPU computations.
2018-03-01 15:04:51.949033: W c:\l\tensorflow_1501918863922\work\tensorflow-1.2.1\tensorflow\core\platform\cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.1 instructions, but these are available on your machine and could speed up CPU computations.
2018-03-01 15:04:51.959770: W c:\l\tensorflow_1501918863922\work\tensorflow-1.2.1\tensorflow\core\platform\cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.2 instructions, but these are available on your machine and could speed up CPU computations.
2018-03-01 15:04:51.970355: W c:\l\tensorflow_1501918863922\work\tensorflow-1.2.1\tensorflow\core\platform\cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use AVX instructions, but these are available on your machine and could speed up CPU computations.
2018-03-01 15:04:51.979551: W c:\l\tensorflow_1501918863922\work\tensorflow-1.2.1\tensorflow\core\platform\cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use AVX2 instructions, but these are available on your machine and could speed up CPU computations.
2018-03-01 15:04:51.989807: W c:\l\tensorflow_1501918863922\work\tensorflow-1.2.1\tensorflow\core\platform\cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use FMA instructions, but these are available on your machine and could speed up CPU computations.
18/03/01 15:04:52 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
18/03/01 15:04:52 WARN BlockManager: Block input-0-1519896891800 replicated to only 0 peer(s) instead of 1 peers
18/03/01 15:04:52 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
18/03/01 15:04:52 WARN BlockManager: Block input-0-1519896892000 replicated to only 0 peer(s) instead of 1 peers
18/03/01 15:04:52 ERROR JobScheduler: Error running job streaming job 1519896870000 ms.0
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "d:\Anaconda3\lib\site-packages\pyspark\streaming\util.py", line 65, in call
    r = self.func(t, *rdds)
  File "d:\Anaconda3\lib\site-packages\pyspark\streaming\dstream.py", line 159, in <lambda>
    func = lambda t, rdd: old_func(rdd)
  File "ml_pipeline.py", line 132, in process_instances
    df = predict(df) # Add the raw Neural Network predictions.
  File "ml_pipeline.py", line 91, in predict
    predictor = ModelPredictor(keras_model=model, features_col="features_normalized", output_col="prediction")
  File "d:\dist-keras\distkeras\predictors.py", line 45, in __init__
    super(ModelPredictor, self).__init__(keras_model)
  File "d:\dist-keras\distkeras\predictors.py", line 23, in __init__
    self.model = serialize_keras_model(keras_model)
  File "d:\dist-keras\distkeras\utils.py", line 84, in serialize_keras_model
    dictionary['weights'] = model.get_weights()
  File "d:\Anaconda3\lib\site-packages\keras\models.py", line 699, in get_weights
    return self.model.get_weights()
  File "d:\Anaconda3\lib\site-packages\keras\engine\topology.py", line 2008, in get_weights
    return K.batch_get_value(weights)
  File "d:\Anaconda3\lib\site-packages\keras\backend\tensorflow_backend.py", line 2320, in batch_get_value
    return get_session().run(ops)
  File "d:\Anaconda3\lib\site-packages\tensorflow\python\client\session.py", line 789, in run
    run_metadata_ptr)
  File "d:\Anaconda3\lib\site-packages\tensorflow\python\client\session.py", line 927, in _run
    raise RuntimeError('The Session graph is empty. Add operations to the '
RuntimeError: The Session graph is empty. Add operations to the graph before calling run().

Any help would be appreciated. Am I missing anything here?

Thanks & Regards,
Anish Sharma

S-shweta commented 6 years ago

Hi,

I am facing the same error. Any hints toward a resolution?
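One possible explanation for "The Session graph is empty", not confirmed anywhere in this thread: TensorFlow 1.x tracks the default graph per thread, and the streaming callback (`process_instances` in the traceback) may run on a different thread than the one that loaded the Keras model. In that case `get_session()` could end up bound to an empty graph. A commonly used workaround is to capture the graph on the loading thread and re-enter it inside the callback. The sketch below assumes that diagnosis; `call_in_graph` is a hypothetical helper:

```python
def call_in_graph(graph, fn, *args, **kwargs):
    """Call fn with `graph` installed as the default graph of this thread.

    `graph` only needs an as_default() context manager, which
    tf.Graph provides in TensorFlow 1.x.
    """
    with graph.as_default():
        return fn(*args, **kwargs)


# Intended use with TensorFlow 1.x (not executed here):
#   import tensorflow as tf
#   graph = tf.get_default_graph()   # capture on the thread that loaded the model
#
#   def process_instances(rdd):
#       ...
#       df = call_in_graph(graph, predict, df)
```

Whether this resolves the error depends on how and where the model is loaded in ml_pipeline.py, so it is worth treating as something to try and not as a confirmed fix.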