modin-project / modin

Modin: Scale your Pandas workflows by changing a single line of code
http://modin.readthedocs.io
Apache License 2.0

BUG: operations with NPartitions not a power of 2 fail in some cases. BUG IN PARTITION_MANAGER? possible solution in here! #7383

Closed · Liquidmasl closed this issue 2 months ago

Liquidmasl commented 2 months ago

Modin version checks

Reproducible Example

I am really struggling to create a reproducer; none of the data I generate artificially leads to the error. I will keep trying to find artificial data that triggers it.

I have a point cloud that I save to Parquet in 75 partitions.

When I load it again (in a fresh process), I get issues:


```python
import modin.pandas as pd
from modin.config import context

with context(NPartitions=75):
    df = pd.read_parquet(r'input_output/20240904-081345/preprocessor_poloplast_0-0-0.parquet')
print(df.max())
```

leads to

  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/core/dataframe/pandas/partitioning/partition_manager.py", line 873, in map_partitions_joined_by_column
    result[i : i + step, j] = joined_column_partitions[j].apply(
    ~~~~~~^^^^^^^^^^^^^^^^^
ValueError: could not broadcast input array from shape (2,) into shape (1,)

```python
with context(NPartitions=50):
    df = pd.read_parquet(r'input_output/20240904-081345/preprocessor_poloplast_0-0-0.parquet')
print(df.max())
```

works fine

Setting MinColumnsPerPartition to something larger than the number of columns I have (it's just 15 columns) changes nothing.

Issue Description

Using a number of partitions that is not a power of 2 can lead to failures. Details below.

Expected Behavior

I would expect the operations not to fail with the given error.

Error Logs

```python-traceback
Traceback (most recent call last):
  File "/home/cirqular/PycharmProjects/cirqular_mono_repo/archive/debug.py", line 42, in <module>
    df = pd.read_parquet(r'input_output/20240904-081345/preprocessor_poloplast_0-0-0.parquet')
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/utils.py", line 591, in wrapped
    return func(*params.args, **params.kwargs)
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/logging/logger_decorator.py", line 144, in run_and_log
    return obj(*args, **kwargs)
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/pandas/io.py", line 325, in read_parquet
    query_compiler=FactoryDispatcher.read_parquet(
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/core/execution/dispatching/factories/dispatcher.py", line 202, in read_parquet
    return cls.get_factory()._read_parquet(**kwargs)
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/core/execution/dispatching/factories/factories.py", line 257, in _read_parquet
    return cls.io_cls.read_parquet(**kwargs)
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/logging/logger_decorator.py", line 144, in run_and_log
    return obj(*args, **kwargs)
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/core/io/file_dispatcher.py", line 165, in read
    if not AsyncReadMode.get() and hasattr(query_compiler, "dtypes"):
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/core/storage_formats/pandas/query_compiler.py", line 329, in dtypes
    return self._modin_frame.dtypes
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/core/dataframe/pandas/dataframe/dataframe.py", line 424, in dtypes
    dtypes = self._dtypes.get()
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/core/dataframe/pandas/metadata/dtypes.py", line 928, in get
    self._value = self._value.to_series()
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/core/dataframe/pandas/metadata/dtypes.py", line 440, in to_series
    self.materialize()
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/core/dataframe/pandas/metadata/dtypes.py", line 405, in materialize
    self._materialize_cols_with_unknown_dtypes()
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/core/dataframe/pandas/metadata/dtypes.py", line 391, in _materialize_cols_with_unknown_dtypes
    self._known_dtypes.update(self._parent_df._compute_dtypes(subset))
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/logging/logger_decorator.py", line 144, in run_and_log
    return obj(*args, **kwargs)
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/core/dataframe/pandas/dataframe/dataframe.py", line 480, in _compute_dtypes
    obj.tree_reduce(0, lambda df: df.dtypes, dtype_builder)
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/logging/logger_decorator.py", line 144, in run_and_log
    return obj(*args, **kwargs)
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/core/dataframe/pandas/dataframe/utils.py", line 753, in run_f_on_minimally_updated_metadata
    result = f(self, *args, **kwargs)
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/core/dataframe/pandas/dataframe/dataframe.py", line 2217, in tree_reduce
    map_parts = self._partition_mgr_cls.map_partitions(self._partitions, map_func)
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/core/execution/modin_aqp.py", line 165, in magic
    result_parts = f(*args, **kwargs)
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/logging/logger_decorator.py", line 144, in run_and_log
    return obj(*args, **kwargs)
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/core/dataframe/pandas/partitioning/partition_manager.py", line 73, in wait
    result = func(cls, *args, **kwargs)
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/core/dataframe/pandas/partitioning/partition_manager.py", line 698, in map_partitions
    new_partitions = cls.map_partitions_joined_by_column(
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/logging/logger_decorator.py", line 144, in run_and_log
    return obj(*args, **kwargs)
  File "/home/cirqular/anaconda3/envs/lightning/lib/python3.11/site-packages/modin/core/dataframe/pandas/partitioning/partition_manager.py", line 873, in map_partitions_joined_by_column
    result[i : i + step, j] = joined_column_partitions[j].apply(
    ~~~~~~^^^^^^^^^^^^^^^^^
ValueError: could not broadcast input array from shape (2,) into shape (1,)
```

Installed Versions

```
INSTALLED VERSIONS
------------------
commit                : c8bbca8e4e00c681370e3736b2f73bb0352408c3
python                : 3.11.9.final.0
python-bits           : 64
OS                    : Linux
OS-release            : 6.8.0-40-generic
Version               : #40~22.04.3-Ubuntu SMP PREEMPT_DYNAMIC Tue Jul 30 17:30:19 UTC 2
machine               : x86_64
processor             : x86_64
byteorder             : little
LC_ALL                : None
LANG                  : en_US.UTF-8
LOCALE                : en_US.UTF-8

Modin dependencies
------------------
modin                 : 0.31.0
ray                   : 2.24.0
dask                  : 2024.6.0
distributed           : 2024.6.0

pandas dependencies
-------------------
pandas                : 2.2.2
numpy                 : 1.26.3
pytz                  : 2024.1
dateutil              : 2.9.0.post0
setuptools            : 69.5.1
pip                   : 24.1.2
Cython                : None
pytest                : None
hypothesis            : None
sphinx                : None
blosc                 : None
feather               : None
xlsxwriter            : None
lxml.etree            : None
html5lib              : None
pymysql               : None
psycopg2              : None
jinja2                : 3.1.3
IPython               : 8.25.0
pandas_datareader     : None
adbc-driver-postgresql: None
adbc-driver-sqlite    : None
bs4                   : 4.12.3
bottleneck            : None
dataframe-api-compat  : None
fastparquet           : None
fsspec                : 2024.2.0
gcsfs                 : None
matplotlib            : 3.8.4
numba                 : None
numexpr               : None
odfpy                 : None
openpyxl              : None
pandas_gbq            : None
pyarrow               : 16.1.0
pyreadstat            : None
python-calamine       : None
pyxlsb                : None
s3fs                  : None
scipy                 : 1.14.0
sqlalchemy            : 2.0.32
tables                : None
tabulate              : None
xarray                : None
xlrd                  : None
zstandard             : None
tzdata                : 2024.1
qtpy                  : None
pyqt5                 : None
```
Liquidmasl commented 2 months ago

This method in partition_manager seems to be the issue:

```python
    @classmethod
    def map_partitions_joined_by_column(
        cls,
        partitions,
        column_splits,
        map_func,
        map_func_args=None,
        map_func_kwargs=None,
    ):
        """
        Combine several blocks by column into one virtual partition and apply "map_func" to them.

        Parameters
        ----------
        partitions : NumPy 2D array
            Partitions of Modin Frame.
        column_splits : int
            The number of splits by column.
        map_func : callable
            Function to apply.
        map_func_args : iterable, optional
            Positional arguments for the 'map_func'.
        map_func_kwargs : dict, optional
            Keyword arguments for the 'map_func'.

        Returns
        -------
        NumPy array
            An array of new partitions for Modin Frame.
        """
        if column_splits < 1:
            raise ValueError(
                "The value of columns_splits must be greater than or equal to 1."
            )
        # step cannot be less than 1
        step = max(partitions.shape[0] // column_splits, 1)
        preprocessed_map_func = cls.preprocess_func(map_func)
        kw = {
            "num_splits": step,
        }
        result = np.empty(partitions.shape, dtype=object)
        for i in range(
            0,
            partitions.shape[0],
            step,
        ):
            joined_column_partitions = cls.column_partitions(partitions[i : i + step])
            for j in range(partitions.shape[1]):
                result[i : i + step, j] = joined_column_partitions[j].apply(
                    preprocessed_map_func,
                    *map_func_args if map_func_args is not None else (),
                    **kw,
                    **map_func_kwargs if map_func_kwargs is not None else {},
                )

        return result
```

So when partitions.shape is (75, 1) and the outer loop reaches i = 74 with step = 2 (I don't know where step comes from), this returns a list that is 2 elements long:

```python
joined_column_partitions[j].apply(
    preprocessed_map_func,
    *map_func_args if map_func_args is not None else (),
    **kw,
    **map_func_kwargs if map_func_kwargs is not None else {},
)
```

while this slot only holds 1 element:

```python
result[i : i + step, j]
```

which in this case means:

```python
result[74 : 75, 0]  # which is effectively ...
result[74, 0]       # ... because shape[0] is only 75
```
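
To convince myself, here is a minimal numpy-only sketch of the mismatch as I understand it (the shapes are taken from my failing run; the two-element array stands in for what apply returns):

```python
import numpy as np

# partitions.shape == (75, 1), and the outer loop is at its last chunk:
result = np.empty((75, 1), dtype=object)
i, step = 74, 2

# apply(..., num_splits=2) hands back 2 partition objects ...
two_splits = np.array([object(), object()])

# ... but the target slice only covers the single remaining row:
result[i : i + step, 0] = two_splits
# ValueError: could not broadcast input array from shape (2,) into shape (1,)
```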

This all seems to happen because, in map_partitions,

```python
if np.prod(partitions.shape) <= 1.5 * CpuCount.get():
```

evaluates to False once I go above 64 partitions, so the behaviour changes.
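
To illustrate the threshold (the core count of 43 here is hypothetical; your machine's value will differ):

```python
from modin.config import CpuCount

# On a machine where CpuCount.get() == 43 the threshold is 1.5 * 43 = 64.5,
# so a (64, 1) partition grid still takes the simple per-block path while a
# (75, 1) grid falls through to map_partitions_joined_by_column.
threshold = 1.5 * CpuCount.get()
print(64 <= threshold, 75 <= threshold)
```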

Still, I fail to see how I can fix this. I don't need nor want column partitions, as we have very few columns. This completely blocks my progress currently, and I am at a bit of a loss.

Liquidmasl commented 2 months ago

Back to the partition_manager:

```python
joined_column_partitions = cls.column_partitions(partitions[i : i + step])
for j in range(partitions.shape[1]):
    result[i : i + step, j] = joined_column_partitions[j].apply(
        preprocessed_map_func,
        *map_func_args if map_func_args is not None else (),
        **kw,
        **map_func_kwargs if map_func_kwargs is not None else {},
    )
```

I found something I don't quite understand:

`partitions[i : i + step].shape` is `(1, 1)`, which makes sense. `joined_column_partitions` is just 1 element, which also makes sense. But `joined_column_partitions[j].apply(...)` returns 2 elements, which kind of doesn't make sense to me. And it also breaks.

It makes me nuts. I need to solve this, but I don't understand it.

Liquidmasl commented 2 months ago

So it seems the issue is that the apply function gets the number of splits via the `**kw` parameter.

This is set just before with:

```python
kw = {
    "num_splits": step,
}
```

This is an issue because, with step = 2 and 75 partitions, the last piece holds just 1 element, as noted in the previous comment.

After changing it to

```python
kw = {
    "num_splits": len(partitions[i : i + step]),
}
```

the issue no longer appears.

But I can't see how to apply that fix to my pipeline now without pulling and building from source... right? I would highly appreciate help from someone who knows what's up here, haha.
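
In the meantime, the stopgap I am considering is a runtime monkey-patch (untested sketch: it copies the method body shown above with only the num_splits line changed, and assumes the execution-specific partition managers inherit this classmethod unchanged; it has to run before any Modin operation):

```python
import numpy as np
from modin.core.dataframe.pandas.partitioning.partition_manager import (
    PandasDataframePartitionManager,
)

@classmethod
def _patched_map_partitions_joined_by_column(
    cls, partitions, column_splits, map_func, map_func_args=None, map_func_kwargs=None
):
    # Same body as in modin 0.31.0, except num_splits is derived from the
    # actual chunk length instead of the fixed step.
    if column_splits < 1:
        raise ValueError(
            "The value of columns_splits must be greater than or equal to 1."
        )
    step = max(partitions.shape[0] // column_splits, 1)
    preprocessed_map_func = cls.preprocess_func(map_func)
    result = np.empty(partitions.shape, dtype=object)
    for i in range(0, partitions.shape[0], step):
        joined_column_partitions = cls.column_partitions(partitions[i : i + step])
        # the one-line fix: the last chunk can be shorter than `step`
        kw = {"num_splits": len(partitions[i : i + step])}
        for j in range(partitions.shape[1]):
            result[i : i + step, j] = joined_column_partitions[j].apply(
                preprocessed_map_func,
                *map_func_args if map_func_args is not None else (),
                **kw,
                **map_func_kwargs if map_func_kwargs is not None else {},
            )
    return result

PandasDataframePartitionManager.map_partitions_joined_by_column = (
    _patched_map_partitions_joined_by_column
)
```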

allemp commented 2 months ago

I am also affected by this issue. In my case I'm using awswrangler and a Ray cluster, and this has been very difficult to reproduce. It seems to be sensitive to partitioning and only happens sometimes.

Liquidmasl commented 2 months ago

I thought I could help myself out by just using an even number of partitions, but that only helps for small partition counts. Using 266 partitions I get

  File "/usr/local/lib/python3.11/site-packages/modin/core/dataframe/pandas/partitioning/partition_manager.py", line 873, in map_partitions_joined_by_column
2024-09-19T10:57:22.379431833Z     result[i : i + step, j] = joined_column_partitions[j].apply(
2024-09-19T10:57:22.379433676Z     ~~~~~~^^^^^^^^^^^^^^^^^
2024-09-19T10:57:22.379435480Z ValueError: could not broadcast input array from shape (8,) into shape (2,)

This is a major issue, as it completely undermines the whole purpose of the package. I need to be able to set partition counts without it crashing...
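
For what it's worth, the numbers line up with the same remainder problem as before (my reading; the step of 8 is inferred from the error shapes):

```python
# apply() hands back num_splits == 8 pieces, but the final slice of the
# result array spans only the leftover row partitions:
print(266 % 8)  # 2  -> "from shape (8,) into shape (2,)"
```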

Liquidmasl commented 2 months ago

> So it seems the issue is that the apply function gets the number of splits via the `**kw` parameter. [...] Changing it to `"num_splits": len(partitions[i : i + step])`, the issue does not appear.

Could some collaborator look into whether this solution would be viable for a hotfix? To me (someone who has too little insight into the inner workings) it looks like a simple bug with an even simpler solution...

anmyachev commented 2 months ago

Hi @Liquidmasl! Thanks for researching the problem. I opened https://github.com/modin-project/modin/pull/7399 to fix this problem. Will you be able to upgrade to the new version of Modin?

Liquidmasl commented 2 months ago

> Hi @Liquidmasl! Thanks for researching the problem. I opened #7399 to fix this problem. Will you be able to upgrade to the new version of Modin?

I cannot tell you how happy you are making me right now!

I will be able to upgrade to the new version immediately, yes!

Liquidmasl commented 2 months ago

Thank you very much for the super fast response time here, amazing!

I suppose it might be some time until the next release, so until then I will try to install directly from GitHub; let's see how successful I am.
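
(For anyone else in the same spot: something like `pip install git+https://github.com/modin-project/modin.git@main` should pull an unreleased build straight from GitHub, assuming the fix has been merged to main.)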

Thanks again!