uber / petastorm

The Petastorm library enables single-machine or distributed training and evaluation of deep learning models from datasets in Apache Parquet format. It supports ML frameworks such as TensorFlow, PyTorch, and PySpark and can be used from pure Python code.
Apache License 2.0

_common_metadata file gets corrupted #515

Closed filipski closed 4 years ago

filipski commented 4 years ago

For some reason, the _common_metadata file in my data set has become corrupted. Instead of proper Parquet content, it contains just 4 bytes (zipped here only because of GitHub's restrictions on attached files): _common_metadata.zip. Every time I now try to append to the set, I get a pyarrow.lib.ArrowIOError: Invalid Parquet file size is 4 bytes, smaller than standard file footer (8 bytes) exception:

2020-03-23 14:33:07,177 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
Traceback (most recent call last):
  File "./test_ingest.py", line 75, in <module>
    main()
  File "./test_ingest.py", line 71, in main
    ingest_folder("../../annotated_data/ideon_val/images", spark)
  File "./test_ingest.py", line 62, in ingest_folder
    print('DONE')
  File "/home/hduser/miniconda3/envs/petastorm/lib/python3.6/contextlib.py", line 88, in __exit__
    next(self.gen)
  File "/home/hduser/projects/petastorm_ingest/petastorm/etl/dataset_metadata.py", line 113, in materialize_dataset
    validate_schema=False)
  File "/home/hduser/miniconda3/envs/petastorm/lib/python3.6/site-packages/pyarrow/parquet.py", line 1037, in __init__
    memory_map=memory_map
  File "/home/hduser/miniconda3/envs/petastorm/lib/python3.6/site-packages/pyarrow/parquet.py", line 1503, in read_metadata
    return ParquetFile(where, memory_map=memory_map).metadata
  File "/home/hduser/miniconda3/envs/petastorm/lib/python3.6/site-packages/pyarrow/parquet.py", line 137, in __init__
    read_dictionary=read_dictionary, metadata=metadata)
  File "pyarrow/_parquet.pyx", line 1036, in pyarrow._parquet.ParquetReader.open
  File "pyarrow/error.pxi", line 80, in pyarrow.lib.check_status
pyarrow.lib.ArrowIOError: Invalid Parquet file size is 4 bytes, smaller than standard file footer (8 bytes)

Any idea why my metadata got corrupted? Have you experienced the same? How can I prevent this? And how can I fix the broken metadata for the existing data set?

The code

#!/usr/bin/env python3

import os, sys
import numpy as np
from glob import glob
from datetime import date

import cv2

from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructField, StructType, IntegerType, BinaryType, StringType, TimestampType, DateType

from petastorm.fs_utils import FilesystemResolver
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.codecs import ScalarCodec, CompressedImageCodec, NdarrayCodec
from petastorm.unischema import dict_to_spark_row, Unischema, UnischemaField

ROWGROUP_SIZE_MB = 128 # The same as the default HDFS block size

# The Unischema defines the structure of the dataset
ImageSchema = Unischema('ImageSchema', [
    UnischemaField('date', np.datetime64, (), ScalarCodec(DateType()), False),
    UnischemaField('dataset_name', np.string_, (), ScalarCodec(StringType()), False),
    UnischemaField('path', np.string_, (), ScalarCodec(StringType()), False),
    UnischemaField('image', np.uint8, (1080, 1280, 3), CompressedImageCodec('png'), False)
])

#output_url = "file:///tmp/petastorm_ingest_test"
output_url = "hdfs:///data/petastorm_ingestion_tests"

def ingest_folder(images_folder, spark):

    # List all images in the folder
    image_files = sorted(glob(os.path.join(images_folder, "*.png")))

    # Switch from the default libhdfs3 driver to libhdfs, as the former crashes while storing metadata
    resolver = FilesystemResolver(output_url, spark.sparkContext._jsc.hadoopConfiguration(), hdfs_driver='libhdfs')
    with materialize_dataset(spark, output_url, ImageSchema, ROWGROUP_SIZE_MB, filesystem_factory=resolver.filesystem_factory()):
    #with materialize_dataset(spark, output_url, ImageSchema, ROWGROUP_SIZE_MB):

        print('STAGE 1')
        input_rdd = spark.sparkContext.parallelize(image_files) \
            .map(lambda image_path:
                    {ImageSchema.date.name: date.today(),
                     ImageSchema.dataset_name.name: 'sample_set',
                     ImageSchema.path.name: image_path,
                     ImageSchema.image.name: cv2.imread(image_path)})

        print('STAGE 2')
        rows_rdd = input_rdd.map(lambda r: dict_to_spark_row(ImageSchema, r))

        print('STAGE 3')

        spark.createDataFrame(rows_rdd, ImageSchema.as_spark_schema()) \
                .repartition(1) \
                .write \
                .mode('append') \
                .partitionBy('date', 'dataset_name') \
                .option('compression', 'none') \
                .parquet(output_url)
        print('DONE')

def main():

    # Start the Spark session
    spark = SparkSession.builder.config('spark.driver.memory', '2g').master('local[*]').getOrCreate()    
    sc = spark.sparkContext

    # Ingest images and annotations for a given folder
    ingest_folder("../images", spark)
    print('Finished')

if __name__ == '__main__':
    main()
filipski commented 4 years ago

Any ideas, at least how to fix that broken metadata for the existing data set?

selitvin commented 4 years ago

I was not able to reproduce this using your example:

$ ll /tmp/petastorm_ingest_test/_common_metadata 
-rw-r--r-- 1 yevgeni uberatc 6774 Mar 26 22:45 /tmp/petastorm_ingest_test/_common_metadata

I also tried running your example using our CI docker image (running from the petastorm workspace directory):

docker run -it -e PYTHONPATH=/petastorm -v `pwd`:/repro -v `pwd`:/petastorm selitvin/petastorm_ci_auto:ci-2020-31-01-09 bash -c "source /petastorm_venv3.7/bin/activate && python3 /repro/repro_515.py "

(I tweaked repro_515.py to use fake images: https://gist.github.com/selitvin/7150398b11db9f26c4f03cb9dbf9e679)

filipski commented 4 years ago

Thanks for trying. It's HDFS on the following setup: petastorm==0.8.2 pyspark=2.4.4=py_0

I guess it will be difficult to reproduce; it happened once in a couple of hundred ingestions I've done so far. What you can reproduce is the exception on subsequent attempts to append to the set, if you replace your _common_metadata file with the one I attached.
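To see the exception itself you don't even need HDFS - pointing pyarrow at the corrupted file directly is enough (a quick sketch, assuming the attached file has been unzipped locally):

import pyarrow.parquet as pq

# Reading the footer of the 4-byte file fails the same way the append does:
# pyarrow.lib.ArrowIOError: Invalid Parquet file size is 4 bytes, smaller
# than standard file footer (8 bytes)
pq.read_metadata('_common_metadata')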

It might be related to a restart of a name+data node of the cluster a few minutes after the ingestion, but that was well after the script finished, so all the data should have been on HDFS already (just maybe not yet replicated to other nodes; I have the replication factor set to 3).

Is there an easy way to re-create a proper _common_metadata file for an existing data set? I presume it would require scanning through all the files, but that's still better than re-creating the whole set from scratch...

selitvin commented 4 years ago

Oh, I see. I did not realize this was a one-off thing. A small trick you can use to reconstruct _common_metadata is to trick materialize_dataset into regenerating it, like this:

    with materialize_dataset(spark, output_url, ImageSchema, ROWGROUP_SIZE_MB, filesystem_factory=resolver.filesystem_factory()):
        pass
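For completeness, a rough standalone sketch of that repair (it assumes the same output_url, ImageSchema, ROWGROUP_SIZE_MB and FilesystemResolver setup as in your ingestion script above):

# Sketch: regenerate _common_metadata for an existing dataset without
# rewriting any parquet files. Reuses output_url, ImageSchema and
# ROWGROUP_SIZE_MB exactly as defined in the ingestion script.
from pyspark.sql import SparkSession

from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.fs_utils import FilesystemResolver

spark = SparkSession.builder.master('local[*]').getOrCreate()
resolver = FilesystemResolver(output_url,
                              spark.sparkContext._jsc.hadoopConfiguration(),
                              hdfs_driver='libhdfs')

# On exit, the empty block re-scans the existing row groups and rewrites
# the Petastorm metadata, including _common_metadata.
with materialize_dataset(spark, output_url, ImageSchema, ROWGROUP_SIZE_MB,
                         filesystem_factory=resolver.filesystem_factory()):
    pass

spark.stop()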

Hope that works for you...

filipski commented 4 years ago

Thanks. It indeed works, but with some quirks. The _common_metadata file is generated; however, if I run the code multiple times (without any changes to the underlying data set at output_url), the size of that file grows. I used parquet-tools to inspect the content of the file after each execution:

hadoop jar /usr/local/tools/parquet-tools-1.10.1.jar meta /data/petastorm_ingestion_tests/_common_metadata

and it seems the part which grows is this one:

extra: ARROW:schema =

The other fields look stable and seem to contain correct content, namely:

extra:                   dataset-toolkit.unischema.v1 = 
extra:                   dataset-toolkit.num_row_groups_per_file.v1 = 
extra:                   org.apache.spark.sql.parquet.row.metadata = 

I compared the content of dataset-toolkit.num_row_groups_per_file.v1 with the content of the folders in my data set and all unique parquet files are listed there, so this looks fine. org.apache.spark.sql.parquet.row.metadata also matches my Unischema.

I did some debugging, and it looks to me like the content of ARROW:schema changes on this line: https://github.com/uber/petastorm/blob/b425e435a5004d56d2618021d9e12fb88b939810/petastorm/utils.py#L117 but I'm not sure exactly what to_arrow_schema() does.

So, the only thing that worries me a bit now is why the ARROW:schema entry grows every time I run your empty materialize_dataset block on an unchanged data set. Does it have any negative impact?
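For what it's worth, the growth can also be watched from plain Python with pyarrow (a rough sketch; it assumes a local or mounted copy of the file rather than the HDFS path directly):

import pyarrow.parquet as pq

# Read only the footer of the summary file and print the size of each
# key-value metadata entry; ARROW:schema is the one that keeps growing.
meta = pq.read_metadata('_common_metadata')
for key, value in (meta.metadata or {}).items():
    print(key.decode(), len(value), 'bytes')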

selitvin commented 4 years ago

I am not sure if there is a negative impact. I would hope not, as long as you are able to read all fields from the dataset. Perhaps deleting _common_metadata again and restoring it with the method above would get rid of whatever extra weight was added to ARROW:schema? (Sorry, I don't have an in-depth understanding of how pyarrow uses _common_metadata.)

filipski commented 4 years ago

Thanks, let's close it for now. Final questions: do you experience this growth when appending data to your data sets? Does it affect the performance of your writes or reads?

selitvin commented 4 years ago

Personally, I have never implemented a scenario of appending data to datasets in our org's setup - all datasets were immutable. I assume that any extensions would be done under materialize_dataset, since Petastorm keeps a list of all row groups in the metadata and it has to include newly added parquet files. Off the top of my head, I don't see a reason for performance degradation, but I cannot say for sure without trying/measuring.

filipski commented 4 years ago

By appending I meant adding new parquet files to the partitioned structure, as in the code above - the parquet files themselves are indeed immutable. Thanks for your help; I'll monitor this to see whether it affects performance as my data set grows bigger.