uber/petastorm

The Petastorm library enables single-machine or distributed training and evaluation of deep learning models from datasets in Apache Parquet format. It supports ML frameworks such as TensorFlow, PyTorch, and PySpark, and can be used from pure Python code.
Apache License 2.0

Do not change the order of fields defined by unischema #248

Open selitvin opened 5 years ago

selitvin commented 5 years ago

We are sorting fields in the unischema object. This is not a good idea, as it affects the order of columns in the parquet store. A user might want to tune parquet storage performance by placing columns that are frequently read together close to each other.
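For illustration, a minimal unischema sketch (field names, types, and codecs here are hypothetical) where the declared field order expresses the on-disk layout the user wants:

import numpy as np
from petastorm.codecs import ScalarCodec
from petastorm.unischema import Unischema, UnischemaField
from pyspark.sql.types import IntegerType, StringType

# The declared order is the column order the user wants in the parquet
# store; sorting the fields internally silently discards this choice.
ExampleSchema = Unischema('ExampleSchema', [
    UnischemaField('id', np.int32, (), ScalarCodec(IntegerType()), False),
    UnischemaField('label', np.unicode_, (), ScalarCodec(StringType()), False),
])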

Jonathanpro commented 5 years ago

Hello selitvin, I believe I am facing an issue related to the one you mention.

I take an existing parquet file, convert it into a petastorm parquet dataset, and then write it to HDFS as petastorm parquet:

df_parquet_petastorm = sqlContext.createDataFrame(df_parquet.rdd, PetastormSchema.as_spark_schema())
with materialize_dataset(spark, output_url, PetastormSchema):
    df_parquet_petastorm.write.mode("overwrite").parquet(output_url)

The dataframe contains variables of different types, such as strings, integers, and DecimalType. However, if I don't order the variables in df_parquet alphabetically, this fails. A workaround for me is:

# Workaround: sort the column names alphabetically to match the order
# petastorm imposes on the unischema fields
var_selected.sort()
df_parquet = df_parquet.select(var_selected)

It seems to be the case that

df_parquet_petastorm = sqlContext.createDataFrame(df_parquet.rdd, PetastormSchema.as_spark_schema())

matches variables in the RDD against the schema by position, not by keys (variable names).
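A minimal sketch of a name-based alternative to the alphabetical workaround (assuming the PetastormSchema from above): reorder the DataFrame columns to the schema's declared field order before creating the typed DataFrame, so positional matching lines up by name as well.

spark_schema = PetastormSchema.as_spark_schema()
# createDataFrame pairs RDD values with schema fields by position, so
# select the columns in the schema's declared order first.
df_parquet = df_parquet.select(*spark_schema.fieldNames())
df_parquet_petastorm = sqlContext.createDataFrame(df_parquet.rdd, spark_schema)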

If you have any questions don't hesitate to ask me.

Kind regards, Jonathan

selitvin commented 5 years ago

Yes, you are right. These issues are related.

We recently invested some effort in supporting petastorm reads directly from non-petastorm datasets, and it already works quite well. To do so, you can use make_batch_reader (instead of make_reader), so you won't need the conversion step you show here. It returns data in batches and is significantly faster in cases where your dataset includes only native parquet types. Would the new way of reading work in your case?
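A minimal sketch of that path (the dataset URL is a placeholder):

from petastorm import make_batch_reader

# Read a plain, non-petastorm parquet store directly; the reader yields
# batches of rows as named tuples of columns.
with make_batch_reader('hdfs:///path/to/existing_parquet') as reader:
    for batch in reader:
        print(batch)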

carabolic commented 4 years ago

I'm facing an issue related to this. I use petastorm together with Horovod, PySpark, and tensorflow.keras. I use PySpark to pre-process the data, in my case to make sequences out of texts. I then have a DataFrame with the schema sequence, price, which I materialize with materialize_dataset so I can use make_reader. Now comes the problem: the order of columns matters to TensorFlow. The first tensor in a dataset needs to be the inputs and the second needs to be the targets. In my case sequence is the input and price the target, but since price sorts alphabetically before sequence, petastorm changed the column order and tensorflow doesn't get the proper training data.

selitvin commented 4 years ago

The reader returns a named tuple. Does this mean that you simply treat it as an unnamed tuple, so the order of fields becomes important? Would you be able to map the named tuple into a plain tuple in the order you expect? Something along these lines:

make_petastorm_dataset(reader).map(lambda t: (t.c, t.b, t.a))

Then you would not depend on the order of the columns, which is indeed not guaranteed.
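For example, a sketch assuming the sequence/price fields from above and an already-compiled tf.keras model:

from petastorm import make_reader
from petastorm.tf_utils import make_petastorm_dataset

with make_reader('hdfs:///path/to/train_dataset') as reader:
    # Map the named tuple into the (inputs, targets) pair order that
    # tf.keras expects, independent of the on-disk column order.
    train_ds = make_petastorm_dataset(reader).map(lambda t: (t.sequence, t.price))
    model.fit(train_ds, steps_per_epoch=100, epochs=1)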