uber / petastorm

The Petastorm library enables single-machine or distributed training and evaluation of deep learning models from datasets in Apache Parquet format. It supports ML frameworks such as TensorFlow, PyTorch, and PySpark, and can be used from pure Python code.
Apache License 2.0

Values returned by a transform function are not validated against the schema #402

Open selitvin opened 5 years ago

selitvin commented 5 years ago

Based on an internal report.

praateekmahajan commented 5 years ago

@selitvin is there a way to use a transform function to change the schema via the transform_spec?

def func(row):
  row['existing_col'] = row['existing_col'].apply(tokenize_me)
  row['new_col'] = row['existing_col'].apply(lambda x : some_func(x))
  return row

transform = TransformSpec(func)

Is there a way to add new_col to the data on the fly?

selitvin commented 5 years ago

Are you using this function from PyTorch? (For TF, the type needs to be explicit, since we create the graph before reading any sample.) We planned to do #355; is this the same ask?
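
To show what I mean by creating the graph before reading any sample, here is a rough sketch of the TF path (sketch only, not code from this issue; it assumes the hello_world external dataset at file:///tmp/external_dataset with an 'id' column):

from petastorm import make_batch_reader, TransformSpec
from petastorm.tf_utils import make_petastorm_dataset

def _transform_row(df):
    df['new_col'] = df['id'] * 2.0  # 'id' column assumed to exist in the dataset
    return df

# Without the new column declared in the schema, TF has no dtype/shape
# for it at graph-construction time.
transform = TransformSpec(func=_transform_row)

with make_batch_reader('file:///tmp/external_dataset', transform_spec=transform) as reader:
    dataset = make_petastorm_dataset(reader)  # output types come from the reader's schema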

praateekmahajan commented 5 years ago

I am not sure if the ask is the same. It might be related, but I don't see when TransformSpec results in a failure for PyTorch. What I am curious about is how to use TransformSpec with a batch_reader to create new columns on the fly.

I think reading through the following example should make it clear:

from petastorm import make_batch_reader, TransformSpec
from petastorm.pytorch import DataLoader

def _transform_row(df):
    import pandas as pd
    df['new_col'] = pd.Series(100)
    return df

transform = TransformSpec(func=_transform_row)

train_loader = DataLoader(reader=make_batch_reader(petastorm_dataset_url,
                                                   transform_spec=transform))
for i, data in enumerate(train_loader):
    print(data.keys())
    assert 'new_col' in data.keys(), "New col wasn't created"
    break

And this code results in AssertionError: New col wasn't created.

For the curious, a use case could be that you have a text column and you want to generate multiple features from it on the fly (e.g. BERT tokens, masks, and padding derived from the text column).
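
As a rough sketch of that use case (the 'text' column name and the toy tokenizer below are made up; a real pipeline would use an actual BERT tokenizer), the transform would need to emit several derived columns:

import numpy as np

MAX_LEN = 16  # hypothetical fixed sequence length

def _featurize_text(df):
    # Toy whitespace "tokenizer" standing in for a real BERT tokenizer.
    def encode(text):
        ids = [hash(tok) % 29999 + 1 for tok in text.split()][:MAX_LEN]
        token_ids = np.zeros(MAX_LEN, dtype=np.int64)
        token_ids[:len(ids)] = ids
        mask = (token_ids != 0).astype(np.int64)
        return token_ids, mask

    encoded = df['text'].apply(encode)
    df['token_ids'] = encoded.apply(lambda pair: pair[0])
    df['attention_mask'] = encoded.apply(lambda pair: pair[1])
    return df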

selitvin commented 5 years ago

I think what's missing is the edit_fields=... argument to the TransformSpec. Originally, this explicit specification of the new field was a TF requirement. I think it should be relaxed for PyTorch to avoid the confusion (assuming I understood your question correctly and there is indeed confusion).

Here is a modified examples/hello_world/external_dataset/python_hello_world.py I used to demonstrate the feature. Hope I understood your question correctly, but please let me know if I missed something:

from __future__ import print_function

from petastorm import make_batch_reader, TransformSpec
from petastorm.pytorch import DataLoader
from petastorm.unischema import UnischemaField
import numpy as np

def _transform_row(df):
    import pandas as pd
    df['new_col'] = pd.Series((10,20,30,40,50))
    return df

transform = TransformSpec(func=_transform_row, edit_fields=[UnischemaField('new_col', np.float64, (), None, False)])

def pytorch_hello_world(dataset_url='file:///tmp/external_dataset'):
    with DataLoader(make_batch_reader(dataset_url, transform_spec=transform, reader_pool_type='dummy')) as train_loader:
        for sample in iter(train_loader):
            print("id batch: {0}".format(sample['id']))

if __name__ == '__main__':
    pytorch_hello_world()
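
(The UnischemaField entry spells out the added column's name, numpy dtype, shape, codec, and nullability, so the schema the loader sees matches what the transform function returns.)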
praateekmahajan commented 5 years ago

You're right, edit_fields was the missing argument. It would be great to have this relaxed for PyTorch, but I don't see this as a high-priority issue, assuming this example of adding new columns for PyTorch gets documented.
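
For completeness, the multi-column version of my earlier sketch would declare the extra fields roughly like this (sketch only, reusing the hypothetical _featurize_text and MAX_LEN from my previous comment):

from petastorm import TransformSpec
from petastorm.unischema import UnischemaField
import numpy as np

transform = TransformSpec(
    func=_featurize_text,  # hypothetical transform sketched above
    edit_fields=[
        UnischemaField('token_ids', np.int64, (MAX_LEN,), None, False),
        UnischemaField('attention_mask', np.int64, (MAX_LEN,), None, False),
    ])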