modin-project / modin

Modin: Scale your Pandas workflows by changing a single line of code
http://modin.readthedocs.io
Apache License 2.0

BUG: groupby fails if dataframe has at least one empty partition #5461

Open dchigarev opened 1 year ago

dchigarev commented 1 year ago

Modin version checks

Reproducible Example

import pandas
from modin.test.storage_formats.pandas.test_internals import construct_modin_df_by_scheme

md_df = construct_modin_df_by_scheme(
    pandas_df=pandas.DataFrame({"a": [1, 1, 2, 2], "b": [3, 4, 5, 6]}),
    partitioning_scheme={"row_lengths": [2, 2], "column_widths": [2]}
)
partitions = md_df._query_compiler._modin_frame._partitions

print("==== Original frame ====")
print(f"1st partition content:\n{partitions[0, 0].get()}\n")
print(f"2nd partition content:\n{partitions[1, 0].get()}\n")

md_res = md_df.query("a > 1")
partitions = md_res._query_compiler._modin_frame._partitions

print("==== Query result ====")
# First partition appears to be empty because it doesn't match the query's filter
print(f"1st partition content:\n{partitions[0, 0].get()}\n")
print(f"2nd partition content:\n{partitions[1, 0].get()}\n")

grp_obj = md_res.groupby("a")

group_frame_partitioning = grp_obj._df._query_compiler._modin_frame._partitions.shape
by_column_partitioning = grp_obj._by._modin_frame._partitions.shape

print("==== Groupby object ====")
# The first empty partition is still present in the source frame
print(f"{group_frame_partitioning=}")
# The first empty partition was filtered out from the 'by' column
print(f"{by_column_partitioning=}")

print(grp_obj.count()) # IndexError due to mismatched partitioning

Issue Description

(see the 'Error Logs' section below for the reproducer's output)

Lazy filtering of empty partitions appears to misbehave when partition broadcasting is required for such partially filtered frames.

Here in the reproducer, the source frame md_res has one empty partition (a result of the .query() call), while its projection used as the grouping column (grp_obj._by) has none, because empty partitions were filtered out when that projection was built.

Since we don't verify partitioning when broadcasting projections of a frame back onto the frame itself (we assume they are always partitioned identically), the broadcast during groupby fails right here, because len(rt_axis_parts) == 1 (the empty partition was dropped) while len(left) == 2 (the empty partition is still included): https://github.com/modin-project/modin/blob/810072cfaf2d7c1fad584b14855574db1b9066b7/modin/core/dataframe/pandas/partitioning/partition_manager.py#L372-L387
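To make the failure mode concrete, here is a toy sketch of positional broadcasting with plain lists of pandas frames. The names left and rt_axis_parts mirror the variables in the linked code, but naive_broadcast, checked_broadcast and row_counts are hypothetical helpers, not Modin's broadcast_apply; the sketch only assumes that partitions are paired by their row index.

```python
import pandas

# Toy stand-ins for row partitions (plain pandas frames, not Modin objects).
# `left`: partitions of the grouped frame, still holding the empty one from .query().
left = [
    pandas.DataFrame({"a": [], "b": []}),
    pandas.DataFrame({"a": [2, 2], "b": [5, 6]}, index=[2, 3]),
]
# `rt_axis_parts`: partitions of the 'by' projection, where the empty one was dropped.
rt_axis_parts = [pandas.DataFrame({"a": [2, 2]}, index=[2, 3])]


def naive_broadcast(left_parts, right_parts, func):
    """Pair partitions by position, trusting that both sides are split identically."""
    return [func(part, right_parts[row_idx]) for row_idx, part in enumerate(left_parts)]


def checked_broadcast(left_parts, right_parts, func):
    """Same pairing, but verify the partition counts before indexing."""
    if len(left_parts) != len(right_parts):
        raise ValueError(
            "partitioning mismatch along axis 0: "
            f"{len(left_parts)} != {len(right_parts)}"
        )
    return [func(lp, rp) for lp, rp in zip(left_parts, right_parts)]


def row_counts(lp, rp):
    """Trivial per-pair function: report the row count of each side."""
    return len(lp), len(rp)
```

Calling naive_broadcast(left, rt_axis_parts, row_counts) raises IndexError: list index out of range at row_idx == 1, matching the traceback, while checked_broadcast reports the mismatch explicitly instead of failing deep inside the list comprehension.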

Expected Behavior

The groupby call is expected to work regardless of whether the frame contains empty partitions :)

BTW, this problem has a simple workaround found by @Egor-Krivov: users can manually trigger the filtering of empty partitions by calling .iloc with a slice that spans the whole frame:

md_res = md_df.query("a > 1")
print(md_res._query_compiler._modin_frame._partitions.shape) # (2, 1) <-- contains an empty partition

md_res = md_res.iloc[:len(md_res), :]
print(md_res._query_compiler._modin_frame._partitions.shape) # (1, 1) <-- all empty partitions were filtered out
print(md_res.groupby("a").count()) # works perfectly fine
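Conceptually, the workaround realigns the two sides by dropping the empty partition from the grouped frame, so that it is split the same way as the 'by' projection. A toy sketch of that effect, using plain lists of pandas frames as hypothetical stand-ins for Modin's row partitions (drop_empty is an illustrative helper, not a Modin function):

```python
import pandas

# Hypothetical stand-ins for Modin row partitions (plain pandas frames, not Modin objects).
# Grouped frame after .query("a > 1"): the first partition is empty.
frame_parts = [
    pandas.DataFrame({"a": [], "b": []}),
    pandas.DataFrame({"a": [2, 2], "b": [5, 6]}, index=[2, 3]),
]
# 'by' projection: its empty partition was already filtered out.
by_parts = [pandas.DataFrame({"a": [2, 2]}, index=[2, 3])]


def drop_empty(parts):
    """Keep only the partitions that hold at least one row (illustrative helper)."""
    return [part for part in parts if len(part) > 0]


aligned_frame_parts = drop_empty(frame_parts)
# Both sides now have the same number of row partitions, so positional pairing is safe.
assert len(aligned_frame_parts) == len(by_parts)
pairs = list(zip(aligned_frame_parts, by_parts))
```

After filtering, every pair also covers the same row labels, which is the invariant the broadcast silently relies on.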

Error Logs

```python-traceback
==== Original frame ====
1st partition content:
   a  b
0  1  3
1  1  4

2nd partition content:
   a  b
2  2  5
3  2  6

==== Query result ====
1st partition content:
Empty DataFrame
Columns: [a, b]
Index: []

2nd partition content:
   a  b
2  2  5
3  2  6

==== Groupby object ====
group_frame_partitioning=(2, 1)
by_column_partitioning=(1, 1)
Traceback (most recent call last):
  File "t2.py", line 46, in <module>
    print(grp_obj.count()) # index error due to partitioning missmatching
  File "/localdisk/dchigare/repos/modin/modin/logging/logger_decorator.py", line 128, in run_and_log
    return obj(*args, **kwargs)
  File "/localdisk/dchigare/repos/modin/modin/pandas/groupby.py", line 868, in count
    result = self._wrap_aggregation(
  File "/localdisk/dchigare/repos/modin/modin/logging/logger_decorator.py", line 128, in run_and_log
    return obj(*args, **kwargs)
  File "/localdisk/dchigare/repos/modin/modin/pandas/groupby.py", line 1150, in _wrap_aggregation
    query_compiler=qc_method(
  File "/localdisk/dchigare/repos/modin/modin/logging/logger_decorator.py", line 128, in run_and_log
    return obj(*args, **kwargs)
  File "/localdisk/dchigare/repos/modin/modin/core/dataframe/algebra/groupby.py", line 68, in <lambda>
    return lambda *args, **kwargs: cls.caller(
  File "/localdisk/dchigare/repos/modin/modin/core/dataframe/algebra/groupby.py", line 357, in caller
    new_modin_frame = query_compiler._modin_frame.groupby_reduce(
  File "/localdisk/dchigare/repos/modin/modin/logging/logger_decorator.py", line 128, in run_and_log
    return obj(*args, **kwargs)
  File "/localdisk/dchigare/repos/modin/modin/core/dataframe/pandas/dataframe/dataframe.py", line 126, in run_f_on_minimally_updated_metadata
    result = f(self, *args, **kwargs)
  File "/localdisk/dchigare/repos/modin/modin/core/dataframe/pandas/dataframe/dataframe.py", line 3107, in groupby_reduce
    new_partitions = self._partition_mgr_cls.groupby_reduce(
  File "/localdisk/dchigare/repos/modin/modin/logging/logger_decorator.py", line 128, in run_and_log
    return obj(*args, **kwargs)
  File "/localdisk/dchigare/repos/modin/modin/core/dataframe/pandas/partitioning/partition_manager.py", line 244, in groupby_reduce
    mapped_partitions = cls.broadcast_apply(
  File "/localdisk/dchigare/repos/modin/modin/logging/logger_decorator.py", line 128, in run_and_log
    return obj(*args, **kwargs)
  File "/localdisk/dchigare/repos/modin/modin/core/dataframe/pandas/partitioning/partition_manager.py", line 58, in wait
    result = func(cls, *args, **kwargs)
  File "/localdisk/dchigare/repos/modin/modin/core/dataframe/pandas/partitioning/partition_manager.py", line 373, in broadcast_apply
    [
  File "/localdisk/dchigare/repos/modin/modin/core/dataframe/pandas/partitioning/partition_manager.py", line 374, in <listcomp>
    [
  File "/localdisk/dchigare/repos/modin/modin/core/dataframe/pandas/partitioning/partition_manager.py", line 380, in <listcomp>
    else rt_axis_parts[row_idx].list_of_blocks
IndexError: list index out of range
```

Egor-Krivov commented 1 year ago

Still suffering from this one

dchigarev commented 1 year ago

#6307 only fixed some of the problematic cases.

Here are more cases that still trigger the same problem:

  1. The grouping frame doesn't have materialized partition shapes at the time of groupby:
import pandas
import modin.pandas as pd
from modin.test.storage_formats.pandas.test_internals import (
    construct_modin_df_by_scheme,
)

md_df = construct_modin_df_by_scheme(
    pandas_df=pandas.DataFrame({"a": [1, 1, 2, 2], "b": [3, 4, 5, 6]}),
    partitioning_scheme={"row_lengths": [2, 2], "column_widths": [2]},
)
md_res = md_df.query("a > 1")
grp_obj = md_res.groupby(md_res["a"])
print(grp_obj.count()) # fails with an assertion error
```python-traceback
Traceback (most recent call last):
  File "t2.py", line 16, in <module>
    print(grp_obj.count())
  File "modin/modin/logging/logger_decorator.py", line 128, in run_and_log
    return obj(*args, **kwargs)
  File "modin/modin/pandas/groupby.py", line 1165, in count
    return self._wrap_aggregation(
  File "modin/modin/logging/logger_decorator.py", line 128, in run_and_log
    return obj(*args, **kwargs)
  File "modin/modin/pandas/groupby.py", line 1509, in _wrap_aggregation
    query_compiler=qc_method(
  File "modin/modin/logging/logger_decorator.py", line 128, in run_and_log
    return obj(*args, **kwargs)
  File "modin/modin/core/storage_formats/pandas/groupby.py", line 110, in method
    return map_reduce_method(query_compiler, *args, **kwargs)
  File "modin/modin/core/dataframe/algebra/groupby.py", line 86, in <lambda>
    return lambda *args, **kwargs: cls.caller(
  File "modin/modin/core/dataframe/algebra/groupby.py", line 409, in caller
    new_modin_frame = query_compiler._modin_frame.groupby_reduce(
  File "modin/modin/logging/logger_decorator.py", line 128, in run_and_log
    return obj(*args, **kwargs)
  File "modin/modin/core/dataframe/pandas/dataframe/utils.py", line 376, in run_f_on_minimally_updated_metadata
    result = f(self, *args, **kwargs)
  File "modin/modin/core/dataframe/pandas/dataframe/dataframe.py", line 3619, in groupby_reduce
    new_partitions = self._partition_mgr_cls.groupby_reduce(
  File "modin/modin/logging/logger_decorator.py", line 128, in run_and_log
    return obj(*args, **kwargs)
  File "modin/modin/core/dataframe/pandas/partitioning/partition_manager.py", line 247, in groupby_reduce
    assert partitions.shape[axis] == by.shape[axis], (
AssertionError: the number of partitions along axis=0 is not equal: 1 != 2
```
  2. The 'by' frame itself has empty partitions:
import pandas
import modin.pandas as pd
from modin.test.storage_formats.pandas.test_internals import (
    construct_modin_df_by_scheme,
)

md_df = construct_modin_df_by_scheme(
    pandas_df=pandas.DataFrame({"a": [1, 1, 2, 2], "b": [3, 4, 5, 6]}),
    partitioning_scheme={"row_lengths": [2, 2], "column_widths": [2]},
)
by_df = construct_modin_df_by_scheme(
    pandas_df=pandas.DataFrame({"a": [1, 1, 2, 2, None, None]}),
    partitioning_scheme={"row_lengths": [2, 2, 2], "column_widths": [1]},
).squeeze()

grp_obj = md_df.groupby(by_df.dropna())
print(grp_obj.count()) # fails with an assertion error
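A simplified model of why the second case still trips the assertion, using plain pandas only: the 'by' data is cut into row chunks of 2, 2 and 2 (the row_lengths from the scheme above) and dropna() is applied chunk by chunk. This assumes, for illustration, that a row-wise dropna keeps the original partition boundaries and leaves an empty last partition behind; split_rows is a hypothetical helper, not part of Modin.

```python
import pandas


def split_rows(obj, row_lengths):
    """Cut a pandas object into consecutive row chunks (hypothetical partitioning helper)."""
    chunks, start = [], 0
    for length in row_lengths:
        chunks.append(obj.iloc[start:start + length])
        start += length
    return chunks


frame = pandas.DataFrame({"a": [1, 1, 2, 2], "b": [3, 4, 5, 6]})
by = pandas.Series([1, 1, 2, 2, None, None], name="a")

frame_parts = split_rows(frame, [2, 2])
# dropna() applied per partition empties the last chunk instead of removing it.
by_parts = [part.dropna() for part in split_rows(by, [2, 2, 2])]

frame_lengths = [len(part) for part in frame_parts]  # [2, 2]
by_lengths = [len(part) for part in by_parts]  # [2, 2, 0]
# Ignoring empty partitions, the two layouts line up again.
non_empty_by_lengths = [n for n in by_lengths if n > 0]  # [2, 2]
```

In this model the total row counts agree (4 rows on each side), but the number of row partitions does not (2 vs 3), which is the kind of mismatch the partitions.shape[axis] == by.shape[axis] assertion reports.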