uber / petastorm

The Petastorm library enables single-machine or distributed training and evaluation of deep learning models from datasets in Apache Parquet format. It supports ML frameworks such as TensorFlow, PyTorch, and PySpark and can be used from pure Python code.
Apache License 2.0

Suggestion for converting a spark array column to a format allowed by make_reader? #646

Closed. Jomonsugi closed this issue 3 years ago.

Jomonsugi commented 3 years ago

I am wondering if there is a suggested/expected way to convert a Spark array column into a format that will be accepted by make_reader.

For example:

df = spark.createDataFrame(
    [
        ([['1','2']]), ([['3','4','2']]), ([[]])
    ],
    ['id'] 
)

df.printSchema()
root
 |-- id: array (nullable = true)
 |    |-- element: string (containsNull = true)

Writing the df under materialize_dataset then succeeds and adds the _common_metadata file:

import numpy as np

from petastorm.codecs import NdarrayCodec
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import Unischema, UnischemaField

Schema = Unischema('Schema', [
    UnischemaField('id', np.string_, (1, None), NdarrayCodec(), False),
])

def generate__dataset(output_url='s3://example-bucket/petastorm/'):
    with materialize_dataset(spark, output_url, Schema):
        df.write.mode('overwrite').parquet(output_url)

generate__dataset()

But then when I try to load the data I receive an error:

from petastorm import make_reader
from petastorm.pytorch import DataLoader

dataloader = DataLoader(make_reader('s3://example-bucket/petastorm/'), batch_size=1)
with dataloader:
    for row in dataloader:
        print(row)

# Fails with (traceback truncated):
#     raise ValueError("Cannot load file containing pickled data "
# petastorm.utils.DecodeFieldError: Decoding field "id" failed
# Iteration on Petastorm DataLoader raise error: DecodeFieldError('Decoding field "id" failed')

The only way I have been able to load the data using make_reader is by taking a cue from the readme: converting my df to an rdd, encoding the array column to a bytearray, and then converting back to a df. (Apologies for the seemingly over-engineered lambdas here; I am working with a much larger dataframe with many more columns and multiple types.)

from petastorm.unischema import dict_to_spark_row  # plus the imports from the first snippet

Schema = Unischema('Schema', [
    UnischemaField('id', np.string_, (1, None), NdarrayCodec(), False),
])

def _row_to_dict(schema, row):

    def type_check(k,v):
        if isinstance(v, list):
            return np.array([v], dtype=schema._fields[k].numpy_dtype)
        else:
            return v

    return {k:type_check(k,v) for (k,v) in row.asDict().items()}

def generate__dataset(output_url='s3://example-bucket/petastorm/'):

    with materialize_dataset(spark, output_url, Schema):

        rows_rdd = df.rdd.map(lambda x: _row_to_dict(Schema, x)) \
                  .map(lambda x: dict_to_spark_row(Schema, x))

        spark.createDataFrame(rows_rdd, Schema.as_spark_schema()) \
           .write \
           .mode('overwrite') \
           .parquet(output_url)

generate__dataset()

The above code works and I'm able to read it:

import torch
from petastorm import TransformSpec

def _transform_row(row):
    row['id'] = torch.tensor(row['id'].flatten().astype(int), dtype=torch.float32)
    return row

transform = TransformSpec(_transform_row)

dataloader=DataLoader(make_reader('s3://example-bucket/petastorm/', transform_spec=transform), batch_size=1)
with dataloader:
    for row in dataloader:
        print(row)
# Output:
# ...
# {'id': tensor([], size=(1, 0))}
# {'id': tensor([[1., 2.]])}
# {'id': tensor([[3., 4., 2.]])}

However, the strategy I employ above, where I convert my dataframe to an rdd and then map two lambda statements over it, does not scale well at all. Plus, it's convoluted. I already have my dataframe ready; I just need to prep the columns for make_reader (if need be?) and provide the Unischema object to materialize_dataset. I'd prefer a simple withColumn expression, or at worst a udf, to convert my array columns in preparation for make_reader. Or maybe I am missing a simple function that petastorm provides? On the same dataframe I have been able to use make_spark_converter successfully (with no change to the array column), but that is not my preference, as the data comes through in pandas df batches, whereas the dictionary-per-row output above is perfect for my application.
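One possible shortcut along these lines, sketched here for illustration only (untested, and not a function petastorm provides; the encode_id udf is a made-up name), is to run NdarrayCodec inside a udf so the dataframe can be written as-is under materialize_dataset:

import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import BinaryType

from petastorm.codecs import NdarrayCodec

# Hypothetical helper: encode each list the same way dict_to_spark_row would,
# using the UnischemaField declared in Schema above.
_codec = NdarrayCodec()
_id_field = Schema._fields['id']

encode_id = udf(
    lambda arr: _codec.encode(_id_field, np.array([arr], dtype=_id_field.numpy_dtype)),
    BinaryType())

df_encoded = df.withColumn('id', encode_id('id'))
# df_encoded could then be written under materialize_dataset exactly as in the first snippet.

This mirrors the per-field encoding that dict_to_spark_row performs, but keeps the work inside a withColumn expression.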

Looking forward to hearing what the intended way to handle the array column is.

selitvin commented 3 years ago

Your 'two map functions' approach is correct. Ordinarily I would expect it to need only a single map (with dict_to_spark_row), but the reason, as you have probably discovered, is that your data source is a plain python list, while dict_to_spark_row expects its input values to be numpy arrays. As a convenience feature, we could probably fold this automatic conversion from python list to numpy array into the body of dict_to_spark_row (or, probably a better idea, into the NdarrayCodec implementation).
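For what it's worth, the two maps can also be collapsed into a single function without changing the cost; a minimal sketch, reusing Schema and df from the snippets above:

import numpy as np
from petastorm.unischema import dict_to_spark_row

def to_spark_row(row):
    # Convert plain python lists to numpy arrays of the dtype declared in the
    # Unischema, then let dict_to_spark_row encode them with the field's codec.
    as_dict = {k: (np.array([v], dtype=Schema._fields[k].numpy_dtype) if isinstance(v, list) else v)
               for k, v in row.asDict().items()}
    return dict_to_spark_row(Schema, as_dict)

rows_rdd = df.rdd.map(to_spark_row)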

For 'small size' fields (say < 100KB), it's likely that using make_reader won't be very performant (make_reader-style reading is designed for loading large images from a dataset). I wonder if in your case you could use make_batch_reader? It works with vanilla parquet stores, and if all you need is a list of strings, the following approach could be much more read-efficient:

from pyspark.sql import SparkSession

from petastorm import make_batch_reader, TransformSpec
from petastorm.pytorch import BatchedDataLoader

url = "file:///tmp/i646"

def generate__dataset(output_url=url):
    spark = SparkSession.builder.getOrCreate()
    data = [([['1', '2']]),
            ([['3', '4', '2']]),
            ([['3', '1', '2']]),
            ([['3', '3', '2']]),
            ([['3', '5']]),
            ([[]])]
    df = spark.createDataFrame(data, ['id']).repartition(1)
    df.write.mode('overwrite').parquet(output_url)

generate__dataset()

def featurize(x):
    # Toy transform: replace each variable-length list with a fixed-size feature
    # so rows can be collated into a regular tensor.
    x.id = x.id.map(lambda v: [len(v), len(v)])
    return x

reader = make_batch_reader(url, transform_spec=TransformSpec(featurize))
for batch in BatchedDataLoader(reader, batch_size=3):
    print(batch)

# Output:
# {'id': tensor([[2, 2],
#         [3, 3],
#         [3, 3]])}
# {'id': tensor([[3, 3],
#         [2, 2],
#         [0, 0]])}

Note that for row collation to work properly, we need to introduce a transform that encodes your variable-size data so it fits into a proper tensor (numpy, and as a result petastorm, don't support ragged data types at the moment). I use a somewhat silly featurize function in the example.
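For example, a padding-based transform in the same spirit (my own untested sketch, reusing url, make_batch_reader, and TransformSpec from the snippet above; MAX_LEN and pad_ids are made-up names) could look like:

import numpy as np

MAX_LEN = 4  # hypothetical fixed length chosen for this toy dataset

def pad_ids(batch_df):
    # Slice each list to MAX_LEN, cast the string ids to floats, and zero-pad
    # so every row has the same shape and the batch collates into one tensor.
    def _pad(ids):
        vals = np.array([float(i) for i in ids[:MAX_LEN]], dtype=np.float32)
        return np.pad(vals, (0, MAX_LEN - len(vals)), mode='constant')
    batch_df.id = batch_df.id.map(_pad)
    return batch_df

reader = make_batch_reader(url, transform_spec=TransformSpec(pad_ids))

Each batch should then collate into a (batch_size, MAX_LEN) float32 tensor.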

Jomonsugi commented 3 years ago

Thank you for the suggestion. A few questions as I work through trying out BatchedDataLoader as a solution.

Am I right that:

I ask because, although not a dealbreaker, these expectations are prescriptive. The pipeline I'm working with currently expects a dictionary for each index exactly like the petastorm Dataloader returns so it was going to be a lot easier to work with.

Also, can you please expand on the shuffling note? How can I ensure that my data is being properly shuffled between epochs?

selitvin commented 3 years ago

You are right on the spot for all three bullet points. This design is intentional: it enables vector processing on the entire batch rather than operating in a per-record fashion, which has huge performance benefits, especially when field sizes are small (i.e. scalars vs. images).

"The pipeline I'm working with currently expects a dictionary for each index exactly like the petastorm DataLoader returns so it was going to be a lot easier to work with."

What is the size of a record in your use case? How large is the batch size you use for training? I am just guessing here, since I don't know your case, but it sounds like you would have small rows and large batch sizes? Does that mean there would be a lot of python processing (assuming data arrives as a list of dictionaries) done in the main python process and, as a result, low GPU utilization?

Regarding the 'shuffling note': make_batch_reader loads entire parquet row-groups in a single load (typically hundreds of MB of data). When you request shuffling from make_batch_reader, you are getting random sampling of these huge row-groups. Unless your parquet data is pre-shuffled, you might see high correlation between samples. Unless additional row-level shuffling is done in-memory (via the shuffling_queue_capacity argument of BatchedDataLoader), trained model performance is likely to deteriorate significantly.
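Concretely, the two shuffling knobs combine roughly as in this sketch (my own example; the batch size and queue capacity are arbitrary, and featurize/url come from the earlier snippet):

# Row-group-level shuffling in the reader, plus row-level shuffling in memory
# via the loader's shuffling queue.
reader = make_batch_reader(url,
                           shuffle_row_groups=True,
                           transform_spec=TransformSpec(featurize))
for batch in BatchedDataLoader(reader,
                               batch_size=1000,
                               shuffling_queue_capacity=100000):
    pass  # train on batch here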

Jomonsugi commented 3 years ago

Batch size will vary, but typically will be around 100 to 1000 records. The size of the record can also vary as I am attempting to use petastorm as a solution to an abstracted pipeline. However, as an example, loaded as a pandas dataframe, there might be around 50 columns. A few of those will be single string values, but most will be arrays with one value or many (even hundreds or thousands of values). Given a particular lookback, these arrays are sliced, then padded with 0's where appropriate so that all arrays are the same size. Because of the way this particular modeling pipeline produces negative samples as the data is loaded, within the transformation step passed to petastorm's BatchedDataLoader, every column's contents will be mapped to an index via python dictionary. This index mapping will all depend on the value of two different columns in the pandas dataframe.
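As a rough illustration of that kind of transform (my own sketch, not the actual pipeline; LOOKBACK, vocab, and the 'events' column are hypothetical names):

import numpy as np

LOOKBACK = 8                # hypothetical slice length
vocab = {'a': 1, 'b': 2}    # hypothetical value-to-index map; 0 is reserved for padding

def to_indices(batch_df):
    # Map raw values to integer indices, keep only the lookback window,
    # and zero-pad so every row in the batch has the same length.
    def _encode(values):
        idx = [vocab.get(v, 0) for v in values[-LOOKBACK:]]
        return np.pad(np.array(idx, dtype=np.int64),
                      (0, LOOKBACK - len(idx)), mode='constant')
    batch_df['events'] = batch_df['events'].map(_encode)
    return batch_df

# to_indices would be passed to the reader via TransformSpec(to_indices).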

All this can be done using a pandas dataframe and I see your point about taking advantage of vectorized operations wherever possible under the hood of the pandas dataframe.

I can definitely shuffle data before saving it in parquet format, and can shuffle row groups via the BatchedDataLoader. Although I know shuffling_queue_capacity will slow the data loading process, I can at least try it. It might be imperative as you suggest.

Given the description of my data, do you have any suggestions on how I might optimally save the data in parquet format to optimize speed and shuffling? Should I save smaller size parquet files within s3? Should I make sure the "key" (e.g. customer id) that I am most concerned with being shuffled within a batch has the highest degree of variation within each saved parquet file in s3? Essentially, how can I save my data in a way that will produce the best variation across a particular variable within a row group?

selitvin commented 3 years ago

"Should I save smaller size parquet files within s3?" I think what's important is the row-group size, not necessarily the size of one file. Of course, an indirect way of controlling row-group size is to enforce a partitioning scheme that produces smaller files (since a row-group cannot be larger than the file that contains it). A sketch combining this with pre-shuffling the rows follows after the list below.

"Should I make sure the 'key' (e.g. customer id) that I am most concerned with being shuffled within a batch has the highest degree of variation within each saved parquet file in s3?" I assume that depends on your data distribution (I'm not sure I properly understand the idea...):

  • If you have just a handful of records per customer id, you would end up with many small files which will give you good decorrelation but probably slower performance (due to multiple round-trips).
  • If you have a lot of data per customer, you'll still end up with highly correlated data, since a lot of data for a particular customer would be loaded at once.
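Putting the two write-side levers together, pre-shuffling the rows and keeping row-groups small, might look roughly like the sketch below. This is my own illustration reusing spark, df, and output_url from the earlier snippets; the 64 MB value is an arbitrary example, and parquet.block.size is the standard parquet-mr setting for row-group size.

from pyspark.sql import functions as F

# Bound the parquet row-group size at write time (64 MB here is just an example).
spark.sparkContext._jsc.hadoopConfiguration().setInt(
    'parquet.block.size', 64 * 1024 * 1024)

# Globally shuffle rows before writing so no single row-group is dominated
# by one key (e.g. one customer id).
df.orderBy(F.rand()) \
  .write \
  .mode('overwrite') \
  .parquet(output_url)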

Now that I've finished writing this, I am not sure it's the answer you were looking for... I guess a lot of it is about trial and error.

Jomonsugi commented 3 years ago

Thanks for the ideas. We can close this issue. If anything, I understand better the options/limitations that petastorm provides. I'll be playing around with loading the data through BatchedDataLoader this week and can open a separate issue if I run into a specific snag not already covered in the solved issues.