modin-project / modin

Modin: Scale your Pandas workflows by changing a single line of code
http://modin.readthedocs.io
Apache License 2.0

BUG: compare() check for equal index at API layer doesn't do anything #5699

Open mvashishtha opened 1 year ago

mvashishtha commented 1 year ago

Modin version checks

Reproducible Example

```python
import modin.pandas as pd

print(pd.DataFrame([1, 2]).compare(pd.DataFrame([1])))
```

Issue Description

I get an error from a remote task that is doing a pandas compare.

Expected Behavior

I want the API layer to catch this error here, but the compare_index check has no effect when other is a Modin DataFrame or Series.
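
For illustration only, here is a minimal sketch of the kind of eager, synchronous label check the API layer could run before dispatching compare() to the query compiler. The helper name check_compare_labels is hypothetical, not Modin's actual code, and the message wording follows the DataFrame error quoted later in this thread:

```python
import modin.pandas as pd


def check_compare_labels(df, other):
    # Hypothetical API-layer check: accessing .index and .columns is a
    # synchronous metadata fetch, so a label mismatch is detected in the
    # main process instead of surfacing from a remote task.
    if not df.index.equals(other.index) or not df.columns.equals(other.columns):
        raise ValueError(
            "Can only compare identically-labeled (both index and columns) "
            "DataFrame objects"
        )


check_compare_labels(pd.DataFrame([1, 2]), pd.DataFrame([1]))  # raises ValueError
```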

Error Logs

```python-traceback
---------------------------------------------------------------------------
RayTaskError(ValueError)                  Traceback (most recent call last)
Input In [7], in ()
      1 import modin.pandas as pd
----> 3 print(pd.DataFrame([1, 2]).compare(pd.DataFrame([1])))

File ~/software_sources/modin/modin/logging/logger_decorator.py:128, in enable_logging..decorator..run_and_log(*args, **kwargs)
    113 """
    114 Compute function with logging if Modin logging is enabled.
    115 (...)
    125 Any
    126 """
    127 if LogMode.get() == "disable":
--> 128 return obj(*args, **kwargs)
    130 logger = get_logger()
    131 logger_level = getattr(logger, log_level)

File ~/software_sources/modin/modin/pandas/base.py:3855, in BasePandasDataset.__str__(self)
   3847 def __str__(self): # pragma: no cover
   3848 """
   3849 Return str(self).
   3850 (...)
   3853 str
   3854 """
-> 3855 return repr(self)

File ~/software_sources/modin/modin/logging/logger_decorator.py:128, in enable_logging..decorator..run_and_log(*args, **kwargs)
    113 """
    114 Compute function with logging if Modin logging is enabled.
    115 (...)
    125 Any
    126 """
    127 if LogMode.get() == "disable":
--> 128 return obj(*args, **kwargs)
    130 logger = get_logger()
    131 logger_level = getattr(logger, log_level)

File ~/software_sources/modin/modin/pandas/dataframe.py:232, in DataFrame.__repr__(self)
    224 """
    225 Return a string representation for a particular ``DataFrame``.
    226 (...)
    229 str
    230 """
    231 num_rows = pandas.get_option("display.max_rows") or len(self.index)
--> 232 num_cols = pandas.get_option("display.max_columns") or len(self.columns)
    233 result = repr(self._build_repr_df(num_rows, num_cols))
    234 if len(self.index) > num_rows or len(self.columns) > num_cols:
    235     # The split here is so that we don't repr pandas row lengths.

File ~/software_sources/modin/modin/pandas/base.py:3930, in BasePandasDataset.__getattribute__(self, item)
   3916 @disable_logging
   3917 def __getattribute__(self, item):
   3918 """
   3919 Return item from the `BasePandasDataset`.
   3920 (...)
   3928 Any
   3929 """
-> 3930 attr = super().__getattribute__(item)
   3931 if item not in _DEFAULT_BEHAVIOUR and not self._query_compiler.lazy_execution:
   3932     # We default to pandas on empty DataFrames. This avoids a large amount of
   3933     # pain in underlying implementation and returns a result immediately rather
   3934     # than dealing with the edge cases that empty DataFrames have.
   3935     if callable(attr) and self.empty and hasattr(self._pandas_class, item):

File ~/software_sources/modin/modin/pandas/dataframe.py:275, in DataFrame._get_columns(self)
    266 def _get_columns(self):
    267 """
    268 Get the columns for this ``DataFrame``.
    269 (...)
    273 The union of all indexes across the partitions.
    274 """
--> 275 return self._query_compiler.columns

File ~/software_sources/modin/modin/core/storage_formats/pandas/query_compiler.py:82, in _get_axis..(self)
     80 return lambda self: self._modin_frame.index
     81 else:
---> 82 return lambda self: self._modin_frame.columns

File ~/software_sources/modin/modin/core/dataframe/pandas/dataframe/dataframe.py:382, in PandasDataframe._get_columns(self)
    373 """
    374 Get the columns from the cache object.
    375 (...)
    379 An index object containing the column labels.
    380 """
    381 if self._columns_cache is None:
--> 382     self._columns_cache, column_widths = self._compute_axis_labels_and_lengths(
    383         1
    384     )
    385 if self._column_widths_cache is None:
    386     self._column_widths_cache = column_widths

File ~/software_sources/modin/modin/logging/logger_decorator.py:128, in enable_logging..decorator..run_and_log(*args, **kwargs)
    113 """
    114 Compute function with logging if Modin logging is enabled.
    115 (...)
    125 Any
    126 """
    127 if LogMode.get() == "disable":
--> 128 return obj(*args, **kwargs)
    130 logger = get_logger()
    131 logger_level = getattr(logger, log_level)

File ~/software_sources/modin/modin/core/dataframe/pandas/dataframe/dataframe.py:459, in PandasDataframe._compute_axis_labels_and_lengths(self, axis, partitions)
    457 if partitions is None:
    458     partitions = self._partitions
--> 459 new_index, internal_idx = self._partition_mgr_cls.get_indices(axis, partitions)
    460 return new_index, list(map(len, internal_idx))

File ~/software_sources/modin/modin/logging/logger_decorator.py:128, in enable_logging..decorator..run_and_log(*args, **kwargs)
    113 """
    114 Compute function with logging if Modin logging is enabled.
    115 (...)
    125 Any
    126 """
    127 if LogMode.get() == "disable":
--> 128 return obj(*args, **kwargs)
    130 logger = get_logger()
    131 logger_level = getattr(logger, log_level)

File ~/software_sources/modin/modin/core/dataframe/pandas/partitioning/partition_manager.py:894, in PandasDataframePartitionManager.get_indices(cls, axis, partitions, index_func)
    892 if len(target):
    893     new_idx = [idx.apply(func) for idx in target[0]]
--> 894     new_idx = cls.get_objects_from_partitions(new_idx)
    895 else:
    896     new_idx = [pandas.Index([])]

File ~/software_sources/modin/modin/logging/logger_decorator.py:128, in enable_logging..decorator..run_and_log(*args, **kwargs)
    113 """
    114 Compute function with logging if Modin logging is enabled.
    115 (...)
    125 Any
    126 """
    127 if LogMode.get() == "disable":
--> 128 return obj(*args, **kwargs)
    130 logger = get_logger()
    131 logger_level = getattr(logger, log_level)

File ~/software_sources/modin/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py:117, in PandasOnRayDataframePartitionManager.get_objects_from_partitions(cls, partitions)
    113     partitions[idx] = part.force_materialization()
    114 assert all(
    115     [len(partition.list_of_blocks) == 1 for partition in partitions]
    116 ), "Implementation assumes that each partition contains a signle block."
--> 117 return RayWrapper.materialize(
    118     [partition.list_of_blocks[0] for partition in partitions]
    119 )

File ~/software_sources/modin/modin/core/execution/ray/common/engine_wrapper.py:92, in RayWrapper.materialize(cls, obj_id)
     77 @classmethod
     78 def materialize(cls, obj_id):
     79 """
     80 Get the value of object from the Plasma store.
     81 (...)
     90 Whatever was identified by `obj_id`.
     91 """
---> 92 return ray.get(obj_id)

File ~/opt/anaconda3/envs/modin-dev/lib/python3.10/site-packages/ray/_private/client_mode_hook.py:105, in client_mode_hook..wrapper(*args, **kwargs)
    103 if func.__name__ != "init" or is_client_mode_enabled_by_default:
    104     return getattr(ray, func.__name__)(*args, **kwargs)
--> 105 return func(*args, **kwargs)

File ~/opt/anaconda3/envs/modin-dev/lib/python3.10/site-packages/ray/_private/worker.py:2289, in get(object_refs, timeout)
   2287     worker.core_worker.dump_object_store_memory_usage()
   2288 if isinstance(value, RayTaskError):
-> 2289     raise value.as_instanceof_cause()
   2290 else:
   2291     raise value

RayTaskError(ValueError): ray::_apply_func() (pid=79246, ip=127.0.0.1)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::_deploy_ray_func() (pid=79246, ip=127.0.0.1)
  File "/Users/maheshvashishtha/software_sources/modin/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py", line 563, in _deploy_ray_func
    result = deployer(axis, f_to_deploy, f_args, f_kwargs, *args, **kwargs)
  File "/Users/maheshvashishtha/software_sources/modin/modin/core/dataframe/pandas/partitioning/axis_partition.py", line 260, in deploy_func_between_two_axis_partitions
    result = func(lt_frame, rt_frame, *f_args, **f_kwargs)
  File "/Users/maheshvashishtha/software_sources/modin/modin/core/dataframe/pandas/dataframe/dataframe.py", line 1592, in _tree_reduce_func
    series_result = func(df, *args, **kwargs)
  File "/Users/maheshvashishtha/software_sources/modin/modin/core/storage_formats/pandas/query_compiler.py", line 3441, in
    lambda left, right: pandas.DataFrame.compare(
  File "/Users/maheshvashishtha/opt/anaconda3/envs/modin-dev/lib/python3.10/site-packages/pandas/core/frame.py", line 7850, in compare
    return super().compare(
  File "/Users/maheshvashishtha/opt/anaconda3/envs/modin-dev/lib/python3.10/site-packages/pandas/core/generic.py", line 9216, in compare
    mask = ~((self == other) | (self.isna() & other.isna()))
  File "/Users/maheshvashishtha/opt/anaconda3/envs/modin-dev/lib/python3.10/site-packages/pandas/core/ops/common.py", line 72, in new_method
    return method(self, other)
  File "/Users/maheshvashishtha/opt/anaconda3/envs/modin-dev/lib/python3.10/site-packages/pandas/core/arraylike.py", line 42, in __eq__
    return self._cmp_method(other, operator.eq)
  File "/Users/maheshvashishtha/opt/anaconda3/envs/modin-dev/lib/python3.10/site-packages/pandas/core/frame.py", line 7579, in _cmp_method
    self, other = ops.align_method_FRAME(self, other, axis, flex=False, level=None)
  File "/Users/maheshvashishtha/opt/anaconda3/envs/modin-dev/lib/python3.10/site-packages/pandas/core/ops/__init__.py", line 289, in align_method_FRAME
    raise ValueError(
ValueError: Can only compare identically-labeled DataFrame objects
```

Installed Versions

```
UserWarning: Setuptools is replacing distutils.

INSTALLED VERSIONS
------------------
commit : f4ed1c803c1d0139c5f731f6d723252025645134
python : 3.10.6.final.0
python-bits : 64
OS : Darwin
OS-release : 21.5.0
Version : Darwin Kernel Version 21.5.0: Tue Apr 26 21:08:22 PDT 2022; root:xnu-8020.121.3~4/RELEASE_X86_64
machine : x86_64
processor : i386
byteorder : little
LC_ALL : None
LANG : en_US.UTF-8
LOCALE : en_US.UTF-8

Modin dependencies
------------------
modin : 0.18.0+92.gf4ed1c803
ray : 2.1.0
dask : 2022.7.1
distributed : 2022.7.1
hdk : None

pandas dependencies
-------------------
pandas : 1.5.3
numpy : 1.23.2
pytz : 2022.2.1
dateutil : 2.8.2
setuptools : 63.4.1
pip : 22.3.1
Cython : None
pytest : 7.1.2
hypothesis : None
sphinx : 4.5.0
blosc : None
feather : 0.4.1
xlsxwriter : None
lxml.etree : 4.9.1
html5lib : None
pymysql : None
psycopg2 : None
jinja2 : 3.1.2
IPython : 8.4.0
pandas_datareader: None
bs4 : 4.11.1
bottleneck : None
brotli : 1.0.9
fastparquet : 0.8.1
fsspec : 2022.7.1
gcsfs : None
matplotlib : 3.5.2
numba : None
numexpr : 2.8.3
odfpy : None
openpyxl : 3.0.10
pandas_gbq : 0.17.7
pyarrow : 8.0.0
pyreadstat : None
pyxlsb : None
s3fs : 2022.7.1
scipy : 1.9.0
snappy : None
sqlalchemy : 1.4.39
tables : 3.7.0
tabulate : None
xarray : 2022.6.0
xlrd : 2.0.1
xlwt : None
zstandard : None
tzdata : None
```
anmyachev commented 1 year ago

In this case, asynchronous execution will degrade.

Why do you want to move the check to the main process?

mvashishtha commented 1 year ago

@anmyachev we need a synchronous equality check so that we can raise a ValueError when the labels don't match on either axis:

```python
import pandas as pd

pd.DataFrame([[1]]).compare(pd.DataFrame([[1, 2]]))
```

ValueError: Can only compare identically-labeled (both index and columns) DataFrame objects

We shouldn't require every query compiler to do this synchronous check.
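
To make the scope of that single check concrete, here is what it amounts to in plain pandas terms (both axes have to match before compare() can proceed; this is just an illustration of the condition, not Modin code):

```python
import pandas as pd

df1 = pd.DataFrame([[1]])
df2 = pd.DataFrame([[1, 2]])

# compare() requires identical labels on both axes, so the eager check
# boils down to two Index.equals calls.
same_labels = df1.index.equals(df2.index) and df1.columns.equals(df2.columns)
print(same_labels)  # False: the columns differ, so compare() must raise
```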

sfc-gh-mvashishtha commented 4 months ago

Also, we should add this check at the API layer because the error message depends on the type of the API-layer object: for Series, pandas raises ValueError: Can only compare identically-labeled Series objects, but for DataFrame, it raises ValueError: Can only compare identically-labeled (both index and columns) DataFrame objects.
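
For reference, a small pandas-only snippet showing the two messages side by side (exact wording assumes a recent pandas; older versions, such as the 1.5.3 in the traceback above, phrase the DataFrame message without the parenthetical):

```python
import pandas as pd

try:
    pd.Series([1, 2]).compare(pd.Series([1]))
except ValueError as err:
    print(err)  # Can only compare identically-labeled Series objects

try:
    pd.DataFrame([[1]]).compare(pd.DataFrame([[1, 2]]))
except ValueError as err:
    print(err)  # Can only compare identically-labeled (both index and columns) DataFrame objects
```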