Open peterghaddad opened 2 years ago
Tagging @ckw017.
I'd lean towards Ray job submission for submitting jobs, rather than using Ray client.
@DmitriGekhtman Using the Job SDK is fine, but is there a way to return a value after job completion? For example, returning the best checkpoint at the end of the job script would be useful. We can call get_job_logs, but I don't see a way to return a specific value after the job completes.
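For context, a minimal sketch of the Job SDK flow being discussed, assuming a dashboard at http://127.0.0.1:8265 and a hypothetical entrypoint script train.py — logs and status are retrievable, but there is no built-in way to get a Python object back:

```python
import time
from ray.job_submission import JobSubmissionClient, JobStatus

# Hypothetical address of the Ray dashboard / job server.
client = JobSubmissionClient("http://127.0.0.1:8265")

job_id = client.submit_job(
    entrypoint="python train.py",        # hypothetical training script
    runtime_env={"working_dir": "./"},   # ship the local directory to the cluster
)

# Poll until the job reaches a terminal state.
while client.get_job_status(job_id) not in {
    JobStatus.SUCCEEDED,
    JobStatus.FAILED,
    JobStatus.STOPPED,
}:
    time.sleep(5)

# Only stdout/stderr are available; any "return value" has to be
# printed (or written to storage) by the job and parsed out here.
print(client.get_job_logs(job_id))
```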
Getting a Python object out of a job isn't currently supported, but I think it could make sense and might not be too bad to implement. From a Ray developer perspective, it would be great if we can slightly enrich job submission to achieve parity with the relevant Ray client features.
If you open a Ray feature request issue, we could get more discussion going.
I created the following issue. I agree; for the above script, it doesn't take too long to execute, so I think the Ray client is fine to use. I do believe this issue is still relevant, and any insight would be helpful.
@DmitriGekhtman @ckw017 As an FYI, I get the above errors when using Ray Serve with the client.
Put failed:
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
Cell In [7], line 34
30 obs = json_input["observation"]
32 action = self.trainer.compute_s
---> 34 serve.start(
35 http_options={"host": "0.0.0.0"}
36 )
38 ServePPOModel.deploy(best_checkpoint_path)
File ~/.local/lib/python3.9/site-packages/ray/serve/api.py:99, in start(detached, http_options, dedicated_cpu, **kwargs)
53 @guarded_deprecation_warning(instructions=MIGRATION_MESSAGE)
54 @Deprecated(message=MIGRATION_MESSAGE)
55 def start(
(...)
59 **kwargs,
60 ) -> ServeControllerClient:
61 """Initialize a serve instance.
62
63 By default, the instance will be scoped to the lifetime of the returned
(...)
97 Serve controller actor. Defaults to False.
98 """
---> 99 client = _private_api.serve_start(detached, http_options, dedicated_cpu, **kwargs)
101 # Record after Ray has been started.
102 record_extra_usage_tag(TagKey.SERVE_API_VERSION, "v1")
File ~/.local/lib/python3.9/site-packages/ray/serve/_private/api.py:196, in serve_start(detached, http_options, dedicated_cpu, **kwargs)
193 # Used for scheduling things to the head node explicitly.
194 # Assumes that `serve.start` runs on the head node.
195 head_node_id = ray.get_runtime_context().node_id.hex()
--> 196 controller = ServeController.options(
197 num_cpus=1 if dedicated_cpu else 0,
198 name=controller_name,
199 lifetime="detached" if detached else None,
200 max_restarts=-1,
201 max_task_retries=-1,
202 # Schedule the controller on the head node with a soft constraint. This
203 # prefers it to run on the head node in most cases, but allows it to be
204 # restarted on other nodes in an HA cluster.
205 scheduling_strategy=NodeAffinitySchedulingStrategy(head_node_id, soft=True)
206 if RAY_INTERNAL_SERVE_CONTROLLER_PIN_ON_NODE
207 else None,
208 namespace=SERVE_NAMESPACE,
209 max_concurrency=CONTROLLER_MAX_CONCURRENCY,
210 ).remote(
211 controller_name,
212 http_config=http_options,
213 head_node_id=head_node_id,
214 detached=detached,
215 )
217 proxy_handles = ray.get(controller.get_http_proxies.remote())
218 if len(proxy_handles) > 0:
File ~/.local/lib/python3.9/site-packages/ray/actor.py:637, in ActorClass.options.<locals>.ActorOptionWrapper.remote(self, *args, **kwargs)
636 def remote(self, *args, **kwargs):
--> 637 return actor_cls._remote(args=args, kwargs=kwargs, **updated_options)
File ~/.local/lib/python3.9/site-packages/ray/util/tracing/tracing_helper.py:387, in _tracing_actor_creation.<locals>._invocation_actor_class_remote_span(self, args, kwargs, *_args, **_kwargs)
385 if not _is_tracing_enabled():
386 assert "_ray_trace_ctx" not in kwargs
--> 387 return method(self, args, kwargs, *_args, **_kwargs)
389 class_name = self.__ray_metadata__.class_name
390 method_name = "__init__"
File ~/.local/lib/python3.9/site-packages/ray/actor.py:765, in ActorClass._remote(self, args, kwargs, **actor_options)
762 actor_options["max_concurrency"] = 1000 if is_asyncio else 1
764 if client_mode_should_convert(auto_init=True):
--> 765 return client_mode_convert_actor(self, args, kwargs, **actor_options)
767 # fill actor required options
768 for k, v in ray_option_utils.actor_options.items():
File ~/.local/lib/python3.9/site-packages/ray/_private/client_mode_hook.py:198, in client_mode_convert_actor(actor_cls, in_args, in_kwargs, **kwargs)
196 setattr(actor_cls, RAY_CLIENT_MODE_ATTR, key)
197 client_actor = ray._get_converted(key)
--> 198 return client_actor._remote(in_args, in_kwargs, **kwargs)
File ~/.local/lib/python3.9/site-packages/ray/util/client/common.py:396, in ClientActorClass._remote(self, args, kwargs, **option_args)
394 if kwargs is None:
395 kwargs = {}
--> 396 return self.options(**option_args).remote(*args, **kwargs)
File ~/.local/lib/python3.9/site-packages/ray/util/client/common.py:593, in ActorOptionWrapper.remote(self, *args, **kwargs)
591 def remote(self, *args, **kwargs):
592 self._remote_stub._init_signature.bind(*args, **kwargs)
--> 593 futures = ray.call_remote(self, *args, **kwargs)
594 assert len(futures) == 1
595 actor_class = None
File ~/.local/lib/python3.9/site-packages/ray/util/client/api.py:100, in _ClientAPI.call_remote(self, instance, *args, **kwargs)
86 def call_remote(self, instance: "ClientStub", *args, **kwargs) -> List[Future]:
87 """call_remote is called by stub objects to execute them remotely.
88
89 This is used by stub objects in situations where they're called
(...)
98 kwargs: opaque keyword arguments
99 """
--> 100 return self.worker.call_remote(instance, *args, **kwargs)
File ~/.local/lib/python3.9/site-packages/ray/util/client/worker.py:545, in Worker.call_remote(self, instance, *args, **kwargs)
544 def call_remote(self, instance, *args, **kwargs) -> List[Future]:
--> 545 task = instance._prepare_client_task()
546 # data is serialized tuple of (args, kwargs)
547 task.data = dumps_from_client((args, kwargs), self._client_id)
File ~/.local/lib/python3.9/site-packages/ray/util/client/common.py:578, in OptionWrapper._prepare_client_task(self)
577 def _prepare_client_task(self):
--> 578 task = self._remote_stub._prepare_client_task()
579 set_task_options(task, self._options)
580 return task
File ~/.local/lib/python3.9/site-packages/ray/util/client/common.py:407, in ClientActorClass._prepare_client_task(self)
406 def _prepare_client_task(self) -> ray_client_pb2.ClientTask:
--> 407 self._ensure_ref()
408 task = ray_client_pb2.ClientTask()
409 task.type = ray_client_pb2.ClientTask.ACTOR
File ~/.local/lib/python3.9/site-packages/ray/util/client/common.py:377, in ClientActorClass._ensure_ref(self)
374 # Check pickled size before sending it to server, which is more
375 # efficient and can be done synchronously inside remote() call.
376 check_oversized_function(data, self._name, "actor", None)
--> 377 self._ref = ray.worker._put_pickled(
378 data, client_ref_id=self._client_side_ref.id
379 )
File ~/.local/lib/python3.9/site-packages/ray/util/client/worker.py:499, in Worker._put_pickled(self, data, client_ref_id)
497 if not resp.valid:
498 try:
--> 499 raise cloudpickle.loads(resp.error)
500 except (pickle.UnpicklingError, TypeError):
501 logger.exception("Failed to deserialize {}".format(resp.error))
AttributeError: 'ForwardRef' object has no attribute '__forward_module__'
Log channel is reconnecting. Logs produced while the connection was down can be found on the head node of the cluster in `ray_client_server_[port].out`
2022-09-15 18:00:45,873 WARNING dataclient.py:395 -- Encountered connection issues in the data channel. Attempting to reconnect.
Will take a look today. What version of Ray did you spot this on?
@ckw017 This is on Ray 2.0. My full pip list is above. I'm seeing this when running both locally and in a Jupyter notebook in the cloud.
Ah hmm, found this issue: https://github.com/ray-project/ray/issues/26443
Can you double check the python version where you're running the script and the python version where the cluster is running?
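One quick way to compare the two interpreters — a hypothetical sketch, assuming the client connection itself comes up; the ray:// address is a placeholder:

```python
import sys
import ray

# Patch version of the local (client-side) interpreter.
print("client :", sys.version_info[:3])

# Placeholder address; replace with the cluster's Ray client endpoint.
ray.init("ray://<head-node-host>:10001")

@ray.remote
def cluster_python_version():
    import sys
    return sys.version_info[:3]

# Patch version of the interpreter the cluster workers run.
print("cluster:", ray.get(cluster_python_version.remote()))
```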
@ckw017 Server is running 3.9.5 and client is running 3.9.7.
The Ray image I am using is ray:2.0.0-py39. I will do some additional testing on my side. If you get time, can you try using a later version of Python to see if you run into this issue as well? Might be a good test case.
Would the hope be to enable Ray to work with 3.9.7? Will ray:2.0.0-py39 ever bump its Python version above 3.9.5? The problem definitely shows up with 3.9.7 or greater.
I'll check if I run into the same issue. The easiest fix I can think of here would be to switch your local version to 3.9.5 (if you're using conda, you can do something like conda create -n py395 python=3.9.5).
I suspect that Ray will work on 3.9.7; it would just need a compatible version (i.e., the same patch version) of Python on the client side as well.
^Was able to reproduce with 3.9.7 client and 3.9.5 cluster. I'll add a warning if the user's local patch version mismatches the cluster's patch version.
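A rough sketch of the kind of check described — hypothetical, not the actual Ray implementation:

```python
import warnings

def warn_on_patch_mismatch(client_version: tuple, server_version: tuple) -> None:
    # Hypothetical helper: Ray client connections are only reliable when the
    # client and server Python versions match down to the patch level.
    if client_version[:3] != server_version[:3]:
        warnings.warn(
            f"Python patch version mismatch: client {client_version[:3]} "
            f"vs cluster {server_version[:3]}; Ray client may fail to "
            "(de)serialize objects such as typing.ForwardRef."
        )
```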
Wanted to double check whether the workaround of using 3.9.5 locally is possible for you.
What happened + What you expected to happen
Running Ray Tune from a Jupyter notebook using tune.Tuner produces the following stack trace:
I am using the Ray client to submit the job. I saw a similar, related issue which is now closed (it used Ray Core): #20012
Please reference Reproduction Script #1 below.
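Since the reproduction script is referenced separately, the following is only a minimal hypothetical sketch of the setup being described — connecting through the Ray client and running tune.Tuner; the ray:// address and the objective are placeholders:

```python
import ray
from ray import tune
from ray.air import session

# Placeholder Ray client address for the KubeRay cluster.
ray.init("ray://<head-node-host>:10001")

def objective(config):
    # Toy metric; a real script would train and report a model here.
    session.report({"score": config["x"] ** 2})

tuner = tune.Tuner(
    objective,
    param_space={"x": tune.grid_search([1, 2, 3])},
)
results = tuner.fit()
print(results.get_best_result(metric="score", mode="min"))
```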
Versions / Dependencies
Running Ray 2.0 with KubeRay v0.3.0
pip list
As an additional side note, I receive the following error when running the example script through the Job SDK. It only occurs if I set my working directory to a directory other than my current $PWD.
This is the specific example I am following.
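For illustration, a hedged sketch of submitting with a non-default working directory through the Job SDK — the address, entrypoint, and directory name are hypothetical:

```python
from ray.job_submission import JobSubmissionClient

# Hypothetical dashboard address.
client = JobSubmissionClient("http://127.0.0.1:8265")

# Setting working_dir to something other than the current $PWD is the
# configuration that triggers the side error described above.
job_id = client.submit_job(
    entrypoint="python tune_script.py",               # hypothetical script
    runtime_env={"working_dir": "./some_other_dir"},  # hypothetical directory
)
```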
Reproduction script
Reproduction Script Number 1