uber / petastorm

Petastorm library enables single machine or distributed training and evaluation of deep learning models from datasets in Apache Parquet format. It supports ML frameworks such as Tensorflow, Pytorch, and PySpark and can be used from pure Python code.
Apache License 2.0

Cannot resolve error "Cannot auto-create unischema due to unsupported column type" #501

Open muntasirraihan opened 4 years ago

muntasirraihan commented 4 years ago

Hi,

I am trying to convert a pyspark dataframe into parquet and load into a keras model.fit using petastorm.

My dataframe schema looks like:

"field1: integer field2: integer .... label: integer features: udt "

I used vectorassembler to pack the feature fields in a vector called "features".

To load the dataset into a keras model using petastorm, I am using the following code:

with make_batch_reader(petastorm_dataset_url, num_epochs=100) as reader:
    dataset = make_petastorm_dataset(reader)
    model.compile(loss=loss, optimizer='adam')
    model.fit(dataset, batch_size=batch_size, epochs=epochs, steps_per_epoch=steps)

I am getting the following error:

"ValueError: Cannot auto-create unischema due to unsupported column type struct<type: int8, size: int32, indices: list, values: list>"

My guess is I am facing some type conversion issues, but I would appreciate any hints on how to solve this.

Thanks

selitvin commented 4 years ago

Currently petastorm does not properly support arbitrary parquet types. We see more and more need for that, so I hope we'll improve the support soon. In your case, would it be possible to unpack features udts into separate columns? That might help.

Note that in order to automatically collate multiple rows into batches, all lists in a list-typed column need to be of the same length. If they are not, you might be able to use the transform_spec argument to preprocess each row so that the rows can be batched.
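The same-length requirement can be met with a small per-row padding step. The sketch below is plain Python and only illustrates the padding logic. The helper name pad_or_truncate, the target length, and the fill value are illustrative assumptions, and wiring the helper into a transform_spec is left out because it depends on your schema.

```python
def pad_or_truncate(values, target_len, fill=0.0):
    """Return a list of exactly target_len items.

    Longer inputs are cut off at target_len; shorter inputs are
    right-padded with `fill`.
    """
    values = list(values)
    if len(values) >= target_len:
        return values[:target_len]
    return values + [fill] * (target_len - len(values))


# Rows with ragged list lengths, as they might come out of a list column.
rows = [[1.0, 2.0], [3.0, 4.0, 5.0, 6.0], []]

# After this step every row has length 3, so the rows can be stacked.
fixed = [pad_or_truncate(r, 3) for r in rows]
```

Whether padding or truncation is acceptable depends on the model; for a fixed-width feature vector like the one in this thread, no padding should be needed once the column is a dense array.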

muntasirraihan commented 4 years ago

I am seeing the following error when trying to fit a keras model using petastorm dataset.

My sample code is as follows:

with make_batch_reader(petastorm_dataset_url) as reader:
    for row in reader:
        print(row.features.shape)
    dataset = make_petastorm_dataset(reader)
    model.compile(loss=loss, optimizer='adam')
    model.fit(dataset, batch_size=batch_size, epochs=2, steps_per_epoch=2)

Each dataset row has 108 columns as the feature set, and the dataset has 20,000 rows. The error I see is:

"ValueError: Error when checking model input: the list of Numpy arrays that you are passing to your model is not the size the model expected. Expected to see 108 array(s), but instead got the following list of 1 arrays: [<tf.Tensor 'ExpandDims_6:0' shape=(?, 1) dtype=float64>]..."

Any help would be appreciated to resolve this error.

Thanks

selitvin commented 4 years ago

Hard to say without looking at the data. Can you show the structure of all fields in the row? Does it match fit expectation ((inputs, targets) or (inputs, targets, sample_weights))? Consider creating an iterator and looking at the shapes, like in the example: examples/hello_world/external_dataset/tensorflow_hello_world.py. What are the actual TF tensor shapes?

muntasirraihan commented 4 years ago

I created an iterator, and extracted the tensor details as follows:

with make_batch_reader(petastorm_dataset_url, num_epochs=100) as reader:
    for row in reader:
        print(row.features.shape)
    dataset = make_petastorm_dataset(reader)
    iterator = dataset.make_one_shot_iterator()
    tensor = iterator.get_next()
    print(tensor)
    with tf.Session() as sess:
        sample = sess.run(tensor)

Tensor output:

"inferred_schema_view(features=<tf.Tensor 'IteratorGetNext_7:0' shape=(?,) dtype=float64>, label=<tf.Tensor 'IteratorGetNext_7:1' shape=(?,) dtype=float64>)"

The original training dataframe has 108 feature columns and 1 label column.

In the tensor, features is an array of 108 doubles, and label is a single double value.

selitvin commented 4 years ago

If the original parquet file has 108+1 columns, I would expect to see 109 columns in the record loaded and not just two fields. Originally, you mentioned that features was a udt, is that still the case?

It would be best if you could provide a small snippet of code for me to reproduce the issue that would create a dummy parquet store with the same structure as yours. Not sure I fully understand the way your dataset is structured.

muntasirraihan commented 4 years ago

I used the following code to pack the 108 columns into a features udt:

# DataFrame df has 108+1 columns; its schema is:
#   x1: double
#   x2: double
#   ...
#   x108: double
#   label: double

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

ignore = ['label']

assembler = VectorAssembler(
    inputCols=[col for col in df.columns if col not in ignore],
    outputCol='features')
train_df = assembler.transform(df)

I then selected only the features and label columns:

final_train_df = train_df.select(['features', 'label'])

Next, I used the sample code from https://docs.azuredatabricks.net/_static/notebooks/deep-learning/petastorm.html which states: "Petastorm supports scalar and array columns in Spark DataFrame. MLlib vector is a user-defined type (UDT), which requires special handling. Register a user-defined function (UDF) that converts MLlib vectors into dense arrays."

%scala

import org.apache.spark.ml.linalg.Vector

val toArray = udf { v: Vector => v.toArray }
spark.sqlContext.udf.register("toArray", toArray)

final_train_df.selectExpr("toArray(features) AS features", "label AS label") \
    .repartition(10) \
    .write.mode("overwrite") \
    .option("parquet.block.size", 1024 * 1024) \
    .parquet(parquet_path)

This creates the parquet file I am trying to use for fitting the keras model.

muntasirraihan commented 4 years ago

For the Keras model, I have the form:

model = Model(inputs=[input1, ...input108], outputs = output).

I believe that is why I am getting the mismatch. I am packing all 108 columns into the parquet file and transforming it into a petastorm dataset. Is there a Petastorm API that would be compatible with multi-input keras models like this?

selitvin commented 4 years ago

Since Petastorm is not aware of Keras at all, I would expect you to be able to reshape the data in the TF dataset produced by the make_petastorm_dataset API using standard TF dataset manipulation means (e.g. Dataset's map function). That should give you full flexibility to transform the data acquired from a Parquet store into whatever Keras expects.

Would that work in your case?
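To make the reshaping concrete, here is a minimal sketch of the kind of function one might pass to Dataset's map. It is written in plain Python on nested lists so the shape logic is easy to follow; in a real pipeline the same split would be done on tensors (for example with tf.unstack along the feature axis). The Batch namedtuple and the split_features name are hypothetical stand-ins for the record that make_petastorm_dataset yields, not part of any library.

```python
from collections import namedtuple

# Hypothetical stand-in for the record type yielded by the dataset:
# `features` holds one row of feature values per sample, `label` one value per sample.
Batch = namedtuple("Batch", ["features", "label"])


def split_features(batch):
    """Turn (batch_size x n_features) rows into n_features separate columns.

    Returns (inputs, targets), where `inputs` is a tuple with one
    sequence per model input, matching a multi-input Keras model.
    """
    columns = tuple(list(col) for col in zip(*batch.features))
    return columns, list(batch.label)


# A batch of 2 samples with 3 features each (108 in the thread above).
batch = Batch(features=[[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]], label=[0.0, 1.0])
inputs, targets = split_features(batch)
```

The alternative design is to change the model instead of the data: a single input of width 108 would accept the packed features array without any splitting.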

muntasirraihan commented 4 years ago

Thanks, I will try that. I am now running into a different issue as I try to fit a keras model using petastorm dataset. My code segment is as follows:

all_cols = train_df.columns
with make_batch_reader('file://' + get_local_path(parquetpath), num_epochs=None) as train_reader:
    train_ds = make_petastorm_dataset(train_reader).map(lambda x: (tuple(getattr(x, col) for col in all_cols)))
    model.fit(train_ds, batch_size=batch_size, epochs=2, steps_per_epoch=2)

I am getting the error message: "ArrowIOError: Invalid parquet file. Corrupt footer."

I saved the parquet file using:

df.write.parquet(parquetpath, mode='overwrite')

I checked that I can properly read the parquet file using spark.read.parquet(parquetpath).

Once or twice the error went away after I restarted my databricks cluster, but I am not sure why I am getting it in the first place. To double-check, I have also cleaned up the committed and started files from the parquet location.

Kindly provide some guidance about this issue. Thanks

selitvin commented 4 years ago

Not sure. The fact that restarting the cluster helps - could it be that the files are not fully written/flushed out before you try reading them? The error message would make sense in this case.
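One way to test the hypothesis that the files are not fully written is to inspect the output directory before handing it to a reader. The sketch below uses only the standard library and relies on two facts: Spark normally drops an empty _SUCCESS marker when a write job commits, and every unencrypted Parquet file starts and ends with the 4-byte magic PAR1. The function names are illustrative and are not part of Petastorm or pyarrow. Passing this check does not prove a file is valid; it only catches truncated or non-Parquet files.

```python
import os

PARQUET_MAGIC = b"PAR1"  # first and last 4 bytes of an unencrypted Parquet file


def has_parquet_magic(path):
    """Return True if the file begins and ends with the Parquet magic bytes."""
    if os.path.getsize(path) < 2 * len(PARQUET_MAGIC):
        return False
    with open(path, "rb") as f:
        head = f.read(len(PARQUET_MAGIC))
        f.seek(-len(PARQUET_MAGIC), os.SEEK_END)
        tail = f.read(len(PARQUET_MAGIC))
    return head == PARQUET_MAGIC and tail == PARQUET_MAGIC


def check_parquet_dir(directory):
    """Return (success_marker_present, sorted names of files failing the magic check)."""
    names = sorted(os.listdir(directory))
    has_success = "_SUCCESS" in names
    bad = [
        name for name in names
        if name != "_SUCCESS"
        and os.path.isfile(os.path.join(directory, name))
        and not has_parquet_magic(os.path.join(directory, name))
    ]
    return has_success, bad
```

Non-Parquet files that live next to the data, such as checksum or commit-marker files, will also show up in the returned list. That is expected, and it is a quick way to see what a reader that scans the whole directory might trip over.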

just-maiyak commented 4 years ago

Hello, I am running into the same kind of issue on a Databricks cluster as well. I have a setup similar to @muntasirraihan's, although rebooting the cluster doesn't seem to help. I also used the sample code in Databricks' documentation notebook as my base. I have a dataprep pipeline processing my data, which ends with a df.write.parquet(path) statement (reading it back through Spark seems to work fine as well). My dataframe's rows are composed of three fields:

features:array (Previously a VectorUDT, converted to an array with the notebook's UDF)
    element:double
next:integer
weight:integer

This is where it breaks. I have this code:

from petastorm import make_batch_reader
from petastorm.tf_utils import make_petastorm_dataset

with make_batch_reader(petastorm_dataset_url, num_epochs=10) as reader:
  dataset = make_petastorm_dataset(reader) \
    .map(mapper)
  model.fit(dataset, steps_per_epoch=10, epochs=10)

... which breaks at the with clause with the following error:

ArrowIOError: Invalid parquet file. Corrupt footer.

Sometimes I get a slight variation, with the following in the stack trace:

PetastormMetadataError: Could not find _common_metadata file. Use materialize_dataset(..) in petastorm.etl.dataset_metadata.py to generate this file in your ETL code. You can generate it on an existing dataset using petastorm-generate-metadata.py

I'd also be interested in some insight about this issue.

selitvin commented 4 years ago

I looked at the code. This may be related to the _metadata or _common_metadata files stored in your parquet directory. Do you have either of these files? Can you please post the values you get from:

pq.ParquetDataset(dataset_path).metadata
pq.ParquetDataset(dataset_path).common_metadata

In particular, the values of num_row_groups are interesting. This is the code that determines the number of row groups in your Parquet store, and the message in your second error is raised by it.

Can you please also show the full call stack? I am curious where the ArrowIOError originates.

just-maiyak commented 4 years ago

As a matter of fact, I can't even create the dataset with the pq.ParquetDataset primitive. I get the same error: ArrowIOError: Invalid parquet file. Corrupt footer.

I created a gist with the stacktrace, you can see it here.

selitvin commented 4 years ago

Then it seems that somehow your parquet store got corrupted. Can you read it as a Spark dataframe using a vanilla spark.read.parquet call?

just-maiyak commented 4 years ago

Absolutely, I can, just as @muntasirraihan described for his own setup. It seems that when I write my dataframe with df.write.parquet(location), neither _metadata nor _common_metadata is created.

selitvin commented 4 years ago

Based on your stack trace and the error message (ArrowIOError: Invalid parquet file. Corrupt footer.), the issue does not stem from the lack of _metadata or _common_metadata. The failure occurs when the actual Parquet files are read. This means that something bad happened during Parquet file write-out.

If Spark's native spark.read.parquet(location) works and you can actually access the data, while pq.ParquetDataset doesn't, I would think that there is some issue with the pyarrow parquet reader. This basically means that you should be able to reproduce the failure without any Petastorm code. Or am I missing something?

just-maiyak commented 4 years ago

That is exactly what is happening. I came to the same conclusion.

selitvin commented 4 years ago

Is there anything to check further, or should we close the issue?