uber / petastorm

Petastorm library enables single machine or distributed training and evaluation of deep learning models from datasets in Apache Parquet format. It supports ML frameworks such as Tensorflow, Pytorch, and PySpark and can be used from pure Python code.
Apache License 2.0

Tensorflow petastorm, training stuck #733

Closed Riser01 closed 2 years ago

Riser01 commented 2 years ago

I have 2 very large (multi-TB) datasets and am using petastorm to train a TF model.

What I am doing is loading the datasets with petastorm and then combining them into a single features-and-labels dataset, since I can't pass two separate datasets:

    train_X_mlp = lm_df_train.select(mlp_feature)# features dataset with 11 columns
    train_Y = lm_df_train.select(out_feature)# 1 label

Using petastorm:

    penta_train_X_mlp = make_spark_converter(train_X_mlp)
    penta_train_Y = make_spark_converter(train_Y)

model function:

import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, LeakyReLU
from tensorflow.keras.losses import Huber
from tensorflow.keras.metrics import MeanAbsoluteError

LEAKY_RELU_ALPHA = 0.1  # not shown in the original snippet; assumed value

def build_model_mlp(in_shape=None, LEARNING_RATE=0.001):
    print("input shape:", in_shape)

    # Simple MLP: in_shape input features -> 32 -> 16 -> 1 regression output
    input_layer_mlp = Input(shape=(in_shape,))
    m1 = Dense(32, activation=LeakyReLU(alpha=LEAKY_RELU_ALPHA), kernel_initializer='glorot_uniform')(input_layer_mlp)
    m3 = Dense(16, activation=LeakyReLU(alpha=LEAKY_RELU_ALPHA))(m1)
    out = Dense(1, activation=LeakyReLU(alpha=LEAKY_RELU_ALPHA), name="output_mlp")(m3)

    losses_mlp = {'output_mlp': Huber(delta=1.0)}
    metrics_mlp = {'output_mlp': MeanAbsoluteError()}
    optimizer = tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE)

    model_mlp = tf.keras.Model(inputs=input_layer_mlp, outputs=out)
    model_mlp.compile(optimizer=optimizer, loss=losses_mlp, metrics=metrics_mlp)
    return model_mlp

Training loop:

def mlp_split_window(x):
    features = x[0:-1]
    labels = x[-1:]
    return features, labels

with penta_train_X_mlp.make_tf_dataset(batch_size=BATCH_SIZE) as train_dataset_mlp, \
     penta_train_Y.make_tf_dataset(batch_size=BATCH_SIZE) as train_dataset_Y:

    train_dataset_mlp = train_dataset_mlp.map(lambda x: tf.reshape(tf.convert_to_tensor(x, dtype=tf.float64),[-1,11]))
    train_dataset_Y = train_dataset_Y.map(lambda x: tf.reshape(tf.convert_to_tensor(x, dtype=tf.float64),[-1,1]))

    model_mlp = build_model_mlp(in_shape=mlp_size_input)
    train_data=tf.data.Dataset.zip((train_dataset_mlp, train_dataset_Y))

    early_stopping = EarlyStopping(patience=3, monitor='val_accuracy', restore_best_weights=True, verbose=1)
    print(train_dataset_mlp,train_dataset_Y,test_dataset_mlp,test_dataset_Y,train_data,test_data)

    model_mlp.fit(train_data, epochs=5, verbose=2, callbacks=[early_stopping])

Error:

Training stays stuck at "Epoch 1/5" for a long time, printing only this warning:

    /databricks/python/lib/python3.7/site-packages/petastorm/arrow_reader_worker.py:53: FutureWarning: Calling .data on ChunkedArray is provided for compatibility after Column was removed, simply drop this attribute
      column_as_pandas = column.data.chunks[0].to_pandas()

Any help would be great.

selitvin commented 2 years ago

Do you think it's a performance problem (running slow), or does it just hang? What would happen if you tried your code on a small subset of the table? Would it work fine?

selitvin commented 2 years ago

Also, please note that make_spark_converter takes a Spark dataframe and materializes it to Parquet. If your data is big, this materialization step may take a long time.
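
For reference, a minimal sketch of how that materialization step is usually wired up (the cache URL below is a placeholder, not the configuration from this issue):

    from petastorm.spark import SparkDatasetConverter, make_spark_converter

    # petastorm needs a parent directory where it can materialize the
    # dataframe as Parquet (placeholder path; on Databricks this is
    # typically a file:///dbfs/... location).
    spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF,
                   'file:///tmp/petastorm_cache')

    # This call writes the whole dataframe out as Parquet, so for a
    # multi-TB dataframe it can dominate the wall-clock time. The
    # materialized copy is cached and reused on subsequent calls.
    penta_train_X_mlp = make_spark_converter(train_X_mlp)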

Riser01 commented 2 years ago

@selitvin it completed, but it took a long time.

Riser01 commented 2 years ago

Also, I am getting the following warnings while training:

    /databricks/python/lib/python3.7/site-packages/petastorm/arrow_reader_worker.py:53: FutureWarning: Calling .data on ChunkedArray is provided for compatibility after Column was removed, simply drop this attribute
      column_as_pandas = column.data.chunks[0].to_pandas()
    WARNING:tensorflow:From /databricks/python/lib/python3.7/site-packages/tensorflow/python/ops/summary_ops_v2.py:1277: stop (from tensorflow.python.eager.profiler) is deprecated and will be removed after 2020-07-01.

Riser01 commented 2 years ago
[Screenshot "2022-01-19 at 8 22 56 AM": GPU and CPU utilisation during training]

This is how the GPU and CPU utilisation looks while training.

selitvin commented 2 years ago

> how does make_spark_converter compare with using directly tf records?

After make_spark_converter is done converting a PySpark dataframe into Parquet, we instantiate make_batch_reader() to read the data from the materialized Parquet. I assume your question is how make_batch_reader() compares to reading directly from TFRecords? If so, TFRecords would typically be faster, but it's not exactly comparing apples to apples: the added value of petastorm is that it saves the ETL step typically required to produce TFRecords and lets you read directly from a Parquet store.
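
For context, a rough sketch of the read path described here, going straight at the materialized Parquet with make_batch_reader (the URL is a placeholder):

    from petastorm import make_batch_reader
    from petastorm.tf_utils import make_petastorm_dataset

    # Read the materialized Parquet store directly; each element of the
    # resulting tf.data.Dataset is a batch of columns as a namedtuple.
    with make_batch_reader('file:///tmp/petastorm_cache/some_table') as reader:
        dataset = make_petastorm_dataset(reader)
        for batch in dataset.take(1):
            print(batch)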

> can I use workers_count to increase the speed? does workers_count mean the no. of CPU cores on a single-node GPU instance?

Tweaking workers_count could help speed up your training. You can also try setting reader_pool_type='process'. It's hard to say upfront, since the particular choice of these parameters is a function of the data and the processing you are doing.
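
A hedged sketch of plugging those knobs into the converter from the snippet above, assuming make_tf_dataset forwards them to the underlying petastorm reader (the values 8 and 'process' are illustrative, not recommendations):

    # workers_count: number of petastorm workers decoding Parquet row
    # groups in parallel; reader_pool_type='process' swaps the default
    # thread pool for a process pool, which can help when decoding is
    # CPU-bound and the GIL becomes the bottleneck.
    with penta_train_X_mlp.make_tf_dataset(batch_size=BATCH_SIZE,
                                           workers_count=8,
                                           reader_pool_type='process') as train_dataset_mlp:
        ...  # same map/zip/fit code as in the training loop above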