modin-project / modin

Modin: Scale your Pandas workflows by changing a single line of code
http://modin.readthedocs.io
Apache License 2.0

BUG: nlargest raises KeyError in Ray when some column partitions are missing a sort column #4164

Open labanyamukhopadhyay opened 2 years ago

labanyamukhopadhyay commented 2 years ago

Describe the problem

Calling nlargest with a columns argument on a Modin DataFrame fails with ray.exceptions.RayTaskError: ray::deploy_ray_func(), which wraps a KeyError for the sort column.

Source code

#saleprice correlation matrix
corrmat = df_train.corr()
k = 10 #number of variables for heatmap
cols = corrmat.nlargest(k, 'SalePrice')['SalePrice'].index
cm = np.corrcoef(df_train[cols].values.T)
sns.set(font_scale=1.25)
hm = sns.heatmap(cm, cbar=True, annot=True, square=True, fmt='.2f', annot_kws={'size': 10}, yticklabels=cols.values, xticklabels=cols.values)
plt.show()

Error message

---------------------------------------------------------------------------
RayTaskError(KeyError)                    Traceback (most recent call last)
/var/folders/qj/jybppsbd2jl75s8y2q8s2xx80000gn/T/ipykernel_39687/912778826.py in <module>
      1 #saleprice correlation matrix
      2 k = 10 #number of variables for heatmap
----> 3 cols = corrmat.nlargest(k, 'SalePrice')['SalePrice'].index
      4 cm = np.corrcoef(df_train[cols].values.T)
      5 sns.set(font_scale=1.25)

~/Desktop/modin/modin/pandas/dataframe.py in nlargest(self, n, columns, keep)
   1471         Return the first `n` rows ordered by `columns` in descending order.
   1472         """
-> 1473         return DataFrame(query_compiler=self._query_compiler.nlargest(n, columns, keep))
   1474 
   1475     def nsmallest(self, n, columns, keep="first"):  # noqa: PR01, RT01, D200

~/Desktop/modin/modin/core/storage_formats/pandas/query_compiler.py in nlargest(self, *args, **kwargs)
   1774 
   1775     def nlargest(self, *args, **kwargs):
-> 1776         return self._nsort(sort_type="nlargest", *args, **kwargs)
   1777 
   1778     def eval(self, expr, **kwargs):

~/Desktop/modin/modin/core/storage_formats/pandas/query_compiler.py in _nsort(self, n, columns, keep, sort_type)
   1766 
   1767         new_modin_frame = self._modin_frame.apply_full_axis(
-> 1768             axis=0, func=map_func, new_columns=new_columns
   1769         )
   1770         return self.__constructor__(new_modin_frame)

~/Desktop/modin/modin/core/dataframe/pandas/dataframe/dataframe.py in run_f_on_minimally_updated_metadata(self, *args, **kwargs)
    105                 elif apply_axis == "rows":
    106                     obj._propagate_index_objs(axis=0)
--> 107             result = f(self, *args, **kwargs)
    108             if apply_axis is None and not transpose:
    109                 result._deferred_index = self._deferred_index

~/Desktop/modin/modin/core/dataframe/pandas/dataframe/dataframe.py in apply_full_axis(self, axis, func, new_index, new_columns, dtypes)
   1584             new_columns=new_columns,
   1585             dtypes=dtypes,
-> 1586             other=None,
   1587         )
   1588 

~/Desktop/modin/modin/core/dataframe/pandas/dataframe/dataframe.py in run_f_on_minimally_updated_metadata(self, *args, **kwargs)
    105                 elif apply_axis == "rows":
    106                     obj._propagate_index_objs(axis=0)
--> 107             result = f(self, *args, **kwargs)
    108             if apply_axis is None and not transpose:
    109                 result._deferred_index = self._deferred_index

~/Desktop/modin/modin/core/dataframe/pandas/dataframe/dataframe.py in broadcast_apply_full_axis(self, axis, func, other, new_index, new_columns, apply_indices, enumerate_partitions, dtypes)
   2009             if new_axis is None
   2010             else new_axis
-> 2011             for i, new_axis in enumerate([new_index, new_columns])
   2012         ]
   2013         if dtypes == "copy":

~/Desktop/modin/modin/core/dataframe/pandas/dataframe/dataframe.py in <listcomp>(.0)
   2009             if new_axis is None
   2010             else new_axis
-> 2011             for i, new_axis in enumerate([new_index, new_columns])
   2012         ]
   2013         if dtypes == "copy":

~/Desktop/modin/modin/core/dataframe/pandas/dataframe/dataframe.py in _compute_axis_labels(self, axis, partitions)
    404             partitions = self._partitions
    405         return self._partition_mgr_cls.get_indices(
--> 406             axis, partitions, lambda df: df.axes[axis]
    407         )
    408 

~/Desktop/modin/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py in get_indices(cls, axis, partitions, index_func)
    135                 else []
    136             )
--> 137         new_idx = ray.get(new_idx)
    138         return new_idx[0].append(new_idx[1:]) if len(new_idx) else new_idx
    139 

~/opt/anaconda3/envs/modinenv/lib/python3.7/site-packages/ray/_private/client_mode_hook.py in wrapper(*args, **kwargs)
    103             if func.__name__ != "init" or is_client_mode_enabled_by_default:
    104                 return getattr(ray, func.__name__)(*args, **kwargs)
--> 105         return func(*args, **kwargs)
    106 
    107     return wrapper

~/opt/anaconda3/envs/modinenv/lib/python3.7/site-packages/ray/worker.py in get(object_refs, timeout)
   1711                     worker.core_worker.dump_object_store_memory_usage()
   1712                 if isinstance(value, RayTaskError):
-> 1713                     raise value.as_instanceof_cause()
   1714                 else:
   1715                     raise value

RayTaskError(KeyError): ray::apply_func() (pid=39726, ip=127.0.0.1)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::deploy_ray_func() (pid=39726, ip=127.0.0.1)
  File "pandas/_libs/index.pyx", line 76, in pandas._libs.index.IndexEngine.get_loc
  File "pandas/_libs/index.pyx", line 108, in pandas._libs.index.IndexEngine.get_loc
  File "pandas/_libs/hashtable_class_helper.pxi", line 5198, in pandas._libs.hashtable.PyObjectHashTable.get_item
  File "pandas/_libs/hashtable_class_helper.pxi", line 5206, in pandas._libs.hashtable.PyObjectHashTable.get_item
KeyError: 'SalePrice'

The above exception was the direct cause of the following exception:

ray::deploy_ray_func() (pid=39726, ip=127.0.0.1)
  File "/Users/labanyamukhopadhyay/Desktop/modin/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/axis_partition.py", line 211, in deploy_ray_func
    result = func(*args)
  File "/Users/labanyamukhopadhyay/Desktop/modin/modin/core/dataframe/pandas/partitioning/axis_partition.py", line 160, in deploy_axis_func
    result = func(dataframe, **kwargs)
  File "/Users/labanyamukhopadhyay/Desktop/modin/modin/core/dataframe/pandas/dataframe/dataframe.py", line 1302, in _map_reduce_func
    series_result = func(df, *args, **kwargs)
  File "/Users/labanyamukhopadhyay/Desktop/modin/modin/core/storage_formats/pandas/query_compiler.py", line 1759, in map_func
    df, n=n, columns=columns, keep=keep
  File "/Users/labanyamukhopadhyay/opt/anaconda3/envs/modinenv/lib/python3.7/site-packages/pandas/core/frame.py", line 6636, in nlargest
    return algorithms.SelectNFrame(self, n=n, keep=keep, columns=columns).nlargest()
  File "/Users/labanyamukhopadhyay/opt/anaconda3/envs/modinenv/lib/python3.7/site-packages/pandas/core/algorithms.py", line 1221, in nlargest
    return self.compute("nlargest")
  File "/Users/labanyamukhopadhyay/opt/anaconda3/envs/modinenv/lib/python3.7/site-packages/pandas/core/algorithms.py", line 1349, in compute
    dtype = frame[column].dtype
  File "/Users/labanyamukhopadhyay/opt/anaconda3/envs/modinenv/lib/python3.7/site-packages/pandas/core/frame.py", line 3458, in __getitem__
    indexer = self.columns.get_loc(key)
  File "/Users/labanyamukhopadhyay/opt/anaconda3/envs/modinenv/lib/python3.7/site-packages/pandas/core/indexes/base.py", line 3363, in get_loc
    raise KeyError(key) from err
KeyError: 'SalePrice'

mvashishtha commented 2 years ago

@labanyamukhopadhyay thank you for reporting this! It's hard to tell just from the stack trace what is going wrong in Modin. Would you be able to create a minimal reproducible example of the Modin error, including any data required to reproduce the error? It's ideal to have a snippet that works in pandas, but not in Modin.

labanyamukhopadhyay commented 2 years ago

@mvashishtha hopefully this is enough to reproduce the error! The following works in pandas but not in Modin.

Code to reproduce error:

import modin.pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import warnings
warnings.filterwarnings('ignore')
%matplotlib inline
df_train = pd.read_csv('../examples/data/train.csv')
corrmat = df_train.corr()
k = 10 #number of variables for heatmap
cols = corrmat.nlargest(k, 'SalePrice')['SalePrice'].index

Training Data:

train.csv

mvashishtha commented 2 years ago

@labanyamukhopadhyay thank you! By the way, if you use ```python instead of just ``` to demarcate your Python code in Github markdown, your code will show up as Python!

I can reproduce the bug with nlargest on Modin version 0.13.0+14.g99e8507c. Here's a short, self-contained example:

import modin.pandas as pd
from modin.config import MinPartitionSize
# Compute nlargest with a single column partition.
pd.DataFrame([[1] * (MinPartitionSize.get())]).nlargest(1, columns=0)
# Try to compute nlargest with two column partitions. KeyError: 0
pd.DataFrame([[1] * (MinPartitionSize.get() + 1)]).nlargest(1, columns=0)

The problem is that Modin tries to parallelize the row sort by splitting it up among column partitions, but not every partition has access to the columns needed to do the sort. In the example above, the first call keeps all the columns in a single partition, but in the second call one of the two partitions is missing the first column, 0. We could fix this by adding the sort columns into each partition, doing the sort, and then removing the sort columns. If that's too complicated, we can default to pandas instead. If we had a good way to parallelize sort along the sorting axis as in #3535, we could use that to speed up nlargest. @modin-project/modin-contributors, any thoughts?
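
To make the failure and the first proposed fix concrete, here is a minimal sketch in plain pandas. It is not Modin's implementation: the column partitions are simulated with ordinary DataFrame slices, and the helper names and the temporary label `__sort_key__` are hypothetical.

```python
import pandas as pd

# Hypothetical stand-in for column partitioning: plain pandas slices.
df = pd.DataFrame({"a": [3, 1, 2], "b": [10, 30, 20], "c": [7, 8, 9]})
partitions = [df[["a", "b"]], df[["c"]]]  # only the first holds sort column "a"


def nlargest_naive(parts, n, column):
    """Apply nlargest to every column partition independently."""
    return pd.concat([part.nlargest(n, column) for part in parts], axis=1)


def nlargest_with_sort_column(parts, n, column, sort_values):
    """Attach the sort column where it is missing, sort, then drop it again."""
    results = []
    for part in parts:
        if column in part.columns:
            results.append(part.nlargest(n, column))
        else:
            tmp = "__sort_key__"  # hypothetical temporary label
            augmented = part.assign(**{tmp: sort_values})
            results.append(augmented.nlargest(n, tmp).drop(columns=tmp))
    return pd.concat(results, axis=1)


# The naive version fails on the partition that lacks column "a".
try:
    nlargest_naive(partitions, 2, "a")
    naive_error = None
except KeyError as err:
    naive_error = err

# The augmented version selects the same rows in every partition.
fixed = nlargest_with_sort_column(partitions, 2, "a", df["a"])
```

In this sketch every partition sorts on the same key values, so each one selects the same rows and the pieces line up when concatenated. The cost is that the sort column has to be shipped to every partition that lacks it.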

devin-petersohn commented 2 years ago

So it looks like the assumption in our implementation is that nlargest is run independently on each column. Marked for the next release.

dorisjlee commented 2 years ago

I also ran into the same RayTaskError(KeyError) error today with the latest 0.15 code.

In addition, calling nlargest on a pd.Series defaults to the pandas implementation, even though the docs state that nlargest is supported for pd.Series.


Reproducible Example:

import modin.pandas as pd
filename = "data/synthetic_100_1000.parquet"
df = pd.read_parquet(filename)
df.nlargest(5,"col_0") # RayTaskError

df["col_0"].nlargest(5) # default to pandas

synthetic_100_1000.parquet.zip

mvashishtha commented 2 years ago

Note that #3435 has a similar root cause: when doing a distributed groupby, not all partitions have the keys they need to group by.
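
The same situation can be sketched in plain pandas. This does not reproduce #3435 itself; it only illustrates, with a made-up frame and simulated column partitions, what happens when a partition lacks the groupby key and how passing the key values explicitly avoids it.

```python
import pandas as pd

# Made-up data; the two slices stand in for column partitions.
df = pd.DataFrame({"key": ["x", "y", "x"], "v1": [1, 2, 3], "v2": [10, 20, 30]})
with_key = df[["key", "v1"]]
without_key = df[["v2"]]

# Grouping by label fails in the partition that does not hold "key".
try:
    without_key.groupby("key").sum()
    groupby_error = None
except KeyError as err:
    groupby_error = err

# Passing the key values themselves lets the key-less partition group correctly.
partial = without_key.groupby(df["key"]).sum()
combined = pd.concat([with_key.groupby("key").sum(), partial], axis=1)
```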

eromoe commented 1 year ago

I hit this KeyError too. nlargest is commonly used.