modin-project / modin

Modin: Scale your Pandas workflows by changing a single line of code
http://modin.readthedocs.io
Apache License 2.0
9.59k stars 648 forks source link

how to take down ray and put up again in local mode #7249

Closed SiRumCz closed 2 weeks ago

SiRumCz commented 1 month ago

My program has memory risk, and part of it seems to come from memory leak (idling ray workers holding a big chunk of memory). I have a for loop to independently run chunks of csv file on a series of tasks, I wish to kill ray after each iteration to release memory, and let Modin to put it up again with fresh ray workers. However, my code is the following:

import pandas

for df_ in pandas.read_csv('xxx.csv', chunk=5000):
    df_.to_csv(xxx)
    run_my_tasks(xxx) # Modin will initialize ray in first iteration
    ray.shutdown()

however, I got below error:

File "/home/.../lib/python3.9/site-packages/modin/core/execution/ray/common/deferred_execution.py", line 309, in _deconstruct_chain
    output[out_pos] = out_pos
IndexError: list assignment index out of range
YarShev commented 1 month ago

Hi @SiRumCz, thanks for posting this issue. I guess there might be an issue with multiple Ray initialization in Modin codebase. We would have to look into this deeper. Meanwhile, can you explicitly put ray.init() before run_my_tasks(xxx) to see if it works?

SiRumCz commented 1 month ago

@YarShev Thanks for your response. Yes, I have tried that method, and unfortunately I got: ValueError: An application is trying to access a Ray object whose owner is unknown(00ffffffffffffffffffffffffffffffffffffff0100000002e1f505). Please make sure that all Ray objects you are trying to access are part of the current Ray session. Note that object IDs generated randomly (ObjectID.from_random()) or out-of-band (ObjectID.from_binary(...)) cannot be passed as a task argument because Ray does not know which task created them. If this was not how your object ID was generated, please file an issue at https://github.com/ray-project/ray/issues/

YarShev commented 1 month ago

@SiRumCz, could you try to execute ray.init() and importlib.reload(pd) before run_my_tasks(xxx), where pd is import modin.pandas as pd?

YarShev commented 1 month ago

@SiRumCz, I opened https://github.com/modin-project/modin/pull/7280, which adds reload_modin function. Tested on the following example and it passed to me.

import modin.pandas as pd
from modin.utils import reload_modin
import ray

ray.init(num_cpus=16)  # can be commented out, works

df = pd.read_csv("example.csv")
df = df.abs()
print(df)

ray.shutdown()
reload_modin()
ray.init(num_cpus=16)  # can be commented out, works

df = pd.read_csv("example.csv")
df = df.abs()
print(df)
SiRumCz commented 1 month ago

thanks, I ended up using a Process to wrap my task into a new process, ray will be taken down when process ends. But I am happy that there will be a feature for this, cheers :-)

SiRumCz commented 2 weeks ago

@YarShev, I have another question regarding to the reload function, can I only shutdown the ray I initialized from the process? My understanding is that ray.shutdown() is equivalent to ray stop which kills all the ray components owned by myself. In other word, when there are multiple processes I run that use ray, can I avoid killing their ray processes?

YarShev commented 2 weeks ago

Your understanding is correct, ray.shutdown() kills all Ray processes. If we are talking about your warkaround,

I ended up using a Process to wrap my task into a new process, ray will be taken down when process ends

I think you can avoid the calls to ray.init() and ray.shutdown() in the process wrapping your task. You should set up a Ray cluster manually on your machine with this instruction, for instance, and then Modin will be able to connect to the existing Ray cluster in your process.

SiRumCz commented 2 weeks ago

Wrapping my task in a Process only partially addressed my problem. I am also encountering another problem where majority of the memory go into Buff/Cache, only leaving a tiny bit to the free memory. Have you guys encountered similar situation?

YarShev commented 2 weeks ago

How much memory do you have on the system? What data sizes do you want to process?

SiRumCz commented 2 weeks ago

My system has 32GB memory, the data size is around 5 millions lines of log data (~1.5GB csv file). But my project involves quite complicated works, and because it uses nested dataframes and nested modin functions such as apply() and/or groupby(), the actual memory consumption is lot more than the data size.

YarShev commented 2 weeks ago

32GB might be insufficient but Ray should start spilling objects onto disk if available memory got depleted and the flow should finish. Do you encounter OOM error?

SiRumCz commented 2 weeks ago

I tried to optimize my project to fit into 32GB, and yes Ray object spilling helped a lot. But one of my real challenges is after it finishes, not all the memory are being released, the majority goes into the Buffer/Cache if I look at top. That leaves me with very little memory to work with for my later tasks, I am not sure if it's problem from Ray, Modin, or my bad implementation.

SiRumCz commented 2 weeks ago

what I am seeing is very similar to this post: https://github.com/ray-project/ray/issues/7053#issue-559960990

YarShev commented 2 weeks ago

@SiRumCz, let's keep track of the issue in Ray. Also, we merged reload_modin feature into main so you can check it out.