PrefectHQ / prefect

Prefect is a workflow orchestration tool empowering developers to build, observe, and react to data pipelines
https://prefect.io
Apache License 2.0

RayTaskRunner with remote address doesn't load `prefect-ray` dependency #14099

Closed by parkedwards 1 week ago

parkedwards commented 1 week ago


### Bug summary

When `RayTaskRunner` is given a remote address (i.e., the `address=` kwarg) that points at a cluster with multiple nodes, submitting tasks fails with `ModuleNotFoundError: No module named 'prefect_ray'`: the `prefect-ray` package is not available on the cluster.

Relevant Ray docs on environment dependencies: https://docs.ray.io/en/latest/ray-core/handling-dependencies.html#environment-dependencies

When using `RayTaskRunner`, as a user I'd expect the `prefect-ray` package to already be present on the cluster.

### Reproduction

(1) Set up a local Ray cluster using this `docker-compose.yaml`:
https://github.com/MarvinSt/ray-docker-compose

(2) Set up a flow whose `RayTaskRunner` points at this cluster:

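```python
from prefect import flow, task
from prefect_ray import RayTaskRunner
@task
def my_task(n: int) -> int:
    return n  # stand-in body; the original task was not included in the report
@flow(log_prints=True, task_runner=RayTaskRunner(address="ray://localhost:10001"))
async def my_flow():
    futures = [my_task.submit(n) for n in range(1, 25)]
    for f in futures:
        f.wait()  # block until each Ray-backed task finishes
```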

@zzstoatzz found that we may need to pass the dependency in through `init_kwargs`, as a Ray `runtime_env`:

```python
@flow(log_prints=True, task_runner=RayTaskRunner(address="ray://localhost:10001",
      init_kwargs=dict(runtime_env={"pip": ["prefect-ray==0.4.0rc1"]})))
```
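Pinning `prefect-ray==0.4.0rc1` by hand works, but the pin has to be kept in sync with the client by hand. A minimal sketch, assuming you want the cluster to mirror the versions installed locally: the hypothetical helper `pinned_runtime_env` (not part of Prefect or Ray) reads installed versions with `importlib.metadata.version` and builds the same `{"pip": [...]}` dict used in the workaround.

```python
from importlib.metadata import version
from typing import Callable, Dict, List


def pinned_runtime_env(
    *dists: str, version_lookup: Callable[[str], str] = version
) -> Dict[str, List[str]]:
    """Build a Ray runtime_env that pip-installs each distribution at the
    version installed locally (hypothetical helper, not a Prefect/Ray API)."""
    # e.g. "prefect-ray" -> "prefect-ray==<locally installed version>"
    return {"pip": [f"{name}=={version_lookup(name)}" for name in dists]}
```

For example: `RayTaskRunner(address="ray://localhost:10001", init_kwargs=dict(runtime_env=pinned_runtime_env("prefect-ray")))`. The `version_lookup` parameter exists only so the helper can be exercised without the packages installed.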

### Error

```Text
ModuleNotFoundError: No module named 'prefect_ray'
15:21:07.232 | ERROR | GlobalEventLoopThread | prefect._internal.concurrency - Service 'EventsWorker' failed with 1 pending items.
15:21:07.568 | ERROR | Flow run 'unique-bustard' - Finished in state Failed("Flow run encountered an exception: ModuleNotFoundError: No module named 'prefect_ray'")
15:21:07.575 | ERROR | prefect.engine - Engine execution of flow run 'd1edb31a-6827-46d9-a69f-30da8585c2eb' exited with unexpected exception
Traceback (most recent call last):
  File "/Users/edwardpark/CODE/personal/ray-qa/venv/lib/python3.11/site-packages/prefect/engine.py", line 41, in <module>
    run_coro_as_sync(run_flow_async(flow, flow_run=flow_run))
  File "/Users/edwardpark/CODE/personal/ray-qa/venv/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 240, in run_coro_as_sync
    return call.result()
           ^^^^^^^^^^^^^
  File "/Users/edwardpark/CODE/personal/ray-qa/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 312, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/edwardpark/CODE/personal/ray-qa/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 182, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/edwardpark/.pyenv/versions/3.11.9/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/Users/edwardpark/CODE/personal/ray-qa/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 383, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/Users/edwardpark/CODE/personal/ray-qa/venv/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 223, in coroutine_wrapper
    return await task
           ^^^^^^^^^^
  File "/Users/edwardpark/CODE/personal/ray-qa/venv/lib/python3.11/site-packages/prefect/flow_engine.py", line 632, in run_flow_async
    return engine.state if return_type == "state" else engine.result()
                                                       ^^^^^^^^^^^^^^^
  File "/Users/edwardpark/CODE/personal/ray-qa/venv/lib/python3.11/site-packages/prefect/flow_engine.py", line 195, in result
    _result = run_coro_as_sync(_result)
              ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/edwardpark/CODE/personal/ray-qa/venv/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 232, in run_coro_as_sync
    return from_sync.call_in_new_thread(coroutine_wrapper)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/edwardpark/CODE/personal/ray-qa/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 220, in call_in_new_thread
    return call.result()
           ^^^^^^^^^^^^^
  File "/Users/edwardpark/CODE/personal/ray-qa/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 312, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/edwardpark/CODE/personal/ray-qa/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 182, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/edwardpark/.pyenv/versions/3.11.9/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/Users/edwardpark/CODE/personal/ray-qa/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 383, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/Users/edwardpark/CODE/personal/ray-qa/venv/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 223, in coroutine_wrapper
    return await task
           ^^^^^^^^^^
  File "/Users/edwardpark/CODE/personal/ray-qa/venv/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 353, in ctx_call
    result = await async_fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/edwardpark/CODE/personal/ray-qa/venv/lib/python3.11/site-packages/prefect/states.py", line 81, in _get_state_result
    raise await get_state_exception(state)
  File "/Users/edwardpark/CODE/personal/ray-qa/venv/lib/python3.11/site-packages/prefect/flow_engine.py", line 569, in run_context
    yield self
  File "/Users/edwardpark/CODE/personal/ray-qa/venv/lib/python3.11/site-packages/prefect/flow_engine.py", line 630, in run_flow_async
    await engine.call_flow_fn()
  File "/Users/edwardpark/CODE/personal/ray-qa/venv/lib/python3.11/site-packages/prefect/flow_engine.py", line 584, in _call_flow_fn
    result = await call_with_parameters(self.flow.fn, self.parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/edwardpark/CODE/personal/ray-qa/flow.py", line 89, in monitor_model_drift
    futures = [retrain_model.submit(n) for n in range(1, 25)]
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/edwardpark/CODE/personal/ray-qa/flow.py", line 89, in <listcomp>
    futures = [retrain_model.submit(n) for n in range(1, 25)]
               ^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/edwardpark/CODE/personal/ray-qa/venv/lib/python3.11/site-packages/prefect/tasks.py", line 956, in submit
    future = task_runner.submit(self, parameters, wait_for)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/edwardpark/CODE/personal/ray-qa/venv/lib/python3.11/site-packages/prefect_ray/task_runners.py", line 217, in submit
    ray_decorator(self._run_prefect_task)
  File "/Users/edwardpark/CODE/personal/ray-qa/venv/lib/python3.11/site-packages/ray/remote_function.py", line 250, in remote
    return func_cls._remote(args=args, kwargs=kwargs, **updated_options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/edwardpark/CODE/personal/ray-qa/venv/lib/python3.11/site-packages/ray/_private/auto_init_hook.py", line 21, in auto_init_wrapper
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/edwardpark/CODE/personal/ray-qa/venv/lib/python3.11/site-packages/ray/util/tracing/tracing_helper.py", line 310, in _invocation_remote_span
    return method(self, args, kwargs, *_args, **_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/edwardpark/CODE/personal/ray-qa/venv/lib/python3.11/site-packages/ray/remote_function.py", line 272, in _remote
    return client_mode_convert_function(self, args, kwargs, **task_options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/edwardpark/CODE/personal/ray-qa/venv/lib/python3.11/site-packages/ray/_private/client_mode_hook.py", line 164, in client_mode_convert_function
    return client_func._remote(in_args, in_kwargs, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/edwardpark/CODE/personal/ray-qa/venv/lib/python3.11/site-packages/ray/util/client/common.py", line 300, in _remote
    return self.options(**option_args).remote(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/edwardpark/CODE/personal/ray-qa/venv/lib/python3.11/site-packages/ray/util/client/common.py", line 589, in remote
    return return_refs(ray.call_remote(self, *args, **kwargs))
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/edwardpark/CODE/personal/ray-qa/venv/lib/python3.11/site-packages/ray/util/client/api.py", line 100, in call_remote
    return self.worker.call_remote(instance, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/edwardpark/CODE/personal/ray-qa/venv/lib/python3.11/site-packages/ray/util/client/worker.py", line 555, in call_remote
    task = instance._prepare_client_task()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/edwardpark/CODE/personal/ray-qa/venv/lib/python3.11/site-packages/ray/util/client/common.py", line 595, in _prepare_client_task
    task = self._remote_stub._prepare_client_task()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/edwardpark/CODE/personal/ray-qa/venv/lib/python3.11/site-packages/ray/util/client/common.py", line 326, in _prepare_client_task
    self._ensure_ref()
  File "/Users/edwardpark/CODE/personal/ray-qa/venv/lib/python3.11/site-packages/ray/util/client/common.py", line 321, in _ensure_ref
    self._ref = ray.worker._put_pickled(
                ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/edwardpark/CODE/personal/ray-qa/venv/lib/python3.11/site-packages/ray/util/client/worker.py", line 509, in _put_pickled
    raise cloudpickle.loads(resp.error)
ModuleNotFoundError: No module named 'prefect_ray'
```
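The traceback ends in the Ray client's `_put_pickled`, which re-raises an error returned by the cluster, suggesting the cluster could not resolve the reference to `prefect_ray` while receiving the pickled task function. A minimal sketch for checking this directly, assuming the `ray` client is installed locally and the cluster from step (1) is reachable at `ray://localhost:10001`; `module_available` and `cluster_has_module` are hypothetical helpers, not Prefect or Ray APIs. The probe is defined inside `cluster_has_module` so that cloudpickle ships it by value instead of by module reference.

```python
import importlib.util


def module_available(name: str) -> bool:
    """Return True if the top-level module `name` is importable in this interpreter."""
    return importlib.util.find_spec(name) is not None


def cluster_has_module(address: str, name: str) -> bool:
    """Ask a Ray worker on the cluster at `address` whether it can import `name`.

    Hypothetical helper for this issue; not part of Prefect or Ray.
    """
    import ray  # imported lazily so module_available() works without Ray installed

    def probe(mod: str) -> bool:
        # A locally defined function is serialized by value, so the cluster does
        # not need this file's module to run it (unlike a function imported from
        # an installed package, which is pickled by reference).
        import importlib.util

        return importlib.util.find_spec(mod) is not None

    ray.init(address=address)
    try:
        return ray.get(ray.remote(probe).remote(name))
    finally:
        ray.shutdown()
```

On a cluster affected by this issue, `cluster_has_module("ray://localhost:10001", "prefect_ray")` would be expected to return `False`, and `True` once `prefect-ray` is installed in the cluster's base environment.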


### Versions

```Text
➜ pip freeze | grep prefect
prefect==3.0.0rc2
prefect-ray==0.4.0rc1
```

### Additional context

No response