ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[dask] Object store fills up too quickly in simple processing script #11565

Open richardliaw opened 4 years ago

richardliaw commented 4 years ago

What is the problem?

We hit an object store OOM even though the working set is expected to be much smaller than the store's capacity.

ray.exceptions.ObjectStoreFullError: Failed to put object ad6a921e910a8533ffffffff0100000001000000 in object store because it is full. Object size is 993654894 bytes.
The local object store is full of objects that are still in scope and cannot be evicted. Tip: Use the `ray memory` command to list active objects in the cluster.
Traceback (most recent call last):
  File "preprocess.py", line 68, in <module>
    res.compute(scheduler=ray_dask_get)
  File "/Users/X/ludwig/env/lib/python3.7/site-packages/dask/base.py", line 167, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/Users/X/ludwig/env/lib/python3.7/site-packages/dask/base.py", line 452, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/Users/X/ludwig/env/lib/python3.7/site-packages/ray/util/dask/scheduler.py", line 118, in ray_dask_get
    result = ray_get_unpack(object_refs)
  File "/Users/X/ludwig/env/lib/python3.7/site-packages/ray/util/dask/scheduler.py", line 357, in ray_get_unpack
    computed_result = ray.get(object_refs)
  File "/Users/X/ludwig/env/lib/python3.7/site-packages/ray/worker.py", line 1428, in get
    raise value.as_instanceof_cause()

Reproduction (REQUIRED)

import os
from collections import deque
import pandas as pd
import ray
from ray.util.dask import ray_dask_get
import dask.dataframe as dd
from dask import delayed
ray.init()
movie_titles = dd.read_csv('./movie_titles.csv',
                           encoding='ISO-8859-1',
                           header=None,
                           names=['Movie', 'Year', 'Name']).set_index('Movie')
def read_and_label_csv(filename):
    # Load single data-file
    df_raw = pd.read_csv(filename, header=None, names=['User', 'Rating', 'Date'], usecols=[0, 1, 2])
    # Find empty rows to slice dataframe for each movie
    tmp_movies = df_raw[df_raw['Rating'].isna()]['User'].reset_index()
    movie_indices = [[index, int(movie[:-1])] for index, movie in tmp_movies.values]
    # Shift the movie_indices by one to get start and endpoints of all movies
    shifted_movie_indices = deque(movie_indices)
    shifted_movie_indices.rotate(-1)
    # Gather all dataframes
    user_data = []
    # Iterate over all movies
    for [df_id_1, movie_id], [df_id_2, next_movie_id] in zip(movie_indices, shifted_movie_indices):
        # Check if it is the last movie in the file
        if df_id_1 < df_id_2:
            tmp_df = df_raw.loc[df_id_1 + 1:df_id_2 - 1].copy()
        else:
            tmp_df = df_raw.loc[df_id_1 + 1:].copy()
        # Create movie_id column
        tmp_df['Movie'] = movie_id
        # Append dataframe to list
        user_data.append(tmp_df)
    # Combine all dataframes
    df = pd.concat(user_data)
    del user_data, df_raw, tmp_movies, tmp_df, shifted_movie_indices, movie_indices, df_id_1, movie_id, df_id_2, next_movie_id
    return df
# create a list of functions ready to return a pandas.DataFrame
file_list = [f'./combined_data_{i+1}.txt' for i in range(4)]
dfs = [delayed(read_and_label_csv)(fname) for fname in file_list]
# using delayed, assemble the pandas.DataFrames into a dask.DataFrame
ratings = dd.from_delayed(dfs)
ratings = ratings.repartition(npartitions=12)
movie_titles = movie_titles.repartition(npartitions=1)
dataset = ratings.merge(movie_titles, how='inner', left_on='Movie', right_on='Movie')
# exist_ok=True lets the script be re-run without failing on an existing directory
os.makedirs('dataset.parquet', exist_ok=True)
res = dataset.to_parquet('dataset.parquet', compute=False)
res.compute(scheduler=ray_dask_get)

Using: the netflix dataset: https://www.kaggle.com/netflix-inc/netflix-prize-data

cc @tgaddair

richardliaw commented 4 years ago

cc @clarkzinzow

tgaddair commented 4 years ago

Thanks for raising this issue @richardliaw.

For more context @clarkzinzow, I'm finding that even with a dataset 1/12 the size (157M), this error is occurring when attempting to write to Parquet (but not, interestingly, when calling compute() with the Ray scheduler). So it appears to be something related to the serialization / parquet writing procedure when using the Ray scheduler.

Let me know if you need more info from me. This is currently a blocker for me, so happy to help in any way I can.

tgaddair commented 4 years ago

I take that back, I'm actually seeing this for compute() calls with Ray as well. Will keep digging into this to find a simpler repro for you.

clarkzinzow commented 4 years ago

In my attempt to reproduce, I'm getting a lot of segfaults, not sure what the cause is yet. I'll keep investigating.

Let me know if you are able to create a simpler reproduction, that might help me see through a lot of the noise here.

clarkzinzow commented 4 years ago

Hmm interesting, computing this without using the Dask-on-Ray scheduler crashed my dev VM! Maybe there's an underlying Dask or Parquet issue at work here. Are you able to run this workload to completion with a non-Ray scheduler? How large of a machine are you running this on?

tgaddair commented 4 years ago

Yes, I can run without the Ray scheduler. I'm using a MacBook Pro with 16GB of RAM.

richardliaw commented 4 years ago

I'm able to run this script on a MBP with 64GB of RAM. I can reliably reproduce the failure with:

GB = 1024 ** 3
ray.init(_memory=4*GB, object_store_memory=2*GB)

This produces a ray.cluster_resources() of:

{'CPU': 16.0, 'memory': 81.0, 'object_store_memory': 28.0, 'node:192.168.1.115': 1.0}

Which is similar to Travis' output of:

{'object_store_memory': 33.0,
 'CPU': 12.0,
 'node:192.168.4.54': 1.0,
 'memory': 97.0}
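For reading these numbers, here is a small sketch. It assumes (this is not confirmed in the thread) that Ray 1.0 reports `memory` and `object_store_memory` in units of 50 MiB. The names `MEMORY_UNIT_BYTES` and `memory_units_to_gib` are made up for illustration.

```python
# Assumption: Ray 1.0 reports 'memory' and 'object_store_memory'
# resources in 50 MiB units. This constant is illustrative, not read from Ray.
MEMORY_UNIT_BYTES = 50 * 1024 ** 2
GIB = 1024 ** 3


def memory_units_to_gib(units):
    """Convert a resource value in 50 MiB units to GiB."""
    return units * MEMORY_UNIT_BYTES / GIB


# Values copied from the two ray.cluster_resources() outputs above.
reported = {
    "richardliaw memory": 81.0,
    "richardliaw object_store_memory": 28.0,
    "tgaddair memory": 97.0,
    "tgaddair object_store_memory": 33.0,
}

for name, units in reported.items():
    print(f"{name}: {memory_units_to_gib(units):.2f} GiB")
```

Under that assumption, 81 units is about 3.96 GiB, which lines up with the `_memory=4*GB` setting.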

clarkzinzow commented 4 years ago

@tgaddair @richardliaw, which versions of Ray, Dask, and Pandas are y'all on?

tgaddair commented 4 years ago

ray==1.0.0
dask==2.28.0
pandas==1.1.2

Thanks @clarkzinzow for continuing to look into this!

richardliaw commented 4 years ago

ray==master
dask==2.28
pandas==1.0.5

thanks a bunch @clarkzinzow !

clarkzinzow commented 4 years ago

I'm still working on getting to the bottom of the segfaults on Parquet table writing that I keep hitting, which is concerning in and of itself. I'll try to poke at this a bit more tomorrow.

clarkzinzow commented 4 years ago

I'm assuming that both of y'all have fastparquet installed, and that Dask is using fastparquet and not pyarrow as the Parquet table writer? I get consistent segfaults with pyarrow as the Parquet I/O implementation, and the object store fills up with fastparquet.
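A quick way to see which of the two Parquet libraries is importable in a given environment, as a minimal sketch using only the standard library. The helper name `available_parquet_engines` is hypothetical. It only checks importability and does not tell you which engine Dask would pick by default.

```python
import importlib.util


def available_parquet_engines():
    """Report whether pyarrow and fastparquet can be imported here."""
    return {
        name: importlib.util.find_spec(name) is not None
        for name in ("pyarrow", "fastparquet")
    }


print(available_parquet_engines())

# To rule out ambiguity, the writer can be pinned explicitly through the
# `engine` argument of Dask's to_parquet, for example:
#   dataset.to_parquet('dataset.parquet', engine='fastparquet', compute=False)
```

Pinning `engine` in the reproduction script makes it clear which writer each person is actually exercising.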

richardliaw commented 4 years ago

@clarkzinzow nope didn't know that was a thing! trying it out now.

clarkzinzow commented 4 years ago

@richardliaw Interesting, not sure why the pyarrow Parquet implementation is segfaulting for me and not you. What version of pyarrow are you using?

richardliaw commented 4 years ago

pyarrow                       1.0.1

though maybe we bundle it internally?

clarkzinzow commented 4 years ago

> though maybe we bundle it internally?

Not anymore! 😁

richardliaw commented 4 years ago

ah nice! Let me know what other information you need to help debug your segfault.

clarkzinzow commented 4 years ago

I'll probably end up opening a separate issue for the segfaults since it seems to be specific to using the pyarrow Parquet writer on Linux. Given that I have a workaround (using the fastparquet writer instead), I'm going to ignore the segfaults and keep investigating the object store OOMs so we can hopefully unblock @tgaddair's work.

tgaddair commented 4 years ago

Hey @clarkzinzow, sorry for the delayed response. I'm actually using pyarrow; I don't have fastparquet installed. In my case, we use pyarrow to read the data after Dask writes it, so we may need to stick with pyarrow.

tgaddair commented 4 years ago

I'm using pyarrow 2.0.0 by the way.

clarkzinzow commented 4 years ago

@tgaddair Sweet, thanks! And if pyarrow isn't segfaulting for you, then no need to switch to fastparquet! Given that the OOMs are happening even without writing to Parquet files (just computing dataset), I'm considering it safe for me to debug the bloated object store while using fastparquet as the writer, with the hope that the solution will transfer.

So, despite the source dataset being only 2GiB, I think the working set for this load + repartition + merge is fundamentally large because of Dask's partitioned dataframe implementation, not because of the Dask-on-Ray scheduler. It fails under the Ray scheduler because the working set is limited to the allocated object store memory rather than the memory capacity of the entire machine. Running this sans the Ray scheduler yields a peak memory utilization of 20GiB, much of which is a working set of intermediate task outputs that must be materialized in memory concurrently. If you have allocated less than 6GiB to the object store, you can expect to see OOMs, since that object store memory budget can't meet the peak demand.
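For anyone who wants to check the peak figure on their own machine, here is one rough way to read the process's peak resident memory after the non-Ray run finishes. This is a sketch, not how the 20GiB number above was measured. It assumes a Unix-like system and an in-process scheduler (such as Dask's threaded scheduler), since it only sees the current process. `peak_rss_bytes` is a hypothetical helper name.

```python
import resource
import sys


def peak_rss_bytes():
    """Peak resident set size of the current process, in bytes."""
    peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    # ru_maxrss is reported in bytes on macOS and in kilobytes on Linux.
    return peak if sys.platform == "darwin" else peak * 1024


# Call this after the workload, e.g. after res.compute() has returned.
print(f"peak RSS: {peak_rss_bytes() / 1024 ** 3:.2f} GiB")
```

Worker memory in separate processes is not counted, so this undercounts with a multi-process scheduler.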

Until the underlying bloated memory utilization is fixed, you could try allocating e.g. 14GiB of memory to Ray and 7GiB to the object store in particular via

GB = 1024 ** 3
ray.init(_memory=14*GB, object_store_memory=7*GB)

to see if you have better luck, or try running this on a cloud instance or a set of cloud instances with more memory available.
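To put those budgets in perspective, here is a back-of-the-envelope sketch using the object size from the error message at the top of the thread. It assumes, purely for illustration, that every pinned object is as large as that one. `objects_that_fit` is a hypothetical helper.

```python
GIB = 1024 ** 3
# Object size in bytes, copied from the ObjectStoreFullError message.
OBJECT_SIZE = 993_654_894


def objects_that_fit(store_bytes, object_bytes=OBJECT_SIZE):
    """How many objects of the given size fit in a store of store_bytes."""
    return store_bytes // object_bytes


for store_gib in (2, 7):
    count = objects_that_fit(store_gib * GIB)
    print(f"{store_gib} GiB object store holds {count} objects of this size")
```

With a 2 GiB store only two such objects fit at once, while 7 GiB leaves room for seven, so a handful of concurrently pinned intermediate outputs is enough to fill the smaller store.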

If I'm able to confirm that there are some unnecessary dataframe copies happening in Dask's dataframe implementation, then I'll open an issue on the Dask repo, and I'll double-check to make sure that the object store is freeing unused intermediate task outputs at the expected times.

richardliaw commented 4 years ago

@clarkzinzow thanks a bunch for this thorough evaluation!

So from what I understand, the same memory utilization occurs on both workloads, but the difference is that Dask does not have "memory limits" while Ray enforces memory limits. Is that correct?

clarkzinzow commented 4 years ago

@richardliaw That's my current theory, yes! I do want to find out why the working set is so large during repartitioning and merging, and whether intermediate objects are being garbage collected in a timely manner (which I currently think they are, just want to double-check). Hopefully I'll have some time tomorrow to dig further and open up some issues.