uber / petastorm

The Petastorm library enables single-machine or distributed training and evaluation of deep learning models from datasets in Apache Parquet format. It supports ML frameworks such as TensorFlow, PyTorch, and PySpark and can be used from pure Python code.
Apache License 2.0

Better support for Spark image type #668

Closed: RobindeGrootNL closed this issue 3 years ago

RobindeGrootNL commented 3 years ago

I have a Spark DataFrame with two image columns (read into Spark using spark.read.format("image").load(PATH)) and some columns containing strings and arrays. The image type is a struct holding some identifying metadata plus the pixel data in binary format, as shown in the schema below (more information on the image type: https://docs.microsoft.com/en-us/azure/databricks/data/data-sources/image).

[Screenshot: schema of the image struct column]
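Spark's image data source documents this struct as the fields origin, height, width, nChannels, mode and data, where data holds the already-decoded pixels in OpenCV-compatible order (row-wise, BGR in most cases). The sketch below turns one such struct into an RGB NumPy array using the struct's own dimensions rather than hard-coded ones. It assumes 8-bit channels, and a plain dict stands in for a Spark Row; the helper name and sample values are illustrative only.

```python
import numpy as np


def spark_image_to_rgb(image_row):
    """Decode one Spark image struct (given as a dict) into an H x W x C uint8 array.

    Assumes 8-bit channels stored row-wise in BGR(A) order, as described
    for the `data` field of Spark's image data source.
    """
    height = image_row["height"]
    width = image_row["width"]
    channels = image_row["nChannels"]
    # `data` is a flat buffer of already-decoded pixels, not an encoded file
    pixels = np.frombuffer(image_row["data"], dtype=np.uint8)
    pixels = pixels.reshape(height, width, channels)
    if channels >= 3:
        # Reverse the first three channels (BGR -> RGB); keep any alpha channel last
        pixels = np.concatenate([pixels[..., 2::-1], pixels[..., 3:]], axis=-1)
    return pixels
```

For a real Spark Row, row.asDict() yields the same field names, and np.frombuffer accepts the bytearray Spark returns for binary columns.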

The problem is that I could not find a petastorm codec that works for this type of data. I also tried extracting the 'image.data' column to get just the binary data and dropping the 'image' column, but this did not fix the issue either. The error I get is DecodeFieldError: Decoding field "image_left_data" failed; that field contains the binary data, and I used CompressedImageCodec('jpeg') for it in the Unischema that I passed to materialize_dataset().
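A likely explanation for this DecodeFieldError: CompressedImageCodec expects each value to be an encoded image file (such as a JPEG) and hands it to an image decoder (OpenCV, as far as I know), while the data bytes from Spark's image source are raw, already-decoded pixels with no file header, so the decoder has nothing to recognize. A quick way to tell the two kinds of buffer apart is to look at the leading "magic" bytes. The helper below is a hypothetical sketch, not part of petastorm.

```python
def describe_image_bytes(buf, height=None, width=None, channels=None):
    """Guess whether `buf` is an encoded image file or a raw pixel buffer."""
    # JPEG files start with FF D8 FF
    if buf[:3] == b"\xff\xd8\xff":
        return "jpeg"
    # PNG files start with a fixed 8-byte signature
    if buf[:8] == b"\x89PNG\r\n\x1a\n":
        return "png"
    # A raw buffer has exactly height * width * channels bytes (8-bit channels)
    if None not in (height, width, channels) and len(buf) == height * width * channels:
        return "raw pixels"
    return "unknown"
```

Only buffers classified as "jpeg" or "png" are suitable for a compressed-image codec; raw pixel buffers need to be reshaped with their known dimensions instead.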

I then stopped using materialize_dataset() and instead wrote the DataFrame directly to 'normal' Parquet, loading the data with BatchedDataLoader(make_batch_reader()) so it can be used in PyTorch. I still have some issues converting the binary image data into a tensor, but I think that can be ironed out easily.

I would rather not use make_spark_converter(), as it writes the complete dataset to temporary storage every time I run the script. Since writing data is the main cost in my environment, I prefer to write the dataset out in (petastorm-)Parquet format just once.

More native support for Spark's image type (easy write-out, a dedicated codec, etc.) would make this task a lot easier for me, and I expect others who work with large image datasets would benefit as well, so I hope you can provide more information here.

I hope I've made my problem and its context clear, but don't hesitate to ask for more information if you need it! Also, if I am doing something wrong (e.g. with codecs), or if I should import the images with a different data type to begin with, any help is very welcome :).

selitvin commented 3 years ago

I suspect fixing the BatchedDataLoader approach would be a better solution, since you could use standard tools for writing the dataset (avoiding petastorm's materialize_dataset). One downside is that the image struct you show here represents an uncompressed image, which increases disk storage by quite a lot.

If you think it would help, I can look into how hard it would be to add native petastorm support for reading image structs from a Parquet store.

RobindeGrootNL commented 3 years ago

Thank you for your quick response!

Going with BatchedDataLoader() is indeed probably the easier solution, though I would lose some performance and the type checks that materialize_dataset() provides. I don't think that's much of an issue for my project.

I still have a question: how would you ingest images in Spark, and in what format (Spark type) would you keep them in the DataFrame, and therefore in the Parquet files read by make_batch_reader()? For me, the PyTorch transforms don't behave as they should, because I have not yet figured out how to read the image into the PyTorch DataLoader.

What I have done now is extract the binary string from the image struct, discard the rest of the struct's data, write that binary to Parquet, and try to load it into the PyTorch DataLoader with the following transform: transformers = [transforms.Lambda(lambda x: Image.open(io.BytesIO(x)))]. However, this fails with an image-file error, Error: cannot identify image file <_io.BytesIO object at 0x7f1a0e507ae0>, when I run the following commands:

trans = transforms.Compose(transformers)
pd_batch['eye_left'] = pd_batch['image_left_data'].map(lambda x: transforms.ToTensor()(trans(x)).numpy())

It would be great if you could give me some pointers on how to improve my process so that I don't run into these issues again. Changing the petastorm library is probably not necessary, but more examples of this sort of workflow (slightly more involved than the currently available ones) and some best practices would be helpful. Thank you very much for your help!

selitvin commented 3 years ago

I am not sure how to fix the code. If you can post a code snippet that lets me reproduce your failure easily, I could take a quick look.

RobindeGrootNL commented 3 years ago

I have managed to fix the code and load the images correctly into the DataLoader after some conversions. I'll show the most important parts of my workflow here to illustrate my process and where it is inefficient. I had issues with the TransformSpec in particular, which makes me doubt that I saved the data to the Parquet file in the correct format.

This is how I imported the images from disk (.jpg files) and then selected only the image.data column, which contains the image data as a byte string.

import os

from pyspark.sql import functions as SparkF

# Raw strings keep the regex backslashes intact without Python escape warnings
imageDF_left = (spark.read.format("image")
                .load(os.path.join(ALT_PATH, 'images', '*2_*_150_left.jpg'))
                .withColumn("image_id", SparkF.regexp_extract("image.origin", r"\d{5}_\d{5}", 0))
                .withColumnRenamed("image", "image_left")
                .withColumn("participant_id", SparkF.regexp_extract("image_id", r"\d{5}", 0))
                .select(SparkF.col('image_left.data').alias('image_left_data'), 'image_id', 'participant_id'))
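To check what the two regexp_extract calls capture, the same patterns can be tried with Python's re module; for simple patterns like these, Java's regex syntax (used by Spark) behaves the same. Spark's regexp_extract with group index 0 returns the whole first match, or an empty string when nothing matches. The file path below is hypothetical, chosen only to match the '*2_*_150_left.jpg' glob.

```python
import re

IMAGE_ID_PATTERN = r"\d{5}_\d{5}"
PARTICIPANT_PATTERN = r"\d{5}"


def regexp_extract(value, pattern, idx=0):
    """Python stand-in for Spark's regexp_extract: returns '' when there is no match."""
    match = re.search(pattern, value)
    return match.group(idx) if match else ""


# Hypothetical image.origin value, for illustration only
origin = "file:/dbfs/data/images/10234_00017_2_01_150_left.jpg"
image_id = regexp_extract(origin, IMAGE_ID_PATTERN)
participant_id = regexp_extract(image_id, PARTICIPANT_PATTERN)
print(image_id, participant_id)  # prints: 10234_00017 10234
```

Because regexp_extract searches anywhere in the string, the first five-digit run in the directory part of the path would be picked up if one existed, which is worth checking against the real paths.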

This is how the datasets are written to disk, and so far so good.

df_train \
    .coalesce(args.parquet_files_count) \
    .write \
    .mode('overwrite') \
    .option('parquet.block.size', 1024*1024) \
    .parquet(os.path.join('file:///dbfs/' + PARQUET_PATH_TRAIN))
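The 'parquet.block.size' option sets Parquet's target row-group size in bytes, and petastorm's readers work through a dataset one row group at a time, so it is worth estimating how many rows fit into a 1 MiB row group. The sketch below is a rough estimate that assumes two uncompressed 150 × 150 × 3 images per row (the size the bytes_to_img helper in this thread reshapes to) and ignores the smaller columns, metadata, and Parquet compression.

```python
def raw_image_bytes(height, width, channels, bytes_per_value=1):
    """Size of one decoded image buffer, as stored in the Spark image struct."""
    return height * width * channels * bytes_per_value


def rows_per_row_group(row_group_bytes, row_bytes):
    """Whole rows that fit in one row group (ignores compression and metadata)."""
    return row_group_bytes // row_bytes


ROW_GROUP_BYTES = 1024 * 1024  # the value passed as 'parquet.block.size'
row_bytes = 2 * raw_image_bytes(150, 150, 3)  # left + right eye images
print(row_bytes, rows_per_row_group(ROW_GROUP_BYTES, row_bytes))  # prints: 135000 7
```

At roughly seven rows per row group, shuffling and parallel reads operate on very small chunks; storing compressed image bytes, or choosing a larger row-group size, would shift that trade-off.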

This is where it gets more complicated, with the transform_spec. I ended up splitting the get_transform_spec() function into separate versions for training and validation, since I apply different transformations to the two sets (e.g. a random crop for training and a center crop for validation). This is not a big issue, but it makes my script more cumbersome than necessary. I tried giving a single function an is_train argument, a boolean that is True for the training set and False otherwise, and passing it through the TransformSpec, but the function then treated that argument as its own array, causing errors.
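One plausible cause of that error, assuming the function was declared as transform_row(pd_batch, is_train) and wrapped as partial(transform_row, is_train): a positional partial binds its argument to the first parameter, so the boolean lands in pd_batch and the batch from petastorm lands in is_train. TransformSpec calls its function with the batch as the only argument, so binding is_train by keyword avoids the mix-up and allows a single get_transform_spec(is_train). The sketch below uses hypothetical names and NumPy stand-ins for the crops (a plain random crop, not torchvision's RandomResizedCrop) so that it runs without petastorm or PyTorch.

```python
from functools import partial

import numpy as np

CROP = 128  # output size used in the thread


def transform_batch(images, is_train, rng=None):
    """Hypothetical transform: random crop for training, center crop otherwise.

    `images` is a sequence of H x W x C uint8 arrays. Returns a float32 array of
    shape (N, C, CROP, CROP) scaled to [0, 1], the layout torchvision's ToTensor produces.
    """
    rng = rng if rng is not None else np.random.default_rng()
    out = []
    for img in images:
        h, w = img.shape[:2]
        if is_train:
            top = int(rng.integers(0, h - CROP + 1))
            left = int(rng.integers(0, w - CROP + 1))
        else:
            top, left = (h - CROP) // 2, (w - CROP) // 2
        crop = img[top:top + CROP, left:left + CROP]
        # HWC uint8 -> CHW float32 in [0, 1]
        out.append(np.transpose(crop, (2, 0, 1)).astype(np.float32) / 255.0)
    return np.stack(out)


def get_transform_fn(is_train):
    # Bind is_train by keyword so the returned callable takes only the batch
    return partial(transform_batch, is_train=is_train)
```

In petastorm the function receives a pandas DataFrame rather than a list of arrays, and would be passed as the first argument of TransformSpec, e.g. TransformSpec(get_transform_fn(True), edit_fields=[...], selected_fields=[...]).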

import os

import numpy as np
from petastorm import make_batch_reader
from petastorm.pytorch import BatchedDataLoader
from petastorm.transform import TransformSpec

def get_transform_spec_train():
    # Note that the output shape of the `TransformSpec` is not automatically known by petastorm,
    # so we need to specify the shape for new columns in `edit_fields` and specify the order of
    # the output columns in `selected_fields`.
    # transform_row_train is defined elsewhere in the script
    return TransformSpec(transform_row_train,
                         edit_fields=[('eye_left', np.float32, (3, 128, 128), False),
                                      ('eye_right', np.float32, (3, 128, 128), False),
                                      ('eye_landmarks', np.float32, (12, 1), False),
                                      ('targets', np.float32, (2, 1), False)],
                         selected_fields=['eye_left', 'eye_right', 'eye_landmarks', 'targets'])
                         #removed_fields=['face_landmarks', 'image_id'])

with BatchedDataLoader(make_batch_reader(os.path.join('file:///dbfs' + PARQUET_PATH, 'train'),
                                         transform_spec=get_transform_spec_train(),
                                         num_epochs=None,
                                         workers_count=8),
                       batch_size=args.batch_size) as train_dataloader:
    ...  # training loop over train_dataloader goes here
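Because petastorm cannot infer the output schema of a TransformSpec, a mismatch between what the transform returns and what edit_fields declares only shows up once the reader is running. A small check like the hypothetical one below can be run on a single transformed row beforehand; it assumes edit_fields entries are (name, NumPy dtype, shape, nullable) tuples, as in the code above.

```python
import numpy as np


def check_edit_fields(row, edit_fields):
    """Return a list of mismatches between one transformed row and edit_fields."""
    problems = []
    for name, dtype, shape, _nullable in edit_fields:
        if name not in row:
            problems.append(f"{name}: missing")
            continue
        value = np.asarray(row[name])
        if value.dtype != np.dtype(dtype):
            problems.append(f"{name}: dtype {value.dtype} != {np.dtype(dtype)}")
        if value.shape != tuple(shape):
            problems.append(f"{name}: shape {value.shape} != {tuple(shape)}")
    return problems
```

An empty list means the row agrees with the declared schema; a flat length-2 target checked against a declared (2, 1) shape, for example, is reported as a shape mismatch.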

This function was necessary to convert the byte string I get from the data loader (since that's how I saved the image in Parquet) into a PIL image, which is what the torchvision transforms expect.

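import cv2
import numpy as np
from PIL import Image
from torchvision import transforms

def bytes_to_img(img_str):
    # The stored bytes are raw pixels from Spark's image struct (not a JPEG file),
    # so reshape them directly; 150 x 150 x 3 is the size of the images in this dataset
    nparr = np.frombuffer(img_str, np.uint8).reshape(150, 150, 3)
    # Spark stores pixels in OpenCV's BGR order; PIL expects RGB
    img_np = cv2.cvtColor(nparr, cv2.COLOR_BGR2RGB)
    img = Image.fromarray(img_np)
    return img

# Excerpt from the body of the row-transform function (its def line is not shown):
# pd_batch is the pandas batch from petastorm and is_train selects the augmentation
transformers = [transforms.Lambda(bytes_to_img)]
if is_train:
    transformers.extend([
        transforms.RandomResizedCrop(128),
        #transforms.RandomHorizontalFlip(),
    ])
else:
    transformers.extend([
        #transforms.Resize(128),
        transforms.CenterCrop(128),
    ])
transformers.extend([
    transforms.ToTensor(),
    #transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])

trans = transforms.Compose(transformers)

pd_batch['eye_left'] = pd_batch['image_left_data'].map(lambda x: trans(x).numpy())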

In the end, figuring all of this out by myself took many hours, because there are few examples, they are fairly simple, and there are not many more on other websites. I think it would be great to have an example that covers reading images in a conventional format (e.g. JPEG or PNG) into a Spark DataFrame, writing that to Parquet in the most efficient/recommended way, and loading that data correctly into the DataLoader through petastorm. Something like that would have saved me a lot of time and would probably yield a better, more concise, and more efficient solution than the one I came up with.

Still, if you notice some quick and easy ways to improve my workflow here then please let me know. Unfortunately, my dataset is not openly available and thus I cannot give my code directly for use in such an example, but if I can contribute in another way let me know!

RobindeGrootNL commented 3 years ago

I'll close this issue now; just send me a message if you need more info!

selitvin commented 3 years ago

Thank you for the detailed description of your solution. Until we have better documentation, these examples can serve users who are looking for more information.