uber / petastorm

The Petastorm library enables single-machine or distributed training and evaluation of deep learning models directly from datasets in Apache Parquet format. It supports ML frameworks such as TensorFlow, PyTorch, and PySpark and can be used from pure Python code.
Apache License 2.0

make_reader() is taking forever #720

Open GraceHLiu opened 3 years ago

GraceHLiu commented 3 years ago

I created a large Petastorm dataset that holds more than 50,000 samples. Each sample has 80+ columns of 1D, 2D, and 3D data.

I partitioned the data by geohash and sample type when saving them out:

with materialize_dataset(spark, uri, schema, row_group_size_mb=256):
    self.rdd.map(lambda x: dict_to_spark_row(schema, to_dict(x))).toDF(
        schema=schema.as_spark_schema()
    ).repartition(repartition).write.partitionBy(['geohash', 'field_type']).mode(mode).parquet(uri)

When I had only 5 geohashes of data saved, fitting 1 epoch of my model using data under one geohash took ~8 mins. However, when I had about ~20 geohashes under the uri, the code would get stuck at make_reader() forever.

# I'm not making this a context manager because I need to return the data and the reader
# at the end for further processing in a different function
train_reader = make_reader(
    uri, 
    schema_fields=load_fields, # I was only loading four 2D columns
    filters= [ ("geohash","=",geohash), ("field_type", "=", f"{dataset}_train")],
    shuffle_row_groups=True,
    num_epochs=None,
    workers_count=1,
)
train = make_petastorm_dataset(train_reader).map(func)
val_reader = make_reader(
    uri, 
    schema_fields=load_fields, # I was only loading four 2D columns
    filters= [ ("geohash","=",geohash), ("field_type", "=", f"{dataset}_val")],
    shuffle_row_groups=True,
    num_epochs=None,
    workers_count=1,
)
val = make_petastorm_dataset(val_reader).map(func)
train = train.batch(8).shuffle(buffer_size=100, reshuffle_each_iteration=True).repeat(2)
val = val.batch(8).repeat(2)
model.fit(
    train, validation_data=val, epochs=2, verbose=1, steps_per_epoch=100, validation_steps=20
)

Is it normal for make_reader() to take a long time when there's a lot of data under the dataset uri?

selitvin commented 3 years ago

What is the total number of partitions that you get on disk? Having a large number of partitions will definitely slow down make_reader, as the underlying pyarrow Parquet implementation scans for all files in the dataset directory. If the number of partitions is large, this directory tree traversal will indeed be expensive.
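
If it helps, here is a rough way to check how many files pyarrow ends up enumerating (just a sketch; the path is made up, for s3 you may need to pass an explicit filesystem object, and newer pyarrow versions expose this list as .fragments instead of .pieces):

from pyarrow import parquet as pq

# Each "piece" roughly corresponds to one Parquet file in the dataset,
# i.e. the number of files that have to be discovered up front.
dataset = pq.ParquetDataset('/path/to/dataset')
print('number of pieces:', len(dataset.pieces))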

GraceHLiu commented 3 years ago

Thanks for the quick reply! I set repartition=100, resulting in each parquet file being 1-10 MB, but mostly on the smaller side. I guess I could lower the repartition number since the files are so small. Or do you have a recommended number of partitions?

A follow-up question: does the pyarrow Parquet implementation scan all files under the parent uri even if I'm filtering to a specific geohash and field_type?

selitvin commented 3 years ago

What is the goal of partitioning into such small files? Typically, larger files are friendlier to distributed file systems like HDFS and s3, but again, it depends on the application and your data consumption patterns.

I am not sure about the current implementation of pyarrow. I'd need to step through the code to understand exactly what kind of scanning is going on. I wouldn't be surprised if it would scan all directories recursively.

Are you trying to load data from all partitions at once? If you need access to a single partition, you could specify the partition directory as the path for make_reader; if you are trying to read from all of them, then having fewer files would help.

What kind of storage are you using? s3, hdfs or something else?

GraceHLiu commented 3 years ago

I can definitely reduce the number of partitions, and the small file sizes don't help my application. I'm trying to load files from all geohashes but only one field_type (both are partition keys). If it scans the entire directory recursively, I'm thinking I will need to save the data for each field_type under a different uri and partition just by geohash (instead of saving everything under the same uri and partitioning by both geohash and field_type), roughly as sketched below. (I'm dealing with 160+ geohashes and 9 field types.)
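
Roughly what I have in mind for the write side (just a sketch; rdds_by_field_type and base_uri are made-up names, the rest follows my snippet above):

for field_type, rdd in rdds_by_field_type.items():  # hypothetical: one RDD per field_type
    type_uri = f'{base_uri}/{field_type}'
    with materialize_dataset(spark, type_uri, schema, row_group_size_mb=256):
        rdd.map(lambda x: dict_to_spark_row(schema, to_dict(x))).toDF(
            schema=schema.as_spark_schema()
        ).repartition(repartition).write.partitionBy('geohash').mode(mode).parquet(type_uri)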

When I try to include partition keys at the end of the uri that's passed to make_reader (e.g. uri + '/geohash=some_geohash/'), it just complains that make_reader only works for Petastorm datasets (see the full error message below).

I'm using s3.

/usr/local/anaconda/envs/py3/lib/python3.7/site-packages/petastorm/reader.py:155: UserWarning: Currently make_reader supports reading only Petastorm datasets. To read from a non-Petastorm Parquet store use make_batch_reader
  warnings.warn('Currently make_reader supports reading only Petastorm datasets. '
---------------------------------------------------------------------------
PetastormMetadataError                    Traceback (most recent call last)
/usr/local/anaconda/envs/py3/lib/python3.7/site-packages/petastorm/etl/dataset_metadata.py in infer_or_load_unischema(dataset)
    413     try:
--> 414         return get_schema(dataset)
    415     except PetastormMetadataError:

/usr/local/anaconda/envs/py3/lib/python3.7/site-packages/petastorm/etl/dataset_metadata.py in get_schema(dataset)
    363         raise PetastormMetadataError(
--> 364             'Could not find _common_metadata file. Use materialize_dataset(..) in'
    365             ' petastorm.etl.dataset_metadata.py to generate this file in your ETL code.'

PetastormMetadataError: Could not find _common_metadata file. Use materialize_dataset(..) in petastorm.etl.dataset_metadata.py to generate this file in your ETL code. You can generate it on an existing dataset using petastorm-generate-metadata.py

During handling of the above exception, another exception occurred:

IndexError                                Traceback (most recent call last)
<timed exec> in <module>

/usr/local/anaconda/envs/py3/lib/python3.7/site-packages/petastorm/reader.py in make_reader(dataset_url, schema_fields, reader_pool_type, workers_count, pyarrow_serialize, results_queue_size, shuffle_row_groups, shuffle_row_drop_partitions, predicate, rowgroup_selector, num_epochs, cur_shard, shard_count, shard_seed, cache_type, cache_location, cache_size_limit, cache_row_size_estimate, cache_extra_settings, hdfs_driver, transform_spec, filters, storage_options, zmq_copy_buffers, filesystem)
    189                       worker_class=PyDictReaderWorker,
    190                       is_batched_reader=False,
--> 191                       **kwargs)
    192     except PetastormMetadataError as e:
    193         logger.error('Unexpected exception: %s', str(e))

/usr/local/anaconda/envs/py3/lib/python3.7/site-packages/petastorm/reader.py in __init__(self, pyarrow_filesystem, dataset_path, schema_fields, shuffle_row_groups, shuffle_row_drop_partitions, predicate, rowgroup_selector, reader_pool, num_epochs, cur_shard, shard_count, cache, worker_class, transform_spec, is_batched_reader, filters, shard_seed)
    406                                          filters=filters)
    407 
--> 408         stored_schema = infer_or_load_unischema(self.dataset)
    409 
    410         if isinstance(schema_fields, NGram):

/usr/local/anaconda/envs/py3/lib/python3.7/site-packages/petastorm/etl/dataset_metadata.py in infer_or_load_unischema(dataset)
    416         logger.info('Failed loading Unischema from metadata in %s. Assuming the dataset was not created with '
    417                     'Petastorm. Will try to construct from native Parquet schema.')
--> 418         return Unischema.from_arrow_schema(dataset)

/usr/local/anaconda/envs/py3/lib/python3.7/site-packages/petastorm/unischema.py in from_arrow_schema(cls, parquet_dataset, omit_unsupported_fields)
    315         :return: A :class:`Unischema` object.
    316         """
--> 317         meta = parquet_dataset.pieces[0].get_metadata()
    318         arrow_schema = meta.schema.to_arrow_schema()
    319         unischema_fields = []

IndexError: list index out of range

selitvin commented 3 years ago

Oh, you are right. Using make_reader requires you to pass the top-level directory. What kind of data types do you store in your dataset? Do you have ndarrays there, or only scalars? If the latter, perhaps make_batch_reader could be a good solution (then you also don't need to use materialize_dataset when writing your data).
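
For example, with scalar-only columns something along these lines would read a plain Parquet store directly (a sketch; the url is made up):

from petastorm import make_batch_reader

# No _common_metadata is needed for a native Parquet store; each item
# yielded by the reader is a batch of rows (a namedtuple of column arrays).
with make_batch_reader('s3://bucket/dataset/geohash=some_geohash') as reader:
    for batch in reader:
        print(batch)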

GraceHLiu commented 3 years ago

I have both ndarrays and scalars in my dataset...

Does the parquet file size matter to make_reader? For example, if I have a small number of parquet files but each file is very big, does that slow down make_reader?

selitvin commented 3 years ago

Big file sizes are typically healthier and should not slow Petastorm down. Please note that a row group defines the minimal number of rows that is read from a Parquet file at once - typically it's recommended for a row group to be on the order of high tens or, better, hundreds of megabytes.

GraceHLiu commented 3 years ago

Thanks for bringing that up! I don't think I have a very good understanding of the concept of row group size.

By row group, do you mean the row_group_size_mb parameter in materialize_dataset? If so, I've set that to 256, but I'm not quite sure how that plays together with the repartition number.

In my case, I set the repartition number to 100. With that setting (together with 2 partition keys), I ended up with parquet files sized at 1-10 MB. Does setting repartition and partitionBy override row_group_size_mb?

selitvin commented 3 years ago

row_group_size_mb is an upper bound that a row group will not grow beyond. But a row group cannot span multiple files, so if you have many partitions and each file is at most 10 MB, your row groups are also under 10 MB.
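
If you want to double-check, you can inspect the actual row group sizes of one of your files with pyarrow (a sketch; the path is made up):

from pyarrow import parquet as pq

pf = pq.ParquetFile('/local/copy/of/one.parquet')
meta = pf.metadata
for i in range(meta.num_row_groups):
    # total_byte_size is the (uncompressed) size of the row group's data
    print(i, meta.row_group(i).total_byte_size / 1e6, 'MB')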

GraceHLiu commented 3 years ago

I've rerun my pipeline to reduce the number of files. I currently have ~120 files (total size = 24 GB) under the top-level directory. make_reader is still taking forever to run.

GraceHLiu commented 3 years ago

I'm currently leaving the cur_shard and shard_count parameters at their defaults. Could that contribute to the slowness?

selitvin commented 3 years ago

I don't think cur_shard/shard_count should be a problem. Do you know which part of the code is slow? Can you point to the line/function that slows things down?
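
If it's not obvious from the outside, running the call under the standard profiler usually shows where the time goes (a sketch, reusing the variables from your snippet):

import cProfile
import pstats

# Profile just the reader construction to see which internal call dominates.
pr = cProfile.Profile()
pr.enable()
reader = make_reader(uri, schema_fields=load_fields, workers_count=1)
pr.disable()
pstats.Stats(pr).sort_stats('cumulative').print_stats(20)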

GraceHLiu commented 3 years ago

The code just gets stuck at this line:

train_reader = make_reader(
    uri, 
    schema_fields=load_fields, # I was only loading four 2D columns
    filters= [ ("geohash","=",geohash), ("field_type", "=", f"{dataset}_train")],
    shuffle_row_groups=True,
    num_epochs=None,
    workers_count=1,
)

Or do you mean which line inside the make_reader function?