ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0
33.11k stars 5.6k forks

[data] Reduce memory usage for shuffle ops #42146

Open stephanie-wang opened 8 months ago

stephanie-wang commented 8 months ago

Description

Sort and random_shuffle both build a sorted index to shuffle the rows in a block. This can add significant memory overhead if the size of each row is very small. Ideally, we would sort/shuffle rows in place to avoid the memory overhead.

Use case

No response

stephanie-wang commented 7 months ago

Code links for index creation:

alejandroarmas commented 7 months ago

Hi @stephanie-wang is this ticket already assigned to someone? I would like to make my first contribution to Ray and thought this would be a great starting point. Let me know if you'd like me to move forward :D

alejandroarmas commented 7 months ago

@scottjlee @stephanie-wang I'm new to the open-source development world, so I really want to give this one a go! However, I just wanted to check in and make sure I'm understanding the problem correctly. I wrote down what I think the problem is in the Problem Statement section and also some assumptions I have in the section after. Could you please let me know if I have misunderstood something or if there is a gap in my understanding? Thank you in advance!!

Problem Statement

Issue Being Solved: Significant memory overhead during index creation, if the size of each row is very small (i.e. not many columns)

Scope: The problem is limited to the sort and random_shuffle functions, which both build a numpy.array to reorder the rows in a given PyArrow block.

PyArrow Sort Relies on Index:

When the DataContext has context.use_polars = False, we default to a sort method that relies on an index: https://github.com/ray-project/ray/blob/master/python/ray/data/_internal/arrow_block.py#L60 This is not the case when context.use_polars = True.

More specifically, the pyarrow.compute API has a method that returns the indices that would sort an array, record batch or table. This is used in transform_pyarrow.py: https://github.com/ray-project/ray/blob/master/python/ray/data/_internal/arrow_ops/transform_pyarrow.py#L15

Random Shuffle Relies on Index

The line return self.take(random.permutation(self.num_rows())) at https://github.com/ray-project/ray/blob/master/python/ray/data/_internal/arrow_block.py#L255 relies on an index argument, which is a random permutation of the integers from 0 to num_rows - 1. This index is passed to take: https://github.com/ray-project/ray/blob/master/python/ray/data/_internal/arrow_ops/transform_pyarrow.py#L19

Question: What should we assume to be a small number of rows? 1, 2, 3 or fewer?

Assumptions

ArrowBlockAccessor is the name of the class that needs to be changed. It inherits from the BlockAccessor class and has data referenced by the self._table data member of type pyarrow.Table.

PyArrow: the Python library for Apache Arrow, a standardized in-memory columnar data format. It optimizes for:

  1. Columnar data processing: for use cases that involve selecting, filtering, and aggregating large columns
  2. Efficient serialization and deserialization of data
  3. High performance: SIMD and multi-threading
  4. Memory efficiency: features like zero-copy serialization and compression reduce memory usage

Shuffling: The same permutation of row indices must be applied to every column in a table, so that the values of each row stay together.

>>> table
pyarrow.Table
name: string
age: int64
score: double
----
name: [["Alice","Bob","Charlie"]]
age: [[25,30,35]]
score: [[95.5,88,92.5]]

Let's say the rows at index 0 and 1 are swapped by the shuffle; we should expect the output to be something like

>>> table
pyarrow.Table
name: string
age: int64
score: double
----
name: [["Bob","Alice","Charlie"]]
age: [[30,25,35]]
score: [[88,95.5,92.5]]

Sorting:

If we instead wanted to sort our table and had our sort_key = "score", then we should expect the output to be something like this:

>>> table
pyarrow.Table
name: string
age: int64
score: double
----
name: [["Bob","Charlie","Alice"]]
age: [[30,35,25]]
score: [[88,92.5,95.5]]

Permutation Function: Provided with self.num_rows() = 3, the permutation function will generate an array of indices in random order.

>>> np.random.RandomState(69_420).permutation(3)
array([0, 1, 2])
>>> np.random.RandomState(69_420).permutation(3)
array([0, 1, 2])
>>> np.random.RandomState(69_421).permutation(3)
array([1, 0, 2])
>>> np.random.RandomState(69_425).permutation(3)
array([2, 1, 0])

Potential for Large In-Memory Overhead:

The call np.random.RandomState(69_420).permutation(1_000_000_000) allocates about 7.45 GiB of memory. This is because it builds the array np.arange(1_000_000_000) and shuffles its elements randomly. The array has 1 billion elements with a data type of int64, which means each element occupies 8 bytes of memory. Therefore, the total memory usage is 1_000_000_000 * 8 / (1024**3) = 7.45 GiB.
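The arithmetic above can be checked without allocating anything. The sketch below also shows one possible way to shrink the index, by shuffling an int32 range in place when the row count fits. The helper names are hypothetical, and this is an illustration rather than what Ray does.

```python
import numpy as np

GIB = 1024 ** 3

def index_overhead_bytes(num_rows: int, dtype=np.int64) -> int:
    """Bytes needed to hold one index entry per row (no allocation)."""
    return num_rows * np.dtype(dtype).itemsize

n = 1_000_000_000
int64_gib = index_overhead_bytes(n, np.int64) / GIB  # about 7.45
int32_gib = index_overhead_bytes(n, np.int32) / GIB  # about 3.73

def small_permutation(num_rows: int, seed: int) -> np.ndarray:
    """Hypothetical helper: build int32 indices and shuffle them in place."""
    if num_rows > np.iinfo(np.int32).max:
        raise ValueError("num_rows does not fit in int32 indices")
    idx = np.arange(num_rows, dtype=np.int32)
    np.random.RandomState(seed).shuffle(idx)  # in place, no second index array
    return idx
```

For a block with a single int64 column, the int64 index is as large as the data itself, which is why small rows make the overhead noticeable.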

PyArrow Extension Types: A PyArrow extension type is a way to extend the built-in types of the Arrow data format with custom types and serialization mechanisms. This allows us to use Arrow-compatible data structures that are not natively supported by Arrow while keeping the same zero-copy data sharing, fast serialization, etc.

We have to Handle PyArrow Extension Type Columns Differently: If we are to implement our own sort or shuffle in-memory function, then we should take special consideration for extension types. For example the .take API for a pyarrow table breaks for an extension type here: https://github.com/ray-project/ray/blob/master/python/ray/data/_internal/arrow_ops/transform_pyarrow.py#L37 and a special type of processing is done here: https://github.com/ray-project/ray/blob/master/python/ray/data/_internal/arrow_ops/transform_pyarrow.py#L196

Question: what type of considerations should be made? I'm not familiar with PyArrow Extensions

PyArrow Compute API does not give Direct Access to Memory

If we would like to sort/shuffle rows in place to avoid the memory overhead, then it might not be possible?

I see that https://arrow.apache.org/docs/python/api/compute.html provides functions on-top of a PyArrow data-structure such as take https://arrow.apache.org/docs/python/api/compute.html#selections, but nothing direct (as far as I can tell??)

Copying the Data into Another Data Structure Defeats the Purpose:

Let's say I have data from pyarrow, but I want to instead convert the table into an intermediate format. This defeats the purpose even if it does shuffle/sort our values? For example, we might do something like the following for shuffle

>>> import numpy as np
>>> import pyarrow as pa
>>> arrow_array = pa.array([1, 2, 3, 4, 5])
>>> arrow_array
<pyarrow.lib.Int64Array object at 0x128b88220>
[
  1,
  2,
  3,
  4,
  5
]
>>> numpy_array = np.array(arrow_array.to_numpy())
>>> numpy_array
array([1, 2, 3, 4, 5])
>>> np.random.shuffle(numpy_array)
>>> numpy_array
array([2, 1, 5, 4, 3])
>>> pa.array(numpy_array)
<pyarrow.lib.Int64Array object at 0x128b88220>
[
  2,
  1,
  5,
  4,
  3
]

stephanie-wang commented 7 months ago

I think @chrislevn may be taking on this issue if you want to work together on it.

I believe you have captured the problem well. There are two approaches here that would be good to evaluate:

alejandroarmas commented 7 months ago

Thanks so much for the follow-up @stephanie-wang. I'm happy to split the work with @chrislevn or take it on solo. Let me know what you all prefer ☺️

For now, I wanted to ask two more clarifying questions:

Just so that it is clear to me, what I'm hearing is that it is fine to shuffle in-place using a second data structure only if it means that we do a single copy? In other words, copying the entire table into data structures and doing the shuffle in-memory in those data structures and copying back?

I've never worked with managing memory between PyArrow and Python, so how should someone in this PR track memory usage during the shuffling operation?

I've tried figuring this out myself, but it's still not clear to me. I have the following PyArrow table:

>>> import pyarrow as pa
>>> pa.total_allocated_bytes()
0
>>> n_legs = pa.array([2, 4, 5, 100])
>>> animals = pa.array(["Flamingo", "Horse", "Brittle stars", "Centipede"])
>>> names = ["n_legs", "animals"]
>>> pa.total_allocated_bytes()
192
>>> table = pa.Table.from_arrays([n_legs, animals], names=names)
>>> table.nbytes
83
>>> pa.total_allocated_bytes()
192
>>> table.take([0, 3, 2, 1])
pyarrow.Table
n_legs: int64
animals: string
----
n_legs: [[2,100,5,4]]
animals: [["Flamingo","Centipede","Brittle stars","Horse"]]
>>> pa.total_allocated_bytes()
512
>>> pa.total_allocated_bytes()
192

I noticed that the total_allocated_bytes() value fluctuates after a few moments.

Also, when I delete the references to the Python objects, it seems to have no effect on the PyArrow memory pool.

>>> del n_legs
>>> del animals
>>> import gc
>>> gc.collect()
28
>>> pa.total_allocated_bytes()
192

Should I assume that total_allocated_bytes provides a ceiling of a memory pool and then PyArrow manages its memory pool because, after the take operation, the size is reduced? Are there better ways of doing this?

alejandroarmas commented 7 months ago

I also have a proposed solution for the copying in-place, I wanted to run it by you first though:

I'm going to assume that setting the same seed at the start of every call guarantees the same shuffle result.

import numpy as np
import pyarrow as pa

def my_shuffle(arr: pa.Array) -> pa.Array:
    # Re-seeding on every call gives every column the same permutation.
    np.random.seed(1234)
    # to_numpy() may return a read-only view, so copy before shuffling.
    arr = np.array(arr.to_numpy())
    np.random.shuffle(arr)
    return pa.array(arr)

>>> print(f'{[my_shuffle(pa.array([1,2,3,4])) for i in range(2)]=}')
[my_shuffle(pa.array([1,2,3,4])) for i in range(2)]=[<pyarrow.lib.Int64Array object at 0x10060ac80>
[
  4,
  3,
  1,
  2
], <pyarrow.lib.Int64Array object at 0x12f49c220>
[
  4,
  3,
  1,
  2
]]

Proposed Solution

I'm also assuming PyArrow tables are immutable and I cannot directly change the data of a column. Also my other assumption is that the PyArrow tables contain references to PyArrow arrays. So, I will create a new table with the modified column after each shuffle.

def table_shuffle(table: pa.Table) -> pa.Table:
    num_columns = table.num_columns
    for i in range(num_columns):
        # Replace one column at a time with its shuffled copy.
        table = table.set_column(i, table.column_names[i], my_shuffle(table.column(i)))
    return table

Here is a sample execution of the function:

>>> n_legs = pa.array([2, 4, 5, 100])
>>> animals = pa.array(["Flamingo", "Horse", "Brittle stars", "Centipede"])
>>> names = ["n_legs", "animals"]
>>> table = pa.Table.from_arrays([n_legs, animals], names=names)
>>> table
pyarrow.Table
n_legs: int64
animals: string
----
n_legs: [[2,4,5,100]]
animals: [["Flamingo","Horse","Brittle stars","Centipede"]]

>>> table_shuffle(table)
pyarrow.Table
n_legs: int64
animals: string
----
n_legs: [[100,5,2,4]]
animals: [["Centipede","Brittle stars","Flamingo","Horse"]]

alejandroarmas commented 7 months ago

When it comes to evaluating the performance of this PR, is there a preferred approach?

In the docs there are a few image and CSV formatted datasets. For example we have the following:

ds = ray.data.read_csv("s3://anonymous@air-example-data/breast_cancer.csv")

It contains 569 records.

Dataset(
   num_blocks=20,
   num_rows=569,
   schema={
   ...
   }
)

I'm guessing we can do comparisons between a few different datasets like the following:

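import timeit
num_trials = 1000
# random_shuffle is lazy, so materialize the result to time the actual shuffle
elapsed_time = timeit.timeit(lambda: ds.random_shuffle().materialize(), number=num_trials)
print(f'Average time: {elapsed_time / num_trials:.6f} seconds')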

Is there a requirement behind the speedup? How much should the benchmarks vary in size? I was only able to find two other datasets that have <4K images. Is that not large enough? Should I just fill up large datasets with random values? What if my machine can't fit that in-memory, can I cap it to a certain amount?

alejandroarmas commented 7 months ago

@stephanie-wang Pinging you in case you got busy! Thanks in advance 😃

anyscalesam commented 4 months ago

sorry for the delay here; will review with @c21 if there's someone who can pair with you here to help make progress

alejandroarmas commented 4 months ago

Thanks so much @anyscalesam. Let me know what I can do for you 😄