modin-project / modin

Modin: Scale your Pandas workflows by changing a single line of code
http://modin.readthedocs.io
Apache License 2.0
9.64k stars 652 forks source link

BUG: `groupby.get_group()` doesn't work with the reshuffling groupby implementation #6093

Open dchigarev opened 1 year ago

dchigarev commented 1 year ago

Modin version checks

Reproducible Example

import modin.pandas as pd
import modin.config as cfg

cfg.ExperimentalGroupbyImpl.put(True)

df = pd.concat(
    [
        pd.DataFrame({"a": [1, 1, 2, 2], "b": [3, 4, 5, 6]}),
        pd.DataFrame({"a": [3, 3, 4, 4], "b": [6, 7, 8, 9]}),
    ]
)
assert df._query_compiler._modin_frame._partitions.shape == (2, 1)

print(df.groupby("a").get_group(1)) # KeyError: 1

Issue Description

The problem occurs because we dispatch the .get_group(x) kernel to every partition when in actuality there's only one partition that may contain the X group and all the others will raise a KeyError.

Expected Behavior

Work as pandas

Error Logs

```python-traceback Traceback (most recent call last): File "t3.py", line 14, in print(df.groupby("a").get_group(1)) File "/localdisk/dchigare/repos/modin/modin/logging/logger_decorator.py", line 128, in run_and_log return obj(*args, **kwargs) File "/localdisk/dchigare/repos/modin/modin/pandas/base.py", line 3963, in __str__ return repr(self) File "/localdisk/dchigare/repos/modin/modin/logging/logger_decorator.py", line 128, in run_and_log return obj(*args, **kwargs) File "/localdisk/dchigare/repos/modin/modin/pandas/dataframe.py", line 246, in __repr__ result = repr(self._build_repr_df(num_rows, num_cols)) File "/localdisk/dchigare/repos/modin/modin/logging/logger_decorator.py", line 128, in run_and_log return obj(*args, **kwargs) File "/localdisk/dchigare/repos/modin/modin/pandas/base.py", line 185, in _build_repr_df if len(self.index) == 0 or (self._is_dataframe and len(self.columns) == 0): File "/localdisk/dchigare/repos/modin/modin/pandas/base.py", line 4038, in __getattribute__ attr = super().__getattribute__(item) File "/localdisk/dchigare/repos/modin/modin/pandas/base.py", line 591, in _get_index return self._query_compiler.index File "/localdisk/dchigare/repos/modin/modin/core/storage_formats/pandas/query_compiler.py", line 87, in return lambda self: self._modin_frame.index File "/localdisk/dchigare/repos/modin/modin/core/dataframe/pandas/dataframe/dataframe.py", line 484, in _get_index index, row_lengths = self._compute_axis_labels_and_lengths(0) File "/localdisk/dchigare/repos/modin/modin/logging/logger_decorator.py", line 128, in run_and_log return obj(*args, **kwargs) File "/localdisk/dchigare/repos/modin/modin/core/dataframe/pandas/dataframe/dataframe.py", line 588, in _compute_axis_labels_and_lengths new_index, internal_idx = self._partition_mgr_cls.get_indices(axis, partitions) File "/localdisk/dchigare/repos/modin/modin/logging/logger_decorator.py", line 128, in run_and_log return obj(*args, **kwargs) File "/localdisk/dchigare/repos/modin/modin/core/dataframe/pandas/partitioning/partition_manager.py", line 926, in get_indices new_idx = cls.get_objects_from_partitions(new_idx) File "/localdisk/dchigare/repos/modin/modin/logging/logger_decorator.py", line 128, in run_and_log return obj(*args, **kwargs) File "/localdisk/dchigare/repos/modin/modin/core/dataframe/pandas/partitioning/partition_manager.py", line 867, in get_objects_from_partitions return cls._execution_wrapper.materialize( File "/localdisk/dchigare/repos/modin/modin/core/execution/ray/common/engine_wrapper.py", line 92, in materialize return ray.get(obj_id) File "/localdisk/dchigare/miniconda3/envs/modinc/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper return func(*args, **kwargs) File "/localdisk/dchigare/miniconda3/envs/modinc/lib/python3.8/site-packages/ray/_private/worker.py", line 2380, in get raise value.as_instanceof_cause() ray.exceptions.RayTaskError(KeyError): ray::_apply_func() (pid=876479, ip=10.34.123.21) At least one of the input arguments for this task could not be computed: ray.exceptions.RayTaskError: ray::_deploy_ray_func() (pid=876479, ip=10.34.123.21) File "/localdisk/dchigare/repos/modin/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py", line 618, in _deploy_ray_func result = deployer(axis, f_to_deploy, f_args, f_kwargs, *args, **kwargs) File "/localdisk/dchigare/repos/modin/modin/core/dataframe/pandas/partitioning/axis_partition.py", line 263, in deploy_axis_func result = func(dataframe, *f_args, **f_kwargs) File "/localdisk/dchigare/repos/modin/modin/core/dataframe/pandas/dataframe/dataframe.py", line 3444, in apply_func return operator(df.groupby(by, **kwargs)) File "/localdisk/dchigare/repos/modin/modin/core/storage_formats/pandas/query_compiler.py", line 3289, in operator=lambda grp: agg_func(grp, *agg_args, **agg_kwargs), File "/localdisk/dchigare/miniconda3/envs/modinc/lib/python3.8/site-packages/pandas/core/groupby/generic.py", line 895, in aggregate result = op.agg() File "/localdisk/dchigare/miniconda3/envs/modinc/lib/python3.8/site-packages/pandas/core/apply.py", line 169, in agg return self.apply_str() File "/localdisk/dchigare/miniconda3/envs/modinc/lib/python3.8/site-packages/pandas/core/apply.py", line 580, in apply_str return self._try_aggregate_string_function(obj, f, *self.args, **self.kwargs) File "/localdisk/dchigare/miniconda3/envs/modinc/lib/python3.8/site-packages/pandas/core/apply.py", line 649, in _try_aggregate_string_function return f(*args, **kwargs) File "/localdisk/dchigare/miniconda3/envs/modinc/lib/python3.8/site-packages/pandas/core/groupby/groupby.py", line 817, in get_group raise KeyError(name) KeyError: 1 ```

Installed Versions

Replace this line with the output of pd.show_versions()
dchigarev commented 6 months ago

p2 because of low demand for this operation, we can always fall back to the old implementation