ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[Data] Sorting on grouped data not working with pandas format #46748

Open tanzeyy opened 1 month ago

tanzeyy commented 1 month ago

What happened + What you expected to happen

I am working with grouped data and sorting on a certain column, and I want to use pandas operations to process each group. However, the sort operation does not seem to work after map_groups(..., batch_format='pandas').

Reproduction process:

import ray

data = [
    {"tag": "kitty", "number": 12},
    {"tag": "karen", "number": 1},
    {"tag": "karen", "number": 3},
    {"tag": "times", "number": 2},
]

ds = ray.data.from_items(data)

results = (
    ds.groupby("tag")
    .map_groups(lambda g: g, batch_format="pandas")
    .sort("number", descending=True)
    .to_pandas()
)
print(results)

Running the above script produces AttributeError: 'DataFrame' object has no attribute 'num_rows':

...
ray.exceptions.RayTaskError(AttributeError): ray::reduce() (pid=1919807, ip=10.208.49.99)
  File "/.../venvs/dev/lib/python3.10/site-packages/ray/data/_internal/planner/exchange/sort_task_spec.py", line 143, in reduce
    return BlockAccessor.for_block(mapper_outputs[0]).merge_sorted_blocks(
  File "/.../venvs/dev/lib/python3.10/site-packages/ray/data/_internal/arrow_block.py", line 554, in merge_sorted_blocks
    blocks = [b for b in blocks if b.num_rows > 0]
  File "/.../venvs/dev/lib/python3.10/site-packages/ray/data/_internal/arrow_block.py", line 554, in <listcomp>
    blocks = [b for b in blocks if b.num_rows > 0]
  File "/.../venvs/dev/lib/python3.10/site-packages/pandas/core/generic.py", line 6299, in __getattr__
    return object.__getattribute__(self, name)
AttributeError: 'DataFrame' object has no attribute 'num_rows'

After changing batch_format to "pyarrow", the above code works properly. Removing the sort also makes it work.

Versions / Dependencies

Reproduction script

import ray

data = [
    {"tag": "kitty", "number": 12},
    {"tag": "karen", "number": 1},
    {"tag": "karen", "number": 3},
    {"tag": "times", "number": 2},
]

ds = ray.data.from_items(data)

results = (
    ds.groupby("tag")
    .map_groups(lambda g: g, batch_format="pandas")
    .sort("number", descending=True)
    .to_pandas()
)
print(results)

Issue Severity

Medium: It is a significant difficulty but I can work around it.

PranitKatwe commented 1 month ago

Hey, I am trying to contribute to Ray and I would like to take this on as my first issue. I want to understand exactly what output you expect, or what problem you are trying to solve. As far as I understand, you want to group by tag (for example "karen") and then get a sorted list? Correct me if I am wrong.

import ray

data = [
    {"tag": "kitty", "number": 12},
    {"tag": "karen", "number": 1},
    {"tag": "karen", "number": 3},
    {"tag": "times", "number": 2},
]

ds = ray.data.from_items(data)

def process_data(df):
    # Sum "number" per tag, then order the totals from largest to smallest.
    return (df.groupby("tag")
              .agg({"number": "sum"})
              .reset_index()
              .sort_values("number", ascending=False))

# Note: to_pandas() materializes the whole dataset as a single DataFrame.
results = (
    ds.to_pandas()
    .pipe(process_data)
)

print(results)

This code should give you the grouped output, sorted by number.

tanzeyy commented 1 month ago

Thank you for your reply!

I need to process data at a scale of hundreds of gigabytes, so calling ds.to_pandas() up front is probably not feasible for me. Also, my workflow involves complex processing on the grouped data, which is why I intended to use map_groups.

Additionally, I managed to work around this issue by returning an Arrow table from the function passed to map_groups, but I think this is a bug related to the data structures used in the sort operation.

PranitKatwe commented 1 month ago

Thank you for the acknowledgment. I am glad you found a workaround. If you are working with a large dataset, you could also use Modin instead of pandas to scale the processing: https://modin.readthedocs.io/en/latest/

xingyu-long commented 1 week ago

Hi @scottjlee, could I take this one? I think we are probably missing some code to handle pandas.DataFrame in the sort function.

xingyu-long commented 1 week ago

I did some initial research on this. Here are my thoughts:

  File "/Users/xingyulong/Desktop/virtualenvs/ray_dev/lib/python3.9/site-packages/ray/data/_internal/planner/exchange/sort_task_spec.py", line 145, in reduce
    return BlockAccessor.for_block(mapper_outputs[0]).merge_sorted_blocks(
  File "/Users/xingyulong/Desktop/virtualenvs/ray_dev/lib/python3.9/site-packages/ray/data/_internal/arrow_block.py", line 602, in merge_sorted_blocks
    blocks = [b for b in blocks if b.num_rows > 0]
  File "/Users/xingyulong/Desktop/virtualenvs/ray_dev/lib/python3.9/site-packages/ray/data/_internal/arrow_block.py", line 602, in <listcomp>
    blocks = [b for b in blocks if b.num_rows > 0]
  File "/Users/xingyulong/Desktop/virtualenvs/ray_dev/lib/python3.9/site-packages/pandas/core/generic.py", line 5902, in __getattr__
    return object.__getattribute__(self, name)
AttributeError: 'DataFrame' object has no attribute 'num_rows'

mapper_outputs is supposed to be a list of pyarrow.Table blocks (which is why arrow_block.py is used afterwards), but somehow the data inside was a DataFrame while the block type was treated as pyarrow.Table, so we called into arrow_block.py.

Instead, we should invoke the code here: https://github.com/ray-project/ray/blob/a1be06346e532e656c128ffd2230f06a4d72679e/python/ray/data/_internal/pandas_block.py#L504-L518

I have a few questions:

  1. What would the data format be with the following code?
     results = (
         ds.groupby("tag")
         .map_groups(lambda g: g, batch_format="pandas")
     )
  2. If we add sort, do we have the data format info along the way? (I didn't find it in the codebase.)

@scottjlee could you give some thoughts on this?

scottjlee commented 1 week ago

mapper_outputs is supposed to be a list of pyarrow.Table blocks (which is why arrow_block.py is used afterwards), but somehow the data inside was a DataFrame while the block type was treated as pyarrow.Table, so we called into arrow_block.py.

Yes, this sounds like the right analysis of the bug here. The core problem is that in this line, we assume that all of the block types are the same, and we grab the first block to get its corresponding BlockAccessor.
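
The failure mode described here can be imitated without Ray at all. The sketch below is not Ray's code: ArrowLikeBlock, PandasLikeBlock, and reduce_blocks are hypothetical stand-ins that only mimic the shape of the problem, namely choosing the merge routine from the first block and then applying it to blocks of a different type.

```python
class ArrowLikeBlock:
    """Stand-in for pyarrow.Table: exposes num_rows."""

    def __init__(self, rows):
        self.rows = list(rows)

    @property
    def num_rows(self):
        return len(self.rows)


class PandasLikeBlock:
    """Stand-in for pandas.DataFrame: supports len() but has no num_rows."""

    def __init__(self, rows):
        self.rows = list(rows)

    def __len__(self):
        return len(self.rows)


def merge_like_arrow(blocks):
    # Mirrors the filter seen in the traceback, which assumes Arrow blocks.
    return [b for b in blocks if b.num_rows > 0]


def reduce_blocks(blocks):
    # The merge routine is chosen by looking at the first block only.
    if isinstance(blocks[0], ArrowLikeBlock):
        return merge_like_arrow(blocks)
    return [b for b in blocks if len(b) > 0]


# An Arrow-like first block followed by a pandas-like block.
mixed = [ArrowLikeBlock([]), PandasLikeBlock([{"tag": "karen", "number": 1}])]

error_message = None
try:
    reduce_blocks(mixed)
except AttributeError as exc:
    error_message = str(exc)

print(error_message)
```

The AttributeError is raised on the second block, even though the dispatch on the first block looked valid.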

My initial thought for the fix is to add a block normalization call like:

blocks = TableBlockAccessor.normalize_block_types(blocks, format)

to the beginning of SortTaskSpec, where format would be the batch/block format from its previous operator.

  1. What would the data format be with the following code?
     results = (
         ds.groupby("tag")
         .map_groups(lambda g: g, batch_format="pandas")
     )

In this case, the output block format would be pandas DataFrames.

  2. If we add sort, do we have the data format info along the way? (I didn't find it in the codebase.)

The sort method shouldn't specify any particular data type; it should use the data type from the previous operator.

Thanks @xingyu-long !

xingyu-long commented 1 week ago

@scottjlee Thanks for the clarification! Could you assign this to me? I can give it a try. Thanks!

xingyu-long commented 3 days ago

Hi @scottjlee, after spending more time on this issue, I found that after map_groups we create empty pyarrow tables, and later we apply the UDF (https://github.com/ray-project/ray/blob/ee320aa6670514b85e77fce8d6903affcf883cc4/python/ray/data/grouped_data.py#L223) to the batch data, so the empty tables are never updated. One place where we could normalize beforehand is the beginning of the map_groups(...) function, using TableBlockAccessor.normalize_block_types(blocks, format): https://github.com/ray-project/ray/blob/ee320aa6670514b85e77fce8d6903affcf883cc4/python/ray/data/grouped_data.py#L192

However, I am having trouble getting all the blocks there, and I don't know how to replace the blocks within the Dataset.

Could you give me some hints on this? Or, if this is not the place you had in mind, could you elaborate a little more? Thanks!