PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Ray integration failing when specifying resources #10542

Open TheisFerre opened 1 year ago

TheisFerre commented 1 year ago


Bug summary

  1. When using the prefect-ray integration, I get an error when specifying the resources a task should use when calling it from my flow.

  2. The following code reproduces the error:

Example from prefect-ray integration docs:

from prefect import flow, task
from prefect_ray.task_runners import RayTaskRunner
from prefect_ray.context import remote_options

@task
def process(x):
    return x + 1

@flow(task_runner=RayTaskRunner("ray://<my-ray-service>:10001"))
def my_flow():
    # equivalent to setting @ray.remote(num_cpus=1)
    with remote_options(num_cpus=1):
        process.submit(42)

my_flow()

The deployment is built and applied using the CLI:

prefect deployment build flow.py:my_flow \
    --name k8sjob \
    --pool my-k8s-pool \
    --infra kubernetes-job  \
    --override namespace="prefect" \
    --override image="<my-image>"  \
    --apply
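
The deployment can then be triggered with something like the following (the flow name is assumed to default to the function name with dashes):

prefect deployment run my-flow/k8sjob
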
  3. The stack trace from the code above, as it appears in the flow run logs.

I have included the full stack trace from the Kubernetes job under Error below.

If I simply create my flow without specifying the resources, e.g.

@flow(task_runner=RayTaskRunner("ray://<my-ray-service>:10001"))
def my_flow():
    process(42)

Everything works as expected.
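
For isolation, the same flow could be pointed at a temporary local Ray instance instead of the Ray client address; as far as I understand, RayTaskRunner() with no arguments starts one for the duration of the flow run. A sketch of that variant (behaviour assumed, not verified against this cluster):

from prefect import flow, task
from prefect_ray.task_runners import RayTaskRunner
from prefect_ray.context import remote_options

@task
def process(x):
    return x + 1

# RayTaskRunner() with no address starts a temporary local Ray instance,
# bypassing the ray:// client path where the crash is reported.
@flow(task_runner=RayTaskRunner())
def my_local_flow():
    with remote_options(num_cpus=1):
        process.submit(42)

my_local_flow()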

  4. The following infrastructure was used:

I have the same versions installed in all the images involved (prefect==2.11.5, ray==2.5.0, prefect-ray==0.2.5).

Reproduction

See the code block above.
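
A stripped-down check outside Prefect, submitting a task with the same num_cpus request over the Ray client connection, might help narrow down whether the failure sits in ray itself or in prefect-ray. This is only a sketch, with a placeholder address, and has not been run against this cluster:

import ray

# Connect through the Ray client, as the RayTaskRunner does for ray:// addresses.
ray.init("ray://<my-ray-service>:10001")

# The same resource request that remote_options(num_cpus=1) applies via Prefect.
@ray.remote(num_cpus=1)
def process(x):
    return x + 1

print(ray.get(process.remote(42)))  # expected: 43
ray.shutdown()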

Error

15:01:06.927 | INFO    | Flow run 'dynamic-dingo' - Downloading flow code from storage at ''
15:01:07.919 | WARNING | ray._private.worker - Failed to set SIGTERM handler, processes might not be cleaned up properly on exit.
15:01:07.963 | INFO    | prefect.task_runner.ray - Connecting to an existing Ray instance at ray://ray-cluster-kuberay-head-svc.ray.svc.cluster.local:10001
15:01:09.806 | INFO    | prefect.task_runner.ray - Using Ray cluster with 2 nodes.
15:01:09.807 | INFO    | prefect.task_runner.ray - The Ray UI is available at 10.0.7.18:8265
15:01:10.063 | INFO    | Flow run 'dynamic-dingo' - Created task run 'process-0' for task 'process'
15:01:12.005 | ERROR   | Flow run 'dynamic-dingo' - Crash detected! Execution was interrupted by an unexpected exception: RuntimeError: There is no current event loop in thread 'ray_client_server_1'.
15:01:12.063 | ERROR   | prefect.engine - Engine execution of flow run '481bee97-825c-4270-a097-348984ee8b38' exited with unexpected exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 2477, in <module>
    enter_flow_run_engine_from_subprocess(flow_run_id)
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 296, in enter_flow_run_engine_from_subprocess
    state = from_sync.wait_for_call_in_loop_thread(
  File "/usr/local/lib/python3.9/site-packages/prefect/_internal/concurrency/api.py", line 243, in wait_for_call_in_loop_thread
    return call.result()
  File "/usr/local/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 282, in result
    return self.future.result(timeout=timeout)
  File "/usr/local/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 168, in result
    return self.__get_result()
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 345, in _run_async
    result = await coro
  File "/usr/local/lib/python3.9/site-packages/prefect/client/utilities.py", line 51, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 441, in retrieve_flow_then_begin_flow_run
    return await begin_flow_run(
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 519, in begin_flow_run
    terminal_or_paused_state = await orchestrate_flow_run(
  File "/usr/local/lib/python3.9/contextlib.py", line 670, in __aexit__
    raise exc_details[1]
  File "/usr/local/lib/python3.9/contextlib.py", line 199, in __aexit__
    await self.gen.athrow(typ, value, traceback)
File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 1913, in report_flow_run_crashes
    yield
  File "/usr/local/lib/python3.9/contextlib.py", line 653, in __aexit__
    cb_suppress = await cb(*exc_details)
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 597, in __aexit__
    raise exceptions[0]
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 1390, in create_task_run_then_submit
    await submit_task_run(
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 1451, in submit_task_run
    future = await task_runner.submit(
  File "/usr/local/lib/python3.9/site-packages/prefect_ray/task_runners.py", line 176, in submit
    self._ray_refs[key] = ray_decorator(self._run_prefect_task).remote(
  File "/usr/local/lib/python3.9/site-packages/ray/remote_function.py", line 133, in _remote_proxy
    return self._remote(args=args, kwargs=kwargs, **self._default_options)
  File "/usr/local/lib/python3.9/site-packages/ray/util/tracing/tracing_helper.py", line 306, in _invocation_remote_span
    return method(self, args, kwargs, *_args, **_kwargs)
  File "/usr/local/lib/python3.9/site-packages/ray/remote_function.py", line 252, in _remote
    return client_mode_convert_function(self, args, kwargs, **task_options)
  File "/usr/local/lib/python3.9/site-packages/ray/_private/client_mode_hook.py", line 164, in client_mode_convert_function
    return client_func._remote(in_args, in_kwargs, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/ray/util/client/common.py", line 298, in _remote
    return self.options(**option_args).remote(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/ray/util/client/common.py", line 581, in remote
    return return_refs(ray.call_remote(self, *args, **kwargs))
  File "/usr/local/lib/python3.9/site-packages/ray/util/client/api.py", line 100, in call_remote
    return self.worker.call_remote(instance, *args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/ray/util/client/worker.py", line 556, in call_remote
    task = instance._prepare_client_task()
  File "/usr/local/lib/python3.9/site-packages/ray/util/client/common.py", line 587, in _prepare_client_task
    task = self._remote_stub._prepare_client_task()
  File "/usr/local/lib/python3.9/site-packages/ray/util/client/common.py", line 324, in _prepare_client_task
    self._ensure_ref()
  File "/usr/local/lib/python3.9/site-packages/ray/util/client/common.py", line 319, in _ensure_ref
    self._ref = ray.worker._put_pickled(
  File "/usr/local/lib/python3.9/site-packages/ray/util/client/worker.py", line 510, in _put_pickled
    raise cloudpickle.loads(resp.error)
RuntimeError: There is no current event loop in thread 'ray_client_server_1'.
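
For context, and only from my reading of the trace: the final error is raised on the Ray client server (thread ray_client_server_1) and re-raised locally via cloudpickle.loads(resp.error). The message itself is what asyncio.get_event_loop() produces when called from a non-main thread that has no event loop registered, which a standalone sketch (unrelated to Ray or Prefect) can reproduce:

import asyncio
import threading

def worker():
    try:
        # In a non-main thread with no loop registered, this raises
        # "RuntimeError: There is no current event loop in thread 'worker-1'."
        asyncio.get_event_loop()
    except RuntimeError as exc:
        print(exc)

    # Creating and registering a loop explicitly avoids the error.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    assert asyncio.get_event_loop() is loop
    loop.close()

threading.Thread(target=worker, name="worker-1").start()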

Versions

Version:             2.11.5
API version:         0.8.4
Python version:      3.9.17
Git commit:          a597971f
Built:               Thu, Aug 24, 2023 2:14 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         server

Additional context

No response

Archerzlt commented 2 months ago

I'm encountering the same problem. How can it be solved?

lovenjak commented 2 months ago

Please follow up on this. Clusters are of limited use if we can't specify custom resources.