tensorflow / ecosystem

Integration of TensorFlow with other open-source frameworks
Apache License 2.0
1.37k stars 391 forks source link

Contribute Spark TensorFlow Distributor to ecosystem #151

Closed sarthfrey closed 4 years ago

sarthfrey commented 4 years ago

I’ve been working on a package to make it easier for users to do distributed deep learning training on their Apache Spark clusters with the new TensorFlow 2 strategy API. I think this could be useful for the community so I'd like to contribute it. @jhseu, please let us know if this is something you are interested in - here's example usage based on the MNIST tutorial on the TensorFlow website. Note that this project is in active development, so the API below is tentative.

from spark_tensorflow_distributor import MirroredStrategyRunner
import tensorflow_datasets as tfds
import tensorflow as tf

def make_datasets_unbatched():
   # Scaling MNIST data from (0, 255] to (0., 1.]
   def scale(image, label):
     image = tf.cast(image, tf.float32)
     image /= 255
     return image, label
   datasets, info = tfds.load(name='mnist',
                             with_info=True,
                             as_supervised=True)
   return datasets['train'].map(scale).cache().shuffle(BUFFER_SIZE)

def build_and_compile_cnn_model():
   model = tf.keras.Sequential([
     tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
     tf.keras.layers.MaxPooling2D(),
     tf.keras.layers.Flatten(),
     tf.keras.layers.Dense(64, activation='relu'),
     tf.keras.layers.Dense(10, activation='softmax')
   ])
   model.compile(
     loss=tf.keras.losses.sparse_categorical_crossentropy,
     optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
     metrics=['accuracy'])
   return model

def train():
  BUFFER_SIZE = 10000
  BATCH_SIZE = 64
  GLOBAL_BATCH_SIZE = 64 * 8
  train_datasets = make_datasets_unbatched().batch(GLOBAL_BATCH_SIZE).repeat()
  options = tf.data.Options()
  options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
  train_datasets = train_datasets.with_options(options)
  multi_worker_model = build_and_compile_cnn_model()
  multi_worker_model.fit(x=train_datasets, epochs=3, steps_per_epoch=5)

# Assume a cluster with 3 workers + 1 driver, each with 4 GPUs

# Run locally on the driver with one GPU
MirroredStrategyRunner(num_gpus=-1).run(train)

# Run locally on the driver with 4 GPUs
MirroredStrategyRunner(num_gpus=-4).run(train)

# Run distributed on the workers each using 2 GPUs only
MirroredStrategyRunner(num_gpus=6).run(train)

# Run distributed on the workers each using 4 GPUs (full capacity)
MirroredStrategyRunner(num_gpus=12).run(train)
martinwicke commented 4 years ago

@guptapriya @jhseu Wdyt about this?

jhseu commented 4 years ago

Seems reasonable to me to add here. Priya can comment on how it might fit with the other strategies.

mengxr commented 4 years ago

Ping @guptapriya :) We have a working prototype at https://github.com/sarthfrey/ecosystem/pull/1/files and we would appreciate a quick decision from the TensorFlow team.

guptapriya commented 4 years ago

Hi, sorry for the delay - the overall idea looks fine to me, I just did a code review for the PR. Once the details are sorted out, it can be merged. Thanks

sarthfrey commented 4 years ago

Thanks all! Submitted a PR.

sarthfrey commented 4 years ago

Closing this as project was merged :)