Why do you use master as the variable name here?
You need to use the ray object returned from ray_on_aml.getRay(). That's the entry point to the cluster. In your code, the ray object you have does not refer to the cluster's Ray instance, so the job is not submitted to the workers.
You should change it to:
ray = ray_on_aml.getRay()
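For context, a minimal sketch of that pattern might look like the following (the print messages are illustrative, not from the original script):

from ray_on_aml.core import Ray_On_AML

ray_on_aml = Ray_On_AML()
ray = ray_on_aml.getRay()  # Ray entry point on the head node, None on worker nodes

if ray:
    # head node: this object talks to the whole cluster, so submit work from here
    print("connected, cluster resources:", ray.cluster_resources())
else:
    # worker node: nothing to submit from here; ray_on_aml handles joining the cluster
    print("in worker node, do nothing")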
I was using ray = ray_on_aml.get() previously, but that was causing @ray.remote to throw an exception on the worker because, of course, when ray_on_aml.get() is run on a worker node it returns None, by design!
You don't need to do anything in the worker:

if ray:
    # do something
else:
    print("in worker node, do nothing")

That's what distributed computing is for. You design and submit the operation at the head node. The head node distributes the operation to the workers and collects the results back to the head node.
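To illustrate that point, here is a rough sketch (the where_am_i function and the task count are made up for illustration): the head node defines a remote task, fans it out, and collects the results, while the workers simply execute whatever the scheduler sends them.

from ray_on_aml.core import Ray_On_AML

ray_on_aml = Ray_On_AML()
ray = ray_on_aml.getRay()

if ray:
    @ray.remote
    def where_am_i():
        import socket  # runs wherever the Ray scheduler places the task
        return socket.gethostname()

    # head node: submit tasks and collect results; which nodes run them depends on scheduling
    hostnames = ray.get([where_am_i.remote() for _ in range(8)])
    print(set(hostnames))
else:
    print("in worker node, do nothing")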
What I am saying is that in the worker, where ray == None (which it is), you can't use the @ray.remote decorator in your code; otherwise you get this error:
INFO:__main__:Exiting context: TrackUserError
INFO:__main__:Exiting context: RunHistory
Cleaning up all outstanding Run operations, waiting 900.0 seconds
1 items cleaning up...
Cleanup took 1.1022090911865234 seconds
INFO:__main__:Exiting context: ProjectPythonPath
Traceback (most recent call last):
  File "preprocess.py", line 13, in <module>
    @ray.remote
AttributeError: 'NoneType' object has no attribute 'remote'
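For what it's worth, one way to avoid this (and, it seems, the reason for the master variable name in the next comment) is to keep @ray.remote bound to the imported ray module, which exists on every node, and use the value returned by getRay() only to tell the head node apart from the workers. A rough sketch, with an illustrative remote function rather than the original preprocess.py contents:

import ray  # the module itself is importable on every node
from ray_on_aml.core import Ray_On_AML

ray_on_aml = Ray_On_AML()
master = ray_on_aml.getRay()  # Ray entry point on the head node, None on workers

@ray.remote  # bound to the ray module, so this never hits None on a worker
def preprocess(x):
    return x * 2  # placeholder work, not from the original script

if master:
    # head node: submit the task and fetch the result
    print(ray.get(preprocess.remote(21)))
else:
    print("in worker node, do nothing")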
I see the use of master now. The worker process exit you saw is just the ray start process. The ray actors created by the head node still get executed on the workers by a separate process. Even after the ray start process is done, the worker stays alive as long as the cluster is alive (which depends on whether there are active processes on the master node). You can test this with the following code.
import logging
import time

import ray
from ray_on_aml.core import Ray_On_AML

logging.basicConfig(level=logging.INFO)

ray_on_aml = Ray_On_AML()
master = ray_on_aml.getRay()  # Ray entry point on the head node, None on workers

@ray.remote
def slow_function():
    time.sleep(10)
    return 1

if master:
    print("cluster resources before job execution ", master.cluster_resources())
    for _ in range(5):
        ray.get(slow_function.remote())
    time.sleep(30)
    print("cluster resources after job execution ", master.cluster_resources())
else:
    print("in worker node")
The output about the cluster size is still the same before and after the job is done. The worker's ray start process finished well before the last print on the master.

cluster resources before job execution  {'CPU': 40.0, 'memory': 264201192448.0, 'node:10.0.0.18': 1.0, 'object_store_memory': 20000000000.0, 'node:10.0.0.20': 1.0}
cluster resources after job execution  {'memory': 264201192448.0, 'node:10.0.0.18': 1.0, 'CPU': 40.0, 'object_store_memory': 20000000000.0, 'node:10.0.0.20': 1.0}
When running the following script as an AML job, the worker node exits immediately, stating that the run has completed successfully.
Logs: