PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

FileNotFoundError errors when running with a remote ray cluster #13015

Open tekumara opened 2 years ago

tekumara commented 2 years ago

When running a flow from my laptop against a remote ray cluster, prefect tries to reference directories that only exist on my laptop (eg: /Users/tekumara/.prefect/storage):

$ python flows/ray_flow.py
20:38:31.738 | INFO    | prefect.engine - Created flow run 'fiery-copperhead' for flow 'greetings'
20:38:31.738 | INFO    | Flow run 'fiery-copperhead' - Starting 'RayTaskRunner'; submitted tasks will be run in parallel...
20:38:31.790 | INFO    | prefect.task_runner.ray - Connecting to an existing Ray instance at ray://127.0.0.1:10001
20:38:35.568 | INFO    | prefect.task_runner.ray - Using Ray cluster with 1 nodes.
20:38:35.568 | INFO    | prefect.task_runner.ray - The Ray UI is available at 10.136.205.104:8265
20:38:37.838 | INFO    | Flow run 'fiery-copperhead' - Created task run 'say_hello-811087cd-0' for task 'say_hello'
20:38:39.172 | INFO    | Flow run 'fiery-copperhead' - Submitted task run 'say_hello-811087cd-0' for execution.
20:38:39.477 | INFO    | Flow run 'fiery-copperhead' - Created task run 'say_goodbye-261e56a8-0' for task 'say_goodbye'
20:38:39.531 | INFO    | Flow run 'fiery-copperhead' - Submitted task run 'say_goodbye-261e56a8-0' for execution.
20:38:39.838 | INFO    | Flow run 'fiery-copperhead' - Created task run 'say_hello-811087cd-1' for task 'say_hello'
20:38:39.887 | INFO    | Flow run 'fiery-copperhead' - Submitted task run 'say_hello-811087cd-1' for execution.
20:38:40.169 | INFO    | Flow run 'fiery-copperhead' - Created task run 'say_goodbye-261e56a8-1' for task 'say_goodbye'
20:38:40.205 | INFO    | Flow run 'fiery-copperhead' - Submitted task run 'say_goodbye-261e56a8-1' for execution.
(get_dashboard_url pid=385) /home/ray/anaconda3/lib/python3.9/site-packages/paramiko/transport.py:236: CryptographyDeprecationWarning: Blowfish has been deprecated
(get_dashboard_url pid=385)   "class": algorithms.Blowfish,
20:38:40.504 | INFO    | Flow run 'fiery-copperhead' - Created task run 'say_hello-811087cd-2' for task 'say_hello'
20:38:40.540 | INFO    | Flow run 'fiery-copperhead' - Submitted task run 'say_hello-811087cd-2' for execution.
20:38:40.826 | INFO    | Flow run 'fiery-copperhead' - Created task run 'say_goodbye-261e56a8-2' for task 'say_goodbye'
20:38:40.862 | INFO    | Flow run 'fiery-copperhead' - Submitted task run 'say_goodbye-261e56a8-2' for execution.
(begin_task_run pid=385) 03:38:41.096 | INFO    | Task run 'say_hello-811087cd-0' - hello arthur
20:38:41.165 | INFO    | Flow run 'fiery-copperhead' - Created task run 'say_hello-811087cd-3' for task 'say_hello'
20:38:41.207 | INFO    | Flow run 'fiery-copperhead' - Submitted task run 'say_hello-811087cd-3' for execution.
20:38:41.510 | INFO    | Flow run 'fiery-copperhead' - Created task run 'say_goodbye-261e56a8-3' for task 'say_goodbye'
20:38:41.551 | INFO    | Flow run 'fiery-copperhead' - Submitted task run 'say_goodbye-261e56a8-3' for execution.
(begin_task_run pid=385) 03:38:41.605 | INFO    | Task run 'say_hello-811087cd-1' - hello trillian
(begin_task_run pid=385) print goodbye
(begin_task_run pid=385) 03:38:42.104 | INFO    | Task run 'say_goodbye-261e56a8-3' - goodbye marvin
(begin_task_run pid=385) print goodbye
(begin_task_run pid=385) 03:38:42.593 | INFO    | Task run 'say_goodbye-261e56a8-2' - goodbye ford
(begin_task_run pid=385) print goodbye
(begin_task_run pid=385) 03:38:43.139 | INFO    | Task run 'say_goodbye-261e56a8-1' - goodbye trillian
(begin_task_run pid=385) 03:38:43.635 | INFO    | Task run 'say_hello-811087cd-3' - hello marvin
(begin_task_run pid=385) print goodbye
(begin_task_run pid=385) 03:38:44.126 | INFO    | Task run 'say_goodbye-261e56a8-0' - goodbye arthur
20:38:44.644 | INFO    | Task run 'say_hello-811087cd-0' - Crash detected! Execution was interrupted by an unexpected exception.
(begin_task_run pid=385) 03:38:44.623 | INFO    | Task run 'say_hello-811087cd-2' - hello ford
20:38:44.936 | INFO    | Task run 'say_goodbye-261e56a8-0' - Crash detected! Execution was interrupted by an unexpected exception.
20:38:45.279 | INFO    | Task run 'say_hello-811087cd-1' - Crash detected! Execution was interrupted by an unexpected exception.
20:38:45.561 | INFO    | Task run 'say_goodbye-261e56a8-1' - Crash detected! Execution was interrupted by an unexpected exception.
20:38:45.845 | INFO    | Task run 'say_hello-811087cd-2' - Crash detected! Execution was interrupted by an unexpected exception.
20:38:46.193 | INFO    | Task run 'say_goodbye-261e56a8-2' - Crash detected! Execution was interrupted by an unexpected exception.
20:38:46.472 | INFO    | Task run 'say_hello-811087cd-3' - Crash detected! Execution was interrupted by an unexpected exception.
20:38:46.751 | INFO    | Task run 'say_goodbye-261e56a8-3' - Crash detected! Execution was interrupted by an unexpected exception.
20:38:48.741 | ERROR   | Flow run 'fiery-copperhead' - Finished in state Failed('8/8 states failed.')
Traceback (most recent call last):
  File "/Users/tekumara/code/orion-demo/flows/ray_flow.py", line 38, in <module>
    greetings(["arthur", "trillian", "ford", "marvin"])
  File "/Users/tekumara/code/orion-demo/.venv/lib/python3.9/site-packages/prefect/flows.py", line 367, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/Users/tekumara/code/orion-demo/.venv/lib/python3.9/site-packages/prefect/engine.py", line 150, in enter_flow_run_engine_from_flow_call
    return anyio.run(begin_run)
  File "/Users/tekumara/code/orion-demo/.venv/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/Users/tekumara/code/orion-demo/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/Users/tekumara/.pyenv/versions/3.9.13/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/Users/tekumara/.pyenv/versions/3.9.13/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
    return future.result()
  File "/Users/tekumara/code/orion-demo/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/Users/tekumara/code/orion-demo/.venv/lib/python3.9/site-packages/prefect/client.py", line 104, in with_injected_client
    return await fn(*args, **kwargs)
  File "/Users/tekumara/code/orion-demo/.venv/lib/python3.9/site-packages/prefect/engine.py", line 226, in create_then_begin_flow_run
    return state.result()
  File "/Users/tekumara/code/orion-demo/.venv/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 159, in result
    state.result()
  File "/Users/tekumara/code/orion-demo/.venv/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 145, in result
    raise data
ray.exceptions.RayTaskError: ray::begin_task_run() (pid=385, ip=10.136.205.104)
FileNotFoundError: [Errno 2] No such file or directory: '/Users/tekumara/.prefect/storage'

During handling of the above exception, another exception occurred:

ray::begin_task_run() (pid=385, ip=10.136.205.104)
  File "/home/ray/anaconda3/lib/python3.9/pathlib.py", line 1313, in mkdir
    self._accessor.mkdir(self, mode)
FileNotFoundError: [Errno 2] No such file or directory: '/Users/tekumara/.prefect'

During handling of the above exception, another exception occurred:

ray::begin_task_run() (pid=385, ip=10.136.205.104)
  File "/home/ray/anaconda3/lib/python3.9/pathlib.py", line 1313, in mkdir
    self._accessor.mkdir(self, mode)
FileNotFoundError: [Errno 2] No such file or directory: '/Users/tekumara'

During handling of the above exception, another exception occurred:

ray::begin_task_run() (pid=385, ip=10.136.205.104)
  File "/Users/tekumara/code/orion-demo/.venv/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 193, in wrapper
  File "/tmp/ray/session_2022-07-19_04-28-59_171336_7/runtime_resources/pip/3a2844a8f2bb50700ae9149311bce23037a95e22/virtualenv/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 140, in run_async_in_new_loop
    return anyio.run(partial(__fn, *args, **kwargs))
  File "/home/ray/anaconda3/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 56, in run
    return asynclib.run(func, *args, **backend_options)
  File "/home/ray/anaconda3/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 233, in run
    return native_run(wrapper(), debug=debug)
  File "/home/ray/anaconda3/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/home/ray/anaconda3/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "/home/ray/anaconda3/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 228, in wrapper
    return await func(*args)
  File "/tmp/ray/session_2022-07-19_04-28-59_171336_7/runtime_resources/pip/3a2844a8f2bb50700ae9149311bce23037a95e22/virtualenv/lib/python3.9/site-packages/prefect/engine.py", line 944, in begin_task_run
    return await orchestrate_task_run(
  File "/tmp/ray/session_2022-07-19_04-28-59_171336_7/runtime_resources/pip/3a2844a8f2bb50700ae9149311bce23037a95e22/virtualenv/lib/python3.9/site-packages/prefect/engine.py", line 1081, in orchestrate_task_run
    await _persist_serialized_result(
  File "/tmp/ray/session_2022-07-19_04-28-59_171336_7/runtime_resources/pip/3a2844a8f2bb50700ae9149311bce23037a95e22/virtualenv/lib/python3.9/site-packages/prefect/results.py", line 15, in _persist_serialized_result
    await filesystem.write_path(key, content)
  File "/tmp/ray/session_2022-07-19_04-28-59_171336_7/runtime_resources/pip/3a2844a8f2bb50700ae9149311bce23037a95e22/virtualenv/lib/python3.9/site-packages/prefect/filesystems.py", line 94, in write_path
    path.parent.mkdir(exist_ok=True, parents=True)
  File "/home/ray/anaconda3/lib/python3.9/pathlib.py", line 1317, in mkdir
    self.parent.mkdir(parents=True, exist_ok=True)
  File "/home/ray/anaconda3/lib/python3.9/pathlib.py", line 1317, in mkdir
    self.parent.mkdir(parents=True, exist_ok=True)
  File "/home/ray/anaconda3/lib/python3.9/pathlib.py", line 1317, in mkdir
    self.parent.mkdir(parents=True, exist_ok=True)
  File "/home/ray/anaconda3/lib/python3.9/pathlib.py", line 1313, in mkdir
    self._accessor.mkdir(self, mode)
PermissionError: [Errno 13] Permission denied: '/Users'

flows/ray_flow.py:

from typing import List

from prefect import flow, get_run_logger, task
from prefect_ray.task_runners import RayTaskRunner

@task
def say_hello(name: str) -> None:
    logger = get_run_logger()
    logger.info(f"hello {name}")

@task
def say_goodbye(name: str) -> None:
    # logs not currently working see https://github.com/PrefectHQ/prefect-ray/issues/25
    logger = get_run_logger()
    print("print goodbye")
    logger.info(f"goodbye {name}")

# run on an existing ray cluster
@flow(
    task_runner=RayTaskRunner(
        # 127.0.0.1:10001 is port-forwarded to the remote ray cluster
        address="ray://127.0.0.1:10001",
        init_kwargs={"runtime_env": {"pip": ["prefect==2.0b12"]}},
    )
)
def greetings(names: List[str]) -> None:
    for name in names:
        say_hello.submit(name)
        say_goodbye.submit(name)

if __name__ == "__main__":
    greetings(["arthur", "trillian", "ford", "marvin"])

prefect 2.0b12

ahuang11 commented 2 years ago

Hi, thanks for sharing this!

Could you test this with the newest Prefect version? pip install -U "prefect>=2.0"

Also, I suspect you need to use remote storage for Ray if a remote address is provided, see: https://orion-docs.prefect.io/concepts/storage/

tekumara commented 2 years ago

This still occurs in prefect 2.3.2.

I'm trying to run directly against the cluster without a Deployment, and so there is no explicit storage involved.

I would have thought it possible to run the flow locally, but submit tasks to the remote ray cluster.

tekumara commented 2 years ago

From the stack trace above it looks like the error is occurring in the filesystems module.

Digging into this further, it looks like Prefect is persisting task run results to PREFECT_LOCAL_STORAGE_PATH, which defaults to ${PREFECT_HOME}/storage. The problem seems to be that Prefect resolves ${PREFECT_HOME} in the process running the flow (eg: on my laptop, where it is /Users/tekumara/.prefect) and not in the process running the task (eg: on the ray cluster, where it would be /home/ray/.prefect).
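
The mismatch can be reproduced without Prefect or Ray. The following is a minimal sketch, not Prefect's actual code: `resolve_storage_path` is a hypothetical stand-in for expanding ${PREFECT_HOME}/storage, and the two home directories are the ones mentioned in this issue. It shows that once the path has been resolved in the flow process and shipped as a plain value, the worker sees the laptop's path rather than its own.

```python
import pickle
from pathlib import PurePosixPath


def resolve_storage_path(home: PurePosixPath) -> PurePosixPath:
    """Hypothetical stand-in for expanding ${PREFECT_HOME}/storage."""
    return home / ".prefect" / "storage"


# Home directories taken from the issue: the laptop and the Ray worker.
laptop_home = PurePosixPath("/Users/tekumara")
worker_home = PurePosixPath("/home/ray")

# The flow process resolves the path once, then ships the resolved value.
resolved_on_laptop = resolve_storage_path(laptop_home)
payload = pickle.dumps(str(resolved_on_laptop))

# The worker receives the already-resolved string, not the template.
path_seen_by_worker = PurePosixPath(pickle.loads(payload))
path_worker_expected = resolve_storage_path(worker_home)

print(path_seen_by_worker)   # /Users/tekumara/.prefect/storage
print(path_worker_expected)  # /home/ray/.prefect/storage
```

Whether Prefect ships the setting in exactly this way is an assumption; the sketch only illustrates why a path resolved on one machine fails on another.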

tekumara commented 2 years ago

My current workaround is to set PREFECT_LOCAL_STORAGE_PATH, prior to running the flow, to a writable path inside the container running on the ray cluster, eg:

export PREFECT_LOCAL_STORAGE_PATH=/tmp/prefect/storage
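
The same override could also be applied from Python when the flow script is launched as a child process. This is only a sketch: `build_env` is a hypothetical helper, and it assumes, as the shell workaround does, that Prefect reads PREFECT_LOCAL_STORAGE_PATH from the environment of the process that runs the flow.

```python
import os
from typing import Dict, Mapping, Optional


def build_env(
    storage_path: str, base: Optional[Mapping[str, str]] = None
) -> Dict[str, str]:
    """Return a copy of the environment with the storage path overridden."""
    env = dict(os.environ if base is None else base)
    env["PREFECT_LOCAL_STORAGE_PATH"] = storage_path
    return env


env = build_env("/tmp/prefect/storage")
print(env["PREFECT_LOCAL_STORAGE_PATH"])  # /tmp/prefect/storage

# To launch the flow with this environment (not executed here):
# import subprocess, sys
# subprocess.run([sys.executable, "flows/ray_flow.py"], env=env, check=True)
```

Copying the environment instead of mutating os.environ keeps the override scoped to the child process.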

ahuang11 commented 2 years ago

Thanks so much for helping debug this!

I think we could fix this by wrapping call.func in task_runner.submit with temporary_settings (https://github.com/PrefectHQ/prefect/blob/b65d1366eeb89fb3593a546459859d825af8f37d/src/prefect/settings.py#L806)

e.g.

async def submit(
    self,
    key: UUID,
    call: Callable[..., Awaitable[State[R]]],
) -> None:
    def _submit_call_func_wrapper(*args, **kwargs):
        with temporary_settings(updates={PREFECT_LOCAL_STORAGE_PATH: "/tmp/prefect/storage"}):
            return call.func(*args, **kwargs)

    if not self._started:
        raise RuntimeError(
            "The task runner must be started before submitting work."
        )

    call_kwargs = self._optimize_futures(call.keywords)

    # Ray does not support the submission of async functions and we must create a
    # sync entrypoint
    self._ray_refs[key] = ray.remote(sync_compatible(_submit_call_func_wrapper)).remote(
        **call_kwargs
    )

Combined with Michael's suggestion, this might also resolve https://github.com/PrefectHQ/prefect-ray/issues/37.

Would you be interested in contributing a PR?

pcmoritz commented 1 year ago

We also ran into this. The workaround we used is setting export PREFECT_HOME="/tmp/prefect" on the laptop, a path which exists both on the laptop and the cluster. We should try to fix this upstream, otherwise every single user of prefect-ray will run into this problem :)

zanieb commented 1 year ago

I'll attempt to address this as a part of https://github.com/PrefectHQ/prefect/pull/6908

pcmoritz commented 1 year ago

Thanks, that's awesome! I'm happy to try out a PR once it is ready; I have a setup that reproduces the problem. I'm also happy to try out whether https://github.com/PrefectHQ/prefect/issues/13015 fixes the problem, in case that helps you @madkinsz :)

zanieb commented 1 year ago

I think we'll want to consider more fundamentally the "local persistence for tasks on remote workers" story. I added a couple tickets to the tracking pull request:

  • Ensure task run results persisted to local file systems on remote workers respect relative paths
  • Investigate storing results after return from the remote worker for task runs with local file systems

pcmoritz commented 1 year ago

Btw, unfortunately the workaround from https://github.com/PrefectHQ/prefect/issues/13015 is not working for me, even after fixing the obvious problems with it (like moving sync_compatible into the function). Maybe temporary_settings is not enough: I have a feeling these settings are pickled with cloudpickle and temporary_settings cannot overwrite that, but I didn't dig deep enough to really understand what is going on.

What did work for me is the following atrocious hack:

diff --git a/prefect_ray/task_runners.py b/prefect_ray/task_runners.py
index deb7a8d..fcafb74 100644
--- a/prefect_ray/task_runners.py
+++ b/prefect_ray/task_runners.py
@@ -79,6 +79,11 @@ import anyio
 import ray
 from prefect.futures import PrefectFuture
 from prefect.orion.schemas.states import State
+from prefect.settings import (PREFECT_HOME,
+                              PREFECT_PROFILES_PATH,
+                              PREFECT_LOCAL_STORAGE_PATH,
+                              PREFECT_LOGGING_SETTINGS_PATH,
+                              PREFECT_ORION_DATABASE_CONNECTION_URL)
 from prefect.states import exception_to_crashed_state
 from prefect.task_runners import BaseTaskRunner, R, TaskConcurrencyType
 from prefect.utilities.asyncutils import sync_compatible
@@ -116,6 +121,13 @@ class RayTaskRunner(BaseTaskRunner):
         address: str = None,
         init_kwargs: dict = None,
     ):
+        import pathlib
+        PREFECT_HOME.value = lambda: pathlib.Path("/tmp/prefect")
+        PREFECT_PROFILES_PATH.value = lambda: pathlib.Path("/tmp/prefect/profiles.toml")
+        PREFECT_LOCAL_STORAGE_PATH.value = lambda: pathlib.Path("/tmp/prefect/storage")
+        PREFECT_LOGGING_SETTINGS_PATH.value = lambda: pathlib.Path("/tmp/prefect/logging.yml")
+        PREFECT_ORION_DATABASE_CONNECTION_URL.value = lambda: pathlib.Path("sqlite+aiosqlite:////tmp/prefect/orion.db")
+
         # Store settings
         self.address = address
         self.init_kwargs = init_kwargs.copy() if init_kwargs else {}

pcmoritz commented 1 year ago

Any updates on this?

desertaxle commented 1 year ago

As a workaround for this issue, we are adding instructions to the prefect-ray documentation, in PrefectHQ/prefect-ray#47, that recommend setting PREFECT_LOCAL_STORAGE_PATH to a path available both on the Ray worker and in the flow execution environment. This is not a perfect solution, but it should unblock current use cases. We will continue to work on improving results management when local storage is used in conjunction with remote workers.
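
Before relying on this workaround, it may help to confirm that the chosen path really is usable in each environment. The following is a rough sketch of such a preflight check; `can_persist_results` is a hypothetical helper, not part of Prefect or prefect-ray, and it would have to be run separately on the laptop and on a Ray worker.

```python
import tempfile
import uuid
from pathlib import Path


def can_persist_results(storage_path: str) -> bool:
    """Return True if this process can create the directory and write a file in it.

    The directory itself is left in place on success; only the probe file is removed.
    """
    directory = Path(storage_path)
    probe = directory / f".probe-{uuid.uuid4().hex}"
    try:
        directory.mkdir(parents=True, exist_ok=True)
        probe.write_bytes(b"ok")
        probe.unlink()
    except OSError:
        # Covers FileNotFoundError and PermissionError, as seen in the traceback.
        return False
    return True


# Demonstrated against a throwaway directory so nothing is left behind.
with tempfile.TemporaryDirectory() as tmp:
    print(can_persist_results(str(Path(tmp) / "prefect" / "storage")))
```

A path such as /tmp/prefect/storage would be passed in place of the temporary directory when checking a real environment.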