uber / petastorm

The Petastorm library enables single-machine or distributed training and evaluation of deep learning models from datasets in Apache Parquet format. It supports ML frameworks such as TensorFlow, PyTorch, and PySpark and can be used from pure Python code.
Apache License 2.0

Python running out of RAM #441

Closed: Mmiglio closed this issue 5 years ago

Mmiglio commented 5 years ago

Problem

Python process killed while creating a reader because it runs out of RAM.

How to reproduce

Dataset Generation

The dataset is generated by modifying the schema of the Hello World example. My dataset has two columns: an integer id and an ndarray of shape 801x19.

import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType

from petastorm.codecs import ScalarCodec, CompressedImageCodec, NdarrayCodec
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import dict_to_spark_row, Unischema, UnischemaField

# The Unischema defines what each row of the dataset looks like
HelloWorldSchema = Unischema('HelloWorldSchema', [
    UnischemaField('id', np.int32, (), ScalarCodec(IntegerType()), False),
    UnischemaField('array', np.float32, (801, 19), NdarrayCodec(), False),
])

def row_generator(x):
    """Returns a single entry in the generated dataset. Return a bunch of random values as an example."""
    return {'id': x,
            'array': np.random.randn(801, 19).astype(np.float32)}

def generate_petastorm_dataset(output_url='file:///tmp/hello_world_dataset'):
    rowgroup_size_mb = 256

    spark = SparkSession.builder.config('spark.driver.memory', '2g').master('local[2]').getOrCreate()
    sc = spark.sparkContext

    # Wrap the dataset materialization step. This takes care of setting up the Spark
    # environment variables and saves the petastorm-specific metadata.
    rows_count = 10000
    with materialize_dataset(spark, output_url, HelloWorldSchema, rowgroup_size_mb):

        rows_rdd = sc.parallelize(range(rows_count))\
            .map(row_generator)\
            .map(lambda x: dict_to_spark_row(HelloWorldSchema, x))

        spark.createDataFrame(rows_rdd, HelloWorldSchema.as_spark_schema()) \
            .write \
            .mode('overwrite') \
            .parquet(output_url)

if __name__ == '__main__':
    generate_petastorm_dataset()

Issue

If I run

from petastorm import make_reader
reader = make_reader('file:///tmp/hello_world_dataset', schema_fields=['array'])
print(next(iter(reader)))

after a couple of seconds the process is killed because it runs out of memory. What is causing this? I tried playing with the reader parameters and with rowgroup_size_mb during dataset generation, but I couldn't find a solution.

selitvin commented 5 years ago

Can you please take a look at this message? It explains how to estimate your memory footprint. Let me know if it does not help, so we can investigate further: https://github.com/uber/petastorm/issues/306#issuecomment-461511736

Mmiglio commented 5 years ago

Thanks for the reply. I read the comment and I have one question: how do you compute the number of rows in a row group? I can't see how you get ~600 rows starting from a row group size of 256MB. That must be the only issue, since

memory footprint = 10 * 35MB + 50 * 60KB ~ 350MB
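
Spelling out my arithmetic (this is my reading of the linked comment, not an exact formula): I am assuming the footprint is roughly workers_count * decoded-row-group-size plus an output queue of decoded rows.

# Back-of-envelope estimate; the constants below are my assumptions, not petastorm internals.
row_bytes = 801 * 19 * 4        # one decoded float32 array, ~60 KB
rows_per_rowgroup = 600         # the ~600 rows per row group mentioned above
workers_count = 10              # make_reader's default, as far as I know
queue_rows = 50                 # assumed number of decoded rows held in the output queue

rowgroup_bytes = rows_per_rowgroup * row_bytes                       # ~35 MB
footprint = workers_count * rowgroup_bytes + queue_rows * row_bytes
print(footprint / 2**20)        # ~350 MiB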

selitvin commented 5 years ago

I don't remember all the details of that thread. I would imagine it would be row-group-size-in-MB / compressed-row-size.

You can double check this value by using parquet-tools (https://github.com/apache/parquet-mr/tree/master/parquet-tools). You can also open your parquet dataset with a library API (e.g. pyarrow), load the metadata of a single row group, and look at its row count.
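
For example, something along these lines with pyarrow (the path and glob pattern are assumptions about where Spark wrote the part files; adjust as needed):

import glob
import pyarrow.parquet as pq

# Print the row count and uncompressed byte size of every row group in the dataset.
for path in sorted(glob.glob('/tmp/hello_world_dataset/part-*.parquet')):
    md = pq.ParquetFile(path).metadata
    for i in range(md.num_row_groups):
        rg = md.row_group(i)
        print(path, 'row group', i, 'rows:', rg.num_rows, 'bytes:', rg.total_byte_size)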

Mmiglio commented 5 years ago

Thanks for the help, I managed to solve the problem by tuning rowgroup_size_mb and workers_count.
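
For future readers, a sketch of the kind of adjustment that helped in my case (the exact values are illustrative; workers_count is a make_reader argument, and the reader can be used as a context manager):

from petastorm import make_reader

# Illustrative values only: a smaller worker pool keeps fewer decoded row groups in memory at once.
with make_reader('file:///tmp/hello_world_dataset',
                 schema_fields=['array'],
                 workers_count=2) as reader:
    sample = next(iter(reader))
    print(sample.array.shape)  # (801, 19)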