ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0
33.49k stars 5.69k forks source link

[data] Size estimation leads to incorrect block sizes #40246

Open stephanie-wang opened 1 year ago

stephanie-wang commented 1 year ago

What happened + What you expected to happen

Size estimation is used to decide task size and when to output a block to the object store. Seems that can be as much as 2x off from the actual size.

Versions / Dependencies

3.0dev

Reproduction script

The following script runs a .range and then the same Dataset with an identity map function. In theory, these should produce the same number of output blocks. But currently the latter produces twice as many blocks because of a bug in size estimation.

import ray

ray.init(num_cpus=4)
ctx = ray.data.DataContext.get_current()
ctx.target_max_block_size = 800

print(ray.data.range(1000).materialize().stats())
print(ray.data.range(1000).map(lambda x: x).materialize().stats())

2023-10-10 16:15:00,758 WARNING split_read_output_blocks.py:29 -- Expected in-memory size 8000, block size 8.0
2023-10-10 16:15:00,758 WARNING split_read_output_blocks.py:40 -- Size based split factor 1
2023-10-10 16:15:00,758 WARNING split_read_output_blocks.py:42 -- Blocks after size splits 1000
2023-10-10 16:15:00,758 WARNING split_read_output_blocks.py:54 -- Estimated num output blocks 1000
2023-10-10 16:15:00,759 INFO streaming_executor.py:93 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange]
2023-10-10 16:15:00,759 INFO streaming_executor.py:94 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_ou
tput=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-10-10 16:15:00,760 INFO streaming_executor.py:96 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
Stage 1 ReadRange: 10/10 blocks executed in 1.37s                                                                                                                                             
* Remote wall time: 403.62us min, 17.08ms max, 5.69ms mean, 56.9ms total
* Remote cpu time: 402.17us min, 16.32ms max, 5.51ms mean, 55.05ms total
* Peak heap memory usage (MiB): 155.65 min, 156.71 max, 156 mean
* Output num rows: 100 min, 100 max, 100 mean, 1000 total
* Output size bytes: 800 min, 800 max, 800 mean, 8000 total
* Tasks per node: 10 min, 10 max, 10 mean; 1 nodes used
* Extra metrics: {'obj_store_mem_alloc': 8000, 'obj_store_mem_freed': 22787, 'obj_store_mem_peak': 9914, 'ray_remote_args': {'num_cpus': 1, 'scheduling_strategy': 'SPREAD'}}

2023-10-10 16:15:02,284 WARNING split_read_output_blocks.py:29 -- Expected in-memory size 8000, block size 8.0
2023-10-10 16:15:02,284 WARNING split_read_output_blocks.py:40 -- Size based split factor 1
2023-10-10 16:15:02,284 WARNING split_read_output_blocks.py:42 -- Blocks after size splits 1000
2023-10-10 16:15:02,284 WARNING split_read_output_blocks.py:54 -- Estimated num output blocks 1000
2023-10-10 16:15:02,285 INFO streaming_executor.py:93 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange->Map(<lambda>)]
2023-10-10 16:15:02,285 INFO streaming_executor.py:94 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_ou
tput=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-10-10 16:15:02,285 INFO streaming_executor.py:96 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
Stage 1 ReadRange->Map(<lambda>): 20/20 blocks executed in 0.1s                                                                                                                               
* Remote wall time: 3.27ms min, 15.13ms max, 6.39ms mean, 127.79ms total
* Remote cpu time: 3.27ms min, 9.38ms max, 5.39ms mean, 107.81ms total
* Peak heap memory usage (MiB): 155.65 min, 157.02 max, 156 mean
* Output num rows: 42 min, 58 max, 50 mean, 1000 total
* Output size bytes: 336 min, 464 max, 400 mean, 8000 total
* Tasks per node: 20 min, 20 max, 20 mean; 1 nodes used
* Extra metrics: {'obj_store_mem_alloc': 8000, 'obj_store_mem_freed': 22787, 'obj_store_mem_peak': 11585, 'ray_remote_args': {'num_cpus': 1, 'scheduling_strategy': 'SPREAD'}}

Issue Severity

None

anyscalesam commented 11 months ago

@c21 please take a look; let's see if we can aim for ray29.