uber / petastorm

Petastorm is a library that enables single-machine or distributed training and evaluation of deep learning models directly from datasets in Apache Parquet format. It supports ML frameworks such as TensorFlow, PyTorch, and PySpark and can be used from pure Python code.

Decode with make_batch_reader #390

Open ThrowMeForALoop opened 5 years ago

ThrowMeForALoop commented 5 years ago

Hi team,

I have created a Spark Parquet dataset (without using 'materialize_dataset') with the schema below:

import numpy as np
from petastorm.codecs import NdarrayCodec
from petastorm.unischema import Unischema, UnischemaField

TrainSchema = Unischema('TrainSchema', [
    UnischemaField('features', np.uint8, (DEFAULT_IMAGE_SIZE[0], DEFAULT_IMAGE_SIZE[1], 3), NdarrayCodec(), False),
    UnischemaField('masks', np.uint8, (DEFAULT_IMAGE_SIZE[0], DEFAULT_IMAGE_SIZE[1]), NdarrayCodec(), False)
])

The Parquet file with the two columns was generated, with the ndarrays encoded by the codec. However, when I use make_batch_reader to feed my Parquet data into a TensorFlow dataset, I found that make_batch_reader does not decode the numpy array values.

I tried using 'transform_spec' to decode the column values, but it fails with the error 'only file with allow_pickle=True can be loaded':

def decode_image_and_mask(row):
    codec = NdarrayCodec()
    return (codec.decode(TrainSchema[0], row.features.values),
            codec.decode(TrainSchema[1], row.masks.values))

transform = TransformSpec(decode_image_and_mask)

with make_batch_reader('hdfs://node013.ib.cluster:8020/train/train_df.parquet', num_epochs=None,
                           cur_shard=hvd.rank(), shard_count=hvd.size(),
                           hdfs_driver='libhdfs', transform_spec=transform) as train_reader:

Please let me know if you have any idea about the issue. Thanks and regards,

selitvin commented 5 years ago

Interesting. make_batch_reader was intended for reading native Parquet files, for the purpose of working directly with existing enterprise data-warehouse stores that are also used by other, non-petastorm systems. Petastorm decoders are not used when reading the data, since we assume the Parquet store contains only native Parquet types.

If you want to store numpy arrays in your parquet store, the recommended way would be to use materialize_dataset, as shown in this example.
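For reference, roughly what that pattern looks like (a sketch only; the toy zero-filled rows, the 128x128 size standing in for DEFAULT_IMAGE_SIZE, and output_url are placeholders, not the actual pipeline):

import numpy as np
from pyspark.sql import SparkSession

from petastorm import make_reader
from petastorm.codecs import NdarrayCodec
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import Unischema, UnischemaField, dict_to_spark_row

TrainSchema = Unischema('TrainSchema', [
    UnischemaField('features', np.uint8, (128, 128, 3), NdarrayCodec(), False),
    UnischemaField('masks', np.uint8, (128, 128), NdarrayCodec(), False),
])

output_url = 'file:///tmp/train_df_petastorm'  # placeholder location
spark = SparkSession.builder.master('local[2]').getOrCreate()
sc = spark.sparkContext

# materialize_dataset writes the petastorm metadata that make_reader needs
# in order to apply the NdarrayCodec decoders on read.
with materialize_dataset(spark, output_url, TrainSchema, row_group_size_mb=256):
    rows_rdd = sc.parallelize(range(10)) \
        .map(lambda _: {'features': np.zeros((128, 128, 3), dtype=np.uint8),
                        'masks': np.zeros((128, 128), dtype=np.uint8)}) \
        .map(lambda d: dict_to_spark_row(TrainSchema, d))
    spark.createDataFrame(rows_rdd, TrainSchema.as_spark_schema()) \
        .write.mode('overwrite').parquet(output_url)

# make_reader (unlike make_batch_reader) decodes the numpy arrays back.
with make_reader(output_url) as reader:
    for sample in reader:
        print(sample.features.shape, sample.masks.shape)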

Is there a reason why you avoid using materialize_dataset and make_reader and choose to use make_batch_reader?

karterotte commented 4 years ago

@selitvin I'm facing the same problem. My team has Parquet data files (created without 'materialize_dataset') with this schema:

Unischema(inferred_schema, [
    UnischemaField('label', int32, (), None, True),
    UnischemaField('feature', str_, (), None, True),
])

I need to parse the 'feature' column into an ndarray: split the string and combine the results. I want to use make_batch_reader with a transform_spec that runs a function I write to do the parsing, but I can't find a way to make it work.

Is there a reason why you avoid using materialize_dataset and make_reader and choose to use make_batch_reader?

Because the original data is not mine. Converting the data with materialize_dataset is not a good option (extra time and space).

Thanks and regards!

selitvin commented 4 years ago

Would code similar to the one in the test work? If not, can you please show a snippet that you are trying to run and the error? If you can create a small reproducible example, it would be even better.
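It is not clear which test is meant exactly, but a sketch along those lines for a comma-separated string 'feature' column (the column layout, FEATURE_DIM and the dataset URL are assumptions about your data, not known facts) might look like:

import numpy as np

from petastorm import make_batch_reader
from petastorm.transform import TransformSpec

FEATURE_DIM = 3  # assumed length of the encoded feature vector

def parse_features(df):
    # df is a pandas DataFrame holding one batch of rows; a pandas DataFrame
    # must be returned as well.
    df['feature'] = df['feature'].map(
        lambda s: np.array(s.split(','), dtype=np.float32))
    return df

transform = TransformSpec(
    parse_features,
    # Declare that 'feature' is now a float32 vector instead of a string.
    edit_fields=[('feature', np.float32, (FEATURE_DIM,), False)])

with make_batch_reader('file:///tmp/origin_table.parquet',  # placeholder URL
                       transform_spec=transform) as reader:
    for batch in reader:
        print(batch.label[:3], batch.feature[:3])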

mirko-m commented 1 year ago

I was also having issues using a TransformSpec together with make_batch_reader. In my case the issue was that I thought the func parameter passed to the TransformSpec needs to return a dictionary, when in fact it needs to return a pandas DataFrame. This becomes apparent when looking at the examples that @selitvin posted, but is not clear from the API documentation.
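To make that concrete, a minimal sketch of the expected shape of such a function (my_transform and the label column are made-up names):

from petastorm.transform import TransformSpec

def my_transform(df):
    # The func receives a pandas DataFrame with a batch of rows and must
    # return a pandas DataFrame as well, not a dict or a tuple.
    df['label'] = df['label'] * 2
    return df

transform = TransformSpec(my_transform)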