ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[Data] groupby/sort failure: 'DataFrame' object has no attribute 'num_rows' #44536

Open tespent opened 7 months ago

tespent commented 7 months ago

What happened + What you expected to happen

Several Ray Data operation patterns can raise the following exception; see the reproduction scripts below:

  File "/opt/miniconda3/envs/raytest/lib/python3.10/site-packages/ray/data/iterator.py", line 241, in _wrapped_iterator
    for batch in batch_iterable:
  File "/opt/miniconda3/envs/raytest/lib/python3.10/site-packages/ray/data/iterator.py", line 162, in _create_iterator
    block_iterator, stats, blocks_owned_by_consumer = self._to_block_iterator()
  File "/opt/miniconda3/envs/raytest/lib/python3.10/site-packages/ray/data/_internal/iterator/iterator_impl.py", line 33, in _to_block_iterator
    block_iterator, stats, executor = ds._plan.execute_to_iterator()
  File "/opt/miniconda3/envs/raytest/lib/python3.10/site-packages/ray/data/exceptions.py", line 83, in handle_trace
    raise e.with_traceback(None) from SystemException()
ray.exceptions.RayTaskError(AttributeError): ray::reduce() (pid=13825, ip=127.0.0.1)
  File "/opt/miniconda3/envs/raytest/lib/python3.10/site-packages/ray/data/_internal/planner/exchange/sort_task_spec.py", line 145, in reduce
    return BlockAccessor.for_block(mapper_outputs[0]).merge_sorted_blocks(
  File "/opt/miniconda3/envs/raytest/lib/python3.10/site-packages/ray/data/_internal/arrow_block.py", line 559, in merge_sorted_blocks
    blocks = [b for b in blocks if b.num_rows > 0]
  File "/opt/miniconda3/envs/raytest/lib/python3.10/site-packages/ray/data/_internal/arrow_block.py", line 559, in <listcomp>
    blocks = [b for b in blocks if b.num_rows > 0]
  File "/opt/miniconda3/envs/raytest/lib/python3.10/site-packages/pandas/core/generic.py", line 6296, in __getattr__
    return object.__getattribute__(self, name)
AttributeError: 'DataFrame' object has no attribute 'num_rows'
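
The traceback suggests that the reduce task picks its merge routine from the first block only (`BlockAccessor.for_block(mapper_outputs[0])`) and then applies Arrow-specific code (`b.num_rows`) to every block, including a pandas DataFrame. The following is a minimal sketch of that failure mode using only the standard library. All class and function names here are hypothetical stand-ins, not Ray's real implementation, and the sketch does not explain why the blocks end up with mixed types in the first place.

```python
# Simplified stand-ins, NOT Ray's real classes: they only mimic the one
# attribute difference that the traceback exposes.


class FakeArrowTable:
    """Stands in for an Arrow table, which exposes `num_rows`."""

    def __init__(self, rows):
        self.rows = list(rows)

    @property
    def num_rows(self):
        return len(self.rows)


class FakePandasFrame:
    """Stands in for a pandas DataFrame, which has no `num_rows`."""

    def __init__(self, rows):
        self.rows = list(rows)

    def __len__(self):
        return len(self.rows)


def arrow_merge_sorted_blocks(blocks):
    # Mirrors the failing line in the traceback: every block is assumed
    # to be Arrow-like and is asked for `num_rows`.
    blocks = [b for b in blocks if b.num_rows > 0]
    return sorted(row for b in blocks for row in b.rows)


def pandas_merge_sorted_blocks(blocks):
    blocks = [b for b in blocks if len(b) > 0]
    return sorted(row for b in blocks for row in b.rows)


def reduce_task(mapper_outputs):
    # The merge routine is chosen from the FIRST block only, as in
    # `BlockAccessor.for_block(mapper_outputs[0])` in the traceback.
    first = mapper_outputs[0]
    if isinstance(first, FakeArrowTable):
        return arrow_merge_sorted_blocks(mapper_outputs)
    return pandas_merge_sorted_blocks(mapper_outputs)


def try_reduce(mapper_outputs):
    """Run reduce_task and report an AttributeError instead of raising."""
    try:
        return reduce_task(mapper_outputs)
    except AttributeError as exc:
        return f"AttributeError: {exc}"


homogeneous = [FakeArrowTable([3, 1]), FakeArrowTable([2])]
mixed = [FakeArrowTable([]), FakePandasFrame([2, 1])]

print(try_reduce(homogeneous))  # merges normally
print(try_reduce(mixed))        # fails on the pandas-like block
```

In this sketch the mixed list fails as soon as the Arrow-style merge reaches the pandas-like block, which matches the shape of the error above.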

Versions / Dependencies

Ray 2.6, Ray 2.10, Ray 3.0.0-dev0 (nightly)

Reproduction script

Case 1: map_batches then sort

import ray
ray.data.from_items(range(10), override_num_blocks=10).map_batches(lambda v: {'item': []} if v['item'][0] < 4 else v, batch_format='numpy').map_batches(lambda v: v, batch_format='pandas').sort('item').show()

Case 2: groupby then groupby

import ray
ray.data.from_items([{'a': 0, 'b': i} for i in range(10)]).groupby('a').map_groups(lambda v: v).groupby('a').map_groups(lambda v: v).show()

Case 3: groupby then sort

import ray
ray.data.from_items([{'a': 0, 'b': i} for i in range(10)]).groupby('a').map_groups(lambda v: v).sort('a').show()

Issue Severity

High: It blocks me from completing my task.

ivanthewebber commented 2 weeks ago

I think I'm running into the same problem when doing a groupby after a groupby. It seems I can work around it by running ds = ray.data.from_pandas(pd.DataFrame(ds.take_all())) before the second groupby.
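
The workaround described above could be wrapped in a small helper, sketched here. The helper name `rematerialize_as_pandas` is hypothetical. It uses only `Dataset.take_all()` and `ray.data.from_pandas()`, and it has not been confirmed to fix all three reproduction cases. Because `take_all()` collects every row on the driver, this is only reasonable for datasets that fit in local memory.

```python
def rematerialize_as_pandas(ds):
    """Rebuild a Ray Dataset from its rows via a single pandas DataFrame.

    Hypothetical helper: the intent is that all blocks of the returned
    dataset come from one pandas DataFrame before the next groupby/sort.
    """
    # Imported inside the function so that the sketch can be defined
    # even in an environment where Ray is not installed.
    import pandas as pd
    import ray

    rows = ds.take_all()  # list of row dicts, collected on the driver
    return ray.data.from_pandas(pd.DataFrame(rows))


# Intended usage, following Case 2 (left commented out because it
# needs a running Ray installation):
#
# ds = ray.data.from_items([{'a': 0, 'b': i} for i in range(10)])
# ds = ds.groupby('a').map_groups(lambda v: v)
# ds = rematerialize_as_pandas(ds)
# ds.groupby('a').map_groups(lambda v: v).show()
```

The trade-off is that the intermediate result leaves the cluster and returns as a single DataFrame, so any parallelism before the second groupby is lost.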