uber / petastorm

Petastorm library enables single machine or distributed training and evaluation of deep learning models from datasets in Apache Parquet format. It supports ML frameworks such as Tensorflow, Pytorch, and PySpark and can be used from pure Python code.
Apache License 2.0

Predicting is slow and sometimes doesn't even work. #689

Closed diogoribeiro09 closed 3 years ago

diogoribeiro09 commented 3 years ago

Hi, I'm currently using PySpark 3.1.1, and I'm using petastorm so that I can use my TF models with Spark DataFrames. Even after much digging through the examples, I'm struggling with some of the implementation. I'm trying to implement an AutoEncoder model and my dataset is as follows:

+----------+-------------+--------------+------------+---------+--------------+----+
|screw_id  |profile_1111 |profile_2222  |profile_time|   gof   |profile_stepnr|rank|
+----------+-------------+--------------+------------+---------+--------------+----+
|12925510_1|0.0          |2.28          |1           |1.0      |0             |1   |
|12925510_1|5.1          |0.0           |30          |1.0      |0             |1   |
|12925510_1|10.3         |0.0           |40          |1.0      |0             |1   |
|12925510_1|15.9         |0.0           |47          |1.0      |0             |1   |
|12925510_1|21.0         |0.0           |52          |1.0      |0             |1   |
|12925510_1|26.2         |2.16          |61          |1.0      |0             |1   |
|12925510_1|31.4         |2.08          |68          |1.0      |0             |1   |
|12925510_1|36.5         |2.2           |75          |1.0      |0             |1   |
|12925510_1|41.7         |2.2           |87          |1.0      |0             |1   |
+----------+-------------+--------------+------------+---------+--------------+----+

After some feature engineering implemented via a pipeline, my features get encoded into a vector format in a new column named "features". I create the AE model (I don't think it is relevant to this use case to post it here, but I can add it if needed) and then the Spark converters for both my training and validation datasets:

from petastorm.spark import make_spark_converter

converter_train = make_spark_converter(train_tf.select('features'))
converter_val = make_spark_converter(val_tf.select('features'))
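
(For completeness: make_spark_converter requires a parent cache directory to be configured on the Spark session before the converters are created. A minimal sketch, assuming the SparkSession is named spark; the cache URL below is just an assumption, any location visible to the executors works:)

from petastorm.spark import SparkDatasetConverter

# The converter materializes the dataframe as Parquet under this directory,
# so it must be reachable from the executors (a local path is used here only
# for illustration).
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF,
               'file:///tmp/petastorm_cache')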

Using the examples provided in this repo, I have implemented the train_and_evaluate function as shown next. If I'm not mistaken, for unsupervised learning where no labels are provided, I should use my 'features' for both X and Y, or it will complain that I did not provide gradients for any variable:

BATCH_SIZE = 2**11
#Epochs set to 1 for testing purposes
NUM_EPOCHS = 1
import os
import tensorflow as tf

def train_and_evaluate(lr=0.001):
    model = get_compiled_model(lr)

    with converter_train.make_tf_dataset(batch_size=BATCH_SIZE) as train_dataset, \
           converter_val.make_tf_dataset(batch_size=BATCH_SIZE) as val_dataset:

        # tf.keras only accepts tuples, not namedtuples
        train_dataset = train_dataset.map(lambda x: (x.features, x.features))
        steps_per_epoch = len(converter_train) // BATCH_SIZE

        val_dataset = val_dataset.map(lambda x: (x.features, x.features))
        validation_steps = max(1, len(converter_val) // BATCH_SIZE)

        print(f"steps_per_epoch: {steps_per_epoch}, validation_steps: {validation_steps}")

        hist = model.fit(train_dataset,
                         steps_per_epoch=steps_per_epoch,
                         epochs=NUM_EPOCHS,
                         validation_data=val_dataset,
                         validation_steps=validation_steps,
                         callbacks=ae_callback(),
                         verbose=2)

        return hist.history['val_loss'][-1], hist.history['val_accuracy'][-1], model 

loss, accuracy, model = train_and_evaluate()
print("Validation Accuracy: {}".format(accuracy))

The model trains "fine" (performance is not as good as it was with Pandas, but I haven't spent much time calibrating it) and relatively fast (2-3 min). With this trained model I now want to run inference on a new dataset:

def pred():
    with converter_unit.make_tf_dataset(batch_size=BATCH_SIZE) as t_dataset:
        te_dataset = t_dataset.map(lambda x: (x.features, x.features))
        return model.predict(te_dataset, verbose=2)

I run this function and never (or almost never) get the results, and it never errors out. The test dataframe has only 400 lines, so it should be pretty fast considering that training the model took only a couple of minutes. Any suggestions?

selitvin commented 3 years ago

Can you try fetching directly from the t_dataset? What does the time to the first sample look like? What do the delays between consecutive samples look like?

diogoribeiro09 commented 3 years ago

Thanks for coming back to me so quick.

Can you try fetching directly from the t_dataset

Tried using predict on the 'train_dataset' and, like before, it gets stuck after this warning:

C:\Users\USER\AppData\Roaming\Python\Python38\site-packages\petastorm\arrow_reader_worker.py:53: FutureWarning: Calling .data on ChunkedArray is provided for compatibility after Column was removed, simply drop this attribute column_as_pandas = column.data.chunks[0].to_pandas()

What does the time to the first sample look like? What do the delays between consecutive samples look like?

Could you please elaborate on this? I'm not quite sure what you mean by the time to the first sample.

selitvin commented 3 years ago

Can you try fetching directly from the t_dataset

I mean something along these lines:

for sample in t_dataset:
    print(sample)

diogoribeiro09 commented 3 years ago

Can you try fetching directly from the t_dataset

I mean something along these lines:

for sample in t_dataset:
  print(sample)

Oh my bad, I even have that implemented on my side for debugging. First sample (time taken shown at the end):

inferred_schema_view(features=<tf.Tensor: shape=(32, 2), dtype=float32, numpy= array([[0. , 0.01496567], [0.00515603, 0.02494279], [0.01081756, 0.01995423], [0.01607469, 0.02494279], [0.02123072, 0.03242562], [0.02648785, 0.03990846], [0.03174498, 0.02494279], [0.03690101, 0.02993134], [0.04215814, 0.03990846], [0.04731417, 0.04489701], [0.0525713 , 0.03990846], [0.05772733, 0.02494279], [0.06298446, 0.03990846], [0.06864598, 0.02743706], [0.07390311, 0.02993134], [0.07905915, 0.02743706], [0.08431628, 0.03242562], [0.0894723 , 0.01496567], [0.09472943, 0.02743706], [0.09998656, 0.02494279], [0.10564809, 0.02743706], [0.11130961, 0.02743706], [0.11656674, 0.02494279], [0.12172277, 0.02494279], [0.12738429, 0.02993134], [0.13355131, 0.03242562], [0.13880844, 0.03242562], [0.14639084, 0.04240273], [0.15346774, 0.03990846], [0.15923037, 0.03990846], [0.16529629, 0.04489701], [0.17146331, 0.05487413]], dtype=float32)>)
Took 0.0009908999782055616

Subsequent samples:
Took 0.0006593999860342592
Took 0.0007233999785967171

I can add more if needed.
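
(For reference, the timings above were measured with a loop roughly along these lines; this is just a sketch of the debugging code:)

import time

with converter_unit.make_tf_dataset() as t_dataset:
    prev = time.perf_counter()
    # The first printed delta is the time to the first sample; the following
    # deltas are the gaps between consecutive samples.
    for sample in t_dataset.take(3):
        now = time.perf_counter()
        print('Took', now - prev)
        prev = now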

EDIT: In case it helps, my model type: <tensorflow.python.keras.engine.functional.Functional at 0x1ddb56f71c0>

EDIT2: If I do pred = model.predict(train_dataset.take(1)) it actually works. Can't think of a reason why it wouldn't work on a small dataframe?

EDIT3: Sorry for all the edits. I think my problem is either in make_spark_converter or in how my original dataframe is built. My test dataset has 377 lines, but when printing the samples I see that after it reaches the 377th line it repeats the dataset all over again. The only way to prevent this is:

with converter_unit.make_tf_dataset(batch_size=377) as t_dataset:
    pred = model.predict(t_dataset.take(1), verbose=2)
    print(pred)

Shouldn't it work without me explicitly forcing the batch_size to the size of my dataset and using .take(1)?

selitvin commented 3 years ago

Not sure what's going on there. Perhaps enabling verbose logging can reveal something?

import logging
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)

selitvin commented 3 years ago

I assume that

for i, sample in enumerate(t_dataset):
  print(i, sample)

would get all 377 samples?

diogoribeiro09 commented 3 years ago

import logging
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)

Tried it and couldn't see anything significant.

would get all 377 samples?

I would guess so too, but it doesn't. I have to manually break the execution or it keeps printing values forever.

with converter_unit.make_tf_dataset() as t_dataset:
    for i, sample in enumerate(t_dataset):
        print(i, sample)

If I run this code, the shape of the Tensor is (32, 2): 32 is the default batch size and I have 2 features. With that being said, I was expecting i to reach 12 (377/32). Before I stopped the execution it was already at 3850. Shouldn't this code form a single tensor with shape=(377, 2)?

selitvin commented 3 years ago

Can you please try and see if converter_unit.make_tf_dataset(num_epochs=1) helps (docs)?
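
(i.e., something along these lines; a sketch applying it to the enumeration loop above:)

with converter_unit.make_tf_dataset(num_epochs=1) as t_dataset:
    # With num_epochs=1 the dataset is a single pass over the data, so this
    # loop should stop after roughly 377 / 32 = 12 batches.
    for i, sample in enumerate(t_dataset):
        print(i, sample.features.shape)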

diogoribeiro09 commented 3 years ago

Can you please try and see if converter_unit.make_tf_dataset(num_epochs=1) helps (docs)?

This indeed works, thank you very much. Now I wonder, should I use the same in the train_and_evaluate function? If I got this right, the number of epochs in make_tf_dataset is "totally independent" from the number of epochs in model.fit().
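
(For reference, the prediction function with the fix applied now looks roughly like this; I also dropped the second features mapping since predict does not need targets:)

def pred():
    # num_epochs=1 makes the underlying dataset finite (a single pass over
    # the 377 rows), so predict terminates instead of looping forever.
    with converter_unit.make_tf_dataset(batch_size=BATCH_SIZE, num_epochs=1) as t_dataset:
        te_dataset = t_dataset.map(lambda x: x.features)
        return model.predict(te_dataset, verbose=2)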

selitvin commented 3 years ago

I have not worked with Keras, but from looking at the code, it does indeed seem that the

steps_per_epoch=steps_per_epoch,
epochs=NUM_EPOCHS,

arguments define the total number of iterations.

Indeed, make_tf_dataset by default will just keep producing samples for as long as it is asked to, and Keras will probably do the epoch / sample counting on its own.
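
(Something like this sketch, untested and reusing your converters and constants, shows the two equivalent ways of bounding the iteration:)

# Option A: default make_tf_dataset repeats forever; Keras bounds training
# via steps_per_epoch * epochs.
with converter_train.make_tf_dataset(batch_size=BATCH_SIZE) as train_dataset:
    train_dataset = train_dataset.map(lambda x: (x.features, x.features))
    model.fit(train_dataset,
              steps_per_epoch=len(converter_train) // BATCH_SIZE,
              epochs=NUM_EPOCHS)

# Option B: num_epochs=1 makes the dataset finite, so Keras can detect the
# end of the single pass itself (shown here for one training epoch).
with converter_train.make_tf_dataset(batch_size=BATCH_SIZE, num_epochs=1) as train_dataset:
    train_dataset = train_dataset.map(lambda x: (x.features, x.features))
    model.fit(train_dataset, epochs=1)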

diogoribeiro09 commented 3 years ago

Gotcha! I need to investigate further, because for the same dataset, using pandas and the same model (and hyperparameters), I'm getting way better metrics than with PySpark. This leads me to believe that something related to the way I feed the "batches" is influencing the model's learning capabilities. Maybe I could close this issue and re-open it if I figure out what's going on. It could probably help others with similar problems.

selitvin commented 3 years ago

Sounds good - good luck with your project!