uber / petastorm

Petastorm library enables single machine or distributed training and evaluation of deep learning models from datasets in Apache Parquet format. It supports ML frameworks such as Tensorflow, Pytorch, and PySpark and can be used from pure Python code.
Apache License 2.0

Train-Test Dataset Split #391

Closed. seranotannason closed this issue 5 years ago.

seranotannason commented 5 years ago

Is there currently support for splitting a Petastorm dataset into train-test for PyTorch? In PyTorch, one would typically do this to a Dataset class but since Petastorm only has the classes Reader and DataLoader (as below), I wonder if this feature has been implemented.

trainloader = DataLoader(make_reader('file://' + filename), batch_size=128)

selitvin commented 5 years ago

You can use make_reader(..., predicate=in_lambda(['sample_id'], lambda x: x % 10 < 2)). The predicate will be called for each sample (or batch in case of make_batch_reader). You can decide which rows are included in the current split.

We have an off-the-shelf in_pseudorandom_split predicate which implements this kind of partitioning for you. For example, make_reader(..., predicate=in_pseudorandom_split([0.8, 0.2], 0, 'sample_id')) will return 80% of the samples, which you can use as a training set. Using in_pseudorandom_split([0.8, 0.2], 1, 'sample_id') would return the complementing 20%.

Note that #393 makes the function work also with non-string fields.

For this approach to work efficiently and avoid loading and then (automatically) discarding data from the split you are not interested in, it is recommended to keep your Parquet dataset sorted or partitioned by the field you use as your split criterion.
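
Putting it together, a minimal sketch could look like the following (dataset_url is a placeholder for your own dataset, and sample_id is assumed to be an integer column):

from petastorm import make_reader
from petastorm.predicates import in_pseudorandom_split
from petastorm.pytorch import DataLoader

dataset_url = 'file:///tmp/my_dataset'  # placeholder path

# Split index 0 -> ~80% of the samples (train), index 1 -> the remaining ~20% (test).
trainloader = DataLoader(
    make_reader(dataset_url,
                predicate=in_pseudorandom_split([0.8, 0.2], 0, 'sample_id')),
    batch_size=128)

testloader = DataLoader(
    make_reader(dataset_url,
                predicate=in_pseudorandom_split([0.8, 0.2], 1, 'sample_id')),
    batch_size=128)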

seranotannason commented 5 years ago

Consider the following block when the schema has the columns filename, image, and label:

def _transform_row_train(cifar_row):
    result_row = {
        'image': cifar_row['image'],
        'label': cifar_row['label']
    }
    return result_row

transform_spec_train = TransformSpec(_transform_row_train, removed_fields=['filename'])

trainloader = DataLoader(make_reader(filename, 
    predicate=in_pseudorandom_split([0.75, 0.25], 0, 'filename'), 
    transform_spec=transform_spec_train))

This block produces an error saying that cifar_row has no 'image' or 'label' keys. It turns out that transform_spec operates only on the column specified as the third argument of the predicate. Is this the desired behavior?

selitvin commented 5 years ago

Nope, that was not a deliberate choice. I think you are right: it makes sense to filter by the predicate first and then apply the transform. With make_batch_reader the order is actually correct. We should fix it.

Created #394 with a fix. Can you please try and let me know if it works for you?
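
Once the fix is in, your snippet plus its test-set counterpart should look roughly like this (an untested sketch that reuses your _transform_row_train and filename from above):

from petastorm import make_reader
from petastorm.predicates import in_pseudorandom_split
from petastorm.pytorch import DataLoader
from petastorm.transform import TransformSpec

# The same TransformSpec can be reused for both splits once the predicate is
# evaluated before the transform; only the split index changes.
transform_spec = TransformSpec(_transform_row_train, removed_fields=['filename'])

trainloader = DataLoader(
    make_reader(filename,
                predicate=in_pseudorandom_split([0.75, 0.25], 0, 'filename'),
                transform_spec=transform_spec))

testloader = DataLoader(
    make_reader(filename,
                predicate=in_pseudorandom_split([0.75, 0.25], 1, 'filename'),
                transform_spec=transform_spec))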

seranotannason commented 5 years ago

Both #393 and #394 work!

selitvin commented 5 years ago

That's good news! They will be included in the next release, probably within a couple of days to two weeks from now.

cupdike commented 5 years ago

> For this approach to work efficiently and avoid loading and then (automatically) discarding data from the split you are not interested in, it is recommended to keep your Parquet dataset sorted or partitioned by the field you use as your split criterion.

So let's say my data had pre-defined train, test, validation sets and I wanted to implement parquet store partitioning so each could be efficiently loaded. When I look for examples of how to implement retrieval by partition key (e.g. PartitionKeyInSetPredicate):

https://github.com/uber/petastorm/blob/master/petastorm/tests/test_end_to_end_predicates_impl.py#L20

...it looks like it evaluates the partition key row by row from the row data (and does not use directory pruning based on the Parquet partitioning). Am I perhaps looking at the wrong sample here, or is there some other way that the directory pruning occurs? Thanks...

selitvin commented 5 years ago

This code is expected to kick in: https://github.com/uber/petastorm/blob/9b58038ecd7b1d2e788edff6a887ad14d37a9bea/petastorm/reader.py#L468

The limitation, though, is that your predicate should be defined only on the fields that you partition by.

If for some reason this method does not work for you, another option I wonder about: could you point directly at the partition directory when specifying the URL (something along these lines: hdfs:///..../my_dataset/split=train)?
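
For the first option, a rough sketch could look like this (hypothetical URL and a hypothetical 'split' partition field; a custom PredicateBase such as the PartitionKeyInSetPredicate from the test you linked would work the same way):

from petastorm import make_reader
from petastorm.predicates import in_lambda
from petastorm.pytorch import DataLoader

dataset_url = 'hdfs:///some/path/my_dataset'  # hypothetical

# The predicate touches only the field the dataset is partitioned by, so whole
# row groups belonging to the other splits can be skipped. Depending on how the
# field is declared, the value may arrive as str or bytes (see the .decode()
# discussion further down).
trainloader = DataLoader(
    make_reader(dataset_url,
                predicate=in_lambda(['split'], lambda split: split == 'train')),
    batch_size=128)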

cupdike commented 5 years ago

Unfortunately it doesn't seem to work that way... Petastorm can't find _common_metadata when I use a targeted partition like file:///var/nfs/general/fmowRGB/splitName\=train. The following error results:

  File "/home/user1/venv_tf/venv/lib/python3.6/site-packages/petastorm/etl/dataset_metadata.py", line 344, in get_schema
    'Could not find _common_metadata file. Use materialize_dataset(..) in'

However it does work with native pyarrow.parquet:

>>> dataset = pq.ParquetDataset('/var/nfs/general/fmowRGB/splitName=train/')
>>> table = dataset.read()
>>> pandasTable = table.to_pandas()
>>> pandasTable
  category                                           filename  \
0  airport  /var/tmp/fmowRgbSplitsSample/train/airport/air...
1  airport  /var/tmp/fmowRgbSplitsSample/train/airport/air...
2  airport  /var/tmp/fmowRgbSplitsSample/train/airport/air...
...

Note that splitName is not a column when I do this. If I rename _common_metadata so that pyarrow.parquet presumably cannot find it, it still loads the partition properly. Perhaps Parquet stops treating it like a dataset when a partition is specified and just treats it like a set of Parquet files.

Spark works similarly to pyarrow parquet by default but you can apparently apply a base_path to target a partition and still have the partition column be included:
https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery
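
i.e. roughly (untested here, paths from my example above):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# basePath points at the dataset root, so 'splitName' is still discovered as a
# partition column even though only the train partition is read.
df_train = (spark.read
            .option('basePath', 'file:///var/nfs/general/fmowRGB')
            .parquet('file:///var/nfs/general/fmowRGB/splitName=train'))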

cupdike commented 5 years ago

I looked into the metadata files. As I understand it, only parquet-mr writes these files; pyarrow Parquet does not write them. And neither Spark nor parquet-mr requires them at all. These files appear to be deprecated in parquet-mr and slated for removal in the 2.0 release (no indication when that will be).

selitvin commented 5 years ago

Sorry for the delayed response.

You are right. Petastorm uses _common_metadata as a place to store an instance of the Unischema and the count of row groups per file (this helps start reading faster, as our minimal scheduling unit is a row group).

As an alternative, it should be possible to pass a base_path argument to make_reader (similar to pyspark) and get all the information we need from there.

Before we do that, I was wondering whether the idea of defining the predicate only on the partition-by fields works for you (the one that goes via the _apply_predicate_to_row_groups code path).

cupdike commented 5 years ago

Yes, it did work. I was able to pass in a "train" partition/splitName and use a predicate to only retrieve those values, viz:

python petaTest.PartitionKeyPredicate.py file:///var/nfs/general/fmowRGB/ train

I also verified it still worked if I deleted the directories containing the other partitions (val and test) to make certain nothing was accessing them or even depending on them in any way. 👍

The only catch was that I had to put a decode on the values (unlike the Petastorm predicate example).

class PartitionKeyInSetPredicate(PredicateBase):
    ...  # __init__ stores the allowed values in self._inclusion_values

    def get_fields(self):
        return {'splitName'}

    def do_include(self, values):
        return values['splitName'].decode() in self._inclusion_values

Is that just a Python 2 vs Python 3 thing?

selitvin commented 5 years ago

What is the numpy dtype you use to define splitName in the UnischemaField? We have a similar example in the test_predicate_on_partition unit test. The field is defined with the np.unicode_ type, and that definition works transparently with PY2 and PY3:

    UnischemaField('partition_key', np.unicode_, (), ...

Perhaps using np.unicode_ would solve your case as well?

cupdike commented 5 years ago

I guess I never came across the unicode example and should have dug into numpy types. I've been using: UnischemaField('splitName', np.string_, (), ScalarCodec(StringType()), nullable=False)
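
Presumably the fix is just swapping the numpy type, something like (untested):

import numpy as np
from pyspark.sql.types import StringType
from petastorm.codecs import ScalarCodec
from petastorm.unischema import UnischemaField

# With a unicode field, do_include should receive a str directly, so no
# .decode() would be needed.
UnischemaField('splitName', np.unicode_, (), ScalarCodec(StringType()), nullable=False)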

In case anyone else gets tripped up by this, here is the numpy reference: https://numpy.org/devdocs/reference/arrays.dtypes.html#string-dtype-note

Thanks for the pointer...