uber / petastorm

Petastorm library enables single machine or distributed training and evaluation of deep learning models from datasets in Apache Parquet format. It supports ML frameworks such as Tensorflow, Pytorch, and PySpark and can be used from pure Python code.
Apache License 2.0

Petastorm fails with big datasets #241

Open eliorc opened 5 years ago

eliorc commented 5 years ago

Using the code from the repo's GitHub main page as a reference, my code looks as follows:

import logging

from pyspark.sql import SparkSession
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import dict_to_spark_row

# MySchema, ids and row_generator are defined elsewhere in my code
spark = SparkSession.builder.config('spark.driver.memory', '10g').master('local[4]').getOrCreate()
sc = spark.sparkContext

with materialize_dataset(spark=spark, dataset_url='file:///opt/data/hello_world_dataset',
                         schema=MySchema, row_group_size_mb=256):

    logging.info('Building RDD...')
    rows_rdd = (
        sc.parallelize(ids)
        .flatMap(row_generator)  # generator that yields lists of example dicts
        .map(lambda x: dict_to_spark_row(MySchema, x))
    )

    logging.info('Creating DataFrame...')
    spark.createDataFrame(rows_rdd, MySchema.as_spark_schema()) \
        .coalesce(10) \
        .write \
        .mode('overwrite') \
        .parquet('file:///opt/data/hello_world_dataset')

Now the RDD code executes successfully, but the .createDataFrame call fails with the following error:

_pickle.PicklingError: Could not serialize broadcast: OverflowError: cannot serialize a string larger than 4GiB

This is my first experience with Spark, so I can't really tell if this error originates in Spark or Petastorm.

Looking through other solutions to this error (with respect to Spark, not Petastorm), I saw that it might have to do with the pickling protocol, but I couldn't confirm that, nor did I find a way of altering the pickling protocol.

How could I avoid this error?

rgruener commented 5 years ago

This is definitely a Spark problem. I believe that what is happening is that when Spark serializes the data between the Python and Java processes, it fails because you have a field/row that is larger than 4 GB. What version of Spark are you using? How big are your fields/rows?

selitvin commented 5 years ago

One more thing to try is to explicitly set the number of partitions to a larger number. It all depends on the number of records and the size of the dataset, as @rgruener mentioned, e.g.:

rows_rdd = sc.parallelize(ids, 10000)

(try playing with 10000)

eliorc commented 5 years ago

@rgruener The fields that are not scalar numbers are arrays with a maximum length of 500. I have about 80 fields, about half of which are arrays; the rest are numbers and one of them is a string. As for the rows, we are talking about millions.

My Spark version is 2.4.0

@selitvin I have ~81K ids, and each one yields on average 7 dictionaries from the row_generator. I have tried 81K, 100,000, and 1,000,000 partitions; all fail with the same error.

Previously I solved this using TFRecords, but since they are so hard to manipulate and slow for accessing specific rows, we decided to give Petastorm a try.

eliorc commented 5 years ago

Solved it. It seems that Spark does not manage to serialize objects bigger than 4 GB when transferring the work to the different processes in the parallelize call.

Adding the following code before starting a Spark session solved the issue:

from pyspark import broadcast
import pickle

def broadcast_dump(self, value, f):
    pickle.dump(value, f, 4)  # was 2, 4 is first protocol supporting >4GB
    f.close()

    return f.name

broadcast.Broadcast.dump = broadcast_dump

selitvin commented 5 years ago

Are you using all 80 columns, or are you selecting just some of them when you create the Reader? At the moment, Petastorm does not handle datasets with relatively small field sizes well (we are using it with multi-megabyte images). We are currently working to make it more efficient in the small-field/row scenario.

One tuning option that may make reading the data somewhat faster is to create the reader with the following parameters:

make_reader(...., pyarrow_serialize=True, reader_pool_type='process')

eliorc commented 5 years ago

I am using all of them when I create a Reader. Unfortunately, I get an error when I try to run from petastorm import make_reader; it says there is no method by that name.

I have successfully written the Petastorm dataset, but I can't seem to be able to read it using the following code:

from petastorm.reader import Reader
from petastorm.tf_utils import make_petastorm_dataset
import tensorflow as tf

reader = Reader('file:///data/hello_world_dataset')  # this is actually my custom dataset
dataset = make_petastorm_dataset(reader)
iterator = dataset.make_initializable_iterator()
tensor = iterator.get_next()

sess = tf.Session()
sess.run(iterator.initializer)
sess.run(tensor)

It seems to stall indefinitely.

selitvin commented 5 years ago

I suspect that it is simply very slow because of the kind of record structure that you have. As I mentioned, it is very different from the data we use Petastorm with internally, and we'll have a good solution for this in the next release.

Can you try generating a small dataset: say with just 5 records? Then we can check if the problem is the speed of reading or that it hangs for some other reason.

Some more ideas to try: read directly from Python, so you don't go via Tensorflow at all:

reader = Reader('file:///data/hello_world_dataset')
sample = next(reader)

Another option to try to speed up reads for now is:

reader = Reader('file:///data/hello_world_dataset', reader_pool=ProcessPool(5, pyarrow_serialize=True))
sample = next(reader)

eliorc commented 5 years ago

I'm back to TFRecords at the moment, so I don't have the availability to try the 5-record test.

I did, though, try the next(reader) calls; both of them also stall for too long.

selitvin commented 5 years ago

Got it. I'll update the ticket when we are out with the new version that should handle your case well.

shaywinter commented 5 years ago

@selitvin Following up on the above thread: I saw you released 0.5.0 ten days ago. Is this the version that is supposed to resolve this issue?

selitvin commented 5 years ago

Yes, you can now use make_batch_reader to open a dataset that was not created by Petastorm. The performance issues that are reported in this issue should not occur. 0.5.0 supports some, but not all, scalar types. Pending PRs #273 and #272 extend support for date/timestamp and array types. We are planning to release them as part of 0.5.1 after the holidays.
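
For illustration, a minimal sketch of reading a plain (non-Petastorm) Parquet dataset with make_batch_reader; the path is hypothetical, and the exact shape of the yielded batches may differ between versions (they come back as namedtuples of column arrays rather than individual rows):

from petastorm import make_batch_reader

# Hypothetical path to a Parquet dataset written by plain Spark (no materialize_dataset)
with make_batch_reader('file:///opt/data/plain_parquet_dataset') as reader:
    for batch in reader:
        # Each batch is a namedtuple with one array per Parquet column
        print(batch._fields)
        break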

shaywinter commented 5 years ago

Reading works for me now, although I noticed that most of the time is spent on the first next(reader) call, and that this time is linear in the data length. Could it be that you're scanning all of the data on the first read? Can it be avoided? In parallel, I'm trying make_batch_reader, which doesn't have this issue.

selitvin commented 5 years ago

Since 0.5.1 was released, we support reading non-Petastorm datasets. Your column types, as you have described them, are all native-Parquet compliant (e.g. you don't have multidimensional tensors or images). I think it would be best to create a Parquet dataset without the materialize_dataset context manager, using pure Parquet types only, and then read it using make_batch_reader. This should give the best performance.

We don't have a good example that shows the full flow I just described, except for the unit-test setup code, but we plan to add one: #286.
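
Until that example lands, here is a rough sketch of the flow described above, using only native Parquet types and hypothetical paths and column names; the write side is plain PySpark with no Petastorm involvement:

from pyspark.sql import SparkSession
from petastorm import make_batch_reader

spark = SparkSession.builder.master('local[4]').getOrCreate()

# Write a plain Parquet dataset: native Parquet types only, no materialize_dataset
df = spark.createDataFrame([(i, float(i) / 2) for i in range(1000)], ['id', 'value'])
df.write.mode('overwrite').parquet('file:///tmp/plain_parquet_dataset')

# Read it back through Petastorm; no Petastorm-specific metadata is required
with make_batch_reader('file:///tmp/plain_parquet_dataset') as reader:
    batch = next(reader)
    print(batch.id[:5], batch.value[:5])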

shaywinter commented 5 years ago

My datatypes include sequences, therefore I cannot use make_batch_reader. I did succeed in getting good standalone (without TensorFlow) performance using a process pool; however, when I tried this with TensorFlow, it didn't work (the spawned processes were killed and didn't deliver results). Do you have any working example of Petastorm process-parallelization with TensorFlow?

selitvin commented 5 years ago

Regarding "My datatypes include sequences therefore I cannot use make_batch_reader": make_batch_reader does support Parquet array types. If you are using these, you should be fine. If the code still fails for some reason, I would be happy to see a call stack so we can address the issue. I would strongly recommend using make_batch_reader if it is possible in your case, as it provides the best performance.

There should not be any difference between process and thread pools in the way they are used. Using a process pool is more wasteful of memory. Could it be that using multiprocessing triggers a violation of some memory cap? Can it be increased?

You can also try and minimize memory consumption by reducing the number of processes or the size of the shuffling queue (if you have one set up in the call to tf_tensors).
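
For example, a sketch of dialing both knobs down, assuming make_reader's reader_pool_type/workers_count parameters and tf_tensors' shuffling_queue_capacity parameter (exact defaults may differ between versions):

import tensorflow as tf
from petastorm import make_reader
from petastorm.tf_utils import tf_tensors

# Hypothetical dataset path; fewer worker processes and a small shuffling queue to cap memory
with make_reader('file:///data/hello_world_dataset',
                 reader_pool_type='process', workers_count=2) as reader:
    tensor = tf_tensors(reader, shuffling_queue_capacity=100, min_after_dequeue=10)
    with tf.Session() as sess:
        # The shuffling queue is serviced by a TF1-style queue runner
        tf.train.start_queue_runners(sess=sess)
        sample = sess.run(tensor)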

YunseokJANG commented 4 years ago

I have a question related to this.

When I tried to convert a large dataset (a 10 TB HDF5 file) with the example code on a local server, I think the RDD causes an out-of-memory error.

(Random guess) I suspect the RDD is processed step by step. To overcome this issue, I think the sample code should feed the already-generated data to the next RDD once the number of rows exceeds some limit. However, as a newbie to Spark, I failed to find a way to implement this logic. Could you give me a hint about how I can change this behavior?

selitvin commented 4 years ago

Getting an OOM is a very common thing with PySpark when working with large data, especially images/sensor data. While there could be many different reasons for this, let me try pointing in some directions. I assume you are running on a single machine with a 'local' Spark, i.e.:

spark = SparkSession.builder.config('spark.driver.memory', '2g').master('local[2]').getOrCreate()

If you could share some code example, that could also help...

YunseokJANG commented 4 years ago

Thanks for your reply. Actually, I've tried all the other bullet points except the '64MB per row-group' one. However, it seems like all the data gets pushed into RAM whenever it is passed to a new RDD (if my understanding is correct: each RDD is processed like a MapReduce stage instead of being processed as a stream), and that causes the OOM. In my case, there are 200K items, and one item takes around 20 MB. I will try the third option if I have some more time. Thanks again!

miguelalonsojr commented 3 years ago

@YunseokJANG Did you ever resolve your issue? I'm facing similar OOM problems with large datasets (~100k items).

YunseokJANG commented 3 years ago

(Just in case) @miguelalonsojr I'm actually no longer handling this issue (I switched to a smaller dataset). I hope the maintainers have resolved it.