uber / petastorm

The Petastorm library enables single-machine or distributed training and evaluation of deep learning models directly from datasets in Apache Parquet format. It supports ML frameworks such as TensorFlow, PyTorch, and PySpark, and can be used from pure Python code.
Apache License 2.0

Performance comparison of make_reader() & make_petastorm_dataset() vs make_spark_converter() & make_tf_dataset() #644

Open lndkcg opened 3 years ago

lndkcg commented 3 years ago

From the API user guide, it seems that there are two different ways of using Petastorm to train TensorFlow models:

  1. Using make_reader() or make_batch_reader() and then using make_petastorm_dataset() to create a tf.data.Dataset
  2. Using make_spark_converter() to materialize the dataset and then using converter.make_tf_dataset() to create a tf.data.Dataset

All things being equal, which of these would be expected to have faster performance? I know that option 1 reads from a file path while option 2 starts with a Spark DataFrame. Option 2 seems simpler, but is there a performance cost associated with it?
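
For reference, here is roughly what I mean by the two call patterns (a minimal sketch with placeholder paths and a placeholder DataFrame, not my actual code):

from petastorm import make_batch_reader
from petastorm.tf_utils import make_petastorm_dataset
from petastorm.spark import SparkDatasetConverter, make_spark_converter

# Option 1: open an existing Parquet store directly
with make_batch_reader('file:///path/to/parquet') as reader:
  dataset_1 = make_petastorm_dataset(reader)
  # ... build and fit the model on dataset_1 ...

# Option 2: materialize a Spark DataFrame to a cache dir, then read that copy
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, 'file:///tmp/petastorm_cache')
converter = make_spark_converter(spark_dataframe)
with converter.make_tf_dataset(batch_size=32) as dataset_2:
  pass  # ... build and fit the model on dataset_2 ...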

Thanks

selitvin commented 3 years ago

Sorry for the delayed response. These options are all different:

You might find additional information here: https://petastorm.readthedocs.io/en/latest

lndkcg commented 3 years ago

No problem, I appreciate you taking the time to answer.

I understand those differences, and was wondering whether there would be any performance difference between those methods in terms of reading alone. Judging by your last line, it appears that there is not, i.e. other than initialization time, the methods are equivalent in terms of speed.

I am asking because I am trying to use Petastorm via the make_spark_converter -> make_tf_dataset() paradigm, and I am seeing extremely slow training speeds (50x+ slower than when training on a local tf.data dataset) and was hoping to learn how to improve that performance.

I can post my code and questions in a new issue and we can close this one (unless I should simply post them here).

Thank you!

selitvin commented 3 years ago

make_spark_converter -> make_tf_dataset uses make_batch_reader + make_petastorm_dataset underneath (to read from the temporary Parquet store it creates).

Can you please provide more information on the slowdown?

It would be best if you could distill a small example I could actually run and profile. It might be hard to see the issue just from the code as it's likely about the combination of the code and the data structure underneath.
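
Something self-contained along these lines would be ideal, a tiny synthetic DataFrame pushed through the same converter path (this is only a sketch: the column names, sizes and cache path are placeholders, not anything taken from your setup):

import numpy as np
import pandas as pd
import tensorflow as tf
from pyspark.sql import SparkSession
from petastorm.spark import SparkDatasetConverter, make_spark_converter

spark = SparkSession.builder.master('local[2]').getOrCreate()
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, 'file:///tmp/petastorm_cache')

# Tiny synthetic frame: two numeric features and a numeric target
pdf = pd.DataFrame({'x1': np.random.rand(10000),
                    'x2': np.random.rand(10000),
                    'Sales': np.random.rand(10000)})
converter = make_spark_converter(spark.createDataFrame(pdf))

model = tf.keras.Sequential([tf.keras.layers.Dense(64, activation='relu'),
                             tf.keras.layers.Dense(1)])
model.compile(optimizer='adam', loss='mae')

with converter.make_tf_dataset(batch_size=512) as dataset:
  # Each element is a namedtuple of batched columns; restructure it for Keras
  dataset = dataset.map(lambda x: (tf.stack([tf.cast(x.x1, tf.float32),
                                             tf.cast(x.x2, tf.float32)], axis=1),
                                   x.Sales))
  model.fit(dataset, steps_per_epoch=len(converter) // 512, epochs=1, verbose=2)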

lndkcg commented 3 years ago

Below is a simplified version of my current approach, training from a local tf.data dataset and then using Petastorm. Hopefully it is helpful. I can also try running it with a simpler model architecture (using Spark to create a single feature column and simply training on that).

What would be the best way to provide a workable example? I could provide a sample of the data as a CSV (I would have to do some masking in order to share it).

Appreciate any help and insight.

import numpy as np
import tensorflow as tf
import pandas as pd
import pyspark.sql.functions as f
from petastorm.spark import SparkDatasetConverter, make_spark_converter

data = spark.read.format('delta').load("<path to data>")

rel_months = ['2020-01-01', '2020-02-01'] #One month for train, one for validation (would normally have much more for train)
rel_data = data.where(f.col('Date').isin(rel_months))

max_date = max(rel_months)
train = rel_data.where(f.col('Date') != max_date)
val = rel_data.where(f.col('Date') == max_date)

numeric_features = ['<list of numeric features>']
cat_features = ['<list of categorical string features>']
time_features = ['<list of time (month, quarter, etc.) features>']

MAX_EPOCHS = 3
dense_layer_sizes = [1024, 512, 256]

def get_category_encoding_layer(name):
  index = tf.keras.layers.experimental.preprocessing.StringLookup(vocabulary = unique_value_dictionary[name], name = name+'_lookup')
  #Unique Value Dictionary is an already created dictionary that has unique values for each string column in the dataset
  encoder = tf.keras.layers.experimental.preprocessing.CategoryEncoding(max_tokens=index.vocab_size(), name = name+'_enc')
  return lambda feature: encoder(index(feature))

def CreateModel():
  all_inputs=[]
  encoded_features = []

  for header in numeric_features:
    numeric_col = tf.keras.Input(shape=(1,), name = header)
    all_inputs.append(numeric_col)
    encoded_features.append(numeric_col)

  for header in cat_features + time_features:
    if len(unique_value_dictionary[header]) < 2:
      print('Skipping ' + header + ' because not enough unique values')
      continue
    cat_col = tf.keras.Input(shape=(1,), name=header, dtype = 'string')
    enc_layer = get_category_encoding_layer(header)
    encoded = enc_layer(cat_col)
    all_inputs.append(cat_col)
    encoded_features.append(encoded)

  # Model Building

  x = tf.concat(encoded_features, axis = 1, name = 'concat_inputs')
  for i, layer_size in enumerate(dense_layer_sizes):
    x = tf.keras.layers.Dense(layer_size, activation='relu', name = 'dense_'+str(i))(x)
    if i != len(dense_layer_sizes) - 1:  # no dropout after the final dense layer
      x = tf.keras.layers.Dropout(.25, name = 'dropout_'+str(i))(x)
  output = tf.keras.layers.Dense(1, name = 'output_layer')(x)

  model = tf.keras.Model(all_inputs, output)

  return model

purchaser = CreateModel()
optimizer = tf.keras.optimizers.Adam(learning_rate = .0001)
purchaser.compile(optimizer=optimizer, loss = tf.keras.losses.MeanAbsoluteError())

#### Training locally (pandas)

def df_to_dataset(spark_dataframe, shuffle = True, batch_size = 32, target_col = 'Sales'):

  ds = tf.data.Dataset.from_tensor_slices(
    ({feature: spark_dataframe.select(feature).toPandas()[feature].to_numpy() for feature in spark_dataframe.columns if feature!=target_col}, 
    spark_dataframe.select(target_col).toPandas()[target_col].to_numpy())
  )
  if shuffle:
    ds = ds.shuffle(buffer_size = spark_dataframe.count())
  ds = ds.batch(batch_size)
  ds = ds.cache()
  ds = ds.prefetch(batch_size)
  return ds

train_batch_size = 512
val_batch_size = train_batch_size * 2

train_ds = df_to_dataset(train, batch_size = train_batch_size)
val_ds = df_to_dataset(val, False, val_batch_size)

checkpoint_path = '<path for model checkpoints>'
tensorboard_path = '<path for tensorboard logs>'

checkpoint = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_path, save_weights_only = True, monitor = 'val_loss', mode = 'min', save_best_only = True)
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=tensorboard_path, update_freq='epoch')
reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(patience = 5, factor = .5)
early_stop = tf.keras.callbacks.EarlyStopping(patience = 50)

my_callbacks = [checkpoint, tensorboard_callback, reduce_lr, early_stop]

train_history = purchaser.fit(train_ds,
             epochs = MAX_EPOCHS, 
             validation_data = val_ds, validation_steps = np.max([1, len(val_ds) // val_batch_size]), 
             callbacks = my_callbacks,
             verbose = 2)

#### Training with Petastorm

spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, "file:///dbfs/ml/petastorm/cache")
converter_train = make_spark_converter(train, dtype = None)
converter_val = make_spark_converter(val, dtype = None)

purchaser = CreateModel()
optimizer = tf.keras.optimizers.Adam(learning_rate = .0001)
purchaser.compile(optimizer=optimizer, loss = tf.keras.losses.MeanAbsoluteError())

with converter_train.make_tf_dataset(batch_size = train_batch_size) as train_dataset, \
   converter_val.make_tf_dataset(batch_size = val_batch_size) as val_dataset:

  train_dataset = train_dataset.map(lambda x:
                                    ({train.columns[i]: x[i] for i in range(len(train.columns))},
                                    x.Sales))

  val_dataset = val_dataset.map(lambda x:
                                    ({val.columns[i]: x[i] for i in range(len(val.columns))},
                                    x.Sales))

  steps_per_epoch = len(converter_train) // train_batch_size
  val_steps = np.max([1, len(converter_val) // val_batch_size])

  purchaser.fit(train_dataset,
                steps_per_epoch = steps_per_epoch,
                epochs = MAX_EPOCHS,
                validation_data = val_dataset, 
                validation_steps = val_steps,
                callbacks = my_callbacks,
                verbose = 2)

lndkcg commented 3 years ago

Using the above code I get the following benchmark speeds:

So the 50x was definitely an exaggeration! Apologies for that, I swear I previously had results that were ~120 seconds and ~4,800 seconds per epoch for the two methods.

I know that petastorm has some overhead that will slow it down compared to in-memory training. Does this ~3x ratio seem right to you in terms of speed?

Any ideas on how else I could optimize the process and make training time faster? (I normally use multi-gpu training, and not sure how much more I can increase the batch size). All advice or recommendations for readings would be appreciated.

selitvin commented 3 years ago

Hi. Thank you for the detailed example (although, because of its length, it took me a while to gather up the courage to start reading it, hence the long response time :) ).

I know that petastorm has some overhead that will slow it down compared to in-memory training. Does this ~3x ratio seem right to you in terms of speed?

Obviously, it's hard to compete with reading data from memory. However, the competition here is with your model's forward/backward propagation time. As long as we can achieve a higher data loading rate than your model's processing rate, we are in a good spot (so it depends on the model). It's important to streamline the data flow and not have any heavy Python code run on your batches. That's why I suspect the per-sample map call in your code.

Looking forward to hearing from you. Hope we can nail this problem down.
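
One quick way to check which side is the bottleneck is to time the input pipeline on its own, with no model in the loop. A rough sketch, reusing converter_train and train_batch_size from your snippet (names taken from your code, so adjust as needed); compare the per-batch time it prints against the per-step time Keras reports with verbose=2:

import time

with converter_train.make_tf_dataset(batch_size = train_batch_size) as dataset:
  n_batches = len(converter_train) // train_batch_size
  start = time.perf_counter()
  for i, batch in enumerate(dataset):
    if i >= n_batches:
      break
  elapsed = time.perf_counter() - start
  print('%.1f rows/sec, %.3f sec/batch' % (n_batches * train_batch_size / elapsed,
                                           elapsed / n_batches))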

lndkcg commented 3 years ago

Apologies for the long example, I appreciate you taking the time to read it.

To try and estimate the impact of the map transform I've run the following code:

with converter_train.make_tf_dataset(batch_size = train_batch_size) as train_dataset:
  train_dataset = train_dataset.map(lambda x: ({train.columns[i]: x[i] for i in range(len(train.columns))}, x.Sales))
  i = 0
  for elem in train_dataset:
    i+=1
    if i > len(converter_train) // train_batch_size:
      break

I ran it both with the map statement and without it; in both cases it took the same amount of time. Does this seem like a valid way of determining the map's impact?

Do you happen to know of any examples of using Petastorm for structured data with TensorFlow that I could take a look at? Currently I'm basing everything off of this Databricks example (as I'm using the Databricks platform).

In the meantime I am going to try and create a simpler model pipeline, to hopefully make the problem easier to pin down.

Thanks, wishing I could give you some GitHub karma.

selitvin commented 3 years ago

No problem at all...

Can you please take a look at the Horovod example: https://github.com/horovod/horovod/blob/master/examples/spark/keras/keras_spark_rossmann_run.py? I know they were polishing their training pipeline performance and have a good batch-based implementation. Perhaps it will give you some clues.