ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[<Ray component: Ray>] map_group doesn't support actor #41406

Closed Bye-legumes closed 5 months ago

Bye-legumes commented 10 months ago

What happened + What you expected to happen

What happened

When I used the groupby operation with the batch_size parameter, I got a “multiple values” error. However, when I used the groupby operation without the batch_size parameter, I got an exception saying that batch_size is required.

sorted_ds = ds.groupby(key="a").map_groups(DEMO,num_gpus=0.1,
        batch_size = 1000,
        batch_format  = 'pandas',
        compute=ActorPoolStrategy(min_size=10, max_size=200))

The code above fails with this error:

  File "gpb.py", line 41, in <module>
    sorted_ds = ds.groupby(key="a").map_groups(DEMO,num_gpus=0.1,
  File "/home/zhilong/miniconda3/lib/python3.8/site-packages/ray/data/grouped_data.py", line 336, in map_groups
    return sorted_ds.map_batches(
TypeError: map_batches() got multiple values for keyword argument 'batch_size'

If I remove the batch_size keyword argument, the error is:

ValueError: `batch_size` must be provided to `map_batches` when requesting GPUs. The optimal batch size depends on the model, data, and GPU used. It is recommended to use the largest batch size that doesn't result in your GPU device running out of memory. You can view the GPU memory usage via the Ray dashboard.
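
Both errors are consistent with map_groups passing its own batch_size to map_batches while also forwarding any extra keyword arguments. Here is a minimal pure-Python sketch of that interaction. The names `map_batches_stub`, `map_groups_stub`, and `try_call` are hypothetical stand-ins with simplified signatures, not Ray's actual code.

```python
def map_batches_stub(fn, *, batch_size=None, compute=None,
                     batch_format="default", **ray_remote_args):
    """Hypothetical stand-in for Dataset.map_batches (assumed behavior)."""
    # Mirrors the ValueError above: GPUs requested but no batch_size given.
    if ray_remote_args.get("num_gpus") and batch_size is None:
        raise ValueError(
            "`batch_size` must be provided to `map_batches` when requesting GPUs."
        )
    return {"batch_size": batch_size, "ray_remote_args": ray_remote_args}


def map_groups_stub(fn, *, compute=None, batch_format="default",
                    **ray_remote_args):
    """Hypothetical stand-in for GroupedData.map_groups (assumed behavior)."""
    # batch_size is fixed here, and a caller-supplied batch_size ends up in
    # **ray_remote_args, so it is passed a second time.
    return map_batches_stub(
        fn,
        batch_size=None,
        compute=compute,
        batch_format=batch_format,
        **ray_remote_args,
    )


def try_call(**kwargs):
    """Return the exception type name raised by map_groups_stub, or "ok"."""
    try:
        map_groups_stub(lambda batch: batch, **kwargs)
        return "ok"
    except (TypeError, ValueError) as exc:
        return type(exc).__name__


print(try_call(num_gpus=0.1, batch_size=1000))  # duplicate keyword argument
print(try_call(num_gpus=0.1))                   # batch_size missing with GPUs
print(try_call())                               # no GPUs, no batch_size
```

In this sketch, no combination of arguments that requests a GPU can succeed, which matches what I observed.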

What I expected

Ray Data should be able to run groupby + map_groups with actors, as described in the docs: https://docs.ray.io/en/latest/data/api/doc/ray.data.grouped_data.GroupedData.map_groups.html

Versions / Dependencies

ray 2.8.0

Reproduction script

import ray
ray.init(address = "123.123.123.123:6274")
from ray.data import ActorPoolStrategy

ctx = ray.data.context.DatasetContext.get_current()
use_push_based_shuffle = True
num_items = 100001
parallelism = 200
import pandas as pd
import numpy as np
import time

t1 = time.time()
original = ctx.use_push_based_shuffle
ctx.use_push_based_shuffle = use_push_based_shuffle

a = list(reversed(range(num_items)))

shard = int(np.ceil(num_items / parallelism))
b = [1]*1
offset = 0
dfs = []
while offset < num_items:
    dfs.append(
        pd.DataFrame(
            {"a": a[offset : offset + shard], "b": [b]*len(a[offset : offset + shard])}
        )
    )
    offset += shard
if offset < num_items:
    dfs.append(pd.DataFrame({"a": a[offset:], "b": b[offset:]}))
ds = ray.data.from_pandas(dfs)

class DEMO:
    def __init__(self):
        pass

    def __call__(self, a):
        return a
sorted_ds = ds.groupby(key="a").map_groups(DEMO,num_gpus=0.1,
        batch_size = 1000,
        batch_format  = 'pandas',
        compute=ActorPoolStrategy(min_size=10, max_size=200))
sorted_ds.show(5)
print(f"time used : \n{time.time()-t1}")

Issue Severity

Medium: It is a significant difficulty but I can work around it.

Bye-legumes commented 10 months ago

In Ray 2.4.0, the error for map_groups + actor is the same as this one: https://github.com/ray-project/ray/issues/26244

anyscalesam commented 10 months ago

After discussion, we should change map_groups() to allow (and require) batch_size iff num_gpus is set. @scottjlee to follow up on this change.
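
A rough sketch of the rule described above, written as a standalone check. The function name `check_map_groups_batch_size` and its error messages are hypothetical and only illustrate the "allow and require iff GPUs are requested" condition. They are not the eventual Ray implementation.

```python
def check_map_groups_batch_size(batch_size=None, num_gpus=None):
    """Hypothetical validator for the proposed rule.

    batch_size is allowed, and required, if and only if GPUs are requested.
    """
    gpus_requested = bool(num_gpus)
    if gpus_requested and batch_size is None:
        raise ValueError("batch_size is required when num_gpus is set")
    if not gpus_requested and batch_size is not None:
        raise ValueError("batch_size is only allowed when num_gpus is set")
    return batch_size


# Accepted combinations under the proposed rule.
check_map_groups_batch_size(batch_size=1000, num_gpus=0.1)
check_map_groups_batch_size()
```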

Bye-legumes commented 10 months ago

I think it's not only a problem of GPU + batch_size. Currently, I cannot run map_groups with an actor.

sorted_ds = ds.groupby(key="a").map_groups(DEMO,
        compute=ray.data.ActorPoolStrategy(size=10),
        batch_format  = 'pandas')

It fails with this error:

2023-11-29 11:03:55,624  WARNING actor_pool_map_operator.py:271 -- To ensure full parallelization across an actor pool of size 10, the Dataset should consist of at least 10 distinct blocks. Consider increasing the parallelism when creating the Dataset.
Traceback (most recent call last):
  File "python/ray/_raylet.pyx", line 347, in ray._raylet.StreamingObjectRefGenerator._next_sync
  File "python/ray/_raylet.pyx", line 4643, in ray._raylet.CoreWorker.try_read_next_object_ref_stream
  File "python/ray/_raylet.pyx", line 447, in ray._raylet.check_status
ray.exceptions.ObjectRefStreamEndOfStreamError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/_internal/execution/interfaces/physical_operator.py", line 80, in on_data_ready
    meta = ray.get(next(self._streaming_gen))
  File "python/ray/_raylet.pyx", line 302, in ray._raylet.StreamingObjectRefGenerator.__next__
  File "python/ray/_raylet.pyx", line 365, in ray._raylet.StreamingObjectRefGenerator._next_sync
StopIteration

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/zhilong/ray_28_bug/gpb.py", line 48, in <module>
    sorted_ds.show(5)
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/dataset.py", line 2466, in show
    for row in self.take(limit):
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/dataset.py", line 2390, in take
    for row in limited_ds.iter_rows():
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/iterator.py", line 219, in _wrapped_iterator
    for batch in batch_iterable:
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/iterator.py", line 164, in _create_iterator
    block_iterator, stats, blocks_owned_by_consumer = self._to_block_iterator()
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/_internal/iterator/iterator_impl.py", line 32, in _to_block_iterator
    block_iterator, stats, executor = ds._plan.execute_to_iterator()
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/_internal/plan.py", line 548, in execute_to_iterator
    block_iter = itertools.chain([next(gen)], gen)
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/_internal/execution/legacy_compat.py", line 54, in execute_to_legacy_block_iterator
    for bundle in bundle_iter:
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/_internal/execution/interfaces/executor.py", line 37, in __next__
    return self.get_next()
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor.py", line 141, in get_next
    raise item
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor.py", line 201, in run
    while self._scheduling_loop_step(self._topology) and not self._shutdown:
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor.py", line 252, in _scheduling_loop_step
    process_completed_tasks(topology, self._backpressure_policies)
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 365, in process_completed_tasks
    num_blocks_read = task.on_data_ready(
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/_internal/execution/interfaces/physical_operator.py", line 88, in on_data_ready
    ex = ray.get(block_ref)
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 24, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/_private/worker.py", line 2563, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(TypeError): ray::MapBatches(group_fn)() (pid=3943892, ip=10.218.163.85, actor_id=501d51070249f810d044d51801000000, repr=MapWorker(MapBatches(group_fn)))
    yield from _map_task(
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 416, in _map_task
    for b_out in map_transformer.apply_transform(iter(blocks), ctx):
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 371, in __call__
    for data in iter:
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 215, in __call__
    yield from self._batch_fn(input, ctx)
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 190, in transform_fn
    res = fn(batch)
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 120, in fn
    return op_fn(item, *fn_args, **fn_kwargs)
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/grouped_data.py", line 362, in group_fn
    builder.add_batch(applied)
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/_internal/delegating_block_builder.py", line 39, in add_batch
    return self.add_block(block)
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/_internal/delegating_block_builder.py", line 42, in add_block
    accessor = BlockAccessor.for_block(block)
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/block.py", line 402, in for_block
    raise TypeError("Not a block type: {} ({})".format(block, type(block)))
TypeError: Not a block type: <__main__.DEMO object at 0x7f112590b490> (<class '__main__.DEMO'>)

It works when a function is passed as the input:

sorted_ds = ds.groupby(key="a").map_groups(test,
        batch_format  = 'pandas')

Here is the full code:

import ray
ray.init(address = "10.218.163.85:6274")
from ray.data import ActorPoolStrategy

ctx = ray.data.context.DatasetContext.get_current()
use_push_based_shuffle = True
num_items = 100001
parallelism = 200
import pandas as pd
import numpy as np
import time

t1 = time.time()
original = ctx.use_push_based_shuffle
ctx.use_push_based_shuffle = use_push_based_shuffle

a = list(reversed(range(num_items)))
a = [x%10 for x in a]
shard = int(np.ceil(num_items / parallelism))
b = [1]*1
offset = 0
dfs = []
while offset < num_items:
    dfs.append(
        pd.DataFrame(
            {"a": a[offset : offset + shard], "b": [b]*len(a[offset : offset + shard])}
        )
    )
    offset += shard
if offset < num_items:
    dfs.append(pd.DataFrame({"a": a[offset:], "b": b[offset:]}))
ds = ray.data.from_pandas(dfs)

class DEMO:
    def __init__(self,*args, **kwargs):
        print(args)
        #print(kwargs)
        pass

    def __call__(self, a):
        return a
def test(a):
    return a
sorted_ds = ds.groupby(key="a").map_groups(DEMO,
        compute=ray.data.ActorPoolStrategy(size=10),
        batch_format  = 'pandas')
sorted_ds.show(5)
print(f"time used : \n{time.time()-t1}")

wingkitlee0 commented 10 months ago

Ya, I think map_groups does not support "callable class". It currently only supports running the mapping function as actors (e.g., compute=ray.data.ActorPoolStrategy()). In your example, using DEMO() would work if you are okay with creating the instance first.

sorted_ds = ds.groupby(key="a").map_groups(DEMO(),
        compute=ray.data.ActorPoolStrategy(size=10),
        batch_format  = 'pandas')
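
The "Not a block type: <__main__.DEMO object ...>" error is consistent with the class itself being called on each group, which constructs an instance instead of transforming the data. A small pure-Python sketch of that difference follows. `Demo` and `apply_to_groups` are hypothetical simplifications, not the actual group_fn in grouped_data.py.

```python
class Demo:
    """Callable class analogous to DEMO in this thread."""

    def __init__(self, *args):
        self.init_args = args

    def __call__(self, group):
        return group


def apply_to_groups(fn, groups):
    """Hypothetical simplification: call fn directly on every group."""
    return [fn(group) for group in groups]


groups = [[0, 0], [1, 1, 1]]

# Passing the class: Demo(group) builds a Demo object for each group.
from_class = apply_to_groups(Demo, groups)

# Passing an instance: Demo()(group) returns the group itself.
from_instance = apply_to_groups(Demo(), groups)

print([type(x).__name__ for x in from_class])
print(from_instance)
```

Under this assumption, the instance form works because the object is created once up front and only `__call__` runs per group.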

Bye-legumes commented 10 months ago

@wingkitlee0 I see! Thanks. From what I found, map_groups is indeed built on map_batches, but with batch_size set to None. This means that if the original data is distributed across different nodes, all of it will be gathered onto one single node, as batch_size is None. The memory requirements could therefore be very large, depending on the size of the data being processed. Does the maximum parallelism also depend on a single node? Or can one batch of data span different nodes (even though batch_to_block is used inside group_fn)? Thanks!

    # Note we set batch_size=None here, so it will use the entire block as a batch,
    # which ensures that each group will be contained within a batch in entirety.
    return sorted_ds.map_batches(
        group_fn,
        batch_size=None,
        compute=compute,
        batch_format=batch_format,
        fn_args=fn_args,
        fn_kwargs=fn_kwargs,
        **ray_remote_args,
    )

Bye-legumes commented 10 months ago

@wingkitlee0 Also, what is the motivation for using map_batches(DEMO) but map_groups(DEMO())? Can we unify them? I think map_groups and map_batches should have similar parameters, with the only difference being that map_groups is performed on grouped data. One thing I want to understand is the difference between using one map_batches + group_fn in map_groups and using [map_batch(group) for group in grouped data].

wingkitlee0 commented 10 months ago

[Not Ray team, but I used map_groups quite a lot lately] You may find previous discussions about map_groups useful (search on GitHub).

all the data will be gathered to one single node, as batch_size is None

According to the Ray team, the data are materialized, but not necessarily gathered onto one node. (After all, sorting is distributed.)

what is the motivation for using map_batches(DEMO) but map_groups(DEMO())?

I believe this is simply because they haven't implemented it for callable classes yet. This should be a straightforward PR unless there are some limitations (in that case the Ray team knows better).

the difference between using one map_batches + group_fn in map_groups and using [map_batch(group) for group in grouped data].

It's about the block boundary (in Ray) vs. the group boundary (in data). We want the data that goes into the mapping function to contain the whole group.
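
To illustrate the boundary point, here is a small pure-Python sketch with no Ray involved. It assumes the keys are already sorted and that one block holds complete groups. `fixed_size_batches` and `whole_groups` are hypothetical helper names.

```python
from itertools import groupby

# Keys as they might look inside one block after the sort step.
rows = sorted([3, 1, 2, 1, 3, 3, 2, 1])


def fixed_size_batches(items, batch_size):
    """Cut the block every batch_size rows, ignoring group boundaries."""
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]


def whole_groups(items):
    """Take the entire block and cut it only where the key changes."""
    return [list(group) for _, group in groupby(items)]


print(fixed_size_batches(rows, 3))  # the rows with key 3 land in two batches
print(whole_groups(rows))           # every key stays in a single piece
```

With a fixed batch size, the mapping function could see only part of a group. Using the entire block as one batch and then splitting by key avoids that.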

Bye-legumes commented 10 months ago

@wingkitlee0 Thanks a lot for your clarification!

wayi1 commented 7 months ago

I encountered this issue as well. I have some relatively expensive state to initialize in an actor. However, unlike the operators map_batches, map, or flat_map, I realize that map_groups op does not support a fn_constructor_kwargs arg.

The workaround without an actor means that I have to initialize such state on every call. Really looking forward to Ray 2.10 for this feature.
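
A minimal sketch of the cost difference, in plain Python with no Ray involved. `ExpensiveState`, `stateless_fn`, and `StatefulFn` are hypothetical names, and the build counter stands in for whatever is expensive to initialize.

```python
class ExpensiveState:
    """Hypothetical stand-in for state that is costly to build."""

    builds = 0  # counts how many times the state was constructed

    def __init__(self):
        ExpensiveState.builds += 1
        self.offset = 10


def stateless_fn(group):
    # Plain function: the state is rebuilt on every call.
    state = ExpensiveState()
    return [x + state.offset for x in group]


class StatefulFn:
    # Callable class: the state is built once in __init__ and then reused.
    def __init__(self):
        self.state = ExpensiveState()

    def __call__(self, group):
        return [x + self.state.offset for x in group]


groups = [[1, 2], [3], [4, 5, 6]]

ExpensiveState.builds = 0
per_call_results = [stateless_fn(g) for g in groups]
per_call_builds = ExpensiveState.builds

ExpensiveState.builds = 0
fn = StatefulFn()
reused_results = [fn(g) for g in groups]
reused_builds = ExpensiveState.builds

print(per_call_builds, reused_builds)
```

Both variants return the same results. Only the number of state initializations differs, which is the overhead I would like to avoid.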

anyscalesam commented 5 months ago

@Bye-legumes - per our meeting last week; do you think you'd have the bandwidth in May to pick this up? We can help shepherd/review to get this merged but it'll be faster to resolve/mitigate this issue especially if it's impacting your scenario/use-case.