ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[Ray client] Ray Zombie Process Issue #30894

Open muratkoc93 opened 1 year ago

muratkoc93 commented 1 year ago

Hi, we built a Ray cluster with 3 nodes. We have a job that we submit to the cluster. After the job completes, I look at the processes with ps -ef | grep ray and see that they are not terminated:

sahip 120004  66798 24 21:52 ?     00:00:07 ray::IDLE
sahip 120072  66798 18 21:52 ?     00:00:05 ray::IDLE
sahip 120073  66798 23 21:52 ?     00:00:07 ray::IDLE
sahip 120074  66798 18 21:52 ?     00:00:05 ray::IDLE
sahip 120075  66798 14 21:52 ?     00:00:04 ray::IDLE
sahip 120076  66798 15 21:52 ?     00:00:04 ray::IDLE
sahip 120077  66798 20 21:52 ?     00:00:06 ray::IDLE
sahip 120078  66798 13 21:52 ?     00:00:04 ray::IDLE
sahip 120510  66798  9 21:52 ?     00:00:02 ray::IDLE
sahip 120511  66798  8 21:52 ?     00:00:02 ray::IDLE
sahip 120799 113783  0 21:52 pts/2 00:00:00 grep ray

I don't want to have to use ps aux | grep ray::IDLE | grep -v grep | awk '{print $2}' | xargs kill -9 to kill the processes.
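(For reference, a gentler manual cleanup than kill -9, assuming the IDLE workers belong to a Ray instance launched on that node with ray start, is the ray stop CLI; this is only a sketch of a workaround, not a fix for the underlying issue:)

    # Note: `ray stop` terminates ALL local Ray processes (raylet included),
    # not just the ray::IDLE workers, so it amounts to restarting the node's
    # Ray instance rather than pruning idle workers.
    ray stop          # graceful stop of local Ray processes
    ray stop --force  # last resort if processes survive the graceful stop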

Can I do this with one line of code added to my script?

The code is:

import time

import pandas as pd
import ray
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster

ray.init(address="ray://aaa:10001",
         runtime_env={"env_vars": {"__MODIN_AUTOIMPORT_PANDAS__": "1"},
                      "working_dir": "./",
                      "pip": ["numpy", "pytz", "toolz", "openpyxl", "fsspec", "modin", "cassandra-driver"]})

# Note: each Ray worker process gets its own copy of this module-level dict;
# mutations made inside the remote task are not visible on the driver.
dict_historic_data = {}

@ray.remote
def read_cassandra(assetx, start_date, end_date):
    auth_provider = PlainTextAuthProvider(username='cassandra', password='cassandra')
    cluster = Cluster(['xxx', 'yyy', 'zzz'], auth_provider=auth_provider)
    session = cluster.connect('ford_pdm_keyspace')
    query = ("SELECT * FROM ford_sensor_data WHERE sensor_id=" + str(int(assetx)) +
             " AND sensor_time>='" + str(start_date) + "' AND sensor_time<='" + str(end_date) +
             "' ALLOW FILTERING")
    rows = session.execute(query)
    df = pd.DataFrame(list(rows))
    dict_historic_data[assetx] = df
    return dict_historic_data

# assets, start_date, and end_date are defined elsewhere in the original script.
x = [read_cassandra.remote(asset, start_date, end_date) for asset in assets]
print(x)
start_time = time.time()
y = ray.get(x)

print(y)
print(dict_historic_data)
print("time=" + str(time.time() - start_time))

thank you :)

rkooo567 commented 1 year ago

@muratkoc93 Can you provide a script that I can run? The given script is not runnable because of the Cassandra setup.

muratkoc93 commented 1 year ago

@rkooo567 The job connects to Cassandra and reads data.

rkooo567 commented 1 year ago

Can you provide a minimal repro script that doesn't require external dependencies? It should be possible to write one that doesn't need the Cassandra setup; as it stands, I can't run your script to reproduce the issue.
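(A dependency-free repro could look something like this; a hypothetical sketch, where the address and the task body are placeholders, not taken from this thread:)

import ray

# Hypothetical minimal repro: run trivial tasks over the Ray client,
# disconnect, then inspect the cluster nodes for leftover ray::IDLE workers.
ray.init(address="ray://head-node:10001")

@ray.remote
def noop(i):
    return i

print(ray.get([noop.remote(i) for i in range(100)]))
ray.shutdown()

# Afterwards, on a cluster node:
#   ps -ef | grep ray::IDLE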

vsokolovskii commented 1 year ago

I'm having the same issue; apparently the problem is still present. What info would help you fix it?

rkooo567 commented 1 year ago

@vsokolovskii It would be best if you could provide a repro script that I can run to reproduce the issue.

akdienes commented 1 year ago

I also have zombie processes. My cluster has no running jobs, tasks, or open client connections, yet it leaves 100+ worker nodes alive. This makes using Ray much more expensive in compute hours than it should be.

Maybe it happens when there is an unhandled error in a task?
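(To see what the autoscaler believes it is managing, assuming CLI access to the head node, ray status is a quick check, and the idle_timeout_minutes setting in the cluster YAML is worth a look too:)

    # Run on the head node: prints the autoscaler's view of nodes, resource
    # usage, and pending/failed node launches, which helps distinguish idle
    # nodes the autoscaler still tracks from ones it has lost track of.
    ray status

    # The cluster config's `idle_timeout_minutes` (default: 5) sets how long
    # an idle worker node is kept before the autoscaler terminates it.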

rkooo567 commented 1 year ago

I feel like there are 2 different things going on here.

akdienes commented 1 year ago

"You are using the autoscaler and expect it to shut down worker nodes?"

Yes, precisely.

I will hold off on updating or creating issues until I can get a minimal example. I have tried to reduce my code, but the problem goes away when I do.