ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[Data] Ray Data runs out of disk while writing Parquet files #48104

Open bveeramani opened 2 weeks ago

bveeramani commented 2 weeks ago

What happened + What you expected to happen

See repro.

I'm trying to read Parquet files that contain image URIs, load the referenced images, and write the loaded images back to Parquet files. To ensure that Ray Data fully utilizes the cluster, I repartitioned the input data, but that introduced a new problem.

Explanation of problem:

Versions / Dependencies

2.37

Reproduction script

```python
import numpy as np

import ray

ray.init(num_cpus=8)

def read_many_uris(batch):
    # Each yielded batch is a 128 MiB uint8 array, so a single task
    # emits roughly 125 GiB in total.
    for _ in range(1000):
        yield {"data": np.zeros((1, 128, 1024, 1024), dtype=np.uint8)}

(
    ray.data.range(8, override_num_blocks=1)
    .repartition(8)  # Repartition the data to ensure we can fully utilize the CPUs.
    .map_batches(read_many_uris, batch_size=1)
    .write_parquet("/tmp", ray_remote_args={"memory": 1024**3})
)
```
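For scale, here is a rough back-of-envelope calculation (my own numbers, not stated in the issue): each yielded batch is 128 MiB, so without backpressure the eight tasks can emit on the order of a terabyte of output, which the object store spills to disk.

```python
# Illustrative arithmetic for the repro above (assumption: all 1000 yields
# per task complete before backpressure intervenes).
batch_bytes = 1 * 128 * 1024 * 1024   # one (1, 128, 1024, 1024) uint8 batch = 128 MiB
task_bytes = 1000 * batch_bytes       # ~125 GiB produced by a single map task
total_bytes = 8 * task_bytes          # ~1 TiB across all 8 tasks

print(task_bytes / 1024**3)   # GiB per task
print(total_bytes / 1024**4)  # TiB for the whole pipeline
```

A volume like this easily exceeds the local disk used for object spilling, which matches the "runs out of disk" failure mode in the title.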

Issue Severity

None

alexeykudinkin commented 2 weeks ago

@bveeramani why is back-pressure not kicking in here?

bveeramani commented 2 weeks ago

> Calling repartition causes Ray Data to fall back to the old scheduling behavior

@alexeykudinkin Because of this

alexeykudinkin commented 6 days ago

@bveeramani can you help me understand why we're falling back to the old behavior then? Is it because override_num_blocks is being used?

bveeramani commented 6 days ago

No, override_num_blocks isn't relevant here.

All-to-all operations don't implement accurate memory accounting. So, if your DAG contains one, Ray Data doesn't use the op resource reservation algorithm: https://github.com/ray-project/ray/blob/23cc23b7c295a2959682df785408e534095b2e19/python/ray/data/_internal/execution/resource_manager.py#L77-L84
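A simplified sketch of that decision (illustrative only; the names and dict-based ops here are hypothetical, not the actual ResourceManager code linked above):

```python
# Hypothetical sketch: reservation-based backpressure is only usable if every
# operator in the DAG reports accurate memory usage. All-to-all operators
# (e.g. repartition) do not, so one of them disables it for the whole plan.
def uses_op_resource_reservation(dag_ops):
    return all(op["accurate_memory_accounting"] for op in dag_ops)

dag = [
    {"name": "ReadRange", "accurate_memory_accounting": True},
    {"name": "Repartition", "accurate_memory_accounting": False},  # all-to-all
    {"name": "MapBatches", "accurate_memory_accounting": True},
]
print(uses_op_resource_reservation(dag))  # False -> fall back to old scheduling
```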

In this case, it'll use the old scheduling behavior (_execution_allowed) and pull as much data as possible from streaming generators: https://github.com/ray-project/ray/blob/23cc23b7c295a2959682df785408e534095b2e19/python/ray/data/_internal/execution/streaming_executor_state.py#L550-L555

https://github.com/ray-project/ray/blob/23cc23b7c295a2959682df785408e534095b2e19/python/ray/data/_internal/execution/streaming_executor_state.py#L412-L421
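To make the difference concrete, here is a hypothetical sketch contrasting the two behaviors (my own simplification, assuming blocks are pulled one at a time against a per-op memory budget; not the linked `_execution_allowed` code):

```python
# Hypothetical sketch: reservation-based scheduling stops pulling generator
# outputs once the operator's memory budget is spent; the old behavior
# drains everything that is ready.
def outputs_to_pull(ready_block_sizes, budget_bytes, reservation_enabled):
    if not reservation_enabled:
        return list(ready_block_sizes)  # old path: pull as much as possible
    pulled, used = [], 0
    for block_bytes in ready_block_sizes:
        if used + block_bytes > budget_bytes:
            break  # budget exhausted: leave remaining blocks in the generator
        pulled.append(block_bytes)
        used += block_bytes
    return pulled

blocks = [128 * 1024**2] * 10  # ten 128 MiB blocks ready in the generator
print(len(outputs_to_pull(blocks, budget_bytes=1024**3, reservation_enabled=True)))   # 8
print(len(outputs_to_pull(blocks, budget_bytes=1024**3, reservation_enabled=False)))  # 10
```

With the reservation path disabled by the repartition, the pipeline behaves like the second call: it keeps pulling output from `read_many_uris` regardless of memory pressure, and the excess spills until the disk fills.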