danielenricocahall / elephas

Distributed Deep learning with Keras & Spark

tensor.keras.layers Reshape not defined when calling estimator.fit #20

Open GeoffDuniam opened 1 year ago

GeoffDuniam commented 1 year ago

Hi Daniel

We have an interesting issue when calling the estimator to fit a model to a dataframe.

Input data frame

The dataframe represents target and light curve data for astronomical objects, with one row per object. Each object has an object_id, a categorical target representing the type (or category) of the object, and array columns representing the features that describe that particular object.

The schema of the dataframe is here -

root
 |-- object_id: integer (nullable = true)
 |-- target: integer (nullable = true)
 |-- meta: vector (nullable = true)
 |-- band: vector (nullable = true)
 |-- features: vector (nullable = true)

The two columns we're using for the analysis are the target and features columns.

The features column is a dense vector comprising the meta and band data plus eight other feature arrays. The breakdown of this vector by position (matching the slicing in get_meta, get_band, and get_hist below) is: positions 0-9 hold the meta features, positions 10-265 hold the band data, and positions 266-2313 hold the remaining eight feature arrays of 256 values each, for 2314 values in total.

All datatypes are floats.
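
For reference, a minimal PySpark sketch (file path assumed) for loading a dataframe with this layout and confirming the feature vector length:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("light-curve-vectors").getOrCreate()

# Path is assumed; the sample parquet attached later in this thread has the same layout.
train_df = spark.read.parquet("SampleTrainingVectors.parquet")
train_df.printSchema()

# 10 meta + 256 band + 8 * 256 remaining features = 2314 floats per row
print(len(train_df.select("features").first()[0]))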

Model definition

def get_model(train_df, input_dim, size=80):

    def get_meta(x):
        x=x[:,0:10]
        return x

    def get_band(x):
        x=x[:,10:266]
        return x

    def get_hist(x):
        # x=x[:,266:input_dim]  -- this is before we passed in the shape from input dim - to avoid two select calls
        x=x[:,266:input_dim[0]]
        x=Reshape((8,256))(x)
        x=K.permute_dimensions(x, (0,2,1))
        return x

    #raw_input  = Input(shape=train_df.select("features").first()[0].shape, name='raw')
    raw_input  = Input(shape=input_dim, name='raw')

    hist_input = Lambda(get_hist,  name="hist")(raw_input)
    meta_input = Lambda(get_meta,  name="meta")(raw_input)
    band_input = Lambda(get_band,  name="band")(raw_input)

    band_emb = Embedding(8, 8)(band_input)
    hist = concatenate([hist_input, band_emb])
    hist = TimeDistributed(Dense(40, activation='relu'))(hist)

    rnn = Bidirectional(GRU(size, return_sequences=True))(hist)
    rnn = SpatialDropout1D(0.5)(rnn)

    gmp = GlobalMaxPool1D()(rnn)
    gmp = Dropout(0.5)(gmp)

    x = concatenate([meta_input, gmp])
    x = Dense(128, activation='relu')(x)
    x = Dense(128, activation='relu')(x)
    x = Dropout(0.5)(x)

    output = Dense(15, activation='softmax')(x)
    model = Model(inputs=[raw_input], outputs=output)

    return model

Model creation

adam = optimizers.Nadam(learning_rate=0.01)
opt_conf = optimizers.serialize(adam)

model = get_model(train_df.select('features'), input_dim)
model.compile(optimizer=adam, loss=mywloss, metrics=['accuracy'])
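
(For reference, input_dim isn't defined in these snippets; given the commented-out shape lookup in get_model and the (None, 2314) input in the model summary, it is presumably derived from the feature vector length, along the lines of the sketch below.)

# Assumed derivation of input_dim (not shown in the thread): the length of the
# dense features vector, which gives (2314,) for this dataset.
input_dim = (len(train_df.select("features").first()[0]),)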

Model summary

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
==================================================================================================
 raw (InputLayer)               [(None, 2314)]       0           []                               

 band (Lambda)                  (None, 256)          0           ['raw[0][0]']                    

 hist (Lambda)                  (None, 256, 8)       0           ['raw[0][0]']                    

 embedding (Embedding)          (None, 256, 8)       64          ['band[0][0]']                   

 concatenate (Concatenate)      (None, 256, 16)      0           ['hist[0][0]',                   
                                                                  'embedding[0][0]']              

 time_distributed (TimeDistribu  (None, 256, 40)     680         ['concatenate[0][0]']            
 ted)                                                                                             

 bidirectional (Bidirectional)  (None, 256, 160)     58560       ['time_distributed[0][0]']       

 spatial_dropout1d (SpatialDrop  (None, 256, 160)    0           ['bidirectional[0][0]']          
 out1D)                                                                                           

 global_max_pooling1d (GlobalMa  (None, 160)         0           ['spatial_dropout1d[0][0]']      
 xPooling1D)                                                                                      

 meta (Lambda)                  (None, 10)           0           ['raw[0][0]']                    

 dropout (Dropout)              (None, 160)          0           ['global_max_pooling1d[0][0]']   

 concatenate_1 (Concatenate)    (None, 170)          0           ['meta[0][0]',                   
                                                                  'dropout[0][0]']                

 dense_1 (Dense)                (None, 128)          21888       ['concatenate_1[0][0]']          

 dense_2 (Dense)                (None, 128)          16512       ['dense_1[0][0]']                

 dropout_1 (Dropout)            (None, 128)          0           ['dense_2[0][0]']                

 dense_3 (Dense)                (None, 15)           1935        ['dropout_1[0][0]']              

==================================================================================================
Total params: 99,639
Trainable params: 99,639
Non-trainable params: 0
__________________________________________________________________________________________________

Elephas estimator definition

estimator = ElephasEstimator()
estimator.setFeaturesCol("features")             # These two come directly from pyspark,
estimator.setLabelCol("target")                 # hence the camel case. Sorry :)
estimator.set_keras_model_config(model.to_json())       # Provide serialized Keras model
estimator.set_categorical_labels(True)
estimator.set_nb_classes(num_classes)
estimator.set_num_workers(10)  # Number of Spark workers to train with. Feel free to adapt it.
estimator.set_epochs(2)
estimator.set_batch_size(128)
estimator.set_verbosity(2)
estimator.set_validation_split(0.15)
estimator.set_optimizer_config(opt_conf)
estimator.set_mode("synchronous")
estimator.set_loss(mywloss)  # custom loss; was "categorical_crossentropy"
estimator.set_metrics(['accuracy'])

The error

The error is thrown during the model training process -

pipeline = Pipeline(stages=[estimator])
fitted_pipeline = pipeline.fit(train_df)

And the error we are receiving is here -

NameError: Exception encountered when calling layer "hist" (type Lambda).

name 'Reshape' is not defined

Call arguments received by layer "hist" (type Lambda):
  • inputs=tf.Tensor(shape=(None, 2314), dtype=float32)
  • mask=None
  • training=None

Which is baffling, because the imports are globally declared at the beginning of the program -

# Keras / Deep Learning
import tensorflow as tf
import keras as k
from tensorflow.keras.models import Sequential, Model, load_model
from tensorflow.keras.layers import Dense, Dropout, Activation
from tensorflow.keras.layers import Input, Lambda, Embedding, concatenate, TimeDistributed, Bidirectional
from tensorflow.keras.layers import GRU, SpatialDropout1D, GlobalMaxPool1D, Reshape
from keras import backend as K
from tensorflow.keras import optimizers, regularizers
from tensorflow.keras.optimizers import Adam

# Elephas for Deep Learning on Spark
from  elephas.ml_model import ElephasEstimator

Environment

The problem occurs on our HPC cluster installation (CentOS) as well as locally on a macOS desktop.

Additional context

What is interesting is that the same code was running fine on older versions of Elephas. We can call the Reshape function anywhere else in the program with no error.

We're not sure what's causing this; it's probably something simple, but we've been wrestling with it for the past week, and any help would be greatly appreciated.

Regards

Geoff.

danielenricocahall commented 1 year ago

Hello @GeoffDuniam ! Thank you for reaching out. What version of Elephas are you currently running?

GeoffDuniam commented 1 year ago

Hi Danny (@danielenricocahall)

Thanks very much for getting back to us - we're running Elephas 3.4.8, installed as the default pip3 install.

Cheers

Geoff

danielenricocahall commented 1 year ago

Thank you! And you mentioned it worked in previous versions - do you know which version?

GeoffDuniam commented 1 year ago

Ah - I couldn't tell you what specific version, but it was one of Max's early versions circa 2019 or thereabouts. Does that help?

danielenricocahall commented 1 year ago

Hm, interesting - that would be the TF 1.x API then, as Elephas wasn't compatible with TF 2.x until the 2.0 release in 2021 (https://github.com/maxpumperla/elephas/releases/tag/2.0.0). Without data it's a bit hard to reproduce - do you have any samples of dummy data? One observation: you're using the keras backend directly for permuting dimensions rather than tf.keras.backend, which could potentially cause issues. Also, could you send the full stack trace?

GeoffDuniam commented 1 year ago

@danielenricocahall Hi Danny, Sorry for the delay getting back to you - the usual end-of-month planning meetings got in the way.

I'll try using the tf.keras backend directly and report back. I can certainly get you a data sample - I've got it stored as a parquet file, so you could read it directly as a dataframe. Would that work?

Thanks again for the help, appreciate your time!

Cheers

Geoff

GeoffDuniam commented 1 year ago

Here's a sample of the training vectors table in Parquet format - SampleTrainingVectors.zip

And the stack trace -


Py4JJavaError                             Traceback (most recent call last)
Cell In [76], line 1
----> 1 fitted_pipeline = pipeline.fit(train_df)

File ~/miniconda/conda/envs/Elephas/lib/python3.8/site-packages/pyspark/ml/base.py:205, in Estimator.fit(self, dataset, params)
    203         return self.copy(params)._fit(dataset)
    204     else:
--> 205         return self._fit(dataset)
    206 else:
    207     raise TypeError(
    208         "Params must be either a param map or a list/tuple of param maps, "
    209         "but got %s." % type(params)
    210     )

File ~/miniconda/conda/envs/Elephas/lib/python3.8/site-packages/pyspark/ml/pipeline.py:134, in Pipeline._fit(self, dataset)
    132     dataset = stage.transform(dataset)
    133 else:  # must be an Estimator
--> 134     model = stage.fit(dataset)
    135 transformers.append(model)
    136 if i < indexOfLastEstimator:

File ~/miniconda/conda/envs/Elephas/lib/python3.8/site-packages/pyspark/ml/base.py:205, in Estimator.fit(self, dataset, params)
    203         return self.copy(params)._fit(dataset)
    204     else:
--> 205         return self._fit(dataset)
    206 else:
    207     raise TypeError(
    208         "Params must be either a param map or a list/tuple of param maps, "
    209         "but got %s." % type(params)
    210     )

File ~/miniconda/conda/envs/Elephas/lib/python3.8/site-packages/elephas/ml_model.py:98, in ElephasEstimator._fit(self, df)
     91 keras_model.compile(loss=loss, optimizer=optimizer, metrics=metrics)
     93 spark_model = SparkModel(model=keras_model,
     94                          mode=self.get_mode(),
     95                          frequency=self.get_frequency(),
     96                          num_workers=self.get_num_workers(),
     97                          custom_objects=self.get_custom_objects())
---> 98 spark_model.fit(simple_rdd,
     99                 epochs=self.get_epochs(),
    100                 batch_size=self.get_batch_size(),
    101                 verbose=self.get_verbosity(),
    102                 validation_split=self.get_validation_split())
    104 model_weights = spark_model.master_network.get_weights()
    105 return ElephasTransformer(labelCol=self.getLabelCol(),
    106                           outputCol=self.getOutputCol(),
    107                           featuresCol=self.getFeaturesCol(),
   (...)
    111                           model_type=LossModelTypeMapper().get_model_type(loss),
    112                           history=spark_model.training_histories)

File ~/miniconda/conda/envs/Elephas/lib/python3.8/site-packages/elephas/spark_model.py:186, in SparkModel.fit(self, rdd, **kwargs)
    183 rdd = rdd.repartition(self.num_workers)
    185 if self.mode in ['asynchronous', 'synchronous', 'hogwild']:
--> 186     self._fit(rdd, **kwargs)
    187 else:
    188     raise ValueError(
    189         "Choose from one of the modes: asynchronous, synchronous or hogwild")

File ~/miniconda/conda/envs/Elephas/lib/python3.8/site-packages/elephas/spark_model.py:220, in SparkModel._fit(self, rdd, **kwargs)
    217 elif self.mode == 'synchronous':
    218     worker = SparkWorker(model_json, parameters, train_config,
    219                          optimizer, loss, metrics, custom)
--> 220     training_outcomes = rdd.mapPartitions(worker.train).collect()
    221     new_parameters = self._master_network.get_weights()
    222     number_of_sub_models = len(training_outcomes)

File ~/miniconda/conda/envs/Elephas/lib/python3.8/site-packages/pyspark/rdd.py:1197, in RDD.collect(self)
   1195 with SCCallSiteSync(self.context):
   1196     assert self.ctx._jvm is not None
-> 1197     sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
   1198 return list(_load_from_socket(sock_info, self._jrdd_deserializer))

File ~/miniconda/conda/envs/Elephas/lib/python3.8/site-packages/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
   1315 command = proto.CALL_COMMAND_NAME +\
   1316     self.command_header +\
   1317     args_command +\
   1318     proto.END_COMMAND_PART
   1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
   1322     answer, self.gateway_client, self.target_id, self.name)
   1324 for temp_arg in temp_args:
   1325     temp_arg._detach()

File ~/miniconda/conda/envs/Elephas/lib/python3.8/site-packages/pyspark/sql/utils.py:190, in capture_sql_exception.<locals>.deco(*a, **kw)
    188 def deco(*a: Any, **kw: Any) -> Any:
    189     try:
--> 190         return f(*a, **kw)
    191     except Py4JJavaError as e:
    192         converted = convert_exception(e.java_exception)

File ~/miniconda/conda/envs/Elephas/lib/python3.8/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 38.0 failed 1 times, most recent failure: Lost task 2.0 in stage 38.0 (TID 143) (m3p010.massive.org.au executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/gdun0010/spark-3.3.0-bin-hadoop3/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
    process()
  File "/home/gdun0010/spark-3.3.0-bin-hadoop3/python/lib/pyspark.zip/pyspark/worker.py", line 678, in process
    serializer.dump_stream(out_iter, outfile)
  File "/home/gdun0010/spark-3.3.0-bin-hadoop3/python/lib/pyspark.zip/pyspark/serializers.py", line 273, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/gdun0010/miniconda/conda/envs/Elephas/lib/python3.8/site-packages/elephas/worker.py", line 31, in train
    self.model = model_from_json(self.json, self.custom_objects)
  File "/home/gdun0010/miniconda/conda/envs/Elephas/lib/python3.8/site-packages/keras/saving/model_config.py", line 109, in model_from_json
    return deserialize_from_json(json_string, custom_objects=custom_objects)
  File "/home/gdun0010/miniconda/conda/envs/Elephas/lib/python3.8/site-packages/keras/layers/serialization.py", line 272, in deserialize_from_json
    return deserialize(config, custom_objects)
  File "/home/gdun0010/miniconda/conda/envs/Elephas/lib/python3.8/site-packages/keras/layers/serialization.py", line 249, in deserialize
    return generic_utils.deserialize_keras_object(
  File "/home/gdun0010/miniconda/conda/envs/Elephas/lib/python3.8/site-packages/keras/utils/generic_utils.py", line 734, in deserialize_keras_object
    deserialized_obj = cls.from_config(
  File "/home/gdun0010/miniconda/conda/envs/Elephas/lib/python3.8/site-packages/keras/engine/training.py", line 3034, in from_config
    inputs, outputs, layers = functional.reconstruct_from_config(
  File "/home/gdun0010/miniconda/conda/envs/Elephas/lib/python3.8/site-packages/keras/engine/functional.py", line 1481, in reconstruct_from_config
    if process_node(layer, node_data):
  File "/home/gdun0010/miniconda/conda/envs/Elephas/lib/python3.8/site-packages/keras/engine/functional.py", line 1421, in process_node
    output_tensors = layer(input_tensors, **kwargs)
  File "/home/gdun0010/miniconda/conda/envs/Elephas/lib/python3.8/site-packages/keras/utils/traceback_utils.py", line 70, in error_handler
    raise e.with_traceback(filtered_tb) from None
  File "", line 16, in get_hist
NameError: Exception encountered when calling layer "hist" (type Lambda).

name 'Reshape' is not defined

Call arguments received by layer "hist" (type Lambda):
  • inputs=tf.Tensor(shape=(None, 2314), dtype=float32)
  • mask=None
  • training=None

at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1021)
at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2268)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2293)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/gdun0010/spark-3.3.0-bin-hadoop3/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
    process()
  File "/home/gdun0010/spark-3.3.0-bin-hadoop3/python/lib/pyspark.zip/pyspark/worker.py", line 678, in process
    serializer.dump_stream(out_iter, outfile)
  File "/home/gdun0010/spark-3.3.0-bin-hadoop3/python/lib/pyspark.zip/pyspark/serializers.py", line 273, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/gdun0010/miniconda/conda/envs/Elephas/lib/python3.8/site-packages/elephas/worker.py", line 31, in train
    self.model = model_from_json(self.json, self.custom_objects)
  File "/home/gdun0010/miniconda/conda/envs/Elephas/lib/python3.8/site-packages/keras/saving/model_config.py", line 109, in model_from_json
    return deserialize_from_json(json_string, custom_objects=custom_objects)
  File "/home/gdun0010/miniconda/conda/envs/Elephas/lib/python3.8/site-packages/keras/layers/serialization.py", line 272, in deserialize_from_json
    return deserialize(config, custom_objects)
  File "/home/gdun0010/miniconda/conda/envs/Elephas/lib/python3.8/site-packages/keras/layers/serialization.py", line 249, in deserialize
    return generic_utils.deserialize_keras_object(
  File "/home/gdun0010/miniconda/conda/envs/Elephas/lib/python3.8/site-packages/keras/utils/generic_utils.py", line 734, in deserialize_keras_object
    deserialized_obj = cls.from_config(
  File "/home/gdun0010/miniconda/conda/envs/Elephas/lib/python3.8/site-packages/keras/engine/training.py", line 3034, in from_config
    inputs, outputs, layers = functional.reconstruct_from_config(
  File "/home/gdun0010/miniconda/conda/envs/Elephas/lib/python3.8/site-packages/keras/engine/functional.py", line 1481, in reconstruct_from_config
    if process_node(layer, node_data):
  File "/home/gdun0010/miniconda/conda/envs/Elephas/lib/python3.8/site-packages/keras/engine/functional.py", line 1421, in process_node
    output_tensors = layer(input_tensors, **kwargs)
  File "/home/gdun0010/miniconda/conda/envs/Elephas/lib/python3.8/site-packages/keras/utils/traceback_utils.py", line 70, in error_handler
    raise e.with_traceback(filtered_tb) from None
  File "", line 16, in get_hist
NameError: Exception encountered when calling layer "hist" (type Lambda).

name 'Reshape' is not defined

Call arguments received by layer "hist" (type Lambda):
  • inputs=tf.Tensor(shape=(None, 2314), dtype=float32)
  • mask=None
  • training=None

at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1021)
at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2268)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
danielenricocahall commented 1 year ago

Hi @GeoffDuniam , thank you for the data! I put together a script to test based on the snippets you provided, but I'm getting:

        estimator = ElephasEstimator()
        estimator.setFeaturesCol("features")  # These two come directly from pyspark,
        estimator.setLabelCol("target")  # hence the camel case. Sorry :)
>       estimator.set_keras_model_config(model.to_json())  # Provide serialized Keras model

test_ml_model.py:440: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../venv/lib/python3.8/site-packages/keras/engine/training.py:3086: in to_json
    model_config = self._updated_config()
../venv/lib/python3.8/site-packages/keras/engine/training.py:2954: in _updated_config
    config = self.get_config()
../venv/lib/python3.8/site-packages/keras/engine/functional.py:781: in get_config
    return copy.deepcopy(get_network_config(self, config=config))
/usr/lib/python3.8/copy.py:146: in deepcopy
    y = copier(x, memo)
/usr/lib/python3.8/copy.py:230: in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
/usr/lib/python3.8/copy.py:146: in deepcopy
    y = copier(x, memo)
/usr/lib/python3.8/copy.py:205: in _deepcopy_list
    append(deepcopy(a, memo))
/usr/lib/python3.8/copy.py:172: in deepcopy
    y = _reconstruct(x, memo, *rv)
/usr/lib/python3.8/copy.py:296: in _reconstruct
    value = deepcopy(value, memo)
/usr/lib/python3.8/copy.py:146: in deepcopy
    y = copier(x, memo)
/usr/lib/python3.8/copy.py:230: in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
/usr/lib/python3.8/copy.py:146: in deepcopy
    y = copier(x, memo)
/usr/lib/python3.8/copy.py:210: in _deepcopy_tuple
    y = [deepcopy(a, memo) for a in x]
/usr/lib/python3.8/copy.py:210: in <listcomp>
    y = [deepcopy(a, memo) for a in x]
/usr/lib/python3.8/copy.py:146: in deepcopy
    y = copier(x, memo)
/usr/lib/python3.8/copy.py:210: in _deepcopy_tuple
    y = [deepcopy(a, memo) for a in x]
/usr/lib/python3.8/copy.py:210: in <listcomp>
    y = [deepcopy(a, memo) for a in x]
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

x = <module 'keras.backend' from '/home/daniel/PycharmProjects/elephas/venv/lib/python3.8/site-packages/keras/backend.py'>
memo = {140207898369856: {}, 140207898375088: {'class_name': 'InputLayer', 'config': {'batch_input_shape': (None, 2314), 'dty...88992: {'dtype': 'float32', 'name': 'hist', 'trainable': True}, 140207898766400: {'object_id': 2, 'ref_count': 1}, ...}
_nil = []

    def deepcopy(x, memo=None, _nil=[]):
        """Deep copy operation on arbitrary Python objects.

        See the module's __doc__ string for more info.
        """

        if memo is None:
            memo = {}

        d = id(x)
        y = memo.get(d, _nil)
        if y is not _nil:
            return y

        cls = type(x)

        copier = _deepcopy_dispatch.get(cls)
        if copier is not None:
            y = copier(x, memo)
        else:
            if issubclass(cls, type):
                y = _deepcopy_atomic(x, memo)
            else:
                copier = getattr(x, "__deepcopy__", None)
                if copier is not None:
                    y = copier(memo)
                else:
                    reductor = dispatch_table.get(cls)
                    if reductor:
                        rv = reductor(x)
                    else:
                        reductor = getattr(x, "__reduce_ex__", None)
                        if reductor is not None:
>                           rv = reductor(4)
E                           TypeError: cannot pickle 'module' object

Could you possibly provide a module with the code for reproducing, in case I missed something?

GeoffDuniam commented 1 year ago

@danielenricocahall Hi Danny,

Apologies for the delay getting back to you - we've had a slight reign of chaos here over the past week or so, and there was some firefighting that needed doing that took up a fair chunk of time. I'll get a code base for you including the model definition we're using (funny, haven't seen that error before) later today or first thing tomorrow.

Thanks again for the help with this, really appreciate it.

Regards

Geoff.

GeoffDuniam commented 1 year ago

@danielenricocahall Danny, are you OK with a notebook, or would you prefer straight python code?

GeoffDuniam commented 1 year ago

DanielsCodebase.ipynb.zip - here's the notebook; let me know if you'd prefer straight Python code.

GeoffDuniam commented 1 year ago

DanielsCodebase.py.zip - converted the notebook to native Python. Let me know if you need anything else. Thanks again :)

danielenricocahall commented 1 year ago

Hello @GeoffDuniam, I just took a look into it. I was able to reproduce the problem locally with the dataset and module you supplied. I'm actually not sure why it couldn't identify Reshape - per my previous comment, if the last time this worked was on an older version of Elephas (pre TensorFlow 2.x), it could be an issue with the API changes in TensorFlow/Keras over the years. I'll keep looking into it, but I want to be sure to unblock you. It looks like it may be an issue with how certain imports are used inside the get_hist function. If you change them to the fully qualified tf.keras calls, the model actually trains:

    def get_hist(x):
        x = x[:, 266:]
        x = tf.keras.layers.Reshape((8, 256))(x)
        x = tf.keras.backend.permute_dimensions(x, (0, 2, 1))
        return x

Alternatively, you could use pure TensorFlow API functions to accomplish the reshape/transpose, and this also works:

    def get_hist(x):
        x = x[:, 266:]
        x = tf.reshape(x, (-1, 8, 256))
        x = tf.transpose(x, (0, 2, 1))
        return x
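
Another option - a sketch I haven't run against your full pipeline - is to keep the Lambda down to the slicing and do the reshape/transpose with built-in layers, so no layer classes need to be resolved inside the Lambda body when the model is deserialized on the workers:

    from tensorflow.keras.layers import Lambda, Permute, Reshape

    # The Lambda only slices; Reshape and Permute are ordinary layers in the
    # graph, so nothing extra has to be looked up when the Lambda is rebuilt.
    hist_slice = Lambda(lambda x: x[:, 266:], name="hist_slice")(raw_input)
    hist_input = Reshape((8, 256))(hist_slice)
    hist_input = Permute((2, 1), name="hist")(hist_input)  # -> (batch, 256, 8)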

Will keep this issue open for now, but let me know if this gets you moving again!

Update

It looks like it comes down to how TensorFlow serializes certain layers, based on the stack trace:

  File "/home/daniel/PycharmProjects/elephas/elephas/worker.py", line 31, in train
    self.model = model_from_json(self.json, self.custom_objects)
..

name 'Reshape' is not defined

A minimal reproducible example:

    import random

    import numpy as np
    from tensorflow.keras import Input, Model
    from tensorflow.keras.layers import Dense, Lambda, Reshape
    from tensorflow.keras.optimizers import SGD

    from elephas.spark_model import SparkModel
    from elephas.utils.rdd_utils import to_simple_rdd

    input_layer = Input(shape=(1, ))
    mode = "synchronous"

    x = Lambda(lambda a: Reshape((-1, ))(a))(input_layer)
    output = Dense(1, activation='sigmoid')(x)
    model = Model(inputs=input_layer, outputs=output)
    sgd = SGD(learning_rate=0.1)
    model.compile(sgd, 'binary_crossentropy', ['acc'])

    x_train = np.random.rand(100)
    y_train = np.zeros(100)
    x_test = np.random.rand(10)
    y_test = np.zeros(10)
    y_train[:50] = 1
    # spark_context here is the SparkContext from the surrounding test fixture
    rdd = to_simple_rdd(spark_context, x_train, y_train)

    spark_model = SparkModel(model, frequency='epoch', mode=mode,
                             port=4000 + random.randint(0, 800))
    spark_model.fit(rdd, epochs=1, batch_size=16, verbose=0, validation_split=0.1)

If you add Reshape to custom_objects in the SparkModel initializer, that solves the issue and the model trains:

    spark_model = SparkModel(model, frequency='epoch', mode=mode,
                             port=4000 + random.randint(0, 800),
                             custom_objects={'Reshape': Reshape})
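
For the estimator path in your notebook, the equivalent would presumably be to pass the same mapping through the estimator's custom objects parameter - the trace above shows ElephasEstimator forwarding get_custom_objects() into SparkModel. I haven't double-checked the setter name, so treat this as a sketch:

    from tensorflow.keras.layers import Reshape

    # Assumed setter, mirroring the get_custom_objects() call visible in
    # elephas/ml_model.py in the stack trace; adjust the name if it differs.
    estimator.set_custom_objects({'Reshape': Reshape})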

Under the hood in Elephas, the model is serialized as JSON on the master node and sent to each worker, so if there's an issue with loading/saving the model, it will impact training/inference/etc. Still not exactly sure why this specific layer causes an issue - especially because it can be serialized and deserialized without a problem outside of Elephas:


    from tensorflow.keras.models import model_from_json

    input_layer = Input(shape=(1, ))

    x = Lambda(lambda a: Reshape((-1,))(a))(input_layer)
    output = Dense(1, activation='sigmoid')(x)
    model = Model(inputs=input_layer, outputs=output)
    sgd = SGD(learning_rate=0.1)
    model.compile(sgd, 'binary_crossentropy', ['acc'])

    # no problems here!
    model_from_json(model.to_json())

Still looking into it, but wanted to shed some more light on the situation!

GeoffDuniam commented 1 year ago

@danielenricocahall Hi Danny,

That works for us - thanks very much, really appreciate the effort. Once the chaos subsides I'll take another look under the covers as well, just to get a handle on what's going on. Really helpful input, appreciate your time.

IOU a couple of beers :)

Cheers

Geoff