uber / petastorm

Petastorm is a library that enables single-machine or distributed training and evaluation of deep learning models directly from datasets in Apache Parquet format. It supports ML frameworks such as TensorFlow, PyTorch, and PySpark, and can be used from pure Python code.
Apache License 2.0

tensorflow pyspark #743

Closed malinphy closed 2 years ago

malinphy commented 2 years ago

I am an experienced TensorFlow/Keras user, and I am trying to learn how to feed a PySpark DataFrame into a TF/Keras project. To understand how to do this, I created a toy dataset and a model using the Sequential API with one input and one output. I am running the code on Colab; you can see it below. In my case, I could not figure out how to separate the input and output columns to train the model. The code below somehow takes the PySpark DataFrame and starts to run, but it never finishes training the model. I believe I am making a mistake in defining the inputs and outputs of the model with petastorm. I could convert the PySpark DataFrame into a pandas DataFrame, but I want to learn how to feed the model with a PySpark DataFrame. Is there a simple solution or a simple example for this problem?

import numpy as np
import pandas as pd

!pip install --quiet pyspark py4j
!pip install --quiet petastorm

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('practice').getOrCreate()

from petastorm import TransformSpec
from petastorm.spark import make_spark_converter, SparkDatasetConverter
import tensorflow as tf
from tensorflow import keras

# Toy dataset: column A is the input feature, column B is the target (B = 2*A - 1)
df = spark.createDataFrame([(-1, -3),
                            (0, -1),
                            (1, 1),
                            (2, 3),
                            (3, 5),
                            (4, 7),
                            ], ['A', 'B'])

df.show()

# Directory where petastorm caches the DataFrame as Parquet files
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, "file:///dbfs/tmp/petastorm/cache")
converter = make_spark_converter(df.select('A', 'B'))

def get_model():
  model = tf.keras.Sequential([keras.layers.Dense(units=1, input_shape=[1])])
  return model

def get_compiled_model():
  model = get_model()
  model.compile(optimizer='sgd', loss='mean_squared_error', metrics=['accuracy'])
  return model

def train_and_evaluate():
  model = get_compiled_model()
  with converter.make_tf_dataset() as dataset:
    # Split each batch namedtuple into an (input, target) pair
    dataset = dataset.map(lambda x: (x.A, x.B))
    model.fit(dataset, epochs=2)

train_and_evaluate()
selitvin commented 2 years ago

Thank you for a great repro snippet. I used your code, ran it locally, and training ran fine: (screenshot attached)

Do you observe something different? These are the versions of packages in my environment: my_env.txt

malinphy commented 2 years ago

Thank you very much for your quick response. As I mentioned in my post, the code starts to run but does not finish training the model. What about your run? Did you manage to complete the given number of epochs (in our case, epochs=2)? My code starts training the model but never finishes; it cannot even complete a single epoch. That is the problem.

RobindeGrootNL commented 2 years ago

The documentation states that for converter.make_tf_dataset() the default value of num_epochs is None, and that "Setting num_epochs to None will result in an infinite number of epochs." This causes the dataset to keep feeding data to your model, and training never ends because your model cannot tell that the first epoch has finished. Specifying num_epochs=2 (or another value) in make_tf_dataset() should solve your problem, I think; see the sketch below.

I hope this helps!
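Something like the following should do it (a minimal sketch reusing the converter and get_compiled_model() helpers from your snippet above; the only change is the num_epochs argument to make_tf_dataset()):

def train_and_evaluate():
  model = get_compiled_model()
  # num_epochs makes the underlying tf.data.Dataset finite, so Keras can detect
  # the end of an epoch; the default num_epochs=None repeats the data forever.
  with converter.make_tf_dataset(num_epochs=2) as dataset:
    dataset = dataset.map(lambda x: (x.A, x.B))
    model.fit(dataset, epochs=2)

train_and_evaluate()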

malinphy commented 2 years ago

@RobindeGrootNL, yes, you are right. Thanks for your help. Adding num_epochs solved the problem. As an additional note, num_epochs in make_tf_dataset only lets the training process finish; the actual number of training epochs is given in the model.fit call. The working version is shared below.

import numpy as np
import pandas as pd

!pip install --quiet pyspark py4j
!pip install --quiet petastorm

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('practice').getOrCreate()

from petastorm import TransformSpec
from petastorm.spark import make_spark_converter, SparkDatasetConverter
import tensorflow as tf
from tensorflow import keras

# Toy dataset: column A is the input feature, column B is the target (B = 2*A - 1)
df = spark.createDataFrame([(-1, -3),
                            (0, -1),
                            (1, 1),
                            (2, 3),
                            (3, 5),
                            (4, 7),
                            ], ['A', 'B'])

df.show()

# Directory where petastorm caches the DataFrame as Parquet files
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, "file:///dbfs/tmp/petastorm/cache")
converter = make_spark_converter(df.select('A', 'B'))

def get_model():
  model = tf.keras.Sequential([keras.layers.Dense(units=1, input_shape=[1])])
  return model

def get_compiled_model():
  model = get_model()
  model.compile(optimizer='sgd', loss='mean_squared_error', metrics=['accuracy'])
  return model

def train_and_evaluate():
  model = get_compiled_model()
  # num_epochs=1 makes the dataset finite (one pass per Keras epoch);
  # the number of training epochs is controlled by model.fit below.
  with converter.make_tf_dataset(num_epochs=1) as dataset:
    dataset = dataset.map(lambda x: (x.A, x.B))
    model.fit(dataset, epochs=1000)
    print(model.predict([4]))  # should be close to 7 (= 2*4 - 1)

train_and_evaluate()