Open vizzyno1 opened 2 years ago
I am sorry, I cannot help you with the first part; I would also like to know if and how that would work.
However, the second part does work, at least if you put training and prediction in one function. I tried it by adapting the official example at https://docs.microsoft.com/en-us/azure/databricks/_static/notebooks/deep-learning/spark-tensorflow-distributor.html
### 1.) Combined train and predict (helper functions listed below)
def train_and_predict(batch_size):
    """Train the CNN and return its predictions from a single function call.

    Training and prediction live in one function so that the whole workflow
    can be passed as one callable to ``MirroredStrategyRunner.run``.

    Args:
        batch_size: Batch size used to build both the training dataset and
            the prediction dataset.

    Returns:
        The result of ``model.predict`` on the prediction inputs.
    """
    train_dataset = make_training_dataset(batch_size)
    # Only the inputs are needed for prediction; the label dataset returned
    # alongside them is deliberately discarded.
    predict_inputs, _ = make_prediction_dataset(batch_size)

    multi_worker_model = build_and_compile_cnn_model()
    # The training dataset is built with ``.repeat()``, so the length of an
    # epoch is fixed here via ``steps_per_epoch``.
    multi_worker_model.fit(x=train_dataset, epochs=3, steps_per_epoch=5)

    # The prediction dataset is already batched. Keras documents that
    # ``batch_size`` must not be specified for ``tf.data.Dataset`` inputs,
    # so it is not forwarded to ``predict``.
    return multi_worker_model.predict(x=predict_inputs)
### 2.) Execute with Runner
from spark_tensorflow_distributor import MirroredStrategyRunner
# Build the runner. NUM_SLOTS, USE_GPU and batch_size are not defined in this
# snippet and must already exist in the surrounding notebook/session.
runner = MirroredStrategyRunner(
    num_slots=NUM_SLOTS,
    local_mode=False,
    use_gpu=USE_GPU,
)
# Hand the combined train-and-predict function to the runner; keyword
# arguments after the function are forwarded to it.
prediction = runner.run(train_and_predict, batch_size=batch_size)
---------------------------------------------------------------------
### A.) Functions to generate data
def load_mnist():
    """Load the MNIST training split, scaled and cast for training.

    Returns:
        A tuple ``(images, labels)``: the images divided by 255 as float32
        (so values lie in [0, 1]) and the labels cast to int64. The test
        split returned by Keras is ignored.
    """
    train_split, _ = tf.keras.datasets.mnist.load_data()
    images, labels = train_split
    # The raw pixels are uint8 in [0, 255]; dividing by a float32 scalar
    # rescales them to float32 values in [0, 1].
    scale = np.float32(255)
    return images / scale, labels.astype(np.int64)
def make_training_dataset(batch_size):
    """Build the repeating, batched MNIST training dataset.

    Args:
        batch_size: Number of examples per batch.

    Returns:
        A ``tf.data.Dataset`` of ``(image, label)`` batches that repeats
        indefinitely and uses the DATA auto-shard policy.
    """
    features, labels = load_mnist()
    dataset = tf.data.Dataset.from_tensor_slices((features, labels))
    dataset = dataset.repeat()
    dataset = dataset.batch(batch_size)

    # Select the DATA auto-shard policy for distributed execution.
    shard_options = tf.data.Options()
    shard_options.experimental_distribute.auto_shard_policy = (
        tf.data.experimental.AutoShardPolicy.DATA
    )
    return dataset.with_options(shard_options)
def make_prediction_dataset(batch_size):
    """Build batched, non-repeating MNIST datasets for prediction.

    Args:
        batch_size: Number of examples per batch.

    Returns:
        A tuple ``(train_dataset_x, train_dataset_y)`` of ``tf.data.Dataset``
        objects holding the batched inputs and the batched labels. Both
        carry the DATA auto-shard policy.
    """
    x_train, y_train = load_mnist()
    train_dataset_x = tf.data.Dataset.from_tensor_slices(x_train).batch(batch_size)
    train_dataset_y = tf.data.Dataset.from_tensor_slices(y_train).batch(batch_size)

    # Specify the data auto-shard policy: DATA
    options = tf.data.Options()
    options.experimental_distribute.auto_shard_policy = (
        tf.data.experimental.AutoShardPolicy.DATA
    )
    train_dataset_x = train_dataset_x.with_options(options)
    # Previously the options were applied to the inputs twice and never to
    # the labels; apply them to the label dataset so both are configured alike.
    train_dataset_y = train_dataset_y.with_options(options)
    return train_dataset_x, train_dataset_y
### B.) function to build model
def build_and_compile_cnn_model():
    """Create and compile the small CNN classifier used in this example.

    The network takes 28x28 inputs, reshapes them to a single channel,
    applies one convolution and one max-pooling layer, and ends in a
    10-way softmax.

    Returns:
        A compiled ``tf.keras.Sequential`` model using SGD (learning rate
        0.001), sparse categorical cross-entropy loss, and the accuracy metric.
    """
    layers = tf.keras.layers
    stack = [
        layers.InputLayer(input_shape=(28, 28)),
        # Conv2D is given a channel axis by reshaping to (28, 28, 1).
        layers.Reshape(target_shape=(28, 28, 1)),
        layers.Conv2D(32, 3, activation='relu'),
        layers.MaxPooling2D(),
        layers.Flatten(),
        layers.Dense(64, activation='relu'),
        layers.Dense(10, activation='softmax'),
    ]
    cnn = tf.keras.Sequential(stack)

    sgd = tf.keras.optimizers.SGD(learning_rate=0.001)
    cnn.compile(
        loss=tf.keras.losses.sparse_categorical_crossentropy,
        optimizer=sgd,
        metrics=['accuracy'],
    )
    return cnn
However, this does not get around the problem of saving the model. I would also like to know the answer to that question.
Hi Team ,
I am trying to train a TensorFlow LSTM model on Spark. All epochs complete, but after training it fails with the error below:
========================== Doing CPU training... Will run with 8 Spark tasks. /usr/local/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py:1844: UserWarning:
`Model.fit_generator` is deprecated and will be removed in a future version. Please use `Model.fit`, which supports generators. warnings.warn('`Model.fit_generator`
is deprecated and ' 2022-02-10 11:41:59.702186: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:116] None of the MLIR optimization passes are enabled (registered 2) 2022-02-10 11:41:59.718653: I tensorflow/core/platform/profile_utils/cpu_utils.cc:112] CPU Frequency: 2999995000 Hz Epoch 1/5 247/247 [==============================] - 141s 543ms/step - loss: 0.0380 - mean_absolute_error: 0.1516 - val_loss: 0.0141 - val_mean_absolute_error: 0.0775 Epoch 2/5 247/247 [==============================] - 129s 522ms/step - loss: 0.0126 - mean_absolute_error: 0.0821 - val_loss: 0.0103 - val_mean_absolute_error: 0.0648 Epoch 3/5 247/247 [==============================] - 137s 554ms/step - loss: 0.0094 - mean_absolute_error: 0.0695 - val_loss: 0.0082 - val_mean_absolute_error: 0.0519 Epoch 4/5 247/247 [==============================] - 149s 605ms/step - loss: 0.0079 - mean_absolute_error: 0.0614 - val_loss: 0.0070 - val_mean_absolute_error: 0.0479 Epoch 5/5 247/247 [==============================] - 132s 536ms/step - loss: 0.0067 - mean_absolute_error: 0.0567 - val_loss: 0.0060 - val_mean_absolute_error: 0.0422 Distributed training in progress... View Spark executor stderr logs to inspect training... 
Traceback (most recent call last): File "/usr/lib/spark/python/pyspark/serializers.py", line 437, in dumps return cloudpickle.dumps(obj, pickle_protocol) File "/usr/local/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 101, in dumps cp.dump(obj) File "/usr/local/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 540, in dump return Pickler.dump(self, obj) File "/usr/lib64/python3.7/pickle.py", line 437, in dump self.save(obj) File "/usr/lib64/python3.7/pickle.py", line 504, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib64/python3.7/pickle.py", line 789, in save_tuple save(element) File "/usr/lib64/python3.7/pickle.py", line 504, in save f(self, obj) # Call unbound method with explicit self File "/usr/local/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 722, in save_function self._dynamic_function_reduce(obj), obj=obj File "/usr/local/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 664, in _save_reduce_pickle5 save(state) File "/usr/lib64/python3.7/pickle.py", line 504, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib64/python3.7/pickle.py", line 774, in save_tuple save(element) File "/usr/lib64/python3.7/pickle.py", line 504, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib64/python3.7/pickle.py", line 859, in save_dict self._batch_setitems(obj.items()) File "/usr/lib64/python3.7/pickle.py", line 885, in _batch_setitems save(v) File "/usr/lib64/python3.7/pickle.py", line 504, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib64/python3.7/pickle.py", line 774, in save_tuple save(element) File "/usr/lib64/python3.7/pickle.py", line 549, in save self.save_reduce(obj=obj, rv) File "/usr/lib64/python3.7/pickle.py", line 638, in save_reduce save(args) File "/usr/lib64/python3.7/pickle.py", line 504, in save f(self, obj) # Call unbound method with explicit self File 
"/usr/lib64/python3.7/pickle.py", line 774, in save_tuple save(element) File "/usr/lib64/python3.7/pickle.py", line 504, in save f(self, obj) # Call unbound method with explicit self File "/usr/local/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 722, in save_function self._dynamic_function_reduce(obj), obj=obj File "/usr/local/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 664, in _save_reduce_pickle5 save(state) File "/usr/lib64/python3.7/pickle.py", line 504, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib64/python3.7/pickle.py", line 774, in save_tuple save(element) File "/usr/lib64/python3.7/pickle.py", line 504, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib64/python3.7/pickle.py", line 859, in save_dict self._batch_setitems(obj.items()) File "/usr/lib64/python3.7/pickle.py", line 885, in _batch_setitems save(v) File "/usr/lib64/python3.7/pickle.py", line 504, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib64/python3.7/pickle.py", line 774, in save_tuple save(element) File "/usr/lib64/python3.7/pickle.py", line 549, in save self.save_reduce(obj=obj, rv) File "/usr/lib64/python3.7/pickle.py", line 638, in save_reduce save(args) File "/usr/lib64/python3.7/pickle.py", line 504, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib64/python3.7/pickle.py", line 774, in save_tuple save(element) File "/usr/lib64/python3.7/pickle.py", line 504, in save f(self, obj) # Call unbound method with explicit self File "/usr/local/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 722, in save_function self._dynamic_function_reduce(obj), obj=obj File "/usr/local/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 664, in _save_reduce_pickle5 save(state) File "/usr/lib64/python3.7/pickle.py", line 504, in save f(self, obj) # Call unbound method with explicit self File 
"/usr/lib64/python3.7/pickle.py", line 774, in save_tuple save(element) File "/usr/lib64/python3.7/pickle.py", line 504, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib64/python3.7/pickle.py", line 859, in save_dict self._batch_setitems(obj.items()) File "/usr/lib64/python3.7/pickle.py", line 885, in _batch_setitems save(v) File "/usr/lib64/python3.7/pickle.py", line 504, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib64/python3.7/pickle.py", line 789, in save_tuple save(element) File "/usr/lib64/python3.7/pickle.py", line 549, in save self.save_reduce(obj=obj, rv) File "/usr/lib64/python3.7/pickle.py", line 638, in save_reduce save(args) File "/usr/lib64/python3.7/pickle.py", line 504, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib64/python3.7/pickle.py", line 774, in save_tuple save(element) File "/usr/lib64/python3.7/pickle.py", line 549, in save self.save_reduce(obj=obj, rv) File "/usr/lib64/python3.7/pickle.py", line 662, in save_reduce save(state) File "/usr/lib64/python3.7/pickle.py", line 504, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib64/python3.7/pickle.py", line 859, in save_dict self._batch_setitems(obj.items()) File "/usr/lib64/python3.7/pickle.py", line 885, in _batch_setitems save(v) File "/usr/lib64/python3.7/pickle.py", line 549, in save self.save_reduce(obj=obj, rv) File "/usr/lib64/python3.7/pickle.py", line 662, in save_reduce save(state) File "/usr/lib64/python3.7/pickle.py", line 504, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib64/python3.7/pickle.py", line 859, in save_dict self._batch_setitems(obj.items()) File "/usr/lib64/python3.7/pickle.py", line 885, in _batch_setitems save(v) File "/usr/lib64/python3.7/pickle.py", line 549, in save self.save_reduce(obj=obj, *rv) File "/usr/lib64/python3.7/pickle.py", line 662, in save_reduce save(state) File "/usr/lib64/python3.7/pickle.py", line 504, 
in save f(self, obj) # Call unbound method with explicit self File "/usr/lib64/python3.7/pickle.py", line 859, in save_dict self._batch_setitems(obj.items()) File "/usr/lib64/python3.7/pickle.py", line 885, in _batch_setitems save(v) File "/usr/lib64/python3.7/pickle.py", line 504, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib64/python3.7/pickle.py", line 859, in save_dict self._batch_setitems(obj.items()) File "/usr/lib64/python3.7/pickle.py", line 884, in _batch_setitems save(k) File "/usr/lib64/python3.7/pickle.py", line 524, in save rv = reduce(self.proto) TypeError: can't pickle weakref objects======================================
2) Are there any examples of model prediction after distributed training using MirroredStrategyRunner?