modin-project / modin

Modin: Scale your Pandas workflows by changing a single line of code
http://modin.readthedocs.io
Apache License 2.0

Modin fails to load CSV from S3 with Ray client #2688

Closed: Bhavya6187 closed this 2 years ago

Bhavya6187 commented 3 years ago

System information

import ray
import os
import ray.util
ray.util.connect("<service_ip>:50051")
import modin.pandas as pd
pd.DEFAULT_NPARTITIONS = 10
df = pd.read_csv("s3://<bucket>/HIGGS_100k.csv")

Describe the problem

Modin fails to load a CSV file from S3 when connected through the Ray Client (ray.util.connect); pd.read_csv raises an AssertionError.

Source code / logs

---------------------------------------------------------------------------
AssertionError                            Traceback (most recent call last)
<ipython-input-10-9b3c648a226d> in <module>()
----> 1 df = pd.read_csv("s3://<s3_bucket>/HIGGS_100k.csv")

/home/bhavya.agarwal/.local/lib/python3.7/site-packages/modin/pandas/io.py in parser_func(filepath_or_buffer, sep, delimiter, header, names, index_col, usecols, squeeze, prefix, mangle_dupe_cols, dtype, engine, converters, true_values, false_values, skipinitialspace, skiprows, nrows, na_values, keep_default_na, na_filter, verbose, skip_blank_lines, parse_dates, infer_datetime_format, keep_date_col, date_parser, dayfirst, cache_dates, iterator, chunksize, compression, thousands, decimal, lineterminator, quotechar, quoting, escapechar, comment, encoding, dialect, error_bad_lines, warn_bad_lines, skipfooter, doublequote, delim_whitespace, low_memory, memory_map, float_precision)
    114 
    115         kwargs = {k: v for k, v in f_locals.items() if k in _pd_read_csv_signature}
--> 116         return _read(**kwargs)
    117 
    118     return parser_func

/home/bhavya.agarwal/.local/lib/python3.7/site-packages/modin/pandas/io.py in _read(**kwargs)
    133 
    134     Engine.subscribe(_update_engine)
--> 135     pd_obj = EngineDispatcher.read_csv(**kwargs)
    136     # This happens when `read_csv` returns a TextFileReader object for iterating through
    137     if isinstance(pd_obj, pandas.io.parsers.TextFileReader):

/home/bhavya.agarwal/.local/lib/python3.7/site-packages/modin/data_management/factories/dispatcher.py in read_csv(cls, **kwargs)
    102     @classmethod
    103     def read_csv(cls, **kwargs):
--> 104         return cls.__engine._read_csv(**kwargs)
    105 
    106     @classmethod

/home/bhavya.agarwal/.local/lib/python3.7/site-packages/modin/data_management/factories/factories.py in _read_csv(cls, **kwargs)
     85     @classmethod
     86     def _read_csv(cls, **kwargs):
---> 87         return cls.io_cls.read_csv(**kwargs)
     88 
     89     @classmethod

/home/bhavya.agarwal/.local/lib/python3.7/site-packages/modin/engines/base/io/file_dispatcher.py in read(cls, *args, **kwargs)
     27     @classmethod
     28     def read(cls, *args, **kwargs):
---> 29         query_compiler = cls._read(*args, **kwargs)
     30         # TODO (devin-petersohn): Make this section more general for non-pandas kernel
     31         # implementations.

/home/bhavya.agarwal/.local/lib/python3.7/site-packages/modin/engines/base/io/text/csv_dispatcher.py in _read(cls, filepath_or_buffer, **kwargs)
    192         dtypes = cls.get_dtypes(dtypes_ids) if len(dtypes_ids) > 0 else None
    193 
--> 194         partition_ids = cls.build_partition(partition_ids, row_lengths, column_widths)
    195         # If parse_dates is present, the column names that we have might not be
    196         # the same length as the returned column names. If we do need to modify

/home/bhavya.agarwal/.local/lib/python3.7/site-packages/modin/engines/base/io/text/text_file_dispatcher.py in build_partition(cls, partition_ids, row_lengths, column_widths)
     51                     for j in range(len(partition_ids[i]))
     52                 ]
---> 53                 for i in range(len(partition_ids))
     54             ]
     55         )

/home/bhavya.agarwal/.local/lib/python3.7/site-packages/modin/engines/base/io/text/text_file_dispatcher.py in <listcomp>(.0)
     51                     for j in range(len(partition_ids[i]))
     52                 ]
---> 53                 for i in range(len(partition_ids))
     54             ]
     55         )

/home/bhavya.agarwal/.local/lib/python3.7/site-packages/modin/engines/base/io/text/text_file_dispatcher.py in <listcomp>(.0)
     49                         width=column_widths[j],
     50                     )
---> 51                     for j in range(len(partition_ids[i]))
     52                 ]
     53                 for i in range(len(partition_ids))

/home/bhavya.agarwal/.local/lib/python3.7/site-packages/modin/engines/ray/pandas_on_ray/frame/partition.py in __init__(self, object_id, length, width, ip, call_queue)
     25 class PandasOnRayFramePartition(BaseFramePartition):
     26     def __init__(self, object_id, length=None, width=None, ip=None, call_queue=None):
---> 27         assert type(object_id) is ray.ObjectID
     28 
     29         self.oid = object_id

AssertionError:
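The traceback bottoms out in build_partition, which wraps each object reference in a partition object whose constructor asserts the exact reference type. The pure-Python sketch below mimics that control flow with stand-in classes (`LocalRef`, `ClientRef`, `Partition` and `build_partition` here are hypothetical, not Modin's or Ray's actual classes) to show why a reference of any other class, such as one produced over a client connection, trips the assertion.

```python
class LocalRef:
    """Stand-in for the reference type the partition class expects (hypothetical)."""


class ClientRef:
    """Stand-in for a reference type returned over a client connection (hypothetical)."""


class Partition:
    def __init__(self, object_id, length=None, width=None):
        # Exact-type check, mirroring the failing line in the traceback:
        # any other class is rejected, whatever it can do.
        assert type(object_id) is LocalRef
        self.oid = object_id
        self.length = length
        self.width = width


def build_partition(partition_ids, row_lengths, column_widths):
    # Wrap a 2-D grid of references: row i has length row_lengths[i],
    # column j has width column_widths[j].
    return [
        [
            Partition(partition_ids[i][j], length=row_lengths[i], width=column_widths[j])
            for j in range(len(partition_ids[i]))
        ]
        for i in range(len(partition_ids))
    ]


grid = build_partition([[LocalRef(), LocalRef()]], row_lengths=[100], column_widths=[5, 5])
print(len(grid), len(grid[0]))  # 1 row of partitions, 2 columns

try:
    build_partition([[ClientRef()]], row_lengths=[100], column_widths=[5])
except AssertionError:
    print("AssertionError: reference type not accepted")
```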
devin-petersohn commented 3 years ago

Thanks @Bhavya6187! Ray 2.0 will deprecate the ray.ObjectID name in favor of ray.ObjectRef, and the new name is already supported in the latest stable release of Ray.
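Code that has to run against Ray releases on either side of this rename can look the reference type up by name instead of hard-coding one attribute. A minimal sketch, assuming only that the module exposes `ObjectRef` (newer releases) or `ObjectID` (older ones); `resolve_ref_type` is a hypothetical helper, and the example uses stand-in namespaces rather than a real `ray` import.

```python
from types import SimpleNamespace


def resolve_ref_type(ray_module):
    """Return the object-reference class exposed by ray_module (hypothetical helper).

    Prefers the newer ObjectRef name and falls back to the older ObjectID.
    """
    for name in ("ObjectRef", "ObjectID"):
        ref_type = getattr(ray_module, name, None)
        if ref_type is not None:
            return ref_type
    raise AttributeError("module exposes neither ObjectRef nor ObjectID")


class _Ref:
    """Placeholder reference class for the stand-in modules below."""


old_ray = SimpleNamespace(ObjectID=_Ref)   # stand-in for an older release
new_ray = SimpleNamespace(ObjectRef=_Ref)  # stand-in for a newer release
print(resolve_ref_type(old_ray) is resolve_ref_type(new_ray))  # True
```

With Ray installed, the call would simply be resolve_ref_type(ray).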

shossain commented 3 years ago

@devin-petersohn: I installed the latest version of Modin (0.8.3+41.g5cb3283) from master, and I am getting a similar error:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/shossain/anaconda3/envs/py37/lib/python3.7/site-packages/modin/pandas/io.py", line 134, in read_csv
    return _read(**kwargs)
  File "/Users/shossain/anaconda3/envs/py37/lib/python3.7/site-packages/modin/pandas/io.py", line 60, in _read
    pd_obj = EngineDispatcher.read_csv(**kwargs)
  File "/Users/shossain/anaconda3/envs/py37/lib/python3.7/site-packages/modin/data_management/factories/dispatcher.py", line 104, in read_csv
    return cls.__engine._read_csv(**kwargs)
  File "/Users/shossain/anaconda3/envs/py37/lib/python3.7/site-packages/modin/data_management/factories/factories.py", line 87, in _read_csv
    return cls.io_cls.read_csv(**kwargs)
  File "/Users/shossain/anaconda3/envs/py37/lib/python3.7/site-packages/modin/engines/base/io/file_dispatcher.py", line 29, in read
    query_compiler = cls._read(*args, **kwargs)
  File "/Users/shossain/anaconda3/envs/py37/lib/python3.7/site-packages/modin/engines/base/io/text/csv_dispatcher.py", line 30, in _read
    return cls.single_worker_read(filepath_or_buffer, **kwargs)
  File "/Users/shossain/anaconda3/envs/py37/lib/python3.7/site-packages/modin/backends/pandas/parsers.py", line 87, in single_worker_read
    return cls.query_compiler_cls.from_pandas(pandas_frame, cls.frame_cls)
  File "/Users/shossain/anaconda3/envs/py37/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 209, in from_pandas
    return cls(data_cls.from_pandas(df))
  File "/Users/shossain/anaconda3/envs/py37/lib/python3.7/site-packages/modin/engines/base/frame/data.py", line 2032, in from_pandas
    new_frame, new_lengths, new_widths = cls._frame_mgr_cls.from_pandas(df, True)
  File "/Users/shossain/anaconda3/envs/py37/lib/python3.7/site-packages/modin/engines/base/frame/partition_manager.py", line 580, in from_pandas
    for i in range(0, len(df), row_chunksize)
  File "/Users/shossain/anaconda3/envs/py37/lib/python3.7/site-packages/modin/engines/base/frame/partition_manager.py", line 580, in <listcomp>
    for i in range(0, len(df), row_chunksize)
  File "/Users/shossain/anaconda3/envs/py37/lib/python3.7/site-packages/modin/engines/base/frame/partition_manager.py", line 578, in <listcomp>
    for j in range(0, len(df.columns), col_chunksize)
  File "/Users/shossain/anaconda3/envs/py37/lib/python3.7/site-packages/modin/engines/ray/pandas_on_ray/frame/partition.py", line 148, in put
    return PandasOnRayFramePartition(ray.put(obj), len(obj.index), len(obj.columns))
  File "/Users/shossain/anaconda3/envs/py37/lib/python3.7/site-packages/modin/engines/ray/pandas_on_ray/frame/partition.py", line 27, in __init__
    assert type(object_id) is ray.ObjectRef

Here is the snippet:

import ray
import ray.util
ray.util.connect('<Host>:<Port>')

import modin
import modin.pandas as pd

columns_names = [
        "trip_id", "vendor_id", "pickup_datetime", "dropoff_datetime", "store_and_fwd_flag",
        "rate_code_id", "pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude",
        "passenger_count", "trip_distance", "fare_amount", "extra", "mta_tax", "tip_amount",
        "tolls_amount", "ehail_fee", "improvement_surcharge", "total_amount", "payment_type",
        "trip_type", "pickup", "dropoff", "cab_type", "precipitation", "snow_depth", "snowfall",
        "max_temperature", "min_temperature", "average_wind_speed", "pickup_nyct2010_gid",
        "pickup_ctlabel", "pickup_borocode", "pickup_boroname", "pickup_ct2010",
        "pickup_boroct2010", "pickup_cdeligibil", "pickup_ntacode", "pickup_ntaname", "pickup_puma",
        "dropoff_nyct2010_gid", "dropoff_ctlabel", "dropoff_borocode", "dropoff_boroname",
        "dropoff_ct2010", "dropoff_boroct2010", "dropoff_cdeligibil", "dropoff_ntacode",
        "dropoff_ntaname", "dropoff_puma",
    ]
parse_dates = ["pickup_datetime", "dropoff_datetime"]

df = pd.read_csv('https://modin-datasets.s3.amazonaws.com/trips_data.csv', names=columns_names,
                 header=None, parse_dates=parse_dates)
devin-petersohn commented 3 years ago

@shossain thanks for the follow-up. It appears that Ray uses separate object types for the Client, so the ray.ObjectRef check still fails. I have opened an issue (ray-project/ray#14042) to track it.
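If the Client hands back a reference of a different class, an exact type(x) is T check rejects it outright. One possible relaxation, sketched under the assumption that the set of acceptable reference classes is known up front, is an isinstance check against a tuple of types. `LocalRef`, `ClientRef`, `ACCEPTED_REF_TYPES` and `check_ref` are hypothetical stand-ins, not Ray's or Modin's actual names.

```python
class LocalRef:
    """Stand-in for a reference created by a driver connected directly to the cluster."""


class ClientRef:
    """Stand-in for a reference created through a client connection."""


# Every reference class the partition code is prepared to handle (assumed known).
ACCEPTED_REF_TYPES = (LocalRef, ClientRef)


def check_ref(object_id):
    """Return object_id unchanged, or raise TypeError if its class is not accepted."""
    # isinstance() accepts a tuple of classes and their subclasses,
    # unlike the exact `type(object_id) is ...` comparison.
    if not isinstance(object_id, ACCEPTED_REF_TYPES):
        raise TypeError(f"unsupported object reference type: {type(object_id).__name__}")
    return object_id


check_ref(LocalRef())
check_ref(ClientRef())
```

Raising TypeError instead of using assert also keeps the check active when Python runs with -O, which strips assert statements.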

shossain commented 3 years ago

@devin-petersohn: is there a combination of Ray and Modin versions that is known to work? I am trying to load some data and run some simple analysis.

devin-petersohn commented 3 years ago

The issue is specific to the Ray Client API, which is new. If you create and connect to a Ray cluster without the Client (for example, with ray.init), it will work.

shossain commented 3 years ago

@devin-petersohn: I tried to submit the following script to an existing cluster using ray submit:

import ray
ray.init(address='auto', _redis_password='5241590000000000')

import modin.pandas as pd

columns_names = [
        "trip_id", "vendor_id", "pickup_datetime", "dropoff_datetime", "store_and_fwd_flag",
        "rate_code_id", "pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude",
        "passenger_count", "trip_distance", "fare_amount", "extra", "mta_tax", "tip_amount",
        "tolls_amount", "ehail_fee", "improvement_surcharge", "total_amount", "payment_type",
        "trip_type", "pickup", "dropoff", "cab_type", "precipitation", "snow_depth", "snowfall",
        "max_temperature", "min_temperature", "average_wind_speed", "pickup_nyct2010_gid",
        "pickup_ctlabel", "pickup_borocode", "pickup_boroname", "pickup_ct2010",
        "pickup_boroct2010", "pickup_cdeligibil", "pickup_ntacode", "pickup_ntaname", "pickup_puma",
        "dropoff_nyct2010_gid", "dropoff_ctlabel", "dropoff_borocode", "dropoff_boroname",
        "dropoff_ct2010", "dropoff_boroct2010", "dropoff_cdeligibil", "dropoff_ntacode",
        "dropoff_ntaname", "dropoff_puma",
    ]

df = pd.read_csv('https://modin-datasets.s3.amazonaws.com/trips_data.csv', names=columns_names)

def q1(df):
    return df.groupby("cab_type")["cab_type"].count()
print(df)      # Works fine
print(q1(df))  # Throws exception

However, q1(df) fails and the raylet aborts with the following error:

(raylet) [2021-02-22 12:17:09,584 C 4100 4100] pull_manager.cc:100:  Check failed: active_object_pull_requests_[obj_id].erase(request_it->first) 
(raylet) [2021-02-22 12:17:09,584 E 4100 4100] logging.cc:435: *** Aborted at 1614025029 (unix time) try "date -d @1614025029" if you are using GNU date ***
(raylet) [2021-02-22 12:17:09,584 E 4100 4100] logging.cc:435: PC: @                0x0 (unknown)
(raylet) [2021-02-22 12:17:09,593 E 4100 4100] logging.cc:435: *** SIGABRT (@0x3e800001004) received by PID 4100 (TID 0x7f3902812800) from PID 4100; stack trace: ***
(raylet) [2021-02-22 12:17:09,595 E 4100 4100] logging.cc:435:     @     0x556fe05a223f google::(anonymous namespace)::FailureSignalHandler()
(raylet) [2021-02-22 12:17:09,596 E 4100 4100] logging.cc:435:     @     0x7f3902d743c0 (unknown)
(raylet) [2021-02-22 12:17:09,596 E 4100 4100] logging.cc:435:     @     0x7f390285d18b gsignal
(raylet) [2021-02-22 12:17:09,596 E 4100 4100] logging.cc:435:     @     0x7f390283c859 abort
(raylet) [2021-02-22 12:17:09,599 E 4100 4100] logging.cc:435:     @     0x556fe0593615 ray::SpdLogMessage::Flush()
(raylet) [2021-02-22 12:17:09,601 E 4100 4100] logging.cc:435:     @     0x556fe059364d ray::RayLog::~RayLog()
(raylet) [2021-02-22 12:17:09,602 E 4100 4100] logging.cc:435:     @     0x556fe028df8d ray::PullManager::DeactivatePullBundleRequest()
(raylet) [2021-02-22 12:17:09,603 E 4100 4100] logging.cc:435:     @     0x556fe0290ed9 ray::PullManager::CancelPull()
(raylet) [2021-02-22 12:17:09,604 E 4100 4100] logging.cc:435:     @     0x556fe027e28a ray::ObjectManager::CancelPull()
(raylet) [2021-02-22 12:17:09,605 E 4100 4100] logging.cc:435:     @     0x556fe01d0b77 ray::raylet::DependencyManager::RemoveTaskDependencies()
(raylet) [2021-02-22 12:17:09,606 E 4100 4100] logging.cc:435:     @     0x556fe023afdd ray::raylet::ClusterTaskManager::DispatchScheduledTasksToWorkers()
(raylet) [2021-02-22 12:17:09,607 E 4100 4100] logging.cc:435:     @     0x556fe0209d2f ray::raylet::NodeManager::HandleWorkerAvailable()
(raylet) [2021-02-22 12:17:09,608 E 4100 4100] logging.cc:435:     @     0x556fe0209e30 ray::raylet::NodeManager::HandleWorkerAvailable()
(raylet) [2021-02-22 12:17:09,608 E 4100 4100] logging.cc:435:     @     0x556fe020a373 ray::raylet::NodeManager::ProcessAnnounceWorkerPortMessage()
(raylet) [2021-02-22 12:17:09,609 E 4100 4100] logging.cc:435:     @     0x556fe0226f1a ray::raylet::NodeManager::ProcessClientMessage()
(raylet) [2021-02-22 12:17:09,610 E 4100 4100] logging.cc:435:     @     0x556fe01852a1 _ZNSt17_Function_handlerIFvSt10shared_ptrIN3ray16ClientConnectionEElRKSt6vectorIhSaIhEEEZNS1_6raylet6Raylet12HandleAcceptERKN5boost6system10error_codeEEUlS3_lS8_E0_E9_M_invokeERKSt9_Any_dataOS3_OlS8_
(raylet) [2021-02-22 12:17:09,614 E 4100 4100] logging.cc:435:     @     0x556fe054da4e ray::ClientConnection::ProcessMessage()
(raylet) [2021-02-22 12:17:09,618 E 4100 4100] logging.cc:435:     @     0x556fe054aaec boost::asio::detail::reactive_socket_recv_op<>::do_complete()
(raylet) [2021-02-22 12:17:09,622 E 4100 4100] logging.cc:435:     @     0x556fe0910e41 boost::asio::detail::scheduler::do_run_one()
(raylet) [2021-02-22 12:17:09,624 E 4100 4100] logging.cc:435:     @     0x556fe09124e9 boost::asio::detail::scheduler::run()
(raylet) [2021-02-22 12:17:09,624 E 4100 4100] logging.cc:435:     @     0x556fe09149d7 boost::asio::io_context::run()
(raylet) [2021-02-22 12:17:09,627 E 4100 4100] logging.cc:435:     @     0x556fe0151572 main
(raylet) [2021-02-22 12:17:09,627 E 4100 4100] logging.cc:435:     @     0x7f390283e0b3 __libc_start_main
(raylet) [2021-02-22 12:17:09,629 E 4100 4100] logging.cc:435:     @     0x556fe0166665 (unknown)
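Since print(df) succeeds and the log shows the raylet aborting inside its pull manager rather than a Python exception, the query itself is unlikely to be at fault. One way to double-check the query logic in isolation is to run the same q1 with plain pandas on a tiny, made-up frame (the rows below are illustrative only, not taken from trips_data.csv). Because Modin mirrors the pandas API, the same function can be applied to a Modin DataFrame unchanged.

```python
import pandas


def q1(df):
    # Count the non-null "cab_type" values within each "cab_type" group,
    # i.e. the number of trips per cab type.
    return df.groupby("cab_type")["cab_type"].count()


# Illustrative rows only; the real data comes from trips_data.csv.
sample = pandas.DataFrame(
    {"cab_type": ["yellow", "green", "yellow"], "fare_amount": [7.5, 12.0, 5.0]}
)
print(q1(sample))
```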
devin-petersohn commented 3 years ago

@shossain What version of Ray are you running?

@simon-mo Have you seen this before?

shossain commented 3 years ago

Ray 2.0.0.dev0 installed from here: https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-2.0.0.dev0-cp37-cp37m-manylinux2014_x86_64.whl
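When a crash depends on a particular nightly wheel, as here, it helps to report the exact installed versions asked about above. A small sketch using the standard library's importlib.metadata (available from Python 3.8, so newer than the 3.7 environments in this thread); `installed_versions` is a hypothetical helper, and packages that are not installed are reported as None.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_versions(packages=("ray", "modin", "pandas")):
    """Map each distribution name to its installed version string, or None if missing."""
    found = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


for name, ver in installed_versions().items():
    print(f"{name}: {ver or 'not installed'}")
```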


devin-petersohn commented 3 years ago

This is currently a known issue with the latest Ray wheels and is being discussed here: https://github.com/ray-project/ray/issues/14279

pyrito commented 2 years ago

I can no longer reproduce this on the latest master; it appears to be fixed.