modin-project / modin

Modin: Scale your Pandas workflows by changing a single line of code
http://modin.readthedocs.io

BUG: passing in large data object to DataFrame apply() function resulting in SegFault #6641

Open SiRumCz opened 1 year ago

SiRumCz commented 1 year ago

Modin version checks

Reproducible Example

import modin.pandas as pd
import numpy as np

n_features = 20000
n_samples = 20000
features = [f'f{i+1}' for i in range(n_features)]
samples = [f's{i+1}' for i in range(n_samples)]

df = pd.DataFrame(np.random.rand(n_samples, n_features), columns=features, index=samples).round(2)

# mat_df = pd.DataFrame(np.random.rand(n_features, n_features), columns=features, index=features).round(2)
# res_df = df.dot(mat_df)

# this can be simplified to:
mat_arr = np.random.rand(n_features, n_features)
res_df = df.dot(mat_arr)

Issue Description

Getting Segmentation fault (core dumped) when doing matrix multiplication on 20K x 20K matrices.

I have two servers, each with 16 threads, >50 GB available RAM, and >300 GB available storage. The code was tested on both py@3.8 + modin@0.23.4 and py@3.10 + modin@0.24.1; both showed the same result.

The code does a dot() on two 20K x 20K matrices; each matrix is roughly 3 GB (as shown by the object store memory on one of the nodes). The segfault happens when calling df.dot(mat_df). However, the code works on small matrices like 500 x 500.

There's not much to show in the terminal; it basically just prints UserWarning: Ray Client is attempting to retrieve a 3.00 GiB object over the network, which may be slow. Consider serializing the object to a file and using S3 or rsync instead. and then the program gets killed.

I have tried increasing my object store memory and also specifying a plasma directory, and I still get the segfault. The problem seems to happen after the computation has finished on the remote node and the result is being transferred back to the client: I saw the object store memory increase to 12 GB and the client's main memory increase by around 3 GB (when the warning message shows up); right when the main memory increased by 3 GB, the program got killed.
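For reference, a minimal sketch (with example values, not the exact settings used here) of how the object store size and plasma directory can be raised when starting Ray from Python; on a multi-node cluster the corresponding ray start flags would be passed on each node instead:

import ray

# Example values only: enlarge the object store and point plasma at a
# disk-backed directory. On an existing cluster the equivalent `ray start`
# options are used per node rather than ray.init on the driver.
ray.init(
    object_store_memory=20 * 1024**3,   # 20 GiB, example value
    _plasma_directory="/mnt/plasma",    # hypothetical directory
)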

Expected Behavior

It should return the transformed DataFrame.

Error Logs


Installed Versions

UserWarning: Setuptools is replacing distutils.

INSTALLED VERSIONS
------------------
commit            : 4c01f6464a1fb7a201a4e748c7420e5cb41d16ce
python            : 3.10.13.final.0
python-bits       : 64
OS                : Linux
OS-release        : 5.15.0-84-generic
Version           : #93~20.04.1-Ubuntu SMP Wed Sep 6 16:15:40 UTC 2023
machine           : x86_64
processor         : x86_64
byteorder         : little
LC_ALL            : None
LANG              : en_US.UTF-8
LOCALE            : en_US.UTF-8

Modin dependencies
------------------
modin             : 0.24.1
ray               : 2.7.1
dask              : 2023.9.3
distributed       : 2023.9.3
hdk               : None

pandas dependencies
-------------------
pandas            : 2.1.1
numpy             : 1.26.0
pytz              : 2023.3.post1
dateutil          : 2.8.2
setuptools        : 65.5.0
pip               : 23.0.1
Cython            : None
pytest            : None
hypothesis        : None
sphinx            : None
blosc             : None
feather           : None
xlsxwriter        : None
lxml.etree        : None
html5lib          : None
pymysql           : None
psycopg2          : None
jinja2            : 3.1.2
IPython           : 8.16.1
pandas_datareader : None
bs4               : 4.12.2
bottleneck        : None
dataframe-api-compat: None
fastparquet       : None
fsspec            : 2023.9.2
gcsfs             : None
matplotlib        : None
numba             : None
numexpr           : None
odfpy             : None
openpyxl          : None
pandas_gbq        : None
pyarrow           : 13.0.0
pyreadstat        : None
pyxlsb            : None
s3fs              : None
scipy             : None
sqlalchemy        : None
tables            : None
tabulate          : None
xarray            : None
xlrd              : None
zstandard         : None
tzdata            : 2023.3
qtpy              : None
pyqt5             : None
noloerino commented 1 year ago

Thanks for reporting this @SiRumCz. Do you have a more detailed stack trace? I'm unable to reproduce the error locally, and I don't have access to the server hardware that you do.

SiRumCz commented 1 year ago

@noloerino Hi, as I mentioned in the description, there's not much useful information in the terminal because the program did not exit with an error code; rather, it was killed unexpectedly. But if you think logs from Ray could help, please let me know where and how I can pull them for you; I am not very familiar with Ray clusters.

tested code and result:

>>> n_features = 20000
>>> n_samples = 20000
>>> features = [f'f{i+1}' for i in range(n_features)]
>>> samples = [f's{i+1}' for i in range(n_samples)]
>>> df = pd.DataFrame(np.random.rand(n_samples, n_features).round(2), columns=features, index=samples)
UserWarning: Distributing <class 'numpy.ndarray'> object. This may take some time.
>>> mat_arr = np.random.rand(n_features, n_features).round(2)
>>> df.dot(mat_arr)
Segmentation fault (core dumped)

I would suggest trying this with a Ray cluster, even on a smaller scale. It seems like the problem happens when the result is being transferred back to the Ray client after being computed on the remote node, because when I test the same code on 5000 x 5000, 10K x 10K, and 15K x 15K data, they all finish without any error.

anmyachev commented 1 year ago

@SiRumCz could you run the following code and show output here?

import modin.pandas as pd
import numpy as np

n_features = 20000
n_samples = 20000
features = [f'f{i+1}' for i in range(n_features)]
samples = [f's{i+1}' for i in range(n_samples)]

df = pd.DataFrame(np.random.rand(n_samples, n_features), columns=features, index=samples).round(2)
print(f"{df._query_compiler._modin_frame._partitions.shape=}")

from modin.config import CpuCount, NPartitions
print(f"{CpuCount.get()=}, {NPartitions.get()=}")

UPD: could you also show the grpcio version in your env?
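A quick way to print it, for example:

import grpc

print(grpc.__version__)  # version of the grpcio package used by the Ray client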

SiRumCz commented 1 year ago

@anmyachev Thanks for the update, here's the output:

>>> print(f"{df._query_compiler._modin_frame._partitions.shape=}")
df._query_compiler._modin_frame._partitions.shape=(48, 48)
>>> from modin.config import CpuCount, NPartitions
>>> print(f"{CpuCount.get()=}, {NPartitions.get()=}")
CpuCount.get()=16, NPartitions.get()=48

In both my py3.8 and py3.10 venvs, the grpcio version is 1.59.0.

anmyachev commented 1 year ago

@SiRumCz it would be great if you could also provide the following information:

  1. Ray client logs can be found at /tmp/ray/session_latest/logs on the head node (by default). Please show if possible.
  2. Could you also show the result of ray.cluster_resources() after connecting Ray to the cluster from the main process? (A minimal snippet is sketched after this list.)
  3. What Modin environment variables did you set manually?
  4. How did you set up the cluster, at least in general terms? Did you enable object spilling and so on?
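For item 2, a minimal sketch (placeholder address) of connecting and printing the cluster resources:

import ray

# Attach to the existing cluster and show what Ray sees; a brand-new local
# cluster would only report this machine's resources.
ray.init(address="ray://<head-node>:10001")  # or address="auto" when running on a node
print(ray.cluster_resources())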
SiRumCz commented 1 year ago

@anmyachev

  1. There are many log files, which one would you like to see?
  2. {'accelerator_type:G': 3.0, 'node:__internal_head__': 1.0, 'node:<addr1>': 2.0, 'node:<addr2>': 1.0, 'CPU': 48.0, 'object_store_memory': 51305297510.0, 'memory': 113189397300.0, 'GPU': 3.0}
  3. I have the following set (a programmatic sketch is shown after this list):
    1. MODIN_EXPERIMENTAL_GROUPBY=True
    2. MODIN_ENGINE=ray
    3. MODIN_RAY_CLUSTER=True
    4. RAY_ADDRESS="ray://xxx:10001"
  4. I did not set any other variables manually in the test. The cluster was started with ray start --head --port=6379 --dashboard-host "0.0.0.0" on the head node and ray start --address='<addr1>:6379' on the other node. I believe object spilling is enabled by default and I have checked it in the config; however, even for the 20K x 20K test, the memory usage seems reasonable.
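For reference, a rough sketch of the same setup done from Python (placeholder address; the environment variables are typically set before modin.pandas is imported):

import os

# Rough equivalents of the environment variables listed above (a sketch, not
# the exact commands used in this issue).
os.environ["MODIN_ENGINE"] = "ray"
os.environ["MODIN_RAY_CLUSTER"] = "True"
os.environ["MODIN_EXPERIMENTAL_GROUPBY"] = "True"
os.environ["RAY_ADDRESS"] = "ray://<head-node>:10001"  # placeholder address

import modin.pandas as pd  # noqa: E402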
SiRumCz commented 1 year ago

@anmyachev I have an update: it seems like the problem is not in dot; it might come from the network instead. I removed the dot operation and the code still fails with a segfault:

import modin.pandas as pd
import numpy as np

n_features = 20000
n_samples = 20000
features = [f'f{i+1}' for i in range(n_features)]
samples = [f's{i+1}' for i in range(n_samples)]

df = pd.DataFrame(np.random.rand(n_samples, n_features), columns=features, index=samples).round(2)
mat_df = pd.DataFrame(np.random.rand(n_features, n_features), columns=features, index=features).round(2)
res_df = df.apply(lambda x,m: m.iloc[0], axis=1, m=mat_df) # this will still get seg fault

Error message:

>>> df.apply(lambda x,m: m.head(1), axis=1, m=mat_df)
UserWarning: Ray Client is attempting to retrieve a 2.99 GiB object over the network, which may be slow. Consider serializing the object to a file and using S3 or rsync instead.
Segmentation fault (core dumped)

The problem seems to occur during or after the client requests the object over the network.

Update: while it works fine with a simple apply func, it gives me a segfault if I include the large matrix df as an argument to the apply function (even if it is not used):

>>> df.apply(lambda x: x+1)
          f1    f2    f3    f4    f5    f6    f7    f8    f9   f10   f11   f12   f13   f14  ...  f19987  f19988  f19989  f19990  f19991  f19992  f19993  f19994  f19995  f19996  f19997  f19998  f19999  f20000
s1      1.87  1.05  1.88  1.67  1.84  1.12  1.58  1.68  1.33  1.81  1.21  1.33  1.14  1.93  ...    1.89    1.96    1.40    1.08    1.27    1.13    1.81    1.77    1.78    1.56    1.39    1.90    1.15    1.59
...      ...   ...   ...   ...   ...   ...   ...   ...   ...   ...   ...   ...   ...   ...  ...     ...     ...     ...     ...     ...     ...     ...     ...     ...     ...     ...     ...     ...     ...
s20000  1.28  1.24  1.66  1.27  1.47  1.07  1.06  1.24  1.84  1.57  1.90  1.36  1.01  1.08  ...    1.25    1.44    1.34    1.78    1.94    1.73    1.52    1.60    1.87    1.47    1.07    1.43    1.69    1.47

[20000 rows x 20000 columns]
>>> df.apply(lambda x, m: x+1, m=mat_df)
UserWarning: Ray Client is attempting to retrieve a 2.99 GiB object over the network, which may be slow. Consider serializing the object to a file and using S3 or rsync instead.
Segmentation fault (core dumped)

The matrix parameter in the apply func doesn't need to be a DataFrame; I tried with a numpy array and it has the same problem. My guess is that the parameter data triggers some mechanism once it becomes too large, and that is when I get the segfault.
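As a rough sanity check, the size of the broadcast matrix matches the figure in the Ray warning:

# 20000 x 20000 float64 values at 8 bytes each:
n = 20_000
size_gib = n * n * 8 / 1024**3
print(f"{size_gib:.2f} GiB")  # ~2.98 GiB, in line with the "2.99 GiB" warning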

anmyachev commented 1 year ago

@modin-project/modin-ray maybe you can tell us which direction to look in order to understand the reason for this segfault?

SiRumCz commented 1 year ago

@anmyachev I have enabled faulthandler while running the code and got a traceback that might point to the root cause of the problem.
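(For anyone reproducing this: faulthandler is a standard-library module and can be enabled either with python -X faulthandler script.py or in code, for example:)

import faulthandler

# Dump the Python tracebacks of all threads when a fatal signal such as
# SIGSEGV is received.
faulthandler.enable()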

>>> df.dot(mat_df)
UserWarning: Ray Client is attempting to retrieve a 2.99 GiB object over the network, which may be slow. Consider serializing the object to a file and using S3 or rsync instead.
Fatal Python error: Segmentation fault

Thread 0x00007fa92b7fe640 (most recent call first):
  File "/usr/local/lib/python3.8/threading.py", line 302 in wait
  File "/usr/local/lib/python3.8/queue.py", line 170 in get
  File "/home/.../venv/lib/python3.8/site-packages/grpc/_channel.py", line 274 in consume_request_iterator
  File "/usr/local/lib/python3.8/threading.py", line 870 in run
  File "/usr/local/lib/python3.8/threading.py", line 932 in _bootstrap_inner
  File "/usr/local/lib/python3.8/threading.py", line 890 in _bootstrap

Thread 0x00007fa92bfff640 (most recent call first):
  File "/home/.../venv/lib/python3.8/site-packages/ray/util/client/dataclient.py", line 247 in _requests
  File "/home/.../venv/lib/python3.8/site-packages/grpc/_channel.py", line 274 in consume_request_iterator
  File "/usr/local/lib/python3.8/threading.py", line 870 in run
  File "/usr/local/lib/python3.8/threading.py", line 932 in _bootstrap_inner
  File "/usr/local/lib/python3.8/threading.py", line 890 in _bootstrap

Thread 0x00007fa968ff9640 (most recent call first):
  File "/home/.../venv/lib/python3.8/site-packages/grpc/_channel.py", line 1656 in channel_spin
  File "/usr/local/lib/python3.8/threading.py", line 870 in run
  File "/usr/local/lib/python3.8/threading.py", line 932 in _bootstrap_inner
  File "/usr/local/lib/python3.8/threading.py", line 890 in _bootstrap

Thread 0x00007fa9697fa640 (most recent call first):
  File "/usr/local/lib/python3.8/threading.py", line 306 in wait
  File "/home/.../venv/lib/python3.8/site-packages/grpc/_common.py", line 116 in _wait_once
  File "/home/.../venv/lib/python3.8/site-packages/grpc/_common.py", line 156 in wait
  File "/home/.../venv/lib/python3.8/site-packages/grpc/_channel.py", line 958 in _next
  File "/home/.../venv/lib/python3.8/site-packages/grpc/_channel.py", line 541 in __next__
  File "/home/.../venv/lib/python3.8/site-packages/ray/util/client/logsclient.py", line 67 in _log_main
  File "/usr/local/lib/python3.8/threading.py", line 870 in run
  File "/usr/local/lib/python3.8/threading.py", line 932 in _bootstrap_inner
  File "/usr/local/lib/python3.8/threading.py", line 890 in _bootstrap

Thread 0x00007fa969ffb640 (most recent call first):
  File "/usr/local/lib/python3.8/threading.py", line 306 in wait
  File "/home/.../venv/lib/python3.8/site-packages/grpc/_common.py", line 116 in _wait_once
  File "/home/.../venv/lib/python3.8/site-packages/grpc/_common.py", line 156 in wait
  File "/home/.../venv/lib/python3.8/site-packages/grpc/_channel.py", line 958 in _next
  File "/home/.../venv/lib/python3.8/site-packages/grpc/_channel.py", line 541 in __next__
  File "/home/.../venv/lib/python3.8/site-packages/ray/util/client/dataclient.py", line 273 in _data_main
  File "/usr/local/lib/python3.8/threading.py", line 870 in run
  File "/usr/local/lib/python3.8/threading.py", line 932 in _bootstrap_inner
  File "/usr/local/lib/python3.8/threading.py", line 890 in _bootstrap

Thread 0x00007fa96a7fc640 (most recent call first):
  File "/home/.../venv/lib/python3.8/site-packages/grpc/_channel.py", line 1843 in _poll_connectivity
  File "/usr/local/lib/python3.8/threading.py", line 870 in run
  File "/usr/local/lib/python3.8/threading.py", line 932 in _bootstrap_inner
  File "/usr/local/lib/python3.8/threading.py", line 890 in _bootstrap

Current thread 0x00007faa8e908740 (most recent call first):
  File "/home/.../venv/lib/python3.8/site-packages/ray/util/client/dataclient.py", line 565 in PutObject
  File "/home/.../venv/lib/python3.8/site-packages/ray/util/client/worker.py", line 507 in _put_pickled
  File "/home/.../venv/lib/python3.8/site-packages/ray/util/client/worker.py", line 496 in put
  File "/home/.../venv/lib/python3.8/site-packages/ray/util/client/api.py", line 52 in put
  File "/home/.../venv/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 102 in wrapper
  File "/home/.../venv/lib/python3.8/site-packages/ray/_private/auto_init_hook.py", line 24 in auto_init_wrapper
  File "/home/.../venv/lib/python3.8/site-packages/modin/core/execution/ray/common/engine_wrapper.py", line 111 in put
  File "/home/.../venv/lib/python3.8/site-packages/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py", line 251 in preprocess_func
  File "/home/.../venv/lib/python3.8/site-packages/modin/core/dataframe/pandas/partitioning/partition_manager.py", line 120 in preprocess_func
  File "/home/.../venv/lib/python3.8/site-packages/modin/logging/logger_decorator.py", line 128 in run_and_log
  File "/home/.../venv/lib/python3.8/site-packages/modin/core/dataframe/pandas/partitioning/partition_manager.py", line 469 in broadcast_axis_partitions
  File "/home/.../venv/lib/python3.8/site-packages/modin/core/dataframe/pandas/partitioning/partition_manager.py", line 58 in wait
  File "/home/.../venv/lib/python3.8/site-packages/modin/logging/logger_decorator.py", line 128 in run_and_log
  File "/home/.../venv/lib/python3.8/site-packages/modin/core/dataframe/pandas/dataframe/dataframe.py", line 3078 in broadcast_apply_full_axis
  File "/home/.../venv/lib/python3.8/site-packages/modin/core/dataframe/pandas/dataframe/utils.py", line 376 in run_f_on_minimally_updated_metadata
  File "/home/.../venv/lib/python3.8/site-packages/modin/logging/logger_decorator.py", line 128 in run_and_log
  File "/home/.../venv/lib/python3.8/site-packages/modin/core/dataframe/pandas/dataframe/dataframe.py", line 2609 in apply_full_axis
  File "/home/.../venv/lib/python3.8/site-packages/modin/core/dataframe/pandas/dataframe/utils.py", line 376 in run_f_on_minimally_updated_metadata
  File "/home/.../venv/lib/python3.8/site-packages/modin/logging/logger_decorator.py", line 128 in run_and_log
  File "/home/.../venv/lib/python3.8/site-packages/modin/core/storage_formats/pandas/query_compiler.py", line 2237 in dot
  File "/home/.../venv/lib/python3.8/site-packages/modin/logging/logger_decorator.py", line 128 in run_and_log
  File "/home/.../venv/lib/python3.8/site-packages/modin/pandas/dataframe.py", line 724 in dot
  File "/home/.../venv/lib/python3.8/site-packages/modin/logging/logger_decorator.py", line 128 in run_and_log
  File "<stdin>", line 1 in <module>
Segmentation fault (core dumped)

The problem seems to come from the grpc package. Upon checking my system: although the grpcio Python package is the same version on both Ray nodes, they do have different Ubuntu versions (one has 22.04 and one has 20.04), and I am assuming one has a newer system grpc library.

SiRumCz commented 1 year ago

@anmyachev I can make it work by not setting RAY_ADDRESS; Modin/Ray seems to be able to pick up the address when I run the client code on one of the Ray nodes. However, it is worth noting that I am seeing a different port after this change: before it was port 10001, and now 6379 is being picked up. Before:

2023-10-12 18:18:13,232 INFO worker.py:1329 -- Using address ray://<addr>:10001 set in the environment variable RAY_ADDRESS

After:

2023-10-13 10:06:44,844 INFO worker.py:1458 -- Connecting to existing Ray cluster at address: <addr>:6379...
2023-10-13 10:06:44,852 INFO worker.py:1633 -- Connected to Ray cluster. View the dashboard at http://<addr>:8265
anmyachev commented 1 year ago

@anmyachev I can make it work by not setting RAY_ADDRESS; Modin/Ray seems to be able to pick up the address when I run the client code on one of the Ray nodes. However, it is worth noting that I am seeing a different port after this change: before it was port 10001, and now 6379 is being picked up.

@SiRumCz good catch! Let's make sure (if needed) that you are connecting to an existing cluster (instead of creating a new one); you can check with ray.cluster_resources().

Thanks for all the information! I'm working to figure out what's wrong.

SiRumCz commented 1 year ago

@anmyachev thanks for the advice. Just to add to the solution: while it works, I am seeing some activity that is kind of confusing:

  1. the memory usage on my nodes seems to be very high during the dataframe operation; there are moments when almost all the memory is filled, with all cores running at 100%.
  2. during the operation, I sometimes see messages related to OOM issues, like 19 Workers (tasks / actors) killed due to memory pressure (OOM) and Some workers of the worker process(1964300) have not registered within the timeout. The process is still alive, probably it's hanging during start.

Although I am seeing these messages, my code is still able to give me the result.

anmyachev commented 1 year ago

the memory usage on my nodes seems to be very high during the dataframe operation; there are moments when almost all the memory is filled, with all cores running at 100%.

Well, the round operation is highly parallel. It can use all available cores (partitions are not combined into columns or rows). For this operation, I would expect up to two times as much memory usage as the dataframe occupies. However, for dot it can be 3x (it should be decreased to 2x by https://github.com/modin-project/modin/pull/6644).
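In concrete numbers for this example (a rough estimate, not a measurement):

# Each 20000 x 20000 float64 frame is ~3 GiB, so ~2x for round and ~3x for dot:
frame_gib = 20_000 * 20_000 * 8 / 1024**3   # ~2.98 GiB
print(f"round ~{2 * frame_gib:.1f} GiB peak, dot ~{3 * frame_gib:.1f} GiB peak")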

anmyachev commented 1 year ago

@anmyachev I can make it work by not setting RAY_ADDRESS; Modin/Ray seems to be able to pick up the address when I run the client code on one of the Ray nodes. However, it is worth noting that I am seeing a different port after this change: before it was port 10001, and now 6379 is being picked up. Before:

2023-10-12 18:18:13,232 INFO worker.py:1329 -- Using address ray://<addr>:10001 set in the environment variable RAY_ADDRESS

After:

2023-10-13 10:06:44,844 INFO worker.py:1458 -- Connecting to existing Ray cluster at address: <addr>:6379...
2023-10-13 10:06:44,852 INFO worker.py:1633 -- Connected to Ray cluster. View the dashboard at http://<addr>:8265

@SiRumCz It turns out there are two modes of operation: connecting to a cluster through the Ray Client, and connecting without a client. Perhaps connecting through the client does not work for Modin due to some architectural restrictions (which are mentioned in the note), but I don't know yet which ones exactly.
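For reference, a sketch of the two modes with placeholder addresses (port 10001 is the Ray Client proxy, 6379 the GCS port used when the driver runs on a cluster node):

import ray

# Ray Client mode: the driver runs outside the cluster and every ray.put /
# ray.get travels through the ray://<head>:10001 gRPC proxy.
ray.init(address="ray://<head-node>:10001")

# Driver-on-node mode: the script runs on a cluster node and attaches to the
# existing cluster directly, with no client proxy in between.
# ray.init(address="auto")  # or ray.init(address="<head-node>:6379")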

SiRumCz commented 1 year ago

@anmyachev thanks for the explanation. Regarding "due to some architectural restrictions (which are mentioned in the note)": could you please provide a bit more detail or some links so I can learn more about it? Thanks.

anmyachev commented 1 year ago

@SiRumCz I mean this general note:

Ray Client has architectural limitations and may not work as expected when using Ray for ML workloads (like Ray Tune or Ray Train). Use [Ray Jobs API](https://docs.ray.io/en/latest/cluster/running-applications/job-submission/index.html#jobs-overview) for interactive development on ML projects.

I don’t know the details yet, but maybe @modin-project/modin-ray know.
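For completeness, a minimal sketch (placeholder address, hypothetical script name) of submitting the same workload through the Ray Jobs API instead of the Ray Client:

from ray.job_submission import JobSubmissionClient

# Submit the reproducer as a job via the head node's dashboard endpoint rather
# than driving it through the Ray Client.
client = JobSubmissionClient("http://<head-node>:8265")
job_id = client.submit_job(
    entrypoint="python reproduce_dot.py",          # hypothetical script
    runtime_env={"pip": ["modin[ray]", "numpy"]},  # example runtime environment
)
print(job_id)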

anmyachev commented 1 year ago

The problem seems to come from the grpc package. Upon checking my system: although the grpcio Python package is the same version on both Ray nodes, they do have different Ubuntu versions (one has 22.04 and one has 20.04), and I am assuming one has a newer system grpc library.

@SiRumCz can you install the same grpc version on both systems?

SiRumCz commented 1 year ago

@anmyachev by grpc, do you mean grpcio for Python? I actually have the same grpcio version (1.59.0) on both systems.

anmyachev commented 1 year ago

and I am assuming one has a newer system grpc library.

@SiRumCz not grpcio but grpc. You wrote that one of the Ubuntu systems may have a different version of grpc. If you find that the Segmentation fault problem goes away, we could try to make a reproducer for grpc or Ray (because it looks like a bug). However, upgrading the package may not be easy and may take a lot of time, so don't do it if you don't have the time :)

SiRumCz commented 1 year ago

@anmyachev That was my thought too, but upon checking the grpc installation page, it seems to be language specific, and for Python it is grpcio, so I assume my environment is ok.

If I am to install a grpc package, do you have any idea which one to install?

grpc package in Ubuntu
libgrpc++-dev: high performance general RPC framework (development)
libgrpc++1.51: high performance general RPC framework
libgrpc++1.51-dbgsym: debug symbols for libgrpc++1.51
libgrpc-dev: high performance general RPC framework (development)
libgrpc29: high performance general RPC framework
libgrpc29-dbgsym: debug symbols for libgrpc29
protobuf-compiler-grpc: high performance general RPC framework - protobuf plugin
protobuf-compiler-grpc-dbgsym: debug symbols for protobuf-compiler-grpc
python3-grpcio: GRPC system (Python 3)
python3-grpcio-dbgsym: debug symbols for python3-grpcio
ruby-grpc: GRPC system in Ruby
ruby-grpc-dbgsym: debug symbols for ruby-grpc
ruby-grpc-tools: Development tools for Ruby gRPC
anmyachev commented 1 year ago

@SiRumCz looks like I found a pretty close issue in Ray itself: https://github.com/ray-project/ray/issues/38713. There, a Segmentation Fault also happens when one tries to put a large object through ray.put in Ray Client connection mode (this is similar to what we see in your stack trace here).
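A minimal reproducer along the lines of that Ray issue would look roughly like this (placeholder address; not verified here):

import numpy as np
import ray

# Same code path as in the faulthandler traceback above (dataclient PutObject):
# push a multi-GiB object through a Ray Client connection.
ray.init(address="ray://<head-node>:10001")
ref = ray.put(np.random.rand(20_000, 20_000))  # ~3 GiB payload
print(ref)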

so I assume my environment is ok.

So let's assume that the environment is correct and just try to avoid connecting through the Ray Client for now.

SiRumCz commented 1 year ago

@anmyachev good find!

anmyachev commented 1 year ago

Blocked by https://github.com/ray-project/ray/issues/38713