intel-analytics / analytics-zoo

Distributed Tensorflow, Keras and PyTorch on Apache Spark/Flink & Ray
https://analytics-zoo.readthedocs.io/
Apache License 2.0

Training with orca.tf.estimator converges at a higher loss when compared to using TensorFlow/PyTorch with the same settings and data. #559

Closed ottermegazord closed 3 years ago

ottermegazord commented 3 years ago

Using the same architecture, data, and hyperparameters, I am able to replicate the performance of my TensorFlow model in PyTorch within a very small degree of variance. However, the same implementation on Analytics Zoo yields a significantly higher loss, even though the models are trained with the same inputs as the models built with TensorFlow and PyTorch. Is this expected?

Model Architecture

def model_zoo():
    """Autoencoder"""
    input_1 = tf.keras.Input(shape=(2016,), name='input_1')
    encoder_1 = tf.keras.layers.Dense(1024, name='encoder_1', activation='relu')(input_1)
    encoder_2 = tf.keras.layers.Dense(256, name='encoder_2', activation='relu')(encoder_1)
    encoder_3 = tf.keras.layers.Dense(128, name='encoder_3', activation='relu')(encoder_2)

    decoder_1 = tf.keras.layers.Dense(128, name='decoder_1', activation='relu')(encoder_3)
    decoder_2 = tf.keras.layers.Dense(256, name='decoder_2', activation='relu')(decoder_1)
    decoder_3 = tf.keras.layers.Dense(1024, name='decoder_3', activation='relu')(decoder_2)
    decoder_4 = tf.keras.layers.Dense(2016, name='decoder_4', activation='relu')(decoder_3)

    """DNN 3"""
    dense_3_1 = tf.keras.layers.Dense(128, name='dense_3_1', activation='relu')(encoder_3)
    dense_3_2 = tf.keras.layers.Dense(64, name='dense_3_2', activation='relu')(dense_3_1)
    dense_3_3 = tf.keras.layers.Dense(32, name='dense_3_3', activation='relu')(dense_3_2)
    dense_3_3 = tf.keras.layers.BatchNormalization()(dense_3_3)
    dense_3_3 = tf.keras.layers.Dropout(0.2)(dense_3_3)

    output = tf.keras.layers.Dense(1, name='output', activation='sigmoid')(dense_3_3)

    model = tf.keras.Model(inputs=[input_1],
                           outputs=[decoder_4, output])

    num_steps = 35

    learning_rate_fn = tf.keras.optimizers.schedules.PiecewiseConstantDecay(
        [50 * num_steps, 100 * num_steps, 30 * num_steps, 20 * num_steps, 50 * num_steps],
        [1e-2, 1e-3, 1e-4, 1e-5, 1e-6, 5e-6]
    )

    optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate_fn)

    model.compile(
        optimizer=optimizer,
        loss={
            'decoder_4': 'mse',
            'output': 'binary_crossentropy',
        }
        # metrics=['accuracy']
    )

    return model

jenniew commented 3 years ago

We'll look at this issue.

jenniew commented 3 years ago

@ottermegazord Do you mean the loss of the Orca TF Estimator is higher than the loss obtained when training with the Analytics Zoo TFPark API, or higher than with native TensorFlow?

ottermegazord commented 3 years ago

The loss of the Orca TF Estimator is higher than the loss obtained when training with either TensorFlow/Keras or PyTorch.

yangw1234 commented 3 years ago

hi @ottermegazord, does your learning rate schedule in the above code work? I think the boundaries should be increasing; yours are [50 * num_steps, 100 * num_steps, 30 * num_steps, 20 * num_steps, 50 * num_steps].
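
For reference, a minimal sketch of PiecewiseConstantDecay with strictly increasing boundaries (the reordered step counts are illustrative, not taken from your run):

import tensorflow as tf

num_steps = 35  # steps per epoch, as in the original snippet

# Boundaries must be strictly increasing; each learning rate applies until the next boundary is reached.
learning_rate_fn = tf.keras.optimizers.schedules.PiecewiseConstantDecay(
    boundaries=[20 * num_steps, 30 * num_steps, 50 * num_steps, 100 * num_steps, 150 * num_steps],
    values=[1e-2, 1e-3, 1e-4, 1e-5, 1e-6, 5e-6]
)

optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate_fn)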

yangw1234 commented 3 years ago

hi @ottermegazord, I have tried your code with MNIST data (with some modifications); the loss differs a little between TF and Analytics Zoo, but mostly within 10% to 15%.

Is it possible for you to share the exact code that reproduces this problem?

ottermegazord commented 3 years ago

On Analytics Zoo (cluster mode), I loaded the train and test DataFrames from two Hive tables; here's the code (I changed the optimizer to make it simpler to compare both models).

Batch size: 4000 (4096 on TensorFlow, but I changed it to 4000 to match Analytics Zoo). Learning rate: 1e-2 with Adam:

def model_zoo():
    """Autoencoder"""
    input_1 = tf.keras.Input(shape=(2016,), name='input_1')
    encoder_1 = tf.keras.layers.Dense(1024, name='encoder_1', activation='relu')(input_1)
    encoder_2 = tf.keras.layers.Dense(256, name='encoder_2', activation='relu')(encoder_1)
    encoder_3 = tf.keras.layers.Dense(128, name='encoder_3', activation='relu')(encoder_2)

    decoder_1 = tf.keras.layers.Dense(128, name='decoder_1', activation='relu')(encoder_3)
    decoder_2 = tf.keras.layers.Dense(256, name='decoder_2', activation='relu')(decoder_1)
    decoder_3 = tf.keras.layers.Dense(1024, name='decoder_3', activation='relu')(decoder_2)
    decoder_4 = tf.keras.layers.Dense(2016, name='decoder_4', activation='relu')(decoder_3)

    """DNN 3"""
    dense_3_1 = tf.keras.layers.Dense(128, name='dense_3_1', activation='relu')(encoder_3)
    dense_3_2 = tf.keras.layers.Dense(64, name='dense_3_2', activation='relu')(dense_3_1)
    dense_3_3 = tf.keras.layers.Dense(32, name='dense_3_3', activation='relu')(dense_3_2)
    dense_3_3 = tf.keras.layers.BatchNormalization()(dense_3_3)
    dense_3_3 = tf.keras.layers.Dropout(0.2)(dense_3_3)

    output = tf.keras.layers.Dense(1, name='output', activation='sigmoid')(dense_3_3)

    model = tf.keras.Model(inputs=[input_1],
                           outputs=[decoder_4, output])

    optimizer = tf.keras.optimizers.Adam(learning_rate=1e-2)

    model.compile(
        optimizer=optimizer,
        loss={
            'decoder_4': 'mse',
            'output': 'binary_crossentropy',
        }
        # metrics=['accuracy']
    )

    return model

    # Train DataFrame (where columns 'input_1' and 'decoder_4' contain Vectors of size 2016)
    train_sql_df = tm117_data_proc.load_table(HiveContext(spark), train_table)
    train_df = tm117_data_proc.gen_table(spark, train_sql_df, path_to_file)

    # Test DataFrame
    test_sql_df = tm117_data_proc.load_table(HiveContext(spark), test_table)
    test_df = tm117_data_proc.gen_table(spark, test_sql_df, path_to_file)

    logger.info("Complete Data Processing")

    """Modeling Pipeline"""
    logger.info("Start Modeling Pipeline")
    model_name = app_name + '_' + datetime.datetime.now().strftime("%Y%m%d-%H%M%S") + '.h5'
    save_to_remote_dir = args.model_dir + model_name
    logger.info("model name is " + model_name)
    logger.info("remote model save path is " + save_to_remote_dir)

    # Create Instance of Model
    est = Estimator.from_keras(model_zoo, model_dir=log_dir)

    # Train Model
    logger.info("Start model training. Max epoch is set at" + args.max_epoch)
    est.fit(data=train_df,
            batch_size=batch_size,
            epochs=max_epoch,
            feature_cols=['input_1'],
            labels_cols=['decoder_4', 'output'],
            validation_data=test_df)

The loss that I got after 300 epochs is 0.696 on Analytics Zoo running on cluster resources.

20/11/18 04:24:34 INFO optim.DistriOptimizer$: [Epoch 300 12000/10000][Iteration 900][Wall Clock 358.867136371s] Trained 4000 records in 0.120361055 seconds. Throughput is 33233.344 records/second. Loss is 0.6966889.

Similarly, I loaded the same two tables with PySpark and saved them as Pandas DataFrames. With the same data, model architecture, learning rate, and batch size, I got a loss of 0.1879 when I trained the model with TensorFlow/Keras. Here's a snippet (a sketch of the Spark-to-pandas conversion follows the training output below):

"""
Data Mapping
 x: [input_1]
 y: [decoder_4, output]

"""
history = model_zoo.fit(x=[features_train], 
                  y=[features_train,label_train], 
                  validation_data=([features_test], [features_test, label_test]), 
                  batch_size=4000,
                  epochs=300,
                  callbacks=[checkpoint],
                  shuffle=True)

Epoch 00300: val_output_acc did not improve from 0.91517 9572/9572 [==============================] - 0s 39us/sample - loss: 0.1799 - decoder_4_loss: 0.0080 - output_loss: 0.1879 - output_acc: 0.9098 - val_loss: 0.1839 - val_decoder_4_loss: 0.0081 - val_output_loss: 0.1912- val_output_acc: 0.9047
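
For reference, a minimal sketch of the Spark-to-pandas conversion mentioned above (assuming the vector columns 'input_1' and 'decoder_4' from the Hive tables and a scalar 'output' label column; the actual helper code is not shown in this thread):

import numpy as np

# Collect each Spark DataFrame to pandas and unpack the ml Vector columns
# into dense NumPy arrays that tf.keras can consume.
train_pdf = train_df.toPandas()
test_pdf = test_df.toPandas()

features_train = np.stack(train_pdf['input_1'].apply(lambda v: v.toArray()).values)
label_train = train_pdf['output'].values.astype('float32')

features_test = np.stack(test_pdf['input_1'].apply(lambda v: v.toArray()).values)
label_test = test_pdf['output'].values.astype('float32')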

yangw1234 commented 3 years ago

hi @ottermegazord, could you try feeding both models with the following dataset?

dataset = tf.data.Dataset.from_tensor_slices((features_train, (features_train, label_train)))

Could you also print the first few rows of train_df, as well as features_train and label_train, to make sure they are the same?
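
For example, a minimal sketch of that check (assuming the variable names from the snippets above; the batch size and epoch count are only illustrative):

import tensorflow as tf

# Inspect a few rows of the Spark DataFrame used by the Orca estimator...
train_df.select('input_1', 'decoder_4', 'output').show(5, truncate=False)

# ...and the corresponding in-memory arrays used by tf.keras.
print(features_train[:5])
print(label_train[:5])

# Feed the tf.keras model from the same tf.data pipeline for comparison
# (under TF 1.x you may additionally need to pass steps_per_epoch).
dataset = tf.data.Dataset.from_tensor_slices((features_train, (features_train, label_train)))
keras_model = model_zoo()
history = keras_model.fit(dataset.batch(4000), epochs=300)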

yangw1234 commented 3 years ago

hi @ottermegazord, here is our experiment with your model on the MNIST dataset. I think the result is reasonable.

Output of tf.keras 1.15:

60000/60000 [==============================] - 5s 84us/sample - loss: 0.6381 - decoder_4_loss: 0.0568 - output_loss: 0.5792
Epoch 2/5
60000/60000 [==============================] - 5s 80us/sample - loss: 0.2162 - decoder_4_loss: 0.0325 - output_loss: 0.1832
Epoch 3/5
60000/60000 [==============================] - 5s 81us/sample - loss: 0.1306 - decoder_4_loss: 0.0274 - output_loss: 0.1034
Epoch 4/5
60000/60000 [==============================] - 5s 79us/sample - loss: 0.0906 - decoder_4_loss: 0.0248 - output_loss: 0.0658
Epoch 5/5
60000/60000 [==============================] - 4s 73us/sample - loss: 0.0685 - decoder_4_loss: 0.0230 - output_loss: 0.0456
[0.6380599728107452, 0.2161864271322886, 0.13062599093914032, 0.0905999551932017, 0.06852723401784896]

Output of the latest Analytics Zoo nightly build:

2020-11-19 16:13:39 INFO  DistriOptimizer$:426 - [Epoch 5 51200/60000][Iteration 286][Wall Clock 41.622488156s] Trained 1024 records in 0.115715387 seconds. Throughput is 8849.299 records/second. Loss is 0.06239222.
2020-11-19 16:13:39 INFO  DistriOptimizer$:426 - [Epoch 5 52224/60000][Iteration 287][Wall Clock 41.743416493s] Trained 1024 records in 0.120928337 seconds. Throughput is 8467.825 records/second. Loss is 0.041940205.
2020-11-19 16:13:39 INFO  DistriOptimizer$:426 - [Epoch 5 53248/60000][Iteration 288][Wall Clock 41.880265839s] Trained 1024 records in 0.136849346 seconds. Throughput is 7482.681 records/second. Loss is 0.08105946.
2020-11-19 16:13:40 INFO  DistriOptimizer$:426 - [Epoch 5 54272/60000][Iteration 289][Wall Clock 42.012190166s] Trained 1024 records in 0.131924327 seconds. Throughput is 7762.0254 records/second. Loss is 0.06303574.
2020-11-19 16:13:40 INFO  DistriOptimizer$:426 - [Epoch 5 55296/60000][Iteration 290][Wall Clock 42.139472723s] Trained 1024 records in 0.127282557 seconds. Throughput is 8045.093 records/second. Loss is 0.04414216.
2020-11-19 16:13:40 INFO  DistriOptimizer$:426 - [Epoch 5 56320/60000][Iteration 291][Wall Clock 42.254086436s] Trained 1024 records in 0.114613713 seconds. Throughput is 8934.358 records/second. Loss is 0.03737992.
2020-11-19 16:13:40 INFO  DistriOptimizer$:426 - [Epoch 5 57344/60000][Iteration 292][Wall Clock 42.374692343s] Trained 1024 records in 0.120605907 seconds. Throughput is 8490.463 records/second. Loss is 0.07456296.
2020-11-19 16:13:40 INFO  DistriOptimizer$:426 - [Epoch 5 58368/60000][Iteration 293][Wall Clock 42.49517586s] Trained 1024 records in 0.120483517 seconds. Throughput is 8499.088 records/second. Loss is 0.06628802.
2020-11-19 16:13:40 INFO  DistriOptimizer$:426 - [Epoch 5 59392/60000][Iteration 294][Wall Clock 42.632292746s] Trained 1024 records in 0.137116886 seconds. Throughput is 7468.081 records/second. Loss is 0.046367764.
2020-11-19 16:13:40 INFO  DistriOptimizer$:426 - [Epoch 5 60416/60000][Iteration 295][Wall Clock 42.761960842s] Trained 1024 records in 0.129668096 seconds. Throughput is 7897.085 records/second. Loss is 0.0456643.
2020-11-19 16:13:40 INFO  DistriOptimizer$:471 - [Epoch 5 60416/60000][Iteration 295][Wall Clock 42.761960842s] Epoch finished. Wall clock time is 42783.328206 ms

Average loss for each epoch, computed with the following command:

for i in `seq 1 5`; do cat test.log | grep "Epoch $i" | grep "Loss" | awk '{print $(NF)}' | awk '{print substr($1,0, length-1)}' | awk '{ sum += $1; n++ } END { if (n > 0) print sum / n; }'; done

epoch 1  0.588937
epoch 2  0.183051
epoch 3  0.118557
epoch 4  0.0849504
epoch 5  0.0669505

The scripts to reproduce the result:

tf.keras

import tensorflow as tf
import numpy as np

def model_zoo():
    """Autoencoder"""
    input_1 = tf.keras.Input(shape=(784,), name='input_1')
    encoder_1 = tf.keras.layers.Dense(1024, name='encoder_1', activation='relu')(input_1)
    encoder_2 = tf.keras.layers.Dense(256, name='encoder_2', activation='relu')(encoder_1)
    encoder_3 = tf.keras.layers.Dense(128, name='encoder_3', activation='relu')(encoder_2)

    decoder_1 = tf.keras.layers.Dense(128, name='decoder_1', activation='relu')(encoder_3)
    decoder_2 = tf.keras.layers.Dense(256, name='decoder_2', activation='relu')(decoder_1)
    decoder_3 = tf.keras.layers.Dense(1024, name='decoder_3', activation='relu')(decoder_2)
    decoder_4 = tf.keras.layers.Dense(784, name='decoder_4', activation='relu')(decoder_3)

    """DNN 3"""
    dense_3_1 = tf.keras.layers.Dense(128, name='dense_3_1', activation='relu')(encoder_3)
    dense_3_2 = tf.keras.layers.Dense(64, name='dense_3_2', activation='relu')(dense_3_1)
    dense_3_3 = tf.keras.layers.Dense(32, name='dense_3_3', activation='relu')(dense_3_2)
    dense_3_3 = tf.keras.layers.BatchNormalization()(dense_3_3)
    dense_3_3 = tf.keras.layers.Dropout(0.2, seed=1)(dense_3_3)

    output = tf.keras.layers.Dense(10, name='output', activation='softmax')(dense_3_3)

    model = tf.keras.Model(inputs=[input_1],
                           outputs=[decoder_4, output])

    optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)

    model.compile(
        optimizer=optimizer,
        loss={
            'decoder_4': 'mse',
            'output': 'sparse_categorical_crossentropy',
        }
        # metrics=['accuracy']
    )

    return model

def get_mnist():
    (train_feature, train_label), _ = tf.keras.datasets.mnist.load_data()

    train_feature = train_feature / 255.0

    x = np.reshape(train_feature, newshape=(-1, 28 * 28))
    y = train_label

    return x, (x, y)

feature, label = get_mnist()
keras_model = model_zoo()
history = keras_model.fit(feature, label, epochs=5, batch_size=1024)
print(history.history['loss'])

analytics zoo

import tensorflow as tf
import numpy as np
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.ml.linalg import DenseVector
from zoo.orca import init_orca_context
from zoo.orca.learn.tf.estimator import Estimator
sc = init_orca_context(cores=4)

def model_zoo():
    """Autoencoder"""
    input_1 = tf.keras.Input(shape=(784,), name='input_1')
    encoder_1 = tf.keras.layers.Dense(1024, name='encoder_1', activation='relu')(input_1)
    encoder_2 = tf.keras.layers.Dense(256, name='encoder_2', activation='relu')(encoder_1)
    encoder_3 = tf.keras.layers.Dense(128, name='encoder_3', activation='relu')(encoder_2)

    decoder_1 = tf.keras.layers.Dense(128, name='decoder_1', activation='relu')(encoder_3)
    decoder_2 = tf.keras.layers.Dense(256, name='decoder_2', activation='relu')(decoder_1)
    decoder_3 = tf.keras.layers.Dense(1024, name='decoder_3', activation='relu')(decoder_2)
    decoder_4 = tf.keras.layers.Dense(784, name='decoder_4', activation='relu')(decoder_3)

    """DNN 3"""
    dense_3_1 = tf.keras.layers.Dense(128, name='dense_3_1', activation='relu')(encoder_3)
    dense_3_2 = tf.keras.layers.Dense(64, name='dense_3_2', activation='relu')(dense_3_1)
    dense_3_3 = tf.keras.layers.Dense(32, name='dense_3_3', activation='relu')(dense_3_2)
    dense_3_3 = tf.keras.layers.BatchNormalization()(dense_3_3)
    dense_3_3 = tf.keras.layers.Dropout(0.2, seed=1)(dense_3_3)

    output = tf.keras.layers.Dense(10, name='output', activation='softmax')(dense_3_3)

    model = tf.keras.Model(inputs=[input_1],
                           outputs=[decoder_4, output])

    optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)

    model.compile(
        optimizer=optimizer,
        loss={
            'decoder_4': 'mse',
            'output': 'sparse_categorical_crossentropy',
        }
        # metrics=['accuracy']
    )

    return model

def get_mnist_dataframe():
    (train_feature, train_label), _ = tf.keras.datasets.mnist.load_data()

    train_feature = train_feature / 255.0

    x = np.reshape(train_feature, newshape=(-1, 28 * 28))
    y = train_label
    sc = SparkContext.getOrCreate()
    spark = SparkSession(sc)
    feature_rdd = sc.parallelize(x)
    label_rdd = sc.parallelize(y)

    data_rdd = feature_rdd.zip(label_rdd)

    df = data_rdd.map(lambda record: (DenseVector(record[0]), DenseVector(record[0]), int(record[1]))).toDF(["feature", "label1", "label2"])

    return df

df = get_mnist_dataframe()
keras_model = model_zoo()

estimator = Estimator.from_keras(keras_model=keras_model)
estimator.fit(df,
              feature_cols=['feature'],
              labels_cols=["label1", "label2"],
              batch_size=1024,
              epochs=5)

ottermegazord commented 3 years ago

Thanks!