uber / petastorm

The Petastorm library enables single-machine or distributed training and evaluation of deep learning models from datasets in Apache Parquet format. It supports ML frameworks such as TensorFlow, PyTorch, and PySpark and can be used from pure Python code.

Ambiguous workflow while using Spark #738

Open smartFunX opened 2 years ago

smartFunX commented 2 years ago

I'm a bit confused about how Petastorm should be used together with Spark. I have a PySpark script that processes a huge amount of CSV files and writes the data in Parquet format. Now I want to use Petastorm to create dataloaders for my PyTorch model. What is not clear to me:

  1. Should I modify the existing PySpark script to use/dump that Unischema format?
  2. Or should I use the Spark dataset converter within the PyTorch script?

I tried option 1) using the following snippet:

```python
with materialize_dataset(spark, path, ClassificationSchema, 32):
    df.coalesce(10).write.mode('overwrite').parquet(path)
```

However, it looks like my dataframe (df) is not encoded with the Unischema, and I didn't find an example of how to encode an existing dataframe with a Unischema.
selitvin commented 2 years ago

One of Petastorm's features is that it extends the Parquet format to store ndarrays (perhaps "extends" is a big word for this, since we simply store the ndarrays in binary fields and keep extra metadata that allows us to load them back). Users can read these Parquet tables containing ndarrays using the make_reader API.
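
For reference, a minimal sketch of writing such a Unischema-encoded dataset from an existing dataframe might look roughly like this. The FeatureSchema, the column names (label, features), the feature-vector size, and the output URL are all hypothetical; spark and df are assumed to come from your existing PySpark script:

```python
import numpy as np
from pyspark.sql.types import IntegerType

from petastorm.codecs import ScalarCodec, NdarrayCodec
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import Unischema, UnischemaField, dict_to_spark_row

# Hypothetical schema: an integer label plus a fixed-size float feature vector.
FeatureSchema = Unischema('FeatureSchema', [
    UnischemaField('label', np.int32, (), ScalarCodec(IntegerType()), False),
    UnischemaField('features', np.float32, (128,), NdarrayCodec(), False),
])

output_url = 'file:///tmp/feature_dataset'  # hypothetical output location
rowgroup_size_mb = 32

# materialize_dataset adds the extra Petastorm metadata around a normal Parquet write.
with materialize_dataset(spark, output_url, FeatureSchema, rowgroup_size_mb):
    rows_rdd = (df.rdd
                .map(lambda r: {'label': int(r.label),
                                'features': np.asarray(r.features, dtype=np.float32)})
                .map(lambda d: dict_to_spark_row(FeatureSchema, d)))
    spark.createDataFrame(rows_rdd, FeatureSchema.as_spark_schema()) \
        .coalesce(10) \
        .write.mode('overwrite').parquet(output_url)
```

The resulting dataset can then be read back with make_reader(output_url).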

In your case, it seems that the data types coming from your CSVs map directly into the Parquet format and don't require any additional metadata.

Petastorm is still handy in this case, as it allows you to read the data from the Parquet files directly into TF tensors or PyTorch (for this you should use the make_batch_reader API).
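
For example, feeding the Parquet output of the existing PySpark job into a PyTorch loop could look roughly like this (the dataset URL is hypothetical):

```python
from petastorm import make_batch_reader
from petastorm.pytorch import DataLoader

# Hypothetical URL of the Parquet dataset written by the existing PySpark job.
dataset_url = 'file:///tmp/csv_derived_parquet'

with DataLoader(make_batch_reader(dataset_url), batch_size=64) as loader:
    for batch in loader:
        # batch is a dict mapping column names to torch.Tensor objects
        ...
```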

There is an additional API, make_spark_converter, that takes a PySpark dataframe and lets you feed the data into a TF graph or read it from PyTorch directly. Note that this API is just a helper that persists the PySpark dataframe to a temporary location and then uses the make_batch_reader API to read the data back. If you have already materialized your data into Parquet, you don't really need it and can call make_batch_reader yourself.
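
A rough sketch of that path, with a hypothetical cache directory (df being the PySpark dataframe from the ETL script):

```python
from petastorm.spark import SparkDatasetConverter, make_spark_converter

# Hypothetical cache location where the converter persists the dataframe as Parquet.
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF,
               'file:///tmp/petastorm_cache')

converter = make_spark_converter(df)
with converter.make_torch_dataloader(batch_size=64) as dataloader:
    for batch in dataloader:
        # batch is a dict mapping column names to torch.Tensor objects
        ...
converter.delete()  # remove the cached Parquet copy when done
```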

smartFunX commented 2 years ago

Thanks, so one option is to use make_batch_reader as you mentioned. In my case I could also benefit from the efficiency of ndarrays (e.g. in PySpark one column is a big array of features). What is the easiest way to apply (convert) a Unischema to an existing dataframe in Spark?

selitvin commented 2 years ago

Frankly, if you can get away without ndarrays and Unischemas, it might be better. make_batch_reader is much more efficient than make_reader, since it reads entire row-groups and forwards them to the user in one go. make_reader also loads an entire row-group, but then iterates over the rows to deserialize all the ndarrays.
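
To illustrate the difference, a rough sketch with hypothetical dataset URLs (the first pointing at a Unischema-encoded dataset, the second at a plain Parquet table):

```python
from petastorm import make_reader, make_batch_reader

with make_reader('file:///tmp/feature_dataset') as reader:
    row = next(iter(reader))    # one decoded row at a time (ndarrays included)

with make_batch_reader('file:///tmp/csv_derived_parquet') as reader:
    batch = next(iter(reader))  # a whole chunk of rows as columnar numpy arrays
```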