ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[data] Data doesn't account for object store memory from pandas batch formats #48506

Open richardliaw opened 2 hours ago

richardliaw commented 2 hours ago

While it's true that this is no longer an issue if the blocks are Arrow tables, you'll still run into it if the blocks are pandas DataFrames. This can happen if you use the "pandas" batch format, or if you use APIs like drop_columns that use the "pandas" batch format under the hood.

Here's a simple repro:

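import ray

def generate_data(batch):
    # Yield eight batches, each holding a single 128 MiB bytes object.
    for _ in range(8):
        yield {"data": [[b"\x00" * 128 * 1024 * 1024]]}

ds = (
    ray.data.range(1, override_num_blocks=1)
    .map_batches(generate_data, batch_size=1)
    # Set batch_format to "pandas" or "pyarrow" to get the outputs below.
    .map_batches(lambda batch: batch, batch_format=...)
)

# Print the row count and reported size of each output bundle.
for bundle in ds.iter_internal_ref_bundles():
    print(f"num_rows={bundle.num_rows()} size_bytes={bundle.size_bytes()}")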

Output with pandas:

num_rows=8 size_bytes=192

Output with PyArrow:

num_rows=1 size_bytes=134217748
num_rows=1 size_bytes=134217748
num_rows=1 size_bytes=134217748
num_rows=1 size_bytes=134217748
num_rows=1 size_bytes=134217748
num_rows=1 size_bytes=134217748
num_rows=1 size_bytes=134217748
num_rows=1 size_bytes=134217748

Originally posted by @bveeramani in https://github.com/ray-project/ray/issues/44577#issuecomment-2428035432

richardliaw commented 1 hour ago

@raulchen did some debugging and found odd behavior in pandas: the memory usage reported for a DataFrame of NumPy arrays changes after a pickle round trip.

import pandas as pd
import numpy as np
import pickle

df = pd.DataFrame({
    "data": [np.random.randint(size=1024, low=0, high=100, dtype=np.int8) for _ in range(1_000_000)]
})

print(df["data"].size, df["data"].dtype, df.memory_usage(index=True, deep=True).sum())
# 1000000 object 1144000132
df2 = pickle.loads(pickle.dumps(df))
print(df2["data"].size, df2["data"].dtype, df2.memory_usage(index=True, deep=True).sum())
# 1000000 object 120000132

Also posted this on Stack Overflow: https://stackoverflow.com/questions/79149716/pandas-memory-usage-inconsistent-for-in-line-numpy
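One thing the two numbers above already suggest: the gap per row is exactly the size of one array's data buffer. A quick check of the arithmetic, using only the figures printed in the repro (the variable names are just for illustration):

```python
rows = 1_000_000
array_nbytes = 1024          # each row holds 1024 int8 values, 1 byte each

before = 1_144_000_132       # memory_usage(deep=True).sum() before pickling
after = 120_000_132          # same call after the pickle round trip

# Bytes that disappear from the reported total, per row.
missing_per_row = (before - after) // rows
print(missing_per_row)       # 1024
```

So the report after unpickling appears to drop each array's payload and keep only the per-object overhead. This is an inference from the numbers, not a confirmed explanation.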

richardliaw commented 1 hour ago

The only known clue right now is that the "OWNDATA" flag of the NumPy arrays differs before and after pickling.

Before pickle:
- dtype: int8
- flags:
  C_CONTIGUOUS : True
  F_CONTIGUOUS : True
  OWNDATA : True
  WRITEABLE : True
  ALIGNED : True
  WRITEBACKIFCOPY : False
- strides: (1,)
- data pointer: 54063872

After pickle:
- dtype: int8
- flags:
  C_CONTIGUOUS : True
  F_CONTIGUOUS : True
  OWNDATA : False
  WRITEABLE : True
  ALIGNED : True
  WRITEBACKIFCOPY : False
- strides: (1,)
- data pointer: 1022687168
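For anyone who wants to reproduce the dumps above, here is a minimal sketch. The `describe` helper is hypothetical (not part of Ray, pandas, or NumPy) and also records `sys.getsizeof`, on the assumption that pandas' deep memory accounting for object columns depends on that value. A plain view is included because it is guaranteed not to own its buffer, whereas the flag on the unpickled array may depend on the NumPy version and pickle protocol, so the script only prints it.

```python
import pickle
import sys

import numpy as np


def describe(arr):
    """Collect the fields shown in the dumps above, plus size information."""
    return {
        "dtype": str(arr.dtype),
        "owndata": bool(arr.flags["OWNDATA"]),
        "strides": arr.strides,
        "data_pointer": arr.__array_interface__["data"][0],
        "nbytes": arr.nbytes,                # size of the data buffer
        "getsizeof": sys.getsizeof(arr),     # what Python reports for the object
    }


original = np.zeros(1024, dtype=np.int8)     # freshly allocated, owns its buffer
view = original[:]                           # a view never owns its buffer
roundtrip = pickle.loads(pickle.dumps(original))

for name, arr in [("original", original), ("view", view), ("roundtrip", roundtrip)]:
    print(name, describe(arr))
```

For the array that owns its data, `getsizeof` includes the 1024-byte buffer; for the view it does not. If the unpickled array also shows `owndata: False`, its `getsizeof` should drop the same way, which would be consistent with the smaller total reported after pickling.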