uber / petastorm

Petastorm is a library that enables single-machine or distributed training and evaluation of deep learning models directly from datasets in Apache Parquet format. It supports ML frameworks such as TensorFlow, PyTorch, and PySpark and can be used from pure Python code.
Apache License 2.0

LocalDiskArrowCache performance & validity #366

Open panfengfeng opened 5 years ago

panfengfeng commented 5 years ago

There is a large performance gap between LocalDiskArrowCache and LocalDiskCache (LocalDiskCache is much faster than LocalDiskArrowCache). Also, when I restart the program, it raises a segmentation fault when it reads the data back from the LocalDiskArrowCache in make_batch_reader. Are there any known problems with LocalDiskArrowCache?

selitvin commented 5 years ago

Wasn't aware of the issue, so thank you for the report. Can you please provide more information?

pyarrow 0.13 has some ABI conflicts with TF. You may try downgrading to pyarrow 0.12.1 and see if the segfaults still occur (the issue will be fixed in pyarrow 0.14).

panfengfeng commented 5 years ago

Hi @selitvin, sorry for the lack of detail. I followed local_disk_arrow_table_cache.py to write two test samples. The first one is disk-cache-create.py:

import os
import pandas as pd
import pyarrow as pa
import shutil
from diskcache import FanoutCache

default_settings = {
    'size_limit': 1048576,
    'eviction_policy': 'least-recently-stored',
}

path = '/tmp/disk-cache-test'
if os.path.exists(path):
    shutil.rmtree(path)
shards = 6
cache = FanoutCache(path, shards, **default_settings)

key = "first"

# Serialize a small pandas DataFrame to Arrow components, the same way
# local_disk_arrow_table_cache.py does, and store the components in the cache
df = pd.DataFrame({'int': [1, 2], 'str': ['a', 'b']})
value = pa.Table.from_pandas(df)
table_pandas = value.to_pandas()
serialized_df = pa.serialize(table_pandas)
components = serialized_df.to_components()

cache.set(key, components)

# Read the value back in the same process to check it deserializes correctly
value = cache.get(key, default=None)

print()
if value is None:
    print('value is None')
else:
    print('value is not None')
    original_df = pa.deserialize_components(value)
    value = pa.Table.from_pandas(original_df, preserve_index=False)
    print(value)

This sample works, and the data is persisted to the storage device. The second sample is disk-cache-read.py, which just reads the data back from the disk cache directory:

import pyarrow as pa
from diskcache import FanoutCache

default_settings = {
    'size_limit': 1048576,
    'eviction_policy': 'least-recently-stored',
}

path = '/tmp/disk-cache-test'
shards = 6
cache = FanoutCache(path, shards, **default_settings)

key = "first"

# Read the value back from the existing disk-cache directory (in a new process)
value = cache.get(key, default=None)

if value is None:
    print('value is None')
else:
    print('value is not None')
    # pa.deserialize_components is where the segmentation fault happens when run in a fresh process
    original_df = pa.deserialize_components(value)
    value = pa.Table.from_pandas(original_df, preserve_index=False)
    print(value)

When I run this sample, it raises a segmentation fault. The pyarrow version is 0.11.1; I also tried 0.12.1, but it failed too. Is there something wrong with my test code?

selitvin commented 5 years ago

I reduced your example even further. It seems like a bug in the Arrow code. There is a workaround I found: if you serialize just about anything before deserializing from components, the crash does not occur. I guess some internal serialization state is not initialized properly when we try to deserialize right away...

Here is the arrow ticket I opened: https://issues.apache.org/jira/browse/ARROW-5260

And here is a PR that uses this workaround: https://github.com/uber/petastorm/pull/368. Can you try and see if it helps your case?
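For reference, here is a minimal sketch of the workaround applied to your disk-cache-read.py sample (assuming an older pyarrow where pa.serialize / pa.deserialize_components are still available; they were later deprecated and removed). The only change is the extra pa.serialize call before deserializing:

import pyarrow as pa
from diskcache import FanoutCache

default_settings = {
    'size_limit': 1048576,
    'eviction_policy': 'least-recently-stored',
}

path = '/tmp/disk-cache-test'
cache = FanoutCache(path, 6, **default_settings)

value = cache.get("first", default=None)
if value is None:
    print('value is None')
else:
    # Workaround: serialize a throwaway object first so pyarrow's internal
    # serialization state is initialized before deserialize_components runs
    pa.serialize(0)
    original_df = pa.deserialize_components(value)
    table = pa.Table.from_pandas(original_df, preserve_index=False)
    print(table)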

I will look into the performance issue next. Can you provide a bit more info? What data types are you using, and what are their approximate sizes? I can try to reproduce and investigate.

panfengfeng commented 5 years ago

Thank you, I will try it. Some performance info is as follows:

A 24 GB dataset (1.28 million images), stored as 31 Parquet files of about 800 MB each. I have a 56-core CPU machine with 180 GB of RAM.

I use the code below:

    with make_batch_reader(dataset_url, num_epochs=4, cache_type='local-disk', cache_locations='***', cache_size_limit='**', cache_row_size_estimate='**') as train_reader:
        i = 0
        start = time.time()
        for schema_view in train_reader:
            for j in range(len(schema_view.imagename)):
                i += 1
                if i % 1281167 == 0:
                    end = time.time()
                    print("time is " + str(end - start))
                    start = end
        print(i)

So the local-disk cache I use is LocalDiskArrowTableCache, and the four epoch times are 110s, 103s, 97s, and 99s. When I change the local-disk cache from LocalDiskArrowTableCache to LocalDiskCache, the four epoch times are 54s, 32s, 29s, and 28s.

LocalDiskCache code

        value = self._cache.get(key, default=None)
        if value is None:
            value = fill_cache_func()
            self._cache.set(key, value)

        return value

LocalDiskArrowTableCache code

        value = self._cache.get(key, default=None)
        if value is None:
            value = fill_cache_func()
            table_pandas = value.to_pandas()
            serialized_df = pa.serialize(table_pandas)
            components = serialized_df.to_components()
            self._cache.set(key, components)
        else:
            original_df = pa.deserialize_components(value)
            value = pa.Table.from_pandas(original_df, preserve_index=False)

        return value

So we can see that LocalDiskArrowTableCache performs to_pandas, serialize, and other operations; I think these add a lot of overhead to the read path. What is the intended use case for LocalDiskArrowTableCache?
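For what it's worth, here is a rough stand-alone micro-benchmark (hypothetical column names and row count, again assuming an older pyarrow where pa.serialize is available) that times just the extra to_pandas / serialize / deserialize round trip that LocalDiskArrowTableCache performs on top of what LocalDiskCache does:

import time

import pandas as pd
import pyarrow as pa

# Hypothetical batch: 100k rows with an image name and a label column
df = pd.DataFrame({'imagename': ['img_%d.jpg' % i for i in range(100000)],
                   'label': list(range(100000))})
table = pa.Table.from_pandas(df)

start = time.time()
# Write path of LocalDiskArrowTableCache: Arrow table -> pandas -> serialized components
components = pa.serialize(table.to_pandas()).to_components()
# Read path: components -> pandas -> Arrow table
restored = pa.Table.from_pandas(pa.deserialize_components(components),
                                preserve_index=False)
print('arrow-table round trip took %.3fs' % (time.time() - start))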