ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0
33.93k stars 5.77k forks source link

How to use ray with pandas and scikit-learn #1172

Closed barrachri closed 7 years ago

barrachri commented 7 years ago

Hi, is there already a support for pandas and scikit-learn?

robertnishihara commented 7 years ago

Can you elaborate a bit on the kind of support that you're looking for? You can pass pandas dataframes into (and out of) remote functions. Soon pandas data frames will make more efficient use of shared memory. As for scikit-learn, you can use scikit-learn to define tasks. You should be able to pass scikitlearn objects in and out of remote functions.

barrachri commented 7 years ago

Hello @robertnishihara thanks for the answer.

I was checking the docs and the examples but I didn't find a proper solution.

If I understood correctly you can do something like this (pseudo-code):

import ray
import pandas as pd

# download some data from an s3 bucket
@ray.remote
def create_df(s3):
    return pd.read_csv(s3)

# put the data together
@ray.remote
def put_together(df_1, df_2):
    df = pd.concat([df_1, df_2], ignore_index=True)
    return df

# compute something
@ray.remote
def compute_something(df):
    return df.describe

data = [create_df.remote(s3) for s3 in list_s3]

while len(data) > 1:
    data.append(put_together.remote(data.pop(0), data.pop(0)))

result = compute_something.remote(*data)

ray.get(result)

Is this correct?

With scikit-learn I would like to fit models (ensemble in this case) and then combine the models averaging or by majority.

robertnishihara commented 7 years ago

Yes, the above code looks like it should work. Let me know if you run into any problems. For averaging models, the structure could be the same.

barrachri commented 7 years ago

Thanks, I will try and let you know.

barrachri commented 7 years ago

Tried....

a few issues

import ray
import pandas as pd

ray.init(redis_address="ip")

# download some data from an s3 bucket
@ray.remote
def create_df(s3):
    return pd.read_csv(s3, encoding="utf-8", error_bad_lines=False)
# put the data together
@ray.remote
def put_together(df_1, df_2):
    df = pd.concat([df_1, df_2], ignore_index=True)
    return df

# compute something
@ray.remote
def compute_something(df):
    return df.describe()

list_s3 = [
    's3://gdelt-open-data/events/1979.csv',
    's3://gdelt-open-data/events/1980.csv',
    's3://gdelt-open-data/events/1981.csv',
    's3://gdelt-open-data/events/1982.csv',
]

data = [create_df.remote(s3) for s3 in list_s3]

while len(data) > 1:
    data.append(put_together.remote(data.pop(0), data.pop(0)))

result = compute_something.remote(*data)

ray.get(result)
  File "pyarrow/error.pxi", line 81, in pyarrow.lib.check_status (/ray/src/thirdparty/arrow/python/build/temp.linux-x86_64-3.5/lib.cxx:8092)
pyarrow.lib.ArrowMemoryError: malloc of size 67108864 failed
robertnishihara commented 7 years ago

How much memory is on the machine? Also, what about the size of the object store (this should be printed when you run ray start)?

barrachri commented 7 years ago

3 machines with 1GB each (just for a basic test).

here?

Waiting for redis server at 127.0.0.1:6379 to respond...
Waiting for redis server at 127.0.0.1:59943 to respond...
Starting local scheduler with 1 CPUs, 0 GPUs
Failed to start the UI, you may need to run 'pip install jupyter'.
{'webui_url': None, 'redis_address': 'ip:6379', 'local_scheduler_socket_names': ['/tmp/scheduler37142258'], 'object_store_addresses': [ObjectStoreAddress(name='/tmp/plasma_store75305730', manager_name='/tmp/plasma_manager71723891', manager_port=19765)], 'node_ip_address': 'ip'}
barrachri commented 7 years ago
# download some data from an s3 bucket
@ray.remote
def create_df(s3):
    return pd.read_csv(s3, encoding="utf-8", error_bad_lines=False)
# put the data together
@ray.remote
def put_together(df_1, df_2):
    df = pd.concat([df_1, df_2], ignore_index=True)
    return df

# compute something
@ray.remote
def compute_something(df):
    return df.describe()

list_s3 = [
    'http://hci.stanford.edu/courses/cs448b/data/ipasn/cs448b_ipasn.csv' for _ in range(45)
]

data = [create_df.remote(s3) for s3 in list_s3]

while len(data) > 1:
    data.append(put_together.remote(data.pop(0), data.pop(0)))

result = compute_something.remote(*data)

ray.get(result)

With 45 I usually get a similar error, with 40 it works fine.

I guess arrow should handle this more nicely, no?

Error:

Traceback (most recent call last):
  File "/usr/local/lib/python3.5/dist-packages/ray/worker.py", line 786, in _process_task
    self._store_outputs_in_objstore(return_object_ids, outputs)
  File "/usr/local/lib/python3.5/dist-packages/ray/worker.py", line 713, in _store_outputs_in_objstore
    self.put_object(objectids[i], outputs[i])
  File "/usr/local/lib/python3.5/dist-packages/ray/worker.py", line 355, in put_object
    self.store_and_register(object_id, value)
  File "/usr/local/lib/python3.5/dist-packages/ray/worker.py", line 290, in store_and_register
    object_id.id()), self.serialization_context)
  File "pyarrow/plasma.pyx", line 394, in pyarrow.plasma.PlasmaClient.put (/ray/src/thirdparty/arrow/python/build/temp.linux-x86_64-3.5/plasma.cxx:5069)
  File "pyarrow/serialization.pxi", line 226, in pyarrow.lib.serialize (/ray/src/thirdparty/arrow/python/build/temp.linux-x86_64-3.5/lib.cxx:68724)
  File "pyarrow/serialization.pxi", line 99, in pyarrow.lib.SerializationContext._serialize_callback (/ray/src/thirdparty/arrow/python/build/temp.linux-x86_64-3.5/lib.cxx:66772)
  File "/usr/local/lib/python3.5/dist-packages/ray/pyarrow_files/pyarrow/serialization.py", line 113, in _serialize_pandas_dataframe
    return serialize_pandas(obj).to_pybytes()
  File "/usr/local/lib/python3.5/dist-packages/ray/pyarrow_files/pyarrow/ipc.py", line 166, in serialize_pandas
    writer.write_batch(batch)
  File "pyarrow/ipc.pxi", line 202, in pyarrow.lib._RecordBatchWriter.write_batch (/ray/src/thirdparty/arrow/python/build/temp.linux-x86_64-3.5/lib.cxx:60518)
  File "pyarrow/error.pxi", line 81, in pyarrow.lib.check_status (/ray/src/thirdparty/arrow/python/build/temp.linux-x86_64-3.5/lib.cxx:8092)
pyarrow.lib.ArrowMemoryError: malloc of size 67108864 failed
robertnishihara commented 7 years ago

What would you prefer the behavior to be in this case? We should be able to do more to reduce the memory footprint of pandas dataframes down the road..

barrachri commented 7 years ago

I think fragmenting the operation in a way that is not going to saturate the memory of single machines.

Let's say that the sum of the s3 files is 20gb and the memory of the cluster sum up to 10gb.

You can raise a memory error but is not what you expect from a distributed library I think.

So the preferred behaviour would be to chunk the pandas operations.

in pseudo-code

s3 = files # tuple or list
s3_size = sum([file.size for file in s3])

if s3_size > ram:
    # read in chunks
else:
    # read the whole file in memory

I would be happy to work on this if help is needed.

robertnishihara commented 7 years ago

I see, some of this could be addressed by spilling to disk #1157.

Another approach would be to improve the scheduling algorithm to schedule things in a way that is likely to use less memory. That's probably out of scope for now.

Another approach would be to have a dataframes library that implements operations lazily and submits task in a way that requires less memory.

barrachri commented 7 years ago

Thanks for the feedback. Closing here, moving to #1157.